mirror of
https://github.com/1Panel-dev/MaxKB.git
synced 2025-12-26 01:33:05 +00:00
feat: Improve workspace resource authorization (#3004)
This commit is contained in:
parent
7aa294db38
commit
7bda26d92a
|
|
@ -88,7 +88,7 @@ class TokenAuth(TokenAuthentication):
|
|||
return handle.handle(request, token, token_details.get_token_details)
|
||||
raise AppAuthenticationFailed(1002, _('Authentication information is incorrect! illegal user'))
|
||||
except Exception as e:
|
||||
traceback.format_exc()
|
||||
traceback.print_stack()
|
||||
if isinstance(e, AppEmbedIdentityFailed) or isinstance(e, AppChatNumOutOfBoundsFailed) or isinstance(e,
|
||||
AppApiException):
|
||||
raise e
|
||||
|
|
|
|||
|
|
@ -8,6 +8,7 @@
|
|||
"""
|
||||
import datetime
|
||||
from functools import reduce
|
||||
from typing import List
|
||||
|
||||
from django.core.cache import cache
|
||||
from django.db.models import QuerySet
|
||||
|
|
@ -15,37 +16,108 @@ from django.utils.translation import gettext_lazy as _
|
|||
|
||||
from common.auth.handle.auth_base_handle import AuthBaseHandle
|
||||
from common.constants.cache_version import Cache_Version
|
||||
from common.constants.permission_constants import Auth, RoleConstants, get_default_permission_list_by_role, \
|
||||
PermissionConstants
|
||||
from common.constants.permission_constants import Auth, PermissionConstants, ResourcePermissionGroup, \
|
||||
get_permission_list_by_resource_group, ResourceAuthType, \
|
||||
ResourcePermissionRole, get_default_role_permission_mapping_list, get_default_workspace_user_role_mapping_list
|
||||
from common.database_model_manage.database_model_manage import DatabaseModelManage
|
||||
from common.exception.app_exception import AppAuthenticationFailed
|
||||
from common.utils.common import group_by
|
||||
from system_manage.models.workspace_user_permission import WorkspaceUserPermission
|
||||
from system_manage.models.workspace_user_permission import WorkspaceUserResourcePermission
|
||||
from users.models import User
|
||||
|
||||
|
||||
def get_permission(permission_id):
|
||||
"""
|
||||
获取权限字符串
|
||||
@param permission_id: 权限id
|
||||
@return: 权限字符串
|
||||
"""
|
||||
if isinstance(permission_id, PermissionConstants):
|
||||
permission_id = permission_id.value
|
||||
return f"{permission_id}"
|
||||
|
||||
|
||||
def get_workspace_permission(permission_id, workspace_id):
|
||||
"""
|
||||
获取工作空间权限字符串
|
||||
@param permission_id: 权限id
|
||||
@param workspace_id: 工作空间id
|
||||
@return:
|
||||
"""
|
||||
if isinstance(permission_id, PermissionConstants):
|
||||
permission_id = permission_id.value
|
||||
return f"{permission_id}:/WORKSPACE/{workspace_id}"
|
||||
|
||||
|
||||
def get_workspace_resource_permission_list(permission_id, workspace_id, workspace_user_permission_dict):
|
||||
workspace_user_permission_list = workspace_user_permission_dict.get(workspace_id)
|
||||
if workspace_user_permission_list is None:
|
||||
def get_workspace_permission_list(role_permission_mapping_dict, workspace_user_role_mapping_list):
|
||||
"""
|
||||
获取工作空间下所有的权限
|
||||
@param role_permission_mapping_dict: 角色权限关联字典
|
||||
@param workspace_user_role_mapping_list: 工作空间用户角色关联列表
|
||||
@return: 工作空间下的权限
|
||||
"""
|
||||
workspace_permission_list = [
|
||||
[get_workspace_permission(role_permission_mapping.permission_id, w_u_r.workspace_id) for role_permission_mapping
|
||||
in
|
||||
role_permission_mapping_dict.get(w_u_r.role_id, [])] for w_u_r in workspace_user_role_mapping_list]
|
||||
return reduce(lambda x, y: [*x, *y], workspace_permission_list, [])
|
||||
|
||||
|
||||
def get_workspace_resource_permission_list(
|
||||
workspace_user_resource_permission_list: List[WorkspaceUserResourcePermission],
|
||||
role_permission_mapping_dict,
|
||||
workspace_user_role_mapping_dict):
|
||||
"""
|
||||
|
||||
@param workspace_user_resource_permission_list: 工作空间用户资源权限列表
|
||||
@param role_permission_mapping_dict: 角色权限关联字典 key为role_id
|
||||
@param workspace_user_role_mapping_dict: 工作空间用户角色映射字典 key为role_id
|
||||
@return: 工作空间资源权限列表
|
||||
"""
|
||||
resource_permission_list = [
|
||||
get_workspace_resource_permission_list_by_workspace_user_permission(workspace_user_resource_permission,
|
||||
role_permission_mapping_dict,
|
||||
workspace_user_role_mapping_dict) for
|
||||
workspace_user_resource_permission in workspace_user_resource_permission_list]
|
||||
# 将二维数组扁平为一维
|
||||
return reduce(lambda x, y: [*x, *y], resource_permission_list, [])
|
||||
|
||||
|
||||
def get_workspace_resource_permission_list_by_workspace_user_permission(
|
||||
workspace_user_resource_permission: WorkspaceUserResourcePermission,
|
||||
role_permission_mapping_dict,
|
||||
workspace_user_role_mapping_dict):
|
||||
"""
|
||||
|
||||
@param workspace_user_resource_permission: 工作空间用户资源权限对象
|
||||
@param role_permission_mapping_dict: 角色权限关联字典 key为role_id
|
||||
@param workspace_user_role_mapping_dict: 工作空间用户角色关联字典 key为role_id
|
||||
@return: 工作空间用户资源的权限列表
|
||||
"""
|
||||
|
||||
role_permission_mapping_list = [role_permission_mapping_dict.get(workspace_user_role_mapping.role_id) for
|
||||
workspace_user_role_mapping in
|
||||
workspace_user_role_mapping_dict.get(
|
||||
workspace_user_resource_permission.workspace_id)]
|
||||
role_permission_mapping_list = reduce(lambda x, y: [*x, *y], role_permission_mapping_list, [])
|
||||
# 如果是根据角色
|
||||
if (workspace_user_resource_permission.auth_target_type == ResourceAuthType.ROLE
|
||||
and workspace_user_resource_permission.permission_list.__contains__(
|
||||
ResourcePermissionRole.ROLE)):
|
||||
return [
|
||||
get_workspace_permission(permission_id, workspace_id), get_permission(permission_id)]
|
||||
return [
|
||||
f"{permission_id}:/WORKSPACE/{workspace_id}/{workspace_user_permission.auth_target_type}/{workspace_user_permission.target}"
|
||||
for workspace_user_permission in
|
||||
workspace_user_permission_list if workspace_user_permission.is_auth] + [
|
||||
get_workspace_permission(permission_id, workspace_id), get_permission(permission_id)]
|
||||
f"{role_permission_mapping.permission_id}:/WORKSPACE/{workspace_user_resource_permission.workspace_id}/{workspace_user_resource_permission.auth_target_type}/{workspace_user_resource_permission.target}"
|
||||
for role_permission_mapping in role_permission_mapping_list]
|
||||
|
||||
elif workspace_user_resource_permission.auth_target_type == ResourceAuthType.RESOURCE_PERMISSION_GROUP:
|
||||
resource_permission_list = [
|
||||
[
|
||||
f"{permission}:/WORKSPACE/{workspace_user_resource_permission.workspace_id}/{workspace_user_resource_permission.auth_target_type}/{workspace_user_resource_permission.target}"
|
||||
for permission in get_permission_list_by_resource_group(ResourcePermissionGroup[resource_permission])]
|
||||
for resource_permission in workspace_user_resource_permission.permission_list if
|
||||
ResourcePermissionGroup.values.__contains__(resource_permission)]
|
||||
# 将二维数组扁平为一维
|
||||
return reduce(lambda x, y: [*x, *y], resource_permission_list, [])
|
||||
return []
|
||||
|
||||
|
||||
def get_permission_list(user,
|
||||
|
|
@ -63,41 +135,53 @@ def get_permission_list(user,
|
|||
if is_query_model:
|
||||
# 获取工作空间 用户 角色映射数据
|
||||
workspace_user_role_mapping_list = QuerySet(workspace_user_role_mapping_model).filter(user_id=user_id)
|
||||
workspace_user_role_mapping_dict = group_by(workspace_user_role_mapping_list,
|
||||
lambda item: item.role_id)
|
||||
# 获取角色权限映射数据
|
||||
role_permission_mapping_list = QuerySet(role_permission_mapping_model).filter(
|
||||
role_id__in=[workspace_user_role_mapping.role_id for workspace_user_role_mapping in
|
||||
workspace_user_role_mapping_list])
|
||||
role_dict = group_by(role_permission_mapping_list, lambda item: item.get('role_id'))
|
||||
role_permission_mapping_dict = group_by(role_permission_mapping_list, lambda item: item.role_id)
|
||||
|
||||
workspace_user_permission_list = QuerySet(WorkspaceUserPermission).filter(
|
||||
workspace_user_permission_list = QuerySet(WorkspaceUserResourcePermission).filter(
|
||||
workspace_id__in=[workspace_user_role.workspace_id for workspace_user_role in
|
||||
workspace_user_role_mapping_list])
|
||||
workspace_user_permission_dict = group_by(workspace_user_permission_list,
|
||||
key=lambda item: item.workspace_id)
|
||||
permission_list = [
|
||||
get_workspace_resource_permission_list(role_permission_mapping.permission_id,
|
||||
role_dict.get(role_permission_mapping.role_id).workspace_id,
|
||||
workspace_user_permission_dict)
|
||||
for role_permission_mapping in
|
||||
role_permission_mapping_list]
|
||||
|
||||
# 将二维数组扁平为一维
|
||||
permission_list = reduce(lambda x, y: [*x, *y], permission_list, [])
|
||||
# 资源权限
|
||||
workspace_resource_permission_list = get_workspace_resource_permission_list(workspace_user_permission_list,
|
||||
role_permission_mapping_dict,
|
||||
workspace_user_role_mapping_dict)
|
||||
|
||||
workspace_permission_list = get_workspace_permission_list(role_permission_mapping_dict,
|
||||
workspace_user_role_mapping_list)
|
||||
# 系统权限
|
||||
system_permission_list = [role_permission_mapping.permission_id for role_permission_mapping in
|
||||
role_permission_mapping_list]
|
||||
# 合并权限
|
||||
permission_list = system_permission_list + workspace_permission_list + workspace_resource_permission_list
|
||||
cache.set(key, permission_list, version=version)
|
||||
else:
|
||||
workspace_id_list = ['default']
|
||||
workspace_user_permission_list = QuerySet(WorkspaceUserPermission).filter(
|
||||
workspace_user_resource_permission_list = QuerySet(WorkspaceUserResourcePermission).filter(
|
||||
workspace_id__in=workspace_id_list)
|
||||
role_permission_mapping_list = get_default_role_permission_mapping_list()
|
||||
role_permission_mapping_dict = group_by(role_permission_mapping_list, lambda item: item.role_id)
|
||||
workspace_user_role_mapping_list = get_default_workspace_user_role_mapping_list([user.role])
|
||||
workspace_user_role_mapping_dict = group_by(workspace_user_role_mapping_list,
|
||||
lambda item: item.role_id)
|
||||
# 资源权限
|
||||
workspace_resource_permission_list = get_workspace_resource_permission_list(
|
||||
workspace_user_resource_permission_list,
|
||||
role_permission_mapping_dict,
|
||||
workspace_user_role_mapping_dict)
|
||||
|
||||
workspace_user_permission_dict = group_by(workspace_user_permission_list,
|
||||
key=lambda item: item.workspace_id)
|
||||
permission_list = get_default_permission_list_by_role(RoleConstants[user.role])
|
||||
permission_list = [
|
||||
get_workspace_resource_permission_list(permission, 'default', workspace_user_permission_dict) for
|
||||
permission
|
||||
in permission_list]
|
||||
# 将二维数组扁平为一维
|
||||
permission_list = reduce(lambda x, y: [*x, *y], permission_list, [])
|
||||
workspace_permission_list = get_workspace_permission_list(role_permission_mapping_dict,
|
||||
workspace_user_role_mapping_list)
|
||||
# 系统权限
|
||||
system_permission_list = [role_permission_mapping.permission_id for role_permission_mapping in
|
||||
role_permission_mapping_list]
|
||||
# 合并权限
|
||||
permission_list = system_permission_list + workspace_permission_list + workspace_resource_permission_list
|
||||
cache.set(key, permission_list, version=version)
|
||||
return permission_list
|
||||
|
||||
|
|
|
|||
|
|
@ -6,8 +6,11 @@
|
|||
@desc: 权限,角色 常量
|
||||
"""
|
||||
from enum import Enum
|
||||
from functools import reduce
|
||||
from typing import List
|
||||
|
||||
from django.db import models
|
||||
|
||||
|
||||
class Group(Enum):
|
||||
"""
|
||||
|
|
@ -45,6 +48,40 @@ class RoleGroup(Enum):
|
|||
CHAT_USER = "CHAT_USER"
|
||||
|
||||
|
||||
class ResourcePermissionRole(models.TextChoices):
|
||||
"""
|
||||
资源权限根据角色
|
||||
"""
|
||||
ROLE = "ROLE"
|
||||
|
||||
def __eq__(self, other):
|
||||
return str(self) == str(other)
|
||||
|
||||
|
||||
class ResourcePermissionGroup(models.TextChoices):
|
||||
"""
|
||||
资源权限组
|
||||
"""
|
||||
# 查看
|
||||
VIEW = "VIEW"
|
||||
# 管理
|
||||
MANAGE = "MANAGE"
|
||||
|
||||
def __eq__(self, other):
|
||||
return str(self) == str(other)
|
||||
|
||||
|
||||
class ResourceAuthType(models.TextChoices):
|
||||
"""
|
||||
资源授权类型
|
||||
"""
|
||||
"当授权类型是Role时候"
|
||||
ROLE = "ROLE"
|
||||
|
||||
"""资源权限组"""
|
||||
RESOURCE_PERMISSION_GROUP = "RESOURCE_PERMISSION_GROUP"
|
||||
|
||||
|
||||
class Role:
|
||||
def __init__(self, name: str, decs: str, group: RoleGroup, resource_path=None):
|
||||
self.name = name
|
||||
|
|
@ -78,14 +115,19 @@ class Permission:
|
|||
权限信息
|
||||
"""
|
||||
|
||||
def __init__(self, group: Group, operate: Operate, resource_path=None, role_list=None):
|
||||
def __init__(self, group: Group, operate: Operate, resource_path=None, role_list=None,
|
||||
resource_permission_group_list=None):
|
||||
if role_list is None:
|
||||
role_list = []
|
||||
if resource_permission_group_list is None:
|
||||
resource_permission_group_list = []
|
||||
self.group = group
|
||||
self.operate = operate
|
||||
self.resource_path = resource_path
|
||||
# 用于获取角色与权限的关系,只适用于没有权限管理的
|
||||
self.role_list = role_list
|
||||
# 用于资源权限权限分组
|
||||
self.resource_permission_group_list = resource_permission_group_list
|
||||
|
||||
@staticmethod
|
||||
def new_instance(permission_str: str):
|
||||
|
|
@ -151,13 +193,28 @@ class PermissionConstants(Enum):
|
|||
KNOWLEDGE_MODULE_CREATE = Permission(group=Group.KNOWLEDGE, operate=Operate.CREATE, role_list=[RoleConstants.ADMIN,
|
||||
RoleConstants.USER])
|
||||
KNOWLEDGE_MODULE_READ = Permission(group=Group.KNOWLEDGE, operate=Operate.READ, role_list=[RoleConstants.ADMIN,
|
||||
RoleConstants.USER])
|
||||
RoleConstants.USER],
|
||||
resource_permission_group_list=[
|
||||
ResourcePermissionGroup.VIEW
|
||||
])
|
||||
KNOWLEDGE_MODULE_EDIT = Permission(group=Group.KNOWLEDGE, operate=Operate.EDIT, role_list=[RoleConstants.ADMIN,
|
||||
RoleConstants.USER])
|
||||
RoleConstants.USER],
|
||||
resource_permission_group_list=[
|
||||
ResourcePermissionGroup.MANAGE
|
||||
]
|
||||
)
|
||||
KNOWLEDGE_MODULE_DELETE = Permission(group=Group.KNOWLEDGE, operate=Operate.DELETE, role_list=[RoleConstants.ADMIN,
|
||||
RoleConstants.USER])
|
||||
RoleConstants.USER],
|
||||
resource_permission_group_list=[
|
||||
ResourcePermissionGroup.MANAGE
|
||||
]
|
||||
)
|
||||
KNOWLEDGE_READ = Permission(group=Group.KNOWLEDGE, operate=Operate.READ, role_list=[RoleConstants.ADMIN,
|
||||
RoleConstants.USER])
|
||||
RoleConstants.USER],
|
||||
resource_permission_group_list=[
|
||||
ResourcePermissionGroup.VIEW
|
||||
]
|
||||
)
|
||||
KNOWLEDGE_CREATE = Permission(group=Group.KNOWLEDGE, operate=Operate.CREATE, role_list=[RoleConstants.ADMIN,
|
||||
RoleConstants.USER])
|
||||
|
||||
|
|
@ -194,6 +251,39 @@ def get_default_permission_list_by_role(role: RoleConstants):
|
|||
PermissionConstants.__members__))))
|
||||
|
||||
|
||||
class RolePermissionMapping:
|
||||
def __init__(self, role_id, permission_id):
|
||||
self.role_id = role_id
|
||||
self.permission_id = permission_id
|
||||
|
||||
|
||||
class WorkspaceUserRoleMapping:
|
||||
def __init__(self, workspace_id, role_id, user_id):
|
||||
self.workspace_id = workspace_id
|
||||
self.role_id = role_id
|
||||
self.user_id = user_id
|
||||
|
||||
|
||||
def get_default_role_permission_mapping_list():
|
||||
role_permission_mapping_list = [
|
||||
[RolePermissionMapping(role.value.name, PermissionConstants[k].value.__str__()) for role in
|
||||
PermissionConstants[k].value.role_list] for k in PermissionConstants.__members__]
|
||||
return reduce(lambda x, y: [*x, *y], role_permission_mapping_list, [])
|
||||
|
||||
|
||||
def get_default_workspace_user_role_mapping_list(user_role_list: list):
|
||||
return [WorkspaceUserRoleMapping('default', role.value.name, 'default') for role in RoleConstants if
|
||||
user_role_list.__contains__(role.value.name)]
|
||||
|
||||
|
||||
def get_permission_list_by_resource_group(resource_group: ResourcePermissionGroup):
|
||||
"""
|
||||
根据资源组获取权限
|
||||
"""
|
||||
return [PermissionConstants[k] for k in PermissionConstants.__members__ if
|
||||
PermissionConstants[k].value.resource_permission_group_list.__contains__(resource_group)]
|
||||
|
||||
|
||||
class Auth:
|
||||
"""
|
||||
用于存储当前用户的角色和权限
|
||||
|
|
|
|||
|
|
@ -0,0 +1,37 @@
|
|||
# Generated by Django 5.2 on 2025-04-27 10:09
|
||||
|
||||
import django.contrib.postgres.fields
|
||||
import django.db.models.deletion
|
||||
import uuid_utils.compat
|
||||
from django.db import migrations, models
|
||||
|
||||
|
||||
class Migration(migrations.Migration):
|
||||
|
||||
dependencies = [
|
||||
('system_manage', '0002_systemsetting'),
|
||||
('users', '0001_initial'),
|
||||
]
|
||||
|
||||
operations = [
|
||||
migrations.CreateModel(
|
||||
name='WorkspaceUserResourcePermission',
|
||||
fields=[
|
||||
('id', models.UUIDField(default=uuid_utils.compat.uuid7, editable=False, primary_key=True, serialize=False, verbose_name='主键id')),
|
||||
('workspace_id', models.CharField(default='default', max_length=128, verbose_name='工作空间id')),
|
||||
('auth_target_type', models.CharField(choices=[('KNOWLEDGE', '知识库'), ('APPLICATION', '应用')], default='KNOWLEDGE', max_length=128, verbose_name='授权目标')),
|
||||
('target', models.UUIDField(verbose_name='知识库/应用id')),
|
||||
('auth_type', models.CharField(choices=[('ROLE', 'Role'), ('RESOURCE_PERMISSION_GROUP', 'Resource Permission Group')], db_default='ROLE', default=False, verbose_name='授权类型')),
|
||||
('permission_list', django.contrib.postgres.fields.ArrayField(base_field=models.CharField(blank=True, choices=[('VIEW', 'View'), ('MANAGE', 'Manage'), ('ROLE', 'Role')], default='VIEW', max_length=256), default=list, size=None, verbose_name='权限列表')),
|
||||
('create_time', models.DateTimeField(auto_now_add=True, verbose_name='创建时间')),
|
||||
('update_time', models.DateTimeField(auto_now=True, verbose_name='修改时间')),
|
||||
('user', models.ForeignKey(on_delete=django.db.models.deletion.DO_NOTHING, to='users.user', verbose_name='工作空间下的用户')),
|
||||
],
|
||||
options={
|
||||
'db_table': 'workspace_user_resource_permission',
|
||||
},
|
||||
),
|
||||
migrations.DeleteModel(
|
||||
name='WorkspaceUserPermission',
|
||||
),
|
||||
]
|
||||
|
|
@ -8,9 +8,11 @@
|
|||
"""
|
||||
|
||||
import uuid_utils.compat as uuid
|
||||
from django.contrib.postgres.fields import ArrayField
|
||||
from django.db import models
|
||||
|
||||
from common.constants.permission_constants import Group
|
||||
from common.constants.permission_constants import Group, ResourcePermissionGroup, ResourceAuthType, \
|
||||
ResourcePermissionRole
|
||||
from users.models import User
|
||||
|
||||
|
||||
|
|
@ -20,7 +22,7 @@ class AuthTargetType(models.TextChoices):
|
|||
APPLICATION = Group.APPLICATION.value, '应用'
|
||||
|
||||
|
||||
class WorkspaceUserPermission(models.Model):
|
||||
class WorkspaceUserResourcePermission(models.Model):
|
||||
"""
|
||||
工作空间用户资源权限表
|
||||
用于管理当前工作空间是否有权限操作 某一个应用或者知识库
|
||||
|
|
@ -36,12 +38,20 @@ class WorkspaceUserPermission(models.Model):
|
|||
# 授权的知识库或者应用的id
|
||||
target = models.UUIDField(max_length=128, verbose_name="知识库/应用id")
|
||||
|
||||
# 是否授权
|
||||
is_auth = models.BooleanField(default=False, verbose_name="是否授权")
|
||||
# 授权类型 如果是Role那么就是角色的权限 如果是PERMISSION
|
||||
auth_type = models.CharField(default=False, verbose_name="授权类型", choices=ResourceAuthType.choices,
|
||||
db_default=ResourceAuthType.ROLE)
|
||||
# 资源权限列表
|
||||
permission_list = ArrayField(verbose_name="权限列表",
|
||||
default=list,
|
||||
base_field=models.CharField(max_length=256,
|
||||
blank=True,
|
||||
choices=ResourcePermissionGroup.choices + ResourcePermissionRole.choices,
|
||||
default=ResourcePermissionGroup.VIEW))
|
||||
|
||||
create_time = models.DateTimeField(verbose_name="创建时间", auto_now_add=True)
|
||||
|
||||
update_time = models.DateTimeField(verbose_name="修改时间", auto_now=True)
|
||||
|
||||
class Meta:
|
||||
db_table = "workspace_user_permission"
|
||||
db_table = "workspace_user_resource_permission"
|
||||
|
|
|
|||
Loading…
Reference in New Issue