授权规则
功能介绍#
授权规则,作为系统已有认证方式的补充,它可以支持更细粒度的权限赋予。 可以支持针对用户的不同属性进行权限的赋予,例如用户名、昵称、性别、手机号等。 也可以针对分组的不同属性进行权限的赋予。 还可以根据开发者需要,通过扩展数据存储的结构,实现特定场景下权限的赋予。
实现思路#
arkid系统已经默认实现了一个针对用户属性的权限赋予,方便开发者进行参考
graph LR
  A[开始] --> B{设计schema} --> C{注册前端页面}--> D{注册schema}--> E{实现授权函数} --> F[结束];下面对于实现思路做一下简单的介绍:
- 
需要开发者想清楚自己开发插件的需要筛选的属性,是用户属性,还是分组属性,或者其它属性。以及需要筛选那些应用,以及那些权限。 要设计出schema,用于存储数据结构。如果schema中有用到一些应用列表,权限列表,用户列表等,需要把用到的前端页面,使用父类的register_front_pages单独进行注册。 
- 
不同的授权规则之间是通过不同的schema进行划分,所以当开发者设计完schema后,需要通过register_impowerrule_schema,进行注册。 
- 
注册完成后,创建授权规则的页面绿色Type字段就会多一种授权规则,如下图所示: 如果我们选择不同的授权规则,红色部分就会展示出不同的内容,展示内容由schema结构决定。 授权规则的创建编辑和删除,都是由arkid提前处理好的,开发者只需要关注赋权方法就可以。 下面对赋权方法的使用做下介绍: 需要开发者实现get_auth_result(event, **kwargs)方法 - 参数: 这个方法的kwargs,event两个参数,我们重点关注event参数,这个参数中包含了data和tenant两个属性,其中event.tenant可以获取到当前的租户;event.data可以获取到传递过来的数据。我们获取data里面的值- data.user可以获取到当前用户;
- data.app可以获取到当前应用,如果这个应用是一个None,就表示应用为arkid,反之则为其它应用;
- data.arr可以获取到用户权限数组(0或1组成,0是没权限,1有权限,数组下标和数据库权限表里的sort_id是相同的);
- data.config可以获取到当前授权规则
 
- 使用: 开发者需要根据授权规则event.config,结合data.user,根据自身的需求,做筛选后,返回有权限的sort_id数组
 
- 参数: 这个方法的
抽象方法#
基类定义#
        
arkid.core.extension.impower_rule.ImpowerRuleBaseExtension            (Extension)
        
#
    Source code in arkid/core/extension/impower_rule.py
          class ImpowerRuleBaseExtension(Extension):
    TYPE = 'impower_rule'
    composite_schema_map = {}
    created_composite_schema_list = []
    composite_key = 'type'
    composite_model = TenantExtensionConfig
    @property
    def type(self):
        return ImpowerRuleBaseExtension.TYPE
    def load(self):
        super().load()
        self.listen_event(core_event.GET_AUTH_RESULT, self.filter_auth_result)
    def get_extensions(self):
        '''
        获取当前类型所有的插件
        '''
        return Extension_Obj.active_objects.filter(
            package=self.package,
            type=self.TYPE
        ).all()
    def get_all_config(self, tenant_id):
        '''
        获取所有的配置
        '''
        return TenantExtensionConfig.active_objects.filter(
            tenant_id=tenant_id,
            extension__in=self.get_extensions()
        ).all()
    def register_impower_rule_schema(self, schema, impowerrule_type):
        """
        注册授权规则的schema
        Params:
            schema: schema
            impowerrule_type: 授权规则类型
        """
        self.register_config_schema(schema, self.package + '_' + impowerrule_type)
        self.register_composite_config_schema(schema, impowerrule_type, exclude=['extension'])
    def filter_auth_result(self, event, **kwargs):
        '''
        筛选抽象结果
        '''
        tenant = event.tenant
        configs = self.get_all_config(tenant.id)
        data = event.data
        arr = data.get('arr', [])
        copy_arr = [x for x in arr]
        result_sort_ids = []
        # 每一个授权规则配置单独验证
        for config in configs:
            data['config'] = config
            sort_ids = self.get_auth_result(event, **kwargs)
            if sort_ids:
                result_sort_ids.extend(sort_ids)
        # 对于授权结果进行合并
        for index, value in enumerate(copy_arr):
            if int(value) == 0 and index in result_sort_ids:
                copy_arr[index] = 1
        return copy_arr
    @abstractmethod
    def get_auth_result(self, event, **kwargs):
        """
        抽象方法,获取权限的鉴定结果
        Params:
            event: 事件参数
                data: 数据
                    user: 用户
                    app: 应用(如果app是None,就表示这个应用是arkid)
                    arr: 权限结果数组(这个是已经赋值过分组权限的数据,有权限是1,没权限是0)
                    config: 应用的授权规则
                tenant: 租户
            kwargs: 其它方法参数
        Return:
            arr: sort_id数组
        """
        pass
        
composite_model            (BaseModel)
        
  
      django-model
  
#
    TenantExtensionConfig(id, is_del, is_active, updated, created, tenant, extension, config, name, type)
Source code in arkid/core/extension/impower_rule.py
          class TenantExtensionConfig(BaseModel):
    class Meta(object):
        verbose_name = _("插件运行时配置")
        verbose_name_plural = _("插件运行时配置")
    tenant = models.ForeignKey('core.Tenant', blank=False, on_delete=models.PROTECT, verbose_name=_('租户'))
    extension = models.ForeignKey('Extension', blank=False, on_delete=models.PROTECT, verbose_name=_('插件'))
    config = models.JSONField(blank=True, default=dict, verbose_name=_('Runtime Config','运行时配置'))
    name = models.CharField(max_length=128, default='', verbose_name=_('名称'))
    type = models.CharField(max_length=128, default='', verbose_name=_('类型'))
config: JSONField
  
      blank
      django-field
  
#
    Runtime Config
created: DateTimeField
  
      blank
      django-field
      nullable
  
#
    创建时间
extension: ForeignKey
  
      django-field
  
#
    插件
id: UUIDField
  
      django-field
  
#
    ID
is_active: BooleanField
  
      django-field
  
#
    是否可用
is_del: BooleanField
  
      django-field
  
#
    是否删除
name: CharField
  
      django-field
  
#
    名称
tenant: ForeignKey
  
      django-field
  
#
    租户
type: CharField
  
      django-field
  
#
    类型
updated: DateTimeField
  
      blank
      django-field
      nullable
  
#
    更新时间
filter_auth_result(self, event, **kwargs)
#
    筛选抽象结果
Source code in arkid/core/extension/impower_rule.py
          def filter_auth_result(self, event, **kwargs):
    '''
    筛选抽象结果
    '''
    tenant = event.tenant
    configs = self.get_all_config(tenant.id)
    data = event.data
    arr = data.get('arr', [])
    copy_arr = [x for x in arr]
    result_sort_ids = []
    # 每一个授权规则配置单独验证
    for config in configs:
        data['config'] = config
        sort_ids = self.get_auth_result(event, **kwargs)
        if sort_ids:
            result_sort_ids.extend(sort_ids)
    # 对于授权结果进行合并
    for index, value in enumerate(copy_arr):
        if int(value) == 0 and index in result_sort_ids:
            copy_arr[index] = 1
    return copy_arr
get_all_config(self, tenant_id)
#
    
  
get_auth_result(self, event, **kwargs)
#
    抽象方法,获取权限的鉴定结果
Parameters:
| Name | Type | Description | Default | 
|---|---|---|---|
| event | 事件参数 data: 数据 user: 用户 app: 应用(如果app是None,就表示这个应用是arkid) arr: 权限结果数组(这个是已经赋值过分组权限的数据,有权限是1,没权限是0) config: 应用的授权规则 tenant: 租户 | required | |
| kwargs | 其它方法参数 | {} | 
Returns:
| Type | Description | 
|---|---|
| arr | sort_id数组 | 
Source code in arkid/core/extension/impower_rule.py
          
        
get_extensions(self)
#
    
  
load(self)
#
    
  
register_impower_rule_schema(self, schema, impowerrule_type)
#
    注册授权规则的schema
Parameters:
| Name | Type | Description | Default | 
|---|---|---|---|
| schema | schema | required | |
| impowerrule_type | 授权规则类型 | required | 
Source code in arkid/core/extension/impower_rule.py
          
        示例#
        
extension_root.com_longgui_impower_rule.ImpowerRuleExtension            (ImpowerRuleBaseExtension)
        
#
    Source code in extension_root/com_longgui_impower_rule/__init__.py
          class ImpowerRuleExtension(ImpowerRuleBaseExtension):
    def load(self):
        super().load()
        # 注册前端页面
        self.load_front_page()
        # 注册schema
        self.register_impower_rule_schema(ImpowerRuleCreateIn, 'DefaultImpowerRule')
    def load_front_page(self):
        '''
        注册前端页面
        '''
        self.register_front_pages(user_field_page)
        self.register_front_pages(app_page)
        self.register_front_pages(app_permission_page)
    def get_auth_result(self, event, **kwargs):
        '''
        获取权限鉴定结果
        '''
        data = event.data
        tenant = event.tenant
        user = data.get('user')
        config = data.get('config')
        # 处理授权逻辑
        permission_info = {}
        # 取得了所有配置
        config_info = config.config
        config_matchfield = config_info.get('matchfield')
        config_matchsymbol = config_info.get('matchsymbol')
        config_matchvalue = config_info.get('matchvalue')
        config_app = config_info.get('app')
        config_app_id = config_app.get('id')
        config_permissions = config_info.get('permissions')
        # 用户扩展字段用于筛选
        user = User.expand_objects.filter(id=user.id).first()
        # 选择的字段值
        select_value = user.get(config_matchfield.get('id'))
        # 取得返回值
        sort_ids = []
        if config_matchsymbol == '等于' and config_matchvalue == select_value:
            for config_permission in config_permissions:
                sort_ids.append(config_permission.get('sort_id'))
        return sort_ids
get_auth_result(self, event, **kwargs)
#
    获取权限鉴定结果
Source code in extension_root/com_longgui_impower_rule/__init__.py
          def get_auth_result(self, event, **kwargs):
    '''
    获取权限鉴定结果
    '''
    data = event.data
    tenant = event.tenant
    user = data.get('user')
    config = data.get('config')
    # 处理授权逻辑
    permission_info = {}
    # 取得了所有配置
    config_info = config.config
    config_matchfield = config_info.get('matchfield')
    config_matchsymbol = config_info.get('matchsymbol')
    config_matchvalue = config_info.get('matchvalue')
    config_app = config_info.get('app')
    config_app_id = config_app.get('id')
    config_permissions = config_info.get('permissions')
    # 用户扩展字段用于筛选
    user = User.expand_objects.filter(id=user.id).first()
    # 选择的字段值
    select_value = user.get(config_matchfield.get('id'))
    # 取得返回值
    sort_ids = []
    if config_matchsymbol == '等于' and config_matchvalue == select_value:
        for config_permission in config_permissions:
            sort_ids.append(config_permission.get('sort_id'))
    return sort_ids
