"""Currency exchange-rate lookup.

Rates are resolved in this order: Redis cache, the Frankfurter API, the
ExchangeRate-API (only when an API key is configured), and finally the static
``FALLBACK_RATES`` table defined in this module.
"""

import json
import logging
from datetime import datetime
from typing import Dict, Optional

import httpx

from app.config import settings
from app.core.redis import get_redis

logger = logging.getLogger(__name__)

# Static rates used when neither the cache nor any remote provider yields data.
# Outer key is the base currency, inner key the quote currency.
FALLBACK_RATES: Dict[str, Dict[str, float]] = {
    "USD": {
        "CNY": 7.24,
        "EUR": 0.92,
        "GBP": 0.79,
        "JPY": 151.50,
        "KRW": 1320.00,
        "AUD": 1.52,
        "CAD": 1.37,
        "INR": 83.50,
        "BRL": 5.10,
        "RUB": 92.00,
    },
    "CNY": {
        "USD": 0.138,
        "EUR": 0.127,
        "GBP": 0.109,
        "JPY": 20.93,
        "KRW": 182.32,
        "AUD": 0.21,
        "CAD": 0.19,
    },
    "EUR": {"USD": 1.09, "CNY": 7.85, "GBP": 0.86, "JPY": 164.50, "KRW": 1435.00},
    "GBP": {"USD": 1.27, "CNY": 9.15, "EUR": 1.16, "JPY": 192.00},
}

# Expiry handed to ``setex`` when caching fetched rates (21600 s = 6 h,
# assuming the client returned by ``get_redis`` uses seconds as Redis SETEX does).
CACHE_TTL = 21600

# Currencies this module requests from Frankfurter; also the set of bases for
# which the Frankfurter fetcher is attempted at all.
_FRANKFURTER_CURRENCIES = (
    "USD", "EUR", "GBP", "CNY", "JPY", "KRW", "AUD", "CAD", "INR", "BRL",
)


def _is_currency_code(code: str) -> bool:
    """Return True when *code* is shaped like a currency code (3 ASCII letters)."""
    return len(code) == 3 and code.isascii() and code.isalpha()


class ExchangeRateService:
    """Resolve exchange rates from cache, remote providers, or static fallbacks."""

    def __init__(self):
        # NOTE(review): neither attribute is read or written by any method in
        # this file; kept because code outside this file may reference them.
        self._rates_cache: Optional[Dict] = None
        self._cache_time: Optional[datetime] = None

    async def get_rate(self, from_currency: str, to_currency: str) -> Optional[float]:
        """Return the rate for converting *from_currency* into *to_currency*.

        Codes are upper-cased before lookup. Identical currencies yield ``1.0``.
        Live/cached rates are preferred; otherwise ``FALLBACK_RATES`` is
        consulted. Returns ``None`` when no source knows the pair.
        """
        from_currency = from_currency.upper()
        to_currency = to_currency.upper()
        if from_currency == to_currency:
            return 1.0

        rates = await self._get_all_rates(from_currency)
        if rates and to_currency in rates:
            return rates[to_currency]

        return FALLBACK_RATES.get(from_currency, {}).get(to_currency)

    async def convert(
        self, from_currency: str, to_currency: str, amount: float = 1.0
    ) -> Optional[float]:
        """Convert *amount* between currencies, rounded to 2 decimal places.

        Returns ``None`` when no rate is available for the pair.
        """
        rate = await self.get_rate(from_currency, to_currency)
        if rate is None:
            return None
        return round(amount * rate, 2)

    async def get_all_rates(self, base: str = "USD") -> Dict[str, float]:
        """Return every known rate quoted against *base* (upper-cased).

        Falls back to the static table, and to an empty dict when *base* is
        unknown there as well.
        """
        base = base.upper()
        rates = await self._get_all_rates(base)
        if rates:
            return rates
        # Copy so that callers mutating the result cannot corrupt the
        # module-level fallback table shared by every request.
        return dict(FALLBACK_RATES.get(base, {}))

    async def _get_all_rates(self, base: str) -> Optional[Dict[str, float]]:
        """Return rates for *base* from the cache or the first provider that answers.

        Freshly fetched rates are written back to the cache. Returns ``None``
        when *base* is malformed or every source comes back empty.
        """
        # ``base`` ends up inside a Redis key and a URL path segment; refuse
        # anything that is not a plain three-letter code before using it.
        if not _is_currency_code(base):
            return None

        cached = await self._get_from_cache(base)
        if cached:
            return cached

        rates = None
        for fetcher in (self._fetch_from_frankfurter, self._fetch_from_exchangerate_api):
            try:
                rates = await fetcher(base)
            except Exception as e:
                # Best effort: a failing provider must not stop the next one.
                logger.warning("Exchange rate fetcher failed: %s", e)
                continue
            if rates:
                break

        if rates:
            await self._set_cache(base, rates)
        return rates

    async def _fetch_from_frankfurter(self, base: str) -> Optional[Dict[str, float]]:
        """Fetch rates for *base* from the Frankfurter API.

        Returns the ``rates`` field of the JSON response on HTTP 200, and
        ``None`` for unsupported bases, non-200 responses, or any error.
        """
        if base not in _FRANKFURTER_CURRENCIES:
            return None
        try:
            async with httpx.AsyncClient() as client:
                resp = await client.get(
                    "https://api.frankfurter.app/latest",
                    params={"from": base, "to": ",".join(_FRANKFURTER_CURRENCIES)},
                    timeout=10,
                )
                if resp.status_code == 200:
                    data = resp.json()
                    return data.get("rates")
        except Exception as e:
            logger.warning("Frankfurter API failed: %s", e)
        return None

    async def _fetch_from_exchangerate_api(self, base: str) -> Optional[Dict[str, float]]:
        """Fetch rates for *base* from ExchangeRate-API v6.

        Skipped (returns ``None``) when ``settings.EXCHANGE_RATE_API_KEY`` is
        not set. Returns ``conversion_rates`` only when the response is HTTP
        200 and its ``result`` field equals ``"success"``; ``None`` otherwise.
        """
        if not settings.EXCHANGE_RATE_API_KEY:
            return None
        try:
            async with httpx.AsyncClient() as client:
                resp = await client.get(
                    f"https://v6.exchangerate-api.com/v6/{settings.EXCHANGE_RATE_API_KEY}/latest/{base}",
                    timeout=10,
                )
                if resp.status_code == 200:
                    data = resp.json()
                    if data.get("result") == "success":
                        return data.get("conversion_rates")
        except Exception as e:
            logger.warning("ExchangeRate-API failed: %s", e)
        return None

    async def _get_from_cache(self, base: str) -> Optional[Dict[str, float]]:
        """Return the cached rates stored under ``exchange_rate:<base>``, if any.

        Any failure (connection, decoding) is logged at debug level and
        treated as a miss.
        """
        try:
            r = await get_redis()
            data = await r.get(f"exchange_rate:{base}")
            if data:
                return json.loads(data)
        except Exception as e:
            # NOTE(review): this branch is an error, not a plain miss; the
            # message text is kept unchanged for existing log consumers.
            logger.debug("Redis cache miss for %s: %s", base, e)
        return None

    async def _set_cache(self, base: str, rates: Dict[str, float]):
        """Store *rates* as JSON under ``exchange_rate:<base>`` with ``CACHE_TTL``.

        Failures are logged at debug level and otherwise ignored; caching is
        best effort.
        """
        try:
            r = await get_redis()
            await r.setex(f"exchange_rate:{base}", CACHE_TTL, json.dumps(rates))
        except Exception as e:
            logger.debug("Redis cache set failed for %s: %s", base, e)