Skip to content

数据同步#

用户数据同步#

功能介绍#

用户数据同步主要是通过SCIM协议同步不同系统之间的用户和组织,采用Server/Client模式,Server提供符合SCIM标准协议的User,Group等接口,Client端通过定时任务拉取Server端提供的接口获取数据

经典的场景有:

  • AD和ArkID之间的数据同步
  • HR和ArkID之间的数据同步
  • HR和AD之间的数据同步

SCIM协议参考

实现思路#

首先,Server端SCIM协议相关的实现在代码 scim_server 模块。
其中, 比较重要的三个基类为:

  • scim_server.views.view_template.ViewTemplate
    • 子类 scim_server.views.users_view.UsersViewTemplate处理用户相关的增删改查
    • 子类 scim_server.views.groups_view.GroupsViewTemplate处理组织相关的增删改查
  • scim_server.service.provider_adapter_template.ProviderAdapterTemplate
  • scim_server.service.provider_base.ProviderBase

SCIM Server处理SCIM请求的大概流程为,ViewTemplate接受请求,将请求参数转换成对象传递给ProviderAdapterTemplate
ProviderAdapterTemplate验证请求参数合法性,并进一步组装请求对象,最终调用ProviderBase中的方法处理请求对象。

ScimSyncArkIDExtension插件基类继承ProviderBase,在插件load的时候创建UsersViewGroupsView分别继承UsersViewTemplateGroupsViewTemplate
并注册对应的users_url和groups_url,至此只需要继承ScimSyncArkIDExtension插件基类并覆盖从ProviderBase继承的query_users, query_groups等方法即可实现SCIM Server。
创建SCIM Server配置时调用api.views.scim_sync.create_scim_sync接口处理函数,同时返回users_url和groups_url以供Client端拉取数据

Client端通过django_celery_beat创建定时任务,首先通过调用api.views.scim_sync.create_scim_sync接口处理函数创建Client模式的配置,配置参数需指定Scim Server,用于从SCIM Server提供的users_url和groups_url拉取数据,
在处理函数中判断如果是创建Client模式的配置,则创建定时任务,将Client模式的配置传递给celery异步task:arkid.core.tasks.sync, 这个task最终会调用插件基类中的sync方法,
sync方法首先会调get_groups_users方法获取users和groups, 然后先后调用sync_groupssync_users实现同步逻辑,具体插件需要覆盖这个两个方法实现Client端的同步逻辑

抽象方法#

Server模式的抽象方法#

Client模式的抽象方法#

基类定义#

arkid.core.extension.scim_sync.ScimSyncExtension (Extension, ProviderBase) #

Source code in arkid/core/extension/scim_sync.py
class ScimSyncExtension(Extension, ProviderBase):
    TYPE = "scim_sync"

    composite_schema_map = {}
    created_composite_schema_list = []
    composite_key = 'type'
    composite_model = TenantExtensionConfig

    @property
    def type(self):
        return ScimSyncExtension.TYPE

    def load(self):
        class UsersView(UsersViewTemplate):
            @property
            def provider(this):
                return self

            @method_decorator(jwt_token_required)
            def dispatch(self, request, *args, **kwargs):
                return super().dispatch(request, *args, **kwargs)

        class GroupsView(GroupsViewTemplate):
            @property
            def provider(this):
                return self

            @method_decorator(jwt_token_required)
            def dispatch(self, request, *args, **kwargs):
                return super().dispatch(request, *args, **kwargs)

        scim_server_urls = [
            re_path(
                rf'^scim/{self.pname}/(?P<config_id>[\w-]+)/Users(?:/(?P<uuid>[^/]+))?$',
                UsersView.as_view(),
                name=f'{self.pname}_scim_users',
            ),
            # re_path(r'^Groups/.search$', views.GroupSearchView.as_view(), name='groups-search'),
            re_path(
                rf'^scim/{self.pname}/(?P<config_id>[\w-]+)/Groups(?:/(?P<uuid>[^/]+))?$',
                GroupsView.as_view(),
                name=f'{self.pname}_scim_groups',
            ),
        ]
        self.register_routers(scim_server_urls, True)
        super().load()

    def register_scim_sync_schema(self, sync_type, client_schema, server_schema):
        schema = create_extension_schema_by_package(
            self.package,
            fields=[
                (
                    "__root__",
                    Union[(client_schema, server_schema)],
                    Field(discriminator="mode"),
                )
            ],
            base_schema=RootSchema,
        )
        self.register_config_schema(schema, self.package + '_' + sync_type)
        self.register_composite_config_schema(schema, sync_type, exclude=['extension'])

    def sync(self, config, sync_log):
        """
        Args:
            config (arkid.extension.models.TenantExtensionConfig): Client模式创建的配置
        """
        logger.info(
            f"============= Sync Start With Config: {config}/{config.config} ================"
        )
        groups, users = self.get_groups_users(config)
        if not groups or not users:
            return
        self.sync_groups(groups, config, sync_log)
        self.sync_users(users, config, sync_log)

    def get_data(self, url, token):
        logger.info(f"Getting data from {url}")
        headers = {"Authorization": f"jwt {token}"}
        r = requests.get(url, headers=headers)
        if r.status_code == 200:
            return r.json()
        return {}

    def get_groups_users(self, config):
        """
        Args:
            config (arkid.extension.models.TenantExtensionConfig): Client模式创建的配置
        """
        sync_server_id = config.config.get("sync_server", {}).get("id")
        server_config = TenantExtensionConfig.active_objects.filter(
            id=sync_server_id
        ).first()
        if not server_config:
            logger.error(f"No scim sync server config found: {sync_server_id}")
            return None, None
        group_url = server_config.config["group_url"]
        user_url = server_config.config["user_url"]
        token = server_config.config["token"]
        groups = self.get_data(group_url, token).get("Resources")
        users = self.get_data(user_url, token).get("Resources")
        return groups, users

    @abstractmethod
    def sync_groups(self, groups, config, sync_log):
        """
        抽象方法
        Args:
            groups (List): SCIM Server返回的组织列表
            config (arkid.extension.models.TenantExtensionConfig): Client模式创建的配置
        """
        pass

    @abstractmethod
    def sync_users(self, users, config, sync_log):
        """
        抽象方法
        Args:
            users (List): SCIM Server返回的用户列表
            config (arkid.extension.models.TenantExtensionConfig): Client模式创建的配置
        """
        pass

    def get_current_config(self, event):
        config_id = event.request.POST.get('config_id')
        return self.get_config_by_id(config_id)

    def create_tenant_config(self, tenant, config, name, type):
        config_created = super().create_tenant_config(
            tenant, config, name=name, type=type
        )
        if config["mode"] == "server":
            server_host = get_app_config().get_host()
            user_url = server_host + reverse(
                f'api:{self.pname}_tenant:{self.pname}_scim_users',
                args=[tenant.id, config_created.id],
            )
            group_url = server_host + reverse(
                f'api:{self.pname}_tenant:{self.pname}_scim_groups',
                args=[tenant.id, config_created.id],
            )
            config["group_url"] = group_url
            config["user_url"] = user_url
            # 生成用于认证的token和secret
            secret = uuid.uuid4().hex
            config["secret"] = secret
            body = {"sub": config_created.id.hex}
            config["token"] = jwt.encode(body, secret, algorithm="HS256")
            config_created.config = config
            config_created.save()
        return config_created

    @abstractmethod
    def create_user(self, request, resource, correlation_identifier):
        """
        抽象方法
        Args:
            request (HttpRequest): Django 请求
            resource (scim_server.schemas.core2_enterprise_user.Core2EnterpriseUser): SCIM用户对象
            correlation_identifier (str): 请求唯一标识
        """
        raise NotImplementedException()

    @abstractmethod
    def create_group(self, request, resource, correlation_identifier):
        """
        抽象方法
        Args:
            request (HttpRequest): Django 请求
            resource (scim_server.schemas.core2_group.Core2Group): SCIM组织对象
            correlation_identifier (str): 请求唯一标识
        """
        raise NotImplementedException()

    @abstractmethod
    def delete_user(self, request, resource_identifier, correlation_identifier):
        """
        抽象方法
        Args:
            request (HttpRequest): Django 请求
            resource_identifier (str): 用户ID
            correlation_identifier (str): 请求唯一标识
        """
        raise NotImplementedException()

    @abstractmethod
    def delete_group(self, request, resource_identifier, correlation_identifier):
        """
        抽象方法
        Args:
            request (HttpRequest): Django 请求
            resource_identifier (str): 组织ID
            correlation_identifier (str): 请求唯一标识
        """
        raise NotImplementedException()

    @abstractmethod
    def replace_user(self, request, resource, correlation_identifier):
        """
        抽象方法
        Args:
            request (HttpRequest): Django 请求
            resource (scim_server.schemas.core2_enterprise_user.Core2EnterpriseUser): SCIM用户对象
            correlation_identifier (str): 请求唯一标识
        """
        raise NotImplementedException()

    @abstractmethod
    def replace_group(self, request, resource, correlation_identifier):
        """
        抽象方法
        Args:
            request (HttpRequest): Django 请求
            resource (scim_server.schemas.core2_group.Core2Group): SCIM组织对象
            correlation_identifier (str): 请求唯一标识
        """
        raise NotImplementedException()

    @abstractmethod
    def retrieve_user(self, request, parameters, correlation_identifier):
        """
        抽象方法
        Args:
            request (HttpRequest): Django 请求
            parameters (scim_server.protocol.resource_retrieval_parameters.ResourceRetrievalParamters): Retrieve请求对象
            correlation_identifier (str): 请求唯一标识
        """
        raise NotImplementedException()

    @abstractmethod
    def retrieve_group(self, request, parameters, correlation_identifier):
        """
        抽象方法
        Args:
            request (HttpRequest): Django 请求
            parameters (scim_server.protocol.resource_retrieval_parameters.ResourceRetrievalParamters): Retrieve请求对象
            correlation_identifier (str): 请求唯一标识
        """
        raise NotImplementedException()

    @abstractmethod
    def update_user(self, request, patch, correlation_identifier):
        """
        抽象方法
        Args:
            request (HttpRequest): Django 请求
            patch (scim_server.service.patch.Patch): Patch参数对象
            correlation_identifier (str): 请求唯一标识
        """
        raise NotImplementedException()

    @abstractmethod
    def update_group(self, request, patch, correlation_identifier):
        """
        抽象方法
        Args:
            request (HttpRequest): Django 请求
            patch (scim_server.service.patch.Patch): Patch参数对象
            correlation_identifier (str): 请求唯一标识
        """
        raise NotImplementedException()

    @abstractmethod
    def query_users(self, request, parameters, correlation_identifier):
        """
        抽象方法
        Args:
            request (HttpRequest): Django 请求
            parameters (scim_server.protocol.query_parameters.QueryParameters): Query请求对象
            correlation_identifier (str): 请求唯一标识
        Returns:
            List[Core2EnterpriseUser]: 返回scim_server模块中的标准用户对象列表
        """
        pass

    @abstractmethod
    def query_groups(self, request, parameters, correlation_identifier):
        """
        抽象方法
        Args:
            request (HttpRequest): Django 请求
            parameters (scim_server.protocol.query_parameters.QueryParameters): Query请求对象
            correlation_identifier (str): 请求唯一标识
        Returns:
            List[Core2Group]: 返回scim_server模块中的标准组织对象列表
        """
        pass

composite_model (BaseModel) django-model #

TenantExtensionConfig(id, is_del, is_active, updated, created, tenant, extension, config, name, type)

Source code in arkid/core/extension/scim_sync.py
class TenantExtensionConfig(BaseModel):

    class Meta(object):
        verbose_name = _("插件运行时配置")
        verbose_name_plural = _("插件运行时配置")

    tenant = models.ForeignKey('core.Tenant', blank=False, on_delete=models.PROTECT, verbose_name=_('租户'))
    extension = models.ForeignKey('Extension', blank=False, on_delete=models.PROTECT, verbose_name=_('插件'))
    config = models.JSONField(blank=True, default=dict, verbose_name=_('Runtime Config','运行时配置'))
    name = models.CharField(max_length=128, default='', verbose_name=_('名称'))
    type = models.CharField(max_length=128, default='', verbose_name=_('类型'))

config: JSONField blank django-field #

Runtime Config

created: DateTimeField blank django-field nullable #

创建时间

extension: ForeignKey django-field #

插件

id: UUIDField django-field #

ID

is_active: BooleanField django-field #

是否可用

is_del: BooleanField django-field #

是否删除

name: CharField django-field #

名称

tenant: ForeignKey django-field #

租户

type: CharField django-field #

类型

updated: DateTimeField blank django-field nullable #

更新时间

create_group(self, request, resource, correlation_identifier) #

抽象方法

Parameters:

Name Type Description Default
request HttpRequest

Django 请求

required
resource scim_server.schemas.core2_group.Core2Group

SCIM组织对象

required
correlation_identifier str

请求唯一标识

required
Source code in arkid/core/extension/scim_sync.py
@abstractmethod
def create_group(self, request, resource, correlation_identifier):
    """
    抽象方法
    Args:
        request (HttpRequest): Django 请求
        resource (scim_server.schemas.core2_group.Core2Group): SCIM组织对象
        correlation_identifier (str): 请求唯一标识
    """
    raise NotImplementedException()

create_tenant_config(self, tenant, config, name, type) #

创建运行时配置

Parameters:

Name Type Description Default
tenant Tenant

租户

required
config dict

config

required
name str

运行时配置名字

required
type str

配置类型

required

Returns:

Type Description
TenantExtensionConfig

创建的对象

Source code in arkid/core/extension/scim_sync.py
def create_tenant_config(self, tenant, config, name, type):
    config_created = super().create_tenant_config(
        tenant, config, name=name, type=type
    )
    if config["mode"] == "server":
        server_host = get_app_config().get_host()
        user_url = server_host + reverse(
            f'api:{self.pname}_tenant:{self.pname}_scim_users',
            args=[tenant.id, config_created.id],
        )
        group_url = server_host + reverse(
            f'api:{self.pname}_tenant:{self.pname}_scim_groups',
            args=[tenant.id, config_created.id],
        )
        config["group_url"] = group_url
        config["user_url"] = user_url
        # 生成用于认证的token和secret
        secret = uuid.uuid4().hex
        config["secret"] = secret
        body = {"sub": config_created.id.hex}
        config["token"] = jwt.encode(body, secret, algorithm="HS256")
        config_created.config = config
        config_created.save()
    return config_created

create_user(self, request, resource, correlation_identifier) #

抽象方法

Parameters:

Name Type Description Default
request HttpRequest

Django 请求

required
resource scim_server.schemas.core2_enterprise_user.Core2EnterpriseUser

SCIM用户对象

required
correlation_identifier str

请求唯一标识

required
Source code in arkid/core/extension/scim_sync.py
@abstractmethod
def create_user(self, request, resource, correlation_identifier):
    """
    抽象方法
    Args:
        request (HttpRequest): Django 请求
        resource (scim_server.schemas.core2_enterprise_user.Core2EnterpriseUser): SCIM用户对象
        correlation_identifier (str): 请求唯一标识
    """
    raise NotImplementedException()

delete_group(self, request, resource_identifier, correlation_identifier) #

抽象方法

Parameters:

Name Type Description Default
request HttpRequest

Django 请求

required
resource_identifier str

组织ID

required
correlation_identifier str

请求唯一标识

required
Source code in arkid/core/extension/scim_sync.py
@abstractmethod
def delete_group(self, request, resource_identifier, correlation_identifier):
    """
    抽象方法
    Args:
        request (HttpRequest): Django 请求
        resource_identifier (str): 组织ID
        correlation_identifier (str): 请求唯一标识
    """
    raise NotImplementedException()

delete_user(self, request, resource_identifier, correlation_identifier) #

抽象方法

Parameters:

Name Type Description Default
request HttpRequest

Django 请求

required
resource_identifier str

用户ID

required
correlation_identifier str

请求唯一标识

required
Source code in arkid/core/extension/scim_sync.py
@abstractmethod
def delete_user(self, request, resource_identifier, correlation_identifier):
    """
    抽象方法
    Args:
        request (HttpRequest): Django 请求
        resource_identifier (str): 用户ID
        correlation_identifier (str): 请求唯一标识
    """
    raise NotImplementedException()

get_groups_users(self, config) #

Parameters:

Name Type Description Default
config arkid.extension.models.TenantExtensionConfig

Client模式创建的配置

required
Source code in arkid/core/extension/scim_sync.py
def get_groups_users(self, config):
    """
    Args:
        config (arkid.extension.models.TenantExtensionConfig): Client模式创建的配置
    """
    sync_server_id = config.config.get("sync_server", {}).get("id")
    server_config = TenantExtensionConfig.active_objects.filter(
        id=sync_server_id
    ).first()
    if not server_config:
        logger.error(f"No scim sync server config found: {sync_server_id}")
        return None, None
    group_url = server_config.config["group_url"]
    user_url = server_config.config["user_url"]
    token = server_config.config["token"]
    groups = self.get_data(group_url, token).get("Resources")
    users = self.get_data(user_url, token).get("Resources")
    return groups, users

load(self) #

抽象方法,插件加载的入口方法

Source code in arkid/core/extension/scim_sync.py
def load(self):
    class UsersView(UsersViewTemplate):
        @property
        def provider(this):
            return self

        @method_decorator(jwt_token_required)
        def dispatch(self, request, *args, **kwargs):
            return super().dispatch(request, *args, **kwargs)

    class GroupsView(GroupsViewTemplate):
        @property
        def provider(this):
            return self

        @method_decorator(jwt_token_required)
        def dispatch(self, request, *args, **kwargs):
            return super().dispatch(request, *args, **kwargs)

    scim_server_urls = [
        re_path(
            rf'^scim/{self.pname}/(?P<config_id>[\w-]+)/Users(?:/(?P<uuid>[^/]+))?$',
            UsersView.as_view(),
            name=f'{self.pname}_scim_users',
        ),
        # re_path(r'^Groups/.search$', views.GroupSearchView.as_view(), name='groups-search'),
        re_path(
            rf'^scim/{self.pname}/(?P<config_id>[\w-]+)/Groups(?:/(?P<uuid>[^/]+))?$',
            GroupsView.as_view(),
            name=f'{self.pname}_scim_groups',
        ),
    ]
    self.register_routers(scim_server_urls, True)
    super().load()

query_groups(self, request, parameters, correlation_identifier) #

抽象方法

Parameters:

Name Type Description Default
request HttpRequest

Django 请求

required
parameters scim_server.protocol.query_parameters.QueryParameters

Query请求对象

required
correlation_identifier str

请求唯一标识

required

Returns:

Type Description
List[Core2Group]

返回scim_server模块中的标准组织对象列表

Source code in arkid/core/extension/scim_sync.py
@abstractmethod
def query_groups(self, request, parameters, correlation_identifier):
    """
    抽象方法
    Args:
        request (HttpRequest): Django 请求
        parameters (scim_server.protocol.query_parameters.QueryParameters): Query请求对象
        correlation_identifier (str): 请求唯一标识
    Returns:
        List[Core2Group]: 返回scim_server模块中的标准组织对象列表
    """
    pass

query_users(self, request, parameters, correlation_identifier) #

抽象方法

Parameters:

Name Type Description Default
request HttpRequest

Django 请求

required
parameters scim_server.protocol.query_parameters.QueryParameters

Query请求对象

required
correlation_identifier str

请求唯一标识

required

Returns:

Type Description
List[Core2EnterpriseUser]

返回scim_server模块中的标准用户对象列表

Source code in arkid/core/extension/scim_sync.py
@abstractmethod
def query_users(self, request, parameters, correlation_identifier):
    """
    抽象方法
    Args:
        request (HttpRequest): Django 请求
        parameters (scim_server.protocol.query_parameters.QueryParameters): Query请求对象
        correlation_identifier (str): 请求唯一标识
    Returns:
        List[Core2EnterpriseUser]: 返回scim_server模块中的标准用户对象列表
    """
    pass

replace_group(self, request, resource, correlation_identifier) #

抽象方法

Parameters:

Name Type Description Default
request HttpRequest

Django 请求

required
resource scim_server.schemas.core2_group.Core2Group

SCIM组织对象

required
correlation_identifier str

请求唯一标识

required
Source code in arkid/core/extension/scim_sync.py
@abstractmethod
def replace_group(self, request, resource, correlation_identifier):
    """
    抽象方法
    Args:
        request (HttpRequest): Django 请求
        resource (scim_server.schemas.core2_group.Core2Group): SCIM组织对象
        correlation_identifier (str): 请求唯一标识
    """
    raise NotImplementedException()

replace_user(self, request, resource, correlation_identifier) #

抽象方法

Parameters:

Name Type Description Default
request HttpRequest

Django 请求

required
resource scim_server.schemas.core2_enterprise_user.Core2EnterpriseUser

SCIM用户对象

required
correlation_identifier str

请求唯一标识

required
Source code in arkid/core/extension/scim_sync.py
@abstractmethod
def replace_user(self, request, resource, correlation_identifier):
    """
    抽象方法
    Args:
        request (HttpRequest): Django 请求
        resource (scim_server.schemas.core2_enterprise_user.Core2EnterpriseUser): SCIM用户对象
        correlation_identifier (str): 请求唯一标识
    """
    raise NotImplementedException()

retrieve_group(self, request, parameters, correlation_identifier) #

抽象方法

Parameters:

Name Type Description Default
request HttpRequest

Django 请求

required
parameters scim_server.protocol.resource_retrieval_parameters.ResourceRetrievalParamters

Retrieve请求对象

required
correlation_identifier str

请求唯一标识

required
Source code in arkid/core/extension/scim_sync.py
@abstractmethod
def retrieve_group(self, request, parameters, correlation_identifier):
    """
    抽象方法
    Args:
        request (HttpRequest): Django 请求
        parameters (scim_server.protocol.resource_retrieval_parameters.ResourceRetrievalParamters): Retrieve请求对象
        correlation_identifier (str): 请求唯一标识
    """
    raise NotImplementedException()

retrieve_user(self, request, parameters, correlation_identifier) #

抽象方法

Parameters:

Name Type Description Default
request HttpRequest

Django 请求

required
parameters scim_server.protocol.resource_retrieval_parameters.ResourceRetrievalParamters

Retrieve请求对象

required
correlation_identifier str

请求唯一标识

required
Source code in arkid/core/extension/scim_sync.py
@abstractmethod
def retrieve_user(self, request, parameters, correlation_identifier):
    """
    抽象方法
    Args:
        request (HttpRequest): Django 请求
        parameters (scim_server.protocol.resource_retrieval_parameters.ResourceRetrievalParamters): Retrieve请求对象
        correlation_identifier (str): 请求唯一标识
    """
    raise NotImplementedException()

sync(self, config, sync_log) #

Parameters:

Name Type Description Default
config arkid.extension.models.TenantExtensionConfig

Client模式创建的配置

required
Source code in arkid/core/extension/scim_sync.py
def sync(self, config, sync_log):
    """
    Args:
        config (arkid.extension.models.TenantExtensionConfig): Client模式创建的配置
    """
    logger.info(
        f"============= Sync Start With Config: {config}/{config.config} ================"
    )
    groups, users = self.get_groups_users(config)
    if not groups or not users:
        return
    self.sync_groups(groups, config, sync_log)
    self.sync_users(users, config, sync_log)

sync_groups(self, groups, config, sync_log) #

抽象方法

Parameters:

Name Type Description Default
groups List

SCIM Server返回的组织列表

required
config arkid.extension.models.TenantExtensionConfig

Client模式创建的配置

required
Source code in arkid/core/extension/scim_sync.py
@abstractmethod
def sync_groups(self, groups, config, sync_log):
    """
    抽象方法
    Args:
        groups (List): SCIM Server返回的组织列表
        config (arkid.extension.models.TenantExtensionConfig): Client模式创建的配置
    """
    pass

sync_users(self, users, config, sync_log) #

抽象方法

Parameters:

Name Type Description Default
users List

SCIM Server返回的用户列表

required
config arkid.extension.models.TenantExtensionConfig

Client模式创建的配置

required
Source code in arkid/core/extension/scim_sync.py
@abstractmethod
def sync_users(self, users, config, sync_log):
    """
    抽象方法
    Args:
        users (List): SCIM Server返回的用户列表
        config (arkid.extension.models.TenantExtensionConfig): Client模式创建的配置
    """
    pass

update_group(self, request, patch, correlation_identifier) #

抽象方法

Parameters:

Name Type Description Default
request HttpRequest

Django 请求

required
patch scim_server.service.patch.Patch

Patch参数对象

required
correlation_identifier str

请求唯一标识

required
Source code in arkid/core/extension/scim_sync.py
@abstractmethod
def update_group(self, request, patch, correlation_identifier):
    """
    抽象方法
    Args:
        request (HttpRequest): Django 请求
        patch (scim_server.service.patch.Patch): Patch参数对象
        correlation_identifier (str): 请求唯一标识
    """
    raise NotImplementedException()

update_user(self, request, patch, correlation_identifier) #

抽象方法

Parameters:

Name Type Description Default
request HttpRequest

Django 请求

required
patch scim_server.service.patch.Patch

Patch参数对象

required
correlation_identifier str

请求唯一标识

required
Source code in arkid/core/extension/scim_sync.py
@abstractmethod
def update_user(self, request, patch, correlation_identifier):
    """
    抽象方法
    Args:
        request (HttpRequest): Django 请求
        patch (scim_server.service.patch.Patch): Patch参数对象
        correlation_identifier (str): 请求唯一标识
    """
    raise NotImplementedException()

示例#

extension_root.com_longgui_scim_sync_arkid.ScimSyncArkIDExtension (ScimSyncExtension) #

Source code in extension_root/com_longgui_scim_sync_arkid/__init__.py
class ScimSyncArkIDExtension(ScimSyncExtension):
    def load(self):
        self.register_scim_sync_schema('ArkID', ClientConfig, ServerConfig)
        super().load()

    def _get_arkid_user_attrs(self, user):
        active = user.get("active")
        if active is None:
            active = True

        return {
            "username": user.get("userName", ""),
            "is_active": active,
            "is_del": False,
        }

    def _get_arkid_user(self, scim_user, tenant, sync_log):
        scim_external_id = scim_user["id"]
        username = scim_user["userName"]
        arkid_user_attrs = self._get_arkid_user_attrs(scim_user)
        user_lookup = {
            "scim_external_id": scim_external_id,
            "username": username,
            "tenant": tenant,
        }
        # arkid_user, _ = User.objects.update_or_create(
        #     defaults=arkid_user_attrs, **user_lookup
        # )
        arkid_user = User.objects.filter(**user_lookup).first()
        if not arkid_user:
            user_lookup.update(arkid_user_attrs)
            arkid_user = User.objects.create(**user_lookup)
            sync_log.users_created += 1
        tenant.users.add(arkid_user)

        # 更新arkid_user所属的group
        arkid_user.usergroup_set.clear()
        for scim_group in scim_user.get("groups", []):
            scim_group_id = scim_group.get("value")
            arkid_group = self.scim_arkid_group_map.get(scim_group_id)
            if arkid_group:
                arkid_user.usergroup_set.add(arkid_group)
        # arkid_user.save()
        return arkid_user

    def _get_arkid_group(self, group, scim_arkid_map, tenant, sync_log):
        scim_external_id = group["id"] if "id" in group else group["value"]
        if scim_external_id not in scim_arkid_map:
            group_lookup = {"scim_external_id": scim_external_id, "tenant": tenant}
            arkid_group = UserGroup.objects.filter(**group_lookup).first()
            if not arkid_group:
                arkid_group = UserGroup.objects.create(**group_lookup)
                sync_log.groups_created += 1
            else:
                arkid_group.is_del = False
                arkid_group.is_active = True
            scim_arkid_map[scim_external_id] = arkid_group
            return arkid_group
        else:
            return scim_arkid_map[scim_external_id]

    def _sync_group_attr(self, arkid_group, scim_group):
        arkid_group.name = scim_group.get("displayName")
        arkid_group.save()

    def delete_group_from_root(self, root):
        logger.info(f"Delete Group {root.name} Start")
        children = root.children.all()
        if not children:
            root.delete()
            logger.info(f"delete group {root.name} success")
            return
        for item in children:
            self.delete_group_from_root(item)
        root.delete()
        logger.info(f"delete group {root.name} success")

    def sync_groups(self, groups, config, sync_log):
        """
        遍历groups中的SCIM 组织,逐一和ArkID中的组织匹配,如果不存在就创建,存在则更新,在此过程中
        同时遍历每个SCIM 组织中的members,同样的方式在ArkID中创建或更新组织,并且维护组织之间的父子关系,
        最后删除以前同步到ArkID但不在本次同步数据中的组织
        Args:
            groups (List): SCIM Server返回的组织列表
            config (arkid.extension.models.TenantExtensionConfig): Client模式创建的配置
        """
        logger.info("###### update&create groups ######")
        tenant = config.tenant
        self.scim_arkid_group_map = {}
        for group in groups:
            parent_group = self._get_arkid_group(
                group, self.scim_arkid_group_map, tenant, sync_log
            )
            self._sync_group_attr(parent_group, group)
            for member in group.get("members", []):
                sub_group = self._get_arkid_group(
                    member, self.scim_arkid_group_map, tenant, sync_log
                )
                sub_group.parent = parent_group

        logger.info("###### delete groups ######")
        groups_need_delete = (
            UserGroup.valid_objects.filter(tenant=config.tenant)
            .exclude(scim_external_id=None)
            .exclude(scim_external_id__in=self.scim_arkid_group_map.keys())
        )
        logger.info(f"******* groups to be deleted: {groups_need_delete} ********")
        root_groups = []
        for grp in groups_need_delete:
            if (grp.parent is None) or (grp.parent not in groups_need_delete):
                root_groups.append(grp)
        for root in root_groups:
            self.delete_group_from_root(root)
        delete_count = len(groups_need_delete)
        # groups_need_delete.delete()
        sync_log.groups_deleted = delete_count

    def sync_users(self, users, config, sync_log):
        """
        遍历users中的SCIM 用户记录,逐一和ArkID中的用户匹配,如果不存在匹配的就创建,存在则更新,
        最后删除以前同步到ArkID但不在本次同步数据中的用户
        Args:
            users (List): SCIM Server返回的用户列表
            config (arkid.extension.models.TenantExtensionConfig): Client模式创建的配置
        """
        logger.info("###### update&create users ######")
        tenant = config.tenant
        scim_user_ids = []
        for user in users:
            scim_user_ids.append(user["id"])
            try:
                arkid_user = self._get_arkid_user(user, tenant, sync_log)
            except IntegrityError as e:
                logger.error(e)
                logger.error(f"sync user failed: {user}")

        logger.info("###### delete users ######")
        users_need_delete = (
            tenant.users.filter(is_del=False)
            .exclude(scim_external_id=None)
            .exclude(scim_external_id__in=scim_user_ids)
        )
        logger.info(f"***** users to be deleted: {users_need_delete} ******")
        for u in users_need_delete:
            u.usergroup_set.clear()
            u.delete()
            sync_log.users_deleted += 1
            # users_need_delete.delete()

    def _get_scim_user(self, arkid_user):
        attr_map = {"id": "id", "username": "userName", "is_active": "active"}
        scim_user = Core2EnterpriseUser(userName='', groups=[])
        for arkid_attr, scim_attr in attr_map.items():
            value = getattr(arkid_user, arkid_attr)
            scim_path = Path.create(scim_attr)
            if (
                scim_path.schema_identifier
                and scim_path.schema_identifier == SchemaIdentifiers.Core2EnterpriseUser
            ):
                compose_enterprise_extension(scim_user, scim_path, value)
            else:
                compose_core2_user(scim_user, scim_path, value)

        # 生成用户所在的组
        parent_groups = arkid_user.usergroup_set.filter(is_del=0)
        for grp in parent_groups:
            scim_group = ScimUserGroup()
            scim_group.value = grp.id
            scim_group.display = grp.name
            scim_user.groups.append(scim_group)
        return scim_user

    def _get_scim_group(self, arkid_group):
        members = UserGroup.valid_objects.filter(parent=arkid_group)
        attr_map = {"id": "id", "name": "displayName"}
        scim_group = Core2Group(displayName='')
        for arkid_attr, scim_attr in attr_map.items():
            value = getattr(arkid_group, arkid_attr)
            scim_path = Path.create(scim_attr)
            compose_core2_group(scim_group, scim_path, value)
        for item in members:
            member = Member()
            member.value = item.id
            scim_group.members.append(member)
        return scim_group

    def _get_all_scim_users(self, tenant):
        scim_users = []
        arkid_users = User.valid_objects.filter(tenant=tenant)
        for arkid_user in arkid_users:
            scim_user = self._get_scim_user(arkid_user)
            scim_users.append(scim_user)
        return scim_users

    def _get_all_scim_groups(self, tenant):
        scim_groups = []
        arkid_groups = UserGroup.valid_objects.filter(tenant=tenant)
        for arkid_group in arkid_groups:
            scim_group = self._get_scim_group(arkid_group)
            scim_groups.append(scim_group)
        return scim_groups

    def query_users(self, request, parameters, correlation_identifier):
        """
        将ArkID中的用户转换成scim_server中的符合SCIM标准的Core2EnterpriseUser对象
        Args:
            request (HttpRequest): Django 请求
            parameters (scim_server.protocol.query_parameters.QueryParameters): Query请求对象
            correlation_identifier (str): 请求唯一标识
        Returns:
            List[Core2EnterpriseUser]: 返回scim_server模块中的标准用户对象列表
        """
        if not parameters.alternate_filters:
            all_users = self._get_all_scim_users(request.tenant)
            return all_users

    def query_groups(self, request, parameters, correlation_identifier):
        """
        将ArkID中的组织转换成scim_server中的符合SCIM标准的Core2Group对象
        Args:
            request (HttpRequest): Django 请求
            parameters (scim_server.protocol.query_parameters.QueryParameters): Query请求对象
            correlation_identifier (str): 请求唯一标识
        Returns:
            List[Core2Group]: 返回scim_server模块中的标准组织对象列表
        """
        if not parameters.alternate_filters:
            groups = self._get_all_scim_groups(request.tenant)
            return groups

    def create_user(self, request, resource, correlation_identifier):
        raise NotImplementedException()

    def create_group(self, request, resource, correlation_identifier):
        raise NotImplementedException()

    def delete_user(self, request, resource_identifier, correlation_identifier):
        raise NotImplementedException()

    def delete_group(self, request, resource_identifier, correlation_identifier):
        raise NotImplementedException()

    def replace_user(self, request, resource, correlation_identifier):
        raise NotImplementedException()

    def replace_group(self, request, resource, correlation_identifier):
        raise NotImplementedException()

    def retrieve_user(self, request, parameters, correlation_identifier):
        raise NotImplementedException()

    def retrieve_group(self, request, parameters, correlation_identifier):
        raise NotImplementedException()

    def update_user(self, request, patch, correlation_identifier):
        raise NotImplementedException()

    def update_group(self, request, patch, correlation_identifier):
        raise NotImplementedException()

create_group(self, request, resource, correlation_identifier) #

抽象方法

Parameters:

Name Type Description Default
request HttpRequest

Django 请求

required
resource scim_server.schemas.core2_group.Core2Group

SCIM组织对象

required
correlation_identifier str

请求唯一标识

required
Source code in extension_root/com_longgui_scim_sync_arkid/__init__.py
def create_group(self, request, resource, correlation_identifier):
    raise NotImplementedException()

create_user(self, request, resource, correlation_identifier) #

抽象方法

Parameters:

Name Type Description Default
request HttpRequest

Django 请求

required
resource scim_server.schemas.core2_enterprise_user.Core2EnterpriseUser

SCIM用户对象

required
correlation_identifier str

请求唯一标识

required
Source code in extension_root/com_longgui_scim_sync_arkid/__init__.py
def create_user(self, request, resource, correlation_identifier):
    raise NotImplementedException()

delete_group(self, request, resource_identifier, correlation_identifier) #

抽象方法

Parameters:

Name Type Description Default
request HttpRequest

Django 请求

required
resource_identifier str

组织ID

required
correlation_identifier str

请求唯一标识

required
Source code in extension_root/com_longgui_scim_sync_arkid/__init__.py
def delete_group(self, request, resource_identifier, correlation_identifier):
    raise NotImplementedException()

delete_user(self, request, resource_identifier, correlation_identifier) #

抽象方法

Parameters:

Name Type Description Default
request HttpRequest

Django 请求

required
resource_identifier str

用户ID

required
correlation_identifier str

请求唯一标识

required
Source code in extension_root/com_longgui_scim_sync_arkid/__init__.py
def delete_user(self, request, resource_identifier, correlation_identifier):
    raise NotImplementedException()

load(self) #

抽象方法,插件加载的入口方法

Source code in extension_root/com_longgui_scim_sync_arkid/__init__.py
def load(self):
    self.register_scim_sync_schema('ArkID', ClientConfig, ServerConfig)
    super().load()

query_groups(self, request, parameters, correlation_identifier) #

将ArkID中的组织转换成scim_server中的符合SCIM标准的Core2Group对象

Parameters:

Name Type Description Default
request HttpRequest

Django 请求

required
parameters scim_server.protocol.query_parameters.QueryParameters

Query请求对象

required
correlation_identifier str

请求唯一标识

required

Returns:

Type Description
List[Core2Group]

返回scim_server模块中的标准组织对象列表

Source code in extension_root/com_longgui_scim_sync_arkid/__init__.py
def query_groups(self, request, parameters, correlation_identifier):
    """
    将ArkID中的组织转换成scim_server中的符合SCIM标准的Core2Group对象
    Args:
        request (HttpRequest): Django 请求
        parameters (scim_server.protocol.query_parameters.QueryParameters): Query请求对象
        correlation_identifier (str): 请求唯一标识
    Returns:
        List[Core2Group]: 返回scim_server模块中的标准组织对象列表
    """
    if not parameters.alternate_filters:
        groups = self._get_all_scim_groups(request.tenant)
        return groups

query_users(self, request, parameters, correlation_identifier) #

将ArkID中的用户转换成scim_server中的符合SCIM标准的Core2EnterpriseUser对象

Parameters:

Name Type Description Default
request HttpRequest

Django 请求

required
parameters scim_server.protocol.query_parameters.QueryParameters

Query请求对象

required
correlation_identifier str

请求唯一标识

required

Returns:

Type Description
List[Core2EnterpriseUser]

返回scim_server模块中的标准用户对象列表

Source code in extension_root/com_longgui_scim_sync_arkid/__init__.py
def query_users(self, request, parameters, correlation_identifier):
    """
    将ArkID中的用户转换成scim_server中的符合SCIM标准的Core2EnterpriseUser对象
    Args:
        request (HttpRequest): Django 请求
        parameters (scim_server.protocol.query_parameters.QueryParameters): Query请求对象
        correlation_identifier (str): 请求唯一标识
    Returns:
        List[Core2EnterpriseUser]: 返回scim_server模块中的标准用户对象列表
    """
    if not parameters.alternate_filters:
        all_users = self._get_all_scim_users(request.tenant)
        return all_users

replace_group(self, request, resource, correlation_identifier) #

抽象方法

Parameters:

Name Type Description Default
request HttpRequest

Django 请求

required
resource scim_server.schemas.core2_group.Core2Group

SCIM组织对象

required
correlation_identifier str

请求唯一标识

required
Source code in extension_root/com_longgui_scim_sync_arkid/__init__.py
def replace_group(self, request, resource, correlation_identifier):
    raise NotImplementedException()

replace_user(self, request, resource, correlation_identifier) #

抽象方法

Parameters:

Name Type Description Default
request HttpRequest

Django 请求

required
resource scim_server.schemas.core2_enterprise_user.Core2EnterpriseUser

SCIM用户对象

required
correlation_identifier str

请求唯一标识

required
Source code in extension_root/com_longgui_scim_sync_arkid/__init__.py
def replace_user(self, request, resource, correlation_identifier):
    raise NotImplementedException()

retrieve_group(self, request, parameters, correlation_identifier) #

抽象方法

Parameters:

Name Type Description Default
request HttpRequest

Django 请求

required
parameters scim_server.protocol.resource_retrieval_parameters.ResourceRetrievalParamters

Retrieve请求对象

required
correlation_identifier str

请求唯一标识

required
Source code in extension_root/com_longgui_scim_sync_arkid/__init__.py
def retrieve_group(self, request, parameters, correlation_identifier):
    raise NotImplementedException()

retrieve_user(self, request, parameters, correlation_identifier) #

抽象方法

Parameters:

Name Type Description Default
request HttpRequest

Django 请求

required
parameters scim_server.protocol.resource_retrieval_parameters.ResourceRetrievalParamters

Retrieve请求对象

required
correlation_identifier str

请求唯一标识

required
Source code in extension_root/com_longgui_scim_sync_arkid/__init__.py
def retrieve_user(self, request, parameters, correlation_identifier):
    raise NotImplementedException()

sync_groups(self, groups, config, sync_log) #

遍历groups中的SCIM 组织,逐一和ArkID中的组织匹配,如果不存在就创建,存在则更新,在此过程中 同时遍历每个SCIM 组织中的members,同样的方式在ArkID中创建或更新组织,并且维护组织之间的父子关系, 最后删除以前同步到ArkID但不在本次同步数据中的组织

Parameters:

Name Type Description Default
groups List

SCIM Server返回的组织列表

required
config arkid.extension.models.TenantExtensionConfig

Client模式创建的配置

required
Source code in extension_root/com_longgui_scim_sync_arkid/__init__.py
def sync_groups(self, groups, config, sync_log):
    """
    遍历groups中的SCIM 组织,逐一和ArkID中的组织匹配,如果不存在就创建,存在则更新,在此过程中
    同时遍历每个SCIM 组织中的members,同样的方式在ArkID中创建或更新组织,并且维护组织之间的父子关系,
    最后删除以前同步到ArkID但不在本次同步数据中的组织
    Args:
        groups (List): SCIM Server返回的组织列表
        config (arkid.extension.models.TenantExtensionConfig): Client模式创建的配置
    """
    logger.info("###### update&create groups ######")
    tenant = config.tenant
    self.scim_arkid_group_map = {}
    for group in groups:
        parent_group = self._get_arkid_group(
            group, self.scim_arkid_group_map, tenant, sync_log
        )
        self._sync_group_attr(parent_group, group)
        for member in group.get("members", []):
            sub_group = self._get_arkid_group(
                member, self.scim_arkid_group_map, tenant, sync_log
            )
            sub_group.parent = parent_group

    logger.info("###### delete groups ######")
    groups_need_delete = (
        UserGroup.valid_objects.filter(tenant=config.tenant)
        .exclude(scim_external_id=None)
        .exclude(scim_external_id__in=self.scim_arkid_group_map.keys())
    )
    logger.info(f"******* groups to be deleted: {groups_need_delete} ********")
    root_groups = []
    for grp in groups_need_delete:
        if (grp.parent is None) or (grp.parent not in groups_need_delete):
            root_groups.append(grp)
    for root in root_groups:
        self.delete_group_from_root(root)
    delete_count = len(groups_need_delete)
    # groups_need_delete.delete()
    sync_log.groups_deleted = delete_count

sync_users(self, users, config, sync_log) #

遍历users中的SCIM 用户记录,逐一和ArkID中的用户匹配,如果不存在匹配的就创建,存在则更新, 最后删除以前同步到ArkID但不在本次同步数据中的用户

Parameters:

Name Type Description Default
users List

SCIM Server返回的用户列表

required
config arkid.extension.models.TenantExtensionConfig

Client模式创建的配置

required
Source code in extension_root/com_longgui_scim_sync_arkid/__init__.py
def sync_users(self, users, config, sync_log):
    """
    遍历users中的SCIM 用户记录,逐一和ArkID中的用户匹配,如果不存在匹配的就创建,存在则更新,
    最后删除以前同步到ArkID但不在本次同步数据中的用户
    Args:
        users (List): SCIM Server返回的用户列表
        config (arkid.extension.models.TenantExtensionConfig): Client模式创建的配置
    """
    logger.info("###### update&create users ######")
    tenant = config.tenant
    scim_user_ids = []
    for user in users:
        scim_user_ids.append(user["id"])
        try:
            arkid_user = self._get_arkid_user(user, tenant, sync_log)
        except IntegrityError as e:
            logger.error(e)
            logger.error(f"sync user failed: {user}")

    logger.info("###### delete users ######")
    users_need_delete = (
        tenant.users.filter(is_del=False)
        .exclude(scim_external_id=None)
        .exclude(scim_external_id__in=scim_user_ids)
    )
    logger.info(f"***** users to be deleted: {users_need_delete} ******")
    for u in users_need_delete:
        u.usergroup_set.clear()
        u.delete()
        sync_log.users_deleted += 1
        # users_need_delete.delete()

update_group(self, request, patch, correlation_identifier) #

抽象方法

Parameters:

Name Type Description Default
request HttpRequest

Django 请求

required
patch scim_server.service.patch.Patch

Patch参数对象

required
correlation_identifier str

请求唯一标识

required
Source code in extension_root/com_longgui_scim_sync_arkid/__init__.py
def update_group(self, request, patch, correlation_identifier):
    raise NotImplementedException()

update_user(self, request, patch, correlation_identifier) #

抽象方法

Parameters:

Name Type Description Default
request HttpRequest

Django 请求

required
patch scim_server.service.patch.Patch

Patch参数对象

required
correlation_identifier str

请求唯一标识

required
Source code in extension_root/com_longgui_scim_sync_arkid/__init__.py
def update_user(self, request, patch, correlation_identifier):
    raise NotImplementedException()

权限数据同步#

评论