Skip to content

第三方登录

功能介绍#

第三方登录,实际上就是用其它IDP系统账号登录,并与ArkID的账号进行绑定的过程。

经典的有:微信登录,钉钉登录,飞书登录等等。

实现思路#

由于大部分的第三方登录方案都是基于OAuth2协议的衍生品,因此三方认证的流程以OAuth2为参考。

首先,在登录页面创建一个第三方认证的入口,生成该入口需要按钮图标(img_url)和登录跳转地址(login_url),二者由get_img_and_redirect_url方法提供。

当用户点击图标后,会发起第三方认证请求,认证完成后第三方会回调arkid.core.extension.external_idp.ExternalIdpExtension.callback接口,并携带code

在callback中,用code调用get_ext_token方法获取access_token,然后通过access_token调用get_ext_user_info方法获取用户信息

至此即完成登录

抽象方法#

基类定义#

arkid.core.extension.external_idp.ExternalIdpExtension (Extension) #

Source code in arkid/core/extension/external_idp.py
class ExternalIdpExtension(Extension):
    """Base class for extensions that log users in through an external IdP.

    The flow is modelled on OAuth2:

    1. ``login`` redirects the browser to the URL built by
       ``get_authorize_url``.
    2. The IdP calls ``callback`` with an authorization code, which is
       exchanged via ``get_ext_token`` and ``get_ext_user_info``.
    3. ``get_arkid_user`` maps the external id to an ArkID user. A bound
       user gets an ArkID token; an unbound one is sent to the frontend
       with the data needed to call ``bind``.

    Subclasses provide the methods decorated with ``@abstractmethod``.
    """

    TYPE = "external_idp"

    composite_schema_map = {}
    created_composite_schema_list = []
    composite_key = 'type'
    composite_model = TenantExtensionConfig

    # Actions exposed as ``idp/<pname>/<config_id>/<action>`` routes. Each
    # one is also stored as an absolute ``<action>_url`` in the tenant config.
    _ENDPOINT_ACTIONS = ("login", "callback", "bind")

    @property
    def type(self):
        """Return the extension type identifier (``"external_idp"``)."""
        return ExternalIdpExtension.TYPE

    def load(self):
        """Register the login/callback/bind routes and the event listeners."""
        urls = [
            re_path(
                rf'^idp/{self.pname}/(?P<config_id>[\w-]+)/{action}$',
                getattr(self, action),
                name=f'{self.pname}_{action}',
            )
            for action in self._ENDPOINT_ACTIONS
        ]
        self.register_routers(urls, False)
        self.listen_event(
            core_event.CREATE_LOGIN_PAGE_AUTH_FACTOR, self.add_idp_login_buttons
        )
        self.listen_event(
            core_event.ACCOUNT_UNBIND, self.account_unbind
        )
        super().load()

    @abstractmethod
    def get_authorize_url(self, config, callback_url, next_url):
        """Build the IdP authorization URL the browser is redirected to.

        Args:
            config (arkid.extension.models.TenantExtensionConfig): runtime
                configuration of this login method.
            callback_url (str): callback address provided by ArkID.
            next_url (str): either ``""`` or ``"?next=<url-encoded target>"``,
                ready to be appended to ``callback_url``.
        Returns:
            str: authorization URL of the third-party IdP.
        """
        pass

    def login(self, request, config_id):
        """Redirect to the IdP entry URL returned by ``get_authorize_url``.

        Returns a JSON error instead when ``config_id`` matches no config.
        """
        from urllib.parse import quote

        config = self.get_config_by_id(config_id)
        if not config:
            return JsonResponse({"error_msg": "没有找到登录配置"})
        callback_url = config.config.get("callback_url")
        next_url = request.GET.get("next", None)
        if next_url is not None:
            # Encode the target so that a "?" or "&" inside it stays part of
            # the single ``next`` parameter when the IdP calls back.
            next_url = "?next=" + quote(next_url, safe="")
        else:
            next_url = ""
        url = self.get_authorize_url(config, callback_url, next_url)

        return HttpResponseRedirect(url)

    @abstractmethod
    def get_ext_token(self, config, code):
        """Exchange the authorization code for a third-party token.

        Args:
            config (arkid.extension.models.TenantExtensionConfig): runtime
                configuration of this login method.
            code (str): authorization code returned by the IdP.
        Returns:
            str: token issued by the IdP.
        """
        pass

    @abstractmethod
    def get_ext_user_info(self, config, code, token):
        """Fetch the user's profile from the IdP.

        Args:
            config (arkid.extension.models.TenantExtensionConfig): runtime
                configuration of this login method.
            code (str): authorization code returned by the IdP.
            token (str): token returned by ``get_ext_token``.
        Returns:
            tuple: ``(ext_id, ext_name, ext_icon, ext_info)``; ``callback``
            unpacks exactly these four values.
        """
        pass

    @abstractmethod
    def get_arkid_user(self, ext_id):
        """Look up the ArkID user bound to an external user id.

        Args:
            ext_id (str): user identifier returned by the IdP.
        Returns:
            arkid.core.models.User: the bound user, or a falsy value when
            the external id is not bound yet.
        """
        pass

    def get_arkid_token(self, ext_id, ext_name, ext_icon, config):
        """Build the query context sent back to the frontend.

        For a bound user the context only holds a valid ArkID token (an
        unexpired existing one is reused, otherwise it is refreshed). For
        an unbound user the token is empty and the context carries what
        the frontend needs to call the bind endpoint.
        """
        arkid_user = self.get_arkid_user(ext_id)
        if not arkid_user:
            return {
                "token": "",
                "ext_id": ext_id,
                "ext_name": ext_name,
                "ext_icon": ext_icon,
                "tenant_id": config.tenant.id.hex,
                "bind": config.config.get('bind_url'),
            }

        from arkid.core.models import ExpiringToken
        et = ExpiringToken.objects.filter(
            user=arkid_user
        ).first()
        if et and et.expired(config.tenant) is False:
            token = et.token
        else:
            token = refresh_token(arkid_user)
        return {"token": token}

    @abstractmethod
    def get_auth_code_from_request(self, request):
        """Extract the authorization code from the IdP callback request.

        Args:
            request (HTTPRequest): the callback request sent by the IdP.
        Returns:
            str: authorization code (falsy when it is missing).
        """
        pass

    def callback(self, request, config_id):
        """Handle the IdP callback and redirect back to the frontend.

        Exchanges the code through ``get_ext_token`` and
        ``get_ext_user_info``, then redirects to ``next`` (or to the
        frontend ``/third_part_callback`` page) with the context built by
        ``get_arkid_token`` as query string. Failures are reported as
        JSON responses.
        """
        from urllib.parse import urlsplit

        code = self.get_auth_code_from_request(request)
        next_url = request.GET.get("next", '')
        config = self.get_config_by_id(config_id)
        if not config:
            return JsonResponse({"error_msg": "没有找到登录配置"})

        frontend_host = get_app_config().get_frontend_host()
        if next_url:
            # ``next`` is attacker-controlled and the redirect carries the
            # ArkID token, so a substring match is not enough (it accepts
            # "https://evil.example/?x=<frontend host>"). The host itself
            # must be the frontend host or one of its sub-domains.
            bare_host = frontend_host.replace('http://', '').replace('https://', '')
            trusted = bare_host.split('/', 1)[0].lower()
            try:
                target = urlsplit(next_url).netloc.lower()
            except ValueError:
                target = ''
            host_matches = bool(trusted) and (
                target == trusted or target.endswith('.' + trusted)
            )
            # Browsers treat "\" like "/", which could hide the real host.
            if '\\' in next_url or bare_host not in next_url or not host_matches:
                return JsonResponse({'error_msg': '错误的跳转页面'})

        if not code:
            return JsonResponse({"error_msg": "授权码丢失", "code": ["required"]}, json_dumps_params={'ensure_ascii': False})

        try:
            ext_token = self.get_ext_token(config, code)
            ext_id, ext_name, ext_icon, ext_info = self.get_ext_user_info(
                config, code, ext_token
            )
        except Exception as e:
            logger.error(e)
            args = e.args
            # Plugins may raise with a ready-made error dict as sole argument.
            if len(args) == 1 and isinstance(args[0], dict):
                return JsonResponse(args[0], json_dumps_params={'ensure_ascii': False})
            return JsonResponse({"error_msg": str(e)}, json_dumps_params={'ensure_ascii': False})

        context = self.get_arkid_token(ext_id, ext_name, ext_icon, config)
        query_string = urlencode(context)
        target_url = next_url if next_url else f'{frontend_host}/third_part_callback'
        # NOTE(review): unquoting the whole URL undoes urlencode's escaping,
        # so a value containing "&" or "#" would corrupt the query string.
        # Kept as-is because the frontend presumably relies on it - confirm.
        url = unquote(f"{target_url}?{query_string}")
        return HttpResponseRedirect(url)

    @abstractmethod
    def bind_arkid_user(self, ext_id, user, data):
        """Persist the binding between an external id and an ArkID user.

        Args:
            ext_id (str): user identifier returned by the IdP.
            user (arkid.core.models.User): the ArkID user.
            data (dict): POST data of the bind request (``ext_name`` and
                ``ext_icon`` already normalised by ``bind``).
        Returns:
            The return value is ignored by ``bind``.
        """
        pass

    @csrf_exempt
    def bind(self, request, config_id):
        """Bind the external user id posted by the frontend to the token's user.

        The caller is authenticated with ``verify_token``. ``ext_name`` is
        URL-decoded. When a storage extension is active, ``ext_icon`` is
        stored through the ``SAVE_FILE`` event and replaced by the stored
        file info; otherwise the posted icon URL is kept unchanged.
        """
        from urllib.parse import unquote
        from arkid.core.event import SAVE_FILE, dispatch_event, Event
        from arkid.extension.models import Extension

        user = verify_token(request)
        if not user:
            return JsonResponse({"error_msg": "Token验证失败", "code": ["token invalid"]})
        config = self.get_config_by_id(config_id)
        if not config:
            return JsonResponse({"error_msg": "没有找到登录配置"})

        # request.POST is an immutable QueryDict; edit a mutable copy.
        data = request.POST.copy()
        ext_id = data.get("ext_id")
        ext_name = data.get('ext_name', '')
        ext_icon = data.get('ext_icon', '')
        if ext_name:
            data['ext_name'] = unquote(ext_name)
        if ext_icon:
            storage = Extension.active_objects.filter(
                type="storage"
            ).first()
            if storage is not None:
                responses = dispatch_event(Event(tag=SAVE_FILE, tenant=config.tenant, request=request, packages=storage.package, data={'fileurl': ext_icon}))
                if responses:
                    _, (fileinfo, _) = responses[0]
                    data['ext_icon'] = fileinfo
        self.bind_arkid_user(ext_id, user, data)
        return JsonResponse(self.success())

    @abstractmethod
    def get_img_url(self):
        """Return the icon URL of the third-party login button.

        Returns:
            str: icon URL.
        """
        pass

    def register_external_idp_schema(self, idp_type, schema):
        """Register ``schema`` as the config schema of ``idp_type``.

        The schema is registered both as a plain config schema (named
        ``<package>_<idp_type>``) and as a composite config schema.
        """
        self.register_config_schema(schema, self.package + '_' + idp_type)
        self.register_composite_config_schema(schema, idp_type, exclude=['extension'])

    def _save_endpoint_urls(self, tenant_config, config):
        """Store the absolute login/callback/bind URLs in ``config`` and save it."""
        server_host = get_app_config().get_host()
        endpoint_urls = {
            f"{action}_url": server_host + reverse(
                f'api:{self.pname}:{self.pname}_{action}',
                args=[tenant_config.id],
            )
            for action in self._ENDPOINT_ACTIONS
        }
        for key, value in endpoint_urls.items():
            config[key] = value
        tenant_config.config = config
        tenant_config.save()
        return tenant_config

    def create_tenant_config(self, tenant, config, name, type):
        """Create the runtime config and add its endpoint URLs.

        Returns:
            TenantExtensionConfig: the created object, whose ``config`` also
            holds ``login_url``, ``callback_url`` and ``bind_url``.
        """
        config_created = super().create_tenant_config(tenant, config, name, type)
        return self._save_endpoint_urls(config_created, config)

    def update_tenant_config(self, id, config, name, type):
        """Update the runtime config and refresh its endpoint URLs.

        Returns:
            TenantExtensionConfig: the updated object, or ``None`` when no
            valid config with this id exists.
        """
        super().update_tenant_config(id, config, name, type)
        config_updated = TenantExtensionConfig.valid_objects.filter(id=id).first()
        if config_updated is None:
            return None
        return self._save_endpoint_urls(config_updated, config)

    def add_idp_login_buttons(self, event, **kwargs):
        """Return the login-page buttons for each config of the event's tenant.

        Configs for which ``get_img_and_redirect_url`` yields an empty
        image or redirect URL are skipped.
        """
        logger.info(f'{self.package} add idp login buttons start')
        data = {}
        for config in self.get_tenant_configs(event.tenant):
            img_url, redirect_url = self.get_img_and_redirect_url(config)
            if not (img_url and redirect_url):
                continue
            button = {
                # An ``img_url`` stored in the config wins over the default.
                "img": config.config.get('img_url', img_url),
                "redirect": {"url": redirect_url},
                "tooltip": config.name,
            }
            data[config.id.hex] = {"login": {'extend': {"buttons": [button]}}}

        logger.info(f'{self.package} add idp login buttons end')
        return data

    def account_unbind(self, event, **kwargs):
        '''
        Called when an account is unbound; override it as needed.
        Params:
            event: event argument
                data: payload
                    user_id: id of the user
        '''
        data = event.data  # noqa: F841 - unused placeholder; the default hook does nothing
        pass

    @abstractmethod
    def get_img_and_redirect_url(self, config):
        """Return what the frontend needs to render the login button.

        Args:
            config (arkid.extension.models.TenantExtensionConfig): runtime
                configuration of this login method.

        Returns:
            tuple: image URL and redirect URL ``('image_url', 'redirect_url')``.
        """
        pass

composite_model (BaseModel) django-model #

TenantExtensionConfig(id, is_del, is_active, updated, created, tenant, extension, config, name, type)

Source code in arkid/core/extension/external_idp.py
class TenantExtensionConfig(BaseModel):
    """Runtime configuration of an extension for one tenant."""

    class Meta(object):
        verbose_name = _("插件运行时配置")
        verbose_name_plural = _("插件运行时配置")

    # Owning tenant; PROTECT blocks deleting a tenant that still has configs.
    tenant = models.ForeignKey('core.Tenant', blank=False, on_delete=models.PROTECT, verbose_name=_('租户'))
    # Extension this config belongs to; also protected against deletion.
    extension = models.ForeignKey('Extension', blank=False, on_delete=models.PROTECT, verbose_name=_('插件'))
    # Free-form JSON settings; defaults to an empty dict.
    # NOTE(review): `_` is called with two arguments here, unlike the other
    # fields - confirm the project's `_` helper accepts that.
    config = models.JSONField(blank=True, default=dict, verbose_name=_('Runtime Config','运行时配置'))
    # Display name of the config.
    name = models.CharField(max_length=128, default='', verbose_name=_('名称'))
    # Config type.
    type = models.CharField(max_length=128, default='', verbose_name=_('类型'))

config: JSONField blank django-field #

Runtime Config

created: DateTimeField blank django-field nullable #

创建时间

extension: ForeignKey django-field #

插件

id: UUIDField django-field #

ID

is_active: BooleanField django-field #

是否可用

is_del: BooleanField django-field #

是否删除

name: CharField django-field #

名称

tenant: ForeignKey django-field #

租户

type: CharField django-field #

类型

updated: DateTimeField blank django-field nullable #

更新时间

account_unbind(self, event, **kwargs) #

在账户解绑的时候会调用此方法,开发者可以根据需要重写此方法

Parameters:

Name Type Description Default
event

事件参数 data: 数据 user_id: 用户id

required
Source code in arkid/core/extension/external_idp.py
def account_unbind(self, event, **kwargs):
    '''
    Hook invoked when an account is unbound; override it as needed.
    Params:
        event: event argument
            data: payload
                user_id: id of the user
    '''
    payload = event.data  # read but unused: the default hook does nothing
    return None

bind(self, request, config_id) #

处理第三方身份源返回的user_id和ArkID的user之间的绑定

Source code in arkid/core/extension/external_idp.py
@csrf_exempt
def bind(self, request, config_id):
    """Bind the external user id posted by the frontend to the token's user.

    The caller is authenticated with ``verify_token``. ``ext_name`` is
    URL-decoded. When a storage extension is active, ``ext_icon`` is
    stored through the ``SAVE_FILE`` event and replaced by the stored
    file info; otherwise the posted icon URL is kept unchanged.
    """
    from urllib.parse import unquote
    from arkid.core.event import SAVE_FILE, dispatch_event, Event
    from arkid.extension.models import Extension

    user = verify_token(request)
    if not user:
        return JsonResponse({"error_msg": "Token验证失败", "code": ["token invalid"]})
    config = self.get_config_by_id(config_id)
    if not config:
        return JsonResponse({"error_msg": "没有找到登录配置"})

    # request.POST is an immutable QueryDict; edit a mutable copy.
    data = request.POST.copy()
    ext_id = data.get("ext_id")
    ext_name = data.get('ext_name', '')
    ext_icon = data.get('ext_icon', '')
    if ext_name:
        data['ext_name'] = unquote(ext_name)
    if ext_icon:
        storage = Extension.active_objects.filter(
            type="storage"
        ).first()
        if storage is not None:
            responses = dispatch_event(Event(tag=SAVE_FILE, tenant=config.tenant, request=request, packages=storage.package, data={'fileurl': ext_icon}))
            if responses:
                _, (fileinfo, _) = responses[0]
                data['ext_icon'] = fileinfo
    self.bind_arkid_user(ext_id, user, data)
    return JsonResponse(self.success())

bind_arkid_user(self, ext_id, user, data) #

Parameters:

Name Type Description Default
ext_id str

第三方登录返回的用户标识

required
user arkid.core.models.User

ArkID的用户

required

Returns:

Type Description
dict

{"token": xxx}: 返回token

Source code in arkid/core/extension/external_idp.py
@abstractmethod
def bind_arkid_user(self, ext_id, user, data):
    """Persist the binding between an external id and an ArkID user.

    Args:
        ext_id (str): user identifier returned by the third-party login.
        user (arkid.core.models.User): the ArkID user.
        data (dict): data of the bind request.
    Returns:
        {"token":xxx}: the token.
    """

callback(self, request, config_id) #

拿到请求中携带的code,调用get_ext_token获取第三方认证的token, 调用get_ext_user_info获取第三方认证提供的用户信息, 拿到ext_id后,判断该ext_id是否已经和ArkID中的用户绑定,如果已绑定则携带该用户的Token重定向回前端, 如果没有绑定,则重定向到前端绑定页面

Source code in arkid/core/extension/external_idp.py
def callback(self, request, config_id):
    """Handle the IdP callback and redirect back to the frontend.

    Exchanges the code through ``get_ext_token`` and
    ``get_ext_user_info``, then redirects to ``next`` (or to the frontend
    ``/third_part_callback`` page) with the context built by
    ``get_arkid_token`` as query string. Failures are reported as JSON
    responses.
    """
    from urllib.parse import urlsplit

    code = self.get_auth_code_from_request(request)
    next_url = request.GET.get("next", '')
    config = self.get_config_by_id(config_id)
    if not config:
        return JsonResponse({"error_msg": "没有找到登录配置"})

    frontend_host = get_app_config().get_frontend_host()
    if next_url:
        # ``next`` is attacker-controlled and the redirect carries the
        # ArkID token, so a substring match is not enough (it accepts
        # "https://evil.example/?x=<frontend host>"). The host itself must
        # be the frontend host or one of its sub-domains.
        bare_host = frontend_host.replace('http://', '').replace('https://', '')
        trusted = bare_host.split('/', 1)[0].lower()
        try:
            target = urlsplit(next_url).netloc.lower()
        except ValueError:
            target = ''
        host_matches = bool(trusted) and (
            target == trusted or target.endswith('.' + trusted)
        )
        # Browsers treat "\" like "/", which could hide the real host.
        if '\\' in next_url or bare_host not in next_url or not host_matches:
            return JsonResponse({'error_msg': '错误的跳转页面'})

    if not code:
        return JsonResponse({"error_msg": "授权码丢失", "code": ["required"]}, json_dumps_params={'ensure_ascii': False})

    try:
        ext_token = self.get_ext_token(config, code)
        ext_id, ext_name, ext_icon, ext_info = self.get_ext_user_info(
            config, code, ext_token
        )
    except Exception as e:
        logger.error(e)
        args = e.args
        # Plugins may raise with a ready-made error dict as sole argument.
        if len(args) == 1 and isinstance(args[0], dict):
            return JsonResponse(args[0], json_dumps_params={'ensure_ascii': False})
        return JsonResponse({"error_msg": str(e)}, json_dumps_params={'ensure_ascii': False})

    context = self.get_arkid_token(ext_id, ext_name, ext_icon, config)
    query_string = urlencode(context)
    target_url = next_url if next_url else f'{frontend_host}/third_part_callback'
    # NOTE(review): unquoting the whole URL undoes urlencode's escaping, so
    # a value containing "&" or "#" would corrupt the query string. Kept
    # as-is because the frontend presumably relies on it - confirm.
    url = unquote(f"{target_url}?{query_string}")
    return HttpResponseRedirect(url)

create_tenant_config(self, tenant, config, name, type) #

创建运行时配置

Parameters:

Name Type Description Default
tenant Tenant

租户

required
config dict

config

required
name str

运行时配置名字

required
type str

配置类型

required

Returns:

Type Description
TenantExtensionConfig

创建的对象

Source code in arkid/core/extension/external_idp.py
def create_tenant_config(self, tenant, config, name, type):
    """Create the runtime config and store its endpoint URLs in it.

    After the parent class has created the object, the absolute
    ``login_url``, ``callback_url`` and ``bind_url`` of this config are
    written into ``config`` and the object is saved again.
    """
    created = super().create_tenant_config(tenant, config, name, type)
    host = get_app_config().get_host()
    endpoint_urls = {
        f"{action}_url": host + reverse(
            f'api:{self.pname}:{self.pname}_{action}',
            args=[created.id],
        )
        for action in ("login", "callback", "bind")
    }
    for key, value in endpoint_urls.items():
        config[key] = value
    created.config = config
    created.save()
    return created

get_arkid_user(self, ext_id) #

抽象方法

Parameters:

Name Type Description Default
ext_id str

第三方认证返回的用户标识

required

Returns:

Type Description
arkid.core.models.User

ArkID用户

Source code in arkid/core/extension/external_idp.py
@abstractmethod
def get_arkid_user(self, ext_id):
    """Abstract hook: look up the ArkID user for an external user id.

    Args:
        ext_id (str): user identifier returned by the third-party IdP.
    Returns:
        arkid.core.models.User: the ArkID user.
    """

get_auth_code_from_request(self, request) #

抽象方法

Parameters:

Name Type Description Default
request HTTPRequest

第三方认证回调时发来的请求对象

required

Returns:

Type Description
str

授权码

Source code in arkid/core/extension/external_idp.py
@abstractmethod
def get_auth_code_from_request(self, request):
    """Abstract hook: extract the authorization code from the request.

    Args:
        request (HTTPRequest): the callback request sent by the IdP.
    Returns:
        str: authorization code.
    """

get_authorize_url(self, config, callback_url, next_url) #

抽象方法

Parameters:

Name Type Description Default
config arkid.extension.models.TenantExtensionConfig

第三方认证提供的Client_ID,

required
callback_url str

由ArkID提供的回调地址

required
next_url str

前端传来的跳转地址

required

Returns:

Type Description
str

第三方登录提供的认证URL

Source code in arkid/core/extension/external_idp.py
@abstractmethod
def get_authorize_url(self, config, callback_url, next_url):
    """Abstract hook: build the authorization URL of the third-party IdP.

    Args:
        config (arkid.extension.models.TenantExtensionConfig): runtime
            configuration of this login method.
        callback_url (str): callback address provided by ArkID.
        next_url (str): redirect target passed in by the frontend.
    Returns:
        str: authorization URL provided by the third-party login.
    """

get_ext_token(self, config, code) #

抽象方法

Parameters:

Name Type Description Default
code str

第三方认证返回的code

required
config arkid.core.extension.TenantExtensionConfig

第三方登录的插件运行时配置

required

Returns:

Type Description
str

返回第三方认证提供的token

Source code in arkid/core/extension/external_idp.py
@abstractmethod
def get_ext_token(self, config, code):
    """Abstract hook: exchange the authorization code for an IdP token.

    Args:
        code (str): authorization code returned by the third-party IdP.
        config (arkid.core.extension.TenantExtensionConfig): runtime
            configuration of the third-party login plugin.
    Returns:
        str: token issued by the third-party IdP.
    """

get_ext_user_info(self, config, code, token) #

抽象方法

Parameters:

Name Type Description Default
config arkid.core.extension.TenantExtensionConfig

第三方登录的插件运行时配置

required
code str

第三方认证返回的code

required
token str

第三方认证返回的token

required

Returns:

Type Description
tuple

返回第三方认证提供的用户信息,依次为(ext_id, ext_name, ext_icon, ext_info),callback会按此顺序解包

Source code in arkid/core/extension/external_idp.py
@abstractmethod
def get_ext_user_info(self, config, code, token):
    """Abstract hook: fetch the user's profile from the third-party IdP.

    Args:
        config (arkid.core.extension.TenantExtensionConfig): runtime
            configuration of the third-party login plugin.
        code (str): authorization code returned by the third-party IdP.
        token (str): token returned by the third-party IdP.
    Returns:
        User information provided by the IdP; ``callback`` unpacks it as
        ``(ext_id, ext_name, ext_icon, ext_info)``.
    """

get_img_and_redirect_url(self, config) #

返回前端渲染第三方登录的按钮 抽象方法

Parameters:

Name Type Description Default
config arkid.extension.models.TenantExtensionConfig

第三方认证提供的Client_ID,

required

Returns:

Type Description
tuple

返回图片url和跳转地址('image_url', 'redirect_url')

Source code in arkid/core/extension/external_idp.py
@abstractmethod
def get_img_and_redirect_url(self, config):
    """Abstract hook: return what the frontend needs to render the button.

    Args:
        config (arkid.extension.models.TenantExtensionConfig): runtime
            configuration of this login method.

    Returns:
        tuple: image URL and redirect URL ``('image_url', 'redirect_url')``.
    """

get_img_url(self) #

抽象方法

Returns:

Type Description
url str

返回第三方登录按钮的图标

Source code in arkid/core/extension/external_idp.py
@abstractmethod
def get_img_url(self):
    """Abstract hook: return the icon of the third-party login button.

    Returns:
        url str: icon URL of the login button.
    """

load(self) #

抽象方法,插件加载的入口方法

Source code in arkid/core/extension/external_idp.py
def load(self):
    """Register the login/callback/bind routes and the event listeners.

    Routes have the form ``idp/<pname>/<config_id>/<action>`` and are
    named ``<pname>_<action>``. The parent ``load`` runs last.
    """
    handlers = (
        ("login", self.login),
        ("callback", self.callback),
        ("bind", self.bind),
    )
    urls = [
        re_path(
            rf'^idp/{self.pname}/(?P<config_id>[\w-]+)/{action}$',
            view,
            name=f'{self.pname}_{action}',
        )
        for action, view in handlers
    ]
    self.register_routers(urls, False)

    subscriptions = (
        (core_event.CREATE_LOGIN_PAGE_AUTH_FACTOR, self.add_idp_login_buttons),
        (core_event.ACCOUNT_UNBIND, self.account_unbind),
    )
    for tag, listener in subscriptions:
        self.listen_event(tag, listener)
    super().load()

login(self, request, config_id) #

重定向到第三方登录的入口地址, 该入口地址由get_authorize_url提供

Source code in arkid/core/extension/external_idp.py
def login(self, request, config_id):
    """Redirect to the IdP entry URL returned by ``get_authorize_url``.

    Returns a JSON error instead when ``config_id`` matches no config.
    """
    from urllib.parse import quote

    config = self.get_config_by_id(config_id)
    if not config:
        return JsonResponse({"error_msg": "没有找到登录配置"})
    callback_url = config.config.get("callback_url")
    next_url = request.GET.get("next", None)
    if next_url is not None:
        # Encode the target so that a "?" or "&" inside it stays part of
        # the single ``next`` parameter when the IdP calls back.
        next_url = "?next=" + quote(next_url, safe="")
    else:
        next_url = ""
    url = self.get_authorize_url(config, callback_url, next_url)

    return HttpResponseRedirect(url)

update_tenant_config(self, id, config, name, type) #

更新运行时配置

Parameters:

Name Type Description Default
id str

config_id

required
config dict

config

required
name str

运行时配置名字

required
type str

配置类型

required

Returns:

Type Description
bool

更新成功True,没有找到该配置返回False

Source code in arkid/core/extension/external_idp.py
def update_tenant_config(self, id, config, name, type):
    """Update the runtime config and refresh its endpoint URLs.

    After the parent class has applied the update, the absolute
    ``login_url``, ``callback_url`` and ``bind_url`` of this config are
    written into ``config`` and the object is saved again.

    Returns:
        TenantExtensionConfig: the updated object, or ``None`` when no
        valid config with this id exists.
    """
    super().update_tenant_config(id, config, name, type)
    config_updated = TenantExtensionConfig.valid_objects.filter(id=id).first()
    if config_updated is None:
        # Nothing to refresh; previously this raised AttributeError.
        return None
    server_host = get_app_config().get_host()
    endpoint_urls = {
        f"{action}_url": server_host + reverse(
            f'api:{self.pname}:{self.pname}_{action}',
            args=[config_updated.id],
        )
        for action in ("login", "callback", "bind")
    }
    for key, value in endpoint_urls.items():
        config[key] = value
    config_updated.config = config
    config_updated.save()
    return config_updated

示例#

extension_root.com_longgui_external_idp_github.ExternalIdpGithubExtension (ExternalIdpExtension) #

Source code in extension_root/com_longgui_external_idp_github/__init__.py
class ExternalIdpGithubExtension(ExternalIdpExtension):
    """GitHub implementation of the external IdP login extension."""

    # Seconds to wait for GitHub before giving up; without a timeout a
    # stalled connection would block the request indefinitely.
    REQUEST_TIMEOUT = 10

    def load(self):
        """Load the base routes, then register GitHub's user fields and schema."""
        super().load()
        for field_name in ("github_user_id", "github_nickname", "github_avatar"):
            self.register_extend_field(GithubUser, field_name)
        self.register_external_idp_schema("github", GithubConfigSchema)

    def get_img_url(self):
        """
        Return the URL of the GitHub icon.
        """
        return IMG_URL

    def get_auth_code_from_request(self, request):
        """Return the ``code`` query parameter, or ``''`` when it is absent."""
        return request.GET.get("code", '')

    def get_authorize_url(self, config, callback_url, next_url):
        """
        Args:
            config (arkid.core.extension.TenantExtensionConfig): runtime
                configuration of the third-party login plugin.
            callback_url (str): callback address stored in the config.
            next_url (str): suffix appended to ``callback_url`` before it
                is sent to GitHub as ``redirect_uri``.
        Returns:
            str: URL used to start the authorization with GitHub.
        """
        redirect_uri = quote(f"{callback_url}{next_url}")
        client_id = config.config.get("client_id")
        return f"{AUTHORIZE_URL}?client_id={client_id}&redirect_uri={redirect_uri}"

    def get_ext_token(self, config, code):
        """
        Args:
            config (arkid.core.extension.TenantExtensionConfig): runtime
                configuration of the third-party login plugin.
            code (str): authorization code returned by GitHub.
        Returns:
            str: the access_token returned by GitHub.
        Raises:
            requests.HTTPError: GitHub answered with an error status.
            ValueError: the response contains no access_token.
        """
        response = requests.post(
            GET_TOKEN_URL,
            # Sent in the body so the client secret never ends up in URLs
            # recorded by proxies or access logs.
            data={
                "code": code,
                "client_id": config.config.get("client_id"),
                "client_secret": config.config.get("client_secret"),
                "grant_type": "authorization_code",
            },
            timeout=self.REQUEST_TIMEOUT,
        )
        response.raise_for_status()
        # The body is form-encoded; keep the first value of each key.
        result = {k: v[0] for k, v in parse_qs(response.content.decode()).items()}
        if "access_token" not in result:
            raise ValueError(
                result.get("error_description")
                or result.get("error")
                or "GitHub did not return an access_token"
            )
        return result["access_token"]

    def get_ext_user_info(self, config, code, token):
        """
        Args:
            config (arkid.core.extension.TenantExtensionConfig): runtime
                configuration of the third-party login plugin.
            code (str): authorization code returned by GitHub.
            token (str): access_token returned by GitHub.
        Returns:
            tuple: id, login and avatar_url of the GitHub user, followed
            by the complete user info.
        Raises:
            requests.HTTPError: GitHub answered with an error status.
        """
        # The token travels only in the Authorization header; sending it as
        # a query parameter as well would expose it in logged URLs.
        response = requests.get(
            GET_USERINFO_URL,
            headers={"Authorization": "token " + token},
            timeout=self.REQUEST_TIMEOUT,
        )
        response.raise_for_status()
        info = response.json()
        return info["id"], info["login"], info["avatar_url"], info

    def get_arkid_user(self, ext_id):
        """
        Args:
            ext_id (str): user identifier from GitHub's user info API.
        Returns:
            arkid.core.models.User: the ArkID user bound to ext_id, or
            ``None`` when there is no binding.
        """
        github_user = GithubUser.valid_objects.filter(github_user_id=ext_id).first()
        return github_user.target if github_user else None

    def bind_arkid_user(self, ext_id, user, data):
        """
        Args:
            ext_id (str): user identifier from GitHub's user info API.
            user (arkid.core.models.User): ArkID user to bind.
            data (dict): data of the bind request.
        """
        user.github_user_id = ext_id
        user.github_nickname = data.get('ext_name', '')
        user.github_avatar = data.get('ext_icon', '')
        user.save()

    def get_img_and_redirect_url(self, config):
        """Return ``(img_url, login_url)`` from the config, ``""`` for missing keys."""
        return config.config.get("img_url", ""), config.config.get("login_url", "")

bind_arkid_user(self, ext_id, user, data) #

Parameters:

Name Type Description Default
ext_id str

从Github用户信息接口获取的用户标识

required
user arkid.core.models.User

用于绑定的ArkID用户

required
Source code in extension_root/com_longgui_external_idp_github/__init__.py
def bind_arkid_user(self, ext_id, user, data):
    """
    Args:
        ext_id (str): user identifier from GitHub's user info API.
        user (arkid.core.models.User): ArkID user to bind.
        data (dict): data of the bind request.
    """
    github_fields = (
        ("github_user_id", ext_id),
        ("github_nickname", data.get('ext_name', '')),
        ("github_avatar", data.get('ext_icon', '')),
    )
    for attr, value in github_fields:
        setattr(user, attr, value)
    user.save()

get_arkid_user(self, ext_id) #

Parameters:

Name Type Description Default
ext_id str

从Github用户信息接口获取的用户标识

required

Returns:

Type Description
arkid.core.models.User

返回ext_id绑定的ArkID用户

Source code in extension_root/com_longgui_external_idp_github/__init__.py
def get_arkid_user(self, ext_id):
    """
    Args:
        ext_id (str): user identifier from GitHub's user info API.
    Returns:
        arkid.core.models.User: the ArkID user bound to ext_id, or
        ``None`` when there is no binding.
    """
    binding = GithubUser.valid_objects.filter(github_user_id=ext_id).first()
    if not binding:
        return None
    return binding.target

get_auth_code_from_request(self, request) #

抽象方法

Parameters:

Name Type Description Default
request HTTPRequest

第三方认证返回的用户标识

required

Returns:

Type Description
str

授权码

Source code in extension_root/com_longgui_external_idp_github/__init__.py
def get_auth_code_from_request(self, request):
    """Return the ``code`` query parameter, or ``''`` when it is absent."""
    return request.GET.get("code", '')

get_authorize_url(self, config, callback_url, next_url) #

Parameters:

Name Type Description Default
config arkid.core.extension.TenantExtensionConfig

第三方登录的插件运行时配置

required
callback_url str

在ArkID中创建Github登录配置后返回的回调地址

required

Returns:

Type Description
str

返回用于向Github发起认证的URL

Source code in extension_root/com_longgui_external_idp_github/__init__.py
def get_authorize_url(self, config, callback_url, next_url):
    """
    Args:
        config (arkid.core.extension.TenantExtensionConfig): runtime
            configuration of the third-party login plugin.
        callback_url (str): callback address stored in the config.
        next_url (str): suffix appended to ``callback_url`` before it is
            sent to GitHub as ``redirect_uri``.
    Returns:
        str: URL used to start the authorization with GitHub.
    """
    encoded_redirect = quote(f"{callback_url}{next_url}")
    client_id = config.config.get("client_id")
    return f"{AUTHORIZE_URL}?client_id={client_id}&redirect_uri={encoded_redirect}"

get_ext_token(self, config, code) #

Parameters:

Name Type Description Default
config arkid.core.extension.TenantExtensionConfig

第三方登录的插件运行时配置

required
code str

Github返回的授权码

required

Returns:

Type Description
str

返回Github返回的access_token

Source code in extension_root/com_longgui_external_idp_github/__init__.py
def get_ext_token(self, config, code):
    """
    Args:
        config (arkid.core.extension.TenantExtensionConfig): runtime
            configuration of the third-party login plugin.
        code (str): authorization code returned by GitHub.
    Returns:
        str: the access_token returned by GitHub.
    Raises:
        requests.HTTPError: GitHub answered with an error status.
        ValueError: the response contains no access_token.
    """
    response = requests.post(
        GET_TOKEN_URL,
        # Sent in the body so the client secret never ends up in URLs
        # recorded by proxies or access logs.
        data={
            "code": code,
            "client_id": config.config.get("client_id"),
            "client_secret": config.config.get("client_secret"),
            "grant_type": "authorization_code",
        },
        # Without a timeout a stalled connection blocks the request forever.
        timeout=10,
    )
    response.raise_for_status()
    # The body is form-encoded; keep the first value of each key.
    result = {k: v[0] for k, v in parse_qs(response.content.decode()).items()}
    if "access_token" not in result:
        raise ValueError(
            result.get("error_description")
            or result.get("error")
            or "GitHub did not return an access_token"
        )
    return result["access_token"]

get_ext_user_info(self, config, code, token) #

Parameters:

Name Type Description Default
config arkid.core.extension.TenantExtensionConfig

第三方登录的插件运行时配置

required
code str

Github返回的授权码

required
token str

Github返回的access_token

required

Returns:

Type Description
tuple

返回Github中用户信息中的id, login,avatar_url和所有用户信息

Source code in extension_root/com_longgui_external_idp_github/__init__.py
def get_ext_user_info(self, config, code, token):
    """
    Args:
        config (arkid.core.extension.TenantExtensionConfig): runtime
            configuration of the third-party login plugin.
        code (str): authorization code returned by GitHub.
        token (str): access_token returned by GitHub.
    Returns:
        tuple: id, login and avatar_url of the GitHub user, followed by
        the complete user info.
    Raises:
        requests.HTTPError: GitHub answered with an error status.
    """
    # The token travels only in the Authorization header; sending it as a
    # query parameter as well would expose it in logged URLs.
    response = requests.get(
        GET_USERINFO_URL,
        headers={"Authorization": "token " + token},
        # Without a timeout a stalled connection blocks the request forever.
        timeout=10,
    )
    response.raise_for_status()
    info = response.json()
    return info["id"], info["login"], info["avatar_url"], info

get_img_and_redirect_url(self, config) #

返回前端渲染第三方登录的按钮 抽象方法

Parameters:

Name Type Description Default
config arkid.extension.models.TenantExtensionConfig

第三方认证提供的Client_ID,

required

Returns:

Type Description
tuple

返回图片url和跳转地址('image_url', 'redirect_url')

Source code in extension_root/com_longgui_external_idp_github/__init__.py
def get_img_and_redirect_url(self, config):
    """Return ``(img_url, login_url)`` from the config, ``""`` for missing keys."""
    settings = config.config
    img_url = settings.get("img_url", "")
    redirect_url = settings.get("login_url", "")
    return img_url, redirect_url

get_img_url(self) #

返回Github的图标URL

Source code in extension_root/com_longgui_external_idp_github/__init__.py
def get_img_url(self):
    """Return the URL of the GitHub icon."""
    icon_url = IMG_URL
    return icon_url

load(self) #

抽象方法,插件加载的入口方法

Source code in extension_root/com_longgui_external_idp_github/__init__.py
def load(self):
    """Load the base routes, then register GitHub's user fields and schema."""
    super().load()
    for field_name in ("github_user_id", "github_nickname", "github_avatar"):
        self.register_extend_field(GithubUser, field_name)
    self.register_external_idp_schema("github", GithubConfigSchema)

评论