feat: Access restrictions (#3222)

This commit is contained in:
shaohuzhang1 2025-06-09 18:40:56 +08:00 committed by GitHub
parent fff46e5c28
commit 5da47a140d
No known key found for this signature in database
GPG Key ID: B5690EEEBB952194
6 changed files with 200 additions and 2 deletions

View File

@ -0,0 +1,35 @@
# coding=utf-8
"""
@project: MaxKB
@Author虎虎
@file application_access_token.py
@date2025/6/9 17:46
@desc:
"""
from drf_spectacular.types import OpenApiTypes
from drf_spectacular.utils import OpenApiParameter
from application.serializers.application_access_token import AccessTokenEditSerializer
from common.mixins.api_mixin import APIMixin
class ApplicationAccessTokenAPI(APIMixin):
@staticmethod
def get_parameters():
return [OpenApiParameter(
name="workspace_id",
description="工作空间id",
type=OpenApiTypes.STR,
location='path',
required=True,
), OpenApiParameter(
name="application_id",
description="应用id",
type=OpenApiTypes.STR,
location='path',
required=True,
)]
@staticmethod
def get_request():
return AccessTokenEditSerializer

View File

@ -0,0 +1,111 @@
# coding=utf-8
"""
@project: MaxKB
@Author虎虎
@file application_access_token.py
@date2025/6/9 17:49
@desc:
"""
import hashlib
import uuid
from django.core.cache import cache
from django.db.models import QuerySet
from django.utils.translation import gettext_lazy as _
from rest_framework import serializers
from application.models import ApplicationAccessToken
from common.constants.cache_version import Cache_Version
from common.database_model_manage.database_model_manage import DatabaseModelManage
class AccessTokenEditSerializer(serializers.Serializer):
access_token_reset = serializers.BooleanField(required=False,
label=_("Reset Token"))
is_active = serializers.BooleanField(required=False, label=_("Is it enabled"))
access_num = serializers.IntegerField(required=False, max_value=10000,
min_value=0,
label=_("Number of visits"))
white_active = serializers.BooleanField(required=False,
label=_("Whether to enable whitelist"))
white_list = serializers.ListSerializer(required=False, child=serializers.CharField(required=True,
label=_("Whitelist")),
label=_("Whitelist")),
show_source = serializers.BooleanField(required=False,
label=_("Whether to display knowledge sources"))
language = serializers.CharField(required=False, allow_blank=True, allow_null=True,
label=_("language"))
authentication = serializers.BooleanField(default=False, label="Do you need authentication")
authentication_value = serializers.JSONField(required=False, label="Certified value", default=dict)
class AccessTokenSerializer(serializers.Serializer):
application_id = serializers.UUIDField(required=True, label=_("Application ID"))
def edit(self, instance):
self.is_valid(raise_exception=True)
AccessTokenEditSerializer(data=instance).is_valid(raise_exception=True)
application_access_token = QuerySet(ApplicationAccessToken).get(
application_id=self.data.get('application_id'))
if 'is_active' in instance:
application_access_token.is_active = instance.get("is_active")
if 'access_token_reset' in instance and instance.get('access_token_reset'):
application_access_token.access_token = hashlib.md5(str(uuid.uuid1()).encode()).hexdigest()[8:24]
if 'access_num' in instance and instance.get('access_num') is not None:
application_access_token.access_num = instance.get("access_num")
if 'white_active' in instance and instance.get('white_active') is not None:
application_access_token.white_active = instance.get("white_active")
if 'white_list' in instance and instance.get('white_list') is not None:
application_access_token.white_list = instance.get('white_list')
if 'show_source' in instance and instance.get('show_source') is not None:
application_access_token.show_source = instance.get('show_source')
if 'language' in instance and instance.get('language') is not None:
application_access_token.language = instance.get('language')
if 'language' not in instance or instance.get('language') is None:
application_access_token.language = None
application_access_token.save()
application_setting_model = DatabaseModelManage.get_model('application_setting')
license_is_valid = cache.get(Cache_Version.SYSTEM.get_key(key='license_is_valid'),
version=Cache_Version.SYSTEM.get_version())
if application_setting_model is not None and license_is_valid:
application_setting, _ = application_setting_model.objects.get_or_create(
application_id=self.data.get('application_id'))
if application_setting is not None and instance.get('authentication') is not None and instance.get(
'authentication_value') is not None:
application_setting.authentication = instance.get('authentication')
application_setting.authentication_value = instance.get('authentication_value')
application_setting.save()
return self.one(with_valid=False)
def one(self, with_valid=True):
if with_valid:
self.is_valid(raise_exception=True)
application_id = self.data.get("application_id")
application_access_token = QuerySet(ApplicationAccessToken).filter(
application_id=application_id).first()
if application_access_token is None:
application_access_token = ApplicationAccessToken(application_id=application_id,
access_token=hashlib.md5(
str(uuid.uuid1()).encode()).hexdigest()[
8:24], is_active=True)
application_access_token.save()
application_setting_model = DatabaseModelManage.get_model('application_setting')
other = {}
if application_setting_model is not None:
application_setting, _ = application_setting_model.objects.get_or_create(
application_id=self.data.get('application_id'))
if application_setting is not None:
other = {'authentication': application_setting.authentication,
'authentication_value': application_setting.authentication_value}
return {'application_id': application_access_token.application_id,
'access_token': application_access_token.access_token,
"is_active": application_access_token.is_active,
'access_num': application_access_token.access_num,
'white_active': application_access_token.white_active,
'white_list': application_access_token.white_list,
'show_source': application_access_token.show_source,
'language': application_access_token.language,
**other,
}

View File

@ -17,6 +17,8 @@ urlpatterns = [
path('workspace/<str:workspace_id>/application/<str:application_id>/work_flow_version',
views.ApplicationVersionView.as_view()),
path('workspace/<str:workspace_id>/application/<str:application_id>/access_token',
views.AccessToken.as_view()),
path(
'workspace/<str:workspace_id>/application/<str:application_id>/work_flow_version/<int:current_page>/<int:page_size>',
views.ApplicationVersionView.Page.as_view()),

View File

@ -9,3 +9,4 @@
from .application_api_key import *
from .application import *
from .application_version import *
from .application_access_token import *

View File

@ -0,0 +1,50 @@
# coding=utf-8
"""
@project: MaxKB
@Author虎虎
@file application_token.py
@date2025/6/9 17:42
@desc:
"""
from django.utils.translation import gettext_lazy as _
from drf_spectacular.utils import extend_schema
from rest_framework.request import Request
from rest_framework.views import APIView
from application.api.application_access_token import ApplicationAccessTokenAPI
from application.serializers.application_access_token import AccessTokenSerializer
from common import result
from common.auth import TokenAuth
from common.auth.authentication import has_permissions
from common.constants.permission_constants import PermissionConstants
class AccessToken(APIView):
authentication_classes = [TokenAuth]
@extend_schema(
methods=['PUT'],
description=_("Modify application access restriction information"),
summary=_("Modify application access restriction information"),
operation_id=_("Modify application access restriction information"), # type: ignore
parameters=ApplicationAccessTokenAPI.get_parameters(),
request=ApplicationAccessTokenAPI.get_request(),
tags=[_('Application')] # type: ignore
)
@has_permissions(PermissionConstants.APPLICATION_OVERVIEW_ACCESS.get_workspace_permission())
def put(self, request: Request, workspace_id: str, application_id: str):
return result.success(
AccessTokenSerializer(data={'application_id': application_id}).edit(
request.data))
@extend_schema(
methods=['GET'],
description=_("Get application access restriction information"),
summary=_("Get application access restriction information"),
operation_id=_("Get application access restriction information"), # type: ignore
parameters=ApplicationAccessTokenAPI.get_parameters(),
tags=[_('Application')] # type: ignore
)
@has_permissions(PermissionConstants.APPLICATION_READ.get_workspace_permission())
def get(self, request: Request, workspace_id: str, application_id: str):
return result.success(AccessTokenSerializer(data={'application_id': application_id}).one())

View File

@ -34,7 +34,6 @@ class ApplicationKey(APIView):
parameters=ApplicationKeyCreateAPI.get_parameters(),
tags=[_('Application Api Key')] # type: ignore
)
@log(menu='Application', operate="Add ApiKey",
get_operation_object=lambda r, k: get_application_operation_object(k.get('application_api_key_id')))
@has_permissions(PermissionConstants.APPLICATION_OVERVIEW_API_KEY.get_workspace_application_permission())
@ -52,7 +51,7 @@ class ApplicationKey(APIView):
tags=[_('Application Api Key')] # type: ignore
)
@has_permissions(PermissionConstants.APPLICATION_OVERVIEW_API_KEY.get_workspace_application_permission())
def get(self, request: Request, workspace_id: str, application_id: str ):
def get(self, request: Request, workspace_id: str, application_id: str):
return result, success(ApplicationKeySerializer(
data={'application_id': application_id, 'user_id': request.user.id,
'workspace_id': workspace_id}).list())