import os
import re
from bs4 import BeautifulSoup
import openai
import time
from tqdm import tqdm
import sqlite3
import json
from datetime import datetime
import logging
from tenacity import retry, stop_after_attempt, wait_exponential, retry_if_exception_type
import hashlib
import yaml
from pathlib import Path
from functools import lru_cache
import pickle
from collections import OrderedDict
import argparse


# Configuration management
class Config:
    def __init__(self, config_path='config.yaml'):
        self.config_path = config_path
        self.config = self.load_config()
        # Set up logging
        self.setup_logging()
        # Initialise the OpenAI client
        self.setup_openai()

    def load_config(self):
        """Load the configuration file, creating a default one if it does not exist."""
        if not os.path.exists(self.config_path):
            # Create the default configuration
            default_config = {
                'logging': {
                    'level': 'INFO',
                    'format': '%(asctime)s - %(levelname)s - %(message)s',
                    'file': 'translation.log'
                },
                'openai': {
                    'base_url': 'https://api.deepseek.com/v1',
                    'api_key': 'sk-4fa3e232385f465ca143cc403f6f9136',
                    'model_name': 'deepseek-chat',
                    'max_retries': 3,
                    'retry_delay': 2,
                    'timeout': 30,
                    'max_concurrent_requests': 5
                },
                'translation': {
                    'min_line_count': 3,
                    'max_line_count': 10,
                    'initial_line_count': 3,
                    'error_threshold': 3,
                    'success_threshold': 10,
                    'error_cooldown': 60,
                    'cache_size': 1000,
                    'batch_size': 10  # Batch operation size
                },
                'database': {
                    'path': 'translation_progress.db',
                    'pool_size': 5,
                    'batch_commit_size': 20  # Batch commit size
                },
                'paths': {
                    'input_dir': '002/Ops',
                    'output_dir': '002/Ops_translated',
                    'cache_dir': 'cache'  # Cache directory
                }
            }
            # Save the default configuration
            with open(self.config_path, 'w', encoding='utf-8') as f:
                yaml.dump(default_config, f, allow_unicode=True)
            return default_config
        # Load the existing configuration
        with open(self.config_path, 'r', encoding='utf-8') as f:
            return yaml.safe_load(f)

    def setup_logging(self):
        """Configure logging."""
        logging.basicConfig(
            level=getattr(logging, self.config['logging']['level']),
            format=self.config['logging']['format'],
            handlers=[
                logging.FileHandler(self.config['logging']['file']),
                logging.StreamHandler()
            ]
        )

    def setup_openai(self):
        """Configure the OpenAI client."""
        self.client = openai.OpenAI(
            base_url=self.config['openai']['base_url'],
            api_key=self.config['openai']['api_key']
        )

    def get(self, *keys, default=None):
        """Read a nested configuration value, with an optional default."""
        try:
            value = self.config
            for key in keys:
                value = value[key]
            return value
        except (KeyError, TypeError):
            if default is not None:
                return default
            raise


# Global configuration instance
config = Config()


# Optimised database manager
class OptimizedDatabaseManager:
    def __init__(self):
        self.db_path = config.get('database', 'path', default='translation_progress.db')
        self.batch_commit_size = config.get('database', 'batch_commit_size', default=20)
        self.conn = None
        self.pending_operations = []
        self.init_db()

    def get_connection(self):
        """Get (and lazily create) the SQLite connection."""
        if self.conn is None:
            self.conn = sqlite3.connect(self.db_path)
            self.conn.row_factory = sqlite3.Row
            # Tune SQLite for better performance
            self.conn.execute('PRAGMA journal_mode=WAL')
            self.conn.execute('PRAGMA synchronous=NORMAL')
            self.conn.execute('PRAGMA cache_size=10000')
            self.conn.execute('PRAGMA temp_store=MEMORY')
        return self.conn

    def init_db(self):
        """Initialise the database schema."""
        conn = self.get_connection()
        c = conn.cursor()
        # Translation cache table
        c.execute('''
            CREATE TABLE IF NOT EXISTS translation_cache (
                text_hash TEXT PRIMARY KEY,
                original_text TEXT,
                translated_text TEXT,
                created_at TIMESTAMP DEFAULT CURRENT_TIMESTAMP,
                access_count INTEGER DEFAULT 1,
                last_accessed TIMESTAMP DEFAULT CURRENT_TIMESTAMP
            )
        ''')
        # File progress table
        c.execute('''
            CREATE TABLE IF NOT EXISTS file_progress (
                file_path TEXT PRIMARY KEY,
                total_lines INTEGER,
                processed_lines INTEGER,
                status TEXT,
                last_updated TIMESTAMP DEFAULT CURRENT_TIMESTAMP,
                error_count INTEGER DEFAULT 0,
                retry_count INTEGER DEFAULT 0
            )
        ''')
        # Line progress table
        c.execute('''
            CREATE TABLE IF NOT EXISTS line_progress (
                id INTEGER PRIMARY KEY AUTOINCREMENT,
                file_path TEXT,
                line_index INTEGER,
                original_text TEXT,
                translated_text TEXT,
                status TEXT,
                created_at TIMESTAMP DEFAULT CURRENT_TIMESTAMP,
                updated_at TIMESTAMP DEFAULT CURRENT_TIMESTAMP,
                error_count INTEGER DEFAULT 0,
                retry_count INTEGER DEFAULT 0,
                UNIQUE(file_path, line_index)
            )
        ''')
        # Indexes to speed up lookups
        c.execute('CREATE INDEX IF NOT EXISTS idx_translation_cache_hash ON translation_cache(text_hash)')
        c.execute('CREATE INDEX IF NOT EXISTS idx_file_progress_path ON file_progress(file_path)')
        c.execute('CREATE INDEX IF NOT EXISTS idx_line_progress_file_line ON line_progress(file_path, line_index)')
        conn.commit()

    def add_pending_operation(self, operation_type, params):
        """Queue an operation for batched execution."""
        self.pending_operations.append((operation_type, params))
        # Flush automatically once the batch size is reached
        if len(self.pending_operations) >= self.batch_commit_size:
            self.flush_pending_operations()

    def flush_pending_operations(self):
        """Execute all queued operations in a single transaction."""
        if not self.pending_operations:
            return
        conn = self.get_connection()
        c = conn.cursor()
        try:
            conn.execute('BEGIN TRANSACTION')
            for operation_type, params in self.pending_operations:
                if operation_type == 'update_line_progress':
                    c.execute('''
                        INSERT OR REPLACE INTO line_progress
                        (file_path, line_index, original_text, translated_text, status, updated_at)
                        VALUES (?, ?, ?, ?, ?, CURRENT_TIMESTAMP)
                    ''', params)
                elif operation_type == 'update_file_progress':
                    c.execute('''
                        INSERT OR REPLACE INTO file_progress
                        (file_path, total_lines, processed_lines, status, last_updated)
                        VALUES (?, ?, ?, ?, CURRENT_TIMESTAMP)
                    ''', params)
                elif operation_type == 'cache_translation':
                    c.execute('''
                        INSERT OR REPLACE INTO translation_cache
                        (text_hash, original_text, translated_text, last_accessed, access_count)
                        VALUES (?, ?, ?, CURRENT_TIMESTAMP,
                                COALESCE((SELECT access_count + 1 FROM translation_cache WHERE text_hash = ?), 1))
                    ''', params + (params[0],))
            conn.commit()
            logging.info(f"Committed {len(self.pending_operations)} database operations in one batch")
            self.pending_operations.clear()
        except Exception as e:
            conn.rollback()
            logging.error(f"Batch operation failed: {e}")
            raise

    def get_cached_translation(self, text):
        """Look up a translation in the database cache."""
        text_hash = hashlib.md5(text.encode('utf-8')).hexdigest()
        c = self.get_connection().cursor()
        c.execute('''
            SELECT translated_text FROM translation_cache
            WHERE text_hash = ?
        ''', (text_hash,))
        result = c.fetchone()
        if result:
            # Update access statistics
            c.execute('''
                UPDATE translation_cache
                SET access_count = access_count + 1,
                    last_accessed = CURRENT_TIMESTAMP
                WHERE text_hash = ?
            ''', (text_hash,))
            self.get_connection().commit()
            return result[0]
        return None

    def cache_translation(self, original_text, translated_text):
        """Queue a translation result for the database cache."""
        text_hash = hashlib.md5(original_text.encode('utf-8')).hexdigest()
        self.add_pending_operation('cache_translation', (text_hash, original_text, translated_text))

    def update_line_progress(self, file_path, line_index, original_text, translated_text, status):
        """Queue a line-progress update."""
        self.add_pending_operation('update_line_progress',
                                   (file_path, line_index, original_text, translated_text, status))

    def update_file_progress(self, file_path, total_lines, processed_lines, status):
        """Queue a file-progress update."""
        self.add_pending_operation('update_file_progress',
                                   (file_path, total_lines, processed_lines, status))

    def get_file_progress(self, file_path):
        """Fetch the translation progress record for a file."""
        c = self.get_connection().cursor()
        c.execute('SELECT * FROM file_progress WHERE file_path = ?', (file_path,))
        return c.fetchone()

    def get_completed_lines(self, file_path):
        """Fetch the already-translated lines of a file."""
        c = self.get_connection().cursor()
        c.execute('''
            SELECT line_index, translated_text FROM line_progress
            WHERE file_path = ? AND status = 'completed'
            ORDER BY line_index
        ''', (file_path,))
        return c.fetchall()

    def cleanup_old_cache(self, days=30):
        """Delete cache entries that were rarely used and not accessed recently."""
        c = self.get_connection().cursor()
        c.execute('''
            DELETE FROM translation_cache
            WHERE last_accessed < datetime('now', '-{} days')
            AND access_count <= 1
        '''.format(days))
        deleted = c.rowcount
        self.get_connection().commit()
        logging.info(f"Removed {deleted} stale cache entries")

    def get_cache_stats(self):
        """Return aggregate statistics for the database cache."""
        c = self.get_connection().cursor()
        c.execute('''
            SELECT COUNT(*) as total_cached,
                   AVG(access_count) as avg_access_count,
                   MAX(access_count) as max_access_count,
                   COUNT(CASE WHEN access_count > 1 THEN 1 END) as reused_translations
            FROM translation_cache
        ''')
        return c.fetchone()

    def close(self):
        """Flush pending operations, then close the database connection."""
        self.flush_pending_operations()
        if self.conn:
            self.conn.close()
            self.conn = None


# Optimised translation cache (in-memory LRU plus on-disk pickle)
class OptimizedTranslationCache:
    def __init__(self):
        self.memory_cache = OrderedDict()
        self.max_memory_size = config.get('translation', 'cache_size', default=1000)
        self.hits = 0
        self.misses = 0
        cache_dir = config.get('paths', 'cache_dir', default='cache')
        self.cache_file = os.path.join(cache_dir, 'translation_cache.pkl')
        self.ensure_cache_dir()
        self.load_persistent_cache()

    def ensure_cache_dir(self):
        """Make sure the cache directory exists."""
        cache_dir = config.get('paths', 'cache_dir', default='cache')
        os.makedirs(cache_dir, exist_ok=True)

    def load_persistent_cache(self):
        """Load the persisted cache from disk."""
        if os.path.exists(self.cache_file):
            try:
                with open(self.cache_file, 'rb') as f:
                    self.memory_cache = pickle.load(f)
                logging.info(f"Loaded {len(self.memory_cache)} cached translations")
            except Exception as e:
                logging.warning(f"Failed to load cache file: {e}")
                self.memory_cache = OrderedDict()

    def save_persistent_cache(self):
        """Persist the in-memory cache to disk."""
        try:
            with open(self.cache_file, 'wb') as f:
                pickle.dump(self.memory_cache, f)
        except Exception as e:
            logging.error(f"Failed to save cache file: {e}")

    def get_cache_key(self, text):
        """Build the cache key for a piece of text."""
        return hashlib.md5(text.encode('utf-8')).hexdigest()

    def get(self, text):
        """Return a cached translation, or None on a miss."""
        cache_key = self.get_cache_key(text)
        # Check the in-memory cache first
        if cache_key in self.memory_cache:
            # Move to the end (LRU)
            self.memory_cache.move_to_end(cache_key)
            self.hits += 1
            return self.memory_cache[cache_key]
        self.misses += 1
        return None

    def set(self, text, translation):
        """Insert a translation, evicting the oldest entry when full."""
        cache_key = self.get_cache_key(text)
        # Drop the least-recently-used entry once the cache is full
        if len(self.memory_cache) >= self.max_memory_size:
            self.memory_cache.popitem(last=False)
        self.memory_cache[cache_key] = translation

    def get_stats(self):
        """Return cache hit/miss statistics."""
        total_requests = self.hits + self.misses
        hit_rate = (self.hits / total_requests * 100) if total_requests > 0 else 0
        return {
            "cache hits": self.hits,
            "cache misses": self.misses,
            "hit rate": f"{hit_rate:.1f}%",
            "memory cache size": len(self.memory_cache),
            "total requests": total_requests
        }


# Translation statistics
class TranslationStats:
    def __init__(self):
        self.start_time = time.time()
        self.total_chars = 0
        self.translated_chars = 0
        self.total_requests = 0
        self.successful_requests = 0
        self.failed_requests = 0
        self.cache_hits = 0
        self.api_calls = 0

    def update_stats(self, original_text, translated_text, success=True, from_cache=False):
        self.total_chars += len(original_text)
        self.translated_chars += len(translated_text)
        self.total_requests += 1
        if success:
            self.successful_requests += 1
        else:
            self.failed_requests += 1
        if from_cache:
            self.cache_hits += 1
        else:
            self.api_calls += 1

    def get_stats(self):
        elapsed_time = time.time() - self.start_time
        chars_per_second = self.translated_chars / elapsed_time if elapsed_time > 0 else 0
        success_rate = (self.successful_requests / self.total_requests * 100) if self.total_requests > 0 else 0
        cache_hit_rate = (self.cache_hits / self.total_requests * 100) if self.total_requests > 0 else 0
        return {
            "total characters": self.total_chars,
            "translated characters": self.translated_chars,
            "throughput": f"{chars_per_second:.2f} chars/s",
            "success rate": f"{success_rate:.1f}%",
            "cache hit rate": f"{cache_hit_rate:.1f}%",
            "API calls": self.api_calls,
            "cache hits": self.cache_hits,
            "elapsed time": f"{elapsed_time:.1f} s"
        }


# Global instances
db_manager = OptimizedDatabaseManager()
translation_cache = OptimizedTranslationCache()
translation_stats = TranslationStats()

# Model configuration
MODEL_CONFIG = {
    "model_name": config.get('openai', 'model_name'),
    "max_retries": config.get('openai', 'max_retries'),
    "retry_delay": config.get('openai', 'retry_delay'),
    "timeout": config.get('openai', 'timeout'),
}


@retry(
    stop=stop_after_attempt(MODEL_CONFIG['max_retries']),
    wait=wait_exponential(multiplier=1, min=4, max=10),
    retry=retry_if_exception_type((openai.APIError, openai.APITimeoutError)),
    before_sleep=lambda retry_state: logging.warning(f"Retrying, attempt {retry_state.attempt_number}...")
)
def translate_text_with_cache(text):
    """Translate text, consulting the in-memory and database caches first."""
    # Check the in-memory cache first
    cached_translation = translation_cache.get(text)
    if cached_translation:
        print(f"[memory cache hit] {text[:50]}...")
        translation_stats.update_stats(text, cached_translation, True, from_cache=True)
        return cached_translation

    # Then check the database cache
    db_cached_translation = db_manager.get_cached_translation(text)
    if db_cached_translation:
        print(f"[database cache hit] {text[:50]}...")
        # Promote the entry into the in-memory cache as well
        translation_cache.set(text, db_cached_translation)
        translation_stats.update_stats(text, db_cached_translation, True, from_cache=True)
        return db_cached_translation

    # Cache miss: call the translation API
    try:
        print(f"[API translation] {text[:50]}...")
        # The system prompt is intentionally kept in Chinese, since it instructs
        # the model to translate the input into Chinese.
        messages = [
            {
                "role": "system",
                "content": (
                    "- 你名为epub翻译大师,专注于将任意语言的文本翻译成中文。"
                    "- 你在翻译过程中,力求保留原文语意,确保翻译的准确性和完整性。"
                    "- 你特别注重翻译结果要贴合现代人的阅读习惯,使译文更加流畅易懂。"
                    "- 在处理包含代码结构的文本时,你会特别注意保持代码的原样。"
                    "- 你的服务旨在为用户提供高效、便捷的翻译体验,帮助用户跨越语言障碍。"
                    "- 在回答问题的时候,尽可能保留原来的代码结构。"
                    "- 在回答问题的时候,尽可能只返回翻译后的内容和代码结构,不要返回任何其他内容。"
                )
            },
            {
                "role": "user",
                "content": text
            }
        ]
        # Stream the completion
        stream = config.client.chat.completions.create(
            model=MODEL_CONFIG['model_name'],
            messages=messages,
            timeout=MODEL_CONFIG['timeout'],
            stream=True
        )
        # Collect the streamed chunks
        translated_text = ""
        for chunk in stream:
            if chunk.choices[0].delta.content is not None:
                content = chunk.choices[0].delta.content
                translated_text += content
                print(content, end='', flush=True)
        print()  # newline
        # Cache the result in memory and in the database
        translation_cache.set(text, translated_text)
        db_manager.cache_translation(text, translated_text)
        # Update the running statistics
        translation_stats.update_stats(text, translated_text, True, from_cache=False)
        return translated_text
    except Exception as e:
        logging.error(f"Translation failed: {str(e)}")
        translation_stats.update_stats(text, "", False, from_cache=False)
        raise


def process_html_file_optimized(file_path, force_retranslate=False):
    """Translate a single HTML file, with caching and resumable progress."""
    # Skip early if the output file already exists
    output_dir = config.get('paths', 'output_dir', default='002/Ops_translated')
    output_path = os.path.join(output_dir, os.path.basename(file_path))
    if os.path.exists(output_path) and not force_retranslate:
        print(f"⚠️ Output file already exists: {output_path}")
        print("Use --force to retranslate it")
        return

    # Look up recorded progress for this file
    progress = db_manager.get_file_progress(file_path)

    try:
        # Try several encodings when reading the file
        encodings = ['utf-8', 'gbk', 'gb2312', 'latin1']
        content = None
        for encoding in encodings:
            try:
                with open(file_path, 'r', encoding=encoding) as f:
                    content = f.read()
                break
            except UnicodeDecodeError:
                continue
        if content is None:
            raise Exception(f"Could not read file with any supported encoding: {file_path}")

        # Extract the <body> content and the <title> tag with regular expressions
        body_pattern = re.compile(r'<body[^>]*>(.*?)</body>', re.DOTALL)
        title_pattern = re.compile(r'<title>(.*?)</title>', re.DOTALL)
        body_match = body_pattern.search(content)
        title_match = title_pattern.search(content)
        if not body_match:
            print(f"Warning: no <body> tag found in {file_path}")
            return
        body_content = body_match.group(1)

        # Translate the <title> tag
        if title_match:
            title_content = title_match.group(1).strip()
            if title_content:
                print(f"\nTranslating title: {title_content}")
                translated_title = translate_text_with_cache(title_content)
                content = content.replace(f"<title>{title_content}</title>",
                                          f"<title>{translated_title}</title>")
            else:
                print("\nSkipping empty title")

        # Split the body content into non-empty lines
        lines = [line.strip() for line in body_content.split('\n') if line.strip()]
        total_lines = len(lines)

        # Load lines that were already translated
        completed_lines = db_manager.get_completed_lines(file_path)
        completed_indices = {line[0] for line in completed_lines}

        # Report previously recorded progress
        if progress:
            print(f"File {file_path} progress so far: "
                  f"{progress['processed_lines']}/{progress['total_lines']} lines "
                  f"({round(progress['processed_lines']*100/progress['total_lines'], 2)}%)")

        # Translate the content in groups of lines
        group_size = config.get('translation', 'initial_line_count', default=3)
        translated_lines = []
        try:
            with tqdm(range(0, len(lines), group_size),
                      desc=f"Processing {os.path.basename(file_path)}",
                      unit="group") as pbar:
                for i in pbar:
                    # Current group of lines
                    group_lines = lines[i:i+group_size]
                    # Reuse stored translations when the whole group is already done
                    if all(i+j in completed_indices for j in range(len(group_lines))):
                        for j in range(len(group_lines)):
                            for line in completed_lines:
                                if line[0] == i+j:
                                    translated_lines.append(line[1])
                                    break
                        continue
                    # Join the group into a single request
                    group_text = '\n'.join(group_lines)
                    print(f"\nTranslating lines {i+1}-{min(i+group_size, len(lines))}/{len(lines)}:")
                    # Translate the group
                    translated_group = translate_text_with_cache(group_text)
                    # Split the result back into lines
                    group_translated_lines = translated_group.split('\n')
                    translated_lines.extend(group_translated_lines)
                    # Queue per-line progress updates
                    for j, translated_line in enumerate(group_translated_lines):
                        if i+j < len(lines):
                            db_manager.update_line_progress(
                                file_path, i+j,
                                group_lines[j] if j < len(group_lines) else "",
                                translated_line, 'completed'
                            )
                    # Queue the file-level progress update
                    db_manager.update_file_progress(file_path, total_lines,
                                                    min(i+group_size, len(lines)), 'in_progress')
                    # Show current statistics in the progress bar
                    stats = translation_stats.get_stats()
                    cache_stats = translation_cache.get_stats()
                    pbar.set_postfix({**stats, **cache_stats})
                    # Small delay to stay under API rate limits
                    time.sleep(0.1)

            # Write out the translated document
            if translated_lines:
                # Build the new body content
                new_body_content = '\n'.join(translated_lines)
                # Replace the body section in the original document
                new_content = content.replace(body_content, new_body_content)
                # Save the modified file
                output_dir = config.get('paths', 'output_dir', default='002/Ops_translated')
                os.makedirs(output_dir, exist_ok=True)
                output_path = os.path.join(output_dir, os.path.basename(file_path))
                with open(output_path, 'w', encoding='utf-8') as f:
                    f.write(new_content)
                # Mark the file as completed
                db_manager.update_file_progress(file_path, total_lines, total_lines, 'completed')
                print(f"File {file_path} translated and saved to {output_path}")
                # Print final statistics
                print("\nTranslation statistics:")
                for key, value in translation_stats.get_stats().items():
                    print(f"{key}: {value}")
                print("\nCache statistics:")
                for key, value in translation_cache.get_stats().items():
                    print(f"{key}: {value}")
        except KeyboardInterrupt:
            print("\nInterrupted, saving current progress...")
            # Flush all pending database operations
            db_manager.flush_pending_operations()
            # Persist the in-memory cache
            translation_cache.save_persistent_cache()
            raise
        except Exception as e:
            print(f"Error while processing the file: {str(e)}")
            raise
    except Exception as e:
        print(f"Error while reading the file: {str(e)}")
        return


def main():
    """Entry point."""
    # Parse command-line arguments
    parser = argparse.ArgumentParser(description='Optimised EPUB translator')
    parser.add_argument('--force', '-f', action='store_true',
                        help='Retranslate all files, ignoring existing output files')
    parser.add_argument('--skip-cache', action='store_true',
                        help='Skip the caches and always call the API')
    args = parser.parse_args()

    print("🚀 Starting the optimised EPUB translator")
    print("Optimisations:")
    print("  ✅ Batched database operations")
    print("  ✅ Multi-level translation cache")
    print("  ✅ Resumable progress")
    print("  ✅ Performance monitoring")
    if args.force:
        print("  🔄 Force-retranslate mode")
    if args.skip_cache:
        # NOTE: --skip-cache is parsed and reported here, but it is not yet
        # consulted by translate_text_with_cache.
        print("  🚫 Skip-cache mode")
    print("-" * 50)

    ops_dir = config.get('paths', 'input_dir', default='002/Ops')
    html_files = [f for f in os.listdir(ops_dir) if f.endswith('.htm') or f.endswith('.html')]
    # Sort by file name
    html_files.sort()
    total_files = len(html_files)
    print(f"Found {total_files} HTML files to process")
    print(f"Start time: {datetime.now().strftime('%Y-%m-%d %H:%M:%S')}")

    # Counters
    skipped_files = 0
    processed_files = 0
    error_files = 0

    try:
        for file_index, filename in enumerate(html_files, 1):
            file_path = os.path.join(ops_dir, filename)
            print(f"\nProcessing file {file_index}/{total_files}: {filename}")
            print("-" * 50)

            # Skip files whose output already exists
            output_dir = config.get('paths', 'output_dir', default='002/Ops_translated')
            output_path = os.path.join(output_dir, filename)
            if os.path.exists(output_path) and not args.force:
                print(f"✅ Translated file already exists: {output_path}")
                print(f"Skipping {filename} (use --force to retranslate)")
                skipped_files += 1
                continue
            elif os.path.exists(output_path) and args.force:
                print(f"🔄 Force retranslating: {filename}")
                print(f"The existing file will be overwritten: {output_path}")

            # Check the recorded translation status
            progress = db_manager.get_file_progress(file_path)
            if progress and progress['status'] == 'completed':
                print(f"📊 The database marks {filename} as completed, but the output file is missing")
                print("The output file will be regenerated...")
                # Do not skip; reprocess to regenerate the output file

            try:
                process_html_file_optimized(file_path, force_retranslate=args.force)
                print(f"\nFinished file {file_index}/{total_files}: {filename}")
                print("-" * 50)
                processed_files += 1
            except Exception as e:
                print(f"\nError while processing {filename}: {str(e)}")
                print("Moving on to the next file...")
                error_files += 1
                continue

            # Report overall progress and cache effectiveness
            completed_files = sum(
                1 for f in html_files[:file_index]
                if db_manager.get_file_progress(os.path.join(ops_dir, f))
                and db_manager.get_file_progress(os.path.join(ops_dir, f))['status'] == 'completed'
            )
            print(f"\nOverall progress: {completed_files}/{total_files} files completed "
                  f"({round(completed_files*100/total_files, 2)}%)")

            # Performance statistics
            print("\nPerformance statistics:")
            for key, value in translation_stats.get_stats().items():
                print(f"  {key}: {value}")
            print("\nCache effectiveness:")
            for key, value in translation_cache.get_stats().items():
                print(f"  {key}: {value}")

            # Database cache statistics (the average is NULL when the cache is empty)
            db_cache_stats = db_manager.get_cache_stats()
            if db_cache_stats and db_cache_stats[0]:
                print(f"  Database cache: {db_cache_stats[0]} entries")
                print(f"  Average access count: {db_cache_stats[1]:.1f}")
                print(f"  Reused translations: {db_cache_stats[3]} entries")

            # Short pause between files
            if file_index < total_files:
                print("\nWaiting 3 seconds before the next file...")
秒后处理下一个文件...") time.sleep(3) except KeyboardInterrupt: print("\n程序被用户中断") finally: # 清理资源 print("\n正在保存缓存和清理资源...") db_manager.flush_pending_operations() translation_cache.save_persistent_cache() db_manager.close() print(f"\n结束时间: {datetime.now().strftime('%Y-%m-%d %H:%M:%S')}") print("\n📋 处理结果汇总:") print(f" 总文件数: {total_files}") print(f" 🔄 已处理: {processed_files}") print(f" ⏭️ 已跳过: {skipped_files}") print(f" ❌ 出错文件: {error_files}") print(f" ✅ 成功率: {(processed_files/(processed_files+error_files)*100):.1f}%" if (processed_files+error_files) > 0 else " ✅ 成功率: N/A") print("\n🎉 翻译性能统计:") for key, value in translation_stats.get_stats().items(): print(f" {key}: {value}") print("\n📊 缓存效果总结:") for key, value in translation_cache.get_stats().items(): print(f" {key}: {value}") if __name__ == "__main__": main()