import json
import os
import re
import sqlite3
import time
from datetime import datetime

import openai
from bs4 import BeautifulSoup
from tqdm import tqdm

# Initialize the OpenAI-compatible client.
# Alternative endpoints kept for reference:
#   chatnio
#     base_url="https://api.chatnio.net/v1",
#     api_key="sk-"
#   deepseek
#     base_url="https://api.deepseek.com/v1",
#     api_key="sk-"
client = openai.OpenAI(
    # Qwen/Qwen3-32B
    base_url="https://api.siliconflow.cn/v1",
    api_key="sk-",
)

# model_name = "Qwen/Qwen3-32B"
model_name = "deepseek-ai/DeepSeek-R1"

# Version control.
VERSION = "1.0.1"  # version tag used to tell different translation versions apart
line_count = 2  # lines per group: larger is faster but more error-prone

# Auto-tuning parameters.
MIN_LINE_COUNT = 1
MAX_LINE_COUNT = 5
INITIAL_LINE_COUNT = 2
ERROR_THRESHOLD = 3  # consecutive failures before the group size is reduced
SUCCESS_THRESHOLD = 5  # consecutive successes before the group size is increased


class LineCountManager:
    """Adapt the number of lines per translation group to recent outcomes.

    The group size starts at ``INITIAL_LINE_COUNT`` and moves by one within
    ``[MIN_LINE_COUNT, MAX_LINE_COUNT]``. ``version`` is derived from the
    current group size (``"1.0.<line count>"``) and is updated whenever the
    group size changes.
    """

    def __init__(self):
        self.current_line_count = INITIAL_LINE_COUNT
        self.consecutive_errors = 0
        self.consecutive_successes = 0
        self.last_error_time = None  # time.time() of the last recorded failure
        self.error_cooldown = 60  # seconds during which outcomes are ignored
        self.version = f"1.0.{INITIAL_LINE_COUNT}"  # initial version tag

    def adjust_line_count(self, success):
        """Record one translation outcome and return the current group size.

        Args:
            success: True if the translation request succeeded.

        Returns:
            The (possibly updated) number of lines per group.
        """
        now = time.time()

        # Inside the cooldown window nothing is recorded at all.
        # NOTE(review): this also drops *failures* that happen within
        # ``error_cooldown`` seconds of the previous recorded failure, so
        # ERROR_THRESHOLD is only reached by failures spaced further apart
        # than the cooldown. Confirm that this is intended.
        if (
            self.last_error_time is not None
            and (now - self.last_error_time) < self.error_cooldown
        ):
            return self.current_line_count

        if success:
            self.consecutive_errors = 0
            self.consecutive_successes += 1

            # Enough successes in a row: try a larger group.
            if (
                self.consecutive_successes >= SUCCESS_THRESHOLD
                and self.current_line_count < MAX_LINE_COUNT
            ):
                self.current_line_count += 1
                self.consecutive_successes = 0
                self.version = f"1.0.{self.current_line_count}"
                print(f"翻译连续成功,增加行数到 {self.current_line_count},版本更新为 {self.version}")
        else:
            self.consecutive_successes = 0
            self.consecutive_errors += 1
            self.last_error_time = now

            # Enough failures in a row: fall back to a smaller group.
            if (
                self.consecutive_errors >= ERROR_THRESHOLD
                and self.current_line_count > MIN_LINE_COUNT
            ):
                self.current_line_count -= 1
                self.consecutive_errors = 0
                self.version = f"1.0.{self.current_line_count}"
                print(f"翻译连续失败,减少行数到 {self.current_line_count},版本更新为 {self.version}")

        return self.current_line_count
# Module-level LineCountManager shared by the progress and translation helpers.
line_count_manager = LineCountManager()


class TranslationStats:
    """Accumulate character and request counters for a translation run."""

    def __init__(self):
        self.start_time = time.time()
        self.total_chars = 0
        self.translated_chars = 0
        self.total_requests = 0
        self.successful_requests = 0
        self.failed_requests = 0

    def update_stats(self, original_text, translated_text, success=True):
        """Record the outcome of one translation request.

        Args:
            original_text: Text that was submitted for translation.
            translated_text: Text that came back (ignored when ``success`` is
                False, because callers pass the untranslated text there).
            success: Whether the request succeeded.
        """
        self.total_chars += len(original_text)
        self.total_requests += 1
        if success:
            # Only real translations count as translated characters; a failed
            # request hands back the original text unchanged.
            self.translated_chars += len(translated_text)
            self.successful_requests += 1
        else:
            self.failed_requests += 1

    def get_stats(self):
        """Return a dict of human-readable statistics for the run so far."""
        elapsed_time = time.time() - self.start_time
        chars_per_second = (
            self.translated_chars / elapsed_time if elapsed_time > 0 else 0
        )
        success_rate = (
            self.successful_requests / self.total_requests * 100
            if self.total_requests > 0
            else 0
        )
        return {
            "总字符数": self.total_chars,
            "已翻译字符数": self.translated_chars,
            "翻译速度": f"{chars_per_second:.2f} 字符/秒",
            "成功率": f"{success_rate:.1f}%",
            "总请求数": self.total_requests,
            "成功请求": self.successful_requests,
            "失败请求": self.failed_requests,
            "运行时间": f"{elapsed_time:.1f} 秒",
        }


# Module-level statistics object.
translation_stats = TranslationStats()

_FILE_PROGRESS_SCHEMA = '''
    CREATE TABLE IF NOT EXISTS file_progress (
        file_path TEXT PRIMARY KEY,
        total_lines INTEGER,
        processed_lines INTEGER,
        status TEXT,
        version TEXT,
        last_updated TIMESTAMP DEFAULT CURRENT_TIMESTAMP
    )
'''

_GROUP_PROGRESS_SCHEMA = '''
    CREATE TABLE IF NOT EXISTS group_progress (
        id INTEGER PRIMARY KEY AUTOINCREMENT,
        file_path TEXT,
        group_index INTEGER,
        original_text TEXT,
        translated_text TEXT,
        status TEXT,
        version TEXT,
        created_at TIMESTAMP DEFAULT CURRENT_TIMESTAMP,
        updated_at TIMESTAMP DEFAULT CURRENT_TIMESTAMP,
        UNIQUE(file_path, group_index, version)
    )
'''

# (table name, CREATE statement, columns copied from a pre-"version" table)
_PROGRESS_TABLES = (
    (
        "file_progress",
        _FILE_PROGRESS_SCHEMA,
        ("file_path", "total_lines", "processed_lines", "status",
         "last_updated"),
    ),
    (
        "group_progress",
        _GROUP_PROGRESS_SCHEMA,
        ("file_path", "group_index", "original_text", "translated_text",
         "status", "created_at", "updated_at"),
    ),
)


def _table_columns(cursor, table):
    """Return the column names of *table*; empty if the table does not exist."""
    # Table names come from _PROGRESS_TABLES only, never from user input.
    cursor.execute(f"PRAGMA table_info({table})")
    return [row[1] for row in cursor.fetchall()]


def _migrate_legacy_table(cursor, table, create_sql, copied_columns, version):
    """Rebuild *table* with a ``version`` column, tagging old rows with *version*."""
    backup = f"{table}_old"
    column_list = ", ".join(copied_columns)
    cursor.execute(f"ALTER TABLE {table} RENAME TO {backup}")
    cursor.execute(create_sql)
    cursor.execute(
        f"INSERT INTO {table} ({column_list}, version) "
        f"SELECT {column_list}, ? FROM {backup}",
        (version,),
    )
    cursor.execute(f"DROP TABLE {backup}")


def init_db(db_path='translation_progress.db'):
    """Open the progress database, creating or migrating its tables.

    A table that exists without a ``version`` column is rebuilt with one and
    its rows are tagged with ``line_count_manager.version``. Missing tables
    are simply created, so a brand-new database no longer triggers the
    migration path.

    Args:
        db_path: Path of the SQLite database file.

    Returns:
        An open ``sqlite3.Connection``.

    Raises:
        sqlite3.Error: If migration or table creation fails. A failed
            migration is rolled back and the connection is closed first.
    """
    conn = sqlite3.connect(db_path)
    try:
        c = conn.cursor()

        legacy = []
        for table, create_sql, copied_columns in _PROGRESS_TABLES:
            existing = _table_columns(c, table)
            if existing and "version" not in existing:
                legacy.append((table, create_sql, copied_columns))

        if legacy:
            print("正在更新数据库结构...")
            # One explicit transaction makes the rename/create/copy/drop
            # sequence atomic: a failure restores the original tables.
            c.execute("BEGIN")
            try:
                for table, create_sql, copied_columns in legacy:
                    _migrate_legacy_table(
                        c, table, create_sql, copied_columns,
                        line_count_manager.version,
                    )
                conn.commit()
            except sqlite3.Error as e:
                conn.rollback()
                print(f"迁移数据时出错: {str(e)}")
                raise
            print("数据库迁移完成")

        for _table, create_sql, _columns in _PROGRESS_TABLES:
            c.execute(create_sql)
        conn.commit()
    except Exception:
        # Do not leak the connection when initialization fails.
        conn.close()
        raise
    return conn


def get_file_progress(conn, file_path):
    """Return the ``file_progress`` row for *file_path*, or None if absent."""
    c = conn.cursor()
    c.execute('SELECT * FROM file_progress WHERE file_path = ?', (file_path,))
    return c.fetchone()


def update_file_progress(conn, file_path, total_lines, processed_lines, status):
    """Insert or replace the progress row of *file_path* and commit.

    The row is stamped with ``line_count_manager.version`` and the current
    timestamp.
    """
    c = conn.cursor()
    c.execute('''
        INSERT OR REPLACE INTO file_progress
            (file_path, total_lines, processed_lines, status, version, last_updated)
        VALUES (?, ?, ?, ?, ?, CURRENT_TIMESTAMP)
    ''', (file_path, total_lines, processed_lines, status,
          line_count_manager.version))
    conn.commit()


def get_group_progress(conn, file_path, group_index):
    """Return one ``group_progress`` row for the given group, or None.

    Several rows can exist per group (one per version). The row of the
    current ``line_count_manager.version`` is preferred; otherwise the most
    recently updated row is returned, so the result is deterministic.
    """
    c = conn.cursor()
    c.execute('''
        SELECT * FROM group_progress
        WHERE file_path = ? AND group_index = ?
        ORDER BY (version = ?) DESC, updated_at DESC, id DESC
        LIMIT 1
    ''', (file_path, group_index, line_count_manager.version))
    return c.fetchone()


def update_group_progress(conn, file_path, group_index, original_text,
                          translated_text, status):
    """Insert or replace the progress row of one translation group and commit.

    The row is stamped with ``line_count_manager.version``.
    """
    c = conn.cursor()
    c.execute('''
        INSERT OR REPLACE INTO group_progress
            (file_path, group_index, original_text, translated_text, status,
             version, updated_at)
        VALUES (?, ?, ?, ?, ?, ?, CURRENT_TIMESTAMP)
    ''', (file_path, group_index, original_text, translated_text, status,
          line_count_manager.version))
    conn.commit()


def get_completed_groups(conn, file_path):
    """Return ``(group_index, translated_text)`` rows completed for *file_path*.

    Only rows whose status is ``'completed'`` and whose version equals
    ``line_count_manager.version`` are returned, ordered by group index.
    """
    c = conn.cursor()
    c.execute('''
        SELECT group_index, translated_text FROM group_progress
        WHERE file_path = ? AND status = 'completed' AND version = ?
        ORDER BY group_index
    ''', (file_path, line_count_manager.version))
    return c.fetchall()


# Disabled addition to the system prompt, kept for reference:
#   "- wrap the output in a code block
#    , provide language comments only when necessary"
def translate_text(text: str, max_retries: int = 3) -> str:
    """Translate *text* through the chat model, retrying on failure.

    Args:
        text: Source text sent as the user message.
        max_retries: Maximum number of attempts. Attempts are separated by an
            exponential back-off (1 s, 2 s, 4 s, ...).

    Returns:
        The model's reply, or *text* unchanged when every attempt failed (or
        when ``max_retries`` is not positive).
    """
    start_time = time.time()
    messages = [
        {
            "role": "system",
            "content": (
                "- 你名为epub翻译大师,专注于将任意语言的文本翻译成中文。"
                "- 你在翻译过程中,力求保留原文语意,确保翻译的准确性和完整性。"
                "- 你特别注重翻译结果要贴合现代人的阅读习惯,使译文更加流畅易懂。"
                "- 在处理包含代码结构的文本时,你会特别注意保持代码的原样。"
                "- 你的服务旨在为用户提供高效、便捷的翻译体验,帮助用户跨越语言障碍。"
                "- 在回答问题的时候,尽可能保留原来的代码结构。"
            ),
        },
        {"role": "user", "content": text},
    ]

    for attempt in range(max_retries):
        try:
            response = client.chat.completions.create(
                model=model_name,
                messages=messages,
            )
            translated_text = response.choices[0].message.content
            if translated_text is None:
                # Fail before touching the statistics so that a retry does
                # not count the same source text twice.
                raise ValueError("模型返回了空内容")

            translation_stats.update_stats(text, translated_text, True)

            # Report the throughput of this call (includes earlier retries).
            elapsed = time.time() - start_time
            chars_per_second = len(translated_text) / elapsed if elapsed > 0 else 0
            print(f"\n翻译速度: {chars_per_second:.2f} 字符/秒")

            # Success feeds the adaptive group-size logic.
            line_count_manager.adjust_line_count(True)
            return translated_text
        except Exception as e:
            # Broad on purpose: any API or parsing problem falls back to the
            # untranslated text instead of aborting the whole run.
            if attempt == max_retries - 1:
                print(f"翻译失败,已达到最大重试次数: {str(e)}")
                translation_stats.update_stats(text, text, False)
                line_count_manager.adjust_line_count(False)
                return text
            print(f"翻译出错,正在重试 ({attempt + 1}/{max_retries}): {str(e)}")
            time.sleep(2 ** attempt)  # exponential back-off

    # Only reached when max_retries <= 0: nothing was attempted.
    return text


# NOTE(review): process_html_file continues past the end of this section, so
# its original text is kept verbatim below and is not reformatted here.
def process_html_file(file_path, conn): """处理HTML文件""" # 检查文件进度 progress = get_file_progress(conn, file_path) try: # 尝试不同的编码方式读取文件 encodings = ['utf-8', 'gbk', 'gb2312', 'latin1'] content = None for encoding in encodings: try: with open(file_path, 'r', encoding=encoding) as f: content = f.read() break except UnicodeDecodeError: continue if content is None: raise Exception(f"无法使用支持的编码读取文件: {file_path}") # 使用正则表达式提取body标签内的内容 body_pattern = re.compile(r'
]*>(.*?)', re.DOTALL) body_match = body_pattern.search(content) if not body_match: print(f"警告: {file_path} 中没有找到body标签") return body_content = body_match.group(1) # 按行分割内容,保留所有HTML标签行,但只翻译包含