Translation Service

- Added translation service with 3 services.
- MyMemory (Free no api key needed but 1000 words a day and Skow)
- Deepl (Free with API key, 500000 words a month and fast)
- Libre Translate (Paid unless you host your own server, open source)
- Added caching for Quick Access and the translate service to speed up the UI. Can be fast depending on the service you use/ PC specs and etc).
This commit is contained in:
Yusarina
2025-08-07 13:58:40 +01:00
parent c28cfe1d1d
commit 61c77cf756
10 changed files with 3018 additions and 12 deletions
+600
View File
@@ -0,0 +1,600 @@
# GPL License
import json
import os
import time
import threading
from typing import Dict, List, Optional, Tuple, Set, Any, Callable
from dataclasses import dataclass, asdict
from enum import Enum
import bpy
from bpy.types import Object, Material, ShapeKey
from .translation_service import get_translation_manager, TranslationServiceManager
from .enhanced_dictionaries import get_enhanced_translator, EnhancedDictionaryTranslator
from .logging_setup import logger
from .addon_preferences import get_preference, save_preference
from .translations import t
class TranslationMode(Enum):
"""Translation modes for different approaches"""
DICTIONARY_ONLY = "dictionary_only"
API_ONLY = "api_only"
HYBRID = "hybrid" # Default: Dictionary first, then API fallback
@dataclass
class TranslationJob:
"""Represents a translation job for batch processing"""
name: str
category: str
source_lang: str = "ja"
target_lang: str = "en"
object_ref: Optional[Any] = None
property_name: Optional[str] = None
@dataclass
class TranslationResult:
"""Result of a translation operation"""
original: str
translated: str
method: str # "dictionary", "api", "failed"
service: Optional[str] = None
category: str = "unknown"
confidence: float = 1.0
class TranslationCache:
"""Persistent translation cache with file storage"""
def __init__(self):
self._cache: Dict[str, Dict[str, str]] = {}
self._cache_file = self._get_cache_file_path()
self._cache_lock = threading.Lock()
self._load_cache()
def _get_cache_file_path(self) -> str:
"""Get the cache file path in user preferences directory"""
user_path = bpy.utils.resource_path('USER')
cache_dir = os.path.join(user_path, "config", "avatar_toolkit_prefs")
os.makedirs(cache_dir, exist_ok=True)
return os.path.join(cache_dir, "translation_cache.json")
def _load_cache(self) -> None:
"""Load cache from file"""
try:
if os.path.exists(self._cache_file):
with open(self._cache_file, 'r', encoding='utf-8') as f:
self._cache = json.load(f)
logger.debug(f"Loaded translation cache with {len(self._cache)} entries")
else:
self._cache = {}
except Exception as e:
logger.warning(f"Failed to load translation cache: {e}")
self._cache = {}
def _save_cache(self) -> None:
"""Save cache to file"""
try:
with open(self._cache_file, 'w', encoding='utf-8') as f:
json.dump(self._cache, f, indent=2, ensure_ascii=False)
logger.debug(f"Saved translation cache with {len(self._cache)} entries")
except Exception as e:
logger.error(f"Failed to save translation cache: {e}")
def get(self, text: str, source_lang: str, target_lang: str) -> Optional[str]:
"""Get cached translation"""
cache_key = f"{source_lang}_{target_lang}"
with self._cache_lock:
if cache_key in self._cache and text in self._cache[cache_key]:
return self._cache[cache_key][text]
return None
def put(self, text: str, translation: str, source_lang: str, target_lang: str) -> None:
"""Store translation in cache"""
cache_key = f"{source_lang}_{target_lang}"
with self._cache_lock:
if cache_key not in self._cache:
self._cache[cache_key] = {}
self._cache[cache_key][text] = translation
# Save cache periodically (every 10 new entries)
if len(self._cache.get(cache_key, {})) % 10 == 0:
self._save_cache()
def clear(self) -> None:
"""Clear all cached translations"""
with self._cache_lock:
self._cache.clear()
self._save_cache()
logger.info("Translation cache cleared")
def get_stats(self) -> Dict[str, int]:
"""Get cache statistics"""
with self._cache_lock:
total_entries = sum(len(lang_cache) for lang_cache in self._cache.values())
return {
"language_pairs": len(self._cache),
"total_entries": total_entries
}
class AvatarToolkitTranslationManager:
"""Main translation manager for Avatar Toolkit"""
def __init__(self):
self.service_manager: TranslationServiceManager = get_translation_manager()
self.dictionary_translator: EnhancedDictionaryTranslator = get_enhanced_translator()
self.cache: TranslationCache = TranslationCache()
self.translation_mode: TranslationMode = TranslationMode(
get_preference("translation_mode", "hybrid")
)
self._progress_callback: Optional[Callable[[int, int, str], None]] = None
def set_translation_mode(self, mode: TranslationMode) -> None:
"""Set the translation mode"""
self.translation_mode = mode
save_preference("translation_mode", mode.value)
logger.info(f"Translation mode set to: {mode.value}")
def set_progress_callback(self, callback: Optional[Callable[[int, int, str], None]]) -> None:
"""Set progress callback for batch operations"""
self._progress_callback = callback
def translate_single(self, name: str, category: str = "auto",
source_lang: str = "ja", target_lang: str = "en") -> TranslationResult:
"""Translate a single name with comprehensive fallback logic"""
if not name or not name.strip():
return TranslationResult(name, name, "skipped")
original_name = name.strip()
# Check cache first
cached_result = self.cache.get(original_name, source_lang, target_lang)
if cached_result:
return TranslationResult(original_name, cached_result, "cache", category=category)
# Dictionary translation (always try first in hybrid mode)
if self.translation_mode in [TranslationMode.DICTIONARY_ONLY, TranslationMode.HYBRID]:
dict_result, detected_category = self.dictionary_translator.translate_name(original_name, category)
if dict_result:
self.cache.put(original_name, dict_result, source_lang, target_lang)
return TranslationResult(original_name, dict_result, "dictionary",
category=detected_category, confidence=1.0)
if self.translation_mode in [TranslationMode.API_ONLY, TranslationMode.HYBRID]:
try:
api_result, service_name = self.service_manager.translate_with_fallback(
original_name, source_lang, target_lang
)
if api_result != original_name: # Translation succeeded
self.cache.put(original_name, api_result, source_lang, target_lang)
return TranslationResult(original_name, api_result, "api",
service=service_name, category=category, confidence=0.8)
except Exception as e:
logger.warning(f"API translation failed for '{original_name}': {e}")
# No translation available
return TranslationResult(original_name, original_name, "failed", category=category)
def translate_batch(self, jobs: List[TranslationJob],
apply_results: bool = True) -> List[TranslationResult]:
"""Translate multiple items in batch with progress reporting and interruption handling"""
results = []
total_jobs = len(jobs)
logger.info(f"Starting batch translation of {total_jobs} items")
# Group jobs by category for more efficient processing
jobs_by_category: Dict[str, List[TranslationJob]] = {}
for job in jobs:
if job.category not in jobs_by_category:
jobs_by_category[job.category] = []
jobs_by_category[job.category].append(job)
completed = 0
start_time = time.time()
for category, category_jobs in jobs_by_category.items():
logger.debug(f"Processing {len(category_jobs)} {category} translations")
# Check if we can use optimized batch translation for API calls
can_use_api_batch = (self.translation_mode in [TranslationMode.API_ONLY, TranslationMode.HYBRID] and
len(category_jobs) > 3)
if can_use_api_batch:
# Try optimized batch translation with API
batch_results = self._process_category_batch_optimized(category_jobs, completed, total_jobs, start_time)
if batch_results:
# Apply results to Blender objects if requested
for i, (job, result) in enumerate(zip(category_jobs, batch_results)):
if apply_results and result.method != "failed" and job.object_ref:
try:
self._apply_translation_to_object(job, result)
logger.debug(f"Successfully applied translation: {job.name} -> {result.translated}")
except Exception as e:
logger.error(f"Failed to apply translation to object {job.name}: {e}")
result.method = "apply_failed"
result.translated = job.name
results.extend(batch_results)
completed += len(category_jobs)
progress_percent = (completed / total_jobs) * 100
logger.info(f"Batch translation progress: {completed}/{total_jobs} ({progress_percent:.1f}%) - completed {category} batch")
continue
# Fallback to individual processing
for job in category_jobs:
# Check if we should continue (for potential cancellation support)
current_time = time.time()
elapsed_time = current_time - start_time
# Progress callback with detailed status
if self._progress_callback:
avg_time_per_item = elapsed_time / max(completed, 1)
remaining_items = total_jobs - completed
estimated_remaining = avg_time_per_item * remaining_items
status_msg = f"Translating {job.name}"
if completed > 0:
status_msg += f" (ETA: {estimated_remaining:.1f}s)"
self._progress_callback(completed, total_jobs, status_msg)
try:
logger.debug(f"Translating job {completed + 1}/{total_jobs}: {job.name} ({job.category})")
result = self.translate_single(job.name, job.category,
job.source_lang, job.target_lang)
if apply_results and result.method != "failed" and job.object_ref:
try:
self._apply_translation_to_object(job, result)
logger.debug(f"Successfully applied translation: {job.name} -> {result.translated}")
except Exception as e:
logger.error(f"Failed to apply translation to object {job.name}: {e}")
result.method = "apply_failed"
result.translated = job.name
results.append(result)
except Exception as e:
logger.error(f"Translation failed for job {job.name}: {e}")
# Create a failed result
failed_result = TranslationResult(
original=job.name,
translated=job.name,
method="failed",
category=job.category
)
results.append(failed_result)
completed += 1
# Log progress periodically
if completed % 10 == 0 or completed == total_jobs:
progress_percent = (completed / total_jobs) * 100
logger.info(f"Batch translation progress: {completed}/{total_jobs} ({progress_percent:.1f}%)")
if self._progress_callback:
total_time = time.time() - start_time
self._progress_callback(total_jobs, total_jobs, f"Translation complete ({total_time:.1f}s)")
successful = sum(1 for r in results if r.method not in ["failed", "skipped", "apply_failed"])
failed = sum(1 for r in results if r.method in ["failed", "apply_failed"])
skipped = sum(1 for r in results if r.method == "skipped")
dictionary_count = sum(1 for r in results if r.method == "dictionary")
api_count = sum(1 for r in results if r.method == "api")
cache_count = sum(1 for r in results if r.method == "cache")
logger.info(f"Batch translation complete: {successful}/{total_jobs} successful, {failed} failed, {skipped} skipped")
logger.info(f"Translation methods used: Dictionary: {dictionary_count}, API: {api_count}, Cache: {cache_count}")
return results
def _process_category_batch_optimized(self, category_jobs: List[TranslationJob],
completed: int, total_jobs: int, start_time: float) -> Optional[List[TranslationResult]]:
"""Process a batch of jobs from the same category using optimized API batch translation"""
if not category_jobs:
return []
logger.info(f"Starting optimized batch translation for {len(category_jobs)} {category_jobs[0].category} items")
api_batch_jobs = []
api_batch_texts = []
results = [None] * len(category_jobs)
# First pass: try dictionary translations and collect API candidates
for i, job in enumerate(category_jobs):
if not job.name or not job.name.strip():
results[i] = TranslationResult(job.name, job.name, "skipped", category=job.category)
continue
original_name = job.name.strip()
# Check cache first
cached_result = self.cache.get(original_name, job.source_lang, job.target_lang)
if cached_result:
results[i] = TranslationResult(original_name, cached_result, "cache", category=job.category)
continue
# Try dictionary translation first (if in hybrid mode)
if self.translation_mode == TranslationMode.HYBRID:
dict_result, detected_category = self.dictionary_translator.translate_name(original_name, job.category)
if dict_result:
self.cache.put(original_name, dict_result, job.source_lang, job.target_lang)
results[i] = TranslationResult(original_name, dict_result, "dictionary",
category=detected_category, confidence=1.0)
continue
# Add to API batch candidates
api_batch_jobs.append((i, job))
api_batch_texts.append(original_name)
# Process API batch if we have candidates
if api_batch_texts:
logger.info(f"Sending {len(api_batch_texts)} items to API batch translation")
if self._progress_callback:
elapsed_time = time.time() - start_time
avg_time_per_item = elapsed_time / max(completed, 1) if completed > 0 else 1.0
remaining_items = total_jobs - completed
estimated_remaining = avg_time_per_item * remaining_items
status_msg = f"Batch translating {len(api_batch_texts)} {category_jobs[0].category} items"
if completed > 0:
status_msg += f" (ETA: {estimated_remaining:.1f}s)"
self._progress_callback(completed, total_jobs, status_msg)
try:
# Use the service manager's optimized batch translation
if len(set(job.source_lang for _, job in api_batch_jobs)) == 1 and len(set(job.target_lang for _, job in api_batch_jobs)) == 1:
source_lang = api_batch_jobs[0][1].source_lang
target_lang = api_batch_jobs[0][1].target_lang
batch_results = self.service_manager.batch_translate_with_fallback(
api_batch_texts, source_lang, target_lang
)
for j, (result_idx, job) in enumerate(api_batch_jobs):
if j < len(batch_results):
translated_text, service_name = batch_results[j]
# Cache successful translations
if translated_text != job.name:
self.cache.put(job.name.strip(), translated_text, job.source_lang, job.target_lang)
results[result_idx] = TranslationResult(
original=job.name.strip(),
translated=translated_text,
method="api" if translated_text != job.name else "failed",
service=service_name,
category=job.category,
confidence=0.8
)
else:
# Fallback for missing results
results[result_idx] = TranslationResult(job.name, job.name, "failed", category=job.category)
else:
# Mixed language pairs - fallback to individual translations
logger.info("Mixed language pairs detected, falling back to individual API translations")
for result_idx, job in api_batch_jobs:
try:
result = self.translate_single(job.name, job.category, job.source_lang, job.target_lang)
results[result_idx] = result
except Exception as e:
logger.error(f"Individual API translation failed for {job.name}: {e}")
results[result_idx] = TranslationResult(job.name, job.name, "failed", category=job.category)
except Exception as e:
logger.error(f"Batch API translation failed: {e}")
# Fallback to individual translations
for result_idx, job in api_batch_jobs:
try:
result = self.translate_single(job.name, job.category, job.source_lang, job.target_lang)
results[result_idx] = result
except Exception as individual_e:
logger.error(f"Individual fallback translation failed for {job.name}: {individual_e}")
results[result_idx] = TranslationResult(job.name, job.name, "failed", category=job.category)
for i, result in enumerate(results):
if result is None:
results[i] = TranslationResult(category_jobs[i].name, category_jobs[i].name, "failed", category=category_jobs[i].category)
successful_batch = sum(1 for r in results if r.method not in ["failed", "skipped"])
logger.info(f"Optimized batch complete: {successful_batch}/{len(category_jobs)} successful")
return results
def _apply_translation_to_object(self, job: TranslationJob, result: TranslationResult) -> None:
"""Apply translation result to a Blender object"""
if not job.object_ref or not job.property_name:
return
try:
setattr(job.object_ref, job.property_name, result.translated)
logger.debug(f"Applied translation: {job.object_ref.name}.{job.property_name} = '{result.translated}'")
except Exception as e:
logger.error(f"Failed to set property {job.property_name}: {e}")
raise
def translate_armature_bones(self, armature: Object, apply_results: bool = True) -> List[TranslationResult]:
"""Translate all bone names in an armature"""
if not armature or armature.type != 'ARMATURE':
return []
jobs = []
for bone in armature.data.bones:
jobs.append(TranslationJob(
name=bone.name,
category="bones",
object_ref=bone,
property_name="name"
))
return self.translate_batch(jobs, apply_results)
def translate_object_shapekeys(self, mesh_obj: Object, apply_results: bool = True) -> List[TranslationResult]:
"""Translate all shape key names in a mesh object"""
if not mesh_obj or mesh_obj.type != 'MESH' or not mesh_obj.data.shape_keys:
return []
jobs = []
for shape_key in mesh_obj.data.shape_keys.key_blocks:
jobs.append(TranslationJob(
name=shape_key.name,
category="shapekeys",
object_ref=shape_key,
property_name="name"
))
return self.translate_batch(jobs, apply_results)
def translate_scene_materials(self, apply_results: bool = True) -> List[TranslationResult]:
"""Translate all material names in the scene"""
jobs = []
processed_materials: Set[str] = set()
for obj in bpy.data.objects:
if obj.type == 'MESH' and obj.data.materials:
for material in obj.data.materials:
if material and material.name not in processed_materials:
jobs.append(TranslationJob(
name=material.name,
category="materials",
object_ref=material,
property_name="name"
))
processed_materials.add(material.name)
return self.translate_batch(jobs, apply_results)
def translate_scene_objects(self, object_types: Optional[Set[str]] = None,
apply_results: bool = True) -> List[TranslationResult]:
"""Translate all object names in the scene"""
if object_types is None:
object_types = {'MESH', 'ARMATURE', 'EMPTY'}
jobs = []
for obj in bpy.data.objects:
if obj.type in object_types:
jobs.append(TranslationJob(
name=obj.name,
category="objects",
object_ref=obj,
property_name="name"
))
return self.translate_batch(jobs, apply_results)
def get_translation_stats(self) -> Dict[str, Any]:
"""Get comprehensive translation statistics"""
dict_stats = self.dictionary_translator.get_statistics()
cache_stats = self.cache.get_stats()
available_services = self.service_manager.get_available_services()
return {
"dictionary_translations": dict_stats,
"cache_stats": cache_stats,
"available_services": available_services,
"current_mode": self.translation_mode.value,
"primary_service": get_preference("translation_service", "microsoft")
}
def clear_all_caches(self) -> None:
"""Clear all translation caches"""
self.cache.clear()
for service_id, service in self.service_manager._services.items():
service.clear_cache()
logger.info("All translation caches cleared")
_translation_manager: Optional[AvatarToolkitTranslationManager] = None
def get_avatar_translation_manager() -> AvatarToolkitTranslationManager:
"""Get the global Avatar Toolkit translation manager"""
global _translation_manager
if _translation_manager is None:
_translation_manager = AvatarToolkitTranslationManager()
return _translation_manager
def translate_name_simple(name: str, category: str = "auto") -> str:
"""Simple translation function for quick use"""
manager = get_avatar_translation_manager()
result = manager.translate_single(name, category)
return result.translated
def is_translation_service_available(service_name: str) -> bool:
"""Check if a specific translation service is available"""
manager = get_avatar_translation_manager()
available_services = manager.service_manager.get_available_services()
return any(service_id == service_name for service_id, _ in available_services)
def get_available_translation_services() -> List[Tuple[str, str]]:
"""Get list of available translation services"""
manager = get_avatar_translation_manager()
return manager.service_manager.get_available_services()
def get_batch_translation_info() -> Dict[str, Dict[str, Any]]:
"""Get information about batch translation capabilities of available services"""
manager = get_avatar_translation_manager()
batch_info = {}
for service_id, service_name in manager.service_manager.get_available_services():
service = manager.service_manager.get_service(service_id)
if service:
batch_info[service_id] = {
'name': service_name,
'supports_batch': service.supports_batch_translation(),
'batch_type': 'native' if service_id == 'deepl' else 'concurrent' if service_id in ['libretranslate', 'mymemory'] else 'individual'
}
return batch_info
def configure_translation_service(service_id: str, **config) -> bool:
"""Configure a translation service with the provided settings (now with batch support)"""
try:
success = False
if service_id == "deepl":
from .translation_service import configure_deepl_translator
success = configure_deepl_translator(
config.get("api_key", ""),
config.get("use_free_api", True)
)
if success:
logger.info("DeepL configured with native batch translation support (up to 50 texts per request)")
elif service_id == "libretranslate":
from .translation_service import configure_libretranslate_server
success = configure_libretranslate_server(
config.get("server_url", "https://libretranslate.com"),
config.get("api_key", None)
)
if success:
logger.info("LibreTranslate configured with concurrent batch processing (3x faster)")
elif service_id == "microsoft":
from .translation_service import configure_microsoft_translator
success = configure_microsoft_translator(
config.get("api_key", ""),
config.get("region", "global")
)
else:
logger.error(f"Unknown translation service: {service_id}")
success = False
return success
except Exception as e:
logger.error(f"Failed to configure translation service {service_id}: {e}")
return False