import copy import logging import re import traceback from functools import reduce from typing import List, Set import requests import html2text as ht from bs4 import BeautifulSoup from urllib.parse import urljoin, urlparse, ParseResult, urlsplit, parse_qs from common.exception.app_exception import AppApiException requests.packages.urllib3.disable_warnings() class ChildLink: def __init__(self, url, tag): self.url = url self.tag = copy.deepcopy(tag) class ForkManage: def __init__(self, base_url: str, selector_list: List[str]): self.base_url = base_url self.selector_list = selector_list def fork(self, level: int, exclude_link_url: Set[str], fork_handler): self.fork_child(ChildLink(self.base_url, None), self.selector_list, level, exclude_link_url, fork_handler) @staticmethod def fork_child(child_link: ChildLink, selector_list: List[str], level: int, exclude_link_url: Set[str], fork_handler): if level < 0: return else: child_url = child_link.url[:-1] if child_link.url.endswith('/') else child_link.url exclude_link_url.add(child_url) response = Fork(child_link.url, selector_list).fork() fork_handler(child_link, response) for child_link in response.child_link_list: child_url = child_link.url[:-1] if child_link.url.endswith('/') else child_link.url if not exclude_link_url.__contains__(child_url): ForkManage.fork_child(child_link, selector_list, level - 1, exclude_link_url, fork_handler) class Fork: class Response: def __init__(self, content: str, child_link_list: List[ChildLink], status, message: str): self.content = content self.child_link_list = child_link_list self.status = status self.message = message @staticmethod def success(html_content: str, child_link_list: List[ChildLink]): return Fork.Response(html_content, child_link_list, 200, '') @staticmethod def error(message: str): return Fork.Response('', [], 500, message) def __init__(self, base_fork_url: str, selector_list: List[str]): self.base_fork_url = urljoin(base_fork_url if base_fork_url.endswith("/") else base_fork_url + '/', '.') parsed = urlsplit(base_fork_url) query = parsed.query self.base_fork_url = self.base_fork_url[:-1] if query is not None and len(query) > 0: self.base_fork_url = self.base_fork_url + '?' + query self.selector_list = [selector for selector in selector_list if selector is not None and len(selector) > 0] self.urlparse = urlparse(self.base_fork_url) self.base_url = ParseResult(scheme=self.urlparse.scheme, netloc=self.urlparse.netloc, path='', params='', query='', fragment='').geturl() def get_child_link_list(self, bf: BeautifulSoup): pattern = "^((?!(http:|https:|tel:/|#|mailto:|javascript:))|" + self.base_fork_url + ").*" link_list = bf.find_all(name='a', href=re.compile(pattern)) result = [ChildLink(link.get('href'), link) for link in link_list] return result def get_content_html(self, bf: BeautifulSoup): if self.selector_list is None or len(self.selector_list) == 0: return str(bf) params = reduce(lambda x, y: {**x, **y}, [{'class_': selector.replace('.', '')} if selector.startswith('.') else {'id': selector.replace("#", "")} if selector.startswith("#") else {'name': selector} for selector in self.selector_list], {}) f = bf.find_all(**params) return "\n".join([str(row) for row in f]) @staticmethod def reset_url(tag, field, base_fork_url): field_value: str = tag[field] if field_value.startswith("/"): result = urlparse(base_fork_url) result_url = ParseResult(scheme=result.scheme, netloc=result.netloc, path=field_value, params='', query='', fragment='').geturl() else: result_url = urljoin( base_fork_url + '/' + (field_value if field_value.endswith('/') else field_value + '/'), ".") result_url = result_url[:-1] if result_url.endswith('/') else result_url tag[field] = result_url def reset_beautiful_soup(self, bf: BeautifulSoup): reset_config_list = [ { 'field': 'href', }, { 'field': 'src', } ] for reset_config in reset_config_list: field = reset_config.get('field') tag_list = bf.find_all(**{field: re.compile('^(?!(http:|https:|tel:/|#|mailto:|javascript:)).*')}) for tag in tag_list: self.reset_url(tag, field, self.base_fork_url) return bf @staticmethod def get_beautiful_soup(response): encoding = response.apparent_encoding if response.apparent_encoding is not None else 'utf-8' html_content = response.content.decode(encoding) return BeautifulSoup(html_content, "html.parser") def fork(self): try: headers = { 'user-agent': 'Mozilla/5.0 (Windows NT 10.0; Win64; x64) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/99.0.4844.51 Safari/537.36' } logging.getLogger("max_kb").info(f'fork:{self.base_fork_url}') response = requests.get(self.base_fork_url, verify=False, headers=headers) if response.status_code != 200: logging.getLogger("max_kb").error(f"url: {self.base_fork_url} code:{response.status_code}") return Fork.Response.error(f"url: {self.base_fork_url} code:{response.status_code}") bf = self.get_beautiful_soup(response) except Exception as e: logging.getLogger("max_kb_error").error(f'{str(e)}:{traceback.format_exc()}') return Fork.Response.error(str(e)) bf = self.reset_beautiful_soup(bf) link_list = self.get_child_link_list(bf) content = self.get_content_html(bf) r = ht.html2text(content) return Fork.Response.success(r, link_list) def handler(base_url, response: Fork.Response): print(base_url.url, base_url.tag.text if base_url.tag else None, response.content) # ForkManage('https://bbs.fit2cloud.com/c/de/6', ['.md-content']).fork(3, set(), handler)