Skip to content

Authentication

Features#

Authentication:Including mobile phone SMS verification code,Username Password,A series of plug -in with graphics verification code, etc.,Used to identify user identity or improve system security。

Implementation#

When developers create a new authentication factors,You need to inherit the AuthFactorextension base class and implement all abstract methods,The data process of the certification factor plug -in during operation is shown in the figure below:

sequenceDiagram
    participant U as Client
    participant C as Platform core
    participant B as Certification factors plugin

    C->>B: Loading plug -in
    B->>C: Registration monitoring custom event:Certification,register,reset Password,Create_LOGIN_PAGE_AUTH_FACTOR
    U->>C: Request to get the login page
    C->>B: Trigger Create_LOGIN_PAGE_AUTH_Factor event
    B->>C: Response event,Traversing all running configuration,Configure Login according to the configuration of running_pages
    C->>U: Rendering login page
    U->>C: Enter user voucher,Click the button,Enter the certification/register/Reset password and other processes
    C->>B: Trigger certification registration/Reset password and other events
    B->>C: Response certification registration/Reset password and other events,Complete the corresponding process,Return result
    C->>U: Return to execution results

Abstract method#

Foundation definition#

arkid.core.extension.auth_factor.AuthFactorExtension (Extension) #

Source code in arkid/core/extension/auth_factor.py
class AuthFactorExtension(Extension):

    TYPE = "auth_factor"


    composite_schema_map = {}
    created_composite_schema_list = []
    composite_key = 'type'
    composite_model = TenantExtensionConfig

    @property
    def type(self):
        return AuthFactorExtension.TYPE

    LOGIN = 'login'
    REGISTER = 'register'
    RESET_PASSWORD = 'password'

    def register_user_key_fields(self, **fields):
        """注册用户模型字段
        """
        User.register_key_field(**fields)

    def load(self):
        super().load()

        self.listen_events()

        self.register_auth_manage_page()

    def register_auth_factor_schema(self, schema, auth_factor_type):
        """注册认证因素运行时配置schema

        Args:
            schema (Schema): schema描述
            auth_factor_type (str): 认证因素类型
        """
        self.register_config_schema(schema, self.package + '_' + auth_factor_type)
        self.register_composite_config_schema(schema, auth_factor_type, exclude=['extension'])

    def start_authenticate(self,event,**kwargs):
        """响应认证事件: 认证前遍历认证规则,如通过所有规则则执行认证规程

        Args:
            event (Event): 认证事件

        Returns:
            Optional[User,None]: 如认证成功则返回user对象,如认证失败跳出事件循环报错
        """
        config = self.get_current_config(event)
        responses = dispatch_event(Event(tag=core_event.BEFORE_AUTH, tenant=event.tenant, request=event.request, data={"auth_factor_config_id":config.id.hex}))
        for useless,(response,useless) in responses:
            if not response:
                continue
            result,data = response
            if not result:
                return self.auth_failed(event,data)
        return self.authenticate(event, **kwargs)

    @abstractmethod
    def authenticate(self, event, **kwargs):
        """抽象方法:认证

        Args:
            event (Event): 认证事件
        """
        pass

    def auth_success(self, user, event, **kwargs):
        """封装认证成功返回值,同时触发认证成功事件,如核心判定事件无异常(无其他插件或机制认为结果为失败)则返回认证成功后的用户

        Args:
            user (User): 用户
            event (Event): 认证事件

        Returns:
            Optional[User,None]: 如认证成功则返回user对象,如认证失败跳出事件循环报错
        """
        config = self.get_current_config(event)
        responses = dispatch_event(Event(tag=core_event.AUTH_SUCCESS, tenant=event.tenant, request=event.request, data={"auth_factor_config_id":config,"user":user}))
        for useless,(response,useless) in responses:
            if not response:
                continue
            result,data = response
            if not result:
                return self.auth_failed(event,data)
        return user

    def auth_failed(self, event, data, **kwargs):
        """封装认证失败返回值,同时触发认证失败事件,打破事件循环,返回报错信息

        Args:
            event (_type_): 认证事件
            data (_type_): 结果描述
        """
        config = self.get_current_config(event)
        dispatch_event(Event(tag=core_event.AUTH_FAIL, tenant=event.tenant, request=event.request,  data={"auth_factor_config_id":config.id.hex,"data":data}))
        core_event.remove_event_id(event)
        core_event.break_event_loop(data)

    @abstractmethod
    def register(self, event, **kwargs):
        """抽象方法:响应注册事件

        Args:
            event (Event): 注册事件
        """
        pass

    @abstractmethod
    def reset_password(self, event, **kwargs):
        """抽象方法:响应重置密码事件

        Args:
            event (Event): 重置密码事件
        """
        pass

    def create_response(self, event, **kwargs):
        """响应事件:CREATE_LOGIN_PAGE_AUTH_FACTOR事件,组装登陆页面schema描述

        Args:
            event (Event): CREATE_LOGIN_PAGE_AUTH_FACTOR事件

        Returns:
            dict: 组装好的登陆页面元素(非最终结构)
        """
        logger.info(f'{self.package} create_response start')
        self.data = {}
        configs = self.get_tenant_configs(event.tenant)
        for config in configs:

            config_data = {
                self.LOGIN: {
                    'forms':[],
                    'bottoms':[],
                    'extend':{},
                },
                self.REGISTER: {
                    'forms':[],
                    'bottoms':[],
                    'extend':{},
                },
                self.RESET_PASSWORD: {
                    'forms':[],
                    'bottoms':[],
                    'extend':{},
                },
            }

            if config.config.get("login_enabled", True):
                self.create_login_page(event,config,config_data)
            if config.config.get("register_enabled", True):
                self.create_register_page(event, config,config_data)
            if config.config.get("reset_password_enabled", True):
                self.create_password_page(event, config,config_data)
            self.create_other_page(event, config, config_data)
            self.data[config.id.hex] = config_data
        logger.info(self.data)
        logger.info(f'{self.package} create_response end')
        return self.data

    def add_page_form(self, config, page_name, label, items, config_data, submit_url=None, submit_label=None):
        """向config_data中添加表单元素

        Args:
            config (TenantExtensionConfig): 插件运行时配置
            page_name (str): 页面名称
            label (str): 标签
            items (list): 页面元素描述列表
            config_data (dict): 运行时配置数据
            submit_url (str, optional): 表单提交地址. Defaults to None.
            submit_label (str, optional): 表单提交按钮标签. Defaults to None.
        """
        default = {
            "login": ("登录", f"/api/v1/tenant/tenant_id/auth/?event_tag={self.auth_event_tag}"),
            "register": ("注册", f"/api/v1/tenant/tenant_id/register/?event_tag={self.register_event_tag}"),
            "password": ("重置密码", f"/api/v1/tenant/tenant_id/reset_password/?event_tag={self.password_event_tag}"),
        }
        if not submit_label:
            submit_label, useless = default.get(page_name)
        if not submit_url:
            useless, submit_url = default.get(page_name)

        items.append({"type": "hidden", "name": "config_id", "value": config.id})
        config_data[page_name]['forms'].append({
            'label': config.name or label,
            'items': items,
            'submit': {'label': submit_label, 'title':submit_label,'http': {'url': submit_url, 'method': "post"}}
        })

    def add_page_bottoms(self, page_name, bottoms):

        self.data[page_name]['bottoms'].append(bottoms)

    def add_page_extend(self, page_name, buttons, title=None):
        if not self.data[page_name].get('extend'):
            self.data[page_name]['extend'] = {}

        self.data[page_name]['extend']['title'] = title
        self.data[page_name]['extend']['buttons'].append(buttons)

    @abstractmethod
    def create_login_page(self, event, config, config_data):
        """抽象方法:组装登录页面表单

        Args:
            event (Event): CREATE_LOGIN_PAGE_AUTH_FACTOR事件
            config (TenantExtensionConfig): 插件运行时配置
            config_data (dict): 运行时配置数据
        """
        pass

    @abstractmethod
    def create_register_page(self, event, config, config_data):
        """抽象方法:组装注册页面表单

        Args:
            event (Event): CREATE_LOGIN_PAGE_AUTH_FACTOR事件
            config (TenantExtensionConfig): 插件运行时配置
            config_data (dict): 运行时配置数据
        """
        pass

    @abstractmethod
    def create_password_page(self, event, config, config_data):
        """抽象方法:组装重置密码页面表单

        Args:
            event (Event): CREATE_LOGIN_PAGE_AUTH_FACTOR事件
            config (TenantExtensionConfig): 插件运行时配置
            config_data (dict): 运行时配置数据
        """
        pass

    @abstractmethod
    def create_other_page(self, event, config, config_data):
        """抽象方法:组装登录页上其他操作表单

        Args:
            event (Event): CREATE_LOGIN_PAGE_AUTH_FACTOR事件
            config (TenantExtensionConfig): 插件运行时配置
            config_data (dict): 运行时配置数据
        """
        pass

    def register_auth_manage_page(self):
        """ 向认证管理页面添加页面
        """
        from api.v1.pages.mine.auth_manage import page as auth_manage_page
        pages = self.create_auth_manage_page()

        if not pages:
            return

        if not isinstance(pages,list):
            pages = [pages]
        for page in pages:
            self.register_front_pages(page)
            auth_manage_page.add_pages(page)

    @abstractmethod
    def create_auth_manage_page(self):
        """抽象方法: 认证管理页面描述
        """
        pass

    @abstractmethod
    def check_auth_data(self, event, **kwargs):
        """ 响应检查认证凭证事件

        Args:
            event: AUTHRULE_CHECK_AUTH_DATA事件
        """
        pass

    @abstractmethod
    def fix_login_page(self, event, **kwargs):
        """向login_pages填入认证元素

        Args:
            event: AUTHRULE_FIX_LOGIN_PAGE事件
        """
        pass

    def get_current_config(self, event):
        """获取事件指向的运行时配置

        Args:
            event (Event): 事件

        Returns:
            TenantExtensionConfig: 运行时配置
        """
        config_id = event.request.POST.get('config_id')
        return self.get_config_by_id(config_id)

    def listen_events(self):
        """注册并监听事件
        """
        self.auth_event_tag = self.register_event('auth', '认证')
        self.listen_event(self.auth_event_tag, self.start_authenticate)
        self.register_event_tag = self.register_event('register', '注册')
        self.listen_event(self.register_event_tag, self.register)
        self.password_event_tag = self.register_event('password', '重置密码')
        self.listen_event(self.password_event_tag, self.reset_password)
        self.listen_event(core_event.CREATE_LOGIN_PAGE_AUTH_FACTOR, self.create_response)
        self.listen_event(core_event.AUTHRULE_CHECK_AUTH_DATA,self.check_auth_data)
        self.listen_event(core_event.AUTHRULE_FIX_LOGIN_PAGE,self.fix_login_page)

composite_model (BaseModel) django-model #

TenantExtensionConfig(id, is_del, is_active, updated, created, tenant, extension, config, name, type)

Source code in arkid/core/extension/auth_factor.py
class TenantExtensionConfig(BaseModel):

    class Meta(object):
        verbose_name = _("插件运行时配置")
        verbose_name_plural = _("插件运行时配置")

    tenant = models.ForeignKey('core.Tenant', blank=False, on_delete=models.PROTECT, verbose_name=_('租户'))
    extension = models.ForeignKey('Extension', blank=False, on_delete=models.PROTECT, verbose_name=_('插件'))
    config = models.JSONField(blank=True, default=dict, verbose_name=_('Runtime Config','运行时配置'))
    name = models.CharField(max_length=128, default='', verbose_name=_('名称'))
    type = models.CharField(max_length=128, default='', verbose_name=_('类型'))

config: JSONField blank django-field #

Runtime Config

created: DateTimeField blank django-field nullable #

创建时间

extension: ForeignKey django-field #

插件

id: UUIDField django-field #

ID

is_active: BooleanField django-field #

是否可用

is_del: BooleanField django-field #

是否删除

name: CharField django-field #

名称

tenant: ForeignKey django-field #

租户

type: CharField django-field #

类型

updated: DateTimeField blank django-field nullable #

更新时间

add_page_form(self, config, page_name, label, items, config_data, submit_url=None, submit_label=None) #

向config_data中添加表单元素

Parameters:

Name Type Description Default
config TenantExtensionConfig

插件运行时配置

required
page_name str

页面名称

required
label str

标签

required
items list

页面元素描述列表

required
config_data dict

运行时配置数据

required
submit_url str

表单提交地址. Defaults to None.

None
submit_label str

表单提交按钮标签. Defaults to None.

None
Source code in arkid/core/extension/auth_factor.py
def add_page_form(self, config, page_name, label, items, config_data, submit_url=None, submit_label=None):
    """向config_data中添加表单元素

    Args:
        config (TenantExtensionConfig): 插件运行时配置
        page_name (str): 页面名称
        label (str): 标签
        items (list): 页面元素描述列表
        config_data (dict): 运行时配置数据
        submit_url (str, optional): 表单提交地址. Defaults to None.
        submit_label (str, optional): 表单提交按钮标签. Defaults to None.
    """
    default = {
        "login": ("登录", f"/api/v1/tenant/tenant_id/auth/?event_tag={self.auth_event_tag}"),
        "register": ("注册", f"/api/v1/tenant/tenant_id/register/?event_tag={self.register_event_tag}"),
        "password": ("重置密码", f"/api/v1/tenant/tenant_id/reset_password/?event_tag={self.password_event_tag}"),
    }
    if not submit_label:
        submit_label, useless = default.get(page_name)
    if not submit_url:
        useless, submit_url = default.get(page_name)

    items.append({"type": "hidden", "name": "config_id", "value": config.id})
    config_data[page_name]['forms'].append({
        'label': config.name or label,
        'items': items,
        'submit': {'label': submit_label, 'title':submit_label,'http': {'url': submit_url, 'method': "post"}}
    })

auth_failed(self, event, data, **kwargs) #

封装认证失败返回值,同时触发认证失败事件,打破事件循环,返回报错信息

Parameters:

Name Type Description Default
event _type_

认证事件

required
data _type_

结果描述

required
Source code in arkid/core/extension/auth_factor.py
def auth_failed(self, event, data, **kwargs):
    """封装认证失败返回值,同时触发认证失败事件,打破事件循环,返回报错信息

    Args:
        event (_type_): 认证事件
        data (_type_): 结果描述
    """
    config = self.get_current_config(event)
    dispatch_event(Event(tag=core_event.AUTH_FAIL, tenant=event.tenant, request=event.request,  data={"auth_factor_config_id":config.id.hex,"data":data}))
    core_event.remove_event_id(event)
    core_event.break_event_loop(data)

auth_success(self, user, event, **kwargs) #

封装认证成功返回值,同时触发认证成功事件,如核心判定事件无异常(无其他插件或机制认为结果为失败)则返回认证成功后的用户

Parameters:

Name Type Description Default
user User

用户

required
event Event

认证事件

required

Returns:

Type Description
Optional[User,None]

如认证成功则返回user对象,如认证失败跳出事件循环报错

Source code in arkid/core/extension/auth_factor.py
def auth_success(self, user, event, **kwargs):
    """封装认证成功返回值,同时触发认证成功事件,如核心判定事件无异常(无其他插件或机制认为结果为失败)则返回认证成功后的用户

    Args:
        user (User): 用户
        event (Event): 认证事件

    Returns:
        Optional[User,None]: 如认证成功则返回user对象,如认证失败跳出事件循环报错
    """
    config = self.get_current_config(event)
    responses = dispatch_event(Event(tag=core_event.AUTH_SUCCESS, tenant=event.tenant, request=event.request, data={"auth_factor_config_id":config,"user":user}))
    for useless,(response,useless) in responses:
        if not response:
            continue
        result,data = response
        if not result:
            return self.auth_failed(event,data)
    return user

authenticate(self, event, **kwargs) #

抽象方法:认证

Parameters:

Name Type Description Default
event Event

认证事件

required
Source code in arkid/core/extension/auth_factor.py
@abstractmethod
def authenticate(self, event, **kwargs):
    """抽象方法:认证

    Args:
        event (Event): 认证事件
    """
    pass

check_auth_data(self, event, **kwargs) #

响应检查认证凭证事件

Parameters:

Name Type Description Default
event

AUTHRULE_CHECK_AUTH_DATA事件

required
Source code in arkid/core/extension/auth_factor.py
@abstractmethod
def check_auth_data(self, event, **kwargs):
    """ 响应检查认证凭证事件

    Args:
        event: AUTHRULE_CHECK_AUTH_DATA事件
    """
    pass

create_auth_manage_page(self) #

认证管理页面描述

Source code in arkid/core/extension/auth_factor.py
@abstractmethod
def create_auth_manage_page(self):
    """抽象方法: 认证管理页面描述
    """
    pass

create_login_page(self, event, config, config_data) #

抽象方法:组装登录页面表单

Parameters:

Name Type Description Default
event Event

CREATE_LOGIN_PAGE_AUTH_FACTOR事件

required
config TenantExtensionConfig

插件运行时配置

required
config_data dict

运行时配置数据

required
Source code in arkid/core/extension/auth_factor.py
@abstractmethod
def create_login_page(self, event, config, config_data):
    """抽象方法:组装登录页面表单

    Args:
        event (Event): CREATE_LOGIN_PAGE_AUTH_FACTOR事件
        config (TenantExtensionConfig): 插件运行时配置
        config_data (dict): 运行时配置数据
    """
    pass

create_other_page(self, event, config, config_data) #

抽象方法:组装登录页上其他操作表单

Parameters:

Name Type Description Default
event Event

CREATE_LOGIN_PAGE_AUTH_FACTOR事件

required
config TenantExtensionConfig

插件运行时配置

required
config_data dict

运行时配置数据

required
Source code in arkid/core/extension/auth_factor.py
@abstractmethod
def create_other_page(self, event, config, config_data):
    """抽象方法:组装登录页上其他操作表单

    Args:
        event (Event): CREATE_LOGIN_PAGE_AUTH_FACTOR事件
        config (TenantExtensionConfig): 插件运行时配置
        config_data (dict): 运行时配置数据
    """
    pass

create_password_page(self, event, config, config_data) #

抽象方法:组装重置密码页面表单

Parameters:

Name Type Description Default
event Event

CREATE_LOGIN_PAGE_AUTH_FACTOR事件

required
config TenantExtensionConfig

插件运行时配置

required
config_data dict

运行时配置数据

required
Source code in arkid/core/extension/auth_factor.py
@abstractmethod
def create_password_page(self, event, config, config_data):
    """抽象方法:组装重置密码页面表单

    Args:
        event (Event): CREATE_LOGIN_PAGE_AUTH_FACTOR事件
        config (TenantExtensionConfig): 插件运行时配置
        config_data (dict): 运行时配置数据
    """
    pass

create_register_page(self, event, config, config_data) #

抽象方法:组装注册页面表单

Parameters:

Name Type Description Default
event Event

CREATE_LOGIN_PAGE_AUTH_FACTOR事件

required
config TenantExtensionConfig

插件运行时配置

required
config_data dict

运行时配置数据

required
Source code in arkid/core/extension/auth_factor.py
@abstractmethod
def create_register_page(self, event, config, config_data):
    """抽象方法:组装注册页面表单

    Args:
        event (Event): CREATE_LOGIN_PAGE_AUTH_FACTOR事件
        config (TenantExtensionConfig): 插件运行时配置
        config_data (dict): 运行时配置数据
    """
    pass

create_response(self, event, **kwargs) #

响应事件:CREATE_LOGIN_PAGE_AUTH_FACTOR事件,组装登陆页面schema描述

Parameters:

Name Type Description Default
event Event

CREATE_LOGIN_PAGE_AUTH_FACTOR事件

required

Returns:

Type Description
dict

组装好的登陆页面元素(非最终结构)

Source code in arkid/core/extension/auth_factor.py
def create_response(self, event, **kwargs):
    """响应事件:CREATE_LOGIN_PAGE_AUTH_FACTOR事件,组装登陆页面schema描述

    Args:
        event (Event): CREATE_LOGIN_PAGE_AUTH_FACTOR事件

    Returns:
        dict: 组装好的登陆页面元素(非最终结构)
    """
    logger.info(f'{self.package} create_response start')
    self.data = {}
    configs = self.get_tenant_configs(event.tenant)
    for config in configs:

        config_data = {
            self.LOGIN: {
                'forms':[],
                'bottoms':[],
                'extend':{},
            },
            self.REGISTER: {
                'forms':[],
                'bottoms':[],
                'extend':{},
            },
            self.RESET_PASSWORD: {
                'forms':[],
                'bottoms':[],
                'extend':{},
            },
        }

        if config.config.get("login_enabled", True):
            self.create_login_page(event,config,config_data)
        if config.config.get("register_enabled", True):
            self.create_register_page(event, config,config_data)
        if config.config.get("reset_password_enabled", True):
            self.create_password_page(event, config,config_data)
        self.create_other_page(event, config, config_data)
        self.data[config.id.hex] = config_data
    logger.info(self.data)
    logger.info(f'{self.package} create_response end')
    return self.data

fix_login_page(self, event, **kwargs) #

向login_pages填入认证元素

Parameters:

Name Type Description Default
event

AUTHRULE_FIX_LOGIN_PAGE事件

required
Source code in arkid/core/extension/auth_factor.py
@abstractmethod
def fix_login_page(self, event, **kwargs):
    """向login_pages填入认证元素

    Args:
        event: AUTHRULE_FIX_LOGIN_PAGE事件
    """
    pass

get_current_config(self, event) #

获取事件指向的运行时配置

Parameters:

Name Type Description Default
event Event

事件

required

Returns:

Type Description
TenantExtensionConfig

运行时配置

Source code in arkid/core/extension/auth_factor.py
def get_current_config(self, event):
    """获取事件指向的运行时配置

    Args:
        event (Event): 事件

    Returns:
        TenantExtensionConfig: 运行时配置
    """
    config_id = event.request.POST.get('config_id')
    return self.get_config_by_id(config_id)

listen_events(self) #

注册并监听事件

Source code in arkid/core/extension/auth_factor.py
def listen_events(self):
    """注册并监听事件
    """
    self.auth_event_tag = self.register_event('auth', '认证')
    self.listen_event(self.auth_event_tag, self.start_authenticate)
    self.register_event_tag = self.register_event('register', '注册')
    self.listen_event(self.register_event_tag, self.register)
    self.password_event_tag = self.register_event('password', '重置密码')
    self.listen_event(self.password_event_tag, self.reset_password)
    self.listen_event(core_event.CREATE_LOGIN_PAGE_AUTH_FACTOR, self.create_response)
    self.listen_event(core_event.AUTHRULE_CHECK_AUTH_DATA,self.check_auth_data)
    self.listen_event(core_event.AUTHRULE_FIX_LOGIN_PAGE,self.fix_login_page)

load(self) #

抽象方法,插件加载的入口方法

Source code in arkid/core/extension/auth_factor.py
def load(self):
    super().load()

    self.listen_events()

    self.register_auth_manage_page()

register(self, event, **kwargs) #

抽象方法:响应注册事件

Parameters:

Name Type Description Default
event Event

注册事件

required
Source code in arkid/core/extension/auth_factor.py
@abstractmethod
def register(self, event, **kwargs):
    """抽象方法:响应注册事件

    Args:
        event (Event): 注册事件
    """
    pass

register_auth_factor_schema(self, schema, auth_factor_type) #

注册认证因素运行时配置schema

Parameters:

Name Type Description Default
schema Schema

schema描述

required
auth_factor_type str

认证因素类型

required
Source code in arkid/core/extension/auth_factor.py
def register_auth_factor_schema(self, schema, auth_factor_type):
    """注册认证因素运行时配置schema

    Args:
        schema (Schema): schema描述
        auth_factor_type (str): 认证因素类型
    """
    self.register_config_schema(schema, self.package + '_' + auth_factor_type)
    self.register_composite_config_schema(schema, auth_factor_type, exclude=['extension'])

register_auth_manage_page(self) #

向认证管理页面添加页面

Source code in arkid/core/extension/auth_factor.py
def register_auth_manage_page(self):
    """ 向认证管理页面添加页面
    """
    from api.v1.pages.mine.auth_manage import page as auth_manage_page
    pages = self.create_auth_manage_page()

    if not pages:
        return

    if not isinstance(pages,list):
        pages = [pages]
    for page in pages:
        self.register_front_pages(page)
        auth_manage_page.add_pages(page)

register_user_key_fields(self, **fields) #

注册用户模型字段

Source code in arkid/core/extension/auth_factor.py
def register_user_key_fields(self, **fields):
    """注册用户模型字段
    """
    User.register_key_field(**fields)

reset_password(self, event, **kwargs) #

抽象方法:响应重置密码事件

Parameters:

Name Type Description Default
event Event

重置密码事件

required
Source code in arkid/core/extension/auth_factor.py
@abstractmethod
def reset_password(self, event, **kwargs):
    """抽象方法:响应重置密码事件

    Args:
        event (Event): 重置密码事件
    """
    pass

start_authenticate(self, event, **kwargs) #

响应认证事件: 认证前遍历认证规则,如通过所有规则则执行认证规程

Parameters:

Name Type Description Default
event Event

认证事件

required

Returns:

Type Description
Optional[User,None]

如认证成功则返回user对象,如认证失败跳出事件循环报错

Source code in arkid/core/extension/auth_factor.py
def start_authenticate(self,event,**kwargs):
    """响应认证事件: 认证前遍历认证规则,如通过所有规则则执行认证规程

    Args:
        event (Event): 认证事件

    Returns:
        Optional[User,None]: 如认证成功则返回user对象,如认证失败跳出事件循环报错
    """
    config = self.get_current_config(event)
    responses = dispatch_event(Event(tag=core_event.BEFORE_AUTH, tenant=event.tenant, request=event.request, data={"auth_factor_config_id":config.id.hex}))
    for useless,(response,useless) in responses:
        if not response:
            continue
        result,data = response
        if not result:
            return self.auth_failed(event,data)
    return self.authenticate(event, **kwargs)

Exemplary#

extension_root.com_longgui_auth_factor_mobile.MobileAuthFactorExtension (AuthFactorExtension) #

手机短信验证码认证因素插件

Source code in extension_root/com_longgui_auth_factor_mobile/__init__.py
class MobileAuthFactorExtension(AuthFactorExtension):
    """手机短信验证码认证因素插件
    """
    def load(self):
        """加载插件
        """
        super().load()

        self.create_extension_config_schema()

        self.register_extend_field(UserMobile, "mobile")
        from api.v1.schema.auth import AuthIn
        from api.v1.schema.user import UserCreateIn,UserItemOut,UserUpdateIn,UserListItemOut
        from api.v1.schema.mine import ProfileSchemaOut
        self.register_extend_api(
            AuthIn,
            UserCreateIn, 
            UserItemOut, 
            UserUpdateIn, 
            UserListItemOut,
            mobile=(Optional[str],Field(title=_("电话号码"))),
            # areacode=(str,Field(title=_("区号")))
        )
        self.register_extend_api(
            ProfileSchemaOut, 
            mobile=(Optional[str],Field(readonly=True))
        )

        # 注册发送短信接口
        self.url_send_sms_code = self.register_api(
            '/config/{config_id}/send_sms_code/',
            'POST',
            self.send_sms_code,
            tenant_path=True,
            auth=None,
            response=SendSMSCodeOut,
        )
        print(self.url_send_sms_code)

    def authenticate(self, event, **kwargs):
        """ 认证

        通过手机号码查找用户并校验短信验证码

        Args:
            event (Event): 事件

        """
        tenant = event.tenant
        request = event.request
        data = request.POST or json.load(request.body)

        mobile = data.get('mobile')
        sms_code = data.get('sms_code')

        # user = User.expand_objects.filter(tenant=tenant,mobile=mobile)
        temp_users = tenant.users.all()
        user_ids = []
        for temp_user in temp_users:
            user_ids.append(temp_user.id)
        user = User.expand_objects.filter(
            is_active=True,
            is_del=False,
            id__in=user_ids,
            mobile=mobile
        )
        if len(user) > 1:
            logger.error(f'{mobile}在数据库中匹配到多个用户')
            return self.auth_failed(event, data=self.error(ErrorCode.CONTACT_MANAGER))
        if user:
            user = user[0]
            if check_sms_code(tenant, mobile, sms_code):
                user = User.active_objects.get(id=user.get("id"))
                return self.auth_success(user,event)
            else:
                msg = ErrorCode.SMS_CODE_MISMATCH
        else:
            msg = ErrorCode.MOBILE_NOT_EXISTS_ERROR
        return self.auth_failed(event, data=self.error(msg))

    @transaction.atomic()
    def register(self, event, **kwargs):
        """ 注册用户

        Args:
            event (Event): 事件
        """
        tenant = event.tenant
        request = event.request
        data = request.POST or json.load(request.body)

        mobile = data.get('mobile')
        sms_code = data.get('sms_code')
        username = data.get('username')

        config = self.get_current_config(event)
        ret, message = self.check_mobile_exists(mobile, tenant)
        if not ret:
            return self.error(message)

        if not check_sms_code(tenant, mobile, sms_code):
            return self.error(ErrorCode.SMS_CODE_MISMATCH)

        ret, message = self.check_username_exists(username, tenant)
        if not ret:
            return self.error(message)

        user = User(tenant=tenant)

        user.mobile = mobile
        user.username = username

        user.save()
        tenant.users.add(user)
        tenant.save()

        return user

    def reset_password(self, event, **kwargs):
        """ 重置密码

        Args:
            event (Event): 事件
        """
        tenant = event.tenant
        request = event.request
        data = request.POST or json.load(request.body)

        mobile = data.get('mobile')
        sms_code = data.get('sms_code')

        password = data.get('password')
        checkpassword = data.get('checkpassword')

        if password != checkpassword:
            return self.error(ErrorCode.PASSWORD_IS_INCONSISTENT)

        if not check_sms_code(tenant, mobile, sms_code):
            return self.error(ErrorCode.SMS_CODE_MISMATCH)

        # user = User.expand_objects.filter(tenant=tenant,mobile=mobile)
        temp_users = tenant.users.all()
        user_ids = []
        for temp_user in temp_users:
            user_ids.append(temp_user.id)
        user = User.expand_objects.filter(
            is_active=True,
            is_del=False,
            id__in=user_ids,
            mobile=mobile
        )
        if len(user) > 1:
            logger.error(f'{mobile}在数据库中匹配到多个用户')
            return self.error(ErrorCode.CONTACT_MANAGER)
        if user:
            user = user[0]
            user.password = make_password(password)
            user.save()
            return self.success()

        return self.error(ErrorCode.MOBILE_NOT_EXISTS_ERROR)

    def create_login_page(self, event, config, config_data):
        """ 生成手机验证码登录页面Schema描述

        Args:
            event (Event): 事件
            config (TenantExtensionConfig): 插件运行时配置
        """

        items = [
            {
                "type": "text",
                "name":"mobile",
                "placeholder": "手机号码",
                "append": {
                    "title": "发送验证码",
                    "http": {
                        "url": self.url_send_sms_code,
                        "method": "post",
                        "params": {
                            "mobile": "mobile",
                            "areacode": "86",
                        },
                    },
                    "delay": 60
                }
            },
            {
                "type": "text",
                "name":"sms_code",
                "placeholder": "验证码",
            }
        ]
        self.add_page_form(config, self.LOGIN, "手机验证码登录", items, config_data)

    def create_register_page(self, event, config, config_data):
        """生成手机验证码用户注册页面Schema描述

        因本插件提供重置密码功能,此处需用户指定账号用户名

        Args:
            event (Event): 事件
            config (TenantExtensionConfig): 插件运行时配置
        """
        items = [
            {
                "type": "text",
                "name": "username",
                "placeholder": "用户名"
            },
            {
                "type": "text",
                "name":"mobile",
                "placeholder": "手机号码",
                "append": {
                    "title": "发送验证码",
                    "http": {
                        "url": self.url_send_sms_code,
                        "method": "post",
                        "params": {
                            "mobile": "mobile",
                            "areacode": "86",
                        },
                    },
                    "delay": 60
                }
            },
            {
                "type": "text",
                "name":"sms_code",
                "placeholder": "验证码"
            }
        ]
        self.add_page_form(config, self.REGISTER, "手机验证码注册", items, config_data)

    def create_password_page(self, event, config, config_data):
        """生成重置密码页面Schema描述

        通过手机验证码重置密码时需提供手机号码以及对应验证码,同时此处添加新密码确认机制

        注意:重置密码功能需要启用用户名密码认证插件以提供完整支持

        Args:
            event (Event): 事件
            config (TenantExtensionConfig): 插件运行时配置
        """
        items = [
            {
                "type": "text",
                "name":"mobile",
                "placeholder": "手机号码",
                "append": {
                    "title": "发送验证码",
                    "http": {
                        "url": self.url_send_sms_code,
                        "method": "post",
                        "params": {
                            "mobile": "mobile",
                            "areacode": "86",
                        },
                    },
                }
            },
            {
                "type": "text",
                "name":"sms_code",
                "placeholder": "验证码"
            },
            {
                "type": "password",
                "name":"password",
                "placeholder": "密码"
            },
            {
                "type": "password",
                "name":"checkpassword",
                "placeholder": "密码确认"
            }
        ]
        self.add_page_form(config, self.RESET_PASSWORD, "手机验证码重置密码", items, config_data)

    def create_other_page(self, event, config, config_data):
        """创建其他页面(本插件无相关页面)

        Args:
            event (Event): 事件
            config (TenantExtensionConfig): 插件运行时配置
        """
        pass

    def check_mobile_exists(self, mobile, tenant):
        """检查电话号码是否已存在

        Args:
            mobile (str): 手机号
            tenant (Tenant): 租户

        Returns:
            (bool,ErrorCode): mobile是否存在以及对应错误
        """
        if not mobile:
            return False, ErrorCode.MOBILE_EMPTY
        # 需要临时存储
        temp_users = tenant.users.all()
        user_ids = []
        for temp_user in temp_users:
            user_ids.append(temp_user.id)
        if User.expand_objects.filter(
            is_active=True,
            is_del=False,
            id__in=user_ids,
            mobile=mobile
        ).count():
        # if User.expand_objects.filter(tenant=tenant,mobile=mobile).count():
            return False, ErrorCode.MOBILE_EXISTS_ERROR
        return True, None

    def check_username_exists(self,username,tenant):
        """检查用户名是否已存在

        Args:
            username (str): 用户名
            tenant (Tenant): 租户

        Returns:
            (bool,ErrorCode): username是否存在以及对应错误
        """
        # 检查username是否为空
        if not username:
            return False, ErrorCode.USERNAME_EMPTY
        # 检查username是否已存在
        if tenant.users.filter(is_active=True, is_del=False).filter(username=username).count():
        # if User.expand_objects.filter(tenant=tenant,username=username).count():
            return False, ErrorCode.USERNAME_EXISTS_ERROR

        return True, None

    def check_auth_data(self, event, **kwargs):
        pass

    def fix_login_page(self, event, **kwargs):
        pass

    def create_auth_manage_page(self):
        """ 创建“我的-认证管理”中的更换手机号码页面
        """
        _pages = []

        mine_mobile_path = self.register_api(
            "/mine_mobile/",
            "GET",
            self.mine_mobile,
            tenant_path=True,
            auth=GlobalAuth(),
            response=MineMobileOut
        )

        upodate_mine_mobile_path = self.register_api(
            "/mine_mobile/",
            'POST',
            self.update_mine_mobile,
            tenant_path=True,
            auth=GlobalAuth(),
            response=UpdateMineMobileOut
        )

        name = '更改手机号码'

        page = pages.FormPage(name=name)
        page.create_actions(
            init_action=actions.DirectAction(
                path=mine_mobile_path,
                method=actions.FrontActionMethod.GET,
            ),
            global_actions={
                'confirm': actions.ConfirmAction(
                    path=upodate_mine_mobile_path
                ),
            }
        )

        _pages.append(page)
        return _pages

    def create_extension_config_schema(self):
        """创建插件运行时配置schema描述
        """
        select_sms_page = pages.TablePage(select=True,name=_("指定短信插件运行时"))

        self.register_front_pages(select_sms_page)

        select_sms_page.create_actions(
            init_action=actions.DirectAction(
                path='/api/v1/tenants/{tenant_id}/config_select/?extension__type=sms',
                method=actions.FrontActionMethod.GET
            )
        )

        MobileAuthFactorSchema = create_extension_schema(
            'MobileAuthFactorSchema',
            __file__, 
            [
                (
                    'sms_config', 
                    MobileAuthFactorConfigSchema, 
                    Field(
                        title=_('sms extension config', '短信插件运行时'),
                        page=select_sms_page.tag,
                    ),
                ),
                (
                    'code_length', 
                    int, 
                    Field(
                        title=_('code_length', '验证码长度'),
                        default=6
                    )
                ),
                (
                    'expired', 
                    Optional[int],
                    Field(
                        title=_('expired', '有效期/分钟'),
                        default=10,
                    )
                ),
            ],
            BaseAuthFactorSchema,
        )
        self.register_auth_factor_schema(MobileAuthFactorSchema, 'mobile')

    @operation(SendSMSCodeOut)
    def send_sms_code(self,request,tenant_id,config_id:str,data:SendSMSCodeIn):
        """发送短信验证码
        """
        tenant = request.tenant
        mobile = data.mobile
        config = self.get_config_by_id(config_id)
        if not config:
            return self.error(ErrorCode.CONFIG_IS_NOT_EXISTS)

        if not mobile or mobile=="mobile":
            return self.error(ErrorCode.MOBILE_EMPTY)

        code = create_sms_code(tenant,mobile,config.config.get('code_length',6),config.config.get("expired",10)*60)


        responses = dispatch_event(
            Event(
                tag=SEND_SMS,
                tenant=tenant,
                request=request,
                data={
                    "config_id":config.config["sms_config"]["id"],
                    "mobile":data.mobile,
                    "code": code,
                    "areacode": data.areacode,
                    "username": request.user.username if request.user else ""
                },
                packages=config.config["sms_config"]["package"]

            )
        )

        if not responses:
            return self.error(ErrorCode.SMS_EXTENSION_NOT_EXISTS)
        useless, (data, extension) = responses[0]
        if data:
            return self.success()
        else:
            return self.error(ErrorCode.SMS_SEND_FAILED)

    @operation(UpdateMineMobileOut,roles=[TENANT_ADMIN, PLATFORM_ADMIN, NORMAL_USER])
    def update_mine_mobile(self, request, tenant_id: str,data:UpdateMineMobileIn):
        """ 普通用户:更新手机号码
        """
        mobile = data.mobile
        ret, message = self.check_mobile_exists(mobile, request.tenant)
        if not ret:
            return self.error(message)

        if not check_sms_code(request.tenant,mobile,data.code):
            return self.error(ErrorCode.SMS_CODE_MISMATCH)

        user = request.user
        user.mobile=data.mobile
        user.save()

        return self.success()

    @operation(MineMobileOut,roles=[TENANT_ADMIN, PLATFORM_ADMIN, NORMAL_USER])
    def mine_mobile(self,request,tenant_id: str):
        user = request.user
        user_expand = User.expand_objects.filter(id=user.id).first()

        config = self.get_tenant_configs(request.tenant).first()

        if not config:
            return self.error(
                ErrorCode.CONFIG_IS_NOT_EXISTS
            )

        return self.success(
            data={
                "current_mobile": user_expand.get("mobile",None),
                "mobile": "",
                "code": "",
                "config_id": config.id.hex,
            },
        )

authenticate(self, event, **kwargs) #

认证

通过手机号码查找用户并校验短信验证码

Parameters:

Name Type Description Default
event Event

事件

required
Source code in extension_root/com_longgui_auth_factor_mobile/__init__.py
def authenticate(self, event, **kwargs):
    """ 认证

    通过手机号码查找用户并校验短信验证码

    Args:
        event (Event): 事件

    """
    tenant = event.tenant
    request = event.request
    data = request.POST or json.load(request.body)

    mobile = data.get('mobile')
    sms_code = data.get('sms_code')

    # user = User.expand_objects.filter(tenant=tenant,mobile=mobile)
    temp_users = tenant.users.all()
    user_ids = []
    for temp_user in temp_users:
        user_ids.append(temp_user.id)
    user = User.expand_objects.filter(
        is_active=True,
        is_del=False,
        id__in=user_ids,
        mobile=mobile
    )
    if len(user) > 1:
        logger.error(f'{mobile}在数据库中匹配到多个用户')
        return self.auth_failed(event, data=self.error(ErrorCode.CONTACT_MANAGER))
    if user:
        user = user[0]
        if check_sms_code(tenant, mobile, sms_code):
            user = User.active_objects.get(id=user.get("id"))
            return self.auth_success(user,event)
        else:
            msg = ErrorCode.SMS_CODE_MISMATCH
    else:
        msg = ErrorCode.MOBILE_NOT_EXISTS_ERROR
    return self.auth_failed(event, data=self.error(msg))

check_auth_data(self, event, **kwargs) #

响应检查认证凭证事件

Parameters:

Name Type Description Default
event

AUTHRULE_CHECK_AUTH_DATA事件

required
Source code in extension_root/com_longgui_auth_factor_mobile/__init__.py
def check_auth_data(self, event, **kwargs):
    pass

check_mobile_exists(self, mobile, tenant) #

检查电话号码是否已存在

Parameters:

Name Type Description Default
mobile str

手机号

required
tenant Tenant

租户

required

Returns:

Type Description
(bool,ErrorCode)

mobile是否存在以及对应错误

Source code in extension_root/com_longgui_auth_factor_mobile/__init__.py
def check_mobile_exists(self, mobile, tenant):
    """检查电话号码是否已存在

    Args:
        mobile (str): 手机号
        tenant (Tenant): 租户

    Returns:
        (bool,ErrorCode): mobile是否存在以及对应错误
    """
    if not mobile:
        return False, ErrorCode.MOBILE_EMPTY
    # 需要临时存储
    temp_users = tenant.users.all()
    user_ids = []
    for temp_user in temp_users:
        user_ids.append(temp_user.id)
    if User.expand_objects.filter(
        is_active=True,
        is_del=False,
        id__in=user_ids,
        mobile=mobile
    ).count():
    # if User.expand_objects.filter(tenant=tenant,mobile=mobile).count():
        return False, ErrorCode.MOBILE_EXISTS_ERROR
    return True, None

check_username_exists(self, username, tenant) #

检查用户名是否已存在

Parameters:

Name Type Description Default
username str

用户名

required
tenant Tenant

租户

required

Returns:

Type Description
(bool,ErrorCode)

username是否存在以及对应错误

Source code in extension_root/com_longgui_auth_factor_mobile/__init__.py
def check_username_exists(self,username,tenant):
    """检查用户名是否已存在

    Args:
        username (str): 用户名
        tenant (Tenant): 租户

    Returns:
        (bool,ErrorCode): username是否存在以及对应错误
    """
    # 检查username是否为空
    if not username:
        return False, ErrorCode.USERNAME_EMPTY
    # 检查username是否已存在
    if tenant.users.filter(is_active=True, is_del=False).filter(username=username).count():
    # if User.expand_objects.filter(tenant=tenant,username=username).count():
        return False, ErrorCode.USERNAME_EXISTS_ERROR

    return True, None

create_auth_manage_page(self) #

创建“我的-认证管理”中的更换手机号码页面

Source code in extension_root/com_longgui_auth_factor_mobile/__init__.py
def create_auth_manage_page(self):
    """ 创建“我的-认证管理”中的更换手机号码页面
    """
    _pages = []

    mine_mobile_path = self.register_api(
        "/mine_mobile/",
        "GET",
        self.mine_mobile,
        tenant_path=True,
        auth=GlobalAuth(),
        response=MineMobileOut
    )

    upodate_mine_mobile_path = self.register_api(
        "/mine_mobile/",
        'POST',
        self.update_mine_mobile,
        tenant_path=True,
        auth=GlobalAuth(),
        response=UpdateMineMobileOut
    )

    name = '更改手机号码'

    page = pages.FormPage(name=name)
    page.create_actions(
        init_action=actions.DirectAction(
            path=mine_mobile_path,
            method=actions.FrontActionMethod.GET,
        ),
        global_actions={
            'confirm': actions.ConfirmAction(
                path=upodate_mine_mobile_path
            ),
        }
    )

    _pages.append(page)
    return _pages

create_extension_config_schema(self) #

创建插件运行时配置schema描述

Source code in extension_root/com_longgui_auth_factor_mobile/__init__.py
def create_extension_config_schema(self):
    """创建插件运行时配置schema描述
    """
    select_sms_page = pages.TablePage(select=True,name=_("指定短信插件运行时"))

    self.register_front_pages(select_sms_page)

    select_sms_page.create_actions(
        init_action=actions.DirectAction(
            path='/api/v1/tenants/{tenant_id}/config_select/?extension__type=sms',
            method=actions.FrontActionMethod.GET
        )
    )

    MobileAuthFactorSchema = create_extension_schema(
        'MobileAuthFactorSchema',
        __file__, 
        [
            (
                'sms_config', 
                MobileAuthFactorConfigSchema, 
                Field(
                    title=_('sms extension config', '短信插件运行时'),
                    page=select_sms_page.tag,
                ),
            ),
            (
                'code_length', 
                int, 
                Field(
                    title=_('code_length', '验证码长度'),
                    default=6
                )
            ),
            (
                'expired', 
                Optional[int],
                Field(
                    title=_('expired', '有效期/分钟'),
                    default=10,
                )
            ),
        ],
        BaseAuthFactorSchema,
    )
    self.register_auth_factor_schema(MobileAuthFactorSchema, 'mobile')

create_login_page(self, event, config, config_data) #

生成手机验证码登录页面Schema描述

Parameters:

Name Type Description Default
event Event

事件

required
config TenantExtensionConfig

插件运行时配置

required
Source code in extension_root/com_longgui_auth_factor_mobile/__init__.py
def create_login_page(self, event, config, config_data):
    """ 生成手机验证码登录页面Schema描述

    Args:
        event (Event): 事件
        config (TenantExtensionConfig): 插件运行时配置
    """

    items = [
        {
            "type": "text",
            "name":"mobile",
            "placeholder": "手机号码",
            "append": {
                "title": "发送验证码",
                "http": {
                    "url": self.url_send_sms_code,
                    "method": "post",
                    "params": {
                        "mobile": "mobile",
                        "areacode": "86",
                    },
                },
                "delay": 60
            }
        },
        {
            "type": "text",
            "name":"sms_code",
            "placeholder": "验证码",
        }
    ]
    self.add_page_form(config, self.LOGIN, "手机验证码登录", items, config_data)

create_other_page(self, event, config, config_data) #

创建其他页面(本插件无相关页面)

Parameters:

Name Type Description Default
event Event

事件

required
config TenantExtensionConfig

插件运行时配置

required
Source code in extension_root/com_longgui_auth_factor_mobile/__init__.py
def create_other_page(self, event, config, config_data):
    """创建其他页面(本插件无相关页面)

    Args:
        event (Event): 事件
        config (TenantExtensionConfig): 插件运行时配置
    """
    pass

create_password_page(self, event, config, config_data) #

生成重置密码页面Schema描述

通过手机验证码重置密码时需提供手机号码以及对应验证码,同时此处添加新密码确认机制

注意:重置密码功能需要启用用户名密码认证插件以提供完整支持

Parameters:

Name Type Description Default
event Event

事件

required
config TenantExtensionConfig

插件运行时配置

required
Source code in extension_root/com_longgui_auth_factor_mobile/__init__.py
def create_password_page(self, event, config, config_data):
    """生成重置密码页面Schema描述

    通过手机验证码重置密码时需提供手机号码以及对应验证码,同时此处添加新密码确认机制

    注意:重置密码功能需要启用用户名密码认证插件以提供完整支持

    Args:
        event (Event): 事件
        config (TenantExtensionConfig): 插件运行时配置
    """
    items = [
        {
            "type": "text",
            "name":"mobile",
            "placeholder": "手机号码",
            "append": {
                "title": "发送验证码",
                "http": {
                    "url": self.url_send_sms_code,
                    "method": "post",
                    "params": {
                        "mobile": "mobile",
                        "areacode": "86",
                    },
                },
            }
        },
        {
            "type": "text",
            "name":"sms_code",
            "placeholder": "验证码"
        },
        {
            "type": "password",
            "name":"password",
            "placeholder": "密码"
        },
        {
            "type": "password",
            "name":"checkpassword",
            "placeholder": "密码确认"
        }
    ]
    self.add_page_form(config, self.RESET_PASSWORD, "手机验证码重置密码", items, config_data)

create_register_page(self, event, config, config_data) #

生成手机验证码用户注册页面Schema描述

因本插件提供重置密码功能,此处需用户指定账号用户名

Parameters:

Name Type Description Default
event Event

事件

required
config TenantExtensionConfig

插件运行时配置

required
Source code in extension_root/com_longgui_auth_factor_mobile/__init__.py
def create_register_page(self, event, config, config_data):
    """生成手机验证码用户注册页面Schema描述

    因本插件提供重置密码功能,此处需用户指定账号用户名

    Args:
        event (Event): 事件
        config (TenantExtensionConfig): 插件运行时配置
    """
    items = [
        {
            "type": "text",
            "name": "username",
            "placeholder": "用户名"
        },
        {
            "type": "text",
            "name":"mobile",
            "placeholder": "手机号码",
            "append": {
                "title": "发送验证码",
                "http": {
                    "url": self.url_send_sms_code,
                    "method": "post",
                    "params": {
                        "mobile": "mobile",
                        "areacode": "86",
                    },
                },
                "delay": 60
            }
        },
        {
            "type": "text",
            "name":"sms_code",
            "placeholder": "验证码"
        }
    ]
    self.add_page_form(config, self.REGISTER, "手机验证码注册", items, config_data)

fix_login_page(self, event, **kwargs) #

向login_pages填入认证元素

Parameters:

Name Type Description Default
event

AUTHRULE_FIX_LOGIN_PAGE事件

required
Source code in extension_root/com_longgui_auth_factor_mobile/__init__.py
def fix_login_page(self, event, **kwargs):
    pass

load(self) #

加载插件

Source code in extension_root/com_longgui_auth_factor_mobile/__init__.py
def load(self):
    """加载插件
    """
    super().load()

    self.create_extension_config_schema()

    self.register_extend_field(UserMobile, "mobile")
    from api.v1.schema.auth import AuthIn
    from api.v1.schema.user import UserCreateIn,UserItemOut,UserUpdateIn,UserListItemOut
    from api.v1.schema.mine import ProfileSchemaOut
    self.register_extend_api(
        AuthIn,
        UserCreateIn, 
        UserItemOut, 
        UserUpdateIn, 
        UserListItemOut,
        mobile=(Optional[str],Field(title=_("电话号码"))),
        # areacode=(str,Field(title=_("区号")))
    )
    self.register_extend_api(
        ProfileSchemaOut, 
        mobile=(Optional[str],Field(readonly=True))
    )

    # 注册发送短信接口
    self.url_send_sms_code = self.register_api(
        '/config/{config_id}/send_sms_code/',
        'POST',
        self.send_sms_code,
        tenant_path=True,
        auth=None,
        response=SendSMSCodeOut,
    )
    print(self.url_send_sms_code)

register(self, event, **kwargs) #

注册用户

Parameters:

Name Type Description Default
event Event

事件

required
Source code in extension_root/com_longgui_auth_factor_mobile/__init__.py
@transaction.atomic()
def register(self, event, **kwargs):
    """ 注册用户

    Args:
        event (Event): 事件
    """
    tenant = event.tenant
    request = event.request
    data = request.POST or json.load(request.body)

    mobile = data.get('mobile')
    sms_code = data.get('sms_code')
    username = data.get('username')

    config = self.get_current_config(event)
    ret, message = self.check_mobile_exists(mobile, tenant)
    if not ret:
        return self.error(message)

    if not check_sms_code(tenant, mobile, sms_code):
        return self.error(ErrorCode.SMS_CODE_MISMATCH)

    ret, message = self.check_username_exists(username, tenant)
    if not ret:
        return self.error(message)

    user = User(tenant=tenant)

    user.mobile = mobile
    user.username = username

    user.save()
    tenant.users.add(user)
    tenant.save()

    return user

reset_password(self, event, **kwargs) #

重置密码

Parameters:

Name Type Description Default
event Event

事件

required
Source code in extension_root/com_longgui_auth_factor_mobile/__init__.py
def reset_password(self, event, **kwargs):
    """ 重置密码

    Args:
        event (Event): 事件
    """
    tenant = event.tenant
    request = event.request
    data = request.POST or json.load(request.body)

    mobile = data.get('mobile')
    sms_code = data.get('sms_code')

    password = data.get('password')
    checkpassword = data.get('checkpassword')

    if password != checkpassword:
        return self.error(ErrorCode.PASSWORD_IS_INCONSISTENT)

    if not check_sms_code(tenant, mobile, sms_code):
        return self.error(ErrorCode.SMS_CODE_MISMATCH)

    # user = User.expand_objects.filter(tenant=tenant,mobile=mobile)
    temp_users = tenant.users.all()
    user_ids = []
    for temp_user in temp_users:
        user_ids.append(temp_user.id)
    user = User.expand_objects.filter(
        is_active=True,
        is_del=False,
        id__in=user_ids,
        mobile=mobile
    )
    if len(user) > 1:
        logger.error(f'{mobile}在数据库中匹配到多个用户')
        return self.error(ErrorCode.CONTACT_MANAGER)
    if user:
        user = user[0]
        user.password = make_password(password)
        user.save()
        return self.success()

    return self.error(ErrorCode.MOBILE_NOT_EXISTS_ERROR)

send_sms_code(self, request, tenant_id, config_id, data) #

发送短信验证码

Source code in extension_root/com_longgui_auth_factor_mobile/__init__.py
@operation(SendSMSCodeOut)
def send_sms_code(self,request,tenant_id,config_id:str,data:SendSMSCodeIn):
    """发送短信验证码
    """
    tenant = request.tenant
    mobile = data.mobile
    config = self.get_config_by_id(config_id)
    if not config:
        return self.error(ErrorCode.CONFIG_IS_NOT_EXISTS)

    if not mobile or mobile=="mobile":
        return self.error(ErrorCode.MOBILE_EMPTY)

    code = create_sms_code(tenant,mobile,config.config.get('code_length',6),config.config.get("expired",10)*60)


    responses = dispatch_event(
        Event(
            tag=SEND_SMS,
            tenant=tenant,
            request=request,
            data={
                "config_id":config.config["sms_config"]["id"],
                "mobile":data.mobile,
                "code": code,
                "areacode": data.areacode,
                "username": request.user.username if request.user else ""
            },
            packages=config.config["sms_config"]["package"]

        )
    )

    if not responses:
        return self.error(ErrorCode.SMS_EXTENSION_NOT_EXISTS)
    useless, (data, extension) = responses[0]
    if data:
        return self.success()
    else:
        return self.error(ErrorCode.SMS_SEND_FAILED)

update_mine_mobile(self, request, tenant_id, data) #

普通用户:更新手机号码

Source code in extension_root/com_longgui_auth_factor_mobile/__init__.py
@operation(UpdateMineMobileOut,roles=[TENANT_ADMIN, PLATFORM_ADMIN, NORMAL_USER])
def update_mine_mobile(self, request, tenant_id: str,data:UpdateMineMobileIn):
    """ 普通用户:更新手机号码
    """
    mobile = data.mobile
    ret, message = self.check_mobile_exists(mobile, request.tenant)
    if not ret:
        return self.error(message)

    if not check_sms_code(request.tenant,mobile,data.code):
        return self.error(ErrorCode.SMS_CODE_MISMATCH)

    user = request.user
    user.mobile=data.mobile
    user.save()

    return self.success()

extension_root.com_longgui_auth_factor_password.PasswordAuthFactorExtension (AuthFactorExtension) #

Source code in extension_root/com_longgui_auth_factor_password/__init__.py
class PasswordAuthFactorExtension(AuthFactorExtension):
    def load(self):
        super().load()
        self.register_extend_field(UserPassword, "password")
        self.register_auth_factor_schema(PasswordAuthFactorSchema, 'password')
        self.register_extend_api(AuthIn, password=str)
        user_key_fields_path = self.register_api(
            '/user_key_fields/',
            'GET',
            self.get_user_key_fields,
            response=List[GetUserKeyFieldItemOut],
        )
        select_pw_login_fields_page.create_actions(
            init_action=actions.DirectAction(
                path=user_key_fields_path,
                method=actions.FrontActionMethod.GET,
            ),
        )

        select_pw_register_login_fields_page.create_actions(
            init_action=actions.DirectAction(
                path=user_key_fields_path,
                method=actions.FrontActionMethod.GET,
            ),
        )

        self.register_front_pages(select_pw_login_fields_page)
        self.register_front_pages(select_pw_register_login_fields_page)

        # 租户管理员:用户管理-用户列表-重置密码
        reset_user_password_path = self.register_api(
            '/reset_user_password/{id}/',
            'POST',
            self.reset_user_password,
            tenant_path=True,
            response=ResponseSchema,
            auth=GlobalAuth()
        )

        user_list_page.add_local_actions(
            actions.OpenAction(
                name='重置密码',
                path=reset_user_password_path,
                method=actions.FrontActionMethod.POST,
            )
        )

        # 初始化部分配置数据
        tenant = Tenant.platform_tenant()
        if not self.get_tenant_configs(tenant):
            config = {
                'login_enabled_field_names': [{'key':'username'}],
                'register_enabled_field_names': [{'key':'username'}],
                'is_apply': False,
                'regular': '',
                'title': '',
            }
            self.create_tenant_config(tenant, config, "账密登录", "password")
        try:
            admin_user = User.active_objects.filter(username='admin').first()
            if admin_user:
                admin_password = UserPassword.active_objects.filter(target=admin_user)
                if not admin_password:
                    admin_user.password = make_password('admin')
                    admin_user.save()
        except Exception as e:
            print(e)

        self.listen_event(
            CREATE_TENANT,
            self.create_tenant_event
        )

    def create_tenant_event(self,event,**kwargs):
        tenant = event.tenant
        config = {
            'login_enabled_field_names': [{'key':'username'}],
            'register_enabled_field_names': [{'key':'username'}],
            'is_apply': False,
            'regular': '',
            'title': '',
        }
        self.create_tenant_config(tenant, config, "default", "password")

    def check_auth_data(self, event, **kwargs):
        pass

    def fix_login_page(self, event, **kwargs):
        pass

    @operation(roles=[TENANT_ADMIN, PLATFORM_ADMIN])
    def reset_user_password(self, request, tenant_id:str, id:str, data:RestUserPasswordIn):
        user = User.active_objects.get(id=id)
        password = data.password
        user.password = make_password(password)
        user.save()
        return self.success()

    def get_user_key_fields(self,request):
        data = [{'key':key,'name':value} for key,value in User.key_fields.items()]
        return data

    def authenticate(self, event, **kwargs):
        tenant = event.tenant
        request = event.request

        data = request.POST or json.load(request.body)

        username = data.get('username')
        password = data.get('password')
        config_id = data.get('config_id')


        config = TenantExtensionConfig.active_objects.get(id=config_id).config
        login_enabled_field_names = [item["key"] if isinstance(item,dict) else item for item in config.get('login_enabled_field_names')]
        filter_params = None

        login_enabled_field_names = login_enabled_field_names or ["username"]

        for lefn in login_enabled_field_names:
            temp = {lefn:username}
            if filter_params:
                filter_params = Q(**temp) | filter_params
            else:
                filter_params = Q(**temp)

        users = tenant.users.filter(is_del=False).filter(filter_params)
        if len(users) > 1:
            logger.error(f'{username}{login_enabled_field_names}中匹配到多个用户')
            return self.auth_failed(event, data=self.error(ErrorCode.CONTACT_MANAGER))
        user = users[0] if users else None
        if user:
            # 对象转换
            user = User.expand_objects.filter(id=user.id).first()
            user_password = user.get("password")
            if user_password:
                if check_password(password, user_password):
                    user = User.valid_objects.get(id=user.get("id"))
                    return self.auth_success(user, event)

        return self.auth_failed(event, data=self.error(ErrorCode.USERNAME_PASSWORD_MISMATCH))

    @transaction.atomic()
    def register(self, event, **kwargs):
        tenant = event.tenant
        request = event.request
        data = request.POST or json.load(request.body)

        username = data.get('username')
        password = data.get('password')

        if data.get('checkpassword',None) != password:
            return self.error(ErrorCode.TWO_TIME_PASSWORD_MISMATCH)

        config = self.get_current_config(event)
        ret, message = self.check_password_complexity(password, config)
        if not ret:
            return self.error(ErrorCode.PASSWORD_STRENGTH_LACK)

        register_fields = [item["key"] if isinstance(item,dict) else item for item in config.config.get('register_enabled_field_names')]
        if not register_fields:
            fields = ['username']
            if username is None:
                self.auth_failed(event, data=self.error(ErrorCode.USERNAME_EMPTY))
        else:
            fields = [k for k in register_fields if request.POST.get(k) is not None]
            if not fields:
                self.auth_failed(event, data=self.error(ErrorCode.ALL_USER_FLAG_LACK_FIELD))

        for field in fields:
            user = self._get_register_user(tenant, field, request.POST.get(field))
            if user:
                self.auth_failed(event, data=self.error(ErrorCode.FIELD_USER_EXISTS, field=field))

        # user = User.objects.create(tenant=tenant)
        user = User(tenant=tenant)
        for k in fields:
            if request.POST.get(k):
                setattr(user, k, request.POST.get(k))
        user.password = make_password(password)
        user.save()
        tenant.users.add(user)
        tenant.save()

        return user

    def reset_password(self, event, **kwargs):
        pass

    def create_login_page(self, event, config, config_data):
        username_placeholder = ""
        for lefn in [item["key"] if isinstance(item,dict) else item for item in config.config.get('login_enabled_field_names',[])]:
            if username_placeholder:
                username_placeholder = ',' + User.key_fields[lefn]
            else:
                username_placeholder = User.key_fields[lefn]
        items = [
            {
                "type": "text",
                "name": "username",
                "placeholder": username_placeholder or '用户名'
            },
            {
                "type": "password",
                "name": "password",
                "placeholder": "密码"
            },
        ]
        self.add_page_form(config, self.LOGIN, "用户名密码登录", items, config_data)

    def create_register_page(self, event, config, config_data):
        items = []
        register_fields = [item["key"] if isinstance(item,dict) else item for item in config.config.get('register_enabled_field_names')]
        for rf in register_fields:
            items.append({
                "type": "text",
                "name": rf,
                "placeholder": User.key_fields[rf]
            })
        items.extend([
            {
                "type": "password",
                "name": "password",
                "placeholder": "密码"
            },
            {
                "type": "password",
                "name": "checkpassword",
                "placeholder": "密码确认"
            },
        ])
        self.add_page_form(config, self.REGISTER, "用户名密码注册", items, config_data)

    def create_password_page(self, event, config, config_data):
        pass

    def create_other_page(self, event, config, config_data):
        pass

    def check_password_complexity(self, pwd, config):
        if not pwd:
            return False, 'No password provide'

        if config:
            regular = config.config.get('regular')
            title = config.config.get('title')
            if re.match(regular, pwd):
                return True, None
            else:
                return False, title
        return True, None

    def _get_register_user(self, tenant, field_name, field_value):
        user = None
        if field_name in ('username', 'email'):
            user = tenant.users.filter(is_active=True, is_del=False).filter(**{field_name: field_value}).first()
        else:
            # 获取刚注册的用户
            user = User.expand_objects.filter(**{field_name: field_value}).first()
        return user

    def create_auth_manage_page(self):
        # 更改密码页面

        mine_password_path = self.register_api(
            "/mine_password/",
            'POST',
            self.update_mine_password,
            tenant_path=True,
            response=UpdateMinePasswordOut,
        )

        name = '更改密码'

        page = pages.FormPage(name=name)
        page.create_actions(
            init_action=actions.ConfirmAction(
                path=mine_password_path,
            ),
            global_actions={
                'confirm': actions.ConfirmAction(
                    path=mine_password_path
                ),
            }
        )
        return page

    @operation(UpdateMinePasswordOut,roles=[TENANT_ADMIN, PLATFORM_ADMIN, NORMAL_USER])
    def update_mine_password(self,request, tenant_id: str,data:UpdateMinePasswordIn):
        """更改密码"""
        user = request.user

        user_expand = User.expand_objects.get(id=user.id)

        user_password = user_expand["password"]
        if not user_password or check_password(data.old_password, user_password):
            if data.password == data.confirm_password:
                user.password = make_password(data.password)
                user.save()
                return self.success()
            else:
                return self.error(ErrorCode.TWO_TIME_PASSWORD_MISMATCH)

        return self.error(ErrorCode.OLD_PASSWORD_ERROR)

authenticate(self, event, **kwargs) #

抽象方法:认证

Parameters:

Name Type Description Default
event Event

认证事件

required
Source code in extension_root/com_longgui_auth_factor_password/__init__.py
def authenticate(self, event, **kwargs):
    tenant = event.tenant
    request = event.request

    data = request.POST or json.load(request.body)

    username = data.get('username')
    password = data.get('password')
    config_id = data.get('config_id')


    config = TenantExtensionConfig.active_objects.get(id=config_id).config
    login_enabled_field_names = [item["key"] if isinstance(item,dict) else item for item in config.get('login_enabled_field_names')]
    filter_params = None

    login_enabled_field_names = login_enabled_field_names or ["username"]

    for lefn in login_enabled_field_names:
        temp = {lefn:username}
        if filter_params:
            filter_params = Q(**temp) | filter_params
        else:
            filter_params = Q(**temp)

    users = tenant.users.filter(is_del=False).filter(filter_params)
    if len(users) > 1:
        logger.error(f'{username}{login_enabled_field_names}中匹配到多个用户')
        return self.auth_failed(event, data=self.error(ErrorCode.CONTACT_MANAGER))
    user = users[0] if users else None
    if user:
        # 对象转换
        user = User.expand_objects.filter(id=user.id).first()
        user_password = user.get("password")
        if user_password:
            if check_password(password, user_password):
                user = User.valid_objects.get(id=user.get("id"))
                return self.auth_success(user, event)

    return self.auth_failed(event, data=self.error(ErrorCode.USERNAME_PASSWORD_MISMATCH))

check_auth_data(self, event, **kwargs) #

响应检查认证凭证事件

Parameters:

Name Type Description Default
event

AUTHRULE_CHECK_AUTH_DATA事件

required
Source code in extension_root/com_longgui_auth_factor_password/__init__.py
def check_auth_data(self, event, **kwargs):
    pass

create_auth_manage_page(self) #

认证管理页面描述

Source code in extension_root/com_longgui_auth_factor_password/__init__.py
def create_auth_manage_page(self):
    # 更改密码页面

    mine_password_path = self.register_api(
        "/mine_password/",
        'POST',
        self.update_mine_password,
        tenant_path=True,
        response=UpdateMinePasswordOut,
    )

    name = '更改密码'

    page = pages.FormPage(name=name)
    page.create_actions(
        init_action=actions.ConfirmAction(
            path=mine_password_path,
        ),
        global_actions={
            'confirm': actions.ConfirmAction(
                path=mine_password_path
            ),
        }
    )
    return page

create_login_page(self, event, config, config_data) #

抽象方法:组装登录页面表单

Parameters:

Name Type Description Default
event Event

CREATE_LOGIN_PAGE_AUTH_FACTOR事件

required
config TenantExtensionConfig

插件运行时配置

required
config_data dict

运行时配置数据

required
Source code in extension_root/com_longgui_auth_factor_password/__init__.py
def create_login_page(self, event, config, config_data):
    username_placeholder = ""
    for lefn in [item["key"] if isinstance(item,dict) else item for item in config.config.get('login_enabled_field_names',[])]:
        if username_placeholder:
            username_placeholder = ',' + User.key_fields[lefn]
        else:
            username_placeholder = User.key_fields[lefn]
    items = [
        {
            "type": "text",
            "name": "username",
            "placeholder": username_placeholder or '用户名'
        },
        {
            "type": "password",
            "name": "password",
            "placeholder": "密码"
        },
    ]
    self.add_page_form(config, self.LOGIN, "用户名密码登录", items, config_data)

create_other_page(self, event, config, config_data) #

抽象方法:组装登录页上其他操作表单

Parameters:

Name Type Description Default
event Event

CREATE_LOGIN_PAGE_AUTH_FACTOR事件

required
config TenantExtensionConfig

插件运行时配置

required
config_data dict

运行时配置数据

required
Source code in extension_root/com_longgui_auth_factor_password/__init__.py
def create_other_page(self, event, config, config_data):
    pass

create_password_page(self, event, config, config_data) #

抽象方法:组装重置密码页面表单

Parameters:

Name Type Description Default
event Event

CREATE_LOGIN_PAGE_AUTH_FACTOR事件

required
config TenantExtensionConfig

插件运行时配置

required
config_data dict

运行时配置数据

required
Source code in extension_root/com_longgui_auth_factor_password/__init__.py
def create_password_page(self, event, config, config_data):
    pass

create_register_page(self, event, config, config_data) #

抽象方法:组装注册页面表单

Parameters:

Name Type Description Default
event Event

CREATE_LOGIN_PAGE_AUTH_FACTOR事件

required
config TenantExtensionConfig

插件运行时配置

required
config_data dict

运行时配置数据

required
Source code in extension_root/com_longgui_auth_factor_password/__init__.py
def create_register_page(self, event, config, config_data):
    items = []
    register_fields = [item["key"] if isinstance(item,dict) else item for item in config.config.get('register_enabled_field_names')]
    for rf in register_fields:
        items.append({
            "type": "text",
            "name": rf,
            "placeholder": User.key_fields[rf]
        })
    items.extend([
        {
            "type": "password",
            "name": "password",
            "placeholder": "密码"
        },
        {
            "type": "password",
            "name": "checkpassword",
            "placeholder": "密码确认"
        },
    ])
    self.add_page_form(config, self.REGISTER, "用户名密码注册", items, config_data)

fix_login_page(self, event, **kwargs) #

向login_pages填入认证元素

Parameters:

Name Type Description Default
event

AUTHRULE_FIX_LOGIN_PAGE事件

required
Source code in extension_root/com_longgui_auth_factor_password/__init__.py
def fix_login_page(self, event, **kwargs):
    pass

load(self) #

抽象方法,插件加载的入口方法

Source code in extension_root/com_longgui_auth_factor_password/__init__.py
def load(self):
    super().load()
    self.register_extend_field(UserPassword, "password")
    self.register_auth_factor_schema(PasswordAuthFactorSchema, 'password')
    self.register_extend_api(AuthIn, password=str)
    user_key_fields_path = self.register_api(
        '/user_key_fields/',
        'GET',
        self.get_user_key_fields,
        response=List[GetUserKeyFieldItemOut],
    )
    select_pw_login_fields_page.create_actions(
        init_action=actions.DirectAction(
            path=user_key_fields_path,
            method=actions.FrontActionMethod.GET,
        ),
    )

    select_pw_register_login_fields_page.create_actions(
        init_action=actions.DirectAction(
            path=user_key_fields_path,
            method=actions.FrontActionMethod.GET,
        ),
    )

    self.register_front_pages(select_pw_login_fields_page)
    self.register_front_pages(select_pw_register_login_fields_page)

    # 租户管理员:用户管理-用户列表-重置密码
    reset_user_password_path = self.register_api(
        '/reset_user_password/{id}/',
        'POST',
        self.reset_user_password,
        tenant_path=True,
        response=ResponseSchema,
        auth=GlobalAuth()
    )

    user_list_page.add_local_actions(
        actions.OpenAction(
            name='重置密码',
            path=reset_user_password_path,
            method=actions.FrontActionMethod.POST,
        )
    )

    # 初始化部分配置数据
    tenant = Tenant.platform_tenant()
    if not self.get_tenant_configs(tenant):
        config = {
            'login_enabled_field_names': [{'key':'username'}],
            'register_enabled_field_names': [{'key':'username'}],
            'is_apply': False,
            'regular': '',
            'title': '',
        }
        self.create_tenant_config(tenant, config, "账密登录", "password")
    try:
        admin_user = User.active_objects.filter(username='admin').first()
        if admin_user:
            admin_password = UserPassword.active_objects.filter(target=admin_user)
            if not admin_password:
                admin_user.password = make_password('admin')
                admin_user.save()
    except Exception as e:
        print(e)

    self.listen_event(
        CREATE_TENANT,
        self.create_tenant_event
    )

reset_password(self, event, **kwargs) #

抽象方法:响应重置密码事件

Parameters:

Name Type Description Default
event Event

重置密码事件

required
Source code in extension_root/com_longgui_auth_factor_password/__init__.py
def reset_password(self, event, **kwargs):
    pass

update_mine_password(self, request, tenant_id, data) #

更改密码

Source code in extension_root/com_longgui_auth_factor_password/__init__.py
@operation(UpdateMinePasswordOut,roles=[TENANT_ADMIN, PLATFORM_ADMIN, NORMAL_USER])
def update_mine_password(self,request, tenant_id: str,data:UpdateMinePasswordIn):
    """更改密码"""
    user = request.user

    user_expand = User.expand_objects.get(id=user.id)

    user_password = user_expand["password"]
    if not user_password or check_password(data.old_password, user_password):
        if data.password == data.confirm_password:
            user.password = make_password(data.password)
            user.save()
            return self.success()
        else:
            return self.error(ErrorCode.TWO_TIME_PASSWORD_MISMATCH)

    return self.error(ErrorCode.OLD_PASSWORD_ERROR)

评论