应用协议
功能介绍#
应用协议:其它类型的插件可以通过继承协议基类的方式获得基类提供的方法,从而方便地完成插件载入
实现思路#
第一步,创建一个新的类,继承AppProtocolExtension这个基类
第二步,要重载基类的load方法
- 需要将应用的schema使用register_app_protocol_schema方法加载进去
- 可选,如果应用需要使用由基类实现的应用入口认证方法,需要调用register_enter_view方法
第三步,实现基类中规定的抽象方法
抽象方法#
基类定义#
arkid.core.extension.app_protocol.AppProtocolExtension (Extension)
#
Source code in arkid/core/extension/app_protocol.py
class AppProtocolExtension(Extension):
# Base class for application-protocol plugins. load() subscribes the three
# abstract hooks (create_app / update_app / delete_app) to the core app events,
# and register_enter_view() installs a permission-checking entry route.
# NOTE(review): indentation was lost when this listing was extracted; the code
# lines are kept exactly as found, so confirm nesting against the real module.
TYPE = "app_protocol"
# Class-level composite settings; nothing in this listing reads them directly
# (presumably consumed by the Extension base class - TODO confirm).
composite_schema_map = {}
created_composite_schema_list = []
composite_key = 'app_type'
composite_model = App
@property
def type(self):
# Extension type identifier; always the class constant TYPE.
return AppProtocolExtension.TYPE
def load(self):
# Run the base-class load first, then route each app lifecycle event to its hook.
super().load()
self.listen_event(core_event.CREATE_APP_CONFIG, self.create_app)
self.listen_event(core_event.UPDATE_APP_CONFIG, self.update_app)
self.listen_event(core_event.DELETE_APP, self.delete_app)
def register_app_protocol_schema(self, schema, app_type):
"""
Register the schema of an application type.
Params:
schema: schema
app_type: application type
"""
# Registered twice: once as a config schema keyed by "<package>_<app_type>",
# once as a composite config schema keyed by app_type with 'secret' excluded.
self.register_config_schema(schema, self.package + '_' + app_type)
self.register_composite_config_schema(schema, app_type, exclude=['secret'])
@abstractmethod
def create_app(self, event, **kwargs):
"""
Abstract hook: create an application.
Params:
event: event argument
kwargs: other keyword arguments
Return:
bool: whether the operation succeeded
"""
pass
@abstractmethod
def update_app(self, event, **kwargs):
"""
Abstract hook: update an application.
Params:
event: event argument
kwargs: other keyword arguments
Return:
bool: whether the operation succeeded
"""
pass
@abstractmethod
def delete_app(self, event, **kwargs):
"""
Abstract hook: delete an application.
Params:
event: event argument
kwargs: other keyword arguments
Return:
bool: whether the operation succeeded
"""
pass
def register_enter_view(self, view:View, path:str, url_name:str, type:list, tenant_urls: bool=True):
'''
Register a unified entry view that checks entry permission before calling `view`.
Params:
view: the target view callable, i.e. the result of as_view(), e.g. AuthorizationView.as_view()
path: regex route of the entry, e.g. r"app/(?P<app_id>[\\w-]+)/oauth/authorize/$"
url_name: name the route is registered under, e.g. authorize
type: list of app types handled by the current plugin, e.g. ['OIDC', 'OAuth2']
tenant_urls: whether to register the route as a tenant URL
Return:
NOTE(review): no return statement is visible at this method's own level in
this listing; the last statement is the register_routers call.
'''
# Entry view: GET and POST both run the entry-permission check, then either
# call the wrapped view or redirect to a login URL.
class EnterView(View):
def get(self, request, **kwargs):
from arkid.core.perm.permission_data import PermissionData
permissiondata = PermissionData()
result, alert = permissiondata.check_app_entry_permission(request, type, kwargs)
if result:
# The wrapped view is called with the request only; the URL kwargs are not forwarded.
return view(request)
else:
url = self.get_login_url(request, alert)
return HttpResponseRedirect(url)
def get_login_url(self, request, alert):
# Build the login URL to redirect to; `next` carries the URL-quoted current full path.
from arkid.config import get_app_config
full_path = request.get_full_path()
next_uri = urllib.parse.quote(full_path)
host = get_app_config().get_frontend_host()
tenant = request.tenant
# No tenant on the request: frontend login with an empty tenant_id.
if not tenant:
return f'{host}{LOGIN_URL}?tenant_id=&next={next_uri}'
# Platform tenant whose id (hex or str form) does not appear in the path: same URL as above.
if tenant.is_platform_tenant and tenant.id.hex not in request.get_full_path() and \
str(tenant.id) not in request.get_full_path():
return f'{host}{LOGIN_URL}?tenant_id=&next={next_uri}'
# Without a `token` query parameter: use the tenant's configured login_url if it has one,
# otherwise the backend '/api/v1/login' endpoint.
# NOTE(review): with indentation lost it cannot be told from this listing which of the
# following statements sit under `if not token:` - confirm against the real module.
token = request.GET.get('token', '')
if not token:
tenant_expand = Tenant.expand_objects.get(id=tenant.id)
if tenant_expand.get('login_url'):
return f"{tenant_expand['login_url']}?tenant_id={tenant.id}&next={next_uri}"
backend_host = get_app_config().get_host()
backend_login_url = '/api/v1/login'
return f"{backend_host}{backend_login_url}?tenant_id={tenant.id}&next={next_uri}"
# if tenant.slug:
# host =get_app_config().get_slug_frontend_host(tenant.slug)
# return f'{host}{LOGIN_URL}?&next={next_uri}'
# else:
# return f'{host}{LOGIN_URL}?tenant_id={tenant.id}&next={next_uri}'
# Remaining case: frontend login carrying the alert; a tenant with a slug uses its slug host.
# NOTE(review): `alert` is interpolated into the query string without URL-quoting.
if tenant.slug:
host =get_app_config().get_slug_frontend_host(tenant.slug)
return f'{host}{LOGIN_URL}?alert={alert}&next={next_uri}'
else:
return f'{host}{LOGIN_URL}?tenant_id={tenant.id}&alert={alert}&next={next_uri}'
def post(self, request, **kwargs):
# Same flow as get().
from arkid.core.perm.permission_data import PermissionData
permissiondata = PermissionData()
result, alert = permissiondata.check_app_entry_permission(request, type, kwargs)
if result:
return view(request)
else:
url = self.get_login_url(request, alert)
return HttpResponseRedirect(url)
# Build the entry route
entry_url = [re_path(path, EnterView.as_view(), name=url_name)]
# Register the entry route
self.register_routers(entry_url, tenant_urls)
composite_model (BaseModel, ExpandModel)
django-model
#
App(id, is_del, is_active, updated, created, tenant, name, url, logo, description, type, secret, config, package, entry_permission, arkstore_category_id, arkstore_app_id, skip_token_verification)
Source code in arkid/core/extension/app_protocol.py
class App(BaseModel, ExpandModel):
# Django model describing an application that belongs to a tenant.
# NOTE(review): indentation was lost when this listing was extracted; the code
# lines are kept exactly as found.
class Meta(object):
verbose_name = _("APP", "应用")
verbose_name_plural = _("APP", "应用")
# Owning tenant (required). PROTECT blocks deleting a tenant that is still referenced.
tenant = models.ForeignKey('Tenant', blank=False, on_delete=models.PROTECT)
# Application name, up to 128 characters.
name = models.CharField(max_length=128, verbose_name=_('name', '名称'))
# Optional application address.
url = models.CharField(
max_length=1024, null=True, blank=True, verbose_name=_('url', '地址')
)
# Optional icon, stored as a string of up to 1024 characters.
logo = models.CharField(
max_length=1024, blank=True, null=True, default='', verbose_name=_('logo', '图标')
)
# Optional free-text description.
description = models.TextField(
blank=True, null=True, verbose_name=_('description', '描述')
)
# Application type string; defaults to ''.
type = models.CharField(max_length=128, default='', verbose_name=_('type', '类型'))
# Optional secret key.
secret = models.CharField(
max_length=255,
blank=True,
null=True,
default='',
verbose_name=_('secret', '密钥'),
)
# Optional one-to-one link to the tenant extension config backing this app.
config = models.OneToOneField(
TenantExtensionConfig,
blank=True,
null=True,
default=None,
on_delete=models.PROTECT,
)
# Optional package name.
package = models.CharField(
max_length=128,
blank=True,
null=True,
default='',
verbose_name=_('package', '包名'),
)
# Optional link to the SystemPermission used as this app's entry permission.
entry_permission = models.ForeignKey(
'SystemPermission',
blank=True,
null=True,
default=None,
on_delete=models.PROTECT,
)
# Optional ArkStore category id.
# NOTE(review): `_` is called with one argument here but with two everywhere else in this model.
arkstore_category_id = models.IntegerField(
default=None, null=True, verbose_name=_('ArkStore分类ID')
)
# Optional ArkStore application identifier.
arkstore_app_id = models.CharField(
max_length=1024,
blank=True,
null=True,
default=None,
verbose_name=_('Arkstore app id', '方舟商店应用标识'),
)
# Flag labelled "Skip Token Verification" for the app entry address; off by default.
skip_token_verification = models.BooleanField(
default=False, verbose_name=_('Skip Token Verification', '应用入口地址跳过验证')
)
def __str__(self) -> str:
# Human-readable label combining the tenant name and the app name.
return f'Tenant: {self.tenant.name}, App: {self.name}'
arkstore_app_id: CharField
blank
django-field
nullable
#
Arkstore app id
arkstore_category_id: IntegerField
django-field
nullable
#
ArkStore分类ID
config: OneToOneField
blank
django-field
nullable
#
config
created: DateTimeField
blank
django-field
nullable
#
创建时间
description: TextField
blank
django-field
nullable
#
description
entry_permission: ForeignKey
blank
django-field
nullable
#
entry permission
id: UUIDField
django-field
#
ID
is_active: BooleanField
django-field
#
是否可用
is_del: BooleanField
django-field
#
是否删除
logo: CharField
blank
django-field
nullable
#
logo
name: CharField
django-field
#
name
package: CharField
blank
django-field
nullable
#
package
secret: CharField
blank
django-field
nullable
#
secret
skip_token_verification: BooleanField
django-field
#
Skip Token Verification
tenant: ForeignKey
django-field
#
tenant
type: CharField
django-field
#
type
updated: DateTimeField
blank
django-field
nullable
#
更新时间
url: CharField
blank
django-field
nullable
#
url
create_app(self, event, **kwargs)
#
抽象方法,创建应用
Parameters:
| Name | Type | Description | Default |
|---|---|---|---|
| event | | 事件参数 | required |
| kwargs | | 其它方法参数 | {} |

Returns:

| Type | Description |
|---|---|
| bool | 是否成功执行 |
delete_app(self, event, **kwargs)
#
抽象方法,删除应用
Parameters:
| Name | Type | Description | Default |
|---|---|---|---|
| event | | 事件参数 | required |
| kwargs | | 其它方法参数 | {} |

Returns:

| Type | Description |
|---|---|
| bool | 是否成功执行 |
load(self)
#
register_app_protocol_schema(self, schema, app_type)
#
注册应用的schema
Parameters:
| Name | Type | Description | Default |
|---|---|---|---|
| schema | | schema | required |
| app_type | | 应用类型 | required |
Source code in arkid/core/extension/app_protocol.py
register_enter_view(self, view, path, url_name, type, tenant_urls=True)
#
注册统一的入口函数,方便检测
Parameters:
| Name | Type | Description | Default |
|---|---|---|---|
| view | View | str 目标View的as_view(),例如:AuthorizationView.as_view() | required |
| path | str | str 需要跳转的路径,例如:`r"app/(?P<app_id>[\w-]+)/oauth/authorize/$"` | required |
| url_name | str | str 注册的路径名称, 例如:authorize | required |
| type | list | list 一个当前插件的类型list, 例如:['OIDC', 'OAuth2'] | required |
| tenant_urls | bool | bool 是否注册为租户url | True |

Returns:

| Type | Description |
|---|---|
| response | 函数执行结果 |
Source code in arkid/core/extension/app_protocol.py
def register_enter_view(self, view:View, path:str, url_name:str, type:list, tenant_urls: bool=True):
'''
Register a unified entry view that checks entry permission before calling `view`.
Params:
view: the target view callable, i.e. the result of as_view(), e.g. AuthorizationView.as_view()
path: regex route of the entry, e.g. r"app/(?P<app_id>[\\w-]+)/oauth/authorize/$"
url_name: name the route is registered under, e.g. authorize
type: list of app types handled by the current plugin, e.g. ['OIDC', 'OAuth2']
tenant_urls: whether to register the route as a tenant URL
Return:
NOTE(review): no return statement is visible at this method's own level in
this listing; the last statement is the register_routers call.
'''
# NOTE(review): indentation was lost when this listing was extracted; the code
# lines are kept exactly as found, so confirm nesting against the real module.
# Entry view: GET and POST both run the entry-permission check, then either
# call the wrapped view or redirect to a login URL.
class EnterView(View):
def get(self, request, **kwargs):
from arkid.core.perm.permission_data import PermissionData
permissiondata = PermissionData()
result, alert = permissiondata.check_app_entry_permission(request, type, kwargs)
if result:
# The wrapped view is called with the request only; the URL kwargs are not forwarded.
return view(request)
else:
url = self.get_login_url(request, alert)
return HttpResponseRedirect(url)
def get_login_url(self, request, alert):
# Build the login URL to redirect to; `next` carries the URL-quoted current full path.
from arkid.config import get_app_config
full_path = request.get_full_path()
next_uri = urllib.parse.quote(full_path)
host = get_app_config().get_frontend_host()
tenant = request.tenant
# No tenant on the request: frontend login with an empty tenant_id.
if not tenant:
return f'{host}{LOGIN_URL}?tenant_id=&next={next_uri}'
# Platform tenant whose id (hex or str form) does not appear in the path: same URL as above.
if tenant.is_platform_tenant and tenant.id.hex not in request.get_full_path() and \
str(tenant.id) not in request.get_full_path():
return f'{host}{LOGIN_URL}?tenant_id=&next={next_uri}'
# Without a `token` query parameter: use the tenant's configured login_url if it has one,
# otherwise the backend '/api/v1/login' endpoint.
# NOTE(review): with indentation lost it cannot be told from this listing which of the
# following statements sit under `if not token:` - confirm against the real module.
token = request.GET.get('token', '')
if not token:
tenant_expand = Tenant.expand_objects.get(id=tenant.id)
if tenant_expand.get('login_url'):
return f"{tenant_expand['login_url']}?tenant_id={tenant.id}&next={next_uri}"
backend_host = get_app_config().get_host()
backend_login_url = '/api/v1/login'
return f"{backend_host}{backend_login_url}?tenant_id={tenant.id}&next={next_uri}"
# if tenant.slug:
# host =get_app_config().get_slug_frontend_host(tenant.slug)
# return f'{host}{LOGIN_URL}?&next={next_uri}'
# else:
# return f'{host}{LOGIN_URL}?tenant_id={tenant.id}&next={next_uri}'
# Remaining case: frontend login carrying the alert; a tenant with a slug uses its slug host.
# NOTE(review): `alert` is interpolated into the query string without URL-quoting.
if tenant.slug:
host =get_app_config().get_slug_frontend_host(tenant.slug)
return f'{host}{LOGIN_URL}?alert={alert}&next={next_uri}'
else:
return f'{host}{LOGIN_URL}?tenant_id={tenant.id}&alert={alert}&next={next_uri}'
def post(self, request, **kwargs):
# Same flow as get().
from arkid.core.perm.permission_data import PermissionData
permissiondata = PermissionData()
result, alert = permissiondata.check_app_entry_permission(request, type, kwargs)
if result:
return view(request)
else:
url = self.get_login_url(request, alert)
return HttpResponseRedirect(url)
# Build the entry route
entry_url = [re_path(path, EnterView.as_view(), name=url_name)]
# Register the entry route
self.register_routers(entry_url, tenant_urls)
update_app(self, event, **kwargs)
#
抽象方法,修改应用
Parameters:
| Name | Type | Description | Default |
|---|---|---|---|
| event | | 事件参数 | required |
| kwargs | | 其它方法参数 | {} |

Returns:

| Type | Description |
|---|---|
| bool | 是否成功执行 |
示例#
extension_root.com_longgui_app_protocol_oidc.OAuth2ServerExtension (AppProtocolExtension)
#
Source code in extension_root/com_longgui_app_protocol_oidc/__init__.py
class OAuth2ServerExtension(AppProtocolExtension):
# Example app-protocol plugin covering the 'OIDC' and 'OAuth2' app types.
# NOTE(review): indentation was lost when this listing was extracted; the code
# lines are kept exactly as found, so confirm nesting against the real module.
def load(self):
# Register URL routes
self.load_urls()
# Register the entry (authorization) view
self.load_auth_view()
# Register the config schemas
# NOTE(review): it cannot be told from this listing which of the two
# register_app_protocol_schema calls sit under the IS_CENTRAL_ARKID check.
if not settings.IS_CENTRAL_ARKID:
self.register_app_protocol_schema(OIDCConfigSchema, 'OIDC')
self.register_app_protocol_schema(Oauth2ConfigSchema, 'OAuth2')
super().load()
def load_urls(self):
# Register this plugin's `urls` as tenant URLs (second argument True).
self.register_routers(urls, True)
def load_auth_view(self):
# Register the authorization view behind the shared entry view
auth_view = AuthorizationView.as_view()
auth_path = r"app/(?P<app_id>[\w-]+)/oauth/authorize/$"
url_name = "authorize"
# NOTE(review): this local shadows the builtin `type`.
type = ['OIDC', 'OAuth2']
self.register_enter_view(auth_view, auth_path, url_name, type)
def create_app(self, event, **kwargs):
# Create hook: delegates to update_app_data with is_create=True.
config = event.data["config"]
return self.update_app_data(event, config, True)
def update_app(self, event, **kwargs):
# Update hook: delegates to update_app_data with is_create=False.
config = event.data["config"]
return self.update_app_data(event, config, False)
def delete_app(self, event, **kwargs):
# Delete hook: removes the Application rows whose uuid equals event.data.id.
# NOTE(review): event.data is read by attribute here but by key in the other hooks.
Application.objects.filter(uuid=event.data.id).delete()
return True
def update_app_data(self, event, config, is_create):
'''
Create or update the Application record for an app and refresh the URLs in config.
NOTE(review): `is_create` and `iscreated` are never read in this method.
'''
app = event.data["app"]
tenant = event.tenant
client_type = config["client_type"]
redirect_uris = config["redirect_uris"]
grant_type = config["grant_type"]
skip_authorization = config["skip_authorization"]
app_type = event.data.get("app_type")
algorithm = config.get("algorithm",None)
# The Application record is keyed by the app's id.
obj,iscreated = Application.objects.get_or_create(uuid=app.id)
obj.name = app.name
obj.client_type = client_type
obj.redirect_uris = redirect_uris
obj.skip_authorization = skip_authorization
obj.authorization_grant_type = grant_type
# The algorithm is only applied for the 'OIDC' app type.
if algorithm and app_type == 'OIDC':
obj.algorithm = algorithm
obj.save()
# Refresh the URL information
self.update_url_data(tenant.id, config, obj)
return True
def update_url_data(self, tenant_id, config, obj):
'''
Write the endpoint URLs and client credentials into config (mutated in place).
'''
host = get_app_config().get_frontend_host()
# URL names are resolved in the "api:<pname>_tenant" namespace.
namespace = f'api:{self.pname}_tenant'
config["userinfo"] = host+reverse(namespace+":oauth-user-info", args=[tenant_id])
config["authorize"] = host+reverse(namespace+":authorize", args=[tenant_id, obj.uuid])
config["token"] = host+reverse(namespace+":token", args=[tenant_id])
config["logout"] = host+reverse(namespace+":oauth-user-logout", args=[tenant_id])
config["issuer_url"] = "{}/api/v1/tenant/{}/app/{}".format(host,tenant_id,obj.uuid)
config["client_id"] = obj.client_id
config["client_secret"] = obj.client_secret
config["skip_authorization"] = obj.skip_authorization
create_app(self, event, **kwargs)
#
抽象方法,创建应用
Parameters:
| Name | Type | Description | Default |
|---|---|---|---|
| event | | 事件参数 | required |
| kwargs | | 其它方法参数 | {} |

Returns:

| Type | Description |
|---|---|
| bool | 是否成功执行 |
delete_app(self, event, **kwargs)
#
抽象方法,删除应用
Parameters:
| Name | Type | Description | Default |
|---|---|---|---|
| event | | 事件参数 | required |
| kwargs | | 其它方法参数 | {} |

Returns:

| Type | Description |
|---|---|
| bool | 是否成功执行 |
load(self)
#
抽象方法,插件加载的入口方法
Source code in extension_root/com_longgui_app_protocol_oidc/__init__.py
update_app(self, event, **kwargs)
#
抽象方法,修改应用
Parameters:
| Name | Type | Description | Default |
|---|---|---|---|
| event | | 事件参数 | required |
| kwargs | | 其它方法参数 | {} |

Returns:

| Type | Description |
|---|---|
| bool | 是否成功执行 |
update_app_data(self, event, config, is_create)
#
修改应用程序
Source code in extension_root/com_longgui_app_protocol_oidc/__init__.py
def update_app_data(self, event, config, is_create):
    """
    Sync the protocol-side Application record with an app, then refresh config URLs.

    Params:
        event: event whose data holds "app" (and optionally "app_type") and whose
            tenant id is used when building URLs
        config: protocol config; read for client settings and updated in place
            by update_url_data
        is_create: accepted for interface compatibility; not consulted here
    Return:
        bool: always True
    """
    source_app = event.data["app"]
    owner_tenant = event.tenant

    # Required settings are read before any database access.
    client_kind, callback_uris, grant, auto_approve = (
        config["client_type"],
        config["redirect_uris"],
        config["grant_type"],
        config["skip_authorization"],
    )
    protocol = event.data.get("app_type")
    signing_algorithm = config.get("algorithm")

    # The record is keyed by the app id; whether it was just created is irrelevant.
    record, _ = Application.objects.get_or_create(uuid=source_app.id)
    record.name = source_app.name
    record.client_type = client_kind
    record.redirect_uris = callback_uris
    record.skip_authorization = auto_approve
    record.authorization_grant_type = grant
    # Only OIDC apps carry an algorithm.
    if protocol == 'OIDC' and signing_algorithm:
        record.algorithm = signing_algorithm
    record.save()

    # Refresh the URL information stored in config.
    self.update_url_data(owner_tenant.id, config, record)
    return True
update_url_data(self, tenant_id, config, obj)
#
更新配置中的url信息
Source code in extension_root/com_longgui_app_protocol_oidc/__init__.py
def update_url_data(self, tenant_id, config, obj):
    """
    Fill config with the endpoint URLs and client credentials of an Application.

    Params:
        tenant_id: tenant id used as the first URL argument of every route
        config: mapping updated in place
        obj: Application record supplying uuid, client_id, client_secret and
            skip_authorization
    """
    frontend_host = get_app_config().get_frontend_host()
    url_namespace = f'api:{self.pname}_tenant'

    def absolute(route_name, *route_args):
        # Resolve a named route in this plugin's tenant namespace and prefix the frontend host.
        return frontend_host + reverse(url_namespace + ":" + route_name, args=list(route_args))

    config["userinfo"] = absolute("oauth-user-info", tenant_id)
    config["authorize"] = absolute("authorize", tenant_id, obj.uuid)
    config["token"] = absolute("token", tenant_id)
    config["logout"] = absolute("oauth-user-logout", tenant_id)
    config["issuer_url"] = "{}/api/v1/tenant/{}/app/{}".format(
        frontend_host, tenant_id, obj.uuid
    )

    # Copy the client credentials and the authorization flag from the record.
    for field in ("client_id", "client_secret", "skip_authorization"):
        config[field] = getattr(obj, field)