import logging
import os
import re
from concurrent.futures import ThreadPoolExecutor
from functools import partial
from pathlib import Path
from typing import List, Optional

from bs4 import BeautifulSoup, Comment, Doctype

# Configure logging for the whole script.
logging.basicConfig(
    level=logging.INFO,
    format='%(asctime)s - %(levelname)s - %(message)s'
)
logger = logging.getLogger(__name__)


class HTMLProcessor:
    """Clean HTML files in place and normalise them to html/head/body.

    Processing of one file consists of: stripping HTML comments and
    explanatory notes ("说明:" / "注:" style text), removing empty tags,
    rebuilding the document as ``doctype + <html><head/><body/></html>`` and
    moving any content found outside ``<body>`` into it.
    """

    # Patterns (applied to a stripped text node) that mark the node as an
    # explanatory note. ASCII and full-width colons/parentheses are both
    # accepted for the "说明"/"注" forms.
    _EXPLANATION_PATTERNS = tuple(
        re.compile(pattern, re.DOTALL)
        for pattern in (
            r'^[(（]说明[:：].*?[)）]$',
            r'^说明[:：].*?$',
            r'^[(（]注[:：].*?[)）]$',
            r'^注[:：].*?$',
            r'^[(（].*?[)）]$',      # text fully wrapped in parentheses
            r'^[0-9]+\.\s.*?$',      # numbered notes such as "1. ..."
            r'^.*?的同时,.*?$',      # "...的同时,..." style notes
        )
    )

    # Patterns applied to raw markup by _clean_content.
    _COMMENT_RE = re.compile(r'<!--.*?-->', re.DOTALL)
    _PAREN_NOTE_RE = re.compile(r'[(（](?:说明|注)[:：].*?[)）]', re.DOTALL)
    # Stops at the next tag or line end so closing tags on the same line
    # are not swallowed together with the note.
    _LINE_NOTE_RE = re.compile(r'(?:说明|注)[:：][^<\n]*')
    _BLANK_LINES_RE = re.compile(r'\n\s*\n')

    # Elements that are legitimately empty and must survive the empty-tag
    # sweep even when they carry no attributes.
    _VOID_TAGS = frozenset({
        'area', 'base', 'br', 'col', 'embed', 'hr', 'img', 'input',
        'link', 'meta', 'source', 'track', 'wbr',
    })

    # Elements whose text is code, not prose; never treated as notes.
    _RAW_TEXT_TAGS = frozenset({'script', 'style'})

    def __init__(self, max_workers: int = 4):
        """Store the number of worker threads used by process_directory."""
        self.max_workers = max_workers

    def _is_explanation_text(self, text: str) -> bool:
        """Return True if *text* (after stripping) looks like a note.

        Empty or whitespace-only text is never a note.
        """
        text = text.strip()
        if not text:
            return False
        return any(
            pattern.match(text) for pattern in self._EXPLANATION_PATTERNS
        )

    def _clean_content(self, content: str) -> str:
        """Strip comments, notes and blank lines from raw HTML text.

        Args:
            content: HTML markup as a string.

        Returns:
            The markup with ``<!-- ... -->`` comments removed, parenthesised
            and line-style "说明"/"注" notes removed, runs of blank lines
            collapsed to a single newline, and outer whitespace stripped.
        """
        content = self._COMMENT_RE.sub('', content)
        # Parenthesised notes first, so the line-style pattern does not
        # leave a dangling opening parenthesis behind.
        content = self._PAREN_NOTE_RE.sub('', content)
        content = self._LINE_NOTE_RE.sub('', content)
        return self._BLANK_LINES_RE.sub('\n', content).strip()

    def _clean_soup(self, soup: BeautifulSoup) -> None:
        """Remove comments, note text and empty tags from *soup* in place."""
        # Remove every comment node.
        for comment in soup.find_all(
                string=lambda node: isinstance(node, Comment)):
            comment.extract()

        # Remove text nodes that look like explanatory notes.
        for node in soup.find_all(string=True):
            if isinstance(node, Doctype):
                continue
            parent = node.parent
            if parent is not None and parent.name in self._RAW_TEXT_TAGS:
                continue
            if self._is_explanation_text(str(node)):
                node.extract()

        # Remove empty tags bottom-up: descendants come later in document
        # order, so walking the list in reverse visits children before their
        # parents and lets wrappers of removed tags be removed as well.
        for tag in reversed(soup.find_all(True)):
            if tag.attrs or tag.name in self._VOID_TAGS:
                continue
            if tag.get_text(strip=True):
                continue
            if tag.find(True) is not None:
                # Still holds a meaningful child (e.g. an <img>); keep it.
                continue
            tag.extract()

    def _adopt_into_body(self, soup: BeautifulSoup, body_tag, node) -> None:
        """Move *node* into *body_tag*.

        Tags are appended as they are. Bare text is stripped and wrapped in
        a new ``<p>`` unless it is blank or explanatory, in which case it is
        discarded. Stray comment/doctype nodes are discarded.
        """
        if isinstance(node, (Comment, Doctype)):
            node.extract()
            return
        if getattr(node, 'name', None):
            body_tag.append(node.extract())
            return
        text = str(node).strip()
        node.extract()
        if text and not self._is_explanation_text(text):
            paragraph = soup.new_tag('p')
            paragraph.string = text
            body_tag.append(paragraph)

    def _collect_leftovers(self, soup: BeautifulSoup, container,
                           head_tag, body_tag) -> None:
        """Empty *container*, routing its children into head or body.

        ``<html>``/``<body>`` wrappers are treated as transparent (their
        children are collected recursively), the children of an additional
        ``<head>`` are merged into *head_tag*, and everything else goes into
        *body_tag*.
        """
        for node in list(container.contents):
            name = getattr(node, 'name', None)
            if name in ('html', 'body'):
                self._collect_leftovers(soup, node, head_tag, body_tag)
                node.extract()
            elif name == 'head':
                for child in list(node.contents):
                    head_tag.append(child)
                node.extract()
            else:
                self._adopt_into_body(soup, body_tag, node)

    def _ensure_proper_structure(self, soup: BeautifulSoup) -> BeautifulSoup:
        """Rebuild *soup* as ``doctype + <html><head/><body/></html>``.

        The first doctype, the first ``<head>`` and the first ``<body>``
        found in *soup* are reused (created if missing), attributes of the
        original ``<html>`` tag are copied, and all remaining content is
        moved into the body. *soup* is consumed in the process.

        Returns:
            A new BeautifulSoup object holding the normalised document.
        """
        new_soup = BeautifulSoup('', 'html.parser')

        # Carry over the doctype, if any.
        doctype = next(
            (node for node in soup.contents if isinstance(node, Doctype)),
            None,
        )
        if doctype is not None:
            new_soup.append(doctype.extract())

        # Create the <html> tag, keeping attributes such as lang="...".
        original_html = soup.find('html')
        html_tag = new_soup.new_tag('html')
        if original_html is not None:
            html_tag.attrs.update(dict(original_html.attrs))
        new_soup.append(html_tag)

        # Reuse or create <head>.
        head_tag = soup.find('head')
        if head_tag is None:
            head_tag = new_soup.new_tag('head')
        html_tag.append(head_tag)

        # Reuse or create <body>.
        body_tag = soup.find('body')
        if body_tag is None:
            body_tag = new_soup.new_tag('body')
        html_tag.append(body_tag)

        # Whatever is still left in the old tree (content directly under
        # <html>, top-level tags and text) belongs in the body.
        self._collect_leftovers(new_soup, soup, head_tag, body_tag)

        return new_soup

    def _process_unwrapped_content(self, soup: BeautifulSoup) -> None:
        """Move content that sits directly under ``<html>`` into ``<body>``.

        Missing ``<html>``/``<body>`` tags are created in place. ``<head>``
        and ``<body>`` children of ``<html>`` are left where they are; other
        tags are moved into the body and bare text is wrapped in ``<p>``
        (or dropped when it is blank or explanatory).
        """
        html_tag = soup.html
        if html_tag is None:
            # Wrap the existing top-level content instead of discarding it.
            html_tag = soup.new_tag('html')
            for node in list(soup.contents):
                if not isinstance(node, Doctype):
                    html_tag.append(node)
            soup.append(html_tag)

        body_tag = soup.body
        if body_tag is None:
            body_tag = soup.new_tag('body')
            html_tag.append(body_tag)

        for node in list(html_tag.contents):
            if getattr(node, 'name', None) in ('head', 'body'):
                continue
            self._adopt_into_body(soup, body_tag, node)

    def process_file(self, file_path: Path) -> bool:
        """Clean a single HTML file and write the result back in place.

        Args:
            file_path: Path of the file; read and written as UTF-8.

        Returns:
            True on success, False if any step raised (the error is logged).
        """
        try:
            with open(file_path, 'r', encoding='utf-8') as f:
                content = f.read()

            # Text-level cleanup, then tree-level cleanup.
            content = self._clean_content(content)
            soup = BeautifulSoup(content, 'html.parser')
            self._clean_soup(soup)

            # Normalise the structure and sweep up unwrapped content.
            soup = self._ensure_proper_structure(soup)
            self._process_unwrapped_content(soup)

            result = self._clean_content(str(soup))

            with open(file_path, 'w', encoding='utf-8') as f:
                f.write(result)

            logger.info(f"成功处理: {file_path}")
            return True

        except Exception as e:
            # Per-file boundary: one bad file must not abort the whole batch.
            logger.error(f"处理失败 {file_path}: {str(e)}")
            return False

    def process_directory(self, directory: str) -> None:
        """Process every ``*.html`` file below *directory* (recursively).

        Files are handled concurrently by ``self.max_workers`` threads and a
        summary is logged at the end. Logs an error and returns if
        *directory* is not a directory; logs a warning and returns if no
        HTML file is found.
        """
        directory_path = Path(directory)
        if not directory_path.is_dir():
            logger.error(f"错误: 目录不存在 - {directory}")
            return

        # Skip directories that merely happen to be named "*.html".
        html_files: List[Path] = [
            path for path in directory_path.rglob("*.html") if path.is_file()
        ]
        if not html_files:
            logger.warning(f"未在目录中找到HTML文件: {directory}")
            return

        with ThreadPoolExecutor(max_workers=self.max_workers) as executor:
            results = list(executor.map(self.process_file, html_files))

        success_count = sum(1 for result in results if result)
        logger.info(f"处理完成! 成功处理 {success_count}/{len(html_files)} 个文件")


def main():
    """Entry point: take the directory from argv[1] or prompt for it."""
    import sys

    if len(sys.argv) > 1:
        target_directory = sys.argv[1]
    else:
        target_directory = input("请输入要处理的目录路径: ").strip()

    # An empty path would resolve to the current directory and rewrite every
    # HTML file below it, so refuse it explicitly.
    if not target_directory:
        logger.error("错误: 未提供目录路径")
        return

    processor = HTMLProcessor()
    processor.process_directory(target_directory)


if __name__ == "__main__":
    main()