feat: Apikey call supports cross domain and application whitelist (#3556)

This commit is contained in:
shaohuzhang1 2025-07-11 11:15:21 +08:00 committed by GitHub
parent c3ecddcd1b
commit 210e09681f
No known key found for this signature in database
GPG Key ID: B5690EEEBB952194
6 changed files with 203 additions and 0 deletions

View File

@ -0,0 +1,30 @@
# coding=utf-8
"""
@project: MaxKB
@Author
@file application_access_token_cache.py
@date2024/7/25 11:34
@desc:
"""
from django.core.cache import cache
from django.db.models import QuerySet
from application.models import ApplicationAccessToken
from common.utils.cache_util import get_cache
@get_cache(cache_key=lambda access_token, use_get_data: access_token,
use_get_data=lambda access_token, use_get_data: use_get_data,
version='APPLICATION_ACCESS_TOKEN_CACHE')
def get_application_access_token(access_token, use_get_data):
application_access_token = QuerySet(ApplicationAccessToken).filter(access_token=access_token).first()
if application_access_token is None:
return None
return {'white_active': application_access_token.white_active,
'white_list': application_access_token.white_list,
'application_icon': application_access_token.application.icon,
'application_name': application_access_token.application.name}
def del_application_access_token(access_token):
cache.delete(access_token, version='APPLICATION_ACCESS_TOKEN_CACHE')

View File

@ -0,0 +1,8 @@
# coding=utf-8
"""
@project: MaxKB
@Author虎虎
@file __init__.py
@date2025/7/11 10:43
@desc:
"""

View File

@ -0,0 +1,38 @@
# coding=utf-8
"""
@project: maxkb
@Author
@file static_headers_middleware.py
@date2024/3/13 18:26
@desc:
"""
from django.utils.deprecation import MiddlewareMixin
from common.cache_data.application_access_token_cache import get_application_access_token
from maxkb.const import CONFIG
class ChatHeadersMiddleware(MiddlewareMixin):
def process_response(self, request, response):
if request.path.startswith(CONFIG.get_chat_path()) and not request.path.startswith(
CONFIG.get_chat_path() + '/api'):
access_token = request.path.replace(CONFIG.get_chat_path() + '/', '')
if access_token.__contains__('/') or access_token == 'undefined':
return response
application_access_token = get_application_access_token(access_token, True)
if application_access_token is not None:
white_active = application_access_token.get('white_active', False)
white_list = application_access_token.get('white_list', [])
application_icon = application_access_token.get('application_icon')
application_name = application_access_token.get('application_name')
if white_active:
# 添加自定义的响应头
response[
'Content-Security-Policy'] = f'frame-ancestors {" ".join(white_list)}'
response.content = (response.content.decode('utf-8').replace(
'<link rel="icon" href="./favicon.ico"/>',
f'<link rel="icon" href="{application_icon}" />')
.replace('<title>MaxKB</title>', f'<title>{application_name}</title>').encode(
"utf-8"))
return response

View File

@ -0,0 +1,40 @@
# coding=utf-8
"""
@project: MaxKB
@Author虎虎
@file cross_domain_middleware.py
@date2024/5/8 13:36
@desc:
"""
from django.http import HttpResponse
from django.utils.deprecation import MiddlewareMixin
from common.cache_data.application_api_key_cache import get_application_api_key
class CrossDomainMiddleware(MiddlewareMixin):
def process_request(self, request):
if request.method == 'OPTIONS':
return HttpResponse(status=200,
headers={
"Access-Control-Allow-Origin": "*",
"Access-Control-Allow-Methods": "GET,POST,DELETE,PUT",
"Access-Control-Allow-Headers": "Origin,X-Requested-With,Content-Type,Accept,Authorization,token"})
def process_response(self, request, response):
auth = request.META.get('HTTP_AUTHORIZATION')
origin = request.META.get('HTTP_ORIGIN')
if auth is not None and str(auth).startswith("application-") and origin is not None:
application_api_key = get_application_api_key(str(auth), True)
cross_domain_list = application_api_key.get('cross_domain_list', [])
allow_cross_domain = application_api_key.get('allow_cross_domain', False)
if allow_cross_domain:
response['Access-Control-Allow-Methods'] = 'GET,POST,DELETE,PUT'
response[
'Access-Control-Allow-Headers'] = "Origin,X-Requested-With,Content-Type,Accept,Authorization,token"
if cross_domain_list is None or len(cross_domain_list) == 0:
response['Access-Control-Allow-Origin'] = "*"
elif cross_domain_list.__contains__(origin):
response['Access-Control-Allow-Origin'] = origin
return response

View File

@ -0,0 +1,84 @@
# coding=utf-8
"""
@project: MaxKB
@Author
@file gzip.py
@date2025/2/27 10:03
@desc:
"""
from django.utils.cache import patch_vary_headers
from django.utils.deprecation import MiddlewareMixin
from django.utils.regex_helper import _lazy_re_compile
from django.utils.text import compress_sequence, compress_string
re_accepts_gzip = _lazy_re_compile(r"\bgzip\b")
class GZipMiddleware(MiddlewareMixin):
"""
Compress content if the browser allows gzip compression.
Set the Vary header accordingly, so that caches will base their storage
on the Accept-Encoding header.
"""
max_random_bytes = 100
def process_response(self, request, response):
if request.method != 'GET' or request.path.startswith('/api'):
return response
# It's not worth attempting to compress really short responses.
if not response.streaming and len(response.content) < 200:
return response
# Avoid gzipping if we've already got a content-encoding.
if response.has_header("Content-Encoding"):
return response
patch_vary_headers(response, ("Accept-Encoding",))
ae = request.META.get("HTTP_ACCEPT_ENCODING", "")
if not re_accepts_gzip.search(ae):
return response
if response.streaming:
if response.is_async:
# pull to lexical scope to capture fixed reference in case
# streaming_content is set again later.
original_iterator = response.streaming_content
async def gzip_wrapper():
async for chunk in original_iterator:
yield compress_string(
chunk,
max_random_bytes=self.max_random_bytes,
)
response.streaming_content = gzip_wrapper()
else:
response.streaming_content = compress_sequence(
response.streaming_content,
max_random_bytes=self.max_random_bytes,
)
# Delete the `Content-Length` header for streaming content, because
# we won't know the compressed size until we stream it.
del response.headers["Content-Length"]
else:
# Return the compressed content only if it's actually shorter.
compressed_content = compress_string(
response.content,
max_random_bytes=self.max_random_bytes,
)
if len(compressed_content) >= len(response.content):
return response
response.content = compressed_content
response.headers["Content-Length"] = str(len(response.content))
# If there is a strong ETag, make it weak to fulfill the requirements
# of RFC 9110 Section 8.8.1 while also allowing conditional request
# matches on ETags.
etag = response.get("ETag")
if etag and etag.startswith('"'):
response.headers["ETag"] = "W/" + etag
response.headers["Content-Encoding"] = "gzip"
return response

View File

@ -56,6 +56,9 @@ MIDDLEWARE = [
'django.middleware.security.SecurityMiddleware',
'django.contrib.sessions.middleware.SessionMiddleware',
'django.middleware.common.CommonMiddleware',
'common.middleware.gzip.GZipMiddleware',
'common.middleware.chat_headers_middleware.ChatHeadersMiddleware',
'common.middleware.cross_domain_middleware.CrossDomainMiddleware',
]