import os import re import openai import time from tqdm import tqdm import sqlite3 from datetime import datetime import logging from logging.handlers import RotatingFileHandler from tenacity import retry, stop_after_attempt, wait_exponential, retry_if_exception_type import asyncio import yaml import threading from collections import deque # 配置管理 class Config: def __init__(self, config_path='config.yaml'): self.config_path = config_path self.config = self.load_config() # 验证配置 self.validate_config() # 设置日志 self.setup_logging() # 初始化OpenAI客户端 self.setup_openai() def validate_config(self): """验证配置项""" required_fields = { 'logging': ['level', 'format', 'file'], 'openai': ['base_url', 'api_key', 'model_name', 'max_retries', 'retry_delay', 'timeout', 'max_concurrent_requests'], 'translation': ['min_line_count', 'max_line_count', 'initial_line_count', 'error_threshold', 'success_threshold', 'error_cooldown', 'cache_size'], 'database': ['path', 'pool_size'], 'paths': ['input_dir', 'output_dir'] } for section, fields in required_fields.items(): if section not in self.config: raise ValueError(f"缺少配置节: {section}") for field in fields: if field not in self.config[section]: raise ValueError(f"缺少配置项: {section}.{field}") def load_config(self): """加载配置文件""" if not os.path.exists(self.config_path): # 创建默认配置 default_config = { 'logging': { 'level': 'INFO', 'format': '%(asctime)s - %(levelname)s - %(message)s', 'file': 'translation.log' }, 'openai': { 'base_url': 'https://api.siliconflow.cn/v1', 'api_key': 'sk-', 'model_name': 'deepseek-ai/DeepSeek-R1', 'max_retries': 3, 'retry_delay': 2, 'timeout': 30, 'max_concurrent_requests': 5 }, 'translation': { 'min_line_count': 1, 'max_line_count': 5, 'initial_line_count': 2, 'error_threshold': 3, 'success_threshold': 5, 'error_cooldown': 60, 'cache_size': 1000 }, 'database': { 'path': 'translation_progress.db', 'pool_size': 5 }, 'paths': { 'input_dir': '002/Ops', 'output_dir': '002/Ops_translated' } } # 保存默认配置 with open(self.config_path, 'w', encoding='utf-8') as f: yaml.dump(default_config, f, allow_unicode=True) return default_config # 加载现有配置 with open(self.config_path, 'r', encoding='utf-8') as f: return yaml.safe_load(f) def setup_logging(self): """设置日志""" log_file = self.config['logging']['file'] log_dir = os.path.dirname(log_file) if log_dir and not os.path.exists(log_dir): os.makedirs(log_dir) # 创建日志处理器 file_handler = RotatingFileHandler( log_file, maxBytes=10*1024*1024, # 10MB backupCount=5, encoding='utf-8' ) console_handler = logging.StreamHandler() # 设置日志格式 formatter = logging.Formatter(self.config['logging']['format']) file_handler.setFormatter(formatter) console_handler.setFormatter(formatter) # 配置根日志记录器 root_logger = logging.getLogger() root_logger.setLevel(getattr(logging, self.config['logging']['level'])) root_logger.addHandler(file_handler) root_logger.addHandler(console_handler) def setup_openai(self): """设置OpenAI客户端""" self.client = openai.OpenAI( base_url=self.config['openai']['base_url'], api_key=self.config['openai']['api_key'] ) def get(self, *keys): """获取配置值""" value = self.config for key in keys: value = value[key] return value def update(self, updates): """更新配置""" def deep_update(d, u): for k, v in u.items(): if isinstance(v, dict): d[k] = deep_update(d.get(k, {}), v) else: d[k] = v return d self.config = deep_update(self.config, updates) # 保存更新后的配置 with open(self.config_path, 'w', encoding='utf-8') as f: yaml.dump(self.config, f, allow_unicode=True) # 重新设置日志和OpenAI客户端 self.setup_logging() self.setup_openai() # 创建全局的配置实例 config = Config() # 更新全局变量 MODEL_CONFIG = { "model_name": config.get('openai', 'model_name'), "max_retries": config.get('openai', 'max_retries'), "retry_delay": config.get('openai', 'retry_delay'), "timeout": config.get('openai', 'timeout'), "max_concurrent_requests": config.get('openai', 'max_concurrent_requests'), "cache_size": config.get('translation', 'cache_size') } MIN_LINE_COUNT = config.get('translation', 'min_line_count') MAX_LINE_COUNT = config.get('translation', 'max_line_count') INITIAL_LINE_COUNT = config.get('translation', 'initial_line_count') ERROR_THRESHOLD = config.get('translation', 'error_threshold') SUCCESS_THRESHOLD = config.get('translation', 'success_threshold') # 更新其他类的初始化参数 class TranslationStats: def __init__(self): self.start_time = time.time() self.total_chars = 0 self.translated_chars = 0 self.total_requests = 0 self.successful_requests = 0 self.failed_requests = 0 def update_stats(self, original_text, translated_text, success=True): self.total_chars += len(original_text) self.translated_chars += len(translated_text) self.total_requests += 1 if success: self.successful_requests += 1 else: self.failed_requests += 1 def get_stats(self): elapsed_time = time.time() - self.start_time chars_per_second = self.translated_chars / elapsed_time if elapsed_time > 0 else 0 success_rate = (self.successful_requests / self.total_requests * 100) if self.total_requests > 0 else 0 return { "总字符数": self.total_chars, "已翻译字符数": self.translated_chars, "翻译速度": f"{chars_per_second:.2f} 字符/秒", "成功率": f"{success_rate:.1f}%", "总请求数": self.total_requests, "成功请求": self.successful_requests, "失败请求": self.failed_requests, "运行时间": f"{elapsed_time:.1f} 秒" } # 创建全局的统计对象 translation_stats = TranslationStats() class DatabaseManager: def __init__(self): self.db_path = config.get('database', 'path') self.conn = None self.batch_size = 100 # 批量更新的大小 self.pending_updates = [] # 待更新的操作 self.init_db() def get_connection(self): """获取数据库连接""" if self.conn is None: self.conn = sqlite3.connect(self.db_path) self.conn.row_factory = sqlite3.Row # 启用外键约束 self.conn.execute("PRAGMA foreign_keys = ON") # 设置WAL模式提高并发性能 self.conn.execute("PRAGMA journal_mode = WAL") return self.conn def close(self): """关闭数据库连接""" if self.conn: # 提交所有待处理的更新 self.flush_updates() self.conn.close() self.conn = None def flush_updates(self): """提交所有待处理的更新""" if not self.pending_updates: return try: self.begin_transaction() for update in self.pending_updates: update() self.commit_transaction() except Exception as e: self.rollback_transaction() logging.error(f"批量更新失败: {str(e)}") raise finally: self.pending_updates = [] def add_update(self, update_func): """添加待处理的更新操作""" self.pending_updates.append(update_func) if len(self.pending_updates) >= self.batch_size: self.flush_updates() def update_file_progress(self, file_path, total_lines, processed_lines, status): """更新文件翻译进度""" def update(): c = self.get_connection().cursor() c.execute(''' INSERT OR REPLACE INTO file_progress (file_path, total_lines, processed_lines, status, last_updated) VALUES (?, ?, ?, ?, CURRENT_TIMESTAMP) ''', (file_path, total_lines, processed_lines, status)) self.add_update(update) def update_line_progress(self, file_path, line_index, original_text, translated_text, status): """更新行翻译进度""" def update(): c = self.get_connection().cursor() c.execute(''' INSERT OR REPLACE INTO line_progress (file_path, line_index, original_text, translated_text, status, updated_at) VALUES (?, ?, ?, ?, ?, CURRENT_TIMESTAMP) ''', (file_path, line_index, original_text, translated_text, status)) self.add_update(update) def update_group_progress(self, file_path, group_index, original_text, translated_text, status): """更新翻译组进度""" def update(): c = self.get_connection().cursor() c.execute(''' INSERT OR REPLACE INTO group_progress (file_path, group_index, original_text, translated_text, status, version, updated_at) VALUES (?, ?, ?, ?, ?, ?, CURRENT_TIMESTAMP) ''', (file_path, group_index, original_text, translated_text, status, VERSION)) self.add_update(update) def log_error(self, file_path, line_index, error_type, error_message): """记录错误""" def update(): c = self.get_connection().cursor() c.execute(''' INSERT INTO error_log (file_path, line_index, error_type, error_message) VALUES (?, ?, ?, ?) ''', (file_path, line_index, error_type, error_message)) self.add_update(update) def init_db(self): """初始化数据库""" conn = self.get_connection() c = conn.cursor() # 创建文件进度表 c.execute(''' CREATE TABLE IF NOT EXISTS file_progress ( file_path TEXT PRIMARY KEY, total_lines INTEGER, processed_lines INTEGER, status TEXT, last_updated TIMESTAMP DEFAULT CURRENT_TIMESTAMP, error_count INTEGER DEFAULT 0, retry_count INTEGER DEFAULT 0 ) ''') # 创建行进度表 c.execute(''' CREATE TABLE IF NOT EXISTS line_progress ( id INTEGER PRIMARY KEY AUTOINCREMENT, file_path TEXT, line_index INTEGER, original_text TEXT, translated_text TEXT, status TEXT, created_at TIMESTAMP DEFAULT CURRENT_TIMESTAMP, updated_at TIMESTAMP DEFAULT CURRENT_TIMESTAMP, error_count INTEGER DEFAULT 0, retry_count INTEGER DEFAULT 0, UNIQUE(file_path, line_index) ) ''') # 创建错误日志表 c.execute(''' CREATE TABLE IF NOT EXISTS error_log ( id INTEGER PRIMARY KEY AUTOINCREMENT, file_path TEXT, line_index INTEGER, error_type TEXT, error_message TEXT, created_at TIMESTAMP DEFAULT CURRENT_TIMESTAMP, resolved_at TIMESTAMP, resolution TEXT ) ''') # 创建翻译组进度表 c.execute(''' CREATE TABLE IF NOT EXISTS group_progress ( id INTEGER PRIMARY KEY AUTOINCREMENT, file_path TEXT, group_index INTEGER, original_text TEXT, translated_text TEXT, status TEXT, version TEXT, created_at TIMESTAMP DEFAULT CURRENT_TIMESTAMP, updated_at TIMESTAMP DEFAULT CURRENT_TIMESTAMP, UNIQUE(file_path, group_index, version) ) ''') conn.commit() def begin_transaction(self): """开始事务""" self.get_connection().execute('BEGIN TRANSACTION') def commit_transaction(self): """提交事务""" self.get_connection().commit() def rollback_transaction(self): """回滚事务""" self.get_connection().rollback() def get_file_progress(self, file_path): """获取文件翻译进度""" c = self.get_connection().cursor() c.execute('SELECT * FROM file_progress WHERE file_path = ?', (file_path,)) return c.fetchone() def get_line_progress(self, file_path, line_index): """获取行翻译进度""" c = self.get_connection().cursor() c.execute(''' SELECT * FROM line_progress WHERE file_path = ? AND line_index = ? ''', (file_path, line_index)) return c.fetchone() def get_error_stats(self): """获取错误统计信息""" c = self.get_connection().cursor() c.execute(''' SELECT COUNT(*) as total_errors, COUNT(CASE WHEN resolved_at IS NULL THEN 1 END) as unresolved_errors, COUNT(CASE WHEN created_at > datetime('now', '-1 hour') THEN 1 END) as recent_errors FROM error_log ''') return c.fetchone() class AsyncTranslationManager: def __init__(self): self.semaphore = asyncio.Semaphore(config.get('openai', 'max_concurrent_requests')) self.session = None class TranslationCache: def __init__(self): self.cache = {} self.max_size = config.get('translation', 'cache_size') self.hits = 0 self.misses = 0 # 创建全局实例 line_count_manager = TranslationStats() db_manager = DatabaseManager() async_translation_manager = AsyncTranslationManager() translation_cache = TranslationCache() # 添加版本控制 VERSION = "1.0.1" # 版本号,用于区分不同版本的翻译 line_count = 4 # 每组行数,固定为4行一组 def get_completed_groups(conn, file_path): """获取已完成的翻译行""" c = conn.cursor() c.execute(''' SELECT group_index, translated_text FROM group_progress WHERE file_path = ? AND status = 'completed' AND version = ? ORDER BY group_index ''', (file_path, VERSION)) return c.fetchall() class TokenBucket: """令牌桶限流器""" def __init__(self, rate, capacity): self.rate = rate # 令牌产生速率(每秒) self.capacity = capacity # 桶容量 self.tokens = capacity # 当前令牌数 self.last_update = time.time() self.lock = threading.Lock() def get_token(self): """获取一个令牌""" with self.lock: now = time.time() # 计算新增的令牌 new_tokens = (now - self.last_update) * self.rate self.tokens = min(self.capacity, self.tokens + new_tokens) self.last_update = now if self.tokens >= 1: self.tokens -= 1 return True return False def wait_for_token(self): """等待直到获得令牌""" while not self.get_token(): time.sleep(0.1) # 创建全局的令牌桶实例 token_bucket = TokenBucket(rate=2, capacity=10) # 每秒2个请求,最多10个并发 @retry( stop=stop_after_attempt(MODEL_CONFIG['max_retries']), wait=wait_exponential(multiplier=1, min=4, max=10), retry=retry_if_exception_type((openai.APIError, openai.APITimeoutError)), before_sleep=lambda retry_state: logging.warning(f"重试第 {retry_state.attempt_number} 次...") ) def translate_text(text): """翻译文本,使用流式输出""" if not text or not text.strip(): logging.warning("收到空文本,跳过翻译") return text try: # 等待获取令牌 token_bucket.wait_for_token() messages = [ { "role": "system", "content": "- 你名为epub翻译大师,专注于将任意语言的文本翻译成中文。- 你在翻译过程中,力求保留原文语意,确保翻译的准确性和完整性。- 你特别注重翻译结果要贴合现代人的阅读习惯,使译文更加流畅易懂。- 在处理包含代码结构的文本时,你会特别注意保持代码的原样。- 你的服务旨在为用户提供高效、便捷的翻译体验,帮助用户跨越语言障碍。- 在回答问题的时候,尽可能保留原来的代码结构。- 在回答问题的时候,尽可能只返回翻译后的内容和代码结构,不要返回任何其他内容。" }, { "role": "user", "content": text } ] # 使用流式输出 stream = config.client.chat.completions.create( model=MODEL_CONFIG['model_name'], messages=messages, timeout=MODEL_CONFIG['timeout'], stream=True, # 启用流式输出 temperature=0.3 # 降低随机性,使翻译更稳定 ) # 收集流式输出的内容 translated_text = "" for chunk in stream: if chunk.choices[0].delta.content is not None: content = chunk.choices[0].delta.content translated_text += content # 实时打印翻译内容 print(content, end='', flush=True) print() # 换行 # 验证翻译结果 if not translated_text or len(translated_text.strip()) == 0: raise ValueError("翻译结果为空") # 更新统计信息 translation_stats.update_stats(text, translated_text, True) return translated_text except openai.APIError as e: logging.error(f"OpenAI API错误: {str(e)}") translation_stats.update_stats(text, "", False) raise except openai.APITimeoutError as e: logging.error(f"OpenAI API超时: {str(e)}") translation_stats.update_stats(text, "", False) raise except Exception as e: logging.error(f"翻译出错: {str(e)}") translation_stats.update_stats(text, "", False) raise def calculate_group_size(text_length): """根据文本长度动态计算分组大小""" if text_length < 1000: return 4 elif text_length < 2000: return 3 else: return 2 def resume_translation(file_path, db_manager): """获取断点续传的起始位置""" progress = db_manager.get_file_progress(file_path) if progress and progress['status'] == 'interrupted': return progress['processed_lines'] return 0 def process_html_file(file_path, conn): """处理HTML文件""" # 检查文件进度 progress = db_manager.get_file_progress(file_path) try: # 尝试不同的编码方式读取文件 encodings = ['utf-8', 'gbk', 'gb2312', 'latin1'] content = None for encoding in encodings: try: with open(file_path, 'r', encoding=encoding) as f: content = f.read() logging.info(f"成功使用 {encoding} 编码读取文件: {file_path}") break except UnicodeDecodeError: continue if content is None: raise Exception(f"无法使用支持的编码读取文件: {file_path}") # 使用正则表达式提取body标签内的内容和title标签 body_pattern = re.compile(r'
]*>(.*?)', re.DOTALL) title_pattern = re.compile(r'