Skip to content

密码认证因素#

功能介绍#

对用户表扩展密码字段,允许用户通过用户名与密码的方式进行注册与认证。

普通用户:

  • 在 “我的 - 认证管理” 中添加重置密码的功能
  • 在 “注册” 页面实现用户名密码注册
  • 在 “登录” 页面实现用户名密码登录

租户管理员:

  • 在 “用户管理 - 用户列表” 中添加重置密码的功能

配置指南#

配置指南#

经由左侧菜单栏依次进入【租户管理】->【插件管理】,在插件租赁页面中找到密码认证因素插件卡片,点击租赁
vEoE7j.png

经由左侧菜单栏依次进入【认证管理】-> 【认证因素】,点击创建按钮,类型选择"password",填入相关信息,至此配置完成
vEoU9x.md.png

vEoWgf.md.png

vEoXvT.png

由用户头像菜单进入【认证管理】界面,选择更改密码标签页
vEo6UA.md.png

实现思路#

普通用户:注册/登录:

sequenceDiagram
    participant D as 用户
    participant C as 平台核心
    participant A as 密码认证因素插件

    C->>A: 加载插件
    A->>C: 注册并监听密码认证相关事件(注册/登录等)
    D->>C: 访问注册/登录页面
    C->>A: 发出CREATE_LOGIN_PAGE_AUTH_FACTOR事件
    A->>C: 响应事件,组装注册/登录页面元素
    C->>D: 渲染注册/登录/重置密码页面
    D->>C: 输入相关信息,点击【注册/登录】按钮
    C->>A: 发出注册/登录事件
    A->>C: 响应事件,完成注册/登录流程,返回结果
    C->>D: 检查结果,如完成注册/登录相关操作则生成token并跳转至桌面,如未完成注册/登录操作则提示错误

普通用户:重置密码:

sequenceDiagram
    participant D as 用户
    participant C as 平台核心
    participant A as 密码认证因素插件

    C->>A: 加载插件
    A->>C: 向“我的 - 认证管理”页面中添加重置密码元素,向核心注册重置密码接口
    D->>C: 访问“我的 - 认证管理”页面中重置密码功能,录入新的密码
    C->>A: 访问重置密码接口
    A->>C: 响应接口,检查输入参数,返回结果
    C->>D: 检查结果,并提示是否完成更改

管理员用户: 重置用户密码

sequenceDiagram
    participant D as 用户
    participant C as 平台核心
    participant A as 密码认证因素插件

    C->>A: 加载插件
    A->>C: 向“用户列表-编辑用户”页面注入密码元素,向核心用户模型注入密码字段
    D->>C: 管理员登录,访问用户列表页面,编辑用户密码,点击保存
    C->>D: 修改密码字段值并保存至数据库

抽象方法实现#

代码#

extension_root.com_longgui_auth_factor_password.PasswordAuthFactorExtension (AuthFactorExtension) #

Source code in extension_root/com_longgui_auth_factor_password/__init__.py
class PasswordAuthFactorExtension(AuthFactorExtension):
    """Password authentication factor plugin.

    Registers a ``password`` extension field on the user model and implements
    registration, login and password changes based on a user key field
    (``username`` by default) plus a password.
    """

    def load(self):
        """Plugin entry point.

        Registers the password extension field, the auth factor schema, the
        plugin APIs and front pages, creates a default configuration for the
        platform tenant when none exists, and subscribes to ``CREATE_TENANT``.
        """
        super().load()
        self.register_extend_field(UserPassword, "password")
        self.register_auth_factor_schema(PasswordAuthFactorSchema, 'password')
        self.register_extend_api(AuthIn, password=str)
        user_key_fields_path = self.register_api(
            '/user_key_fields/',
            'GET',
            self.get_user_key_fields,
            response=List[GetUserKeyFieldItemOut],
        )
        select_pw_login_fields_page.create_actions(
            init_action=actions.DirectAction(
                path=user_key_fields_path,
                method=actions.FrontActionMethod.GET,
            ),
        )

        select_pw_register_login_fields_page.create_actions(
            init_action=actions.DirectAction(
                path=user_key_fields_path,
                method=actions.FrontActionMethod.GET,
            ),
        )

        self.register_front_pages(select_pw_login_fields_page)
        self.register_front_pages(select_pw_register_login_fields_page)

        # Tenant admin: user management -> user list -> reset password
        reset_user_password_path = self.register_api(
            '/reset_user_password/{id}/',
            'POST',
            self.reset_user_password,
            tenant_path=True,
            response=ResponseSchema,
            auth=GlobalAuth()
        )

        user_list_page.add_local_actions(
            actions.OpenAction(
                name='重置密码',
                path=reset_user_password_path,
                method=actions.FrontActionMethod.POST,
            )
        )

        # Initialise the default configuration for the platform tenant
        tenant = Tenant.platform_tenant()
        if not self.get_tenant_configs(tenant):
            config = {
                'login_enabled_field_names': [{'key':'username'}],
                'register_enabled_field_names': [{'key':'username'}],
                'is_apply': False,
                'regular': '',
                'title': '',
            }
            self.create_tenant_config(tenant, config, "账密登录", "password")
        try:
            admin_user = User.active_objects.filter(username='admin').first()
            if admin_user:
                admin_password = UserPassword.active_objects.filter(target=admin_user)
                if not admin_password:
                    # NOTE(review): the admin account is seeded with a
                    # well-known default password; confirm that deployments
                    # force a change after first login.
                    admin_user.password = make_password('admin')
                    admin_user.save()
        except Exception as e:
            # Best effort: seeding the admin password must not prevent the
            # plugin from loading, but the failure goes to the module logger
            # instead of stdout.
            logger.error(f'failed to initialise the admin password: {e}')

        self.listen_event(
            CREATE_TENANT,
            self.create_tenant_event
        )

    def create_tenant_event(self,event,**kwargs):
        """Create the default password configuration for a newly created tenant."""
        tenant = event.tenant
        config = {
            'login_enabled_field_names': [{'key':'username'}],
            'register_enabled_field_names': [{'key':'username'}],
            'is_apply': False,
            'regular': '',
            'title': '',
        }
        self.create_tenant_config(tenant, config, "default", "password")

    def check_auth_data(self, event, **kwargs):
        """Hook for the check-auth-data event; this plugin does nothing here."""
        pass

    def fix_login_page(self, event, **kwargs):
        """Hook for the fix-login-page event; this plugin does nothing here."""
        pass

    @operation(roles=[TENANT_ADMIN, PLATFORM_ADMIN])
    def reset_user_password(self, request, tenant_id:str, id:str, data:RestUserPasswordIn):
        """Set a new password for the user identified by ``id``.

        NOTE(review): the lookup is not restricted to ``tenant_id``; confirm
        that the framework prevents a tenant admin from addressing users of
        another tenant.
        """
        user = User.active_objects.get(id=id)
        password = data.password
        user.password = make_password(password)
        user.save()
        return self.success()

    def get_user_key_fields(self,request):
        """Return ``User.key_fields`` as a list of ``{'key', 'name'}`` dicts."""
        data = [{'key':key,'name':value} for key,value in User.key_fields.items()]
        return data

    def authenticate(self, event, **kwargs):
        """Authenticate a user from the submitted identifier and password.

        The identifier sent as ``username`` is matched against every field
        listed in the configuration's ``login_enabled_field_names``
        (``username`` when the list is missing or empty).

        Args:
            event: authentication event carrying ``tenant`` and ``request``.

        Returns:
            The result of ``auth_success`` when exactly one user matches and
            the password checks out, otherwise the result of ``auth_failed``.
        """
        tenant = event.tenant
        request = event.request

        # request.body is a raw payload, not a file object: json.loads, not json.load
        data = request.POST or json.loads(request.body)

        username = data.get('username')
        password = data.get('password')
        config_id = data.get('config_id')

        config = TenantExtensionConfig.active_objects.get(id=config_id).config
        login_enabled_field_names = [
            item["key"] if isinstance(item,dict) else item
            for item in config.get('login_enabled_field_names') or []
        ] or ["username"]

        # OR together one lookup per enabled field
        filter_params = None
        for lefn in login_enabled_field_names:
            temp = {lefn:username}
            if filter_params:
                filter_params = Q(**temp) | filter_params
            else:
                filter_params = Q(**temp)

        users = tenant.users.filter(is_del=False).filter(filter_params)
        if len(users) > 1:
            logger.error(f'{username}{login_enabled_field_names}中匹配到多个用户')
            return self.auth_failed(event, data=self.error(ErrorCode.CONTACT_MANAGER))
        user = users[0] if users else None
        if user:
            # Re-read through expand_objects so the extension "password" field is available
            expanded = User.expand_objects.filter(id=user.id).first()
            user_password = expanded.get("password") if expanded else None
            if user_password and check_password(password, user_password):
                user = User.valid_objects.get(id=expanded.get("id"))
                return self.auth_success(user, event)

        return self.auth_failed(event, data=self.error(ErrorCode.USERNAME_PASSWORD_MISMATCH))

    @transaction.atomic()
    def register(self, event, **kwargs):
        """Register a new user with the submitted key fields and password.

        Args:
            event: registration event carrying ``tenant`` and ``request``.

        Returns:
            The created user on success; otherwise the result of
            ``self.error`` / ``self.auth_failed`` describing the failure.
        """
        tenant = event.tenant
        request = event.request
        # request.body is a raw payload, not a file object: json.loads, not json.load
        data = request.POST or json.loads(request.body)

        username = data.get('username')
        password = data.get('password')

        if data.get('checkpassword',None) != password:
            return self.error(ErrorCode.TWO_TIME_PASSWORD_MISMATCH)

        config = self.get_current_config(event)
        ret, _message = self.check_password_complexity(password, config)
        if not ret:
            return self.error(ErrorCode.PASSWORD_STRENGTH_LACK)

        register_fields = [
            item["key"] if isinstance(item,dict) else item
            for item in config.config.get('register_enabled_field_names') or []
        ]
        # Field values are read from ``data`` so JSON-body requests work the
        # same way as form posts.
        if not register_fields:
            fields = ['username']
            if username is None:
                return self.auth_failed(event, data=self.error(ErrorCode.USERNAME_EMPTY))
        else:
            fields = [k for k in register_fields if data.get(k) is not None]
            if not fields:
                return self.auth_failed(event, data=self.error(ErrorCode.ALL_USER_FLAG_LACK_FIELD))

        for field in fields:
            if self._get_register_user(tenant, field, data.get(field)):
                # Stop here so a duplicate user is never created, whether or
                # not auth_failed itself interrupts the flow.
                return self.auth_failed(event, data=self.error(ErrorCode.FIELD_USER_EXISTS, field=field))

        user = User(tenant=tenant)
        for k in fields:
            value = data.get(k)
            if value:
                setattr(user, k, value)
        user.password = make_password(password)
        user.save()
        tenant.users.add(user)
        tenant.save()

        return user

    def reset_password(self, event, **kwargs):
        """Hook for the reset-password event; this plugin does nothing here."""
        pass

    def create_login_page(self, event, config, config_data):
        """Assemble the login form: one identifier input plus a password input.

        The identifier placeholder lists the display names of all fields
        enabled for login, separated by commas.
        """
        field_keys = [
            item["key"] if isinstance(item,dict) else item
            for item in config.config.get('login_enabled_field_names') or []
        ]
        # Join every label; the previous code kept only the last one
        username_placeholder = ','.join(User.key_fields[lefn] for lefn in field_keys)
        items = [
            {
                "type": "text",
                "name": "username",
                "placeholder": username_placeholder or '用户名'
            },
            {
                "type": "password",
                "name": "password",
                "placeholder": "密码"
            },
        ]
        self.add_page_form(config, self.LOGIN, "用户名密码登录", items, config_data)

    def create_register_page(self, event, config, config_data):
        """Assemble the registration form.

        One text input is added per field enabled for registration, followed
        by the password and password-confirmation inputs.
        """
        register_fields = [
            item["key"] if isinstance(item,dict) else item
            for item in config.config.get('register_enabled_field_names') or []
        ]
        items = [
            {
                "type": "text",
                "name": rf,
                "placeholder": User.key_fields[rf]
            }
            for rf in register_fields
        ]
        items.extend([
            {
                "type": "password",
                "name": "password",
                "placeholder": "密码"
            },
            {
                "type": "password",
                "name": "checkpassword",
                "placeholder": "密码确认"
            },
        ])
        self.add_page_form(config, self.REGISTER, "用户名密码注册", items, config_data)

    def create_password_page(self, event, config, config_data):
        """Hook for the reset-password page form; this plugin adds nothing."""
        pass

    def create_other_page(self, event, config, config_data):
        """Hook for other login-page forms; this plugin adds nothing."""
        pass

    def check_password_complexity(self, pwd, config):
        """Validate ``pwd`` against the configured regular expression.

        Args:
            pwd: candidate password.
            config: runtime configuration whose ``config`` dict may hold
                ``regular`` (pattern) and ``title`` (message); may be falsy.

        Returns:
            ``(True, None)`` when the password is accepted, otherwise
            ``(False, message)``.
        """
        if not pwd:
            return False, 'No password provide'

        if config:
            regular = config.config.get('regular')
            title = config.config.get('title')
            # An empty pattern matches everything; a missing one would make
            # re.match raise, so both mean "no rule configured".
            if not regular or re.match(regular, pwd):
                return True, None
            return False, title
        return True, None

    def _get_register_user(self, tenant, field_name, field_value):
        """Return an existing user whose ``field_name`` equals ``field_value``, if any."""
        if field_name in ('username', 'email'):
            return tenant.users.filter(is_active=True, is_del=False).filter(**{field_name: field_value}).first()
        # Other fields are looked up through expand_objects
        return User.expand_objects.filter(**{field_name: field_value}).first()

    def create_auth_manage_page(self):
        """Build the "change password" form page and register its API."""
        mine_password_path = self.register_api(
            "/mine_password/",
            'POST',
            self.update_mine_password,
            tenant_path=True,
            response=UpdateMinePasswordOut,
        )

        name = '更改密码'

        page = pages.FormPage(name=name)
        page.create_actions(
            init_action=actions.ConfirmAction(
                path=mine_password_path,
            ),
            global_actions={
                'confirm': actions.ConfirmAction(
                    path=mine_password_path
                ),
            }
        )
        return page

    @operation(UpdateMinePasswordOut,roles=[TENANT_ADMIN, PLATFORM_ADMIN, NORMAL_USER])
    def update_mine_password(self,request, tenant_id: str,data:UpdateMinePasswordIn):
        """Change the current user's password.

        The old password is only verified when the user already has one.
        """
        user = request.user

        user_expand = User.expand_objects.get(id=user.id)

        user_password = user_expand["password"]
        if user_password and not check_password(data.old_password, user_password):
            return self.error(ErrorCode.OLD_PASSWORD_ERROR)

        if data.password != data.confirm_password:
            return self.error(ErrorCode.TWO_TIME_PASSWORD_MISMATCH)

        user.password = make_password(data.password)
        user.save()
        return self.success()

authenticate(self, event, **kwargs) #

抽象方法:认证

Parameters:

Name Type Description Default
event Event

认证事件

required
Source code in extension_root/com_longgui_auth_factor_password/__init__.py
def authenticate(self, event, **kwargs):
    """Authenticate a user from the submitted identifier and password.

    The identifier sent as ``username`` is matched against every field listed
    in the configuration's ``login_enabled_field_names`` (``username`` when
    the list is missing or empty).

    Args:
        event: authentication event carrying ``tenant`` and ``request``.

    Returns:
        The result of ``auth_success`` when exactly one user matches and the
        password checks out, otherwise the result of ``auth_failed``.
    """
    tenant = event.tenant
    request = event.request

    # request.body is a raw payload, not a file object: json.loads, not json.load
    data = request.POST or json.loads(request.body)

    username = data.get('username')
    password = data.get('password')
    config_id = data.get('config_id')

    config = TenantExtensionConfig.active_objects.get(id=config_id).config
    login_enabled_field_names = [
        item["key"] if isinstance(item,dict) else item
        for item in config.get('login_enabled_field_names') or []
    ] or ["username"]

    # OR together one lookup per enabled field
    filter_params = None
    for lefn in login_enabled_field_names:
        temp = {lefn:username}
        if filter_params:
            filter_params = Q(**temp) | filter_params
        else:
            filter_params = Q(**temp)

    users = tenant.users.filter(is_del=False).filter(filter_params)
    if len(users) > 1:
        logger.error(f'{username}{login_enabled_field_names}中匹配到多个用户')
        return self.auth_failed(event, data=self.error(ErrorCode.CONTACT_MANAGER))
    user = users[0] if users else None
    if user:
        # Re-read through expand_objects so the extension "password" field is available
        expanded = User.expand_objects.filter(id=user.id).first()
        user_password = expanded.get("password") if expanded else None
        if user_password and check_password(password, user_password):
            user = User.valid_objects.get(id=expanded.get("id"))
            return self.auth_success(user, event)

    return self.auth_failed(event, data=self.error(ErrorCode.USERNAME_PASSWORD_MISMATCH))

check_auth_data(self, event, **kwargs) #

响应检查认证凭证事件

Parameters:

Name Type Description Default
event

AUTHRULE_CHECK_AUTH_DATA事件

required
Source code in extension_root/com_longgui_auth_factor_password/__init__.py
def check_auth_data(self, event, **kwargs):
    """Respond to the check-auth-data event; this plugin takes no action."""
    return None

create_auth_manage_page(self) #

认证管理页面描述

Source code in extension_root/com_longgui_auth_factor_password/__init__.py
def create_auth_manage_page(self):
    """Build the "change password" form page and register its backing API."""
    api_path = self.register_api(
        "/mine_password/",
        'POST',
        self.update_mine_password,
        tenant_path=True,
        response=UpdateMinePasswordOut,
    )

    # Two distinct action objects, one for page init and one for the confirm button
    on_init = actions.ConfirmAction(path=api_path)
    on_confirm = actions.ConfirmAction(path=api_path)

    form_page = pages.FormPage(name='更改密码')
    form_page.create_actions(
        init_action=on_init,
        global_actions={'confirm': on_confirm},
    )
    return form_page

create_login_page(self, event, config, config_data) #

抽象方法:组装登录页面表单

Parameters:

Name Type Description Default
event Event

CREATE_LOGIN_PAGE_AUTH_FACTOR事件

required
config TenantExtensionConfig

插件运行时配置

required
config_data dict

运行时配置数据

required
Source code in extension_root/com_longgui_auth_factor_password/__init__.py
def create_login_page(self, event, config, config_data):
    """Assemble the login form: one identifier input plus a password input.

    The identifier placeholder lists the display names of all fields enabled
    for login, separated by commas.

    Args:
        event: the CREATE_LOGIN_PAGE_AUTH_FACTOR event.
        config: plugin runtime configuration (``config.config`` is a dict).
        config_data: runtime configuration data passed on to ``add_page_form``.
    """
    field_keys = [
        item["key"] if isinstance(item,dict) else item
        for item in config.config.get('login_enabled_field_names') or []
    ]
    # Join every label; the previous code kept only the last one
    username_placeholder = ','.join(User.key_fields[lefn] for lefn in field_keys)
    items = [
        {
            "type": "text",
            "name": "username",
            "placeholder": username_placeholder or '用户名'
        },
        {
            "type": "password",
            "name": "password",
            "placeholder": "密码"
        },
    ]
    self.add_page_form(config, self.LOGIN, "用户名密码登录", items, config_data)

create_other_page(self, event, config, config_data) #

抽象方法:组装登录页上其他操作表单

Parameters:

Name Type Description Default
event Event

CREATE_LOGIN_PAGE_AUTH_FACTOR事件

required
config TenantExtensionConfig

插件运行时配置

required
config_data dict

运行时配置数据

required
Source code in extension_root/com_longgui_auth_factor_password/__init__.py
def create_other_page(self, event, config, config_data):
    """Assemble other login-page forms; this plugin adds none."""
    return None

create_password_page(self, event, config, config_data) #

抽象方法:组装重置密码页面表单

Parameters:

Name Type Description Default
event Event

CREATE_LOGIN_PAGE_AUTH_FACTOR事件

required
config TenantExtensionConfig

插件运行时配置

required
config_data dict

运行时配置数据

required
Source code in extension_root/com_longgui_auth_factor_password/__init__.py
def create_password_page(self, event, config, config_data):
    """Assemble the reset-password page form; this plugin adds none."""
    return None

create_register_page(self, event, config, config_data) #

抽象方法:组装注册页面表单

Parameters:

Name Type Description Default
event Event

CREATE_LOGIN_PAGE_AUTH_FACTOR事件

required
config TenantExtensionConfig

插件运行时配置

required
config_data dict

运行时配置数据

required
Source code in extension_root/com_longgui_auth_factor_password/__init__.py
def create_register_page(self, event, config, config_data):
    """Assemble the registration form.

    One text input is added per field enabled for registration, followed by
    the password and password-confirmation inputs.

    Args:
        event: the CREATE_LOGIN_PAGE_AUTH_FACTOR event.
        config: plugin runtime configuration (``config.config`` is a dict).
        config_data: runtime configuration data passed on to ``add_page_form``.
    """
    # A missing or None list yields a form with only the password inputs
    # instead of raising TypeError.
    register_fields = [
        item["key"] if isinstance(item,dict) else item
        for item in config.config.get('register_enabled_field_names') or []
    ]
    items = [
        {
            "type": "text",
            "name": rf,
            "placeholder": User.key_fields[rf]
        }
        for rf in register_fields
    ]
    items.extend([
        {
            "type": "password",
            "name": "password",
            "placeholder": "密码"
        },
        {
            "type": "password",
            "name": "checkpassword",
            "placeholder": "密码确认"
        },
    ])
    self.add_page_form(config, self.REGISTER, "用户名密码注册", items, config_data)

fix_login_page(self, event, **kwargs) #

向login_pages填入认证元素

Parameters:

Name Type Description Default
event

AUTHRULE_FIX_LOGIN_PAGE事件

required
Source code in extension_root/com_longgui_auth_factor_password/__init__.py
def fix_login_page(self, event, **kwargs):
    """Respond to the fix-login-page event; this plugin takes no action."""
    return None

load(self) #

抽象方法,插件加载的入口方法

Source code in extension_root/com_longgui_auth_factor_password/__init__.py
def load(self):
    """Plugin entry point.

    Registers the password extension field, the auth factor schema, the
    plugin APIs and front pages, creates a default configuration for the
    platform tenant when none exists, and subscribes to ``CREATE_TENANT``.
    """
    super().load()
    self.register_extend_field(UserPassword, "password")
    self.register_auth_factor_schema(PasswordAuthFactorSchema, 'password')
    self.register_extend_api(AuthIn, password=str)
    user_key_fields_path = self.register_api(
        '/user_key_fields/',
        'GET',
        self.get_user_key_fields,
        response=List[GetUserKeyFieldItemOut],
    )
    select_pw_login_fields_page.create_actions(
        init_action=actions.DirectAction(
            path=user_key_fields_path,
            method=actions.FrontActionMethod.GET,
        ),
    )

    select_pw_register_login_fields_page.create_actions(
        init_action=actions.DirectAction(
            path=user_key_fields_path,
            method=actions.FrontActionMethod.GET,
        ),
    )

    self.register_front_pages(select_pw_login_fields_page)
    self.register_front_pages(select_pw_register_login_fields_page)

    # Tenant admin: user management -> user list -> reset password
    reset_user_password_path = self.register_api(
        '/reset_user_password/{id}/',
        'POST',
        self.reset_user_password,
        tenant_path=True,
        response=ResponseSchema,
        auth=GlobalAuth()
    )

    user_list_page.add_local_actions(
        actions.OpenAction(
            name='重置密码',
            path=reset_user_password_path,
            method=actions.FrontActionMethod.POST,
        )
    )

    # Initialise the default configuration for the platform tenant
    tenant = Tenant.platform_tenant()
    if not self.get_tenant_configs(tenant):
        config = {
            'login_enabled_field_names': [{'key':'username'}],
            'register_enabled_field_names': [{'key':'username'}],
            'is_apply': False,
            'regular': '',
            'title': '',
        }
        self.create_tenant_config(tenant, config, "账密登录", "password")
    try:
        admin_user = User.active_objects.filter(username='admin').first()
        if admin_user:
            admin_password = UserPassword.active_objects.filter(target=admin_user)
            if not admin_password:
                # NOTE(review): the admin account is seeded with a well-known
                # default password; confirm that deployments force a change
                # after first login.
                admin_user.password = make_password('admin')
                admin_user.save()
    except Exception as e:
        # Best effort: seeding the admin password must not prevent the plugin
        # from loading, but the failure goes to the module logger instead of
        # stdout.
        logger.error(f'failed to initialise the admin password: {e}')

    self.listen_event(
        CREATE_TENANT,
        self.create_tenant_event
    )

reset_password(self, event, **kwargs) #

抽象方法:响应重置密码事件

Parameters:

Name Type Description Default
event Event

重置密码事件

required
Source code in extension_root/com_longgui_auth_factor_password/__init__.py
def reset_password(self, event, **kwargs):
    """Respond to the reset-password event; this plugin takes no action."""
    return None

update_mine_password(self, request, tenant_id, data) #

更改密码

Source code in extension_root/com_longgui_auth_factor_password/__init__.py
@operation(UpdateMinePasswordOut,roles=[TENANT_ADMIN, PLATFORM_ADMIN, NORMAL_USER])
def update_mine_password(self,request, tenant_id: str,data:UpdateMinePasswordIn):
    """Change the current user's password.

    The old password is only verified when the user already has a stored
    password; the new password must equal its confirmation.
    """
    current_user = request.user
    stored_password = User.expand_objects.get(id=current_user.id)["password"]

    # Guard 1: an existing password must be confirmed by the caller
    if stored_password and not check_password(data.old_password, stored_password):
        return self.error(ErrorCode.OLD_PASSWORD_ERROR)

    # Guard 2: new password and confirmation must agree
    if not data.password == data.confirm_password:
        return self.error(ErrorCode.TWO_TIME_PASSWORD_MISMATCH)

    current_user.password = make_password(data.password)
    current_user.save()
    return self.success()

评论