MaxKB/apps/setting/serializers/team_serializers.py

343 lines
16 KiB
Python
Raw Blame History

This file contains ambiguous Unicode characters

This file contains Unicode characters that might be confused with other characters. If you think that this is intentional, you can safely ignore this warning. Use the Escape button to reveal them.

# coding=utf-8
"""
@project: qabot
@Author
@file team_serializers.py
@date2023/9/5 16:32
@desc:
"""
import itertools
import json
import os
import uuid
from typing import Dict, List
from django.core import cache
from django.db import transaction
from django.db.models import QuerySet, Q
from drf_yasg import openapi
from rest_framework import serializers
from common.constants.permission_constants import Operate
from common.db.sql_execute import select_list
from common.exception.app_exception import AppApiException
from common.mixins.api_mixin import ApiMixin
from common.response.result import get_api_response
from common.util.field_message import ErrMessage
from common.util.file_util import get_file_content
from setting.models import TeamMember, TeamMemberPermission, Team
from smartdoc.conf import PROJECT_DIR
from users.models.user import User
from users.serializers.user_serializers import UserSerializer
from django.utils.translation import gettext_lazy as _
user_cache = cache.caches['user_cache']
def get_response_body_api():
return openapi.Schema(
type=openapi.TYPE_OBJECT,
required=['id', 'username', 'email', 'role', 'is_active', 'team_id', 'member_id'],
properties={
'id': openapi.Schema(type=openapi.TYPE_STRING, title=_('user id'), description=_('user id')),
'username': openapi.Schema(type=openapi.TYPE_STRING, title=_('Username'), description=_('Username')),
'email': openapi.Schema(type=openapi.TYPE_STRING, title=_('Email'), description=_('Email')),
'role': openapi.Schema(type=openapi.TYPE_STRING, title=_('Role'), description=_('Role')),
'is_active': openapi.Schema(type=openapi.TYPE_STRING, title=_('Is active'), description=_('Is active')),
'team_id': openapi.Schema(type=openapi.TYPE_STRING, title=_('team id'), description=_('team id')),
'member_id': openapi.Schema(type=openapi.TYPE_STRING, title=_('member id'), description=_('member id')),
}
)
class TeamMemberPermissionOperate(ApiMixin, serializers.Serializer):
USE = serializers.BooleanField(required=True, error_messages=ErrMessage.boolean(_('use')))
MANAGE = serializers.BooleanField(required=True, error_messages=ErrMessage.boolean(_('manage')))
def get_request_body_api(self):
return openapi.Schema(type=openapi.TYPE_OBJECT,
title=_('type'),
description=_('Operation permissions USE, MANAGE permissions'),
properties={
'USE': openapi.Schema(type=openapi.TYPE_BOOLEAN,
title=_('use permission'),
description=_('use permission True|False')),
'MANAGE': openapi.Schema(type=openapi.TYPE_BOOLEAN,
title=_('manage permission'),
description=_('manage permission True|False'))
}
)
class UpdateTeamMemberItemPermissionSerializer(ApiMixin, serializers.Serializer):
target_id = serializers.CharField(required=True, error_messages=ErrMessage.char(_('target id')))
type = serializers.CharField(required=True, error_messages=ErrMessage.char(_('type')))
operate = TeamMemberPermissionOperate(required=True, many=False)
def get_request_body_api(self):
return openapi.Schema(
type=openapi.TYPE_OBJECT,
required=['id', 'type', 'operate'],
properties={
'target_id': openapi.Schema(type=openapi.TYPE_STRING, title=_('dataset id/application id'),
description=_('dataset id/application id')),
'type': openapi.Schema(type=openapi.TYPE_STRING,
title=_('type'),
description="DATASET|APPLICATION",
),
'operate': TeamMemberPermissionOperate().get_request_body_api()
}
)
class UpdateTeamMemberPermissionSerializer(ApiMixin, serializers.Serializer):
team_member_permission_list = UpdateTeamMemberItemPermissionSerializer(required=True, many=True)
def is_valid(self, *, user_id=None):
super().is_valid(raise_exception=True)
permission_list = self.data.get("team_member_permission_list")
illegal_target_id_list = select_list(
get_file_content(
os.path.join(PROJECT_DIR, "apps", "setting", 'sql', 'check_member_permission_target_exists.sql')),
[json.dumps(permission_list), user_id, user_id])
if illegal_target_id_list is not None and len(illegal_target_id_list) > 0:
raise AppApiException(500,
_('Non-existent application|knowledge base id[') + str(illegal_target_id_list) + ']')
def update_or_save(self, member_id: str):
team_member_permission_list = self.data.get("team_member_permission_list")
# 获取数据库已有权限 从而判断是否是插入还是更新
team_member_permission_exist_list = QuerySet(TeamMemberPermission).filter(
member_id=member_id)
update_list = []
save_list = []
for item in team_member_permission_list:
exist_list = list(
filter(lambda use: str(use.target) == item.get('target_id'), team_member_permission_exist_list))
if len(exist_list) > 0:
exist_list[0].operate = list(
filter(lambda key: item.get('operate').get(key),
item.get('operate').keys()))
update_list.append(exist_list[0])
else:
save_list.append(TeamMemberPermission(target=item.get('target_id'), auth_target_type=item.get('type'),
operate=list(
filter(lambda key: item.get('operate').get(key),
item.get('operate').keys())),
member_id=member_id))
# 批量更新
QuerySet(TeamMemberPermission).bulk_update(update_list, ['operate'])
# 批量插入
QuerySet(TeamMemberPermission).bulk_create(save_list)
def get_request_body_api(self):
return openapi.Schema(
type=openapi.TYPE_OBJECT,
required=['id'],
properties={
'team_member_permission_list':
openapi.Schema(type=openapi.TYPE_ARRAY, title=_('Permission data'),
description=_('Permission data'),
items=UpdateTeamMemberItemPermissionSerializer().get_request_body_api()
),
}
)
class TeamMemberSerializer(ApiMixin, serializers.Serializer):
team_id = serializers.UUIDField(required=True, error_messages=ErrMessage.uuid(_('team id')))
def is_valid(self, *, raise_exception=False):
super().is_valid(raise_exception=True)
@staticmethod
def get_bach_request_body_api():
return openapi.Schema(
type=openapi.TYPE_ARRAY,
title=_('user id list'),
description=_('user id list'),
items=openapi.Schema(type=openapi.TYPE_STRING)
)
@staticmethod
def get_request_body_api():
return openapi.Schema(
type=openapi.TYPE_OBJECT,
required=['username_or_email'],
properties={
'username_or_email': openapi.Schema(type=openapi.TYPE_STRING, title=_('Username or email'),
description=_('Username or email')),
}
)
@staticmethod
def get_response_body_api():
return openapi.Schema(
type=openapi.TYPE_OBJECT,
properties={
'id': openapi.Schema(type=openapi.TYPE_STRING, title=_('user id'), description=_('user id')),
'username': openapi.Schema(type=openapi.TYPE_STRING, title=_('Username'), description=_('Username')),
'email': openapi.Schema(type=openapi.TYPE_STRING, title=_('Email'), description=_('Email')),
'role': openapi.Schema(type=openapi.TYPE_STRING, title=_('Role'), description=_('Role')),
'is_active': openapi.Schema(type=openapi.TYPE_STRING, title=_('Is active'),
description=_('Is active')),
'team_id': openapi.Schema(type=openapi.TYPE_STRING, title=_('team id'), description=_('team id')),
'user_id': openapi.Schema(type=openapi.TYPE_STRING, title=_('user id'), description=_('user id')),
'type': openapi.Schema(type=openapi.TYPE_STRING, title=_('member type'),
description=_('member type manage|member')),
}
)
@transaction.atomic
def batch_add_member(self, user_id_list: List[str], with_valid=True):
"""
批量添加成员
:param user_id_list: 用户id列表
:param with_valid: 是否校验
:return: 成员列表
"""
if with_valid:
self.is_valid(raise_exception=True)
use_user_id_list = [str(u.id) for u in QuerySet(User).filter(id__in=user_id_list)]
team_member_user_id_list = [str(team_member.user_id) for team_member in
QuerySet(TeamMember).filter(team_id=self.data.get('team_id'))]
team_id = self.data.get("team_id")
create_team_member_list = [
self.to_member_model(add_user_id, team_member_user_id_list, use_user_id_list, team_id) for add_user_id in
user_id_list]
QuerySet(TeamMember).bulk_create(
[team_member for team_member in create_team_member_list if team_member is not None]) if len(
create_team_member_list) > 0 else None
return TeamMemberSerializer(
data={'team_id': self.data.get("team_id")}).list_member()
def to_member_model(self, add_user_id, team_member_user_id_list, use_user_id_list, user_id):
if use_user_id_list.__contains__(add_user_id):
if team_member_user_id_list.__contains__(add_user_id) or user_id == add_user_id:
return None
else:
return TeamMember(team_id=self.data.get("team_id"), user_id=add_user_id)
else:
return None
def add_member(self, username_or_email: str, with_valid=True):
"""
添加一个成员
:param with_valid: 是否校驗參數
:param username_or_email: 添加成员的邮箱或者用户名
:return: 成员列表
"""
if with_valid:
self.is_valid(raise_exception=True)
if username_or_email is None:
raise AppApiException(500, _('Username or email is required'))
user = QuerySet(User).filter(
Q(username=username_or_email) | Q(email=username_or_email)).first()
if user is None:
raise AppApiException(500, _('User does not exist'))
if QuerySet(TeamMember).filter(Q(team_id=self.data.get('team_id')) & Q(user=user)).exists() or self.data.get(
"team_id") == str(user.id):
raise AppApiException(500, _('The current members already exist in the team, do not add them again.'))
TeamMember(team_id=self.data.get("team_id"), user=user).save()
return self.list_member(with_valid=False)
def list_member(self, with_valid=True):
"""
获取 团队中的成员列表
:return: 成员列表
"""
if with_valid:
self.is_valid(raise_exception=True)
# 普通成員列表
member_list = list(map(lambda t: {"id": t.id, 'email': t.user.email, 'username': t.user.username,
'team_id': self.data.get("team_id"), 'user_id': t.user.id,
'type': 'member'},
QuerySet(TeamMember).filter(team_id=self.data.get("team_id"))))
# 管理員成員
manage_member = QuerySet(User).get(id=self.data.get('team_id'))
return [{'id': 'root', 'email': manage_member.email, 'username': manage_member.username,
'team_id': self.data.get("team_id"), 'user_id': manage_member.id, 'type': 'manage'
}, *member_list]
def get_response_body_api(self):
return get_api_response(openapi.Schema(
type=openapi.TYPE_ARRAY, title=_('member list'), description=_('member list'),
items=UserSerializer().get_response_body_api()
))
class Operate(ApiMixin, serializers.Serializer):
# 团队 成员id
member_id = serializers.CharField(required=True, error_messages=ErrMessage.char(_('member id')))
# 团队id
team_id = serializers.CharField(required=True, error_messages=ErrMessage.char(_('team id')))
def is_valid(self, *, raise_exception=True):
super().is_valid(raise_exception=True)
if self.data.get('member_id') != 'root' and not QuerySet(TeamMember).filter(
team_id=self.data.get('team_id'),
id=self.data.get('member_id')).exists():
raise AppApiException(500, _('The member does not exist, please add a member first'))
return True
def list_member_permission(self, with_valid=True):
if with_valid:
self.is_valid(raise_exception=True)
team_id = self.data.get('team_id')
member_id = self.data.get("member_id")
# 查询当前团队成员所有的知识库和应用的权限 注意 operate为null是为设置权限 默认值都是false
member_permission_list = select_list(
get_file_content(os.path.join(PROJECT_DIR, "apps", "setting", 'sql', 'get_member_permission.sql')),
[team_id, team_id, (member_id if member_id != 'root' else uuid.uuid1())])
# 如果是管理员 则拥有所有权限 默认赋值
if member_id == 'root':
member_permission_list = list(
map(lambda row: {**row, 'operate': {Operate.USE.value: True, Operate.MANAGE.value: True}},
member_permission_list))
# 分为 APPLICATION DATASET俩组
groups = itertools.groupby(
sorted(list(map(lambda m: {**m, 'member_id': member_id,
'operate': dict(
map(lambda key: (key, True if m.get('operate') is not None and m.get(
'operate').__contains__(key) else False),
[Operate.USE.value, Operate.MANAGE.value]))},
member_permission_list)), key=lambda x: x.get('type')),
key=lambda x: x.get('type'))
return dict([(key, list(group)) for key, group in groups])
def edit(self, member_permission: Dict):
self.is_valid(raise_exception=True)
member_id = self.data.get("member_id")
if member_id == 'root':
raise AppApiException(500, _('Administrator rights do not allow modification'))
s = UpdateTeamMemberPermissionSerializer(data=member_permission)
s.is_valid(user_id=self.data.get("team_id"))
s.update_or_save(member_id)
return self.list_member_permission(with_valid=False)
def delete(self):
"""
移除成员
:return:
"""
self.is_valid(raise_exception=True)
member_id = self.data.get("member_id")
if member_id == 'root':
raise AppApiException(500, _('Unable to remove team admin'))
# 删除成员权限
QuerySet(TeamMemberPermission).filter(member_id=member_id).delete()
# 删除成员
QuerySet(TeamMember).filter(id=member_id).delete()
return True
@staticmethod
def get_request_params_api():
return [openapi.Parameter(name='member_id',
in_=openapi.IN_PATH,
type=openapi.TYPE_STRING,
required=True,
description=_('member id')), ]