认证规则
功能介绍#
认证规则为一系列认证因素的扩充,串联一个或者多个认证因素,在指定条件下实现认证因素间的协作,以完成复杂的认证过程
实现思路#
开发者在开发认证规则插件时,需继承AuthRuleExtension基类并重载check_rule抽象方法,监听系统事件并以事件机制串联认证因素,其基础流程如下:
sequenceDiagram
    participant U as 客户端
    participant C as 平台核心
    participant B as 认证规则插件
    C->>B: 加载插件
    B->>C: 监听CREATE_LOGIN_PAGE_RULES事件
    U->>C: 请求获取登录页面
    C->>B: 触发CREATE_LOGIN_PAGE_RULES事件
    B->>C: 响应事件,遍历所有运行时配置,校验是否通过规则并将结果返回
    C->>U: 渲染登录页面
抽象函数#
- check_rule
基类定义#
        
arkid.core.extension.auth_rule.AuthRuleExtension            (Extension)
        
#
    Source code in arkid/core/extension/auth_rule.py
          class AuthRuleExtension(Extension):
    """Base class for authentication-rule extensions.

    An auth rule chains one or more auth factors. Subclasses implement
    ``check_rule``; this base class subscribes ``check_rules`` to the
    CREATE_LOGIN_PAGE_RULES event and calls ``check_rule`` once for every
    runtime config of the event's tenant.
    """
    # Extension type identifier returned by the ``type`` property.
    TYPE = "auth_rule"
    # Class-level mutable containers: shared by every subclass that does not
    # rebind them. Presumably filled by register_composite_config_schema --
    # TODO confirm in the Extension base class.
    composite_schema_map = {}
    created_composite_schema_list = []
    # NOTE(review): looks like composite schemas are discriminated by the
    # ``type`` field of TenantExtensionConfig -- confirm in Extension.
    composite_key = 'type'
    composite_model = TenantExtensionConfig
    @property
    def type(self):
        """Return the extension type, always ``AuthRuleExtension.TYPE``."""
        return AuthRuleExtension.TYPE
    def load(self):
        """Run the parent ``load`` and then subscribe to the rule events."""
        super().load()
        self.listen_events()
    def check_rules(self, event, **kwargs):
        """Handle CREATE_LOGIN_PAGE_RULES by checking every runtime config.

        Iterates over the runtime configs of ``event.tenant`` and calls
        ``check_rule`` for each one; the per-config decision is left to the
        subclass.

        Args:
            event (Event): the CREATE_LOGIN_PAGE_RULES event
            **kwargs: accepted for the listener signature; not used here
        """
        for config in self.get_tenant_configs(event.tenant):
            self.check_rule(event, config)
    @abstractmethod
    def check_rule(self,event,config):
        """Abstract method: check one rule config against the event.

        Args:
            event (Event): the CREATE_LOGIN_PAGE_RULES event
            config (TenantExtensionConfig): the runtime config to check
        """
        pass
    def register_auth_rule_schema(self, schema, auth_rule_type):
        """Register the runtime config schema of an auth rule.

        The schema is registered twice: as a config schema named
        ``<package>_<auth_rule_type>`` and as a composite config schema keyed
        by ``auth_rule_type`` with the ``extension`` field excluded.

        Args:
            schema (Schema): the schema description
            auth_rule_type (str): the auth rule type
        """
        self.register_config_schema(
            schema, self.package + '_' + auth_rule_type)
        self.register_composite_config_schema(
            schema, auth_rule_type, exclude=['extension'])
    def listen_events(self):
        """Subscribe ``check_rules`` to the CREATE_LOGIN_PAGE_RULES event."""
        self.listen_event(CREATE_LOGIN_PAGE_RULES,self.check_rules)
    def rise_errorcode(self,event, code:Enum):
        """Abort event processing with the error built from ``code``.

        Calls ``core_event.remove_event_id(event)`` and then
        ``core_event.break_event_loop`` with ``self.error(code)``.
        NOTE(review): presumably break_event_loop does not return normally
        (raises to stop the dispatch) -- confirm in core_event.

        Args:
            event (Event): the event currently being handled
            code (Enum): the error code passed to ``self.error``
        """
        core_event.remove_event_id(event)
        core_event.break_event_loop(self.error(code))
        
composite_model            (BaseModel)
        
  
      django-model
  
#
    TenantExtensionConfig(id, is_del, is_active, updated, created, tenant, extension, config, name, type)
Source code in arkid/core/extension/auth_rule.py
          class TenantExtensionConfig(BaseModel):
    """Runtime configuration of an extension for one tenant.

    Only the fields below are declared here. The other fields shown in the
    model signature (id, is_del, is_active, updated, created) are presumably
    inherited from BaseModel -- confirm there.
    """
    class Meta(object):
        verbose_name = _("插件运行时配置")
        verbose_name_plural = _("插件运行时配置")
    # Owning tenant; PROTECT blocks deleting a tenant that still has configs.
    tenant = models.ForeignKey('core.Tenant', blank=False, on_delete=models.PROTECT, verbose_name=_('租户'))
    # Extension this config belongs to; also protected against deletion.
    extension = models.ForeignKey('Extension', blank=False, on_delete=models.PROTECT, verbose_name=_('插件'))
    # Free-form JSON payload; defaults to a fresh empty dict per row.
    config = models.JSONField(blank=True, default=dict, verbose_name=_('Runtime Config','运行时配置'))
    # Display name of the config (at most 128 characters, empty by default).
    name = models.CharField(max_length=128, default='', verbose_name=_('名称'))
    # Config type (at most 128 characters, empty by default).
    type = models.CharField(max_length=128, default='', verbose_name=_('类型'))
config: JSONField
  
      blank
      django-field
  
#
    Runtime Config
created: DateTimeField
  
      blank
      django-field
      nullable
  
#
    创建时间
extension: ForeignKey
  
      django-field
  
#
    插件
id: UUIDField
  
      django-field
  
#
    ID
is_active: BooleanField
  
      django-field
  
#
    是否可用
is_del: BooleanField
  
      django-field
  
#
    是否删除
name: CharField
  
      django-field
  
#
    名称
tenant: ForeignKey
  
      django-field
  
#
    租户
type: CharField
  
      django-field
  
#
    类型
updated: DateTimeField
  
      blank
      django-field
      nullable
  
#
    更新时间
check_rule(self, event, config)
#
    抽象方法:校验规则
Parameters:
| Name | Type | Description | Default | 
|---|---|---|---|
| event | Event | CREATE_LOGIN_PAGE_RULES事件 | required | 
| config | TenantExtensionConfig | 运行时配置 | required | 
check_rules(self, event, **kwargs)
#
    响应事件: CREATE_LOGIN_PAGE_RULES,遍历所有运行时配置,校验是否通过规则并决定是否进行下一步操作
Parameters:
| Name | Type | Description | Default | 
|---|---|---|---|
| event | Event | CREATE_LOGIN_PAGE_RULES事件 | required | 
listen_events(self)
#
    
  
load(self)
#
    
  
register_auth_rule_schema(self, schema, auth_rule_type)
#
    注册认证规则运行时schema
Parameters:
| Name | Type | Description | Default | 
|---|---|---|---|
| schema | Schema | schema描述 | required | 
| auth_rule_type | str | 认证规则类型 | required | 
Source code in arkid/core/extension/auth_rule.py
          def register_auth_rule_schema(self, schema, auth_rule_type):
    """Register the runtime config schema of an auth rule.

    The schema is registered twice: as a config schema named
    ``<package>_<auth_rule_type>`` and as a composite config schema keyed
    by ``auth_rule_type`` with the ``extension`` field excluded.

    Args:
        schema (Schema): the schema description
        auth_rule_type (str): the auth rule type
    """
    self.register_config_schema(
        schema, self.package + '_' + auth_rule_type)
    self.register_composite_config_schema(
        schema, auth_rule_type, exclude=['extension'])
示例#
        
extension_root.com_longgui_auth_rule_retry_times.AuthRuleRetryTimesExtension            (AuthRuleExtension)
        
#
    Source code in extension_root/com_longgui_auth_rule_retry_times/__init__.py
          class AuthRuleRetryTimesExtension(AuthRuleExtension):
    def load(self):
        super().load()
        self.create_extension_config_schema()
        self.listen_event(AUTH_FAIL,self.auth_fail)
        self.listen_event(BEFORE_AUTH,self.before_auth)
        self.listen_event(AUTH_SUCCESS,self.auth_success)
        # 配置初始数据
        tenant = Tenant.platform_tenant()
        if not self.get_tenant_configs(tenant):
            main_auth_factor = TenantExtensionConfig.active_objects.filter(
                tenant=tenant,
                extension=Extension.active_objects.filter(
                    package="com.longgui.auth.factor.password"
                ).first(),
                type="password"
            ).first()
            second_auth_factor = TenantExtensionConfig.active_objects.filter(
                tenant=tenant,
                extension=Extension.active_objects.filter(
                    package="com.longgui.auth.factor.authcode"
                ).first(),
                type="authcode"
            ).first()
            if main_auth_factor and second_auth_factor:
                # 如主认证因素和此认证因素都存在的情况下 创建认证规则
                config = {
                    "main_auth_factor": {
                        "id": main_auth_factor.id.hex, 
                        "name": main_auth_factor.name, 
                        "package": main_auth_factor.extension.package
                    }, 
                    "second_auth_factor": {
                        "id": second_auth_factor.id.hex, 
                        "name": second_auth_factor.name, 
                        "package": second_auth_factor.extension.package
                    }, 
                    "try_times": 3
                }
                self.create_tenant_config(tenant, config, "认证规则:登录失败三次启用图形验证码", "retry_times")
    def before_auth(self,event,**kwargs):
        """ 响应事件:认证之前, 判断是否满足次级认证因素校验条件,如满足则触发事件并检查次级认证因素校验结果
        Args:
            event: 事件
        Returns:
            tuple(bool,dict): 次级认证因素校验结果
        """
        for config in self.get_tenant_configs(event.tenant):
            if uuid.UUID(config.config["main_auth_factor"]["id"]).hex == event.data["auth_factor_config_id"]:
                host = get_remote_addr(event.request)
                if self.check_retry_times(event.tenant,host,config.id.hex,config.config.get("try_times",0)):
                    # 判定需要验证
                    responses = dispatch_event(
                        Event(
                            core_event.AUTHRULE_CHECK_AUTH_DATA,
                            tenant=event.tenant,
                            request=event.request,
                            packages=[
                                config.config["second_auth_factor"]["package"]
                            ]
                        )
                    )
                    for useless,(response,useless) in responses:
                        if not response:
                            continue
                        result,data = response
                        if not result:
                            return response
        return True,None
    def auth_success(self,event,**kwargs):
        # 检查是否存在满足条件的配置
        for config in self.get_tenant_configs(event.tenant):
            if uuid.UUID(config.config["main_auth_factor"]["id"]).hex == event.data["auth_factor_config_id"].id.hex:
                host = get_remote_addr(event.request)
                key = self.gen_key(host,config.id.hex)
                try_times  = 1
                cache.set(event.tenant,key,try_times,expired=config.config.get("expired",30)*60)
                self.clear_refresh_status(event.tenant,host,config.id.hex)
    def auth_fail(self, event, **kwargs):
        """响应事件:认证失败,记录对应IP认证失败次数
        Args:
            event : 事件
        """
        data = event.data["data"]
        # 检查是否存在满足条件的配置
        for config in self.get_tenant_configs(event.tenant):
            if uuid.UUID(config.config["main_auth_factor"]["id"]).hex == event.data["auth_factor_config_id"]:
                host = get_remote_addr(event.request)
                key = self.gen_key(host,config.id.hex)
                try_times  = int(cache.get(event.tenant,key) or 1)
                cache.set(event.tenant,key,try_times+1,expired=config.config.get("expired",30)*60)
                if self.check_retry_times(event.tenant,host,config.id.hex,config.config.get("try_times",0)) and not self.check_refresh_status(event.tenant,host,config.id.hex):
                    data.update(self.error(ErrorCode.AUTH_FAIL_TIMES_OVER_LIMITED))
                    self.set_refresh_status(event.tenant,host,config.id.hex)
                    data["refresh"] = True
    def check_rule(self, event, config):
        login_pages = event.data
        if self.check_retry_times(event.tenant,get_remote_addr(event.request),config.id.hex,config.config.get("try_times",0)): 
            dispatch_event(
                Event(
                    core_event.AUTHRULE_FIX_LOGIN_PAGE,
                    tenant=event.tenant,
                    request=event.request,
                    packages=[
                        config.config["second_auth_factor"]["package"]
                    ],
                    data={
                        "login_pages": login_pages,
                        "main_auth_factor_id": config.config["main_auth_factor"]["id"],
                        "config_id":config.config["second_auth_factor"]["id"]
                    }
                )
            )
    def check_retry_times(self,tenant,host,config_id,limited=3):
        """校验认证失败次数是否超出限制
        Args:
            host (str): 客户一端IP地址
            config_id (str): 插件运行时ID
            limited (int, optional): 认证失败次数限制. Defaults to 3.
        Returns:
            bool: 校验结果
        """
        key=self.gen_key(host,config_id)
        retry_times = int(cache.get(tenant,key) or limited)
        return retry_times > limited
    def set_refresh_status(self,tenant,host,config_id):
        """设置登陆页面刷新tag
        Args:
            host (str): 客户一端IP地址
            config_id (str): 插件运行时ID
        """
        cache.set(tenant,self.gen_refresh_key(host,config_id),1)
    def clear_refresh_status(self,tenant,host,config_id):
        cache.set(tenant,self.gen_refresh_key(host,config_id),0)
    def check_refresh_status(self,tenant,host,config_id):
        """校验是否需要刷新页面
        Args:
            host (str): 客户一端IP地址
            config_id (str): 插件运行时ID
        Returns:
            bool: 是否需要刷新页面
        """
        return bool(cache.get(tenant,self.gen_refresh_key(host,config_id)))
    def gen_refresh_key(self,host:str,config_id:str):
        """页面刷新标识KEY
        Args:
            host (str): 客户一端IP地址
            config_id (str): 插件运行时ID
        Returns:
            str: 页面刷新标识KEY
        """
        return f"{self.package}_cache_auth_refresh_{host}_{config_id}"
    def gen_key(self,host:str,config_id:str):
        """生成记录失败次数的KEY
        Args:
            host (str): 客户一端IP地址
            config_id (str): 插件运行时ID
        Returns:
            str: 记录失败次数的KEY
        """
        return f"{self.package}_cache_auth_retry_times_{host}_{config_id}"
    def create_extension_config_schema(self):
        """创建插件运行时schema
        """
        main_auth_factor_page = pages.TablePage(select=True,name=_("选择主认证因素"))
        self.register_front_pages(main_auth_factor_page)
        main_auth_factor_page.create_actions(
            init_action=actions.DirectAction(
                path='/api/v1/tenants/{tenant_id}/config_select/?extension__type=auth_factor',
                method=actions.FrontActionMethod.GET
            )
        )
        second_auth_factor_page = pages.TablePage(select=True,name=_("选择次认证因素"))
        self.register_front_pages(second_auth_factor_page)
        second_auth_factor_page.create_actions(
            init_action=actions.DirectAction(
                path='/api/v1/tenants/{tenant_id}/config_select/?extension__type=auth_factor',
                method=actions.FrontActionMethod.GET
            )
        )
        AuthRuleRetryTimesConfigSchema = create_extension_schema(
            'AuthRuleRetryTimesConfigSchema',
            __file__,
            [
                (
                    'try_times', 
                    int, 
                    Field(
                        title=_('try_times', '限制重试次数'),
                        default=3
                    )
                ),
                (
                    'main_auth_factor',
                    MainAuthRuleSchema, 
                    Field(
                        title=_('main_auth_factor', '主认证因素'),
                        page=main_auth_factor_page.tag
                    )
                ),
                (
                    'second_auth_factor',
                    SecondAuthFactorConfigSchema,
                    Field(
                        title=_('second_auth_factor', '次认证因素'),
                        page=second_auth_factor_page.tag
                    )
                ),
                (
                    'expired', 
                    Optional[int],
                    Field(
                        title=_('expired', '有效期/分钟'),
                        default=30,
                    )
                ),
            ],
            base_schema=BaseAuthRuleSchema
        )
        self.register_auth_rule_schema(
            AuthRuleRetryTimesConfigSchema,
            "retry_times"
        )
auth_fail(self, event, **kwargs)
#
    响应事件:认证失败,记录对应IP认证失败次数
Parameters:
| Name | Type | Description | Default | 
|---|---|---|---|
| event |  | 事件 | required | 
Source code in extension_root/com_longgui_auth_rule_retry_times/__init__.py
          def auth_fail(self, event, **kwargs):
    """响应事件:认证失败,记录对应IP认证失败次数
    Args:
        event : 事件
    """
    data = event.data["data"]
    # 检查是否存在满足条件的配置
    for config in self.get_tenant_configs(event.tenant):
        if uuid.UUID(config.config["main_auth_factor"]["id"]).hex == event.data["auth_factor_config_id"]:
            host = get_remote_addr(event.request)
            key = self.gen_key(host,config.id.hex)
            try_times  = int(cache.get(event.tenant,key) or 1)
            cache.set(event.tenant,key,try_times+1,expired=config.config.get("expired",30)*60)
            if self.check_retry_times(event.tenant,host,config.id.hex,config.config.get("try_times",0)) and not self.check_refresh_status(event.tenant,host,config.id.hex):
                data.update(self.error(ErrorCode.AUTH_FAIL_TIMES_OVER_LIMITED))
                self.set_refresh_status(event.tenant,host,config.id.hex)
                data["refresh"] = True
before_auth(self, event, **kwargs)
#
    响应事件:认证之前, 判断是否满足次级认证因素校验条件,如满足则触发事件并检查次级认证因素校验结果
Parameters:
| Name | Type | Description | Default | 
|---|---|---|---|
| event |  | 事件 | required | 
Returns:
| Type | Description | 
|---|---|
| tuple(bool,dict) | 次级认证因素校验结果 | 
Source code in extension_root/com_longgui_auth_rule_retry_times/__init__.py
          def before_auth(self,event,**kwargs):
    """ 响应事件:认证之前, 判断是否满足次级认证因素校验条件,如满足则触发事件并检查次级认证因素校验结果
    Args:
        event: 事件
    Returns:
        tuple(bool,dict): 次级认证因素校验结果
    """
    for config in self.get_tenant_configs(event.tenant):
        if uuid.UUID(config.config["main_auth_factor"]["id"]).hex == event.data["auth_factor_config_id"]:
            host = get_remote_addr(event.request)
            if self.check_retry_times(event.tenant,host,config.id.hex,config.config.get("try_times",0)):
                # 判定需要验证
                responses = dispatch_event(
                    Event(
                        core_event.AUTHRULE_CHECK_AUTH_DATA,
                        tenant=event.tenant,
                        request=event.request,
                        packages=[
                            config.config["second_auth_factor"]["package"]
                        ]
                    )
                )
                for useless,(response,useless) in responses:
                    if not response:
                        continue
                    result,data = response
                    if not result:
                        return response
    return True,None
check_refresh_status(self, tenant, host, config_id)
#
    校验是否需要刷新页面
Parameters:
| Name | Type | Description | Default | 
|---|---|---|---|
| host | str | 客户端IP地址 | required | 
| config_id | str | 插件运行时ID | required | 
Returns:
| Type | Description | 
|---|---|
| bool | 是否需要刷新页面 | 
check_retry_times(self, tenant, host, config_id, limited=3)
#
    校验认证失败次数是否超出限制
Parameters:
| Name | Type | Description | Default | 
|---|---|---|---|
| host | str | 客户端IP地址 | required | 
| config_id | str | 插件运行时ID | required | 
| limited | int | 认证失败次数限制. Defaults to 3. | 3 | 
Returns:
| Type | Description | 
|---|---|
| bool | 校验结果 | 
Source code in extension_root/com_longgui_auth_rule_retry_times/__init__.py
          def check_retry_times(self,tenant,host,config_id,limited=3):
    """校验认证失败次数是否超出限制
    Args:
        host (str): 客户一端IP地址
        config_id (str): 插件运行时ID
        limited (int, optional): 认证失败次数限制. Defaults to 3.
    Returns:
        bool: 校验结果
    """
    key=self.gen_key(host,config_id)
    retry_times = int(cache.get(tenant,key) or limited)
    return retry_times > limited
check_rule(self, event, config)
#
    抽象方法:校验规则
Parameters:
| Name | Type | Description | Default | 
|---|---|---|---|
| event | Event | CREATE_LOGIN_PAGE_RULES事件 | required | 
| config | TenantExtensionConfig | 运行时配置 | required | 
Source code in extension_root/com_longgui_auth_rule_retry_times/__init__.py
          def check_rule(self, event, config):
    login_pages = event.data
    if self.check_retry_times(event.tenant,get_remote_addr(event.request),config.id.hex,config.config.get("try_times",0)): 
        dispatch_event(
            Event(
                core_event.AUTHRULE_FIX_LOGIN_PAGE,
                tenant=event.tenant,
                request=event.request,
                packages=[
                    config.config["second_auth_factor"]["package"]
                ],
                data={
                    "login_pages": login_pages,
                    "main_auth_factor_id": config.config["main_auth_factor"]["id"],
                    "config_id":config.config["second_auth_factor"]["id"]
                }
            )
        )
create_extension_config_schema(self)
#
    创建插件运行时schema
Source code in extension_root/com_longgui_auth_rule_retry_times/__init__.py
          def create_extension_config_schema(self):
    """Create and register the runtime config schema of this rule.

    Two selectable table pages (main and second auth factor) are
    registered as front pages; both load their rows from the tenant's
    ``config_select`` endpoint filtered by ``extension__type=auth_factor``.
    The schema is then registered under the rule type ``retry_times``.
    """
    # Selection page for the main auth factor.
    main_auth_factor_page = pages.TablePage(select=True,name=_("选择主认证因素"))
    self.register_front_pages(main_auth_factor_page)
    main_auth_factor_page.create_actions(
        init_action=actions.DirectAction(
            path='/api/v1/tenants/{tenant_id}/config_select/?extension__type=auth_factor',
            method=actions.FrontActionMethod.GET
        )
    )
    # Selection page for the second auth factor (same data source).
    second_auth_factor_page = pages.TablePage(select=True,name=_("选择次认证因素"))
    self.register_front_pages(second_auth_factor_page)
    second_auth_factor_page.create_actions(
        init_action=actions.DirectAction(
            path='/api/v1/tenants/{tenant_id}/config_select/?extension__type=auth_factor',
            method=actions.FrontActionMethod.GET
        )
    )
    # Each entry is (field name, field type, Field description).
    AuthRuleRetryTimesConfigSchema = create_extension_schema(
        'AuthRuleRetryTimesConfigSchema',
        __file__,
        [
            (
                'try_times', 
                int, 
                Field(
                    title=_('try_times', '限制重试次数'),
                    default=3
                )
            ),
            (
                'main_auth_factor',
                MainAuthRuleSchema, 
                Field(
                    title=_('main_auth_factor', '主认证因素'),
                    page=main_auth_factor_page.tag
                )
            ),
            (
                'second_auth_factor',
                SecondAuthFactorConfigSchema,
                Field(
                    title=_('second_auth_factor', '次认证因素'),
                    page=second_auth_factor_page.tag
                )
            ),
            (
                # Optional, so a stored value may be None; the title gives the unit as minutes.
                'expired', 
                Optional[int],
                Field(
                    title=_('expired', '有效期/分钟'),
                    default=30,
                )
            ),
        ],
        base_schema=BaseAuthRuleSchema
    )
    self.register_auth_rule_schema(
        AuthRuleRetryTimesConfigSchema,
        "retry_times"
    )
gen_key(self, host, config_id)
#
    生成记录失败次数的KEY
Parameters:
| Name | Type | Description | Default | 
|---|---|---|---|
| host | str | 客户端IP地址 | required | 
| config_id | str | 插件运行时ID | required | 
Returns:
| Type | Description | 
|---|---|
| str | 记录失败次数的KEY | 
gen_refresh_key(self, host, config_id)
#
    页面刷新标识KEY
Parameters:
| Name | Type | Description | Default | 
|---|---|---|---|
| host | str | 客户端IP地址 | required | 
| config_id | str | 插件运行时ID | required | 
Returns:
| Type | Description | 
|---|---|
| str | 页面刷新标识KEY | 
load(self)
#
    抽象方法,插件加载的入口方法
Source code in extension_root/com_longgui_auth_rule_retry_times/__init__.py
          def load(self):
    super().load()
    self.create_extension_config_schema()
    self.listen_event(AUTH_FAIL,self.auth_fail)
    self.listen_event(BEFORE_AUTH,self.before_auth)
    self.listen_event(AUTH_SUCCESS,self.auth_success)
    # 配置初始数据
    tenant = Tenant.platform_tenant()
    if not self.get_tenant_configs(tenant):
        main_auth_factor = TenantExtensionConfig.active_objects.filter(
            tenant=tenant,
            extension=Extension.active_objects.filter(
                package="com.longgui.auth.factor.password"
            ).first(),
            type="password"
        ).first()
        second_auth_factor = TenantExtensionConfig.active_objects.filter(
            tenant=tenant,
            extension=Extension.active_objects.filter(
                package="com.longgui.auth.factor.authcode"
            ).first(),
            type="authcode"
        ).first()
        if main_auth_factor and second_auth_factor:
            # 如主认证因素和此认证因素都存在的情况下 创建认证规则
            config = {
                "main_auth_factor": {
                    "id": main_auth_factor.id.hex, 
                    "name": main_auth_factor.name, 
                    "package": main_auth_factor.extension.package
                }, 
                "second_auth_factor": {
                    "id": second_auth_factor.id.hex, 
                    "name": second_auth_factor.name, 
                    "package": second_auth_factor.extension.package
                }, 
                "try_times": 3
            }
            self.create_tenant_config(tenant, config, "认证规则:登录失败三次启用图形验证码", "retry_times")
set_refresh_status(self, tenant, host, config_id)
#
    设置登录页面刷新标记(tag)
Parameters:
| Name | Type | Description | Default | 
|---|---|---|---|
| host | str | 客户端IP地址 | required | 
| config_id | str | 插件运行时ID | required |