"""Web search service that queries the enabled search providers in priority order."""

import logging
from typing import Any, Dict, Iterable, List, Optional, Sequence
from urllib.parse import urlsplit

from sqlalchemy.ext.asyncio import AsyncSession
from sqlalchemy import select

from app.models.search_provider import SearchProvider

logger = logging.getLogger(__name__)

# Results whose host is one of these domains (or a subdomain of one) are dropped.
IGNORE_DOMAINS = [
    "google.com",
    "facebook.com",
    "twitter.com",
    "instagram.com",
    "youtube.com",
    "reddit.com",
    "amazon.com",
    "ebay.com",
    "wikipedia.org",
    "linkedin.com",
    "pinterest.com",
    "baidu.com",
    "bing.com",
]

# Maximum lengths applied to the normalised result fields.
_MAX_TITLE_LEN = 100
_MAX_SNIPPET_LEN = 200


def _is_ignored_url(url: str) -> bool:
    """Return True when the host of *url* is an ignored domain or a subdomain of one.

    The comparison is made on the parsed hostname rather than on the raw URL
    text, so a domain that merely appears in the path or query string (or as
    the tail of a longer, unrelated host name) does not cause a match.
    URLs that cannot be parsed, or that have no host, are never ignored.
    """
    if not url:
        return False
    try:
        host = urlsplit(url).hostname
        if host is None:
            # Scheme-less input such as "google.com/page": retry as a
            # network-path reference so the leading part is read as the host.
            host = urlsplit("//" + url.lstrip("/")).hostname
    except ValueError:
        # urlsplit rejects some malformed inputs (e.g. unbalanced IPv6 brackets).
        return False
    if not host:
        return False
    host = host.rstrip(".")  # tolerate a fully-qualified trailing dot
    return any(host == domain or host.endswith("." + domain) for domain in IGNORE_DOMAINS)


def _collect_results(
    items: Iterable[Any],
    limit: int,
    *,
    title_key: str,
    snippet_keys: Sequence[str],
) -> List[Dict[str, str]]:
    """Normalise raw provider items into ``{"title", "url", "snippet"}`` dicts.

    Args:
        items: Raw result entries from a provider response; entries that are
            not dicts are skipped.
        limit: Maximum number of results to return; ``limit <= 0`` yields ``[]``.
        title_key: Key holding the result title in each raw item.
        snippet_keys: Keys tried in order for the snippet text; the first
            truthy value wins.

    Returns:
        At most *limit* results, in provider order, excluding ignored domains.
    """
    results: List[Dict[str, str]] = []
    if limit <= 0:
        return results
    for item in items:
        if not isinstance(item, dict):
            continue
        url = item.get("url") or ""  # a JSON null url is treated like a missing one
        if _is_ignored_url(url):
            continue
        snippet = next((item[key] for key in snippet_keys if item.get(key)), "")
        results.append({
            "title": (item.get(title_key) or url)[:_MAX_TITLE_LEN],
            "url": url.rstrip("/"),
            "snippet": snippet[:_MAX_SNIPPET_LEN],
        })
        if len(results) >= limit:
            break
    return results


class SearchService:
    """Runs a query against the enabled search providers stored in the database."""

    def __init__(self, db: AsyncSession):
        """Store the async database session used to load provider rows."""
        self.db = db

    async def search(self, query: str, limit: int = 10) -> List[Dict[str, str]]:
        """Return results from the first enabled provider that does not fail.

        Providers are tried in ascending ``priority`` order. A provider that
        raises is logged as a warning and the next one is tried; a provider
        that succeeds with zero results still ends the search.

        Returns:
            A list of ``{"title", "url", "snippet"}`` dicts, or ``[]`` when no
            provider is enabled or every provider failed.
        """
        providers = await self._get_enabled_providers()
        for provider in providers:
            try:
                return await self._search_provider(provider, query, limit)
            except Exception as e:
                # Deliberate best-effort boundary: any provider failure falls
                # through to the next provider instead of aborting the search.
                logger.warning("Search provider %s failed: %s", provider.provider_type, e)
        return []

    async def _get_enabled_providers(self) -> List[SearchProvider]:
        """Load all enabled providers, ordered by ascending ``priority``."""
        result = await self.db.execute(
            select(SearchProvider)
            .where(SearchProvider.enabled == True)  # noqa: E712 - SQL expression, not a Python comparison
            .order_by(SearchProvider.priority)
        )
        return list(result.scalars().all())

    async def _search_provider(self, provider: SearchProvider, query: str, limit: int) -> List[Dict[str, str]]:
        """Dispatch the query to the backend matching ``provider.provider_type``.

        Raises:
            ValueError: If the provider type is neither ``"searxng"`` nor ``"bing"``.
        """
        pt = provider.provider_type
        if pt == "searxng":
            return await searxng_search(provider.api_endpoint, query, limit)
        elif pt == "bing":
            return await bing_search(provider.api_key, query, limit)
        else:
            raise ValueError(f"Unknown provider type: {pt}")


async def searxng_search(endpoint: Optional[str], query: str, limit: int) -> List[Dict[str, str]]:
    """Query a SearXNG instance and return normalised results.

    Args:
        endpoint: Base URL of the SearXNG instance; ``/search`` is appended.
        query: Search terms.
        limit: Maximum number of results to return.

    Returns:
        Up to *limit* ``{"title", "url", "snippet"}`` dicts, excluding results
        hosted on ``IGNORE_DOMAINS``.

    Raises:
        ValueError: If *endpoint* is empty or the response status is not 200.
    """
    if not endpoint:
        raise ValueError("SearXNG endpoint not configured")

    import httpx

    async with httpx.AsyncClient(timeout=15.0) as client:
        resp = await client.get(
            endpoint.rstrip("/") + "/search",
            params={"q": query, "format": "json", "language": "zh-CN,en", "categories": "general"},
            headers={"User-Agent": "TradeMate/1.0"},
        )
    if resp.status_code != 200:
        raise ValueError(f"SearXNG returned {resp.status_code}")

    data = resp.json()
    # The payload is accepted either as an object with a "results" list or as
    # a bare list of results; anything else is treated as no results.
    if isinstance(data, dict):
        raw_items = data.get("results") or []
    elif isinstance(data, list):
        raw_items = data
    else:
        raw_items = []
    return _collect_results(raw_items, limit, title_key="title", snippet_keys=("content", "snippet"))


async def bing_search(api_key: Optional[str], query: str, limit: int) -> List[Dict[str, str]]:
    """Query the Bing Web Search v7 API and return normalised results.

    Args:
        api_key: Subscription key sent in the ``Ocp-Apim-Subscription-Key`` header.
        query: Search terms.
        limit: Maximum number of results to return; the ``count`` request
            parameter is capped at 50.

    Returns:
        Up to *limit* ``{"title", "url", "snippet"}`` dicts, excluding results
        hosted on ``IGNORE_DOMAINS``.

    Raises:
        ValueError: If *api_key* is empty or the response status is not 200.
    """
    if not api_key:
        raise ValueError("Bing API key not configured")

    import httpx

    async with httpx.AsyncClient(timeout=15.0) as client:
        resp = await client.get(
            "https://api.bing.microsoft.com/v7.0/search",
            params={"q": query, "count": min(limit, 50), "mkt": "en-US", "textFormat": "Raw"},
            headers={"Ocp-Apim-Subscription-Key": api_key},
        )
    if resp.status_code != 200:
        raise ValueError(f"Bing returned {resp.status_code}")

    data = resp.json()
    # "webPages" may be absent or null when there are no web results.
    web_pages = data.get("webPages") if isinstance(data, dict) else None
    raw_items = (web_pages.get("value") or []) if isinstance(web_pages, dict) else []
    return _collect_results(raw_items, limit, title_key="name", snippet_keys=("snippet",))