Files
trade-assistant/backend/app/services/search_web.py
T
TradeMate Dev 13e3992d4c fix: security and code quality improvements
Security fixes:
- Add file upload size limits (10MB) for customer and product imports
- Add XLSX file validation with row limits and magic byte checking
- Implement password validation (min 6 chars) in registration
- Add rate limiting for guest login (5 per IP per 15 minutes)
- Sanitize error messages to prevent information leakage
- Fix XSS vulnerability by removing unsafe v-html usage
- Enforce WhatsApp webhook signature verification
- Add SSRF protection with URL validation and IP blocking
- Fix marketing endpoints to use proper authentication

Code quality improvements:
- Create shared utility functions for UUID validation and string sanitization
- Remove duplicate UUID validation code from admin modules
- Remove dead code (pass statement in translation.py)
- Fix aliyun SDK import compatibility
2026-06-11 17:54:07 +08:00

99 lines
3.6 KiB
Python

from typing import List, Dict, Optional
import httpx
import json
import logging
from app.config import settings
# Module-wide logger, named after this module's import path.
logger = logging.getLogger(__name__)

# Endpoint of the Google Custom Search JSON API queried by this module.
GOOGLE_CSE_URL = "https://www.googleapis.com/customsearch/v1"

# Search hits pointing at these domains are dropped: they are search engines,
# social networks, marketplaces and encyclopedias.
IGNORE_DOMAINS = [
    "google.com",
    "facebook.com",
    "twitter.com",
    "instagram.com",
    "youtube.com",
    "reddit.com",
    "amazon.com",
    "ebay.com",
    "wikipedia.org",
    "linkedin.com",
    "pinterest.com",
    "baidu.com",
    "bing.com",
    "duckduckgo.com",
]
async def search_companies(query: str, max_results: int = 10) -> List[Dict[str, str]]:
    """Search the web for companies matching *query*.

    Delegates to the Google Custom Search backend when both
    ``settings.GOOGLE_API_KEY`` and ``settings.GOOGLE_CSE_ID`` are set.
    Otherwise an informational message is logged and an empty list is
    returned.

    Args:
        query: Free-text search query.
        max_results: Upper bound on the number of results requested.

    Returns:
        Whatever the Google CSE backend returns, or ``[]`` when it is not
        configured.
    """
    google_key = settings.GOOGLE_API_KEY or ""
    engine_id = settings.GOOGLE_CSE_ID or ""

    # Guard clause: without both credentials there is nothing to query.
    if not (google_key and engine_id):
        logger.info("Google CSE not configured, using template results")
        return []

    return await _google_cse(query, max_results, google_key, engine_id)
def _is_ignored_url(url: str) -> bool:
    """Return True when *url* points at a host listed in ``IGNORE_DOMAINS``.

    The comparison is done on the parsed hostname (exact match or subdomain),
    so ``https://notgoogle.com`` or ``https://acme.com/?ref=facebook.com`` are
    not mistaken for ignored sites.
    """
    from urllib.parse import urlsplit

    try:
        host = (urlsplit(url).hostname or "").lower().rstrip(".")
    except ValueError:
        # Malformed authority (e.g. a broken IPv6 literal).
        host = ""
    if not host:
        # No parsable host: fall back to the coarse substring check so such
        # links are treated exactly as before.
        return any(domain in url for domain in IGNORE_DOMAINS)
    return any(
        host == domain or host.endswith("." + domain)
        for domain in IGNORE_DOMAINS
    )


async def _google_cse(
    query: str, max_results: int, api_key: str, cse_id: str
) -> List[Dict[str, str]]:
    """Query the Google Custom Search JSON API and return filtered hits.

    Args:
        query: Search terms, sent as the ``q`` parameter.
        max_results: Maximum number of hits to return. At most 10 are
            requested from the API per call; non-positive values return
            ``[]`` without issuing a request.
        api_key: Google API key (``key`` parameter).
        cse_id: Custom search engine id (``cx`` parameter).

    Returns:
        A list of dicts with keys ``title`` (at most 100 characters),
        ``url`` (trailing slashes removed) and ``snippet`` (at most 200
        characters). Hits on ignored domains are skipped. Any non-200
        response or exception is logged as a warning and yields ``[]``.
    """
    if max_results <= 0:
        # Nothing can be returned, so do not spend an API call on it.
        return []

    try:
        async with httpx.AsyncClient(timeout=15.0) as client:
            resp = await client.get(GOOGLE_CSE_URL, params={
                "key": api_key,
                "cx": cse_id,
                "q": query,
                "num": min(max_results, 10),
                "lr": "lang_en",
            })

        if resp.status_code != 200:
            logger.warning(f"Google CSE returned {resp.status_code}")
            return []

        data = resp.json()
        results = []
        # ``or`` fallbacks tolerate explicit nulls in the JSON payload, which
        # would otherwise raise and discard every hit.
        for item in data.get("items") or []:
            url = item.get("link") or ""
            if not url or _is_ignored_url(url):
                continue
            results.append({
                "title": (item.get("title") or url)[:100],
                "url": url.rstrip("/"),
                "snippet": (item.get("snippet") or "")[:200],
            })
        return results[:max_results]
    except Exception as e:
        # Best-effort search: callers get an empty list rather than an error.
        logger.warning(f"Google CSE failed: {e}")
        return []
# Redirect hops followed before giving up; matches httpx's default limit.
_MAX_FETCH_REDIRECTS = 20
# Status codes treated as redirects when a Location header is present.
_REDIRECT_STATUSES = (301, 302, 303, 307, 308)


def _is_blocked_ip(ip) -> bool:
    """Return True when *ip* (an ``ipaddress`` address object) must not be fetched."""
    # Judge IPv4-mapped IPv6 (``::ffff:a.b.c.d``) by the embedded IPv4 address.
    mapped = getattr(ip, "ipv4_mapped", None)
    if mapped is not None:
        ip = mapped
    return (
        ip.is_private
        or ip.is_loopback
        or ip.is_reserved
        or ip.is_link_local
        or ip.is_multicast
        or ip.is_unspecified
        or not ip.is_global
    )


async def _is_safe_fetch_url(url: str) -> bool:
    """Return True when *url* is http(s) and its host maps only to public IPs.

    Literal IP hosts are checked directly. Other hostnames are resolved and
    every returned address is checked, so names such as ``localhost`` or DNS
    records pointing at internal ranges are rejected.
    """
    import asyncio
    import ipaddress
    import socket
    from urllib.parse import urlparse

    try:
        parsed = urlparse(url)
        if parsed.scheme not in ("http", "https"):
            logger.warning(f"Invalid URL scheme: {url}")
            return False

        hostname = parsed.hostname
        if not hostname:
            logger.warning(f"URL has no hostname: {url}")
            return False

        try:
            addresses = [ipaddress.ip_address(hostname)]
        except ValueError:
            # Not an IP literal: resolve it and vet every address it maps to.
            loop = asyncio.get_running_loop()
            try:
                infos = await loop.getaddrinfo(
                    hostname, None, type=socket.SOCK_STREAM
                )
            except OSError as e:
                # Unresolvable host: the fetch could not succeed anyway.
                logger.debug(f"fetch {url} failed: {e}")
                return False
            # Strip any IPv6 zone id ("%eth0") before parsing the address.
            addresses = [
                ipaddress.ip_address(info[4][0].split("%", 1)[0])
                for info in infos
            ]

        if not addresses or any(_is_blocked_ip(ip) for ip in addresses):
            logger.warning(f"Blocked private/reserved IP: {url}")
            return False
    except Exception as e:
        logger.warning(f"URL validation failed for {url}: {e}")
        return False
    return True


async def fetch_page_text(url: str) -> Optional[str]:
    """Download *url* and return its visible text, or None.

    SSRF protection: the URL, and every redirect target, must use http/https
    and resolve exclusively to public addresses. Redirects are followed
    manually so each hop is validated before it is requested.

    NOTE(review): the host is resolved again by the HTTP client when
    connecting, so a DNS answer that changes between validation and
    connection (DNS rebinding) is not covered; closing that gap needs the
    connection pinned to the validated address.

    Args:
        url: Absolute URL of the page to fetch.

    Returns:
        The page text with ``script``/``style``/``nav``/``footer``/``header``
        elements removed and whitespace collapsed, truncated to 3000
        characters. ``None`` when the URL is rejected, the request fails, the
        final status is not 200, or the text is 100 characters or shorter.
    """
    from urllib.parse import urljoin

    try:
        # Automatic redirects are disabled so every hop passes validation.
        async with httpx.AsyncClient(timeout=10.0, follow_redirects=False) as client:
            target = url
            resp = None
            for _ in range(_MAX_FETCH_REDIRECTS + 1):
                if not await _is_safe_fetch_url(target):
                    return None
                resp = await client.get(target, headers={"User-Agent": "Mozilla/5.0"})
                location = resp.headers.get("location")
                if resp.status_code in _REDIRECT_STATUSES and location:
                    # Location may be relative; resolve it against this hop.
                    target = urljoin(target, location)
                    continue
                break
            else:
                logger.debug(f"fetch {url} failed: too many redirects")
                return None

        if resp.status_code == 200:
            from bs4 import BeautifulSoup
            import re

            soup = BeautifulSoup(resp.text, "html.parser")
            # Drop non-content elements before extracting text.
            for tag in soup(["script", "style", "nav", "footer", "header"]):
                tag.decompose()
            text = soup.get_text(separator=" ", strip=True)
            text = re.sub(r"\s+", " ", text)[:3000]
            # Very short pages are treated as having no usable content.
            return text if len(text) > 100 else None
    except Exception as e:
        # Best-effort fetch: failures are logged at debug level only.
        logger.debug(f"fetch {url} failed: {e}")
    return None