MaxKB/apps/common/util/fork.py
2023-12-27 18:33:23 +08:00

113 lines
4.6 KiB
Python

import re
from functools import reduce
from typing import List, Set
import requests
import html2text as ht
from bs4 import BeautifulSoup
from urllib.parse import urljoin
class ForkManage:
def __init__(self, base_url: str, selector_list: List[str]):
self.base_url = base_url
self.selector_list = selector_list
def fork(self, level: int, exclude_link_url: Set[str], fork_handler):
self.fork_child(self.base_url, self.selector_list, level, exclude_link_url, fork_handler)
@staticmethod
def fork_child(base_url: str, selector_list: List[str], level: int, exclude_link_url: Set[str], fork_handler):
if level < 0:
return
response = Fork(base_url, selector_list).fork()
fork_handler(base_url, response)
for child_link in response.child_link_list:
if not exclude_link_url.__contains__(child_link):
exclude_link_url.add(child_link)
ForkManage.fork_child(child_link, selector_list, level - 1, exclude_link_url, fork_handler)
class Fork:
class Response:
def __init__(self, html_content: str, child_link_list: List[str], status, message: str):
self.html_content = html_content
self.child_link_list = child_link_list
self.status = status
self.message = message
@staticmethod
def success(html_content: str, child_link_list: List[str]):
return Fork.Response(html_content, child_link_list, 200, '')
@staticmethod
def error(message: str):
return Fork.Response('', [], 500, message)
def __init__(self, base_fork_url: str, selector_list: List[str]):
self.base_fork_url = urljoin(base_fork_url if base_fork_url.endswith("/") else base_fork_url + '/', '.')
self.base_fork_url = base_fork_url
self.selector_list = selector_list
def get_child_link_list(self, bf: BeautifulSoup):
pattern = "^(?!(http:|https:|tel:/|#|mailto:|javascript:)).*|" + self.base_fork_url
link_list = bf.find_all(name='a', href=re.compile(pattern))
result = [self.parse_href(link.get('href')) for link in link_list]
return result
def get_content_html(self, bf: BeautifulSoup):
if self.selector_list is None or len(self.selector_list) == 0:
return str(bf)
params = reduce(lambda x, y: {**x, **y},
[{'class_': selector.replace('.', '')} if selector.startswith('.') else {
'id': selector.replace("#", "") if selector.startswith("#") else {'name': selector}} for
selector in
self.selector_list], {})
f = bf.find_all(**params)
return "\n".join([str(row) for row in f])
def parse_href(self, href: str):
if href.startswith(self.base_fork_url[:-1] if self.base_fork_url.endswith('/') else self.base_fork_url):
return href
else:
return urljoin(self.base_fork_url + '/' + (href if href.endswith('/') else href + '/'), ".")
def reset_beautiful_soup(self, bf: BeautifulSoup):
href_list = bf.find_all(href=re.compile('^(?!(http:|https:|tel:/|#|mailto:|javascript:)).*'))
for h in href_list:
h['href'] = urljoin(
self.base_fork_url + '/' + (h['href'] if h['href'].endswith('/') else h['href'] + '/'),
".")[:-1]
src_list = bf.find_all(src=re.compile('^(?!(http:|https:|tel:/|#|mailto:|javascript:)).*'))
for s in src_list:
s['src'] = urljoin(
self.base_fork_url + '/' + (s['src'] if s['src'].endswith('/') else s['src'] + '/'),
".")[:-1]
return bf
@staticmethod
def get_beautiful_soup(response):
encoding = response.apparent_encoding if response.apparent_encoding is not None else 'utf-8'
html_content = response.content.decode(encoding)
return BeautifulSoup(html_content, "html.parser")
def fork(self):
try:
response = requests.get(self.base_fork_url)
if response.status_code != 200:
raise Exception(response.status_code)
bf = self.get_beautiful_soup(response)
except Exception as e:
return Fork.Response.error(str(e))
bf = self.reset_beautiful_soup(bf)
link_list = self.get_child_link_list(bf)
content = self.get_content_html(bf)
r = ht.html2text(content)
return Fork.Response.success(r, link_list)
def handler(base_url, response: Fork.Response):
print(base_url, response.status)
ForkManage('https://dataease.io/docs/v2/', ['.md-content']).fork(3, set(), handler)