From 8fb268c25a7ad83573cb11b1656564e6bea54807 Mon Sep 17 00:00:00 2001 From: shaohuzhang1 Date: Wed, 27 Dec 2023 18:33:23 +0800 Subject: [PATCH] =?UTF-8?q?feat:=20url=E8=8E=B7=E5=8F=96=E6=96=87=E6=A1=A3?= =?UTF-8?q?=E6=95=B0=E6=8D=AE=E5=B7=A5=E5=85=B7?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit --- apps/common/util/fork.py | 112 +++++++++++++++++++++++++++++++++++++++ pyproject.toml | 2 + 2 files changed, 114 insertions(+) create mode 100644 apps/common/util/fork.py diff --git a/apps/common/util/fork.py b/apps/common/util/fork.py new file mode 100644 index 000000000..1413437bb --- /dev/null +++ b/apps/common/util/fork.py @@ -0,0 +1,112 @@ +import re +from functools import reduce +from typing import List, Set +import requests +import html2text as ht +from bs4 import BeautifulSoup +from urllib.parse import urljoin + + +class ForkManage: + def __init__(self, base_url: str, selector_list: List[str]): + self.base_url = base_url + self.selector_list = selector_list + + def fork(self, level: int, exclude_link_url: Set[str], fork_handler): + self.fork_child(self.base_url, self.selector_list, level, exclude_link_url, fork_handler) + + @staticmethod + def fork_child(base_url: str, selector_list: List[str], level: int, exclude_link_url: Set[str], fork_handler): + if level < 0: + return + response = Fork(base_url, selector_list).fork() + fork_handler(base_url, response) + for child_link in response.child_link_list: + if not exclude_link_url.__contains__(child_link): + exclude_link_url.add(child_link) + ForkManage.fork_child(child_link, selector_list, level - 1, exclude_link_url, fork_handler) + + +class Fork: + class Response: + def __init__(self, html_content: str, child_link_list: List[str], status, message: str): + self.html_content = html_content + self.child_link_list = child_link_list + self.status = status + self.message = message + + @staticmethod + def success(html_content: str, child_link_list: List[str]): + return Fork.Response(html_content, child_link_list, 200, '') + + @staticmethod + def error(message: str): + return Fork.Response('', [], 500, message) + + def __init__(self, base_fork_url: str, selector_list: List[str]): + self.base_fork_url = urljoin(base_fork_url if base_fork_url.endswith("/") else base_fork_url + '/', '.') + self.base_fork_url = base_fork_url + self.selector_list = selector_list + + def get_child_link_list(self, bf: BeautifulSoup): + pattern = "^(?!(http:|https:|tel:/|#|mailto:|javascript:)).*|" + self.base_fork_url + link_list = bf.find_all(name='a', href=re.compile(pattern)) + result = [self.parse_href(link.get('href')) for link in link_list] + return result + + def get_content_html(self, bf: BeautifulSoup): + if self.selector_list is None or len(self.selector_list) == 0: + return str(bf) + params = reduce(lambda x, y: {**x, **y}, + [{'class_': selector.replace('.', '')} if selector.startswith('.') else { + 'id': selector.replace("#", "") if selector.startswith("#") else {'name': selector}} for + selector in + self.selector_list], {}) + f = bf.find_all(**params) + return "\n".join([str(row) for row in f]) + + def parse_href(self, href: str): + if href.startswith(self.base_fork_url[:-1] if self.base_fork_url.endswith('/') else self.base_fork_url): + return href + else: + return urljoin(self.base_fork_url + '/' + (href if href.endswith('/') else href + '/'), ".") + + def reset_beautiful_soup(self, bf: BeautifulSoup): + href_list = bf.find_all(href=re.compile('^(?!(http:|https:|tel:/|#|mailto:|javascript:)).*')) + for h in href_list: + h['href'] = urljoin( + self.base_fork_url + '/' + (h['href'] if h['href'].endswith('/') else h['href'] + '/'), + ".")[:-1] + src_list = bf.find_all(src=re.compile('^(?!(http:|https:|tel:/|#|mailto:|javascript:)).*')) + for s in src_list: + s['src'] = urljoin( + self.base_fork_url + '/' + (s['src'] if s['src'].endswith('/') else s['src'] + '/'), + ".")[:-1] + return bf + + @staticmethod + def get_beautiful_soup(response): + encoding = response.apparent_encoding if response.apparent_encoding is not None else 'utf-8' + html_content = response.content.decode(encoding) + return BeautifulSoup(html_content, "html.parser") + + def fork(self): + try: + response = requests.get(self.base_fork_url) + if response.status_code != 200: + raise Exception(response.status_code) + bf = self.get_beautiful_soup(response) + except Exception as e: + return Fork.Response.error(str(e)) + bf = self.reset_beautiful_soup(bf) + link_list = self.get_child_link_list(bf) + content = self.get_content_html(bf) + r = ht.html2text(content) + return Fork.Response.success(r, link_list) + + +def handler(base_url, response: Fork.Response): + print(base_url, response.status) + + +ForkManage('https://dataease.io/docs/v2/', ['.md-content']).fork(3, set(), handler) diff --git a/pyproject.toml b/pyproject.toml index 7107e0fb3..13fab8095 100644 --- a/pyproject.toml +++ b/pyproject.toml @@ -25,6 +25,8 @@ openai = "^0.28.1" tiktoken = "^0.5.1" qianfan = "^0.1.1" pycryptodome = "^3.19.0" +beautifulsoup4 = "^4.12.2" +html2text = "^2020.1.16" [build-system]