认证规则: 认证失败次数限制#
功能介绍#
在用户超出限制认证失败次数后,对用户认证凭证表单进行扩充,插入次级认证因素,并在用户再次发起认证请求时进行次级认证因素校验
前置条件#
认证失败次数限制规则插件中需要至少一个主认证因素插件和一个次认证因素插件支持,主认证因素即为拥有登录/注册/重置密码等主要功能的认证因素,次认证因素为主认证因素通过认证规则限制对认证过程进行补充,此处以用户名密码认证因素与图形验证码认证因素为例。
配置指南#
实现思路#
- 认证规则: 认证失败次数限制:
sequenceDiagram
participant D as 用户
participant C as 平台核心
participant A as 认证失败次数限制规则插件
C->>A: 加载插件
A->>C: 注册并监听事件CREATE_LOGIN_PAGE_RULES,AUTH_FAIL,BEFORE_AUTH
D->>C: 访问注册/登录/重置密码页面
C->>A: 发出CREATE_LOGIN_PAGE_RULES事件
A->>C: 响应事件,判断是否满足规则,如满足规则即触发AUTHRULE_FIX_LOGIN_PAGE事件
C->>D: 渲染注册/登录/重置密码页面
D->>C: 输入认证凭证,发起认证请求
C->>A: 触发BEFORE_AUTH事件
A->>C: 响应事件,判断是否满足规则,如满足规则即触发AUTHRULE_CHECK_AUTH_DATA事件,检查并返回结果
C->>A: 检查结果,如未完成认证,触发AUTH_FAIL事件
A->>C: 响应事件,记录失败次数并判断是否刷新页面
C->>D: 根据返回结果渲染或刷新页面
抽象方法实现#
代码#
extension_root.com_longgui_auth_rule_retry_times.AuthRuleRetryTimesExtension (AuthRuleExtension)
#
Source code in extension_root/com_longgui_auth_rule_retry_times/__init__.py
class AuthRuleRetryTimesExtension(AuthRuleExtension):
def load(self):
super().load()
self.create_extension_config_schema()
self.listen_event(AUTH_FAIL,self.auth_fail)
self.listen_event(BEFORE_AUTH,self.before_auth)
self.listen_event(AUTH_SUCCESS,self.auth_success)
# 配置初始数据
tenant = Tenant.platform_tenant()
if not self.get_tenant_configs(tenant):
main_auth_factor = TenantExtensionConfig.active_objects.filter(
tenant=tenant,
extension=Extension.active_objects.filter(
package="com.longgui.auth.factor.password"
).first(),
type="password"
).first()
second_auth_factor = TenantExtensionConfig.active_objects.filter(
tenant=tenant,
extension=Extension.active_objects.filter(
package="com.longgui.auth.factor.authcode"
).first(),
type="authcode"
).first()
if main_auth_factor and second_auth_factor:
# 如主认证因素和此认证因素都存在的情况下 创建认证规则
config = {
"main_auth_factor": {
"id": main_auth_factor.id.hex,
"name": main_auth_factor.name,
"package": main_auth_factor.extension.package
},
"second_auth_factor": {
"id": second_auth_factor.id.hex,
"name": second_auth_factor.name,
"package": second_auth_factor.extension.package
},
"try_times": 3
}
self.create_tenant_config(tenant, config, "认证规则:登录失败三次启用图形验证码", "retry_times")
def before_auth(self,event,**kwargs):
""" 响应事件:认证之前, 判断是否满足次级认证因素校验条件,如满足则触发事件并检查次级认证因素校验结果
Args:
event: 事件
Returns:
tuple(bool,dict): 次级认证因素校验结果
"""
for config in self.get_tenant_configs(event.tenant):
if uuid.UUID(config.config["main_auth_factor"]["id"]).hex == event.data["auth_factor_config_id"]:
host = get_remote_addr(event.request)
if self.check_retry_times(event.tenant,host,config.id.hex,config.config.get("try_times",0)):
# 判定需要验证
responses = dispatch_event(
Event(
core_event.AUTHRULE_CHECK_AUTH_DATA,
tenant=event.tenant,
request=event.request,
packages=[
config.config["second_auth_factor"]["package"]
]
)
)
for useless,(response,useless) in responses:
if not response:
continue
result,data = response
if not result:
return response
return True,None
def auth_success(self,event,**kwargs):
# 检查是否存在满足条件的配置
for config in self.get_tenant_configs(event.tenant):
if uuid.UUID(config.config["main_auth_factor"]["id"]).hex == event.data["auth_factor_config_id"].id.hex:
host = get_remote_addr(event.request)
key = self.gen_key(host,config.id.hex)
try_times = 1
cache.set(event.tenant,key,try_times,expired=config.config.get("expired",30)*60)
self.clear_refresh_status(event.tenant,host,config.id.hex)
def auth_fail(self, event, **kwargs):
"""响应事件:认证失败,记录对应IP认证失败次数
Args:
event : 事件
"""
data = event.data["data"]
# 检查是否存在满足条件的配置
for config in self.get_tenant_configs(event.tenant):
if uuid.UUID(config.config["main_auth_factor"]["id"]).hex == event.data["auth_factor_config_id"]:
host = get_remote_addr(event.request)
key = self.gen_key(host,config.id.hex)
try_times = int(cache.get(event.tenant,key) or 1)
cache.set(event.tenant,key,try_times+1,expired=config.config.get("expired",30)*60)
if self.check_retry_times(event.tenant,host,config.id.hex,config.config.get("try_times",0)) and not self.check_refresh_status(event.tenant,host,config.id.hex):
data.update(self.error(ErrorCode.AUTH_FAIL_TIMES_OVER_LIMITED))
self.set_refresh_status(event.tenant,host,config.id.hex)
data["refresh"] = True
def check_rule(self, event, config):
login_pages = event.data
if self.check_retry_times(event.tenant,get_remote_addr(event.request),config.id.hex,config.config.get("try_times",0)):
dispatch_event(
Event(
core_event.AUTHRULE_FIX_LOGIN_PAGE,
tenant=event.tenant,
request=event.request,
packages=[
config.config["second_auth_factor"]["package"]
],
data={
"login_pages": login_pages,
"main_auth_factor_id": config.config["main_auth_factor"]["id"],
"config_id":config.config["second_auth_factor"]["id"]
}
)
)
def check_retry_times(self,tenant,host,config_id,limited=3):
"""校验认证失败次数是否超出限制
Args:
host (str): 客户一端IP地址
config_id (str): 插件运行时ID
limited (int, optional): 认证失败次数限制. Defaults to 3.
Returns:
bool: 校验结果
"""
key=self.gen_key(host,config_id)
retry_times = int(cache.get(tenant,key) or limited)
return retry_times > limited
def set_refresh_status(self,tenant,host,config_id):
"""设置登陆页面刷新tag
Args:
host (str): 客户一端IP地址
config_id (str): 插件运行时ID
"""
cache.set(tenant,self.gen_refresh_key(host,config_id),1)
def clear_refresh_status(self,tenant,host,config_id):
cache.set(tenant,self.gen_refresh_key(host,config_id),0)
def check_refresh_status(self,tenant,host,config_id):
"""校验是否需要刷新页面
Args:
host (str): 客户一端IP地址
config_id (str): 插件运行时ID
Returns:
bool: 是否需要刷新页面
"""
return bool(cache.get(tenant,self.gen_refresh_key(host,config_id)))
def gen_refresh_key(self,host:str,config_id:str):
"""页面刷新标识KEY
Args:
host (str): 客户一端IP地址
config_id (str): 插件运行时ID
Returns:
str: 页面刷新标识KEY
"""
return f"{self.package}_cache_auth_refresh_{host}_{config_id}"
def gen_key(self,host:str,config_id:str):
"""生成记录失败次数的KEY
Args:
host (str): 客户一端IP地址
config_id (str): 插件运行时ID
Returns:
str: 记录失败次数的KEY
"""
return f"{self.package}_cache_auth_retry_times_{host}_{config_id}"
def create_extension_config_schema(self):
"""创建插件运行时schema
"""
main_auth_factor_page = pages.TablePage(select=True,name=_("选择主认证因素"))
self.register_front_pages(main_auth_factor_page)
main_auth_factor_page.create_actions(
init_action=actions.DirectAction(
path='/api/v1/tenants/{tenant_id}/config_select/?extension__type=auth_factor',
method=actions.FrontActionMethod.GET
)
)
second_auth_factor_page = pages.TablePage(select=True,name=_("选择次认证因素"))
self.register_front_pages(second_auth_factor_page)
second_auth_factor_page.create_actions(
init_action=actions.DirectAction(
path='/api/v1/tenants/{tenant_id}/config_select/?extension__type=auth_factor',
method=actions.FrontActionMethod.GET
)
)
AuthRuleRetryTimesConfigSchema = create_extension_schema(
'AuthRuleRetryTimesConfigSchema',
__file__,
[
(
'try_times',
int,
Field(
title=_('try_times', '限制重试次数'),
default=3
)
),
(
'main_auth_factor',
MainAuthRuleSchema,
Field(
title=_('main_auth_factor', '主认证因素'),
page=main_auth_factor_page.tag
)
),
(
'second_auth_factor',
SecondAuthFactorConfigSchema,
Field(
title=_('second_auth_factor', '次认证因素'),
page=second_auth_factor_page.tag
)
),
(
'expired',
Optional[int],
Field(
title=_('expired', '有效期/分钟'),
default=30,
)
),
],
base_schema=BaseAuthRuleSchema
)
self.register_auth_rule_schema(
AuthRuleRetryTimesConfigSchema,
"retry_times"
)
auth_fail(self, event, **kwargs)
#
响应事件:认证失败,记录对应IP认证失败次数
Parameters:
Name | Type | Description | Default |
---|---|---|---|
event |
事件 |
required |
Source code in extension_root/com_longgui_auth_rule_retry_times/__init__.py
def auth_fail(self, event, **kwargs):
"""响应事件:认证失败,记录对应IP认证失败次数
Args:
event : 事件
"""
data = event.data["data"]
# 检查是否存在满足条件的配置
for config in self.get_tenant_configs(event.tenant):
if uuid.UUID(config.config["main_auth_factor"]["id"]).hex == event.data["auth_factor_config_id"]:
host = get_remote_addr(event.request)
key = self.gen_key(host,config.id.hex)
try_times = int(cache.get(event.tenant,key) or 1)
cache.set(event.tenant,key,try_times+1,expired=config.config.get("expired",30)*60)
if self.check_retry_times(event.tenant,host,config.id.hex,config.config.get("try_times",0)) and not self.check_refresh_status(event.tenant,host,config.id.hex):
data.update(self.error(ErrorCode.AUTH_FAIL_TIMES_OVER_LIMITED))
self.set_refresh_status(event.tenant,host,config.id.hex)
data["refresh"] = True
before_auth(self, event, **kwargs)
#
响应事件:认证之前, 判断是否满足次级认证因素校验条件,如满足则触发事件并检查次级认证因素校验结果
Parameters:
Name | Type | Description | Default |
---|---|---|---|
event |
事件 |
required |
Returns:
Type | Description |
---|---|
tuple(bool,dict) |
次级认证因素校验结果 |
Source code in extension_root/com_longgui_auth_rule_retry_times/__init__.py
def before_auth(self,event,**kwargs):
""" 响应事件:认证之前, 判断是否满足次级认证因素校验条件,如满足则触发事件并检查次级认证因素校验结果
Args:
event: 事件
Returns:
tuple(bool,dict): 次级认证因素校验结果
"""
for config in self.get_tenant_configs(event.tenant):
if uuid.UUID(config.config["main_auth_factor"]["id"]).hex == event.data["auth_factor_config_id"]:
host = get_remote_addr(event.request)
if self.check_retry_times(event.tenant,host,config.id.hex,config.config.get("try_times",0)):
# 判定需要验证
responses = dispatch_event(
Event(
core_event.AUTHRULE_CHECK_AUTH_DATA,
tenant=event.tenant,
request=event.request,
packages=[
config.config["second_auth_factor"]["package"]
]
)
)
for useless,(response,useless) in responses:
if not response:
continue
result,data = response
if not result:
return response
return True,None
check_refresh_status(self, tenant, host, config_id)
#
校验是否需要刷新页面
Parameters:
Name | Type | Description | Default |
---|---|---|---|
host |
str |
客户一端IP地址 |
required |
config_id |
str |
插件运行时ID |
required |
Returns:
Type | Description |
---|---|
bool |
是否需要刷新页面 |
check_retry_times(self, tenant, host, config_id, limited=3)
#
校验认证失败次数是否超出限制
Parameters:
Name | Type | Description | Default |
---|---|---|---|
host |
str |
客户一端IP地址 |
required |
config_id |
str |
插件运行时ID |
required |
limited |
int |
认证失败次数限制. Defaults to 3. |
3 |
Returns:
Type | Description |
---|---|
bool |
校验结果 |
Source code in extension_root/com_longgui_auth_rule_retry_times/__init__.py
def check_retry_times(self,tenant,host,config_id,limited=3):
"""校验认证失败次数是否超出限制
Args:
host (str): 客户一端IP地址
config_id (str): 插件运行时ID
limited (int, optional): 认证失败次数限制. Defaults to 3.
Returns:
bool: 校验结果
"""
key=self.gen_key(host,config_id)
retry_times = int(cache.get(tenant,key) or limited)
return retry_times > limited
check_rule(self, event, config)
#
抽象方法:校验规则
Parameters:
Name | Type | Description | Default |
---|---|---|---|
event |
Event |
CREATE_LOGIN_PAGE_RULES事件 |
required |
config |
TenantExtensionConfig |
运行时配置 |
required |
Source code in extension_root/com_longgui_auth_rule_retry_times/__init__.py
def check_rule(self, event, config):
login_pages = event.data
if self.check_retry_times(event.tenant,get_remote_addr(event.request),config.id.hex,config.config.get("try_times",0)):
dispatch_event(
Event(
core_event.AUTHRULE_FIX_LOGIN_PAGE,
tenant=event.tenant,
request=event.request,
packages=[
config.config["second_auth_factor"]["package"]
],
data={
"login_pages": login_pages,
"main_auth_factor_id": config.config["main_auth_factor"]["id"],
"config_id":config.config["second_auth_factor"]["id"]
}
)
)
create_extension_config_schema(self)
#
创建插件运行时schema
Source code in extension_root/com_longgui_auth_rule_retry_times/__init__.py
def create_extension_config_schema(self):
"""创建插件运行时schema
"""
main_auth_factor_page = pages.TablePage(select=True,name=_("选择主认证因素"))
self.register_front_pages(main_auth_factor_page)
main_auth_factor_page.create_actions(
init_action=actions.DirectAction(
path='/api/v1/tenants/{tenant_id}/config_select/?extension__type=auth_factor',
method=actions.FrontActionMethod.GET
)
)
second_auth_factor_page = pages.TablePage(select=True,name=_("选择次认证因素"))
self.register_front_pages(second_auth_factor_page)
second_auth_factor_page.create_actions(
init_action=actions.DirectAction(
path='/api/v1/tenants/{tenant_id}/config_select/?extension__type=auth_factor',
method=actions.FrontActionMethod.GET
)
)
AuthRuleRetryTimesConfigSchema = create_extension_schema(
'AuthRuleRetryTimesConfigSchema',
__file__,
[
(
'try_times',
int,
Field(
title=_('try_times', '限制重试次数'),
default=3
)
),
(
'main_auth_factor',
MainAuthRuleSchema,
Field(
title=_('main_auth_factor', '主认证因素'),
page=main_auth_factor_page.tag
)
),
(
'second_auth_factor',
SecondAuthFactorConfigSchema,
Field(
title=_('second_auth_factor', '次认证因素'),
page=second_auth_factor_page.tag
)
),
(
'expired',
Optional[int],
Field(
title=_('expired', '有效期/分钟'),
default=30,
)
),
],
base_schema=BaseAuthRuleSchema
)
self.register_auth_rule_schema(
AuthRuleRetryTimesConfigSchema,
"retry_times"
)
gen_key(self, host, config_id)
#
生成记录失败次数的KEY
Parameters:
Name | Type | Description | Default |
---|---|---|---|
host |
str |
客户一端IP地址 |
required |
config_id |
str |
插件运行时ID |
required |
Returns:
Type | Description |
---|---|
str |
记录失败次数的KEY |
gen_refresh_key(self, host, config_id)
#
页面刷新标识KEY
Parameters:
Name | Type | Description | Default |
---|---|---|---|
host |
str |
客户一端IP地址 |
required |
config_id |
str |
插件运行时ID |
required |
Returns:
Type | Description |
---|---|
str |
页面刷新标识KEY |
load(self)
#
抽象方法,插件加载的入口方法
Source code in extension_root/com_longgui_auth_rule_retry_times/__init__.py
def load(self):
super().load()
self.create_extension_config_schema()
self.listen_event(AUTH_FAIL,self.auth_fail)
self.listen_event(BEFORE_AUTH,self.before_auth)
self.listen_event(AUTH_SUCCESS,self.auth_success)
# 配置初始数据
tenant = Tenant.platform_tenant()
if not self.get_tenant_configs(tenant):
main_auth_factor = TenantExtensionConfig.active_objects.filter(
tenant=tenant,
extension=Extension.active_objects.filter(
package="com.longgui.auth.factor.password"
).first(),
type="password"
).first()
second_auth_factor = TenantExtensionConfig.active_objects.filter(
tenant=tenant,
extension=Extension.active_objects.filter(
package="com.longgui.auth.factor.authcode"
).first(),
type="authcode"
).first()
if main_auth_factor and second_auth_factor:
# 如主认证因素和此认证因素都存在的情况下 创建认证规则
config = {
"main_auth_factor": {
"id": main_auth_factor.id.hex,
"name": main_auth_factor.name,
"package": main_auth_factor.extension.package
},
"second_auth_factor": {
"id": second_auth_factor.id.hex,
"name": second_auth_factor.name,
"package": second_auth_factor.extension.package
},
"try_times": 3
}
self.create_tenant_config(tenant, config, "认证规则:登录失败三次启用图形验证码", "retry_times")
set_refresh_status(self, tenant, host, config_id)
#
设置登陆页面刷新tag
Parameters:
Name | Type | Description | Default |
---|---|---|---|
host |
str |
客户一端IP地址 |
required |
config_id |
str |
插件运行时ID |
required |