12345678910111213141516171819202122232425262728293031323334353637383940414243444546474849505152535455565758596061626364656667686970717273747576777879808182838485868788899091929394959697989910010110210310410510610710810911011111211311411511611711811912012112212312412512612712812913013113213313413513613713813914014114214314414514614714814915015115215315415515615715815916016116216316416516616716816917017117217317417517617717817918018118218318418518618718818919019119219319419519619719819920020120220320420520620720820921021121221321421521621721821922022122222322422522622722822923023123223323423523623723823924024124224324424524624724824925025125225325425525625725825926026126226326426526626726826927027127227327427527627727827928028128228328428528628728828929029129229329429529629729829930030130230330430530630730830931031131231331431531631731831932032132232332432532632732832933033133233333433533633733833934034134234334434534634734834935035135235335435535635735835936036136236336436536636736836937037137237337437537637737837938038138238338438538638738838939039139239339439539639739839940040140240340440540640740840941041141241341441541641741841942042142242342442542642742842943043143243343443543643743843944044144244344444544644744844945045145245345445545645745845946046146246346446546646746846947047147247347447547647747847948048148248348448548648748848949049149249349449549649749849950050150250350450550650750850951051151251351451551651751851952052152252352452552652752852953053153253353453553653753853954054154254354454554654754854955055155255355455555655755855956056156256356456556656756856957057157257357457557657757857958058158258358458558658758858959059159259359459559659759859960060160260360460560660760860961061161261361461561661761861962062162262362462562662762862963063163263363463563663763863964064164264364464564664764864965065165265365465565665765865966066166266366466566666766866967067167267367467567667767867968068168268368468568668768868969069169269369469569669769869970070170270
3704705706707708709710711712713714715716717718719720721722723724725726727728729730731732733734735736737738739740741742743744745746747748749750751752753754755756757758759760761762763764765766767768769770771772773774775776777778779780781782783784785786787788789790791792793794795796797798799800801802803804805806807808809810811812813814815816817818819820821822823824825826827828829830831832833834835836837838839840841842843844845846847848849850851852853854855856857858859860861862863864865866867868869870871872873874875876877878879880881882883884885886887888889890891892893894895896897898899900901902903904905906907908909910911912913914915916917918919920921922923924925926927928929930931932933934935936937938939940941942943944945946947948949950951952953954955956957958959960961962963964965966967968969970971972973974975976977978979980981982983984985986987988989 |
- import os
- import re
- from bs4 import BeautifulSoup
- import openai
- import time
- from tqdm import tqdm
- import sqlite3
- import json
- from datetime import datetime
- import logging
- from tenacity import retry, stop_after_attempt, wait_exponential, retry_if_exception_type
- import asyncio
- import aiohttp
- from concurrent.futures import ThreadPoolExecutor
- from functools import lru_cache
- import hashlib
- import yaml
- from pathlib import Path
- import multiprocessing
- from multiprocessing import Pool, Manager, Lock
- import queue
class Config:
    """Loads, persists, and applies the application configuration.

    Wraps a YAML config file: creates it with defaults on first run, then
    configures the root logger and the OpenAI client from its contents.
    """

    def __init__(self, config_path='config.yaml'):
        self.config_path = config_path
        self.config = self.load_config()

        # Apply the configuration's side effects immediately.
        self.setup_logging()
        self.setup_openai()

    def load_config(self):
        """Load the YAML config, writing a default file first if none exists."""
        if not os.path.exists(self.config_path):
            default_config = {
                'logging': {
                    'level': 'INFO',
                    'format': '%(asctime)s - %(levelname)s - %(message)s',
                    'file': 'translation.log'
                },
                'openai': {
                    'base_url': 'https://api.siliconflow.cn/v1',
                    'api_key': 'sk-',  # placeholder — the user must fill this in
                    'model_name': 'deepseek-ai/DeepSeek-R1',
                    'max_retries': 3,
                    'retry_delay': 2,
                    'timeout': 30,
                    'max_concurrent_requests': 5
                },
                'translation': {
                    'min_line_count': 1,
                    'max_line_count': 5,
                    'initial_line_count': 2,
                    'error_threshold': 3,
                    'success_threshold': 5,
                    'error_cooldown': 60,
                    'cache_size': 1000
                },
                'database': {
                    'path': 'translation_progress.db',
                    'pool_size': 5
                },
                'paths': {
                    'input_dir': '002/Ops',
                    'output_dir': '002/Ops_translated'
                }
            }

            # Persist the defaults so the user has a file to edit.
            with open(self.config_path, 'w', encoding='utf-8') as f:
                yaml.dump(default_config, f, allow_unicode=True)

            return default_config

        with open(self.config_path, 'r', encoding='utf-8') as f:
            return yaml.safe_load(f)

    def setup_logging(self):
        """(Re)configure the root logger from the current config.

        Bug fix: ``logging.basicConfig`` is a no-op once the root logger
        already has handlers, so the re-invocation from ``update()`` silently
        did nothing (and the first call could stack duplicate handlers on top
        of the ``__main__`` block's basicConfig). Existing root handlers are
        removed first so reconfiguration actually takes effect.
        """
        root = logging.getLogger()
        for handler in list(root.handlers):
            root.removeHandler(handler)
            handler.close()
        logging.basicConfig(
            level=getattr(logging, self.config['logging']['level']),
            format=self.config['logging']['format'],
            handlers=[
                # utf-8 so the Chinese log messages survive on any platform
                logging.FileHandler(self.config['logging']['file'], encoding='utf-8'),
                logging.StreamHandler()
            ]
        )

    def setup_openai(self):
        """Create the OpenAI client used for all translation requests."""
        self.client = openai.OpenAI(
            base_url=self.config['openai']['base_url'],
            api_key=self.config['openai']['api_key']
        )

    def get(self, *keys):
        """Return a nested config value, e.g. ``get('openai', 'timeout')``.

        Raises KeyError (or TypeError) if the path does not exist.
        """
        value = self.config
        for key in keys:
            value = value[key]
        return value

    def update(self, updates):
        """Deep-merge *updates* into the config, persist it, and re-apply."""
        def deep_update(d, u):
            for k, v in u.items():
                if isinstance(v, dict):
                    d[k] = deep_update(d.get(k, {}), v)
                else:
                    d[k] = v
            return d

        self.config = deep_update(self.config, updates)

        with open(self.config_path, 'w', encoding='utf-8') as f:
            yaml.dump(self.config, f, allow_unicode=True)

        # Re-apply side effects so the new settings take effect immediately.
        self.setup_logging()
        self.setup_openai()
# The single configuration instance shared across the module.
config = Config()

# Mirror the frequently used settings into module-level constants.
_OPENAI_KEYS = ("model_name", "max_retries", "retry_delay",
                "timeout", "max_concurrent_requests")
MODEL_CONFIG = {key: config.get('openai', key) for key in _OPENAI_KEYS}
MODEL_CONFIG["cache_size"] = config.get('translation', 'cache_size')

MIN_LINE_COUNT, MAX_LINE_COUNT, INITIAL_LINE_COUNT, ERROR_THRESHOLD, SUCCESS_THRESHOLD = (
    config.get('translation', key)
    for key in ('min_line_count', 'max_line_count', 'initial_line_count',
                'error_threshold', 'success_threshold')
)
# Initialisation parameters for the classes below.
class LineCountManager:
    """Adaptively tunes how many lines are sent per translation request.

    Repeated failures shrink the group size (never below MIN_LINE_COUNT) and
    bump the version string, which keys cached progress rows in the database.
    """

    def __init__(self):
        self.current_line_count = INITIAL_LINE_COUNT
        self.consecutive_errors = 0
        self.consecutive_successes = 0
        self.last_error_time = None
        self.error_cooldown = config.get('translation', 'error_cooldown')
        self.version = f"1.0.{INITIAL_LINE_COUNT}"
        self.error_history = []

    def adjust_line_count(self, success):
        """Record one translation outcome and return the line count to use."""
        now = time.time()

        # Freeze all adjustments while inside the post-error cooldown window.
        in_cooldown = (
            self.last_error_time is not None
            and (now - self.last_error_time) < self.error_cooldown
        )
        if in_cooldown:
            return self.current_line_count

        if success:
            # Reset both counters; success never *increases* the group size.
            self.consecutive_errors = 0
            self.consecutive_successes = 0
            return self.current_line_count

        # Failure path: track it and maybe shrink the group.
        self.consecutive_successes = 0
        self.consecutive_errors += 1
        self.last_error_time = now
        self.error_history.append({
            'time': now,
            'line_count': self.current_line_count
        })

        # Too many failures in a row: shrink (only if above the floor).
        if self.consecutive_errors >= ERROR_THRESHOLD and self.current_line_count > MIN_LINE_COUNT:
            self.current_line_count -= 1
            self.consecutive_errors = 0
            self.version = f"1.0.{self.current_line_count}"
            logging.warning(f"翻译连续失败,减少行数到 {self.current_line_count},版本更新为 {self.version}")

        return self.current_line_count

    def get_error_stats(self):
        """Return a summary dict of error activity, or a placeholder string."""
        if not self.error_history:
            return "无错误记录"

        one_hour_ago = time.time() - 3600
        recent = [entry for entry in self.error_history if entry['time'] > one_hour_ago]
        return {
            "总错误数": len(self.error_history),
            "最近一小时错误数": len(recent),
            "当前行数": self.current_line_count,
            "连续错误": self.consecutive_errors,
            "连续成功": self.consecutive_successes
        }
class DatabaseManager:
    """Thin wrapper around the sqlite progress database.

    Owns a single lazily created connection. Each worker process constructs
    its own instance, so connections are never shared across processes.
    """

    def __init__(self):
        self.db_path = config.get('database', 'path')
        self.conn = None
        self.init_db()

    def get_connection(self):
        """Return the open connection, creating it on first use."""
        if self.conn is None:
            self.conn = sqlite3.connect(self.db_path)
            self.conn.row_factory = sqlite3.Row
        return self.conn

    def close(self):
        """Close and forget the connection (a later call reopens it)."""
        if self.conn is not None:
            self.conn.close()
            self.conn = None

    def init_db(self):
        """Create the progress and error tables if they do not exist yet."""
        conn = self.get_connection()
        cur = conn.cursor()

        # Per-file progress, keyed by path.
        cur.execute('''
            CREATE TABLE IF NOT EXISTS file_progress (
                file_path TEXT PRIMARY KEY,
                total_lines INTEGER,
                processed_lines INTEGER,
                status TEXT,
                version TEXT,
                last_updated TIMESTAMP DEFAULT CURRENT_TIMESTAMP,
                error_count INTEGER DEFAULT 0,
                retry_count INTEGER DEFAULT 0
            )
        ''')

        # Per-group translation results, versioned so a line-count change
        # invalidates old rows.
        cur.execute('''
            CREATE TABLE IF NOT EXISTS group_progress (
                id INTEGER PRIMARY KEY AUTOINCREMENT,
                file_path TEXT,
                group_index INTEGER,
                original_text TEXT,
                translated_text TEXT,
                status TEXT,
                version TEXT,
                created_at TIMESTAMP DEFAULT CURRENT_TIMESTAMP,
                updated_at TIMESTAMP DEFAULT CURRENT_TIMESTAMP,
                error_count INTEGER DEFAULT 0,
                retry_count INTEGER DEFAULT 0,
                UNIQUE(file_path, group_index, version)
            )
        ''')

        # Audit trail of failures.
        cur.execute('''
            CREATE TABLE IF NOT EXISTS error_log (
                id INTEGER PRIMARY KEY AUTOINCREMENT,
                file_path TEXT,
                group_index INTEGER,
                error_type TEXT,
                error_message TEXT,
                created_at TIMESTAMP DEFAULT CURRENT_TIMESTAMP,
                resolved_at TIMESTAMP,
                resolution TEXT
            )
        ''')

        conn.commit()

    def begin_transaction(self):
        """Explicitly open a transaction on the shared connection."""
        self.get_connection().execute('BEGIN TRANSACTION')

    def commit_transaction(self):
        """Commit the current transaction."""
        self.get_connection().commit()

    def rollback_transaction(self):
        """Roll back the current transaction."""
        self.get_connection().rollback()

    def get_file_progress(self, file_path):
        """Fetch the progress row for *file_path*, or None if absent."""
        cur = self.get_connection().cursor()
        cur.execute('SELECT * FROM file_progress WHERE file_path = ?', (file_path,))
        return cur.fetchone()

    def update_file_progress(self, file_path, total_lines, processed_lines, status):
        """Insert or overwrite the progress row for *file_path*."""
        conn = self.get_connection()
        conn.cursor().execute('''
            INSERT OR REPLACE INTO file_progress
            (file_path, total_lines, processed_lines, status, version, last_updated)
            VALUES (?, ?, ?, ?, ?, CURRENT_TIMESTAMP)
        ''', (file_path, total_lines, processed_lines, status, line_count_manager.version))
        conn.commit()

    def get_group_progress(self, file_path, group_index):
        """Fetch one group's row for the current version, or None."""
        cur = self.get_connection().cursor()
        cur.execute('''
            SELECT * FROM group_progress
            WHERE file_path = ? AND group_index = ? AND version = ?
        ''', (file_path, group_index, line_count_manager.version))
        return cur.fetchone()

    def update_group_progress(self, file_path, group_index, original_text, translated_text, status):
        """Insert or overwrite one group's translation for the current version."""
        conn = self.get_connection()
        conn.cursor().execute('''
            INSERT OR REPLACE INTO group_progress
            (file_path, group_index, original_text, translated_text, status, version, updated_at)
            VALUES (?, ?, ?, ?, ?, ?, CURRENT_TIMESTAMP)
        ''', (file_path, group_index, original_text, translated_text, status, line_count_manager.version))
        conn.commit()

    def log_error(self, file_path, group_index, error_type, error_message):
        """Append one row to the error audit log."""
        conn = self.get_connection()
        conn.cursor().execute('''
            INSERT INTO error_log
            (file_path, group_index, error_type, error_message)
            VALUES (?, ?, ?, ?)
        ''', (file_path, group_index, error_type, error_message))
        conn.commit()

    def get_error_stats(self):
        """Return aggregate error counts (total / unresolved / last hour)."""
        cur = self.get_connection().cursor()
        cur.execute('''
            SELECT
                COUNT(*) as total_errors,
                COUNT(CASE WHEN resolved_at IS NULL THEN 1 END) as unresolved_errors,
                COUNT(CASE WHEN created_at > datetime('now', '-1 hour') THEN 1 END) as recent_errors
            FROM error_log
        ''')
        return cur.fetchone()
class AsyncTranslationManager:
    """Scaffolding for concurrency-limited async translation requests.

    NOTE(review): only the semaphore and session slot are defined; no async
    request methods exist anywhere in this file yet.
    """

    def __init__(self, max_concurrent=None):
        """Create the manager.

        Args:
            max_concurrent: cap on simultaneous in-flight requests. Defaults
                to the ``openai.max_concurrent_requests`` config value, as
                before; the parameter makes the class usable (and testable)
                without the global config.
        """
        if max_concurrent is None:
            max_concurrent = config.get('openai', 'max_concurrent_requests')
        # Limits how many translation requests may be in flight at once.
        self.semaphore = asyncio.Semaphore(max_concurrent)
        # aiohttp session slot; created lazily by a future async code path.
        self.session = None
class TranslationCache:
    """Bounded in-memory LRU cache for translated text with hit/miss counters.

    The original class only declared the attributes and was never usable:
    it had counters but no get/put methods. This completes it. Eviction
    relies on dict insertion order (guaranteed since Python 3.7).
    """

    def __init__(self, max_size=None):
        """Create the cache.

        Args:
            max_size: maximum number of entries. Defaults to the
                ``translation.cache_size`` config value, as before.
        """
        if max_size is None:
            max_size = config.get('translation', 'cache_size')
        self.cache = {}
        self.max_size = max_size
        self.hits = 0
        self.misses = 0

    def get(self, key):
        """Return the cached value for *key*, or None; updates hit/miss counts."""
        if key in self.cache:
            self.hits += 1
            # Re-insert so eviction targets the least recently used key.
            value = self.cache.pop(key)
            self.cache[key] = value
            return value
        self.misses += 1
        return None

    def put(self, key, value):
        """Store *key* -> *value*, evicting the least recently used entry if full."""
        if key in self.cache:
            self.cache.pop(key)
        elif len(self.cache) >= self.max_size:
            # The first key in insertion order is the least recently used.
            self.cache.pop(next(iter(self.cache)))
        self.cache[key] = value
# Shared singletons. Under the 'spawn' start method each worker process
# re-imports this module and therefore builds its own copies.
line_count_manager = LineCountManager()
db_manager = DatabaseManager()
async_translation_manager = AsyncTranslationManager()
translation_cache = TranslationCache()
# Version control: progress rows in the database are keyed by version, so a
# different version forces re-translation.
VERSION = "1.0.1"  # version tag distinguishing different translation passes
line_count = 2  # lines per group: larger is faster but more error-prone
class TranslationStats:
    """Accumulates character and request counters for one translation run."""

    def __init__(self):
        self.start_time = time.time()
        self.total_chars = 0
        self.translated_chars = 0
        self.total_requests = 0
        self.successful_requests = 0
        self.failed_requests = 0

    def update_stats(self, original_text, translated_text, success=True):
        """Record one request: character counts plus a success/failure tick."""
        self.total_chars += len(original_text)
        self.translated_chars += len(translated_text)
        self.total_requests += 1
        if success:
            self.successful_requests += 1
        else:
            self.failed_requests += 1

    def get_stats(self):
        """Return a human-readable snapshot (keys are display labels)."""
        elapsed = time.time() - self.start_time
        speed = self.translated_chars / elapsed if elapsed > 0 else 0
        rate = (self.successful_requests / self.total_requests * 100) if self.total_requests > 0 else 0

        return {
            "总字符数": self.total_chars,
            "已翻译字符数": self.translated_chars,
            "翻译速度": f"{speed:.2f} 字符/秒",
            "成功率": f"{rate:.1f}%",
            "总请求数": self.total_requests,
            "成功请求": self.successful_requests,
            "失败请求": self.failed_requests,
            "运行时间": f"{elapsed:.1f} 秒"
        }

    def to_dict(self):
        """Serializable snapshot (elapsed time instead of an absolute start)."""
        return {
            "total_chars": self.total_chars,
            "translated_chars": self.translated_chars,
            "total_requests": self.total_requests,
            "successful_requests": self.successful_requests,
            "failed_requests": self.failed_requests,
            "elapsed_time": time.time() - self.start_time
        }

    @classmethod
    def from_dict(cls, data):
        """Rebuild an instance from ``to_dict`` output (start time back-dated)."""
        stats = cls()
        for field in ("total_chars", "translated_chars", "total_requests",
                      "successful_requests", "failed_requests"):
            setattr(stats, field, data.get(field, 0))
        stats.start_time = time.time() - data.get("elapsed_time", 0)
        return stats
# Global statistics object used by the single-process code path.
translation_stats = TranslationStats()
def get_completed_groups(conn, file_path, version=None):
    """Return completed (group_index, translated_text) rows for *file_path*.

    Args:
        conn: open sqlite3 connection holding a ``group_progress`` table.
        file_path: source file whose finished groups should be fetched.
        version: translation version to match. Defaults to the global
            line-count manager's current version (the original behavior);
            passing it explicitly decouples the query from global state.

    Returns:
        Rows ordered by group_index.
    """
    if version is None:
        version = line_count_manager.version
    c = conn.cursor()
    c.execute('''
        SELECT group_index, translated_text
        FROM group_progress
        WHERE file_path = ? AND status = 'completed' AND version = ?
        ORDER BY group_index
    ''', (file_path, version))
    return c.fetchall()
- # """ - 输出内容要求用代码块包裹起来
- # ,只在必要时提供相应的语言注释
- # """
@retry(
    stop=stop_after_attempt(MODEL_CONFIG['max_retries']),
    wait=wait_exponential(multiplier=1, min=4, max=10),
    retry=retry_if_exception_type((openai.APIError, openai.APITimeoutError)),
    before_sleep=lambda retry_state: logging.warning(f"重试第 {retry_state.attempt_number} 次...")
)
def translate_text(text):
    """Translate *text* to Chinese via the configured model, streaming output.

    Retries (tenacity) with exponential backoff on OpenAI API errors and
    timeouts. Tokens are echoed to stdout as they arrive; each attempt's
    outcome is fed to the line-count manager and the shared statistics.

    Bug fix: the module-level ``translation_stats`` — whose numbers
    process_html_file displays on its progress bar — was never updated
    anywhere, so it always read zeros. It is now updated on both success and
    failure. The ``process_files_batch.process_stats`` attribute hook is kept
    for backward compatibility, although no code in this file sets it.

    Raises whatever the API client raised once retries are exhausted.
    """
    try:
        messages = [
            {
                "role": "system",
                "content": "- 你名为epub翻译大师,专注于将任意语言的文本翻译成中文。- 你在翻译过程中,力求保留原文语意,确保翻译的准确性和完整性。- 你特别注重翻译结果要贴合现代人的阅读习惯,使译文更加流畅易懂。- 在处理包含代码结构的文本时,你会特别注意保持代码的原样。- 你的服务旨在为用户提供高效、便捷的翻译体验,帮助用户跨越语言障碍。- 在回答问题的时候,尽可能保留原来的代码结构。- 在回答问题的时候,尽可能只返回翻译后的内容和代码结构,不要返回任何其他内容。- 在任何情况下都不要返回 翻译说明,不要返回任何其他内容。"
            },
            {
                "role": "user",
                "content": text
            }
        ]

        # Stream the completion so partial output can be echoed as it arrives.
        stream = config.client.chat.completions.create(
            model=MODEL_CONFIG['model_name'],
            messages=messages,
            timeout=MODEL_CONFIG['timeout'],
            stream=True
        )

        # Accumulate and echo the streamed chunks.
        translated_text = ""
        for chunk in stream:
            if chunk.choices[0].delta.content is not None:
                content = chunk.choices[0].delta.content
                translated_text += content
                print(content, end='', flush=True)

        print()  # newline after the streamed output
        line_count_manager.adjust_line_count(True)

        # Keep the shared statistics current (previously never updated).
        translation_stats.update_stats(text, translated_text, True)

        # Legacy hook: honoured only if a caller attaches process_stats.
        if hasattr(process_files_batch, 'process_stats'):
            process_files_batch.process_stats.update_stats(text, translated_text, True)

        return translated_text

    except Exception as e:
        logging.error(f"翻译出错: {str(e)}")
        line_count_manager.adjust_line_count(False)

        translation_stats.update_stats(text, "", False)
        if hasattr(process_files_batch, 'process_stats'):
            process_files_batch.process_stats.update_stats(text, "", False)

        # Re-raise so tenacity can retry; note each attempt is counted.
        raise
def process_html_file(file_path, conn):
    """Translate one HTML file and write the result to the output directory.

    Reads the file (trying several encodings), extracts the <body> content,
    translates `<p class...>` / `<h...>` lines in groups via translate_text,
    and persists per-group and per-file progress so interrupted runs resume.

    NOTE(review): appears unused — main() routes all work through
    process_files_batch, which duplicates this logic. Confirm before removal.
    """
    # Previously recorded progress for this file (sqlite3.Row or None).
    progress = db_manager.get_file_progress(file_path)

    try:
        # Try encodings in order; the first that decodes cleanly wins.
        encodings = ['utf-8', 'gbk', 'gb2312', 'latin1']
        content = None

        for encoding in encodings:
            try:
                with open(file_path, 'r', encoding=encoding) as f:
                    content = f.read()
                break
            except UnicodeDecodeError:
                continue

        if content is None:
            raise Exception(f"无法使用支持的编码读取文件: {file_path}")

        # Extract the <body> element with a regex.
        body_pattern = re.compile(r'<body[^>]*>(.*?)</body>', re.DOTALL)
        body_match = body_pattern.search(content)

        if not body_match:
            print(f"警告: {file_path} 中没有找到body标签")
            return

        body_content = body_match.group(1)

        # Keep every stripped line that starts with an HTML tag; only
        # <p class / <h lines are actually sent for translation.
        lines = []
        for line in body_content.split('\n'):
            line = line.strip()
            if line and line.startswith('<'):
                lines.append(line)

        total_lines = len(lines)

        # Groups already translated under the current version are reused.
        completed_groups = get_completed_groups(conn, file_path)
        completed_indices = {group[0] for group in completed_groups}

        if progress:
            # progress columns: [1] = total_lines, [2] = processed_lines
            print(f"文件 {file_path} 已处理进度: {progress[2]}/{progress[1]} 行 ({round(progress[2]*100/progress[1], 2)}%)")

        # Translate the tag lines group by group.
        translated_lines = []
        try:
            with tqdm(range(0, len(lines), line_count_manager.current_line_count),
                      desc=f"处理文件 {os.path.basename(file_path)}",
                      unit="组") as pbar:
                for i in pbar:
                    group_index = i // line_count_manager.current_line_count

                    # Reuse a previously completed group verbatim.
                    if group_index in completed_indices:
                        for group in completed_groups:
                            if group[0] == group_index:
                                translated_lines.extend(group[1].split('\n'))
                                break
                        continue

                    group = lines[i:i+line_count_manager.current_line_count]
                    if group:
                        # Keep the untranslated text for the progress DB.
                        original_text = "\n".join(group)

                        # Pick out the translatable lines and their slots.
                        paragraphs_to_translate = []
                        paragraph_indices = []
                        for idx, line in enumerate(group):
                            if '<p class' in line or line.startswith('<h'):
                                paragraphs_to_translate.append(line)
                                paragraph_indices.append(idx)

                        # Translate each selected paragraph, if any.
                        if paragraphs_to_translate:
                            translated_paragraphs = []
                            for paragraph in paragraphs_to_translate:
                                print(f"\n翻译段落 {len(translated_paragraphs) + 1}/{len(paragraphs_to_translate)}:")
                                translated_paragraph = translate_text(paragraph)
                                translated_paragraphs.append(translated_paragraph)

                            # Splice the translations back into their slots.
                            translated_group = group.copy()
                            for idx, translated in zip(paragraph_indices, translated_paragraphs):
                                translated_group[idx] = translated
                        else:
                            translated_group = group

                        translated_text = "\n".join(translated_group)

                        # Persist the finished group so a rerun can skip it.
                        db_manager.update_group_progress(file_path, group_index, original_text, translated_text, 'completed')

                        translated_lines.extend(translated_group)

                        # Per-file progress (clamped to the line total).
                        processed_lines = min((group_index + 1) * line_count_manager.current_line_count, total_lines)
                        db_manager.update_file_progress(file_path, total_lines, processed_lines, 'in_progress')

                        # Show current run statistics on the progress bar.
                        stats = translation_stats.get_stats()
                        pbar.set_postfix(stats)

                        # Small delay to stay under API rate limits.
                        time.sleep(0.1)

            # Rebuild the body, substituting translated lines in order.
            if translated_lines:
                new_body_content = []
                current_index = 0

                for line in body_content.split('\n'):
                    line = line.strip()
                    if not line:
                        new_body_content.append('')
                        continue

                    if line.startswith('<'):
                        if ('<p class' in line or line.startswith('<h')) and current_index < len(translated_lines):
                            # Consume the next translated line.
                            new_body_content.append(translated_lines[current_index])
                            current_index += 1
                        else:
                            # Non-translatable tag line: keep as-is.
                            new_body_content.append(line)
                    else:
                        # Non-tag content is preserved untouched.
                        new_body_content.append(line)

                new_body_content = '\n'.join(new_body_content)

                # Swap the old body text for the new one inside the document.
                new_content = content.replace(body_content, new_body_content)

                # Write the translated document to the output directory.
                output_dir = config.get('paths', 'output_dir')
                os.makedirs(output_dir, exist_ok=True)
                output_path = os.path.join(output_dir, os.path.basename(file_path))

                with open(output_path, 'w', encoding='utf-8') as f:
                    f.write(new_content)

                # Mark the file fully processed.
                db_manager.update_file_progress(file_path, total_lines, total_lines, 'completed')
                print(f"文件 {file_path} 翻译完成,已保存到 {output_path}")

                # Final statistics for this file.
                print("\n翻译统计信息:")
                for key, value in translation_stats.get_stats().items():
                    print(f"{key}: {value}")

        except KeyboardInterrupt:
            # Save partial progress on Ctrl-C, then propagate.
            print("\n检测到中断,保存当前进度...")
            if 'processed_lines' in locals():
                db_manager.update_file_progress(file_path, total_lines, processed_lines, 'interrupted')
            print("\n中断时的统计信息:")
            for key, value in translation_stats.get_stats().items():
                print(f"{key}: {value}")
            raise
        except Exception as e:
            print(f"处理文件时出错: {str(e)}")
            if 'processed_lines' in locals():
                db_manager.update_file_progress(file_path, total_lines, processed_lines, 'error')
            raise

    except Exception as e:
        # Re-raised inner errors land here too; the function swallows them
        # and returns, so callers only ever see KeyboardInterrupt.
        print(f"读取文件时出错: {str(e)}")
        return
def process_files_batch(file_batch, process_id):
    """Worker entry point: translate a batch of HTML files in one process.

    Mirrors process_html_file, but uses a per-process DatabaseManager plus a
    per-process TranslationStats whose serialized dict is returned for
    aggregation in the parent. Returns None if the whole batch fails.
    """
    try:
        # Each process must own its own sqlite connection.
        process_db = DatabaseManager()
        conn = process_db.get_connection()

        # Statistics local to this worker; merged later by aggregate_stats.
        process_stats = TranslationStats()

        def translate_with_stats(text):
            """Stream one translation request and record it in process_stats."""
            try:
                messages = [
                    {
                        "role": "system",
                        "content": "- 你名为epub翻译大师,专注于将任意语言的文本翻译成中文。- 你在翻译过程中,力求保留原文语意,确保翻译的准确性和完整性。- 你特别注重翻译结果要贴合现代人的阅读习惯,使译文更加流畅易懂。- 在处理包含代码结构的文本时,你会特别注意保持代码的原样。- 你的服务旨在为用户提供高效、便捷的翻译体验,帮助用户跨越语言障碍。- 在回答问题的时候,尽可能保留原来的代码结构。- 在回答问题的时候,尽可能只返回翻译后的内容和代码结构,不要返回任何其他内容。- 在任何情况下都不要返回 翻译说明,不要返回任何其他内容。"
                    },
                    {
                        "role": "user",
                        "content": text
                    }
                ]

                # Stream the completion so output can be echoed incrementally.
                stream = config.client.chat.completions.create(
                    model=MODEL_CONFIG['model_name'],
                    messages=messages,
                    timeout=MODEL_CONFIG['timeout'],
                    stream=True
                )

                # Accumulate and echo the streamed chunks.
                translated_text = ""
                for chunk in stream:
                    if chunk.choices[0].delta.content is not None:
                        content = chunk.choices[0].delta.content
                        translated_text += content
                        print(content, end='', flush=True)

                print()  # newline after the streamed output
                line_count_manager.adjust_line_count(True)

                process_stats.update_stats(text, translated_text, True)
                return translated_text

            except Exception as e:
                logging.error(f"翻译出错: {str(e)}")
                line_count_manager.adjust_line_count(False)

                process_stats.update_stats(text, "", False)
                raise

        for filename in tqdm(file_batch, desc=f"进程 {process_id} 处理文件", unit="文件"):
            file_path = os.path.join(config.get('paths', 'input_dir'), filename)

            # Same pipeline as process_html_file, but with translate_with_stats.
            try:
                # Try encodings in order; first clean decode wins.
                encodings = ['utf-8', 'gbk', 'gb2312', 'latin1']
                content = None

                for encoding in encodings:
                    try:
                        with open(file_path, 'r', encoding=encoding) as f:
                            content = f.read()
                        break
                    except UnicodeDecodeError:
                        continue

                if content is None:
                    raise Exception(f"无法使用支持的编码读取文件: {file_path}")

                # Extract the <body> element with a regex.
                body_pattern = re.compile(r'<body[^>]*>(.*?)</body>', re.DOTALL)
                body_match = body_pattern.search(content)

                if not body_match:
                    print(f"警告: {file_path} 中没有找到body标签")
                    continue

                body_content = body_match.group(1)

                # Keep tag lines; only <p class / <h lines get translated.
                lines = []
                for line in body_content.split('\n'):
                    line = line.strip()
                    if line and line.startswith('<'):
                        lines.append(line)

                total_lines = len(lines)

                # Groups already translated under the current version.
                completed_groups = get_completed_groups(conn, file_path)
                completed_indices = {group[0] for group in completed_groups}

                # NOTE(review): uses the module-level db_manager rather than
                # process_db; under 'spawn' each worker re-imports the module
                # and gets its own db_manager, so this works, but process_db
                # was presumably intended here.
                progress = db_manager.get_file_progress(file_path)
                if progress:
                    print(f"文件 {file_path} 已处理进度: {progress[2]}/{progress[1]} 行 ({round(progress[2]*100/progress[1], 2)}%)")

                # Translate the tag lines group by group.
                translated_lines = []
                with tqdm(range(0, len(lines), line_count_manager.current_line_count),
                          desc=f"处理文件 {os.path.basename(file_path)}",
                          unit="组") as pbar:
                    for i in pbar:
                        group_index = i // line_count_manager.current_line_count

                        # Reuse a previously completed group verbatim.
                        if group_index in completed_indices:
                            for group in completed_groups:
                                if group[0] == group_index:
                                    translated_lines.extend(group[1].split('\n'))
                                    break
                            continue

                        group = lines[i:i+line_count_manager.current_line_count]
                        if group:
                            # Keep the untranslated text for the progress DB.
                            original_text = "\n".join(group)

                            # Pick out the translatable lines and their slots.
                            paragraphs_to_translate = []
                            paragraph_indices = []
                            for idx, line in enumerate(group):
                                if '<p class' in line or line.startswith('<h'):
                                    paragraphs_to_translate.append(line)
                                    paragraph_indices.append(idx)

                            # Translate each selected paragraph, if any.
                            if paragraphs_to_translate:
                                translated_paragraphs = []
                                for paragraph in paragraphs_to_translate:
                                    print(f"\n翻译段落 {len(translated_paragraphs) + 1}/{len(paragraphs_to_translate)}:")
                                    translated_paragraph = translate_with_stats(paragraph)
                                    translated_paragraphs.append(translated_paragraph)

                                # Splice the translations back into their slots.
                                translated_group = group.copy()
                                for idx, translated in zip(paragraph_indices, translated_paragraphs):
                                    translated_group[idx] = translated
                            else:
                                translated_group = group

                            translated_text = "\n".join(translated_group)

                            # Persist the finished group so a rerun can skip it.
                            db_manager.update_group_progress(file_path, group_index, original_text, translated_text, 'completed')

                            translated_lines.extend(translated_group)

                            # Per-file progress (clamped to the line total).
                            processed_lines = min((group_index + 1) * line_count_manager.current_line_count, total_lines)
                            db_manager.update_file_progress(file_path, total_lines, processed_lines, 'in_progress')

                            # Show this worker's statistics on the bar.
                            stats = process_stats.get_stats()
                            pbar.set_postfix(stats)

                            # Small delay to stay under API rate limits.
                            time.sleep(0.1)

                # Rebuild the body, substituting translated lines in order.
                if translated_lines:
                    new_body_content = []
                    current_index = 0

                    for line in body_content.split('\n'):
                        line = line.strip()
                        if not line:
                            new_body_content.append('')
                            continue

                        if line.startswith('<'):
                            if ('<p class' in line or line.startswith('<h')) and current_index < len(translated_lines):
                                # Consume the next translated line.
                                new_body_content.append(translated_lines[current_index])
                                current_index += 1
                            else:
                                # Non-translatable tag line: keep as-is.
                                new_body_content.append(line)
                        else:
                            # Non-tag content is preserved untouched.
                            new_body_content.append(line)

                    new_body_content = '\n'.join(new_body_content)

                    # Swap the old body text for the new one in the document.
                    new_content = content.replace(body_content, new_body_content)

                    # Write the translated document to the output directory.
                    output_dir = config.get('paths', 'output_dir')
                    os.makedirs(output_dir, exist_ok=True)
                    output_path = os.path.join(output_dir, os.path.basename(file_path))

                    with open(output_path, 'w', encoding='utf-8') as f:
                        f.write(new_content)

                    # Mark the file fully processed.
                    db_manager.update_file_progress(file_path, total_lines, total_lines, 'completed')
                    print(f"文件 {file_path} 翻译完成,已保存到 {output_path}")

                    # Final statistics for this file.
                    print("\n翻译统计信息:")
                    for key, value in process_stats.get_stats().items():
                        print(f"{key}: {value}")

            except Exception as e:
                print(f"处理文件时出错: {str(e)}")
                if 'processed_lines' in locals():
                    db_manager.update_file_progress(file_path, total_lines, processed_lines, 'error')
                raise

        # Hand the per-process statistics back to the parent for aggregation.
        return process_stats.to_dict()

    except Exception as e:
        logging.error(f"进程 {process_id} 发生错误: {str(e)}")
        return None
    finally:
        if 'process_db' in locals():
            process_db.close()
def aggregate_stats(stats_list):
    """Merge per-process statistic dicts into one formatted summary.

    Counter fields are summed across processes; elapsed time takes the
    maximum, since workers ran concurrently. Entries that are falsy (e.g.
    None from a failed worker) are skipped.
    """
    valid = [entry for entry in stats_list if entry]

    counter_fields = ("total_chars", "translated_chars", "total_requests",
                      "successful_requests", "failed_requests")
    totals = {
        field: sum(entry[field] for entry in valid if field in entry)
        for field in counter_fields
    }
    # Wall-clock time is the longest-running worker, not the sum.
    totals["elapsed_time"] = max(
        (entry["elapsed_time"] for entry in valid if "elapsed_time" in entry),
        default=0
    )

    # Reuse TranslationStats to produce the human-readable labels.
    return TranslationStats.from_dict(totals).get_stats()
def main():
    """Entry point: fan HTML files out across worker processes, then report."""
    num_processes = 2

    # Collect the HTML files to translate.
    input_dir = config.get('paths', 'input_dir')
    html_files = [f for f in os.listdir(input_dir) if f.endswith('.html')]

    print(f"找到 {len(html_files)} 个HTML文件需要处理")
    print(f"开始时间: {datetime.now().strftime('%Y-%m-%d %H:%M:%S')}")

    # Never spawn more workers than there are files.
    num_processes = min(num_processes, len(html_files))

    if num_processes <= 0:
        print("没有找到需要处理的HTML文件")
        return

    # Slice the file list into contiguous batches, one (or more) per worker.
    batch_size = max(1, len(html_files) // num_processes)
    file_batches = [html_files[start:start + batch_size]
                    for start in range(0, len(html_files), batch_size)]

    try:
        with Pool(processes=num_processes) as pool:
            # Submit every batch, then block until each worker finishes.
            pending = [pool.apply_async(process_files_batch, args=(batch, index + 1))
                       for index, batch in enumerate(file_batches)]
            stats_list = [task.get() for task in pending]

    except Exception as e:
        logging.error(f"进程池执行出错: {str(e)}")
        stats_list = []

    # Combine the per-worker statistics into one report.
    final_stats = aggregate_stats(stats_list)

    print(f"\n结束时间: {datetime.now().strftime('%Y-%m-%d %H:%M:%S')}")
    print("\n最终统计信息:")
    for key, value in final_stats.items():
        print(f"{key}: {value}")
if __name__ == "__main__":
    # 'spawn' makes worker processes re-import this module and rebuild their
    # own global Config/DB singletons (and is the only option on Windows).
    multiprocessing.set_start_method('spawn')
    # Parent-process log format. NOTE(review): Config() already configured
    # the root logger at import time, and basicConfig is a no-op once
    # handlers exist — this call likely does nothing; confirm intent.
    logging.basicConfig(
        level=logging.INFO,
        format='%(asctime)s - %(levelname)s - %(message)s'
    )
    main()
|