Skip to content

CertificationRules

Features#

Certification rules are the expansion of a series of certification factors,One or more authentication factors in series,Implement collaboration between certification factors under the specified conditions,To complete the complex certification process

Implementation#

When developing certification rules plug -in,You need to inherit the AuthruleExtension base class and re -load check_Rule abstraction method,System events and event mechanisms are connected in series authentication factors,The basic process is as follows:

sequenceDiagram
    participant U as Client
    participant C as Platform core
    participant B as Certification rules plug -in

    C->>B: Loading plug -in
    B->>C: Monitor Create_LOGIN_PAGE_Rules event
    U->>C: Request to get the login page
    C->>B: Trigger Create_LOGIN_PAGE_Rules event
    B->>C: Response event,Traversing all running configuration,Whether the verification passes the rules and returns the result
    C->>U: Rendering login page

Abstract function#

Foundation definition#

arkid.core.extension.auth_rule.AuthRuleExtension (Extension) #

Source code in arkid/core/extension/auth_rule.py
class AuthRuleExtension(Extension):

    TYPE = "auth_rule"

    composite_schema_map = {}
    created_composite_schema_list = []
    composite_key = 'type'
    composite_model = TenantExtensionConfig

    @property
    def type(self):
        return AuthRuleExtension.TYPE

    def load(self):
        super().load()
        self.listen_events()

    def check_rules(self, event, **kwargs):
        """响应事件: CREATE_LOGIN_PAGE_RULES,遍历所有运行时配置,校验是否通过规则并决定是否进行下一步操作

        Args:
            event (Event): CREATE_LOGIN_PAGE_RULES事件
        """
        for config in self.get_tenant_configs(event.tenant):
            self.check_rule(event, config)

    @abstractmethod
    def check_rule(self,event,config):
        """抽象方法:校验规则

        Args:
            event (Event): CREATE_LOGIN_PAGE_RULES事件
            config (TenantExtensionConfig): 运行时配置
        """
        pass

    def register_auth_rule_schema(self, schema, auth_rule_type):
        """注册认证规则运行时schema

        Args:
            schema (Schema): schema描述
            auth_rule_type (str): 认证规则类型
        """
        self.register_config_schema(
            schema, self.package + '_' + auth_rule_type)
        self.register_composite_config_schema(
            schema, auth_rule_type, exclude=['extension'])

    def listen_events(self):
        """监听事件
        """
        self.listen_event(CREATE_LOGIN_PAGE_RULES,self.check_rules)

    def rise_errorcode(self,event, code:Enum):
        core_event.remove_event_id(event)
        core_event.break_event_loop(self.error(code))

composite_model (BaseModel) django-model #

TenantExtensionConfig(id, is_del, is_active, updated, created, tenant, extension, config, name, type)

Source code in arkid/core/extension/auth_rule.py
class TenantExtensionConfig(BaseModel):

    class Meta(object):
        verbose_name = _("插件运行时配置")
        verbose_name_plural = _("插件运行时配置")

    tenant = models.ForeignKey('core.Tenant', blank=False, on_delete=models.PROTECT, verbose_name=_('租户'))
    extension = models.ForeignKey('Extension', blank=False, on_delete=models.PROTECT, verbose_name=_('插件'))
    config = models.JSONField(blank=True, default=dict, verbose_name=_('Runtime Config','运行时配置'))
    name = models.CharField(max_length=128, default='', verbose_name=_('名称'))
    type = models.CharField(max_length=128, default='', verbose_name=_('类型'))

config: JSONField blank django-field #

Runtime Config

created: DateTimeField blank django-field nullable #

创建时间

extension: ForeignKey django-field #

插件

id: UUIDField django-field #

ID

is_active: BooleanField django-field #

是否可用

is_del: BooleanField django-field #

是否删除

name: CharField django-field #

名称

tenant: ForeignKey django-field #

租户

type: CharField django-field #

类型

updated: DateTimeField blank django-field nullable #

更新时间

check_rule(self, event, config) #

抽象方法:校验规则

Parameters:

Name Type Description Default
event Event

CREATE_LOGIN_PAGE_RULES事件

required
config TenantExtensionConfig

运行时配置

required
Source code in arkid/core/extension/auth_rule.py
@abstractmethod
def check_rule(self,event,config):
    """抽象方法:校验规则

    Args:
        event (Event): CREATE_LOGIN_PAGE_RULES事件
        config (TenantExtensionConfig): 运行时配置
    """
    pass

check_rules(self, event, **kwargs) #

响应事件: CREATE_LOGIN_PAGE_RULES,遍历所有运行时配置,校验是否通过规则并决定是否进行下一步操作

Parameters:

Name Type Description Default
event Event

CREATE_LOGIN_PAGE_RULES事件

required
Source code in arkid/core/extension/auth_rule.py
def check_rules(self, event, **kwargs):
    """响应事件: CREATE_LOGIN_PAGE_RULES,遍历所有运行时配置,校验是否通过规则并决定是否进行下一步操作

    Args:
        event (Event): CREATE_LOGIN_PAGE_RULES事件
    """
    for config in self.get_tenant_configs(event.tenant):
        self.check_rule(event, config)

listen_events(self) #

监听事件

Source code in arkid/core/extension/auth_rule.py
def listen_events(self):
    """监听事件
    """
    self.listen_event(CREATE_LOGIN_PAGE_RULES,self.check_rules)

load(self) #

抽象方法,插件加载的入口方法

Source code in arkid/core/extension/auth_rule.py
def load(self):
    super().load()
    self.listen_events()

register_auth_rule_schema(self, schema, auth_rule_type) #

注册认证规则运行时schema

Parameters:

Name Type Description Default
schema Schema

schema描述

required
auth_rule_type str

认证规则类型

required
Source code in arkid/core/extension/auth_rule.py
def register_auth_rule_schema(self, schema, auth_rule_type):
    """注册认证规则运行时schema

    Args:
        schema (Schema): schema描述
        auth_rule_type (str): 认证规则类型
    """
    self.register_config_schema(
        schema, self.package + '_' + auth_rule_type)
    self.register_composite_config_schema(
        schema, auth_rule_type, exclude=['extension'])

Exemplary#

extension_root.com_longgui_auth_rule_retry_times.AuthRuleRetryTimesExtension (AuthRuleExtension) #

Source code in extension_root/com_longgui_auth_rule_retry_times/__init__.py
class AuthRuleRetryTimesExtension(AuthRuleExtension):

    def load(self):
        super().load()
        self.create_extension_config_schema()
        self.listen_event(AUTH_FAIL,self.auth_fail)
        self.listen_event(BEFORE_AUTH,self.before_auth)
        self.listen_event(AUTH_SUCCESS,self.auth_success)

        # 配置初始数据
        tenant = Tenant.platform_tenant()
        if not self.get_tenant_configs(tenant):
            main_auth_factor = TenantExtensionConfig.active_objects.filter(
                tenant=tenant,
                extension=Extension.active_objects.filter(
                    package="com.longgui.auth.factor.password"
                ).first(),
                type="password"
            ).first()

            second_auth_factor = TenantExtensionConfig.active_objects.filter(
                tenant=tenant,
                extension=Extension.active_objects.filter(
                    package="com.longgui.auth.factor.authcode"
                ).first(),
                type="authcode"
            ).first()

            if main_auth_factor and second_auth_factor:
                # 如主认证因素和此认证因素都存在的情况下 创建认证规则

                config = {
                    "main_auth_factor": {
                        "id": main_auth_factor.id.hex, 
                        "name": main_auth_factor.name, 
                        "package": main_auth_factor.extension.package
                    }, 
                    "second_auth_factor": {
                        "id": second_auth_factor.id.hex, 
                        "name": second_auth_factor.name, 
                        "package": second_auth_factor.extension.package
                    }, 
                    "try_times": 3
                }
                self.create_tenant_config(tenant, config, "认证规则:登录失败三次启用图形验证码", "retry_times")

    def before_auth(self,event,**kwargs):
        """ 响应事件:认证之前, 判断是否满足次级认证因素校验条件,如满足则触发事件并检查次级认证因素校验结果

        Args:
            event: 事件

        Returns:
            tuple(bool,dict): 次级认证因素校验结果
        """
        for config in self.get_tenant_configs(event.tenant):
            if uuid.UUID(config.config["main_auth_factor"]["id"]).hex == event.data["auth_factor_config_id"]:
                host = get_remote_addr(event.request)
                if self.check_retry_times(event.tenant,host,config.id.hex,config.config.get("try_times",0)):
                    # 判定需要验证
                    responses = dispatch_event(
                        Event(
                            core_event.AUTHRULE_CHECK_AUTH_DATA,
                            tenant=event.tenant,
                            request=event.request,
                            packages=[
                                config.config["second_auth_factor"]["package"]
                            ]
                        )
                    )

                    for useless,(response,useless) in responses:
                        if not response:
                            continue
                        result,data = response
                        if not result:
                            return response
        return True,None

    def auth_success(self,event,**kwargs):
        # 检查是否存在满足条件的配置
        for config in self.get_tenant_configs(event.tenant):
            if uuid.UUID(config.config["main_auth_factor"]["id"]).hex == event.data["auth_factor_config_id"].id.hex:
                host = get_remote_addr(event.request)
                key = self.gen_key(host,config.id.hex)
                try_times  = 1
                cache.set(event.tenant,key,try_times,expired=config.config.get("expired",30)*60)
                self.clear_refresh_status(event.tenant,host,config.id.hex)

    def auth_fail(self, event, **kwargs):
        """响应事件:认证失败,记录对应IP认证失败次数

        Args:
            event : 事件
        """
        data = event.data["data"]
        # 检查是否存在满足条件的配置
        for config in self.get_tenant_configs(event.tenant):
            if uuid.UUID(config.config["main_auth_factor"]["id"]).hex == event.data["auth_factor_config_id"]:
                host = get_remote_addr(event.request)
                key = self.gen_key(host,config.id.hex)
                try_times  = int(cache.get(event.tenant,key) or 1)
                cache.set(event.tenant,key,try_times+1,expired=config.config.get("expired",30)*60)
                if self.check_retry_times(event.tenant,host,config.id.hex,config.config.get("try_times",0)) and not self.check_refresh_status(event.tenant,host,config.id.hex):
                    data.update(self.error(ErrorCode.AUTH_FAIL_TIMES_OVER_LIMITED))
                    self.set_refresh_status(event.tenant,host,config.id.hex)
                    data["refresh"] = True


    def check_rule(self, event, config):
        login_pages = event.data

        if self.check_retry_times(event.tenant,get_remote_addr(event.request),config.id.hex,config.config.get("try_times",0)): 
            dispatch_event(
                Event(
                    core_event.AUTHRULE_FIX_LOGIN_PAGE,
                    tenant=event.tenant,
                    request=event.request,
                    packages=[
                        config.config["second_auth_factor"]["package"]
                    ],
                    data={
                        "login_pages": login_pages,
                        "main_auth_factor_id": config.config["main_auth_factor"]["id"],
                        "config_id":config.config["second_auth_factor"]["id"]
                    }
                )
            )

    def check_retry_times(self,tenant,host,config_id,limited=3):
        """校验认证失败次数是否超出限制

        Args:
            host (str): 客户一端IP地址
            config_id (str): 插件运行时ID
            limited (int, optional): 认证失败次数限制. Defaults to 3.

        Returns:
            bool: 校验结果
        """
        key=self.gen_key(host,config_id)
        retry_times = int(cache.get(tenant,key) or limited)
        return retry_times > limited

    def set_refresh_status(self,tenant,host,config_id):
        """设置登陆页面刷新tag

        Args:
            host (str): 客户一端IP地址
            config_id (str): 插件运行时ID
        """
        cache.set(tenant,self.gen_refresh_key(host,config_id),1)

    def clear_refresh_status(self,tenant,host,config_id):
        cache.set(tenant,self.gen_refresh_key(host,config_id),0)

    def check_refresh_status(self,tenant,host,config_id):
        """校验是否需要刷新页面

        Args:
            host (str): 客户一端IP地址
            config_id (str): 插件运行时ID

        Returns:
            bool: 是否需要刷新页面
        """
        return bool(cache.get(tenant,self.gen_refresh_key(host,config_id)))

    def gen_refresh_key(self,host:str,config_id:str):
        """页面刷新标识KEY

        Args:
            host (str): 客户一端IP地址
            config_id (str): 插件运行时ID

        Returns:
            str: 页面刷新标识KEY
        """
        return f"{self.package}_cache_auth_refresh_{host}_{config_id}"

    def gen_key(self,host:str,config_id:str):
        """生成记录失败次数的KEY

        Args:
            host (str): 客户一端IP地址
            config_id (str): 插件运行时ID

        Returns:
            str: 记录失败次数的KEY
        """
        return f"{self.package}_cache_auth_retry_times_{host}_{config_id}"

    def create_extension_config_schema(self):
        """创建插件运行时schema
        """
        main_auth_factor_page = pages.TablePage(select=True,name=_("选择主认证因素"))

        self.register_front_pages(main_auth_factor_page)

        main_auth_factor_page.create_actions(
            init_action=actions.DirectAction(
                path='/api/v1/tenants/{tenant_id}/config_select/?extension__type=auth_factor',
                method=actions.FrontActionMethod.GET

            )
        )

        second_auth_factor_page = pages.TablePage(select=True,name=_("选择次认证因素"))

        self.register_front_pages(second_auth_factor_page)

        second_auth_factor_page.create_actions(
            init_action=actions.DirectAction(
                path='/api/v1/tenants/{tenant_id}/config_select/?extension__type=auth_factor',
                method=actions.FrontActionMethod.GET
            )
        )

        AuthRuleRetryTimesConfigSchema = create_extension_schema(
            'AuthRuleRetryTimesConfigSchema',
            __file__,
            [
                (
                    'try_times', 
                    int, 
                    Field(
                        title=_('try_times', '限制重试次数'),
                        default=3
                    )
                ),
                (
                    'main_auth_factor',
                    MainAuthRuleSchema, 
                    Field(
                        title=_('main_auth_factor', '主认证因素'),
                        page=main_auth_factor_page.tag
                    )
                ),
                (
                    'second_auth_factor',
                    SecondAuthFactorConfigSchema,
                    Field(
                        title=_('second_auth_factor', '次认证因素'),
                        page=second_auth_factor_page.tag
                    )
                ),
                (
                    'expired', 
                    Optional[int],
                    Field(
                        title=_('expired', '有效期/分钟'),
                        default=30,
                    )
                ),
            ],
            base_schema=BaseAuthRuleSchema
        )
        self.register_auth_rule_schema(
            AuthRuleRetryTimesConfigSchema,
            "retry_times"
        )

auth_fail(self, event, **kwargs) #

响应事件:认证失败,记录对应IP认证失败次数

Parameters:

Name Type Description Default
event

事件

required
Source code in extension_root/com_longgui_auth_rule_retry_times/__init__.py
def auth_fail(self, event, **kwargs):
    """响应事件:认证失败,记录对应IP认证失败次数

    Args:
        event : 事件
    """
    data = event.data["data"]
    # 检查是否存在满足条件的配置
    for config in self.get_tenant_configs(event.tenant):
        if uuid.UUID(config.config["main_auth_factor"]["id"]).hex == event.data["auth_factor_config_id"]:
            host = get_remote_addr(event.request)
            key = self.gen_key(host,config.id.hex)
            try_times  = int(cache.get(event.tenant,key) or 1)
            cache.set(event.tenant,key,try_times+1,expired=config.config.get("expired",30)*60)
            if self.check_retry_times(event.tenant,host,config.id.hex,config.config.get("try_times",0)) and not self.check_refresh_status(event.tenant,host,config.id.hex):
                data.update(self.error(ErrorCode.AUTH_FAIL_TIMES_OVER_LIMITED))
                self.set_refresh_status(event.tenant,host,config.id.hex)
                data["refresh"] = True

before_auth(self, event, **kwargs) #

响应事件:认证之前, 判断是否满足次级认证因素校验条件,如满足则触发事件并检查次级认证因素校验结果

Parameters:

Name Type Description Default
event

事件

required

Returns:

Type Description
tuple(bool,dict)

次级认证因素校验结果

Source code in extension_root/com_longgui_auth_rule_retry_times/__init__.py
def before_auth(self,event,**kwargs):
    """ 响应事件:认证之前, 判断是否满足次级认证因素校验条件,如满足则触发事件并检查次级认证因素校验结果

    Args:
        event: 事件

    Returns:
        tuple(bool,dict): 次级认证因素校验结果
    """
    for config in self.get_tenant_configs(event.tenant):
        if uuid.UUID(config.config["main_auth_factor"]["id"]).hex == event.data["auth_factor_config_id"]:
            host = get_remote_addr(event.request)
            if self.check_retry_times(event.tenant,host,config.id.hex,config.config.get("try_times",0)):
                # 判定需要验证
                responses = dispatch_event(
                    Event(
                        core_event.AUTHRULE_CHECK_AUTH_DATA,
                        tenant=event.tenant,
                        request=event.request,
                        packages=[
                            config.config["second_auth_factor"]["package"]
                        ]
                    )
                )

                for useless,(response,useless) in responses:
                    if not response:
                        continue
                    result,data = response
                    if not result:
                        return response
    return True,None

check_refresh_status(self, tenant, host, config_id) #

校验是否需要刷新页面

Parameters:

Name Type Description Default
host str

客户一端IP地址

required
config_id str

插件运行时ID

required

Returns:

Type Description
bool

是否需要刷新页面

Source code in extension_root/com_longgui_auth_rule_retry_times/__init__.py
def check_refresh_status(self,tenant,host,config_id):
    """校验是否需要刷新页面

    Args:
        host (str): 客户一端IP地址
        config_id (str): 插件运行时ID

    Returns:
        bool: 是否需要刷新页面
    """
    return bool(cache.get(tenant,self.gen_refresh_key(host,config_id)))

check_retry_times(self, tenant, host, config_id, limited=3) #

校验认证失败次数是否超出限制

Parameters:

Name Type Description Default
host str

客户一端IP地址

required
config_id str

插件运行时ID

required
limited int

认证失败次数限制. Defaults to 3.

3

Returns:

Type Description
bool

校验结果

Source code in extension_root/com_longgui_auth_rule_retry_times/__init__.py
def check_retry_times(self,tenant,host,config_id,limited=3):
    """校验认证失败次数是否超出限制

    Args:
        host (str): 客户一端IP地址
        config_id (str): 插件运行时ID
        limited (int, optional): 认证失败次数限制. Defaults to 3.

    Returns:
        bool: 校验结果
    """
    key=self.gen_key(host,config_id)
    retry_times = int(cache.get(tenant,key) or limited)
    return retry_times > limited

check_rule(self, event, config) #

抽象方法:校验规则

Parameters:

Name Type Description Default
event Event

CREATE_LOGIN_PAGE_RULES事件

required
config TenantExtensionConfig

运行时配置

required
Source code in extension_root/com_longgui_auth_rule_retry_times/__init__.py
def check_rule(self, event, config):
    login_pages = event.data

    if self.check_retry_times(event.tenant,get_remote_addr(event.request),config.id.hex,config.config.get("try_times",0)): 
        dispatch_event(
            Event(
                core_event.AUTHRULE_FIX_LOGIN_PAGE,
                tenant=event.tenant,
                request=event.request,
                packages=[
                    config.config["second_auth_factor"]["package"]
                ],
                data={
                    "login_pages": login_pages,
                    "main_auth_factor_id": config.config["main_auth_factor"]["id"],
                    "config_id":config.config["second_auth_factor"]["id"]
                }
            )
        )

create_extension_config_schema(self) #

创建插件运行时schema

Source code in extension_root/com_longgui_auth_rule_retry_times/__init__.py
def create_extension_config_schema(self):
    """创建插件运行时schema
    """
    main_auth_factor_page = pages.TablePage(select=True,name=_("选择主认证因素"))

    self.register_front_pages(main_auth_factor_page)

    main_auth_factor_page.create_actions(
        init_action=actions.DirectAction(
            path='/api/v1/tenants/{tenant_id}/config_select/?extension__type=auth_factor',
            method=actions.FrontActionMethod.GET

        )
    )

    second_auth_factor_page = pages.TablePage(select=True,name=_("选择次认证因素"))

    self.register_front_pages(second_auth_factor_page)

    second_auth_factor_page.create_actions(
        init_action=actions.DirectAction(
            path='/api/v1/tenants/{tenant_id}/config_select/?extension__type=auth_factor',
            method=actions.FrontActionMethod.GET
        )
    )

    AuthRuleRetryTimesConfigSchema = create_extension_schema(
        'AuthRuleRetryTimesConfigSchema',
        __file__,
        [
            (
                'try_times', 
                int, 
                Field(
                    title=_('try_times', '限制重试次数'),
                    default=3
                )
            ),
            (
                'main_auth_factor',
                MainAuthRuleSchema, 
                Field(
                    title=_('main_auth_factor', '主认证因素'),
                    page=main_auth_factor_page.tag
                )
            ),
            (
                'second_auth_factor',
                SecondAuthFactorConfigSchema,
                Field(
                    title=_('second_auth_factor', '次认证因素'),
                    page=second_auth_factor_page.tag
                )
            ),
            (
                'expired', 
                Optional[int],
                Field(
                    title=_('expired', '有效期/分钟'),
                    default=30,
                )
            ),
        ],
        base_schema=BaseAuthRuleSchema
    )
    self.register_auth_rule_schema(
        AuthRuleRetryTimesConfigSchema,
        "retry_times"
    )

gen_key(self, host, config_id) #

生成记录失败次数的KEY

Parameters:

Name Type Description Default
host str

客户一端IP地址

required
config_id str

插件运行时ID

required

Returns:

Type Description
str

记录失败次数的KEY

Source code in extension_root/com_longgui_auth_rule_retry_times/__init__.py
def gen_key(self,host:str,config_id:str):
    """生成记录失败次数的KEY

    Args:
        host (str): 客户一端IP地址
        config_id (str): 插件运行时ID

    Returns:
        str: 记录失败次数的KEY
    """
    return f"{self.package}_cache_auth_retry_times_{host}_{config_id}"

gen_refresh_key(self, host, config_id) #

页面刷新标识KEY

Parameters:

Name Type Description Default
host str

客户一端IP地址

required
config_id str

插件运行时ID

required

Returns:

Type Description
str

页面刷新标识KEY

Source code in extension_root/com_longgui_auth_rule_retry_times/__init__.py
def gen_refresh_key(self,host:str,config_id:str):
    """页面刷新标识KEY

    Args:
        host (str): 客户一端IP地址
        config_id (str): 插件运行时ID

    Returns:
        str: 页面刷新标识KEY
    """
    return f"{self.package}_cache_auth_refresh_{host}_{config_id}"

load(self) #

抽象方法,插件加载的入口方法

Source code in extension_root/com_longgui_auth_rule_retry_times/__init__.py
def load(self):
    super().load()
    self.create_extension_config_schema()
    self.listen_event(AUTH_FAIL,self.auth_fail)
    self.listen_event(BEFORE_AUTH,self.before_auth)
    self.listen_event(AUTH_SUCCESS,self.auth_success)

    # 配置初始数据
    tenant = Tenant.platform_tenant()
    if not self.get_tenant_configs(tenant):
        main_auth_factor = TenantExtensionConfig.active_objects.filter(
            tenant=tenant,
            extension=Extension.active_objects.filter(
                package="com.longgui.auth.factor.password"
            ).first(),
            type="password"
        ).first()

        second_auth_factor = TenantExtensionConfig.active_objects.filter(
            tenant=tenant,
            extension=Extension.active_objects.filter(
                package="com.longgui.auth.factor.authcode"
            ).first(),
            type="authcode"
        ).first()

        if main_auth_factor and second_auth_factor:
            # 如主认证因素和此认证因素都存在的情况下 创建认证规则

            config = {
                "main_auth_factor": {
                    "id": main_auth_factor.id.hex, 
                    "name": main_auth_factor.name, 
                    "package": main_auth_factor.extension.package
                }, 
                "second_auth_factor": {
                    "id": second_auth_factor.id.hex, 
                    "name": second_auth_factor.name, 
                    "package": second_auth_factor.extension.package
                }, 
                "try_times": 3
            }
            self.create_tenant_config(tenant, config, "认证规则:登录失败三次启用图形验证码", "retry_times")

set_refresh_status(self, tenant, host, config_id) #

设置登陆页面刷新tag

Parameters:

Name Type Description Default
host str

客户一端IP地址

required
config_id str

插件运行时ID

required
Source code in extension_root/com_longgui_auth_rule_retry_times/__init__.py
def set_refresh_status(self,tenant,host,config_id):
    """设置登陆页面刷新tag

    Args:
        host (str): 客户一端IP地址
        config_id (str): 插件运行时ID
    """
    cache.set(tenant,self.gen_refresh_key(host,config_id),1)

评论