123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237 |
- #!/usr/bin/env python3
- """
- 性能测试脚本 - 对比原版本和优化版本的性能
- """
- import time
- import os
- import sqlite3
- import subprocess
- import sys
- from datetime import datetime
- def get_db_stats(db_path):
- """获取数据库统计信息"""
- if not os.path.exists(db_path):
- return None
-
- conn = sqlite3.connect(db_path)
- c = conn.cursor()
-
- stats = {}
-
- # 文件进度统计
- c.execute("SELECT COUNT(*) FROM file_progress WHERE status = 'completed'")
- stats['completed_files'] = c.fetchone()[0]
-
- c.execute("SELECT COUNT(*) FROM file_progress")
- stats['total_files'] = c.fetchone()[0]
-
- # 翻译缓存统计(如果存在)
- try:
- c.execute("SELECT COUNT(*) FROM translation_cache")
- stats['cached_translations'] = c.fetchone()[0]
-
- c.execute("SELECT AVG(access_count) FROM translation_cache")
- avg_access = c.fetchone()[0]
- stats['avg_cache_access'] = avg_access if avg_access else 0
- except:
- stats['cached_translations'] = 0
- stats['avg_cache_access'] = 0
-
- conn.close()
- return stats
- def backup_database():
- """备份现有数据库"""
- db_path = "translation_progress.db"
- if os.path.exists(db_path):
- backup_path = f"translation_progress_backup_{int(time.time())}.db"
- os.rename(db_path, backup_path)
- print(f"数据库已备份到: {backup_path}")
- return backup_path
- return None
- def restore_database(backup_path):
- """恢复数据库"""
- if backup_path and os.path.exists(backup_path):
- db_path = "translation_progress.db"
- if os.path.exists(db_path):
- os.remove(db_path)
- os.rename(backup_path, db_path)
- print(f"数据库已从备份恢复: {backup_path}")
- def run_test(script_name, config_file=None, test_files=3):
- """运行测试"""
- print(f"\n{'='*60}")
- print(f"测试脚本: {script_name}")
- if config_file:
- print(f"配置文件: {config_file}")
- print(f"测试文件数量: {test_files}")
- print(f"{'='*60}")
-
- # 记录开始时间
- start_time = time.time()
-
- # 构建命令
- cmd = [sys.executable, script_name]
-
- # 设置环境变量指定配置文件
- env = os.environ.copy()
- if config_file:
- env['TRANSLATION_CONFIG'] = config_file
-
- try:
- # 运行脚本(仅处理前几个文件用于测试)
- print(f"开始时间: {datetime.now().strftime('%H:%M:%S')}")
-
- # 这里需要修改脚本以支持限制处理文件数量
- # 或者手动中断测试
-
- # 由于测试目的,我们设置一个较短的超时时间
- result = subprocess.run(
- cmd,
- env=env,
- capture_output=True,
- text=True,
- timeout=300 # 5分钟超时
- )
-
- end_time = time.time()
- duration = end_time - start_time
-
- print(f"结束时间: {datetime.now().strftime('%H:%M:%S')}")
- print(f"运行时长: {duration:.2f} 秒")
-
- # 获取数据库统计
- stats = get_db_stats("translation_progress.db")
-
- return {
- 'script': script_name,
- 'duration': duration,
- 'stats': stats,
- 'success': result.returncode == 0,
- 'output': result.stdout,
- 'error': result.stderr
- }
-
- except subprocess.TimeoutExpired:
- end_time = time.time()
- duration = end_time - start_time
-
- print(f"测试超时 (5分钟)")
- print(f"运行时长: {duration:.2f} 秒")
-
- # 获取数据库统计
- stats = get_db_stats("translation_progress.db")
-
- return {
- 'script': script_name,
- 'duration': duration,
- 'stats': stats,
- 'success': False,
- 'timeout': True,
- 'output': "测试超时",
- 'error': "超时"
- }
-
- except Exception as e:
- print(f"测试失败: {e}")
- return {
- 'script': script_name,
- 'duration': 0,
- 'stats': None,
- 'success': False,
- 'error': str(e)
- }
- def compare_results(results):
- """对比测试结果"""
- print(f"\n{'='*80}")
- print("📊 性能对比结果")
- print(f"{'='*80}")
-
- for i, result in enumerate(results, 1):
- print(f"\n{i}. {result['script']}")
- print(f" 运行时长: {result['duration']:.2f} 秒")
- print(f" 执行状态: {'✅ 成功' if result['success'] else '❌ 失败'}")
-
- if result.get('timeout'):
- print(f" 状态说明: ⏰ 超时")
-
- if result['stats']:
- stats = result['stats']
- print(f" 完成文件: {stats['completed_files']}/{stats['total_files']}")
- print(f" 缓存条目: {stats['cached_translations']}")
- print(f" 平均缓存访问: {stats['avg_cache_access']:.1f}")
-
- if result.get('error') and result['error'] != "超时":
- print(f" 错误信息: {result['error'][:100]}...")
-
- # 性能对比
- if len(results) >= 2:
- print(f"\n🚀 性能提升分析:")
- original = results[0]
- optimized = results[1]
-
- if original['success'] and optimized['success']:
- time_improvement = (original['duration'] - optimized['duration']) / original['duration'] * 100
- print(f" 时间优化: {time_improvement:+.1f}%")
-
- if optimized['stats'] and optimized['stats']['cached_translations'] > 0:
- print(f" 缓存效果: {optimized['stats']['cached_translations']} 条翻译被缓存")
- print(f" 缓存复用: 平均每条翻译被访问 {optimized['stats']['avg_cache_access']:.1f} 次")
- def main():
- """主测试函数"""
- print("🧪 EPUB翻译器性能测试")
- print("本测试将对比原版本和优化版本的性能差异")
-
- # 检查脚本文件是否存在
- original_script = "translate_epub_v4(单线程版本)V3.py"
- optimized_script = "translate_epub_v4_optimized.py"
- optimized_config = "config_optimized.yaml"
-
- if not os.path.exists(original_script):
- print(f"❌ 找不到原始脚本: {original_script}")
- return
-
- if not os.path.exists(optimized_script):
- print(f"❌ 找不到优化脚本: {optimized_script}")
- return
-
- # 备份现有数据库
- backup_path = backup_database()
-
- results = []
-
- try:
- # 测试1: 原始版本
- print(f"\n🔄 测试 1/2: 原始版本")
- result1 = run_test(original_script)
- results.append(result1)
-
- # 清理数据库准备下一个测试
- if os.path.exists("translation_progress.db"):
- os.remove("translation_progress.db")
-
- # 测试2: 优化版本
- print(f"\n🔄 测试 2/2: 优化版本")
- result2 = run_test(optimized_script, optimized_config)
- results.append(result2)
-
- # 对比结果
- compare_results(results)
-
- except KeyboardInterrupt:
- print("\n⏹️ 测试被用户中断")
-
- finally:
- # 恢复原始数据库
- if backup_path:
- restore_database(backup_path)
-
- print(f"\n✅ 测试完成")
- if __name__ == "__main__":
- main()
|