MaxKB/apps/common/auth/handle/impl/user_token.py
shaohuzhang1 04642eb497
Some checks are pending
sync2gitee / repo-sync (push) Waiting to run
feat: authentication
2025-04-15 20:37:38 +08:00

125 lines
6.0 KiB
Python
Raw Blame History

This file contains ambiguous Unicode characters

This file contains Unicode characters that might be confused with other characters. If you think that this is intentional, you can safely ignore this warning. Use the Escape button to reveal them.

# coding=utf-8
"""
@project: MaxKB
@Author虎虎
@file authenticate.py
@date2024/3/14 03:02
@desc: 用户认证
"""
from django.core.cache import cache
from django.db.models import QuerySet
from django.utils.translation import gettext_lazy as _
from common.auth.handle.auth_base_handle import AuthBaseHandle
from common.constants.cache_version import Cache_Version
from common.constants.permission_constants import Auth, RoleConstants, get_default_permission_list_by_role
from common.database_model_manage.database_model_manage import DatabaseModelManage
from common.exception.app_exception import AppAuthenticationFailed
from users.models import User
def get_permission_list(user_id,
workspace_id,
workspace_user_role_mapping_model,
workspace_model,
role_model,
role_permission_mapping_model):
version, get_key = Cache_Version.PERMISSION_LIST.value
key = get_key(user_id, workspace_id)
# 获取权限列表
is_query_model = workspace_user_role_mapping_model is not None and workspace_model is not None and role_model is not None and role_permission_mapping_model is not None
permission_list = cache.get(key, version=version)
if permission_list is None:
if is_query_model:
# 获取工作空间 用户 角色映射数据
workspace_user_role_mapping_list = QuerySet(workspace_user_role_mapping_model).filter(user_id=user_id)
# 获取角色权限映射数据
role_permission_mapping_list = QuerySet(role_permission_mapping_model).filter(
role_id__in=[workspace_user_role_mapping.role_id for workspace_user_role_mapping in
workspace_user_role_mapping_list])
permission_list = [role_model.id for role_model in role_permission_mapping_list]
cache.set(key, permission_list, version=version)
else:
permission_list = get_default_permission_list_by_role(RoleConstants.ADMIN)
cache.set(key, permission_list, version=version)
return permission_list
def get_workspace_list(user_id,
workspace_id,
workspace_user_role_mapping_model,
workspace_model,
role_model,
role_permission_mapping_model):
version, get_key = Cache_Version.WORKSPACE_LIST.value
key = get_key(user_id)
workspace_list = cache.get(key, version=version)
# 获取权限列表
is_query_model = workspace_user_role_mapping_model is not None and workspace_model is not None and role_model is not None and role_permission_mapping_model is not None
if workspace_list is None:
if is_query_model:
# 获取工作空间 用户 角色映射数据
workspace_user_role_mapping_list = QuerySet(workspace_user_role_mapping_model).filter(user_id=user_id)
cache.set(key, [workspace_user_role_mapping.workspace_id for workspace_user_role_mapping in
workspace_user_role_mapping_list], version=version)
else:
return ["default"]
return workspace_list
def get_role_list(user,
workspace_id,
workspace_user_role_mapping_model,
workspace_model,
role_model,
role_permission_mapping_model):
version, get_key = Cache_Version.ROLE_LIST.value
key = get_key(user.id, workspace_id)
workspace_list = cache.get(key, version=version)
# 获取权限列表
is_query_model = workspace_user_role_mapping_model is not None and workspace_model is not None and role_model is not None and role_permission_mapping_model is not None
if workspace_list is None:
if is_query_model:
# 获取工作空间 用户 角色映射数据
workspace_user_role_mapping_list = QuerySet(workspace_user_role_mapping_model).filter(user_id=user.id)
cache.set(key, [workspace_user_role_mapping.role_id for workspace_user_role_mapping in
workspace_user_role_mapping_list], version=version)
else:
cache.set(key, [user.role], version=version)
return [user.role]
return workspace_list
def get_auth(user, workspace_id):
workspace_user_role_mapping_model = DatabaseModelManage.get_model("workspace_user_role_mapping")
workspace_model = DatabaseModelManage.get_model("workspace_model")
role_model = DatabaseModelManage.get_model("role_model")
role_permission_mapping_model = DatabaseModelManage.get_model("role_permission_mapping_model")
workspace_list = get_workspace_list(user.id, workspace_id, workspace_user_role_mapping_model, workspace_model,
role_model, role_permission_mapping_model)
permission_list = get_permission_list(user.id, workspace_id, workspace_user_role_mapping_model, workspace_model,
role_model, role_permission_mapping_model)
role_list = get_role_list(user, workspace_id, workspace_user_role_mapping_model, workspace_model,
role_model, role_permission_mapping_model)
return Auth(workspace_list, workspace_id, role_list, permission_list)
class UserToken(AuthBaseHandle):
def support(self, request, token: str, get_token_details):
auth_details = get_token_details()
if auth_details is None:
return False
return True
def handle(self, request, token: str, get_token_details):
version, get_key = Cache_Version.TOKEN.value
cache_token = cache.get(get_key(token), version=version)
if cache_token is None:
raise AppAuthenticationFailed(1002, _('Login expired'))
auth_details = get_token_details()
# 当前工作空间
current_workspace = auth_details['current_workspace']
user = QuerySet(User).get(id=auth_details['id'])
auth = get_auth(user, current_workspace)
return user, auth