123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332333334335336337338339340341342343344345346347348349350351352353354355356357358359360361362363364365366367368369370371372373374375376377378379380381382383384385386387388389390391392393394395396397398399400401402403404405406407408409410411412413414415416417418419420421422423424425426427428429430431432433434435436437438439440441442443444445446447448449450451452453454455456457458459460461462463464465466467468469470471472473474475476477478479480481482483484485486487488489490491492493494495496497498499500501502503 |
import json
import os
import re
import sqlite3
import time
from datetime import datetime

import openai
from bs4 import BeautifulSoup
from tqdm import tqdm
# Initialise the OpenAI-compatible client.
#
# The active endpoint is SiliconFlow.  Other OpenAI-compatible endpoints that
# were previously configured here (swap base_url and the key to use them):
#   chatnio:  https://api.chatnio.net/v1
#   deepseek: https://api.deepseek.com/v1
#
# The key is read from the SILICONFLOW_API_KEY environment variable so that a
# real secret never has to be written into this file.  The original "sk-"
# placeholder is kept as the fallback, so behaviour is unchanged when the
# variable is not set.
client = openai.OpenAI(
    base_url="https://api.siliconflow.cn/v1",
    api_key=os.environ.get("SILICONFLOW_API_KEY", "sk-"),
)

# Model requested from the endpoint.  "Qwen/Qwen3-32B" is the alternative
# that was previously configured here.
model_name = "deepseek-ai/DeepSeek-R1"

# Version tag meant to distinguish different translation versions.
# NOTE(review): nothing in the visible code reads this constant; the database
# rows are stamped with LineCountManager.version instead - confirm intent.
VERSION = "1.0.1"
# Lines per translation group: larger is faster but more error-prone.
# NOTE(review): not referenced by the visible code, which uses
# LineCountManager.current_line_count instead.
line_count = 2

# Bounds and thresholds for the automatic group-size adjustment.
MIN_LINE_COUNT, MAX_LINE_COUNT = 1, 5
INITIAL_LINE_COUNT = 2
ERROR_THRESHOLD = 3    # consecutive failures before the group size shrinks
SUCCESS_THRESHOLD = 5  # consecutive successes before the group size grows


class LineCountManager:
    """Adapt the number of lines per translation group to recent outcomes.

    The group size starts at INITIAL_LINE_COUNT, grows by one after
    SUCCESS_THRESHOLD consecutive successes (up to MAX_LINE_COUNT) and
    shrinks by one after ERROR_THRESHOLD consecutive failures (down to
    MIN_LINE_COUNT).  ``version`` mirrors the current size as
    ``"1.0.<size>"``.
    """

    def __init__(self):
        self.current_line_count = INITIAL_LINE_COUNT
        self.consecutive_errors = 0
        self.consecutive_successes = 0
        self.last_error_time = None
        # Seconds after a recorded error during which outcomes are ignored.
        self.error_cooldown = 60
        self.version = f"1.0.{INITIAL_LINE_COUNT}"

    def _register_success(self):
        """Count one success and grow the group size when the streak is long enough."""
        self.consecutive_errors = 0
        self.consecutive_successes += 1

        streak_too_short = self.consecutive_successes < SUCCESS_THRESHOLD
        if streak_too_short or self.current_line_count >= MAX_LINE_COUNT:
            return

        self.current_line_count += 1
        self.consecutive_successes = 0
        self.version = f"1.0.{self.current_line_count}"
        print(f"翻译连续成功,增加行数到 {self.current_line_count},版本更新为 {self.version}")

    def _register_failure(self, timestamp):
        """Count one failure, start the cooldown and shrink the group size if needed."""
        self.consecutive_successes = 0
        self.consecutive_errors += 1
        self.last_error_time = timestamp

        streak_too_short = self.consecutive_errors < ERROR_THRESHOLD
        if streak_too_short or self.current_line_count <= MIN_LINE_COUNT:
            return

        self.current_line_count -= 1
        self.consecutive_errors = 0
        self.version = f"1.0.{self.current_line_count}"
        print(f"翻译连续失败,减少行数到 {self.current_line_count},版本更新为 {self.version}")

    def adjust_line_count(self, success):
        """Record one translation outcome and return the current group size.

        Args:
            success: True when the translation succeeded, False otherwise.

        Returns:
            The (possibly updated) number of lines per group.
        """
        now = time.time()

        # Inside the cooldown window nothing is counted at all.
        cooling_down = (
            bool(self.last_error_time)
            and (now - self.last_error_time) < self.error_cooldown
        )
        if cooling_down:
            return self.current_line_count

        if success:
            self._register_success()
        else:
            self._register_failure(now)

        return self.current_line_count


# Module-wide manager shared by the translation and database helpers.
line_count_manager = LineCountManager()
class TranslationStats:
    """Running counters for one translation session.

    Tracks the number of source characters submitted, the number of
    characters actually produced by successful translations, and the
    request totals.  The clock starts when the object is created.
    """

    def __init__(self):
        self.start_time = time.time()
        self.total_chars = 0
        self.translated_chars = 0
        self.total_requests = 0
        self.successful_requests = 0
        self.failed_requests = 0

    def update_stats(self, original_text, translated_text, success=True):
        """Record the outcome of one translation request.

        Args:
            original_text: Text that was submitted (None counts as empty).
            translated_text: Text that came back (None counts as empty).
            success: Whether the request succeeded.

        Only successful requests add to ``translated_chars``: the failure
        path of the caller passes the untranslated source as
        ``translated_text``, which used to inflate both the translated
        character count and the reported speed.
        """
        self.total_requests += 1
        self.total_chars += len(original_text or "")
        if success:
            self.successful_requests += 1
            self.translated_chars += len(translated_text or "")
        else:
            self.failed_requests += 1

    def get_stats(self):
        """Return a dict of human-readable statistics keyed by display label."""
        elapsed_time = time.time() - self.start_time
        chars_per_second = self.translated_chars / elapsed_time if elapsed_time > 0 else 0
        success_rate = (
            self.successful_requests / self.total_requests * 100
            if self.total_requests > 0
            else 0
        )

        return {
            "总字符数": self.total_chars,
            "已翻译字符数": self.translated_chars,
            "翻译速度": f"{chars_per_second:.2f} 字符/秒",
            "成功率": f"{success_rate:.1f}%",
            "总请求数": self.total_requests,
            "成功请求": self.successful_requests,
            "失败请求": self.failed_requests,
            "运行时间": f"{elapsed_time:.1f} 秒",
        }


# Module-wide statistics object shared by the whole run.
translation_stats = TranslationStats()
# Current schema of the two progress tables.
_FILE_PROGRESS_DDL = '''
    CREATE TABLE IF NOT EXISTS file_progress (
        file_path TEXT PRIMARY KEY,
        total_lines INTEGER,
        processed_lines INTEGER,
        status TEXT,
        version TEXT,
        last_updated TIMESTAMP DEFAULT CURRENT_TIMESTAMP
    )
'''

_GROUP_PROGRESS_DDL = '''
    CREATE TABLE IF NOT EXISTS group_progress (
        id INTEGER PRIMARY KEY AUTOINCREMENT,
        file_path TEXT,
        group_index INTEGER,
        original_text TEXT,
        translated_text TEXT,
        status TEXT,
        version TEXT,
        created_at TIMESTAMP DEFAULT CURRENT_TIMESTAMP,
        updated_at TIMESTAMP DEFAULT CURRENT_TIMESTAMP,
        UNIQUE(file_path, group_index, version)
    )
'''

# Columns copied verbatim from a legacy (pre-"version") table during migration.
_LEGACY_COLUMNS = {
    'file_progress': (
        'file_path', 'total_lines', 'processed_lines', 'status', 'last_updated',
    ),
    'group_progress': (
        'file_path', 'group_index', 'original_text', 'translated_text',
        'status', 'created_at', 'updated_at',
    ),
}


def _table_columns(cursor, table):
    """Return the set of column names of *table*; empty if the table is missing."""
    cursor.execute(f"PRAGMA table_info({table})")
    return {row[1] for row in cursor.fetchall()}


def _migrate_legacy_table(conn, table, ddl, columns, version):
    """Rebuild *table* with the current schema, stamping old rows with *version*.

    The legacy table is renamed, the new one created, the rows copied and the
    legacy table dropped.  On any SQLite error the original table is restored
    and the error is re-raised.
    """
    cursor = conn.cursor()
    backup = f"{table}_old"
    cursor.execute(f"ALTER TABLE {table} RENAME TO {backup}")
    try:
        cursor.execute(ddl)
        column_list = ", ".join(columns)
        cursor.execute(
            f"INSERT INTO {table} ({column_list}, version) "
            f"SELECT {column_list}, ? FROM {backup}",
            (version,),
        )
        cursor.execute(f"DROP TABLE {backup}")
    except sqlite3.Error as e:
        print(f"迁移数据时出错: {str(e)}")
        # End any implicit transaction opened by the INSERT first; otherwise
        # the restore below would be undone when the connection is closed.
        conn.rollback()
        cursor.execute(f"DROP TABLE IF EXISTS {table}")
        cursor.execute(f"ALTER TABLE {backup} RENAME TO {table}")
        conn.commit()
        raise


def init_db(db_path='translation_progress.db'):
    """Open the progress database, creating or migrating its tables.

    Each table is handled independently:
      * missing                     -> created with the current schema;
      * present with ``version``    -> left as is;
      * present without ``version`` -> migrated, with existing rows stamped
        with ``line_count_manager.version``.

    Previously a brand-new database took the migration path and failed while
    renaming tables that did not exist yet.

    Args:
        db_path: SQLite database file (defaults to the original fixed name).

    Returns:
        An open ``sqlite3.Connection``; the caller is responsible for closing it.

    Raises:
        sqlite3.Error: if the schema cannot be created or migrated.  The
            connection is closed before the error propagates.
    """
    conn = sqlite3.connect(db_path)
    try:
        cursor = conn.cursor()
        migrated = False
        schema = (
            ('file_progress', _FILE_PROGRESS_DDL),
            ('group_progress', _GROUP_PROGRESS_DDL),
        )
        for table, ddl in schema:
            existing = _table_columns(cursor, table)
            if existing and 'version' not in existing:
                if not migrated:
                    print("正在更新数据库结构...")
                    migrated = True
                _migrate_legacy_table(
                    conn, table, ddl, _LEGACY_COLUMNS[table],
                    line_count_manager.version,
                )
            else:
                cursor.execute(ddl)

        if migrated:
            print("数据库迁移完成")
        conn.commit()
    except Exception:
        # Do not leak the connection when initialisation fails.
        conn.close()
        raise
    return conn
def get_file_progress(conn, file_path):
    """Fetch the stored progress row for one file.

    Args:
        conn: Open SQLite connection holding the ``file_progress`` table.
        file_path: Path used as the table's primary key.

    Returns:
        The full row as a tuple, or None when the file has no entry.
    """
    query = 'SELECT * FROM file_progress WHERE file_path = ?'
    row = conn.execute(query, (file_path,)).fetchone()
    return row
def update_file_progress(conn, file_path, total_lines, processed_lines, status):
    """Insert or overwrite the progress row of one file and commit.

    The row is stamped with the current ``line_count_manager.version`` and
    with the database's CURRENT_TIMESTAMP.

    Args:
        conn: Open SQLite connection.
        file_path: Primary key of the row.
        total_lines: Number of markup lines in the file.
        processed_lines: Number of lines handled so far.
        status: Free-form status label stored as given.
    """
    row = (
        file_path,
        total_lines,
        processed_lines,
        status,
        line_count_manager.version,
    )
    upsert_sql = '''
        INSERT OR REPLACE INTO file_progress
        (file_path, total_lines, processed_lines, status, version, last_updated)
        VALUES (?, ?, ?, ?, ?, CURRENT_TIMESTAMP)
    '''
    conn.cursor().execute(upsert_sql, row)
    conn.commit()
def get_group_progress(conn, file_path, group_index):
    """Fetch one stored row for a translation group.

    The lookup matches on file path and group index only; it does not filter
    by version, so when several versions of the same group are stored the row
    returned is whichever one SQLite yields first.

    Args:
        conn: Open SQLite connection holding the ``group_progress`` table.
        file_path: File the group belongs to.
        group_index: Zero-based index of the group within the file.

    Returns:
        The full row as a tuple, or None when no row matches.
    """
    lookup_sql = '''
        SELECT * FROM group_progress
        WHERE file_path = ? AND group_index = ?
    '''
    key = (file_path, group_index)
    return conn.execute(lookup_sql, key).fetchone()
def update_group_progress(conn, file_path, group_index, original_text, translated_text, status):
    """Insert or overwrite the stored result of one translation group and commit.

    The row is stamped with the current ``line_count_manager.version``;
    ``updated_at`` is set to the database's CURRENT_TIMESTAMP.

    Args:
        conn: Open SQLite connection.
        file_path: File the group belongs to.
        group_index: Zero-based index of the group within the file.
        original_text: Source lines of the group joined with newlines.
        translated_text: Translated lines of the group joined with newlines.
        status: Free-form status label stored as given.
    """
    row = (
        file_path,
        group_index,
        original_text,
        translated_text,
        status,
        line_count_manager.version,
    )
    upsert_sql = '''
        INSERT OR REPLACE INTO group_progress
        (file_path, group_index, original_text, translated_text, status, version, updated_at)
        VALUES (?, ?, ?, ?, ?, ?, CURRENT_TIMESTAMP)
    '''
    conn.cursor().execute(upsert_sql, row)
    conn.commit()
def get_completed_groups(conn, file_path):
    """List the finished translation groups of a file for the current version.

    Only rows whose status is 'completed' and whose version equals
    ``line_count_manager.version`` are returned.

    Args:
        conn: Open SQLite connection holding the ``group_progress`` table.
        file_path: File whose groups are requested.

    Returns:
        A list of ``(group_index, translated_text)`` tuples ordered by
        group index.
    """
    select_sql = '''
        SELECT group_index, translated_text
        FROM group_progress
        WHERE file_path = ? AND status = 'completed' AND version = ?
        ORDER BY group_index
    '''
    params = (file_path, line_count_manager.version)
    rows = conn.execute(select_sql, params).fetchall()
    return rows
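# Unused system-prompt fragment (kept for reference; not sent to the model):
#   "- The output must be wrapped in a code block,
#    with language comments provided only when necessary."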
# System prompt sent with every translation request.
_TRANSLATION_SYSTEM_PROMPT = "- 你名为epub翻译大师,专注于将任意语言的文本翻译成中文。- 你在翻译过程中,力求保留原文语意,确保翻译的准确性和完整性。- 你特别注重翻译结果要贴合现代人的阅读习惯,使译文更加流畅易懂。- 在处理包含代码结构的文本时,你会特别注意保持代码的原样。- 你的服务旨在为用户提供高效、便捷的翻译体验,帮助用户跨越语言障碍。- 在回答问题的时候,尽可能保留原来的代码结构。"


def translate_text(text, max_retries=3):
    """Translate *text* through the chat-completions API, retrying on failure.

    Each failed attempt is followed by an exponential back-off
    (1 s, 2 s, 4 s, ...).  An empty or missing model reply is treated as a
    failed attempt instead of being returned as the translation.

    Args:
        text: Text (typically one HTML paragraph line) to translate.
        max_retries: Maximum number of attempts.

    Returns:
        The translated text, or the unchanged input when the input is
        empty/blank, when ``max_retries`` is not positive, or when every
        attempt failed.
    """
    # Nothing to send, or no attempt allowed: hand the input back untouched
    # (the loop below would otherwise fall through and return None).
    if not text or not text.strip() or max_retries <= 0:
        return text

    # The request payload does not change between attempts.
    messages = [
        {"role": "system", "content": _TRANSLATION_SYSTEM_PROMPT},
        {"role": "user", "content": text},
    ]

    start_time = time.time()
    for attempt in range(max_retries):
        try:
            response = client.chat.completions.create(
                model=model_name,
                messages=messages
            )
            translated_text = response.choices[0].message.content
            if not translated_text or not translated_text.strip():
                # A blank reply would silently erase the paragraph downstream.
                raise ValueError("模型返回了空内容")
        except Exception as e:
            if attempt == max_retries - 1:
                print(f"翻译失败,已达到最大重试次数: {str(e)}")
                # Record the failure and let the group size adapt.
                translation_stats.update_stats(text, text, False)
                line_count_manager.adjust_line_count(False)
                return text
            print(f"翻译出错,正在重试 ({attempt + 1}/{max_retries}): {str(e)}")
            time.sleep(2 ** attempt)  # exponential back-off
            continue

        # Bookkeeping happens outside the try block so that an error here is
        # not mistaken for a failed API call and retried.
        translation_stats.update_stats(text, translated_text, True)

        elapsed = time.time() - start_time
        chars_per_second = len(translated_text) / elapsed if elapsed > 0 else 0
        print(f"\n翻译速度: {chars_per_second:.2f} 字符/秒")

        line_count_manager.adjust_line_count(True)
        return translated_text

    return text
def _read_text_with_fallback(file_path, encodings=('utf-8', 'gbk', 'gb2312', 'latin1')):
    """Return the file's text decoded with the first encoding that works, else None."""
    for encoding in encodings:
        try:
            with open(file_path, 'r', encoding=encoding) as f:
                return f.read()
        except UnicodeDecodeError:
            continue
    return None


def _translate_group(group):
    """Return a copy of *group* in which every '<p class' line is translated.

    The result always has exactly one entry per input line: line breaks in a
    model reply are collapsed to spaces, and an empty reply keeps the source
    line, so the stored text can be split back into the same number of lines.
    """
    translated_group = list(group)
    for idx, line in enumerate(group):
        if '<p class' in line:
            reply = translate_text(line) or line
            translated_group[idx] = ' '.join(reply.splitlines()) or line
    return translated_group


def _rebuild_body(body_content, translated_lines):
    """Substitute translated paragraphs back into the body, line by line.

    ``translated_lines`` holds one entry per markup line (a stripped,
    non-empty line starting with '<'), in document order.  Only '<p class'
    lines are replaced; the surrounding whitespace of each line is kept.
    """
    rebuilt = []
    cursor = 0
    for raw_line in body_content.split('\n'):
        stripped = raw_line.strip()
        if stripped.startswith('<'):
            if '<p class' in stripped and cursor < len(translated_lines):
                lead = raw_line[:len(raw_line) - len(raw_line.lstrip())]
                trail = raw_line[len(raw_line.rstrip()):]
                raw_line = lead + translated_lines[cursor] + trail
            # Every markup line consumes one slot, translated or not.
            cursor += 1
        rebuilt.append(raw_line)
    return '\n'.join(rebuilt)


def process_html_file(file_path, conn):
    """Translate the '<p class' paragraphs of one HTML file in place.

    The body is split into markup lines, processed in fixed-size groups and
    each finished group is stored in the database so that an interrupted run
    can resume.  When all groups are done the file is rewritten (as UTF-8)
    with the translated paragraphs and marked 'completed'.

    Args:
        file_path: Path of the HTML file; it is overwritten on success.
        conn: Open SQLite connection used for progress tracking.

    Returns:
        None.  Errors are reported on stdout and the file is skipped;
        KeyboardInterrupt is re-raised after the progress has been saved.
    """
    progress = get_file_progress(conn, file_path)

    try:
        content = _read_text_with_fallback(file_path)
        if content is None:
            raise Exception(f"无法使用支持的编码读取文件: {file_path}")

        body_match = re.search(r'<body[^>]*>(.*?)</body>', content, re.DOTALL)
        if not body_match:
            print(f"警告: {file_path} 中没有找到body标签")
            return

        body_content = body_match.group(1)

        # Keep every markup line; only '<p class' lines are sent for translation.
        stripped_lines = (raw.strip() for raw in body_content.split('\n'))
        lines = [line for line in stripped_lines if line.startswith('<')]
        total_lines = len(lines)

        # group_index -> stored translation for the current version.
        completed = dict(get_completed_groups(conn, file_path))

        if progress:
            done, total = progress[2], progress[1]
            # A stored total of 0 must not abort the file with a division error.
            percent = round(done * 100 / total, 2) if total else 0.0
            print(f"文件 {file_path} 已处理进度: {done}/{total} 行 ({percent}%)")

        # Freeze the group size for this file.  The manager may change it
        # while translations run; re-reading it inside the loop (whose step is
        # fixed) made groups overlap or skip lines and shifted group indices.
        group_size = max(1, line_count_manager.current_line_count)

        translated_lines = []
        processed_lines = None
        try:
            with tqdm(range(0, total_lines, group_size),
                      desc=f"处理文件 {os.path.basename(file_path)}",
                      unit="组") as pbar:
                for start in pbar:
                    group_index = start // group_size
                    group = lines[start:start + group_size]

                    stored = completed.get(group_index)
                    if stored is not None:
                        restored = stored.split('\n')
                        # Reuse a stored group only if it lines up with the
                        # source lines; otherwise translate it again.
                        if len(restored) == len(group):
                            translated_lines.extend(restored)
                            continue

                    original_text = "\n".join(group)
                    translated_group = _translate_group(group)
                    translated_text = "\n".join(translated_group)

                    update_group_progress(conn, file_path, group_index,
                                          original_text, translated_text, 'completed')
                    translated_lines.extend(translated_group)

                    processed_lines = min((group_index + 1) * group_size, total_lines)
                    update_file_progress(conn, file_path, total_lines,
                                         processed_lines, 'in_progress')

                    stats = translation_stats.get_stats()
                    pbar.set_postfix(stats)

                    # Short pause to stay clear of API rate limits.
                    time.sleep(0.1)

            if translated_lines:
                new_body_content = _rebuild_body(body_content, translated_lines)
                # Splice by position so identical text elsewhere is untouched.
                new_content = (
                    content[:body_match.start(1)]
                    + new_body_content
                    + content[body_match.end(1):]
                )

                # NOTE(review): the file is always written back as UTF-8, even
                # when it was read with another encoding - confirm the HTML
                # charset declaration still matches.
                with open(file_path, 'w', encoding='utf-8') as f:
                    f.write(new_content)

            update_file_progress(conn, file_path, total_lines, total_lines, 'completed')
            print(f"文件 {file_path} 翻译完成")

            print("\n翻译统计信息:")
            for key, value in translation_stats.get_stats().items():
                print(f"{key}: {value}")

        except KeyboardInterrupt:
            print("\n检测到中断,保存当前进度...")
            if processed_lines is not None:
                update_file_progress(conn, file_path, total_lines, processed_lines, 'interrupted')
            print("\n中断时的统计信息:")
            for key, value in translation_stats.get_stats().items():
                print(f"{key}: {value}")
            raise
        except Exception as e:
            print(f"处理文件时出错: {str(e)}")
            if processed_lines is not None:
                update_file_progress(conn, file_path, total_lines, processed_lines, 'error')
            raise

    except Exception as e:
        # One bad file is reported and skipped; the batch carries on.
        print(f"读取文件时出错: {str(e)}")
        return
def main(ops_dir="002/Ops"):
    """Translate every ``.html`` file found directly inside *ops_dir*.

    Files are processed in sorted name order so that runs are reproducible
    (``os.listdir`` returns entries in arbitrary order).  The progress
    database is opened once and closed when the run ends, and the final
    statistics are printed even after an interruption.

    Args:
        ops_dir: Directory containing the HTML files (defaults to the
            original fixed location).
    """
    html_files = sorted(f for f in os.listdir(ops_dir) if f.endswith('.html'))

    print(f"找到 {len(html_files)} 个HTML文件需要处理")
    print(f"开始时间: {datetime.now().strftime('%Y-%m-%d %H:%M:%S')}")

    conn = init_db()

    try:
        for filename in tqdm(html_files, desc="处理文件", unit="文件"):
            process_html_file(os.path.join(ops_dir, filename), conn)
    except KeyboardInterrupt:
        print("\n程序被用户中断")
    finally:
        conn.close()
        print(f"\n结束时间: {datetime.now().strftime('%Y-%m-%d %H:%M:%S')}")
        print("\n最终统计信息:")
        for key, value in translation_stats.get_stats().items():
            print(f"{key}: {value}")


if __name__ == "__main__":
    main()
|