MaxKB/apps/common/auth/handle/impl/public_access_token.py

68 lines
3.5 KiB
Python
Raw Blame History

This file contains ambiguous Unicode characters

This file contains Unicode characters that might be confused with other characters. If you think that this is intentional, you can safely ignore this warning. Use the Escape button to reveal them.

# coding=utf-8
"""
@project: qabot
@Author
@file authenticate.py
@date2024/3/14 03:02
@desc: 公共访问连接认证
"""
from django.db.models import QuerySet
from application.models.api_key_model import ApplicationAccessToken
from common.auth.handle.auth_base_handle import AuthBaseHandle
from common.constants.authentication_type import AuthenticationType
from common.constants.permission_constants import RoleConstants, Permission, Group, Operate, Auth
from common.exception.app_exception import AppAuthenticationFailed, ChatException
from common.models.db_model_manage import DBModelManage
from common.util.common import password_encrypt
class PublicAccessToken(AuthBaseHandle):
def support(self, request, token: str, get_token_details):
token_details = get_token_details()
if token_details is None:
return False
return (
'application_id' in token_details and
'access_token' in token_details and
token_details.get('type') == AuthenticationType.APPLICATION_ACCESS_TOKEN.value)
def handle(self, request, token: str, get_token_details):
auth_details = get_token_details()
application_access_token = QuerySet(ApplicationAccessToken).filter(
application_id=auth_details.get('application_id')).first()
if request.path != '/api/application/profile':
application_setting_model = DBModelManage.get_model('application_setting')
xpack_cache = DBModelManage.get_model('xpack_cache')
X_PACK_LICENSE_IS_VALID = False if xpack_cache is None else xpack_cache.get('XPACK_LICENSE_IS_VALID', False)
if application_setting_model is not None and X_PACK_LICENSE_IS_VALID:
application_setting = QuerySet(application_setting_model).filter(application_id=str(
application_access_token.application_id)).first()
if application_setting.authentication:
authentication = auth_details.get('authentication', {})
if authentication is None:
authentication = {}
if application_setting.authentication_value.get('type') != authentication.get(
'type') or password_encrypt(
application_setting.authentication_value.get('value')) != authentication.get('value'):
raise ChatException(1002, "身份验证信息不正确")
if application_access_token is None:
raise AppAuthenticationFailed(1002, "身份验证信息不正确")
if not application_access_token.is_active:
raise AppAuthenticationFailed(1002, "身份验证信息不正确")
if not application_access_token.access_token == auth_details.get('access_token'):
raise AppAuthenticationFailed(1002, "身份验证信息不正确")
return application_access_token.application.user, Auth(
role_list=[RoleConstants.APPLICATION_ACCESS_TOKEN],
permission_list=[
Permission(group=Group.APPLICATION,
operate=Operate.USE,
dynamic_tag=str(
application_access_token.application_id))],
application_id=application_access_token.application_id,
client_id=auth_details.get('client_id'),
client_type=AuthenticationType.APPLICATION_ACCESS_TOKEN.value,
current_role=RoleConstants.APPLICATION_ACCESS_TOKEN
)