994 lines
42 KiB
Python
994 lines
42 KiB
Python
# GPL License
|
|
|
|
import json
|
|
import time
|
|
import requests
|
|
import threading
|
|
from abc import ABC, abstractmethod
|
|
from typing import Dict, List, Optional, Tuple, Any, Set
|
|
from dataclasses import dataclass
|
|
from urllib.parse import urlencode
|
|
import uuid
|
|
|
|
from .logging_setup import logger
|
|
from .addon_preferences import save_preference, get_preference
|
|
|
|
|
|
def safe_decode_text(text: str) -> str:
|
|
"""Safely decode text that might be in various encodings (UTF-8, Shift-JIS, etc.)"""
|
|
if not text:
|
|
return text
|
|
|
|
# If it's already a proper string, return it
|
|
if isinstance(text, str):
|
|
try:
|
|
# Test if it's valid UTF-8
|
|
text.encode('utf-8')
|
|
return text
|
|
except (UnicodeDecodeError, UnicodeEncodeError):
|
|
pass
|
|
|
|
# Try common encodings for Japanese text
|
|
encodings = ['utf-8', 'shift-jis', 'cp932', 'euc-jp', 'iso-2022-jp']
|
|
|
|
for encoding in encodings:
|
|
try:
|
|
if isinstance(text, bytes):
|
|
return text.decode(encoding)
|
|
else:
|
|
# Try to re-encode and decode
|
|
return text.encode('latin-1', errors='ignore').decode(encoding, errors='ignore')
|
|
except (UnicodeDecodeError, UnicodeEncodeError, AttributeError):
|
|
continue
|
|
|
|
# Fallback: replace problematic characters
|
|
try:
|
|
if isinstance(text, bytes):
|
|
return text.decode('utf-8', errors='replace')
|
|
else:
|
|
return str(text).encode('utf-8', errors='replace').decode('utf-8')
|
|
except:
|
|
return str(text)
|
|
|
|
|
|
@dataclass
|
|
class TranslationRequest:
|
|
"""Represents a translation request"""
|
|
text: str
|
|
source_lang: str = "ja"
|
|
target_lang: str = "en"
|
|
category: str = "general"
|
|
|
|
|
|
@dataclass
|
|
class TranslationResult:
|
|
"""Represents a translation result"""
|
|
original: str
|
|
translated: str
|
|
service: str
|
|
confidence: float = 1.0
|
|
cached: bool = False
|
|
|
|
|
|
class TranslationError(Exception):
|
|
"""Custom exception for translation errors"""
|
|
pass
|
|
|
|
|
|
class TranslationService(ABC):
|
|
"""Abstract base class for translation services"""
|
|
|
|
def __init__(self, name: str):
|
|
self.name = name
|
|
self._cache: Dict[str, str] = {}
|
|
self._rate_limit_lock = threading.Lock()
|
|
self._last_request_time = 0.0
|
|
self._request_count = 0
|
|
self._rate_limit_per_second = 10 # Default rate limit
|
|
|
|
@abstractmethod
|
|
def translate_text(self, text: str, source_lang: str = "ja", target_lang: str = "en") -> str:
|
|
"""Translate a single text string"""
|
|
pass
|
|
|
|
@abstractmethod
|
|
def is_available(self) -> bool:
|
|
"""Check if the service is available"""
|
|
pass
|
|
|
|
@abstractmethod
|
|
def get_supported_languages(self) -> List[Tuple[str, str]]:
|
|
"""Get list of supported language pairs (code, name)"""
|
|
pass
|
|
|
|
def batch_translate(self, texts: List[str], source_lang: str = "ja", target_lang: str = "en") -> List[str]:
|
|
"""Translate multiple texts with rate limiting - base implementation for services without native batch support"""
|
|
results = []
|
|
for text in texts:
|
|
# Check cache first
|
|
cache_key = f"{source_lang}_{target_lang}_{text}"
|
|
if cache_key in self._cache:
|
|
results.append(self._cache[cache_key])
|
|
continue
|
|
|
|
# Rate limiting
|
|
with self._rate_limit_lock:
|
|
current_time = time.time()
|
|
if current_time - self._last_request_time < (1.0 / self._rate_limit_per_second):
|
|
time.sleep((1.0 / self._rate_limit_per_second) - (current_time - self._last_request_time))
|
|
|
|
try:
|
|
translated = self.translate_text(text, source_lang, target_lang)
|
|
self._cache[cache_key] = translated
|
|
results.append(translated)
|
|
self._last_request_time = time.time()
|
|
|
|
except Exception as e:
|
|
logger.warning(f"Translation failed for '{text}': {e}")
|
|
results.append(text)
|
|
|
|
return results
|
|
|
|
def supports_batch_translation(self) -> bool:
|
|
"""Check if service supports native batch translation"""
|
|
return False
|
|
|
|
def clear_cache(self) -> None:
|
|
"""Clear the translation cache"""
|
|
self._cache.clear()
|
|
logger.info(f"Cleared cache for {self.name}")
|
|
|
|
|
|
|
|
|
|
class DeepLService(TranslationService):
|
|
"""DeepL translation service - requires API key"""
|
|
|
|
def __init__(self, api_key: str = "", use_free_api: bool = True):
|
|
super().__init__("DeepL" + (" (Free)" if use_free_api else " (Pro)"))
|
|
self.api_key = api_key
|
|
self.use_free_api = use_free_api
|
|
self._rate_limit_per_second = 5 # DeepL allows more requests
|
|
self._base_url = "https://api-free.deepl.com" if use_free_api else "https://api.deepl.com"
|
|
|
|
def translate_text(self, text: str, source_lang: str = "ja", target_lang: str = "en") -> str:
|
|
"""Translate text using DeepL API"""
|
|
# Ensure text is properly encoded
|
|
text = safe_decode_text(text)
|
|
logger.info(f"DeepL: Starting translation of '{text}' from {source_lang} to {target_lang}")
|
|
|
|
if not text or not text.strip():
|
|
logger.debug("Empty text provided, returning as-is")
|
|
return text
|
|
|
|
if not self.api_key:
|
|
raise TranslationError("DeepL API key is required")
|
|
|
|
# DeepL language codes mapping
|
|
lang_map = {
|
|
"ja": "JA", "en": "EN", "ko": "KO", "zh": "ZH",
|
|
"es": "ES", "fr": "FR", "de": "DE", "it": "IT",
|
|
"pt": "PT", "ru": "RU", "nl": "NL", "pl": "PL"
|
|
}
|
|
source_lang = lang_map.get(source_lang, source_lang.upper())
|
|
target_lang = lang_map.get(target_lang, target_lang.upper())
|
|
|
|
endpoint = f"{self._base_url}/v2/translate"
|
|
headers = {
|
|
"Authorization": f"DeepL-Auth-Key {self.api_key}",
|
|
"Content-Type": "application/x-www-form-urlencoded"
|
|
}
|
|
|
|
data = {
|
|
"text": text,
|
|
"source_lang": source_lang,
|
|
"target_lang": target_lang
|
|
}
|
|
|
|
try:
|
|
logger.debug(f"Making request to DeepL API: {endpoint}")
|
|
response = requests.post(endpoint, headers=headers, data=data, timeout=15)
|
|
logger.debug(f"DeepL response status: {response.status_code}")
|
|
response.raise_for_status()
|
|
|
|
result = response.json()
|
|
logger.debug(f"DeepL response: {result}")
|
|
|
|
if "translations" in result and len(result["translations"]) > 0:
|
|
translated_text = result["translations"][0]["text"]
|
|
logger.info(f"DeepL SUCCESS: '{text}' -> '{translated_text}'")
|
|
return translated_text
|
|
else:
|
|
raise TranslationError("DeepL API returned no translations")
|
|
|
|
except requests.HTTPError as e:
|
|
if e.response.status_code == 401:
|
|
raise TranslationError("DeepL API key is invalid")
|
|
elif e.response.status_code == 403:
|
|
raise TranslationError("DeepL API key access denied or quota exceeded")
|
|
elif e.response.status_code == 456:
|
|
raise TranslationError("DeepL quota exceeded")
|
|
else:
|
|
logger.error(f"DeepL HTTP error: {e}")
|
|
raise TranslationError(f"DeepL API error: {e}")
|
|
except requests.Timeout:
|
|
logger.error("DeepL request timed out")
|
|
raise TranslationError("DeepL request timed out after 15 seconds")
|
|
except requests.RequestException as e:
|
|
logger.error(f"DeepL API request failed: {e}")
|
|
raise TranslationError(f"DeepL API request failed: {e}")
|
|
except Exception as e:
|
|
logger.error(f"Unexpected error in DeepL: {e}")
|
|
raise TranslationError(f"Unexpected error: {e}")
|
|
|
|
def is_available(self) -> bool:
|
|
"""Check if DeepL service is available"""
|
|
if not self.api_key:
|
|
return False
|
|
|
|
try:
|
|
headers = {"Authorization": f"DeepL-Auth-Key {self.api_key}"}
|
|
response = requests.get(f"{self._base_url}/v2/usage", headers=headers, timeout=5)
|
|
return response.status_code == 200
|
|
except:
|
|
return False
|
|
|
|
def get_supported_languages(self) -> List[Tuple[str, str]]:
|
|
"""Get supported languages for DeepL"""
|
|
return [
|
|
("ja", "Japanese"),
|
|
("en", "English"),
|
|
("ko", "Korean"),
|
|
("zh", "Chinese"),
|
|
("es", "Spanish"),
|
|
("fr", "French"),
|
|
("de", "German"),
|
|
("it", "Italian"),
|
|
("pt", "Portuguese"),
|
|
("ru", "Russian"),
|
|
("nl", "Dutch"),
|
|
("pl", "Polish")
|
|
]
|
|
|
|
def supports_batch_translation(self) -> bool:
|
|
"""DeepL supports native batch translation"""
|
|
return True
|
|
|
|
def batch_translate(self, texts: List[str], source_lang: str = "ja", target_lang: str = "en") -> List[str]:
|
|
"""Translate multiple texts using DeepL batch API"""
|
|
if not texts:
|
|
return []
|
|
|
|
# Ensure all texts are properly encoded
|
|
texts = [safe_decode_text(text) for text in texts]
|
|
logger.info(f"DeepL: Starting batch translation of {len(texts)} texts from {source_lang} to {target_lang}")
|
|
|
|
results = [None] * len(texts)
|
|
uncached_indices = []
|
|
uncached_texts = []
|
|
|
|
for i, text in enumerate(texts):
|
|
if not text or not text.strip():
|
|
results[i] = text
|
|
continue
|
|
|
|
cache_key = f"{source_lang}_{target_lang}_{text}"
|
|
if cache_key in self._cache:
|
|
results[i] = self._cache[cache_key]
|
|
continue
|
|
|
|
uncached_indices.append(i)
|
|
uncached_texts.append(text)
|
|
|
|
if not uncached_texts:
|
|
logger.info(f"DeepL: All {len(texts)} texts found in cache")
|
|
return results
|
|
|
|
logger.info(f"DeepL: Translating {len(uncached_texts)} uncached texts")
|
|
|
|
if not self.api_key:
|
|
logger.error("DeepL API key is required for batch translation")
|
|
for i, idx in enumerate(uncached_indices):
|
|
results[idx] = texts[idx]
|
|
return results
|
|
|
|
# DeepL language codes mapping
|
|
lang_map = {
|
|
"ja": "JA", "en": "EN", "ko": "KO", "zh": "ZH",
|
|
"es": "ES", "fr": "FR", "de": "DE", "it": "IT",
|
|
"pt": "PT", "ru": "RU", "nl": "NL", "pl": "PL"
|
|
}
|
|
source_lang_code = lang_map.get(source_lang, source_lang.upper())
|
|
target_lang_code = lang_map.get(target_lang, target_lang.upper())
|
|
|
|
# Batch size limit for DeepL
|
|
batch_size = 50
|
|
|
|
for batch_start in range(0, len(uncached_texts), batch_size):
|
|
batch_end = min(batch_start + batch_size, len(uncached_texts))
|
|
batch_texts = uncached_texts[batch_start:batch_end]
|
|
batch_indices = uncached_indices[batch_start:batch_end]
|
|
|
|
logger.debug(f"DeepL batch {batch_start//batch_size + 1}: Processing {len(batch_texts)} texts")
|
|
|
|
endpoint = f"{self._base_url}/v2/translate"
|
|
headers = {
|
|
"Authorization": f"DeepL-Auth-Key {self.api_key}",
|
|
"Content-Type": "application/x-www-form-urlencoded"
|
|
}
|
|
|
|
# Build form data with multiple text parameters (DeepL supports multiple 'text' params)
|
|
form_data = [
|
|
('source_lang', source_lang_code),
|
|
('target_lang', target_lang_code)
|
|
]
|
|
for text in batch_texts:
|
|
form_data.append(('text', text))
|
|
|
|
try:
|
|
logger.debug(f"Making batch request to DeepL API: {endpoint}")
|
|
|
|
import requests
|
|
response = requests.post(endpoint, headers=headers, data=form_data, timeout=30)
|
|
logger.debug(f"DeepL batch response status: {response.status_code}")
|
|
response.raise_for_status()
|
|
|
|
result = response.json()
|
|
logger.debug(f"DeepL batch response: {result}")
|
|
|
|
if "translations" in result and len(result["translations"]) == len(batch_texts):
|
|
for i, translation_data in enumerate(result["translations"]):
|
|
original_text = batch_texts[i]
|
|
translated_text = translation_data["text"]
|
|
original_idx = batch_indices[i]
|
|
|
|
cache_key = f"{source_lang}_{target_lang}_{original_text}"
|
|
self._cache[cache_key] = translated_text
|
|
|
|
results[original_idx] = translated_text
|
|
logger.debug(f"DeepL batch SUCCESS: '{original_text}' -> '{translated_text}'")
|
|
else:
|
|
logger.error(f"DeepL batch API returned unexpected response: {result}")
|
|
for i, idx in enumerate(batch_indices):
|
|
results[idx] = batch_texts[i]
|
|
|
|
# Rate limiting between batches
|
|
if batch_end < len(uncached_texts):
|
|
time.sleep(1.0 / self._rate_limit_per_second)
|
|
|
|
except Exception as e:
|
|
logger.error(f"DeepL batch translation failed: {e}")
|
|
for i, idx in enumerate(batch_indices):
|
|
results[idx] = batch_texts[i]
|
|
|
|
# Ensure all results are filled
|
|
for i, result in enumerate(results):
|
|
if result is None:
|
|
results[i] = texts[i]
|
|
|
|
successful_translations = sum(1 for i, result in enumerate(results) if result != texts[i])
|
|
logger.info(f"DeepL batch translation complete: {successful_translations}/{len(texts)} successfully translated")
|
|
|
|
return results
|
|
|
|
|
|
class MyMemoryService(TranslationService):
|
|
"""MyMemory free translation service - no API key required"""
|
|
|
|
def __init__(self):
|
|
super().__init__("MyMemory (Free)")
|
|
self._rate_limit_per_second = 1 # Conservative rate limiting for free service
|
|
self._base_url = "https://api.mymemory.translated.net"
|
|
|
|
def translate_text(self, text: str, source_lang: str = "ja", target_lang: str = "en") -> str:
|
|
"""Translate text using MyMemory free API"""
|
|
# Ensure text is properly encoded
|
|
text = safe_decode_text(text)
|
|
logger.info(f"MyMemory: Starting translation of '{text}' from {source_lang} to {target_lang}")
|
|
|
|
if not text or not text.strip():
|
|
logger.debug("Empty text provided, returning as-is")
|
|
return text
|
|
|
|
# MyMemory uses different language codes
|
|
lang_map = {"ja": "ja", "en": "en", "ko": "ko", "zh": "zh", "es": "es", "fr": "fr", "de": "de"}
|
|
source_lang = lang_map.get(source_lang, source_lang)
|
|
target_lang = lang_map.get(target_lang, target_lang)
|
|
|
|
endpoint = f"{self._base_url}/get"
|
|
params = {
|
|
'q': text,
|
|
'langpair': f"{source_lang}|{target_lang}",
|
|
'de': 'neoneko@avatartoolkit.com' # Optional email for higher quotas
|
|
}
|
|
|
|
try:
|
|
logger.debug(f"Making request to MyMemory API: {endpoint} with params: {params}")
|
|
response = requests.get(endpoint, params=params, timeout=15) # Increased timeout
|
|
logger.debug(f"MyMemory response status: {response.status_code}")
|
|
response.raise_for_status()
|
|
|
|
result = response.json()
|
|
logger.debug(f"MyMemory response: {result}")
|
|
|
|
if result.get('responseStatus') == 200 and 'responseData' in result:
|
|
translated_text = result['responseData']['translatedText']
|
|
matches = result.get('matches', [])
|
|
if matches and len(matches) > 0:
|
|
match_quality = matches[0].get('quality', '0')
|
|
logger.debug(f"MyMemory translation quality: {match_quality}")
|
|
|
|
logger.info(f"MyMemory SUCCESS: '{text}' -> '{translated_text}'")
|
|
return translated_text
|
|
else:
|
|
error_msg = result.get('responseDetails', 'Unknown error')
|
|
logger.error(f"MyMemory API error: {error_msg}")
|
|
|
|
if 'QUOTA_EXCEEDED' in error_msg:
|
|
raise TranslationError(f"MyMemory daily quota (1000 requests) exceeded. Try again tomorrow or switch to another service.")
|
|
else:
|
|
raise TranslationError(f"MyMemory API error: {error_msg}")
|
|
|
|
except requests.Timeout as e:
|
|
logger.error(f"MyMemory request timed out: {e}")
|
|
raise TranslationError(f"MyMemory request timed out after 15 seconds")
|
|
except requests.RequestException as e:
|
|
logger.error(f"MyMemory API request failed: {e}")
|
|
raise TranslationError(f"MyMemory API request failed: {e}")
|
|
except Exception as e:
|
|
logger.error(f"Unexpected error in MyMemory: {e}")
|
|
raise TranslationError(f"Unexpected error: {e}")
|
|
|
|
def is_available(self) -> bool:
|
|
"""Check if MyMemory service is available"""
|
|
try:
|
|
response = requests.get(f"{self._base_url}/get",
|
|
params={'q': 'test', 'langpair': 'en|en'},
|
|
timeout=5)
|
|
return response.status_code == 200
|
|
except:
|
|
return False
|
|
|
|
def get_supported_languages(self) -> List[Tuple[str, str]]:
|
|
"""Get supported languages for MyMemory"""
|
|
return [
|
|
("ja", "Japanese"),
|
|
("en", "English"),
|
|
("ko", "Korean"),
|
|
("zh", "Chinese"),
|
|
("es", "Spanish"),
|
|
("fr", "French"),
|
|
("de", "German"),
|
|
("it", "Italian"),
|
|
("pt", "Portuguese"),
|
|
("ru", "Russian")
|
|
]
|
|
|
|
def supports_batch_translation(self) -> bool:
|
|
"""MyMemory optimized batch processing"""
|
|
return True
|
|
|
|
def batch_translate(self, texts: List[str], source_lang: str = "ja", target_lang: str = "en") -> List[str]:
|
|
"""Translate multiple texts using MyMemory with optimized batching and caching"""
|
|
if not texts:
|
|
return []
|
|
|
|
# Ensure all texts are properly encoded
|
|
texts = [safe_decode_text(text) for text in texts]
|
|
logger.info(f"MyMemory: Starting batch translation of {len(texts)} texts from {source_lang} to {target_lang}")
|
|
|
|
results = [None] * len(texts)
|
|
uncached_indices = []
|
|
uncached_texts = []
|
|
|
|
for i, text in enumerate(texts):
|
|
if not text or not text.strip():
|
|
results[i] = text
|
|
continue
|
|
|
|
cache_key = f"{source_lang}_{target_lang}_{text}"
|
|
if cache_key in self._cache:
|
|
results[i] = self._cache[cache_key]
|
|
continue
|
|
|
|
uncached_indices.append(i)
|
|
uncached_texts.append(text)
|
|
|
|
if not uncached_texts:
|
|
logger.info(f"MyMemory: All {len(texts)} texts found in cache")
|
|
return results
|
|
|
|
logger.info(f"MyMemory: Translating {len(uncached_texts)} uncached texts using concurrent processing")
|
|
|
|
# Use concurrent processing for MyMemory to speed up translations
|
|
import concurrent.futures
|
|
from concurrent.futures import ThreadPoolExecutor
|
|
import threading
|
|
|
|
def translate_single_text(text_info):
|
|
idx, text = text_info
|
|
try:
|
|
with self._rate_limit_lock:
|
|
current_time = time.time()
|
|
if current_time - self._last_request_time < (1.0 / self._rate_limit_per_second):
|
|
sleep_time = (1.0 / self._rate_limit_per_second) - (current_time - self._last_request_time)
|
|
time.sleep(sleep_time)
|
|
self._last_request_time = time.time()
|
|
|
|
translated = self.translate_text(text, source_lang, target_lang)
|
|
|
|
cache_key = f"{source_lang}_{target_lang}_{text}"
|
|
self._cache[cache_key] = translated
|
|
|
|
return idx, translated, None
|
|
|
|
except Exception as e:
|
|
logger.warning(f"MyMemory concurrent translation failed for '{text}': {e}")
|
|
return idx, text, e
|
|
|
|
# Use conservative concurrent processing (2 workers max for free service)
|
|
max_workers = min(len(uncached_texts), 2)
|
|
batch_size = 8
|
|
|
|
for batch_start in range(0, len(uncached_texts), batch_size):
|
|
batch_end = min(batch_start + batch_size, len(uncached_texts))
|
|
batch_texts = uncached_texts[batch_start:batch_end]
|
|
batch_indices = uncached_indices[batch_start:batch_end]
|
|
|
|
text_info_batch = [(batch_indices[i], text) for i, text in enumerate(batch_texts)]
|
|
|
|
logger.debug(f"MyMemory concurrent batch {batch_start//batch_size + 1}: Processing {len(batch_texts)} texts with {max_workers} workers")
|
|
|
|
with ThreadPoolExecutor(max_workers=max_workers) as executor:
|
|
future_to_text = {executor.submit(translate_single_text, text_info): text_info for text_info in text_info_batch}
|
|
|
|
for future in concurrent.futures.as_completed(future_to_text):
|
|
try:
|
|
original_idx, translated_text, error = future.result(timeout=25)
|
|
results[original_idx] = translated_text
|
|
|
|
if error is None:
|
|
logger.debug(f"MyMemory concurrent SUCCESS: -> '{translated_text}'")
|
|
else:
|
|
logger.debug(f"MyMemory concurrent FAILED: {error}")
|
|
|
|
except concurrent.futures.TimeoutError:
|
|
text_info = future_to_text[future]
|
|
original_idx, original_text = text_info
|
|
results[original_idx] = original_text
|
|
logger.warning(f"MyMemory concurrent timeout for text: '{original_text}'")
|
|
except Exception as e:
|
|
text_info = future_to_text[future]
|
|
original_idx, original_text = text_info
|
|
results[original_idx] = original_text
|
|
logger.error(f"MyMemory concurrent thread error for '{original_text}': {e}")
|
|
|
|
# Shorter pause between batches since we're not hammering the API
|
|
if batch_end < len(uncached_texts):
|
|
time.sleep(0.5)
|
|
|
|
for i, result in enumerate(results):
|
|
if result is None:
|
|
results[i] = texts[i]
|
|
|
|
successful_translations = sum(1 for i, result in enumerate(results) if result != texts[i])
|
|
logger.info(f"MyMemory concurrent batch translation complete: {successful_translations}/{len(texts)} successfully translated")
|
|
|
|
return results
|
|
|
|
|
|
class LibreTranslateService(TranslationService):
|
|
"""LibreTranslate translation service with configurable server"""
|
|
|
|
def __init__(self, api_url: str = "https://libretranslate.com", api_key: str = None):
|
|
super().__init__("LibreTranslate")
|
|
# Ensure URL has trailing slash like official implementation
|
|
self.api_url = api_url.rstrip('/') + '/'
|
|
self.api_key = api_key
|
|
self._rate_limit_per_second = 2 # Conservative rate limiting
|
|
self._is_paid_service = "libretranslate.com" in api_url.lower()
|
|
|
|
def translate_text(self, text: str, source_lang: str = "ja", target_lang: str = "en") -> str:
|
|
"""Translate text using LibreTranslate API"""
|
|
# Ensure text is properly encoded
|
|
text = safe_decode_text(text)
|
|
logger.info(f"LibreTranslate: Starting translation of '{text}' from {source_lang} to {target_lang}")
|
|
|
|
if not text or not text.strip():
|
|
logger.debug("Empty text provided, returning as-is")
|
|
return text
|
|
|
|
lang_map = {"ja": "ja", "en": "en", "ko": "ko", "zh": "zh", "es": "es", "fr": "fr", "de": "de", "it": "it", "pt": "pt", "ru": "ru"}
|
|
source_lang = lang_map.get(source_lang, source_lang)
|
|
target_lang = lang_map.get(target_lang, target_lang)
|
|
|
|
endpoint = f"{self.api_url}translate"
|
|
data = {
|
|
"q": text,
|
|
"source": source_lang,
|
|
"target": target_lang
|
|
}
|
|
# Add API key if available (required for libretranslate.com, optional for self-hosted)
|
|
if self.api_key:
|
|
data["api_key"] = self.api_key
|
|
|
|
headers = {
|
|
"Content-Type": "application/json"
|
|
}
|
|
|
|
try:
|
|
logger.debug(f"Making request to LibreTranslate API: {endpoint}")
|
|
# Use JSON format like official API documentation
|
|
response = requests.post(endpoint, json=data, headers=headers, timeout=15)
|
|
logger.debug(f"LibreTranslate response status: {response.status_code}")
|
|
response.raise_for_status()
|
|
|
|
result = response.json()
|
|
logger.debug(f"LibreTranslate response: {result}")
|
|
|
|
if "translatedText" in result:
|
|
translated_text = result["translatedText"]
|
|
logger.info(f"LibreTranslate SUCCESS: '{text}' -> '{translated_text}'")
|
|
return translated_text
|
|
else:
|
|
raise TranslationError("LibreTranslate API returned no translation")
|
|
|
|
except requests.HTTPError as e:
|
|
if e.response.status_code == 429:
|
|
raise TranslationError("LibreTranslate rate limit exceeded")
|
|
elif e.response.status_code == 400:
|
|
raise TranslationError("LibreTranslate: Invalid language pair or text")
|
|
else:
|
|
logger.error(f"LibreTranslate HTTP error: {e}")
|
|
raise TranslationError(f"LibreTranslate API error: {e}")
|
|
except requests.Timeout:
|
|
logger.error("LibreTranslate request timed out")
|
|
raise TranslationError("LibreTranslate request timed out after 15 seconds")
|
|
except requests.RequestException as e:
|
|
logger.error(f"LibreTranslate API request failed: {e}")
|
|
raise TranslationError(f"LibreTranslate API request failed: {e}")
|
|
except Exception as e:
|
|
logger.error(f"Unexpected error in LibreTranslate: {e}")
|
|
raise TranslationError(f"Unexpected error: {e}")
|
|
|
|
def is_available(self) -> bool:
|
|
"""Check if LibreTranslate service is available"""
|
|
try:
|
|
endpoint = f"{self.api_url}languages"
|
|
|
|
params = {}
|
|
if self.api_key:
|
|
params["api_key"] = self.api_key
|
|
|
|
response = requests.get(endpoint, params=params if params else None, timeout=5)
|
|
return response.status_code == 200
|
|
except:
|
|
return False
|
|
|
|
def get_supported_languages(self) -> List[Tuple[str, str]]:
|
|
"""Get supported languages for LibreTranslate"""
|
|
try:
|
|
endpoint = f"{self.api_url}languages"
|
|
|
|
params = {}
|
|
if self.api_key:
|
|
params["api_key"] = self.api_key
|
|
|
|
response = requests.get(endpoint, params=params if params else None, timeout=5)
|
|
|
|
if response.status_code == 200:
|
|
languages = response.json()
|
|
return [(lang["code"], lang["name"]) for lang in languages]
|
|
except:
|
|
pass
|
|
|
|
# Fallback to common languages
|
|
return [
|
|
("ja", "Japanese"),
|
|
("en", "English"),
|
|
("ko", "Korean"),
|
|
("zh", "Chinese"),
|
|
("es", "Spanish"),
|
|
("fr", "French"),
|
|
("de", "German"),
|
|
("it", "Italian"),
|
|
("pt", "Portuguese"),
|
|
("ru", "Russian")
|
|
]
|
|
|
|
def supports_batch_translation(self) -> bool:
|
|
"""LibreTranslate optimized batch processing (concurrent requests)"""
|
|
return True
|
|
|
|
def batch_translate(self, texts: List[str], source_lang: str = "ja", target_lang: str = "en") -> List[str]:
|
|
"""Translate multiple texts using LibreTranslate with optimized concurrent requests"""
|
|
if not texts:
|
|
return []
|
|
|
|
# Ensure all texts are properly encoded
|
|
texts = [safe_decode_text(text) for text in texts]
|
|
logger.info(f"LibreTranslate: Starting batch translation of {len(texts)} texts from {source_lang} to {target_lang}")
|
|
|
|
# Check cache and separate cached vs uncached texts
|
|
results = [None] * len(texts)
|
|
uncached_indices = []
|
|
uncached_texts = []
|
|
|
|
for i, text in enumerate(texts):
|
|
if not text or not text.strip():
|
|
results[i] = text
|
|
continue
|
|
|
|
cache_key = f"{source_lang}_{target_lang}_{text}"
|
|
if cache_key in self._cache:
|
|
results[i] = self._cache[cache_key]
|
|
continue
|
|
|
|
uncached_indices.append(i)
|
|
uncached_texts.append(text)
|
|
|
|
if not uncached_texts:
|
|
logger.info(f"LibreTranslate: All {len(texts)} texts found in cache")
|
|
return results
|
|
|
|
logger.info(f"LibreTranslate: Translating {len(uncached_texts)} uncached texts")
|
|
|
|
# LibreTranslate language mapping
|
|
lang_map = {"ja": "ja", "en": "en", "ko": "ko", "zh": "zh", "es": "es", "fr": "fr", "de": "de", "it": "it", "pt": "pt", "ru": "ru"}
|
|
source_lang_code = lang_map.get(source_lang, source_lang)
|
|
target_lang_code = lang_map.get(target_lang, target_lang)
|
|
|
|
# Batch process in groups to avoid overwhelming the server
|
|
import concurrent.futures
|
|
from concurrent.futures import ThreadPoolExecutor
|
|
|
|
def translate_single_text(text_info):
|
|
idx, text = text_info
|
|
try:
|
|
translated = self.translate_text(text, source_lang, target_lang)
|
|
cache_key = f"{source_lang}_{target_lang}_{text}"
|
|
self._cache[cache_key] = translated
|
|
return idx, translated, None
|
|
except Exception as e:
|
|
logger.warning(f"LibreTranslate translation failed for '{text}': {e}")
|
|
return idx, text, e
|
|
|
|
# Use thread pool for concurrent requests (limited to avoid server overload)
|
|
max_workers = min(len(uncached_texts), 3)
|
|
batch_size = 10 # Process in smaller batches
|
|
|
|
for batch_start in range(0, len(uncached_texts), batch_size):
|
|
batch_end = min(batch_start + batch_size, len(uncached_texts))
|
|
batch_texts = uncached_texts[batch_start:batch_end]
|
|
batch_indices = uncached_indices[batch_start:batch_end]
|
|
|
|
text_info_batch = [(batch_indices[i], text) for i, text in enumerate(batch_texts)]
|
|
|
|
with ThreadPoolExecutor(max_workers=max_workers) as executor:
|
|
future_to_text = {executor.submit(translate_single_text, text_info): text_info for text_info in text_info_batch}
|
|
|
|
for future in concurrent.futures.as_completed(future_to_text):
|
|
try:
|
|
original_idx, translated_text, error = future.result(timeout=30)
|
|
results[original_idx] = translated_text
|
|
|
|
if error is None:
|
|
logger.debug(f"LibreTranslate SUCCESS: -> '{translated_text}'")
|
|
else:
|
|
logger.debug(f"LibreTranslate FAILED: {error}")
|
|
|
|
except concurrent.futures.TimeoutError:
|
|
text_info = future_to_text[future]
|
|
original_idx, original_text = text_info
|
|
results[original_idx] = original_text
|
|
logger.warning(f"LibreTranslate timeout for text: '{original_text}'")
|
|
except Exception as e:
|
|
text_info = future_to_text[future]
|
|
original_idx, original_text = text_info
|
|
results[original_idx] = original_text
|
|
logger.error(f"LibreTranslate thread error for '{original_text}': {e}")
|
|
|
|
if batch_end < len(uncached_texts):
|
|
time.sleep(0.5)
|
|
|
|
for i, result in enumerate(results):
|
|
if result is None:
|
|
results[i] = texts[i]
|
|
|
|
successful_translations = sum(1 for i, result in enumerate(results) if result != texts[i])
|
|
logger.info(f"LibreTranslate batch translation complete: {successful_translations}/{len(texts)} successfully translated")
|
|
|
|
return results
|
|
|
|
|
|
class TranslationServiceManager:
|
|
"""Manages multiple translation services with fallback logic"""
|
|
|
|
def __init__(self):
|
|
self._services: Dict[str, TranslationService] = {}
|
|
self._primary_service: Optional[str] = None
|
|
self._initialize_services()
|
|
|
|
def _initialize_services(self):
|
|
"""Initialize available translation services"""
|
|
mymemory = MyMemoryService()
|
|
self._services["mymemory"] = mymemory
|
|
|
|
libretranslate_url = get_preference("libretranslate_url", "https://libretranslate.com")
|
|
libretranslate_api_key = get_preference("libretranslate_api_key", "")
|
|
libretranslate = LibreTranslateService(api_url=libretranslate_url, api_key=libretranslate_api_key if libretranslate_api_key else None)
|
|
self._services["libretranslate"] = libretranslate
|
|
|
|
deepl_api_key = get_preference("deepl_api_key", "")
|
|
if deepl_api_key:
|
|
deepl = DeepLService(api_key=deepl_api_key, use_free_api=True)
|
|
self._services["deepl"] = deepl
|
|
|
|
# Set primary service from preferences (default to free service)
|
|
self._primary_service = get_preference("translation_service", "mymemory")
|
|
|
|
logger.info(f"Initialized translation services: {list(self._services.keys())}")
|
|
logger.info(f"Primary service: {self._primary_service}")
|
|
|
|
def get_available_services(self) -> List[Tuple[str, str]]:
|
|
"""Get list of available translation services"""
|
|
available = []
|
|
for service_id, service in self._services.items():
|
|
if service.is_available():
|
|
available.append((service_id, service.name))
|
|
else:
|
|
logger.debug(f"Service {service.name} is not available")
|
|
return available
|
|
|
|
def set_primary_service(self, service_id: str) -> bool:
|
|
"""Set the primary translation service"""
|
|
if service_id in self._services:
|
|
self._primary_service = service_id
|
|
save_preference("translation_service", service_id)
|
|
logger.info(f"Set primary translation service to: {service_id}")
|
|
return True
|
|
return False
|
|
|
|
def get_service(self, service_id: Optional[str] = None) -> Optional[TranslationService]:
|
|
"""Get a translation service by ID"""
|
|
if service_id is None:
|
|
service_id = self._primary_service
|
|
|
|
if service_id and service_id in self._services:
|
|
service = self._services[service_id]
|
|
if service.is_available():
|
|
return service
|
|
|
|
return None
|
|
|
|
def translate_with_fallback(self, text: str, source_lang: str = "ja", target_lang: str = "en") -> Tuple[str, str]:
|
|
"""Translate text with automatic fallback to other services"""
|
|
# Ensure text is properly encoded
|
|
text = safe_decode_text(text)
|
|
if not text or not text.strip():
|
|
return text, "none"
|
|
|
|
# Try primary service first
|
|
primary_service = self.get_service()
|
|
if primary_service:
|
|
try:
|
|
result = primary_service.translate_text(text, source_lang, target_lang)
|
|
return result, primary_service.name
|
|
except Exception as e:
|
|
logger.warning(f"Primary service {primary_service.name} failed: {e}")
|
|
|
|
for service_id, service in self._services.items():
|
|
if service_id == self._primary_service:
|
|
continue
|
|
|
|
if service.is_available():
|
|
try:
|
|
result = service.translate_text(text, source_lang, target_lang)
|
|
logger.info(f"Fallback to {service.name} successful")
|
|
return result, service.name
|
|
except Exception as e:
|
|
logger.warning(f"Fallback service {service.name} failed: {e}")
|
|
|
|
logger.error(f"All translation services failed for: {text}")
|
|
return text, "failed"
|
|
|
|
def batch_translate_with_fallback(self, texts: List[str], source_lang: str = "ja", target_lang: str = "en") -> List[Tuple[str, str]]:
|
|
"""Batch translate with fallback - uses optimized batch processing when available"""
|
|
if not texts:
|
|
return []
|
|
|
|
logger.info(f"Starting batch translation of {len(texts)} texts using service manager")
|
|
|
|
primary_service = self.get_service()
|
|
if primary_service:
|
|
try:
|
|
if primary_service.supports_batch_translation():
|
|
logger.info(f"Using native batch translation with {primary_service.name}")
|
|
translations = primary_service.batch_translate(texts, source_lang, target_lang)
|
|
return [(translation, primary_service.name) for translation in translations]
|
|
else:
|
|
logger.info(f"Service {primary_service.name} does not support batch translation, using individual requests")
|
|
# Use the base implementation for services without batch support
|
|
translations = []
|
|
for text in texts:
|
|
translated = primary_service.translate_text(text, source_lang, target_lang)
|
|
translations.append(translated)
|
|
return [(translation, primary_service.name) for translation in translations]
|
|
|
|
except Exception as e:
|
|
logger.warning(f"Batch translation failed with {primary_service.name}: {e}")
|
|
|
|
results = []
|
|
for text in texts:
|
|
translation, service_name = self.translate_with_fallback(text, source_lang, target_lang)
|
|
results.append((translation, service_name))
|
|
|
|
return results
|
|
|
|
|
|
# Global translation service manager instance
|
|
_translation_manager: Optional[TranslationServiceManager] = None
|
|
|
|
|
|
def get_translation_manager() -> TranslationServiceManager:
|
|
"""Get the global translation service manager"""
|
|
global _translation_manager
|
|
if _translation_manager is None:
|
|
_translation_manager = TranslationServiceManager()
|
|
return _translation_manager
|
|
|
|
|
|
def configure_deepl_translator(api_key: str, use_free_api: bool = True) -> bool:
|
|
"""Configure DeepL translation service"""
|
|
try:
|
|
save_preference("deepl_api_key", api_key)
|
|
save_preference("deepl_use_free_api", use_free_api)
|
|
|
|
# Test the API key
|
|
deepl = DeepLService(api_key=api_key, use_free_api=use_free_api)
|
|
if deepl.is_available():
|
|
# Re-initialize the global manager to pick up new service
|
|
global _translation_manager
|
|
_translation_manager = None
|
|
logger.info("DeepL translator configured successfully")
|
|
return True
|
|
else:
|
|
logger.error("DeepL API key test failed")
|
|
return False
|
|
except Exception as e:
|
|
logger.error(f"Failed to configure DeepL translator: {e}")
|
|
return False
|
|
|
|
|
|
def configure_libretranslate_server(server_url: str, api_key: str = None) -> bool:
|
|
"""Configure LibreTranslate server URL and optional API key"""
|
|
try:
|
|
if not server_url.strip():
|
|
server_url = "https://libretranslate.com"
|
|
|
|
# Ensure proper URL format
|
|
if not server_url.startswith(('http://', 'https://')):
|
|
server_url = 'https://' + server_url
|
|
|
|
save_preference("libretranslate_url", server_url)
|
|
save_preference("libretranslate_api_key", api_key if api_key else "")
|
|
|
|
# Test the server
|
|
libretranslate = LibreTranslateService(api_url=server_url, api_key=api_key)
|
|
if libretranslate.is_available():
|
|
# Re-initialize the global manager to pick up new service
|
|
global _translation_manager
|
|
_translation_manager = None
|
|
logger.info(f"LibreTranslate server configured successfully: {server_url}")
|
|
return True
|
|
else:
|
|
logger.error(f"LibreTranslate server test failed: {server_url}")
|
|
return False
|
|
except Exception as e:
|
|
logger.error(f"Failed to configure LibreTranslate server: {e}")
|
|
return False
|
|
|
|
|
|
|
|
|