feat: authentication
Some checks are pending
sync2gitee / repo-sync (push) Waiting to run

This commit is contained in:
shaohuzhang1 2025-04-15 20:37:38 +08:00
parent 5ffd3423ff
commit 04642eb497
11 changed files with 348 additions and 35 deletions

View File

@ -12,11 +12,11 @@ from importlib import import_module
from django.conf import settings
from django.core import cache
from django.core import signing
from django.utils.translation import gettext_lazy as _
from rest_framework.authentication import TokenAuthentication
from common.exception.app_exception import AppAuthenticationFailed, AppEmbedIdentityFailed, AppChatNumOutOfBoundsFailed, \
ChatException, AppApiException
from django.utils.translation import gettext_lazy as _
AppApiException
token_cache = cache.caches['default']

View File

@ -0,0 +1,100 @@
# coding=utf-8
"""
@project: MaxKB
@Author虎虎
@file authentication.py
@date2025/4/15 20:12
@desc:
"""
from typing import List
from django.utils.translation import gettext_lazy as _
from common.constants.permission_constants import PermissionConstants, RoleConstants, ViewPermission, CompareConstants, \
Permission
from common.exception.app_exception import AppUnauthorizedFailed
def exist_permissions_by_permission_constants(user_permission: List[PermissionConstants],
permission_list: List[PermissionConstants]):
"""
用户是否拥有 permission_list的权限
:param user_permission: 用户权限
:param permission_list: 需要的权限
:return: 是否拥有
"""
return any(list(map(lambda up: permission_list.__contains__(up), user_permission)))
def exist_role_by_role_constants(user_role: List[RoleConstants],
role_list: List[RoleConstants]):
"""
用户是否拥有这个角色
:param user_role: 用户角色
:param role_list: 需要拥有的角色
:return: 是否拥有
"""
return any(list(map(lambda up: role_list.__contains__(up), user_role)))
def exist_permissions_by_view_permission(user_role: List[RoleConstants],
user_permission: List[PermissionConstants | object],
permission: ViewPermission, request, **kwargs):
"""
用户是否存在这些权限
:param request:
:param user_role: 用户角色
:param user_permission: 用户权限
:param permission: 所属权限
:return: 是否存在 True False
"""
role_ok = any(list(map(lambda ur: permission.roleList.__contains__(ur), user_role)))
permission_list = [user_p(request, kwargs) if callable(user_p) else user_p for user_p in
permission.permissionList
]
permission_ok = any(list(map(lambda up: permission_list.__contains__(up),
user_permission)))
return role_ok | permission_ok if permission.compare == CompareConstants.OR else role_ok & permission_ok
def exist_permissions(user_role: List[RoleConstants], user_permission: List[PermissionConstants], permission, request,
**kwargs):
if isinstance(permission, ViewPermission):
return exist_permissions_by_view_permission(user_role, user_permission, permission, request, **kwargs)
if isinstance(permission, RoleConstants):
return exist_role_by_role_constants(user_role, [permission])
if isinstance(permission, PermissionConstants):
return exist_permissions_by_permission_constants(user_permission, [permission])
if isinstance(permission, Permission):
return user_permission.__contains__(permission)
return False
def exist(user_role: List[RoleConstants], user_permission: List[PermissionConstants], permission, request, **kwargs):
if callable(permission):
p = permission(request, kwargs)
return exist_permissions(user_role, user_permission, p, request)
return exist_permissions(user_role, user_permission, permission, request, **kwargs)
def has_permissions(*permission, compare=CompareConstants.OR):
"""
权限 role or permission
:param compare: 比较符号
:param permission: 如果是角色 role:roleId
:return: 权限装饰器函数,用于判断用户是否有权限访问当前接口
"""
def inner(func):
def run(view, request, **kwargs):
exit_list = list(
map(lambda p: exist(request.auth.current_role_list, request.auth.permission_list, p, request, **kwargs),
permission))
# 判断是否有权限
if any(exit_list) if compare == CompareConstants.OR else all(exit_list):
return func(view, request, **kwargs)
raise AppUnauthorizedFailed(403, _('No permission to access'))
return run
return inner

View File

@ -1,20 +1,107 @@
# coding=utf-8
"""
@project: maxkb
@project: MaxKB
@Author虎虎
@file authenticate.py
@date2024/3/14 03:02
@desc: 用户认证
"""
from django.core.cache import cache
from django.db.models import QuerySet
from django.utils.translation import gettext_lazy as _
from common.auth.handle.auth_base_handle import AuthBaseHandle
from common.constants.authentication_type import AuthenticationType
from common.constants.cache_version import Cache_Version
from common.constants.permission_constants import Auth, RoleConstants
from common.constants.permission_constants import Auth, RoleConstants, get_default_permission_list_by_role
from common.database_model_manage.database_model_manage import DatabaseModelManage
from common.exception.app_exception import AppAuthenticationFailed
from users.models import User
from django.core.cache import cache
from django.utils.translation import gettext_lazy as _
def get_permission_list(user_id,
workspace_id,
workspace_user_role_mapping_model,
workspace_model,
role_model,
role_permission_mapping_model):
version, get_key = Cache_Version.PERMISSION_LIST.value
key = get_key(user_id, workspace_id)
# 获取权限列表
is_query_model = workspace_user_role_mapping_model is not None and workspace_model is not None and role_model is not None and role_permission_mapping_model is not None
permission_list = cache.get(key, version=version)
if permission_list is None:
if is_query_model:
# 获取工作空间 用户 角色映射数据
workspace_user_role_mapping_list = QuerySet(workspace_user_role_mapping_model).filter(user_id=user_id)
# 获取角色权限映射数据
role_permission_mapping_list = QuerySet(role_permission_mapping_model).filter(
role_id__in=[workspace_user_role_mapping.role_id for workspace_user_role_mapping in
workspace_user_role_mapping_list])
permission_list = [role_model.id for role_model in role_permission_mapping_list]
cache.set(key, permission_list, version=version)
else:
permission_list = get_default_permission_list_by_role(RoleConstants.ADMIN)
cache.set(key, permission_list, version=version)
return permission_list
def get_workspace_list(user_id,
workspace_id,
workspace_user_role_mapping_model,
workspace_model,
role_model,
role_permission_mapping_model):
version, get_key = Cache_Version.WORKSPACE_LIST.value
key = get_key(user_id)
workspace_list = cache.get(key, version=version)
# 获取权限列表
is_query_model = workspace_user_role_mapping_model is not None and workspace_model is not None and role_model is not None and role_permission_mapping_model is not None
if workspace_list is None:
if is_query_model:
# 获取工作空间 用户 角色映射数据
workspace_user_role_mapping_list = QuerySet(workspace_user_role_mapping_model).filter(user_id=user_id)
cache.set(key, [workspace_user_role_mapping.workspace_id for workspace_user_role_mapping in
workspace_user_role_mapping_list], version=version)
else:
return ["default"]
return workspace_list
def get_role_list(user,
workspace_id,
workspace_user_role_mapping_model,
workspace_model,
role_model,
role_permission_mapping_model):
version, get_key = Cache_Version.ROLE_LIST.value
key = get_key(user.id, workspace_id)
workspace_list = cache.get(key, version=version)
# 获取权限列表
is_query_model = workspace_user_role_mapping_model is not None and workspace_model is not None and role_model is not None and role_permission_mapping_model is not None
if workspace_list is None:
if is_query_model:
# 获取工作空间 用户 角色映射数据
workspace_user_role_mapping_list = QuerySet(workspace_user_role_mapping_model).filter(user_id=user.id)
cache.set(key, [workspace_user_role_mapping.role_id for workspace_user_role_mapping in
workspace_user_role_mapping_list], version=version)
else:
cache.set(key, [user.role], version=version)
return [user.role]
return workspace_list
def get_auth(user, workspace_id):
workspace_user_role_mapping_model = DatabaseModelManage.get_model("workspace_user_role_mapping")
workspace_model = DatabaseModelManage.get_model("workspace_model")
role_model = DatabaseModelManage.get_model("role_model")
role_permission_mapping_model = DatabaseModelManage.get_model("role_permission_mapping_model")
workspace_list = get_workspace_list(user.id, workspace_id, workspace_user_role_mapping_model, workspace_model,
role_model, role_permission_mapping_model)
permission_list = get_permission_list(user.id, workspace_id, workspace_user_role_mapping_model, workspace_model,
role_model, role_permission_mapping_model)
role_list = get_role_list(user, workspace_id, workspace_user_role_mapping_model, workspace_model,
role_model, role_permission_mapping_model)
return Auth(workspace_list, workspace_id, role_list, permission_list)
class UserToken(AuthBaseHandle):
@ -25,12 +112,13 @@ class UserToken(AuthBaseHandle):
return True
def handle(self, request, token: str, get_token_details):
cache_token = cache.get(token, version=Cache_Version.TOKEN)
version, get_key = Cache_Version.TOKEN.value
cache_token = cache.get(get_key(token), version=version)
if cache_token is None:
raise AppAuthenticationFailed(1002, _('Login expired'))
auth_details = get_token_details()
# 当前工作空间
current_workspace = auth_details['current_workspace']
user = QuerySet(User).get(id=auth_details['id'])
role = RoleConstants[user.role]
return user, Auth([], [],
client_id=str(user.id),
client_type=AuthenticationType.SYSTEM_USER.value, current_role=role)
auth = get_auth(user, current_workspace)
return user, auth

View File

@ -10,5 +10,16 @@ from enum import Enum
class Cache_Version(Enum):
# 系统用户
TOKEN = "TOKEN"
# 令牌
TOKEN = "TOKEN", lambda token: token
# 工作空间列表
WORKSPACE_LIST = "WORKSPACE::LIST", lambda user_id: user_id
# 用户数据
USER = "USER", lambda user_id: user_id
# 当前用户在当前工作空间的角色列表+本身的角色
ROLE_LIST = "ROLE::LIST", lambda user_id, workspace_id: f"{user_id}::{workspace_id}"
# 当前用户在当前工作空间的权限列表+本身的权限列表
PERMISSION_LIST = "PERMISSION::LIST", lambda user_id, workspace_id: f"{user_id}::{workspace_id}"
version, get_key = Cache_Version.TOKEN.value

View File

@ -8,8 +8,6 @@
from enum import Enum
from typing import List
from django.utils.translation import gettext_lazy as _
class Group(Enum):
"""
@ -26,6 +24,10 @@ class Operate(Enum):
EDIT = "EDIT"
CREATE = "CREATE"
DELETE = "DELETE"
"""
使用权限
"""
USE = "USE"
class RoleGroup(Enum):
@ -43,7 +45,9 @@ class Role:
class RoleConstants(Enum):
ADMIN = Role(_("ADMIN"), _('Super administrator'), RoleGroup.SYSTEM_USER)
ADMIN = Role("ADMIN", '超级管理员', RoleGroup.SYSTEM_USER)
WORKSPACE_MANAGE = Role("WORKSPACE_MANAGE", '工作空间管理员', RoleGroup.SYSTEM_USER)
USER = Role("USER", '普通用户', RoleGroup.SYSTEM_USER)
class Permission:
@ -51,13 +55,24 @@ class Permission:
权限信息
"""
def __init__(self, group: Group, operate: Operate, roles=None, dynamic_tag=None):
if roles is None:
roles = []
def __init__(self, group: Group, operate: Operate, dynamic_tag=None, role_list=None):
if role_list is None:
role_list = []
self.group = group
self.operate = operate
self.roleList = roles
self.dynamic_tag = dynamic_tag
# 用于获取角色与权限的关系,只适用于没有权限管理的
self.role_list = role_list
@staticmethod
def new_instance(permission_str: str):
permission_split = permission_str.split(":")
group = Group[permission_split[0]]
operate = Operate[permission_split[2]]
if len(permission_split) > 2:
dynamic_tag = ":".join(permission_split[2:])
return Permission(group, operate, dynamic_tag)
return Permission(group, operate)
def __str__(self):
return self.group.value + ":" + self.operate.value + (
@ -71,19 +86,20 @@ class PermissionConstants(Enum):
"""
权限枚举
"""
USER_READ = Permission(group=Group.USER, operate=Operate.READ, roles=[RoleConstants.ADMIN])
USER_EDIT = Permission(group=Group.USER, operate=Operate.EDIT, roles=[RoleConstants.ADMIN])
USER_DELETE = Permission(group=Group.USER, operate=Operate.DELETE, roles=[RoleConstants.ADMIN])
USER_READ = Permission(group=Group.USER, operate=Operate.READ, role_list=[RoleConstants.ADMIN,
RoleConstants.USER])
USER_EDIT = Permission(group=Group.USER, operate=Operate.EDIT, role_list=[RoleConstants.ADMIN])
USER_DELETE = Permission(group=Group.USER, operate=Operate.DELETE, role_list=[RoleConstants.ADMIN])
def get_permission_list_by_role(role: RoleConstants):
def get_default_permission_list_by_role(role: RoleConstants):
"""
根据角色 获取角色对应的权限
:param role: 角色
:return: 权限
"""
return list(map(lambda k: PermissionConstants[k],
list(filter(lambda k: PermissionConstants[k].value.roleList.__contains__(role),
list(filter(lambda k: PermissionConstants[k].value.role_list.__contains__(role),
PermissionConstants.__members__))))
@ -92,14 +108,21 @@ class Auth:
用于存储当前用户的角色和权限
"""
def __init__(self, role_list: List[RoleConstants], permission_list: List[PermissionConstants | Permission]
, client_id, client_type, current_role: RoleConstants, **keywords):
self.role_list = role_list
def __init__(self,
work_space_list: List,
current_workspace,
current_role_list: List[Role],
permission_list: List[PermissionConstants | Permission],
**keywords):
# 当前用户所有工作空间
self.work_space_list = work_space_list
# 当前工作空间
self.current_workspace = current_workspace
# 当前工作空间的所有权限+非工作空间权限
self.permission_list = permission_list
self.client_id = client_id
self.client_type = client_type
# 当前工作空间角色列表
self.current_role_list = current_role_list
self.keywords = keywords
self.current_role = current_role
class CompareConstants(Enum):

View File

@ -0,0 +1,44 @@
# coding=utf-8
"""
@project: MaxKB
@Author虎虎
@file database_model_manage.py
@date2025/4/15 11:06
@desc:
"""
from importlib import import_module
from django.conf import settings
def new_instance_by_class_path(class_path: str):
"""
根据class_path 创建实例
"""
parts = class_path.rpartition('.')
package_path = parts[0]
class_name = parts[2]
module = import_module(package_path)
HandlerClass = getattr(module, class_name)
return HandlerClass()
class DatabaseModelManage:
"""
模型字典
"""
model_dict = {}
@staticmethod
def get_model(model_name):
"""
根据模型
"""
return DatabaseModelManage.model_dict.get(model_name)
@staticmethod
def init():
handles = [new_instance_by_class_path(class_path) for class_path in
(settings.MODEL_HANDLES if hasattr(settings, 'MODEL_HANDLES') else [])]
for h in handles:
model_dict = h.get_model_dict()
DatabaseModelManage.model_dict = {**DatabaseModelManage.model_dict, **model_dict}

View File

@ -0,0 +1,15 @@
# coding=utf-8
"""
@project: MaxKB
@Author虎虎
@file base_handle.py
@date2025/4/15 11:16
@desc:
"""
from abc import ABC, abstractmethod
class IBaseModelHandle(ABC):
@abstractmethod
def get_model_dict(self):
pass

View File

@ -0,0 +1,14 @@
# coding=utf-8
"""
@project: MaxKB
@Author虎虎
@file default_base_model_handle.py
@date2025/4/15 11:20
@desc:
"""
from common.database_model_manage.handle.base_handle import IBaseModelHandle
class DefaultBaseModelHandle(IBaseModelHandle):
def get_model_dict(self):
return {}

View File

@ -46,6 +46,8 @@ class LoginSerializer(serializers.Serializer):
token = signing.dumps({'username': user.username,
'id': str(user.id),
'email': user.email,
'type': AuthenticationType.SYSTEM_USER.value})
cache.set(token, user, version=Cache_Version.TOKEN)
'type': AuthenticationType.SYSTEM_USER.value,
'current_workspace': 'default'})
version, get_key = Cache_Version.TOKEN.value
cache.set(get_key(token), user, version=version)
return {'token': token}

View File

@ -5,5 +5,6 @@ from . import views
app_name = "user"
urlpatterns = [
path('user/login', views.LoginView.as_view(), name='login'),
path('user/profile', views.UserProfileView.as_view(), name="user_profile")
path('user/profile', views.UserProfileView.as_view(), name="user_profile"),
path('user/test', views.TestPermissionsUserView.as_view(), name="test")
]

View File

@ -12,6 +12,8 @@ from django.utils.translation import gettext_lazy as _
from rest_framework.request import Request
from common.auth import TokenAuth
from common.auth.authentication import has_permissions
from common.constants.permission_constants import PermissionConstants
from common.result import result
from users.api.user import UserProfileAPI
from users.serializers.user import UserProfileSerializer
@ -27,3 +29,16 @@ class UserProfileView(APIView):
responses=UserProfileAPI.get_response())
def get(self, request: Request):
return result.success(UserProfileSerializer().profile(request.user))
class TestPermissionsUserView(APIView):
authentication_classes = [TokenAuth]
@extend_schema(methods=['GET'],
description=_("Get current user information"),
operation_id=_("Get current user information"),
tags=[_("User management")],
responses=UserProfileAPI.get_response())
@has_permissions(PermissionConstants.USER_EDIT)
def get(self, request: Request):
return result.success(UserProfileSerializer().profile(request.user))