import os import re from bs4 import BeautifulSoup import openai import time from tqdm import tqdm import sqlite3 import json from datetime import datetime import logging from tenacity import retry, stop_after_attempt, wait_exponential, retry_if_exception_type import asyncio import aiohttp from concurrent.futures import ThreadPoolExecutor from functools import lru_cache import hashlib import yaml from pathlib import Path # 配置管理 class Config: def __init__(self, config_path='config.yaml'): self.config_path = config_path self.config = self.load_config() # 设置日志 self.setup_logging() # 初始化OpenAI客户端 self.setup_openai() def load_config(self): """加载配置文件""" if not os.path.exists(self.config_path): # 创建默认配置 default_config = { 'logging': { 'level': 'INFO', 'format': '%(asctime)s - %(levelname)s - %(message)s', 'file': 'translation.log' }, 'openai': { 'base_url': 'https://api.siliconflow.cn/v1', 'api_key': 'sk-', 'model_name': 'deepseek-ai/DeepSeek-R1', 'max_retries': 3, 'retry_delay': 2, 'timeout': 30, 'max_concurrent_requests': 5 }, 'translation': { 'min_line_count': 1, 'max_line_count': 5, 'initial_line_count': 2, 'error_threshold': 3, 'success_threshold': 5, 'error_cooldown': 60, 'cache_size': 1000 }, 'database': { 'path': 'translation_progress.db', 'pool_size': 5 }, 'paths': { 'input_dir': '002/Ops', 'output_dir': '002/Ops_translated' } } # 保存默认配置 with open(self.config_path, 'w', encoding='utf-8') as f: yaml.dump(default_config, f, allow_unicode=True) return default_config # 加载现有配置 with open(self.config_path, 'r', encoding='utf-8') as f: return yaml.safe_load(f) def setup_logging(self): """设置日志""" logging.basicConfig( level=getattr(logging, self.config['logging']['level']), format=self.config['logging']['format'], handlers=[ logging.FileHandler(self.config['logging']['file']), logging.StreamHandler() ] ) def setup_openai(self): """设置OpenAI客户端""" self.client = openai.OpenAI( base_url=self.config['openai']['base_url'], api_key=self.config['openai']['api_key'] ) def get(self, *keys): """获取配置值""" value = self.config for key in keys: value = value[key] return value def update(self, updates): """更新配置""" def deep_update(d, u): for k, v in u.items(): if isinstance(v, dict): d[k] = deep_update(d.get(k, {}), v) else: d[k] = v return d self.config = deep_update(self.config, updates) # 保存更新后的配置 with open(self.config_path, 'w', encoding='utf-8') as f: yaml.dump(self.config, f, allow_unicode=True) # 重新设置日志和OpenAI客户端 self.setup_logging() self.setup_openai() # 创建全局的配置实例 config = Config() # 更新全局变量 MODEL_CONFIG = { "model_name": config.get('openai', 'model_name'), "max_retries": config.get('openai', 'max_retries'), "retry_delay": config.get('openai', 'retry_delay'), "timeout": config.get('openai', 'timeout'), "max_concurrent_requests": config.get('openai', 'max_concurrent_requests'), "cache_size": config.get('translation', 'cache_size') } MIN_LINE_COUNT = config.get('translation', 'min_line_count') MAX_LINE_COUNT = config.get('translation', 'max_line_count') INITIAL_LINE_COUNT = config.get('translation', 'initial_line_count') ERROR_THRESHOLD = config.get('translation', 'error_threshold') SUCCESS_THRESHOLD = config.get('translation', 'success_threshold') # 更新其他类的初始化参数 class LineCountManager: def __init__(self): self.current_line_count = INITIAL_LINE_COUNT self.consecutive_errors = 0 self.consecutive_successes = 0 self.last_error_time = None self.error_cooldown = config.get('translation', 'error_cooldown') self.version = f"1.0.{INITIAL_LINE_COUNT}" self.error_history = [] def adjust_line_count(self, success): """根据翻译结果调整行数""" current_time = time.time() # 检查是否在冷却期内 if self.last_error_time and (current_time - self.last_error_time) < self.error_cooldown: return self.current_line_count if success: self.consecutive_errors = 0 self.consecutive_successes += 1 # 如果连续成功次数达到阈值,尝试增加行数 if self.consecutive_successes >= SUCCESS_THRESHOLD: if self.current_line_count < MAX_LINE_COUNT: self.current_line_count += 1 self.consecutive_successes = 0 self.version = f"1.0.{self.current_line_count}" logging.info(f"翻译连续成功,增加行数到 {self.current_line_count},版本更新为 {self.version}") else: self.consecutive_successes = 0 self.consecutive_errors += 1 self.last_error_time = current_time # 记录错误 self.error_history.append({ 'time': current_time, 'line_count': self.current_line_count }) # 如果连续错误次数达到阈值,减少行数 if self.consecutive_errors >= ERROR_THRESHOLD: if self.current_line_count > MIN_LINE_COUNT: self.current_line_count -= 1 self.consecutive_errors = 0 self.version = f"1.0.{self.current_line_count}" logging.warning(f"翻译连续失败,减少行数到 {self.current_line_count},版本更新为 {self.version}") return self.current_line_count def get_error_stats(self): """获取错误统计信息""" if not self.error_history: return "无错误记录" recent_errors = [e for e in self.error_history if time.time() - e['time'] < 3600] # 最近一小时的错误 return { "总错误数": len(self.error_history), "最近一小时错误数": len(recent_errors), "当前行数": self.current_line_count, "连续错误": self.consecutive_errors, "连续成功": self.consecutive_successes } class DatabaseManager: def __init__(self): self.db_path = config.get('database', 'path') self.conn = None self.init_db() def get_connection(self): """获取数据库连接""" if self.conn is None: self.conn = sqlite3.connect(self.db_path) self.conn.row_factory = sqlite3.Row return self.conn def close(self): """关闭数据库连接""" if self.conn: self.conn.close() self.conn = None def init_db(self): """初始化数据库""" conn = self.get_connection() c = conn.cursor() # 创建文件进度表 c.execute(''' CREATE TABLE IF NOT EXISTS file_progress ( file_path TEXT PRIMARY KEY, total_lines INTEGER, processed_lines INTEGER, status TEXT, version TEXT, last_updated TIMESTAMP DEFAULT CURRENT_TIMESTAMP, error_count INTEGER DEFAULT 0, retry_count INTEGER DEFAULT 0 ) ''') # 创建翻译组进度表 c.execute(''' CREATE TABLE IF NOT EXISTS group_progress ( id INTEGER PRIMARY KEY AUTOINCREMENT, file_path TEXT, group_index INTEGER, original_text TEXT, translated_text TEXT, status TEXT, version TEXT, created_at TIMESTAMP DEFAULT CURRENT_TIMESTAMP, updated_at TIMESTAMP DEFAULT CURRENT_TIMESTAMP, error_count INTEGER DEFAULT 0, retry_count INTEGER DEFAULT 0, UNIQUE(file_path, group_index, version) ) ''') # 创建错误日志表 c.execute(''' CREATE TABLE IF NOT EXISTS error_log ( id INTEGER PRIMARY KEY AUTOINCREMENT, file_path TEXT, group_index INTEGER, error_type TEXT, error_message TEXT, created_at TIMESTAMP DEFAULT CURRENT_TIMESTAMP, resolved_at TIMESTAMP, resolution TEXT ) ''') conn.commit() def begin_transaction(self): """开始事务""" self.get_connection().execute('BEGIN TRANSACTION') def commit_transaction(self): """提交事务""" self.get_connection().commit() def rollback_transaction(self): """回滚事务""" self.get_connection().rollback() def get_file_progress(self, file_path): """获取文件翻译进度""" c = self.get_connection().cursor() c.execute('SELECT * FROM file_progress WHERE file_path = ?', (file_path,)) return c.fetchone() def update_file_progress(self, file_path, total_lines, processed_lines, status): """更新文件翻译进度""" c = self.get_connection().cursor() c.execute(''' INSERT OR REPLACE INTO file_progress (file_path, total_lines, processed_lines, status, version, last_updated) VALUES (?, ?, ?, ?, ?, CURRENT_TIMESTAMP) ''', (file_path, total_lines, processed_lines, status, line_count_manager.version)) self.get_connection().commit() def get_group_progress(self, file_path, group_index): """获取翻译组进度""" c = self.get_connection().cursor() c.execute(''' SELECT * FROM group_progress WHERE file_path = ? AND group_index = ? AND version = ? ''', (file_path, group_index, line_count_manager.version)) return c.fetchone() def update_group_progress(self, file_path, group_index, original_text, translated_text, status): """更新翻译组进度""" c = self.get_connection().cursor() c.execute(''' INSERT OR REPLACE INTO group_progress (file_path, group_index, original_text, translated_text, status, version, updated_at) VALUES (?, ?, ?, ?, ?, ?, CURRENT_TIMESTAMP) ''', (file_path, group_index, original_text, translated_text, status, line_count_manager.version)) self.get_connection().commit() def log_error(self, file_path, group_index, error_type, error_message): """记录错误""" c = self.get_connection().cursor() c.execute(''' INSERT INTO error_log (file_path, group_index, error_type, error_message) VALUES (?, ?, ?, ?) ''', (file_path, group_index, error_type, error_message)) self.get_connection().commit() def get_error_stats(self): """获取错误统计信息""" c = self.get_connection().cursor() c.execute(''' SELECT COUNT(*) as total_errors, COUNT(CASE WHEN resolved_at IS NULL THEN 1 END) as unresolved_errors, COUNT(CASE WHEN created_at > datetime('now', '-1 hour') THEN 1 END) as recent_errors FROM error_log ''') return c.fetchone() class AsyncTranslationManager: def __init__(self): self.semaphore = asyncio.Semaphore(config.get('openai', 'max_concurrent_requests')) self.session = None class TranslationCache: def __init__(self): self.cache = {} self.max_size = config.get('translation', 'cache_size') self.hits = 0 self.misses = 0 # 创建全局实例 line_count_manager = LineCountManager() db_manager = DatabaseManager() async_translation_manager = AsyncTranslationManager() translation_cache = TranslationCache() # 添加版本控制 VERSION = "1.0.1" # 版本号,用于区分不同版本的翻译 line_count = 2 # 每组行数,越大越快,但越容易出错 class TranslationStats: def __init__(self): self.start_time = time.time() self.total_chars = 0 self.translated_chars = 0 self.total_requests = 0 self.successful_requests = 0 self.failed_requests = 0 def update_stats(self, original_text, translated_text, success=True): self.total_chars += len(original_text) self.translated_chars += len(translated_text) self.total_requests += 1 if success: self.successful_requests += 1 else: self.failed_requests += 1 def get_stats(self): elapsed_time = time.time() - self.start_time chars_per_second = self.translated_chars / elapsed_time if elapsed_time > 0 else 0 success_rate = (self.successful_requests / self.total_requests * 100) if self.total_requests > 0 else 0 return { "总字符数": self.total_chars, "已翻译字符数": self.translated_chars, "翻译速度": f"{chars_per_second:.2f} 字符/秒", "成功率": f"{success_rate:.1f}%", "总请求数": self.total_requests, "成功请求": self.successful_requests, "失败请求": self.failed_requests, "运行时间": f"{elapsed_time:.1f} 秒" } # 创建全局的统计对象 translation_stats = TranslationStats() def get_completed_groups(conn, file_path): """获取已完成的翻译组""" c = conn.cursor() c.execute(''' SELECT group_index, translated_text FROM group_progress WHERE file_path = ? AND status = 'completed' AND version = ? ORDER BY group_index ''', (file_path, line_count_manager.version)) return c.fetchall() # """ - 输出内容要求用代码块包裹起来 # ,只在必要时提供相应的语言注释 # """ @retry( stop=stop_after_attempt(MODEL_CONFIG['max_retries']), wait=wait_exponential(multiplier=1, min=4, max=10), retry=retry_if_exception_type((openai.APIError, openai.APITimeoutError)), before_sleep=lambda retry_state: logging.warning(f"重试第 {retry_state.attempt_number} 次...") ) def translate_text(text): """翻译文本,使用tenacity进行重试""" try: messages = [ { "role": "system", "content": "- 你名为epub翻译大师,专注于将任意语言的文本翻译成中文。- 你在翻译过程中,力求保留原文语意,确保翻译的准确性和完整性。- 你特别注重翻译结果要贴合现代人的阅读习惯,使译文更加流畅易懂。- 在处理包含代码结构的文本时,你会特别注意保持代码的原样。- 你的服务旨在为用户提供高效、便捷的翻译体验,帮助用户跨越语言障碍。- 在回答问题的时候,尽可能保留原来的代码结构。" }, { "role": "user", "content": text } ] response = config.client.chat.completions.create( model=MODEL_CONFIG['model_name'], messages=messages, timeout=MODEL_CONFIG['timeout'] ) translated_text = response.choices[0].message.content line_count_manager.adjust_line_count(True) return translated_text except Exception as e: logging.error(f"翻译出错: {str(e)}") line_count_manager.adjust_line_count(False) raise def process_html_file(file_path, conn): """处理HTML文件""" # 检查文件进度 progress = db_manager.get_file_progress(file_path) try: # 尝试不同的编码方式读取文件 encodings = ['utf-8', 'gbk', 'gb2312', 'latin1'] content = None for encoding in encodings: try: with open(file_path, 'r', encoding=encoding) as f: content = f.read() break except UnicodeDecodeError: continue if content is None: raise Exception(f"无法使用支持的编码读取文件: {file_path}") # 使用正则表达式提取body标签内的内容 body_pattern = re.compile(r']*>(.*?)', re.DOTALL) body_match = body_pattern.search(content) if not body_match: print(f"警告: {file_path} 中没有找到body标签") return body_content = body_match.group(1) # 按行分割内容,保留所有HTML标签行,但只翻译包含