Authentication
Features#
Authentication:Including mobile phone SMS verification code,Username Password,A series of plug -in with graphics verification code, etc.,Used to identify user identity or improve system security。
Implementation#
When developers create a new authentication factors,You need to inherit the AuthFactorextension base class and implement all abstract methods,The data process of the certification factor plug -in during operation is shown in the figure below:
sequenceDiagram
participant U as Client
participant C as Platform core
participant B as Certification factors plugin
C->>B: Loading plug -in
B->>C: Registration monitoring custom event:Certification,register,reset Password,Create_LOGIN_PAGE_AUTH_FACTOR
U->>C: Request to get the login page
C->>B: Trigger Create_LOGIN_PAGE_AUTH_Factor event
B->>C: Response event,Traversing all running configuration,Configure Login according to the configuration of running_pages
C->>U: Rendering login page
U->>C: Enter user voucher,Click the button,Enter the certification/register/Reset password and other processes
C->>B: Trigger certification registration/Reset password and other events
B->>C: Response certification registration/Reset password and other events,Complete the corresponding process,Return result
C->>U: Return to execution results
Abstract method#
- authenticate
- register
- reset_password
- create_login_page
- create_register_page
- create_password_page
- create_other_page
- create_auth_manage_page
Foundation definition#
arkid.core.extension.auth_factor.AuthFactorExtension (Extension)
#
Source code in arkid/core/extension/auth_factor.py
class AuthFactorExtension(Extension):
TYPE = "auth_factor"
composite_schema_map = {}
created_composite_schema_list = []
composite_key = 'type'
composite_model = TenantExtensionConfig
@property
def type(self):
return AuthFactorExtension.TYPE
LOGIN = 'login'
REGISTER = 'register'
RESET_PASSWORD = 'password'
def register_user_key_fields(self, **fields):
"""注册用户模型字段
"""
User.register_key_field(**fields)
def load(self):
super().load()
self.listen_events()
self.register_auth_manage_page()
def register_auth_factor_schema(self, schema, auth_factor_type):
"""注册认证因素运行时配置schema
Args:
schema (Schema): schema描述
auth_factor_type (str): 认证因素类型
"""
self.register_config_schema(schema, self.package + '_' + auth_factor_type)
self.register_composite_config_schema(schema, auth_factor_type, exclude=['extension'])
def start_authenticate(self,event,**kwargs):
"""响应认证事件: 认证前遍历认证规则,如通过所有规则则执行认证规程
Args:
event (Event): 认证事件
Returns:
Optional[User,None]: 如认证成功则返回user对象,如认证失败跳出事件循环报错
"""
config = self.get_current_config(event)
responses = dispatch_event(Event(tag=core_event.BEFORE_AUTH, tenant=event.tenant, request=event.request, data={"auth_factor_config_id":config.id.hex}))
for useless,(response,useless) in responses:
if not response:
continue
result,data = response
if not result:
return self.auth_failed(event,data)
return self.authenticate(event, **kwargs)
@abstractmethod
def authenticate(self, event, **kwargs):
"""抽象方法:认证
Args:
event (Event): 认证事件
"""
pass
def auth_success(self, user, event, **kwargs):
"""封装认证成功返回值,同时触发认证成功事件,如核心判定事件无异常(无其他插件或机制认为结果为失败)则返回认证成功后的用户
Args:
user (User): 用户
event (Event): 认证事件
Returns:
Optional[User,None]: 如认证成功则返回user对象,如认证失败跳出事件循环报错
"""
config = self.get_current_config(event)
responses = dispatch_event(Event(tag=core_event.AUTH_SUCCESS, tenant=event.tenant, request=event.request, data={"auth_factor_config_id":config,"user":user}))
for useless,(response,useless) in responses:
if not response:
continue
result,data = response
if not result:
return self.auth_failed(event,data)
return user
def auth_failed(self, event, data, **kwargs):
"""封装认证失败返回值,同时触发认证失败事件,打破事件循环,返回报错信息
Args:
event (_type_): 认证事件
data (_type_): 结果描述
"""
config = self.get_current_config(event)
dispatch_event(Event(tag=core_event.AUTH_FAIL, tenant=event.tenant, request=event.request, data={"auth_factor_config_id":config.id.hex,"data":data}))
core_event.remove_event_id(event)
core_event.break_event_loop(data)
@abstractmethod
def register(self, event, **kwargs):
"""抽象方法:响应注册事件
Args:
event (Event): 注册事件
"""
pass
@abstractmethod
def reset_password(self, event, **kwargs):
"""抽象方法:响应重置密码事件
Args:
event (Event): 重置密码事件
"""
pass
def create_response(self, event, **kwargs):
"""响应事件:CREATE_LOGIN_PAGE_AUTH_FACTOR事件,组装登陆页面schema描述
Args:
event (Event): CREATE_LOGIN_PAGE_AUTH_FACTOR事件
Returns:
dict: 组装好的登陆页面元素(非最终结构)
"""
logger.info(f'{self.package} create_response start')
self.data = {}
configs = self.get_tenant_configs(event.tenant)
for config in configs:
config_data = {
self.LOGIN: {
'forms':[],
'bottoms':[],
'extend':{},
},
self.REGISTER: {
'forms':[],
'bottoms':[],
'extend':{},
},
self.RESET_PASSWORD: {
'forms':[],
'bottoms':[],
'extend':{},
},
}
if config.config.get("login_enabled", True):
self.create_login_page(event,config,config_data)
if config.config.get("register_enabled", True):
self.create_register_page(event, config,config_data)
if config.config.get("reset_password_enabled", True):
self.create_password_page(event, config,config_data)
self.create_other_page(event, config, config_data)
self.data[config.id.hex] = config_data
logger.info(self.data)
logger.info(f'{self.package} create_response end')
return self.data
def add_page_form(self, config, page_name, label, items, config_data, submit_url=None, submit_label=None):
"""向config_data中添加表单元素
Args:
config (TenantExtensionConfig): 插件运行时配置
page_name (str): 页面名称
label (str): 标签
items (list): 页面元素描述列表
config_data (dict): 运行时配置数据
submit_url (str, optional): 表单提交地址. Defaults to None.
submit_label (str, optional): 表单提交按钮标签. Defaults to None.
"""
default = {
"login": ("登录", f"/api/v1/tenant/tenant_id/auth/?event_tag={self.auth_event_tag}"),
"register": ("注册", f"/api/v1/tenant/tenant_id/register/?event_tag={self.register_event_tag}"),
"password": ("重置密码", f"/api/v1/tenant/tenant_id/reset_password/?event_tag={self.password_event_tag}"),
}
if not submit_label:
submit_label, useless = default.get(page_name)
if not submit_url:
useless, submit_url = default.get(page_name)
items.append({"type": "hidden", "name": "config_id", "value": config.id})
config_data[page_name]['forms'].append({
'label': config.name or label,
'items': items,
'submit': {'label': submit_label, 'title':submit_label,'http': {'url': submit_url, 'method': "post"}}
})
def add_page_bottoms(self, page_name, bottoms):
self.data[page_name]['bottoms'].append(bottoms)
def add_page_extend(self, page_name, buttons, title=None):
if not self.data[page_name].get('extend'):
self.data[page_name]['extend'] = {}
self.data[page_name]['extend']['title'] = title
self.data[page_name]['extend']['buttons'].append(buttons)
@abstractmethod
def create_login_page(self, event, config, config_data):
"""抽象方法:组装登录页面表单
Args:
event (Event): CREATE_LOGIN_PAGE_AUTH_FACTOR事件
config (TenantExtensionConfig): 插件运行时配置
config_data (dict): 运行时配置数据
"""
pass
@abstractmethod
def create_register_page(self, event, config, config_data):
"""抽象方法:组装注册页面表单
Args:
event (Event): CREATE_LOGIN_PAGE_AUTH_FACTOR事件
config (TenantExtensionConfig): 插件运行时配置
config_data (dict): 运行时配置数据
"""
pass
@abstractmethod
def create_password_page(self, event, config, config_data):
"""抽象方法:组装重置密码页面表单
Args:
event (Event): CREATE_LOGIN_PAGE_AUTH_FACTOR事件
config (TenantExtensionConfig): 插件运行时配置
config_data (dict): 运行时配置数据
"""
pass
@abstractmethod
def create_other_page(self, event, config, config_data):
"""抽象方法:组装登录页上其他操作表单
Args:
event (Event): CREATE_LOGIN_PAGE_AUTH_FACTOR事件
config (TenantExtensionConfig): 插件运行时配置
config_data (dict): 运行时配置数据
"""
pass
def register_auth_manage_page(self):
""" 向认证管理页面添加页面
"""
from api.v1.pages.mine.auth_manage import page as auth_manage_page
pages = self.create_auth_manage_page()
if not pages:
return
if not isinstance(pages,list):
pages = [pages]
for page in pages:
self.register_front_pages(page)
auth_manage_page.add_pages(page)
@abstractmethod
def create_auth_manage_page(self):
"""抽象方法: 认证管理页面描述
"""
pass
@abstractmethod
def check_auth_data(self, event, **kwargs):
""" 响应检查认证凭证事件
Args:
event: AUTHRULE_CHECK_AUTH_DATA事件
"""
pass
@abstractmethod
def fix_login_page(self, event, **kwargs):
"""向login_pages填入认证元素
Args:
event: AUTHRULE_FIX_LOGIN_PAGE事件
"""
pass
def get_current_config(self, event):
"""获取事件指向的运行时配置
Args:
event (Event): 事件
Returns:
TenantExtensionConfig: 运行时配置
"""
config_id = event.request.POST.get('config_id')
return self.get_config_by_id(config_id)
def listen_events(self):
"""注册并监听事件
"""
self.auth_event_tag = self.register_event('auth', '认证')
self.listen_event(self.auth_event_tag, self.start_authenticate)
self.register_event_tag = self.register_event('register', '注册')
self.listen_event(self.register_event_tag, self.register)
self.password_event_tag = self.register_event('password', '重置密码')
self.listen_event(self.password_event_tag, self.reset_password)
self.listen_event(core_event.CREATE_LOGIN_PAGE_AUTH_FACTOR, self.create_response)
self.listen_event(core_event.AUTHRULE_CHECK_AUTH_DATA,self.check_auth_data)
self.listen_event(core_event.AUTHRULE_FIX_LOGIN_PAGE,self.fix_login_page)
composite_model (BaseModel)
django-model
#
TenantExtensionConfig(id, is_del, is_active, updated, created, tenant, extension, config, name, type)
Source code in arkid/core/extension/auth_factor.py
class TenantExtensionConfig(BaseModel):
class Meta(object):
verbose_name = _("插件运行时配置")
verbose_name_plural = _("插件运行时配置")
tenant = models.ForeignKey('core.Tenant', blank=False, on_delete=models.PROTECT, verbose_name=_('租户'))
extension = models.ForeignKey('Extension', blank=False, on_delete=models.PROTECT, verbose_name=_('插件'))
config = models.JSONField(blank=True, default=dict, verbose_name=_('Runtime Config','运行时配置'))
name = models.CharField(max_length=128, default='', verbose_name=_('名称'))
type = models.CharField(max_length=128, default='', verbose_name=_('类型'))
config: JSONField
blank
django-field
#
Runtime Config
created: DateTimeField
blank
django-field
nullable
#
创建时间
extension: ForeignKey
django-field
#
插件
id: UUIDField
django-field
#
ID
is_active: BooleanField
django-field
#
是否可用
is_del: BooleanField
django-field
#
是否删除
name: CharField
django-field
#
名称
tenant: ForeignKey
django-field
#
租户
type: CharField
django-field
#
类型
updated: DateTimeField
blank
django-field
nullable
#
更新时间
add_page_form(self, config, page_name, label, items, config_data, submit_url=None, submit_label=None)
#
向config_data中添加表单元素
Parameters:
Name | Type | Description | Default |
---|---|---|---|
config |
TenantExtensionConfig |
插件运行时配置 |
required |
page_name |
str |
页面名称 |
required |
label |
str |
标签 |
required |
items |
list |
页面元素描述列表 |
required |
config_data |
dict |
运行时配置数据 |
required |
submit_url |
str |
表单提交地址. Defaults to None. |
None |
submit_label |
str |
表单提交按钮标签. Defaults to None. |
None |
Source code in arkid/core/extension/auth_factor.py
def add_page_form(self, config, page_name, label, items, config_data, submit_url=None, submit_label=None):
"""向config_data中添加表单元素
Args:
config (TenantExtensionConfig): 插件运行时配置
page_name (str): 页面名称
label (str): 标签
items (list): 页面元素描述列表
config_data (dict): 运行时配置数据
submit_url (str, optional): 表单提交地址. Defaults to None.
submit_label (str, optional): 表单提交按钮标签. Defaults to None.
"""
default = {
"login": ("登录", f"/api/v1/tenant/tenant_id/auth/?event_tag={self.auth_event_tag}"),
"register": ("注册", f"/api/v1/tenant/tenant_id/register/?event_tag={self.register_event_tag}"),
"password": ("重置密码", f"/api/v1/tenant/tenant_id/reset_password/?event_tag={self.password_event_tag}"),
}
if not submit_label:
submit_label, useless = default.get(page_name)
if not submit_url:
useless, submit_url = default.get(page_name)
items.append({"type": "hidden", "name": "config_id", "value": config.id})
config_data[page_name]['forms'].append({
'label': config.name or label,
'items': items,
'submit': {'label': submit_label, 'title':submit_label,'http': {'url': submit_url, 'method': "post"}}
})
auth_failed(self, event, data, **kwargs)
#
封装认证失败返回值,同时触发认证失败事件,打破事件循环,返回报错信息
Parameters:
Name | Type | Description | Default |
---|---|---|---|
event |
_type_ |
认证事件 |
required |
data |
_type_ |
结果描述 |
required |
Source code in arkid/core/extension/auth_factor.py
def auth_failed(self, event, data, **kwargs):
"""封装认证失败返回值,同时触发认证失败事件,打破事件循环,返回报错信息
Args:
event (_type_): 认证事件
data (_type_): 结果描述
"""
config = self.get_current_config(event)
dispatch_event(Event(tag=core_event.AUTH_FAIL, tenant=event.tenant, request=event.request, data={"auth_factor_config_id":config.id.hex,"data":data}))
core_event.remove_event_id(event)
core_event.break_event_loop(data)
auth_success(self, user, event, **kwargs)
#
封装认证成功返回值,同时触发认证成功事件,如核心判定事件无异常(无其他插件或机制认为结果为失败)则返回认证成功后的用户
Parameters:
Name | Type | Description | Default |
---|---|---|---|
user |
User |
用户 |
required |
event |
Event |
认证事件 |
required |
Returns:
Type | Description |
---|---|
Optional[User,None] |
如认证成功则返回user对象,如认证失败跳出事件循环报错 |
Source code in arkid/core/extension/auth_factor.py
def auth_success(self, user, event, **kwargs):
"""封装认证成功返回值,同时触发认证成功事件,如核心判定事件无异常(无其他插件或机制认为结果为失败)则返回认证成功后的用户
Args:
user (User): 用户
event (Event): 认证事件
Returns:
Optional[User,None]: 如认证成功则返回user对象,如认证失败跳出事件循环报错
"""
config = self.get_current_config(event)
responses = dispatch_event(Event(tag=core_event.AUTH_SUCCESS, tenant=event.tenant, request=event.request, data={"auth_factor_config_id":config,"user":user}))
for useless,(response,useless) in responses:
if not response:
continue
result,data = response
if not result:
return self.auth_failed(event,data)
return user
authenticate(self, event, **kwargs)
#
check_auth_data(self, event, **kwargs)
#
create_auth_manage_page(self)
#
create_login_page(self, event, config, config_data)
#
抽象方法:组装登录页面表单
Parameters:
Name | Type | Description | Default |
---|---|---|---|
event |
Event |
CREATE_LOGIN_PAGE_AUTH_FACTOR事件 |
required |
config |
TenantExtensionConfig |
插件运行时配置 |
required |
config_data |
dict |
运行时配置数据 |
required |
create_other_page(self, event, config, config_data)
#
抽象方法:组装登录页上其他操作表单
Parameters:
Name | Type | Description | Default |
---|---|---|---|
event |
Event |
CREATE_LOGIN_PAGE_AUTH_FACTOR事件 |
required |
config |
TenantExtensionConfig |
插件运行时配置 |
required |
config_data |
dict |
运行时配置数据 |
required |
create_password_page(self, event, config, config_data)
#
抽象方法:组装重置密码页面表单
Parameters:
Name | Type | Description | Default |
---|---|---|---|
event |
Event |
CREATE_LOGIN_PAGE_AUTH_FACTOR事件 |
required |
config |
TenantExtensionConfig |
插件运行时配置 |
required |
config_data |
dict |
运行时配置数据 |
required |
create_register_page(self, event, config, config_data)
#
抽象方法:组装注册页面表单
Parameters:
Name | Type | Description | Default |
---|---|---|---|
event |
Event |
CREATE_LOGIN_PAGE_AUTH_FACTOR事件 |
required |
config |
TenantExtensionConfig |
插件运行时配置 |
required |
config_data |
dict |
运行时配置数据 |
required |
create_response(self, event, **kwargs)
#
响应事件:CREATE_LOGIN_PAGE_AUTH_FACTOR事件,组装登陆页面schema描述
Parameters:
Name | Type | Description | Default |
---|---|---|---|
event |
Event |
CREATE_LOGIN_PAGE_AUTH_FACTOR事件 |
required |
Returns:
Type | Description |
---|---|
dict |
组装好的登陆页面元素(非最终结构) |
Source code in arkid/core/extension/auth_factor.py
def create_response(self, event, **kwargs):
"""响应事件:CREATE_LOGIN_PAGE_AUTH_FACTOR事件,组装登陆页面schema描述
Args:
event (Event): CREATE_LOGIN_PAGE_AUTH_FACTOR事件
Returns:
dict: 组装好的登陆页面元素(非最终结构)
"""
logger.info(f'{self.package} create_response start')
self.data = {}
configs = self.get_tenant_configs(event.tenant)
for config in configs:
config_data = {
self.LOGIN: {
'forms':[],
'bottoms':[],
'extend':{},
},
self.REGISTER: {
'forms':[],
'bottoms':[],
'extend':{},
},
self.RESET_PASSWORD: {
'forms':[],
'bottoms':[],
'extend':{},
},
}
if config.config.get("login_enabled", True):
self.create_login_page(event,config,config_data)
if config.config.get("register_enabled", True):
self.create_register_page(event, config,config_data)
if config.config.get("reset_password_enabled", True):
self.create_password_page(event, config,config_data)
self.create_other_page(event, config, config_data)
self.data[config.id.hex] = config_data
logger.info(self.data)
logger.info(f'{self.package} create_response end')
return self.data
fix_login_page(self, event, **kwargs)
#
get_current_config(self, event)
#
获取事件指向的运行时配置
Parameters:
Name | Type | Description | Default |
---|---|---|---|
event |
Event |
事件 |
required |
Returns:
Type | Description |
---|---|
TenantExtensionConfig |
运行时配置 |
listen_events(self)
#
注册并监听事件
Source code in arkid/core/extension/auth_factor.py
def listen_events(self):
"""注册并监听事件
"""
self.auth_event_tag = self.register_event('auth', '认证')
self.listen_event(self.auth_event_tag, self.start_authenticate)
self.register_event_tag = self.register_event('register', '注册')
self.listen_event(self.register_event_tag, self.register)
self.password_event_tag = self.register_event('password', '重置密码')
self.listen_event(self.password_event_tag, self.reset_password)
self.listen_event(core_event.CREATE_LOGIN_PAGE_AUTH_FACTOR, self.create_response)
self.listen_event(core_event.AUTHRULE_CHECK_AUTH_DATA,self.check_auth_data)
self.listen_event(core_event.AUTHRULE_FIX_LOGIN_PAGE,self.fix_login_page)
load(self)
#
register(self, event, **kwargs)
#
register_auth_factor_schema(self, schema, auth_factor_type)
#
注册认证因素运行时配置schema
Parameters:
Name | Type | Description | Default |
---|---|---|---|
schema |
Schema |
schema描述 |
required |
auth_factor_type |
str |
认证因素类型 |
required |
Source code in arkid/core/extension/auth_factor.py
def register_auth_factor_schema(self, schema, auth_factor_type):
"""注册认证因素运行时配置schema
Args:
schema (Schema): schema描述
auth_factor_type (str): 认证因素类型
"""
self.register_config_schema(schema, self.package + '_' + auth_factor_type)
self.register_composite_config_schema(schema, auth_factor_type, exclude=['extension'])
register_auth_manage_page(self)
#
向认证管理页面添加页面
Source code in arkid/core/extension/auth_factor.py
def register_auth_manage_page(self):
""" 向认证管理页面添加页面
"""
from api.v1.pages.mine.auth_manage import page as auth_manage_page
pages = self.create_auth_manage_page()
if not pages:
return
if not isinstance(pages,list):
pages = [pages]
for page in pages:
self.register_front_pages(page)
auth_manage_page.add_pages(page)
register_user_key_fields(self, **fields)
#
reset_password(self, event, **kwargs)
#
start_authenticate(self, event, **kwargs)
#
响应认证事件: 认证前遍历认证规则,如通过所有规则则执行认证规程
Parameters:
Name | Type | Description | Default |
---|---|---|---|
event |
Event |
认证事件 |
required |
Returns:
Type | Description |
---|---|
Optional[User,None] |
如认证成功则返回user对象,如认证失败跳出事件循环报错 |
Source code in arkid/core/extension/auth_factor.py
def start_authenticate(self,event,**kwargs):
"""响应认证事件: 认证前遍历认证规则,如通过所有规则则执行认证规程
Args:
event (Event): 认证事件
Returns:
Optional[User,None]: 如认证成功则返回user对象,如认证失败跳出事件循环报错
"""
config = self.get_current_config(event)
responses = dispatch_event(Event(tag=core_event.BEFORE_AUTH, tenant=event.tenant, request=event.request, data={"auth_factor_config_id":config.id.hex}))
for useless,(response,useless) in responses:
if not response:
continue
result,data = response
if not result:
return self.auth_failed(event,data)
return self.authenticate(event, **kwargs)
Exemplary#
extension_root.com_longgui_auth_factor_mobile.MobileAuthFactorExtension (AuthFactorExtension)
#
手机短信验证码认证因素插件
Source code in extension_root/com_longgui_auth_factor_mobile/__init__.py
class MobileAuthFactorExtension(AuthFactorExtension):
"""手机短信验证码认证因素插件
"""
def load(self):
"""加载插件
"""
super().load()
self.create_extension_config_schema()
self.register_extend_field(UserMobile, "mobile")
from api.v1.schema.auth import AuthIn
from api.v1.schema.user import UserCreateIn,UserItemOut,UserUpdateIn,UserListItemOut
from api.v1.schema.mine import ProfileSchemaOut
self.register_extend_api(
AuthIn,
UserCreateIn,
UserItemOut,
UserUpdateIn,
UserListItemOut,
mobile=(Optional[str],Field(title=_("电话号码"))),
# areacode=(str,Field(title=_("区号")))
)
self.register_extend_api(
ProfileSchemaOut,
mobile=(Optional[str],Field(readonly=True))
)
# 注册发送短信接口
self.url_send_sms_code = self.register_api(
'/config/{config_id}/send_sms_code/',
'POST',
self.send_sms_code,
tenant_path=True,
auth=None,
response=SendSMSCodeOut,
)
print(self.url_send_sms_code)
def authenticate(self, event, **kwargs):
""" 认证
通过手机号码查找用户并校验短信验证码
Args:
event (Event): 事件
"""
tenant = event.tenant
request = event.request
data = request.POST or json.load(request.body)
mobile = data.get('mobile')
sms_code = data.get('sms_code')
# user = User.expand_objects.filter(tenant=tenant,mobile=mobile)
temp_users = tenant.users.all()
user_ids = []
for temp_user in temp_users:
user_ids.append(temp_user.id)
user = User.expand_objects.filter(
is_active=True,
is_del=False,
id__in=user_ids,
mobile=mobile
)
if len(user) > 1:
logger.error(f'{mobile}在数据库中匹配到多个用户')
return self.auth_failed(event, data=self.error(ErrorCode.CONTACT_MANAGER))
if user:
user = user[0]
if check_sms_code(tenant, mobile, sms_code):
user = User.active_objects.get(id=user.get("id"))
return self.auth_success(user,event)
else:
msg = ErrorCode.SMS_CODE_MISMATCH
else:
msg = ErrorCode.MOBILE_NOT_EXISTS_ERROR
return self.auth_failed(event, data=self.error(msg))
@transaction.atomic()
def register(self, event, **kwargs):
""" 注册用户
Args:
event (Event): 事件
"""
tenant = event.tenant
request = event.request
data = request.POST or json.load(request.body)
mobile = data.get('mobile')
sms_code = data.get('sms_code')
username = data.get('username')
config = self.get_current_config(event)
ret, message = self.check_mobile_exists(mobile, tenant)
if not ret:
return self.error(message)
if not check_sms_code(tenant, mobile, sms_code):
return self.error(ErrorCode.SMS_CODE_MISMATCH)
ret, message = self.check_username_exists(username, tenant)
if not ret:
return self.error(message)
user = User(tenant=tenant)
user.mobile = mobile
user.username = username
user.save()
tenant.users.add(user)
tenant.save()
return user
def reset_password(self, event, **kwargs):
""" 重置密码
Args:
event (Event): 事件
"""
tenant = event.tenant
request = event.request
data = request.POST or json.load(request.body)
mobile = data.get('mobile')
sms_code = data.get('sms_code')
password = data.get('password')
checkpassword = data.get('checkpassword')
if password != checkpassword:
return self.error(ErrorCode.PASSWORD_IS_INCONSISTENT)
if not check_sms_code(tenant, mobile, sms_code):
return self.error(ErrorCode.SMS_CODE_MISMATCH)
# user = User.expand_objects.filter(tenant=tenant,mobile=mobile)
temp_users = tenant.users.all()
user_ids = []
for temp_user in temp_users:
user_ids.append(temp_user.id)
user = User.expand_objects.filter(
is_active=True,
is_del=False,
id__in=user_ids,
mobile=mobile
)
if len(user) > 1:
logger.error(f'{mobile}在数据库中匹配到多个用户')
return self.error(ErrorCode.CONTACT_MANAGER)
if user:
user = user[0]
user.password = make_password(password)
user.save()
return self.success()
return self.error(ErrorCode.MOBILE_NOT_EXISTS_ERROR)
def create_login_page(self, event, config, config_data):
""" 生成手机验证码登录页面Schema描述
Args:
event (Event): 事件
config (TenantExtensionConfig): 插件运行时配置
"""
items = [
{
"type": "text",
"name":"mobile",
"placeholder": "手机号码",
"append": {
"title": "发送验证码",
"http": {
"url": self.url_send_sms_code,
"method": "post",
"params": {
"mobile": "mobile",
"areacode": "86",
},
},
"delay": 60
}
},
{
"type": "text",
"name":"sms_code",
"placeholder": "验证码",
}
]
self.add_page_form(config, self.LOGIN, "手机验证码登录", items, config_data)
def create_register_page(self, event, config, config_data):
"""生成手机验证码用户注册页面Schema描述
因本插件提供重置密码功能,此处需用户指定账号用户名
Args:
event (Event): 事件
config (TenantExtensionConfig): 插件运行时配置
"""
items = [
{
"type": "text",
"name": "username",
"placeholder": "用户名"
},
{
"type": "text",
"name":"mobile",
"placeholder": "手机号码",
"append": {
"title": "发送验证码",
"http": {
"url": self.url_send_sms_code,
"method": "post",
"params": {
"mobile": "mobile",
"areacode": "86",
},
},
"delay": 60
}
},
{
"type": "text",
"name":"sms_code",
"placeholder": "验证码"
}
]
self.add_page_form(config, self.REGISTER, "手机验证码注册", items, config_data)
def create_password_page(self, event, config, config_data):
"""生成重置密码页面Schema描述
通过手机验证码重置密码时需提供手机号码以及对应验证码,同时此处添加新密码确认机制
注意:重置密码功能需要启用用户名密码认证插件以提供完整支持
Args:
event (Event): 事件
config (TenantExtensionConfig): 插件运行时配置
"""
items = [
{
"type": "text",
"name":"mobile",
"placeholder": "手机号码",
"append": {
"title": "发送验证码",
"http": {
"url": self.url_send_sms_code,
"method": "post",
"params": {
"mobile": "mobile",
"areacode": "86",
},
},
}
},
{
"type": "text",
"name":"sms_code",
"placeholder": "验证码"
},
{
"type": "password",
"name":"password",
"placeholder": "密码"
},
{
"type": "password",
"name":"checkpassword",
"placeholder": "密码确认"
}
]
self.add_page_form(config, self.RESET_PASSWORD, "手机验证码重置密码", items, config_data)
def create_other_page(self, event, config, config_data):
"""创建其他页面(本插件无相关页面)
Args:
event (Event): 事件
config (TenantExtensionConfig): 插件运行时配置
"""
pass
def check_mobile_exists(self, mobile, tenant):
"""检查电话号码是否已存在
Args:
mobile (str): 手机号
tenant (Tenant): 租户
Returns:
(bool,ErrorCode): mobile是否存在以及对应错误
"""
if not mobile:
return False, ErrorCode.MOBILE_EMPTY
# 需要临时存储
temp_users = tenant.users.all()
user_ids = []
for temp_user in temp_users:
user_ids.append(temp_user.id)
if User.expand_objects.filter(
is_active=True,
is_del=False,
id__in=user_ids,
mobile=mobile
).count():
# if User.expand_objects.filter(tenant=tenant,mobile=mobile).count():
return False, ErrorCode.MOBILE_EXISTS_ERROR
return True, None
def check_username_exists(self,username,tenant):
"""检查用户名是否已存在
Args:
username (str): 用户名
tenant (Tenant): 租户
Returns:
(bool,ErrorCode): username是否存在以及对应错误
"""
# 检查username是否为空
if not username:
return False, ErrorCode.USERNAME_EMPTY
# 检查username是否已存在
if tenant.users.filter(is_active=True, is_del=False).filter(username=username).count():
# if User.expand_objects.filter(tenant=tenant,username=username).count():
return False, ErrorCode.USERNAME_EXISTS_ERROR
return True, None
def check_auth_data(self, event, **kwargs):
pass
def fix_login_page(self, event, **kwargs):
pass
def create_auth_manage_page(self):
""" 创建“我的-认证管理”中的更换手机号码页面
"""
_pages = []
mine_mobile_path = self.register_api(
"/mine_mobile/",
"GET",
self.mine_mobile,
tenant_path=True,
auth=GlobalAuth(),
response=MineMobileOut
)
upodate_mine_mobile_path = self.register_api(
"/mine_mobile/",
'POST',
self.update_mine_mobile,
tenant_path=True,
auth=GlobalAuth(),
response=UpdateMineMobileOut
)
name = '更改手机号码'
page = pages.FormPage(name=name)
page.create_actions(
init_action=actions.DirectAction(
path=mine_mobile_path,
method=actions.FrontActionMethod.GET,
),
global_actions={
'confirm': actions.ConfirmAction(
path=upodate_mine_mobile_path
),
}
)
_pages.append(page)
return _pages
def create_extension_config_schema(self):
"""创建插件运行时配置schema描述
"""
select_sms_page = pages.TablePage(select=True,name=_("指定短信插件运行时"))
self.register_front_pages(select_sms_page)
select_sms_page.create_actions(
init_action=actions.DirectAction(
path='/api/v1/tenants/{tenant_id}/config_select/?extension__type=sms',
method=actions.FrontActionMethod.GET
)
)
MobileAuthFactorSchema = create_extension_schema(
'MobileAuthFactorSchema',
__file__,
[
(
'sms_config',
MobileAuthFactorConfigSchema,
Field(
title=_('sms extension config', '短信插件运行时'),
page=select_sms_page.tag,
),
),
(
'code_length',
int,
Field(
title=_('code_length', '验证码长度'),
default=6
)
),
(
'expired',
Optional[int],
Field(
title=_('expired', '有效期/分钟'),
default=10,
)
),
],
BaseAuthFactorSchema,
)
self.register_auth_factor_schema(MobileAuthFactorSchema, 'mobile')
@operation(SendSMSCodeOut)
def send_sms_code(self,request,tenant_id,config_id:str,data:SendSMSCodeIn):
"""发送短信验证码
"""
tenant = request.tenant
mobile = data.mobile
config = self.get_config_by_id(config_id)
if not config:
return self.error(ErrorCode.CONFIG_IS_NOT_EXISTS)
if not mobile or mobile=="mobile":
return self.error(ErrorCode.MOBILE_EMPTY)
code = create_sms_code(tenant,mobile,config.config.get('code_length',6),config.config.get("expired",10)*60)
responses = dispatch_event(
Event(
tag=SEND_SMS,
tenant=tenant,
request=request,
data={
"config_id":config.config["sms_config"]["id"],
"mobile":data.mobile,
"code": code,
"areacode": data.areacode,
"username": request.user.username if request.user else ""
},
packages=config.config["sms_config"]["package"]
)
)
if not responses:
return self.error(ErrorCode.SMS_EXTENSION_NOT_EXISTS)
useless, (data, extension) = responses[0]
if data:
return self.success()
else:
return self.error(ErrorCode.SMS_SEND_FAILED)
@operation(UpdateMineMobileOut,roles=[TENANT_ADMIN, PLATFORM_ADMIN, NORMAL_USER])
def update_mine_mobile(self, request, tenant_id: str,data:UpdateMineMobileIn):
""" 普通用户:更新手机号码
"""
mobile = data.mobile
ret, message = self.check_mobile_exists(mobile, request.tenant)
if not ret:
return self.error(message)
if not check_sms_code(request.tenant,mobile,data.code):
return self.error(ErrorCode.SMS_CODE_MISMATCH)
user = request.user
user.mobile=data.mobile
user.save()
return self.success()
@operation(MineMobileOut,roles=[TENANT_ADMIN, PLATFORM_ADMIN, NORMAL_USER])
def mine_mobile(self,request,tenant_id: str):
user = request.user
user_expand = User.expand_objects.filter(id=user.id).first()
config = self.get_tenant_configs(request.tenant).first()
if not config:
return self.error(
ErrorCode.CONFIG_IS_NOT_EXISTS
)
return self.success(
data={
"current_mobile": user_expand.get("mobile",None),
"mobile": "",
"code": "",
"config_id": config.id.hex,
},
)
authenticate(self, event, **kwargs)
#
认证
通过手机号码查找用户并校验短信验证码
Parameters:
Name | Type | Description | Default |
---|---|---|---|
event |
Event |
事件 |
required |
Source code in extension_root/com_longgui_auth_factor_mobile/__init__.py
def authenticate(self, event, **kwargs):
""" 认证
通过手机号码查找用户并校验短信验证码
Args:
event (Event): 事件
"""
tenant = event.tenant
request = event.request
data = request.POST or json.load(request.body)
mobile = data.get('mobile')
sms_code = data.get('sms_code')
# user = User.expand_objects.filter(tenant=tenant,mobile=mobile)
temp_users = tenant.users.all()
user_ids = []
for temp_user in temp_users:
user_ids.append(temp_user.id)
user = User.expand_objects.filter(
is_active=True,
is_del=False,
id__in=user_ids,
mobile=mobile
)
if len(user) > 1:
logger.error(f'{mobile}在数据库中匹配到多个用户')
return self.auth_failed(event, data=self.error(ErrorCode.CONTACT_MANAGER))
if user:
user = user[0]
if check_sms_code(tenant, mobile, sms_code):
user = User.active_objects.get(id=user.get("id"))
return self.auth_success(user,event)
else:
msg = ErrorCode.SMS_CODE_MISMATCH
else:
msg = ErrorCode.MOBILE_NOT_EXISTS_ERROR
return self.auth_failed(event, data=self.error(msg))
check_auth_data(self, event, **kwargs)
#
check_mobile_exists(self, mobile, tenant)
#
检查电话号码是否已存在
Parameters:
Name | Type | Description | Default |
---|---|---|---|
mobile |
str |
手机号 |
required |
tenant |
Tenant |
租户 |
required |
Returns:
Type | Description |
---|---|
(bool,ErrorCode) |
mobile是否存在以及对应错误 |
Source code in extension_root/com_longgui_auth_factor_mobile/__init__.py
def check_mobile_exists(self, mobile, tenant):
"""检查电话号码是否已存在
Args:
mobile (str): 手机号
tenant (Tenant): 租户
Returns:
(bool,ErrorCode): mobile是否存在以及对应错误
"""
if not mobile:
return False, ErrorCode.MOBILE_EMPTY
# 需要临时存储
temp_users = tenant.users.all()
user_ids = []
for temp_user in temp_users:
user_ids.append(temp_user.id)
if User.expand_objects.filter(
is_active=True,
is_del=False,
id__in=user_ids,
mobile=mobile
).count():
# if User.expand_objects.filter(tenant=tenant,mobile=mobile).count():
return False, ErrorCode.MOBILE_EXISTS_ERROR
return True, None
check_username_exists(self, username, tenant)
#
检查用户名是否已存在
Parameters:
Name | Type | Description | Default |
---|---|---|---|
username |
str |
用户名 |
required |
tenant |
Tenant |
租户 |
required |
Returns:
Type | Description |
---|---|
(bool,ErrorCode) |
username是否存在以及对应错误 |
Source code in extension_root/com_longgui_auth_factor_mobile/__init__.py
def check_username_exists(self,username,tenant):
"""检查用户名是否已存在
Args:
username (str): 用户名
tenant (Tenant): 租户
Returns:
(bool,ErrorCode): username是否存在以及对应错误
"""
# 检查username是否为空
if not username:
return False, ErrorCode.USERNAME_EMPTY
# 检查username是否已存在
if tenant.users.filter(is_active=True, is_del=False).filter(username=username).count():
# if User.expand_objects.filter(tenant=tenant,username=username).count():
return False, ErrorCode.USERNAME_EXISTS_ERROR
return True, None
create_auth_manage_page(self)
#
创建“我的-认证管理”中的更换手机号码页面
Source code in extension_root/com_longgui_auth_factor_mobile/__init__.py
def create_auth_manage_page(self):
""" 创建“我的-认证管理”中的更换手机号码页面
"""
_pages = []
mine_mobile_path = self.register_api(
"/mine_mobile/",
"GET",
self.mine_mobile,
tenant_path=True,
auth=GlobalAuth(),
response=MineMobileOut
)
upodate_mine_mobile_path = self.register_api(
"/mine_mobile/",
'POST',
self.update_mine_mobile,
tenant_path=True,
auth=GlobalAuth(),
response=UpdateMineMobileOut
)
name = '更改手机号码'
page = pages.FormPage(name=name)
page.create_actions(
init_action=actions.DirectAction(
path=mine_mobile_path,
method=actions.FrontActionMethod.GET,
),
global_actions={
'confirm': actions.ConfirmAction(
path=upodate_mine_mobile_path
),
}
)
_pages.append(page)
return _pages
create_extension_config_schema(self)
#
创建插件运行时配置schema描述
Source code in extension_root/com_longgui_auth_factor_mobile/__init__.py
def create_extension_config_schema(self):
"""创建插件运行时配置schema描述
"""
select_sms_page = pages.TablePage(select=True,name=_("指定短信插件运行时"))
self.register_front_pages(select_sms_page)
select_sms_page.create_actions(
init_action=actions.DirectAction(
path='/api/v1/tenants/{tenant_id}/config_select/?extension__type=sms',
method=actions.FrontActionMethod.GET
)
)
MobileAuthFactorSchema = create_extension_schema(
'MobileAuthFactorSchema',
__file__,
[
(
'sms_config',
MobileAuthFactorConfigSchema,
Field(
title=_('sms extension config', '短信插件运行时'),
page=select_sms_page.tag,
),
),
(
'code_length',
int,
Field(
title=_('code_length', '验证码长度'),
default=6
)
),
(
'expired',
Optional[int],
Field(
title=_('expired', '有效期/分钟'),
default=10,
)
),
],
BaseAuthFactorSchema,
)
self.register_auth_factor_schema(MobileAuthFactorSchema, 'mobile')
create_login_page(self, event, config, config_data)
#
生成手机验证码登录页面Schema描述
Parameters:
Name | Type | Description | Default |
---|---|---|---|
event |
Event |
事件 |
required |
config |
TenantExtensionConfig |
插件运行时配置 |
required |
Source code in extension_root/com_longgui_auth_factor_mobile/__init__.py
def create_login_page(self, event, config, config_data):
""" 生成手机验证码登录页面Schema描述
Args:
event (Event): 事件
config (TenantExtensionConfig): 插件运行时配置
"""
items = [
{
"type": "text",
"name":"mobile",
"placeholder": "手机号码",
"append": {
"title": "发送验证码",
"http": {
"url": self.url_send_sms_code,
"method": "post",
"params": {
"mobile": "mobile",
"areacode": "86",
},
},
"delay": 60
}
},
{
"type": "text",
"name":"sms_code",
"placeholder": "验证码",
}
]
self.add_page_form(config, self.LOGIN, "手机验证码登录", items, config_data)
create_other_page(self, event, config, config_data)
#
创建其他页面(本插件无相关页面)
Parameters:
Name | Type | Description | Default |
---|---|---|---|
event |
Event |
事件 |
required |
config |
TenantExtensionConfig |
插件运行时配置 |
required |
create_password_page(self, event, config, config_data)
#
生成重置密码页面Schema描述
通过手机验证码重置密码时需提供手机号码以及对应验证码,同时此处添加新密码确认机制
注意:重置密码功能需要启用用户名密码认证插件以提供完整支持
Parameters:
Name | Type | Description | Default |
---|---|---|---|
event |
Event |
事件 |
required |
config |
TenantExtensionConfig |
插件运行时配置 |
required |
Source code in extension_root/com_longgui_auth_factor_mobile/__init__.py
def create_password_page(self, event, config, config_data):
"""生成重置密码页面Schema描述
通过手机验证码重置密码时需提供手机号码以及对应验证码,同时此处添加新密码确认机制
注意:重置密码功能需要启用用户名密码认证插件以提供完整支持
Args:
event (Event): 事件
config (TenantExtensionConfig): 插件运行时配置
"""
items = [
{
"type": "text",
"name":"mobile",
"placeholder": "手机号码",
"append": {
"title": "发送验证码",
"http": {
"url": self.url_send_sms_code,
"method": "post",
"params": {
"mobile": "mobile",
"areacode": "86",
},
},
}
},
{
"type": "text",
"name":"sms_code",
"placeholder": "验证码"
},
{
"type": "password",
"name":"password",
"placeholder": "密码"
},
{
"type": "password",
"name":"checkpassword",
"placeholder": "密码确认"
}
]
self.add_page_form(config, self.RESET_PASSWORD, "手机验证码重置密码", items, config_data)
create_register_page(self, event, config, config_data)
#
生成手机验证码用户注册页面Schema描述
因本插件提供重置密码功能,此处需用户指定账号用户名
Parameters:
Name | Type | Description | Default |
---|---|---|---|
event |
Event |
事件 |
required |
config |
TenantExtensionConfig |
插件运行时配置 |
required |
Source code in extension_root/com_longgui_auth_factor_mobile/__init__.py
def create_register_page(self, event, config, config_data):
"""生成手机验证码用户注册页面Schema描述
因本插件提供重置密码功能,此处需用户指定账号用户名
Args:
event (Event): 事件
config (TenantExtensionConfig): 插件运行时配置
"""
items = [
{
"type": "text",
"name": "username",
"placeholder": "用户名"
},
{
"type": "text",
"name":"mobile",
"placeholder": "手机号码",
"append": {
"title": "发送验证码",
"http": {
"url": self.url_send_sms_code,
"method": "post",
"params": {
"mobile": "mobile",
"areacode": "86",
},
},
"delay": 60
}
},
{
"type": "text",
"name":"sms_code",
"placeholder": "验证码"
}
]
self.add_page_form(config, self.REGISTER, "手机验证码注册", items, config_data)
fix_login_page(self, event, **kwargs)
#
load(self)
#
加载插件
Source code in extension_root/com_longgui_auth_factor_mobile/__init__.py
def load(self):
"""加载插件
"""
super().load()
self.create_extension_config_schema()
self.register_extend_field(UserMobile, "mobile")
from api.v1.schema.auth import AuthIn
from api.v1.schema.user import UserCreateIn,UserItemOut,UserUpdateIn,UserListItemOut
from api.v1.schema.mine import ProfileSchemaOut
self.register_extend_api(
AuthIn,
UserCreateIn,
UserItemOut,
UserUpdateIn,
UserListItemOut,
mobile=(Optional[str],Field(title=_("电话号码"))),
# areacode=(str,Field(title=_("区号")))
)
self.register_extend_api(
ProfileSchemaOut,
mobile=(Optional[str],Field(readonly=True))
)
# 注册发送短信接口
self.url_send_sms_code = self.register_api(
'/config/{config_id}/send_sms_code/',
'POST',
self.send_sms_code,
tenant_path=True,
auth=None,
response=SendSMSCodeOut,
)
print(self.url_send_sms_code)
register(self, event, **kwargs)
#
注册用户
Parameters:
Name | Type | Description | Default |
---|---|---|---|
event |
Event |
事件 |
required |
Source code in extension_root/com_longgui_auth_factor_mobile/__init__.py
@transaction.atomic()
def register(self, event, **kwargs):
""" 注册用户
Args:
event (Event): 事件
"""
tenant = event.tenant
request = event.request
data = request.POST or json.load(request.body)
mobile = data.get('mobile')
sms_code = data.get('sms_code')
username = data.get('username')
config = self.get_current_config(event)
ret, message = self.check_mobile_exists(mobile, tenant)
if not ret:
return self.error(message)
if not check_sms_code(tenant, mobile, sms_code):
return self.error(ErrorCode.SMS_CODE_MISMATCH)
ret, message = self.check_username_exists(username, tenant)
if not ret:
return self.error(message)
user = User(tenant=tenant)
user.mobile = mobile
user.username = username
user.save()
tenant.users.add(user)
tenant.save()
return user
reset_password(self, event, **kwargs)
#
重置密码
Parameters:
Name | Type | Description | Default |
---|---|---|---|
event |
Event |
事件 |
required |
Source code in extension_root/com_longgui_auth_factor_mobile/__init__.py
def reset_password(self, event, **kwargs):
""" 重置密码
Args:
event (Event): 事件
"""
tenant = event.tenant
request = event.request
data = request.POST or json.load(request.body)
mobile = data.get('mobile')
sms_code = data.get('sms_code')
password = data.get('password')
checkpassword = data.get('checkpassword')
if password != checkpassword:
return self.error(ErrorCode.PASSWORD_IS_INCONSISTENT)
if not check_sms_code(tenant, mobile, sms_code):
return self.error(ErrorCode.SMS_CODE_MISMATCH)
# user = User.expand_objects.filter(tenant=tenant,mobile=mobile)
temp_users = tenant.users.all()
user_ids = []
for temp_user in temp_users:
user_ids.append(temp_user.id)
user = User.expand_objects.filter(
is_active=True,
is_del=False,
id__in=user_ids,
mobile=mobile
)
if len(user) > 1:
logger.error(f'{mobile}在数据库中匹配到多个用户')
return self.error(ErrorCode.CONTACT_MANAGER)
if user:
user = user[0]
user.password = make_password(password)
user.save()
return self.success()
return self.error(ErrorCode.MOBILE_NOT_EXISTS_ERROR)
send_sms_code(self, request, tenant_id, config_id, data)
#
发送短信验证码
Source code in extension_root/com_longgui_auth_factor_mobile/__init__.py
@operation(SendSMSCodeOut)
def send_sms_code(self,request,tenant_id,config_id:str,data:SendSMSCodeIn):
"""发送短信验证码
"""
tenant = request.tenant
mobile = data.mobile
config = self.get_config_by_id(config_id)
if not config:
return self.error(ErrorCode.CONFIG_IS_NOT_EXISTS)
if not mobile or mobile=="mobile":
return self.error(ErrorCode.MOBILE_EMPTY)
code = create_sms_code(tenant,mobile,config.config.get('code_length',6),config.config.get("expired",10)*60)
responses = dispatch_event(
Event(
tag=SEND_SMS,
tenant=tenant,
request=request,
data={
"config_id":config.config["sms_config"]["id"],
"mobile":data.mobile,
"code": code,
"areacode": data.areacode,
"username": request.user.username if request.user else ""
},
packages=config.config["sms_config"]["package"]
)
)
if not responses:
return self.error(ErrorCode.SMS_EXTENSION_NOT_EXISTS)
useless, (data, extension) = responses[0]
if data:
return self.success()
else:
return self.error(ErrorCode.SMS_SEND_FAILED)
update_mine_mobile(self, request, tenant_id, data)
#
普通用户:更新手机号码
Source code in extension_root/com_longgui_auth_factor_mobile/__init__.py
@operation(UpdateMineMobileOut,roles=[TENANT_ADMIN, PLATFORM_ADMIN, NORMAL_USER])
def update_mine_mobile(self, request, tenant_id: str,data:UpdateMineMobileIn):
""" 普通用户:更新手机号码
"""
mobile = data.mobile
ret, message = self.check_mobile_exists(mobile, request.tenant)
if not ret:
return self.error(message)
if not check_sms_code(request.tenant,mobile,data.code):
return self.error(ErrorCode.SMS_CODE_MISMATCH)
user = request.user
user.mobile=data.mobile
user.save()
return self.success()
extension_root.com_longgui_auth_factor_password.PasswordAuthFactorExtension (AuthFactorExtension)
#
Source code in extension_root/com_longgui_auth_factor_password/__init__.py
class PasswordAuthFactorExtension(AuthFactorExtension):
def load(self):
super().load()
self.register_extend_field(UserPassword, "password")
self.register_auth_factor_schema(PasswordAuthFactorSchema, 'password')
self.register_extend_api(AuthIn, password=str)
user_key_fields_path = self.register_api(
'/user_key_fields/',
'GET',
self.get_user_key_fields,
response=List[GetUserKeyFieldItemOut],
)
select_pw_login_fields_page.create_actions(
init_action=actions.DirectAction(
path=user_key_fields_path,
method=actions.FrontActionMethod.GET,
),
)
select_pw_register_login_fields_page.create_actions(
init_action=actions.DirectAction(
path=user_key_fields_path,
method=actions.FrontActionMethod.GET,
),
)
self.register_front_pages(select_pw_login_fields_page)
self.register_front_pages(select_pw_register_login_fields_page)
# 租户管理员:用户管理-用户列表-重置密码
reset_user_password_path = self.register_api(
'/reset_user_password/{id}/',
'POST',
self.reset_user_password,
tenant_path=True,
response=ResponseSchema,
auth=GlobalAuth()
)
user_list_page.add_local_actions(
actions.OpenAction(
name='重置密码',
path=reset_user_password_path,
method=actions.FrontActionMethod.POST,
)
)
# 初始化部分配置数据
tenant = Tenant.platform_tenant()
if not self.get_tenant_configs(tenant):
config = {
'login_enabled_field_names': [{'key':'username'}],
'register_enabled_field_names': [{'key':'username'}],
'is_apply': False,
'regular': '',
'title': '',
}
self.create_tenant_config(tenant, config, "账密登录", "password")
try:
admin_user = User.active_objects.filter(username='admin').first()
if admin_user:
admin_password = UserPassword.active_objects.filter(target=admin_user)
if not admin_password:
admin_user.password = make_password('admin')
admin_user.save()
except Exception as e:
print(e)
self.listen_event(
CREATE_TENANT,
self.create_tenant_event
)
def create_tenant_event(self,event,**kwargs):
tenant = event.tenant
config = {
'login_enabled_field_names': [{'key':'username'}],
'register_enabled_field_names': [{'key':'username'}],
'is_apply': False,
'regular': '',
'title': '',
}
self.create_tenant_config(tenant, config, "default", "password")
def check_auth_data(self, event, **kwargs):
pass
def fix_login_page(self, event, **kwargs):
pass
@operation(roles=[TENANT_ADMIN, PLATFORM_ADMIN])
def reset_user_password(self, request, tenant_id:str, id:str, data:RestUserPasswordIn):
user = User.active_objects.get(id=id)
password = data.password
user.password = make_password(password)
user.save()
return self.success()
def get_user_key_fields(self,request):
data = [{'key':key,'name':value} for key,value in User.key_fields.items()]
return data
def authenticate(self, event, **kwargs):
tenant = event.tenant
request = event.request
data = request.POST or json.load(request.body)
username = data.get('username')
password = data.get('password')
config_id = data.get('config_id')
config = TenantExtensionConfig.active_objects.get(id=config_id).config
login_enabled_field_names = [item["key"] if isinstance(item,dict) else item for item in config.get('login_enabled_field_names')]
filter_params = None
login_enabled_field_names = login_enabled_field_names or ["username"]
for lefn in login_enabled_field_names:
temp = {lefn:username}
if filter_params:
filter_params = Q(**temp) | filter_params
else:
filter_params = Q(**temp)
users = tenant.users.filter(is_del=False).filter(filter_params)
if len(users) > 1:
logger.error(f'{username}在{login_enabled_field_names}中匹配到多个用户')
return self.auth_failed(event, data=self.error(ErrorCode.CONTACT_MANAGER))
user = users[0] if users else None
if user:
# 对象转换
user = User.expand_objects.filter(id=user.id).first()
user_password = user.get("password")
if user_password:
if check_password(password, user_password):
user = User.valid_objects.get(id=user.get("id"))
return self.auth_success(user, event)
return self.auth_failed(event, data=self.error(ErrorCode.USERNAME_PASSWORD_MISMATCH))
@transaction.atomic()
def register(self, event, **kwargs):
tenant = event.tenant
request = event.request
data = request.POST or json.load(request.body)
username = data.get('username')
password = data.get('password')
if data.get('checkpassword',None) != password:
return self.error(ErrorCode.TWO_TIME_PASSWORD_MISMATCH)
config = self.get_current_config(event)
ret, message = self.check_password_complexity(password, config)
if not ret:
return self.error(ErrorCode.PASSWORD_STRENGTH_LACK)
register_fields = [item["key"] if isinstance(item,dict) else item for item in config.config.get('register_enabled_field_names')]
if not register_fields:
fields = ['username']
if username is None:
self.auth_failed(event, data=self.error(ErrorCode.USERNAME_EMPTY))
else:
fields = [k for k in register_fields if request.POST.get(k) is not None]
if not fields:
self.auth_failed(event, data=self.error(ErrorCode.ALL_USER_FLAG_LACK_FIELD))
for field in fields:
user = self._get_register_user(tenant, field, request.POST.get(field))
if user:
self.auth_failed(event, data=self.error(ErrorCode.FIELD_USER_EXISTS, field=field))
# user = User.objects.create(tenant=tenant)
user = User(tenant=tenant)
for k in fields:
if request.POST.get(k):
setattr(user, k, request.POST.get(k))
user.password = make_password(password)
user.save()
tenant.users.add(user)
tenant.save()
return user
def reset_password(self, event, **kwargs):
pass
def create_login_page(self, event, config, config_data):
username_placeholder = ""
for lefn in [item["key"] if isinstance(item,dict) else item for item in config.config.get('login_enabled_field_names',[])]:
if username_placeholder:
username_placeholder = ',' + User.key_fields[lefn]
else:
username_placeholder = User.key_fields[lefn]
items = [
{
"type": "text",
"name": "username",
"placeholder": username_placeholder or '用户名'
},
{
"type": "password",
"name": "password",
"placeholder": "密码"
},
]
self.add_page_form(config, self.LOGIN, "用户名密码登录", items, config_data)
def create_register_page(self, event, config, config_data):
items = []
register_fields = [item["key"] if isinstance(item,dict) else item for item in config.config.get('register_enabled_field_names')]
for rf in register_fields:
items.append({
"type": "text",
"name": rf,
"placeholder": User.key_fields[rf]
})
items.extend([
{
"type": "password",
"name": "password",
"placeholder": "密码"
},
{
"type": "password",
"name": "checkpassword",
"placeholder": "密码确认"
},
])
self.add_page_form(config, self.REGISTER, "用户名密码注册", items, config_data)
def create_password_page(self, event, config, config_data):
pass
def create_other_page(self, event, config, config_data):
pass
def check_password_complexity(self, pwd, config):
if not pwd:
return False, 'No password provide'
if config:
regular = config.config.get('regular')
title = config.config.get('title')
if re.match(regular, pwd):
return True, None
else:
return False, title
return True, None
def _get_register_user(self, tenant, field_name, field_value):
user = None
if field_name in ('username', 'email'):
user = tenant.users.filter(is_active=True, is_del=False).filter(**{field_name: field_value}).first()
else:
# 获取刚注册的用户
user = User.expand_objects.filter(**{field_name: field_value}).first()
return user
def create_auth_manage_page(self):
# 更改密码页面
mine_password_path = self.register_api(
"/mine_password/",
'POST',
self.update_mine_password,
tenant_path=True,
response=UpdateMinePasswordOut,
)
name = '更改密码'
page = pages.FormPage(name=name)
page.create_actions(
init_action=actions.ConfirmAction(
path=mine_password_path,
),
global_actions={
'confirm': actions.ConfirmAction(
path=mine_password_path
),
}
)
return page
@operation(UpdateMinePasswordOut,roles=[TENANT_ADMIN, PLATFORM_ADMIN, NORMAL_USER])
def update_mine_password(self,request, tenant_id: str,data:UpdateMinePasswordIn):
"""更改密码"""
user = request.user
user_expand = User.expand_objects.get(id=user.id)
user_password = user_expand["password"]
if not user_password or check_password(data.old_password, user_password):
if data.password == data.confirm_password:
user.password = make_password(data.password)
user.save()
return self.success()
else:
return self.error(ErrorCode.TWO_TIME_PASSWORD_MISMATCH)
return self.error(ErrorCode.OLD_PASSWORD_ERROR)
authenticate(self, event, **kwargs)
#
抽象方法:认证
Parameters:
Name | Type | Description | Default |
---|---|---|---|
event |
Event |
认证事件 |
required |
Source code in extension_root/com_longgui_auth_factor_password/__init__.py
def authenticate(self, event, **kwargs):
tenant = event.tenant
request = event.request
data = request.POST or json.load(request.body)
username = data.get('username')
password = data.get('password')
config_id = data.get('config_id')
config = TenantExtensionConfig.active_objects.get(id=config_id).config
login_enabled_field_names = [item["key"] if isinstance(item,dict) else item for item in config.get('login_enabled_field_names')]
filter_params = None
login_enabled_field_names = login_enabled_field_names or ["username"]
for lefn in login_enabled_field_names:
temp = {lefn:username}
if filter_params:
filter_params = Q(**temp) | filter_params
else:
filter_params = Q(**temp)
users = tenant.users.filter(is_del=False).filter(filter_params)
if len(users) > 1:
logger.error(f'{username}在{login_enabled_field_names}中匹配到多个用户')
return self.auth_failed(event, data=self.error(ErrorCode.CONTACT_MANAGER))
user = users[0] if users else None
if user:
# 对象转换
user = User.expand_objects.filter(id=user.id).first()
user_password = user.get("password")
if user_password:
if check_password(password, user_password):
user = User.valid_objects.get(id=user.get("id"))
return self.auth_success(user, event)
return self.auth_failed(event, data=self.error(ErrorCode.USERNAME_PASSWORD_MISMATCH))
check_auth_data(self, event, **kwargs)
#
create_auth_manage_page(self)
#
认证管理页面描述
Source code in extension_root/com_longgui_auth_factor_password/__init__.py
def create_auth_manage_page(self):
# 更改密码页面
mine_password_path = self.register_api(
"/mine_password/",
'POST',
self.update_mine_password,
tenant_path=True,
response=UpdateMinePasswordOut,
)
name = '更改密码'
page = pages.FormPage(name=name)
page.create_actions(
init_action=actions.ConfirmAction(
path=mine_password_path,
),
global_actions={
'confirm': actions.ConfirmAction(
path=mine_password_path
),
}
)
return page
create_login_page(self, event, config, config_data)
#
抽象方法:组装登录页面表单
Parameters:
Name | Type | Description | Default |
---|---|---|---|
event |
Event |
CREATE_LOGIN_PAGE_AUTH_FACTOR事件 |
required |
config |
TenantExtensionConfig |
插件运行时配置 |
required |
config_data |
dict |
运行时配置数据 |
required |
Source code in extension_root/com_longgui_auth_factor_password/__init__.py
def create_login_page(self, event, config, config_data):
username_placeholder = ""
for lefn in [item["key"] if isinstance(item,dict) else item for item in config.config.get('login_enabled_field_names',[])]:
if username_placeholder:
username_placeholder = ',' + User.key_fields[lefn]
else:
username_placeholder = User.key_fields[lefn]
items = [
{
"type": "text",
"name": "username",
"placeholder": username_placeholder or '用户名'
},
{
"type": "password",
"name": "password",
"placeholder": "密码"
},
]
self.add_page_form(config, self.LOGIN, "用户名密码登录", items, config_data)
create_other_page(self, event, config, config_data)
#
抽象方法:组装登录页上其他操作表单
Parameters:
Name | Type | Description | Default |
---|---|---|---|
event |
Event |
CREATE_LOGIN_PAGE_AUTH_FACTOR事件 |
required |
config |
TenantExtensionConfig |
插件运行时配置 |
required |
config_data |
dict |
运行时配置数据 |
required |
create_password_page(self, event, config, config_data)
#
抽象方法:组装重置密码页面表单
Parameters:
Name | Type | Description | Default |
---|---|---|---|
event |
Event |
CREATE_LOGIN_PAGE_AUTH_FACTOR事件 |
required |
config |
TenantExtensionConfig |
插件运行时配置 |
required |
config_data |
dict |
运行时配置数据 |
required |
create_register_page(self, event, config, config_data)
#
抽象方法:组装注册页面表单
Parameters:
Name | Type | Description | Default |
---|---|---|---|
event |
Event |
CREATE_LOGIN_PAGE_AUTH_FACTOR事件 |
required |
config |
TenantExtensionConfig |
插件运行时配置 |
required |
config_data |
dict |
运行时配置数据 |
required |
Source code in extension_root/com_longgui_auth_factor_password/__init__.py
def create_register_page(self, event, config, config_data):
items = []
register_fields = [item["key"] if isinstance(item,dict) else item for item in config.config.get('register_enabled_field_names')]
for rf in register_fields:
items.append({
"type": "text",
"name": rf,
"placeholder": User.key_fields[rf]
})
items.extend([
{
"type": "password",
"name": "password",
"placeholder": "密码"
},
{
"type": "password",
"name": "checkpassword",
"placeholder": "密码确认"
},
])
self.add_page_form(config, self.REGISTER, "用户名密码注册", items, config_data)
fix_login_page(self, event, **kwargs)
#
load(self)
#
抽象方法,插件加载的入口方法
Source code in extension_root/com_longgui_auth_factor_password/__init__.py
def load(self):
super().load()
self.register_extend_field(UserPassword, "password")
self.register_auth_factor_schema(PasswordAuthFactorSchema, 'password')
self.register_extend_api(AuthIn, password=str)
user_key_fields_path = self.register_api(
'/user_key_fields/',
'GET',
self.get_user_key_fields,
response=List[GetUserKeyFieldItemOut],
)
select_pw_login_fields_page.create_actions(
init_action=actions.DirectAction(
path=user_key_fields_path,
method=actions.FrontActionMethod.GET,
),
)
select_pw_register_login_fields_page.create_actions(
init_action=actions.DirectAction(
path=user_key_fields_path,
method=actions.FrontActionMethod.GET,
),
)
self.register_front_pages(select_pw_login_fields_page)
self.register_front_pages(select_pw_register_login_fields_page)
# 租户管理员:用户管理-用户列表-重置密码
reset_user_password_path = self.register_api(
'/reset_user_password/{id}/',
'POST',
self.reset_user_password,
tenant_path=True,
response=ResponseSchema,
auth=GlobalAuth()
)
user_list_page.add_local_actions(
actions.OpenAction(
name='重置密码',
path=reset_user_password_path,
method=actions.FrontActionMethod.POST,
)
)
# 初始化部分配置数据
tenant = Tenant.platform_tenant()
if not self.get_tenant_configs(tenant):
config = {
'login_enabled_field_names': [{'key':'username'}],
'register_enabled_field_names': [{'key':'username'}],
'is_apply': False,
'regular': '',
'title': '',
}
self.create_tenant_config(tenant, config, "账密登录", "password")
try:
admin_user = User.active_objects.filter(username='admin').first()
if admin_user:
admin_password = UserPassword.active_objects.filter(target=admin_user)
if not admin_password:
admin_user.password = make_password('admin')
admin_user.save()
except Exception as e:
print(e)
self.listen_event(
CREATE_TENANT,
self.create_tenant_event
)
reset_password(self, event, **kwargs)
#
update_mine_password(self, request, tenant_id, data)
#
更改密码
Source code in extension_root/com_longgui_auth_factor_password/__init__.py
@operation(UpdateMinePasswordOut,roles=[TENANT_ADMIN, PLATFORM_ADMIN, NORMAL_USER])
def update_mine_password(self,request, tenant_id: str,data:UpdateMinePasswordIn):
"""更改密码"""
user = request.user
user_expand = User.expand_objects.get(id=user.id)
user_password = user_expand["password"]
if not user_password or check_password(data.old_password, user_password):
if data.password == data.confirm_password:
user.password = make_password(data.password)
user.save()
return self.success()
else:
return self.error(ErrorCode.TWO_TIME_PASSWORD_MISMATCH)
return self.error(ErrorCode.OLD_PASSWORD_ERROR)