13e3992d4c
Security fixes: - Add file upload size limits (10MB) for customer and product imports - Add XLSX file validation with row limits and magic byte checking - Implement password validation (min 6 chars) in registration - Add rate limiting for guest login (5 per IP per 15 minutes) - Sanitize error messages to prevent information leakage - Fix XSS vulnerability by removing unsafe v-html usage - Enforce WhatsApp webhook signature verification - Add SSRF protection with URL validation and IP blocking - Fix marketing endpoints to use proper authentication Code quality improvements: - Create shared utility functions for UUID validation and string sanitization - Remove duplicate UUID validation code from admin modules - Remove dead code (pass statement in translation.py) - Fix aliyun SDK import compatibility
99 lines
3.6 KiB
Python
99 lines
3.6 KiB
Python
from typing import List, Dict, Optional
|
|
import httpx
|
|
import json
|
|
import logging
|
|
from app.config import settings
|
|
|
|
# Module-level logger, named after this module.
logger = logging.getLogger(__name__)

# Endpoint used for Google Custom Search requests.
GOOGLE_CSE_URL = "https://www.googleapis.com/customsearch/v1"

# Search engines, social networks and marketplaces whose hits are filtered
# out of the search results.
IGNORE_DOMAINS = [
    "google.com",
    "facebook.com",
    "twitter.com",
    "instagram.com",
    "youtube.com",
    "reddit.com",
    "amazon.com",
    "ebay.com",
    "wikipedia.org",
    "linkedin.com",
    "pinterest.com",
    "baidu.com",
    "bing.com",
    "duckduckgo.com",
]
|
|
|
|
|
|
async def search_companies(query: str, max_results: int = 10) -> List[Dict[str, str]]:
    """Search the web for pages matching ``query``.

    The search is delegated to ``_google_cse`` when both
    ``settings.GOOGLE_API_KEY`` and ``settings.GOOGLE_CSE_ID`` are set.
    When either is missing, the situation is logged and an empty list is
    returned.

    Args:
        query: Free-text search query.
        max_results: Upper bound on the number of results requested.

    Returns:
        The list produced by ``_google_cse``, or ``[]`` when the search
        backend is not configured.
    """
    google_key = settings.GOOGLE_API_KEY or ""
    engine_id = settings.GOOGLE_CSE_ID or ""

    if not (google_key and engine_id):
        # NOTE(review): the message mentions template results, but this
        # function itself only returns an empty list - presumably the caller
        # substitutes them; confirm.
        logger.info("Google CSE not configured, using template results")
        return []

    return await _google_cse(query, max_results, google_key, engine_id)
|
|
|
|
|
|
async def _google_cse(query: str, max_results: int, api_key: str, cse_id: str) -> List[Dict[str, str]]:
    """Query Google Custom Search and return the filtered hits.

    Results whose host is one of ``IGNORE_DOMAINS`` (or a subdomain of one)
    are dropped. Any failure - non-200 status, network error, malformed
    payload - is logged and yields an empty list; this function does not
    raise.

    Args:
        query: Free-text search query.
        max_results: Maximum number of results to return. At most 10 are
            requested from the API; a non-positive value returns ``[]``
            without issuing a request.
        api_key: Google API key.
        cse_id: Custom Search Engine identifier (sent as ``cx``).

    Returns:
        A list of dicts with the keys ``title`` (at most 100 characters),
        ``url`` (trailing slashes removed) and ``snippet`` (at most 200
        characters).
    """
    from urllib.parse import urlparse

    if max_results <= 0:
        # Nothing can be returned, so do not spend an API call on a request
        # with an out-of-range "num".
        return []

    def _is_ignored(link: str) -> bool:
        """Return True when ``link`` points at an ignored domain."""
        try:
            host = (urlparse(link).hostname or "").lower()
        except ValueError:
            # Unparseable link: skip it rather than failing the whole batch.
            return True
        if not host:
            # No host could be extracted (e.g. scheme-less link); fall back
            # to the plain substring test.
            return any(domain in link for domain in IGNORE_DOMAINS)
        # Compare against the host only, so that "notgoogle.com" or a URL
        # that merely mentions an ignored domain in its path or query string
        # is not discarded.
        return any(
            host == domain or host.endswith("." + domain)
            for domain in IGNORE_DOMAINS
        )

    try:
        async with httpx.AsyncClient(timeout=15.0) as client:
            resp = await client.get(GOOGLE_CSE_URL, params={
                "key": api_key,
                "cx": cse_id,
                "q": query,
                "num": min(max_results, 10),
                "lr": "lang_en",
            })
            if resp.status_code != 200:
                logger.warning(f"Google CSE returned {resp.status_code}")
                return []
            data = resp.json()
            results = []
            for item in data.get("items") or []:
                url = item.get("link") or ""
                if not url or _is_ignored(url):
                    continue
                results.append({
                    # "or" also covers an explicit null value, which would
                    # otherwise raise on slicing and discard every result.
                    "title": (item.get("title") or url)[:100],
                    "url": url.rstrip("/"),
                    "snippet": (item.get("snippet") or "")[:200],
                })
            return results[:max_results]
    except Exception as e:
        # Best-effort search: report the problem and return no results.
        logger.warning(f"Google CSE failed: {e}")
        return []
|
|
|
|
|
|
async def _is_safe_fetch_target(url: str) -> bool:
    """Return True when ``url`` may be fetched without reaching internal hosts.

    The URL must use http or https and have a hostname. A literal IP address
    is checked directly; a DNS name is resolved and every returned address is
    checked. Rejections and validation failures are logged as warnings.
    """
    import asyncio
    import ipaddress
    import socket
    from urllib.parse import urlparse

    def _is_public(ip) -> bool:
        # Unwrap IPv4-mapped IPv6 (::ffff:a.b.c.d) so the IPv4 rules apply.
        mapped = getattr(ip, "ipv4_mapped", None)
        if mapped is not None:
            ip = mapped
        # "not is_global" additionally rejects the shared address space
        # 100.64.0.0/10, which is_private does not cover.
        return not (
            ip.is_private
            or ip.is_loopback
            or ip.is_reserved
            or ip.is_link_local
            or ip.is_multicast
            or ip.is_unspecified
            or not ip.is_global
        )

    try:
        parsed = urlparse(url)
        scheme = parsed.scheme
        hostname = parsed.hostname
        port = parsed.port  # raises ValueError on an invalid port
    except ValueError as e:
        logger.warning(f"URL validation failed for {url}: {e}")
        return False

    if scheme not in ("http", "https"):
        logger.warning(f"Invalid URL scheme: {url}")
        return False
    if not hostname:
        logger.warning(f"URL validation failed for {url}: missing hostname")
        return False

    try:
        addresses = [ipaddress.ip_address(hostname)]
    except ValueError:
        # Not an IP literal: resolve the name, because a public-looking
        # hostname (or "localhost") can still point at an internal address.
        try:
            infos = await asyncio.get_running_loop().getaddrinfo(
                hostname,
                port or (443 if scheme == "https" else 80),
                type=socket.SOCK_STREAM,
            )
            # Drop any IPv6 scope id ("fe80::1%eth0") before parsing.
            addresses = [
                ipaddress.ip_address(info[4][0].split("%", 1)[0])
                for info in infos
            ]
        except (OSError, UnicodeError, ValueError) as e:
            logger.warning(f"URL validation failed for {url}: {e}")
            return False

    if not addresses or not all(_is_public(ip) for ip in addresses):
        logger.warning(f"Blocked private/reserved IP: {url}")
        return False
    return True


async def fetch_page_text(url: str) -> Optional[str]:
    """Download ``url`` and return its visible text, or ``None``.

    SSRF protection: the URL - and every redirect target - must be http(s)
    and resolve only to public addresses. Redirects are followed manually so
    that each hop is validated before it is requested.

    NOTE: the address check and the actual connection perform separate DNS
    lookups, so a DNS-rebinding attacker is not fully excluded.

    Args:
        url: Address of the page to fetch.

    Returns:
        Up to 3000 characters of whitespace-normalised page text with
        script, style, nav, footer and header elements removed. ``None`` is
        returned when the URL is rejected, the request fails, the final
        status is not 200, or the extracted text is 100 characters or
        shorter.
    """
    import re
    from urllib.parse import urljoin

    max_redirects = 20  # mirrors httpx's default redirect limit
    redirect_statuses = (301, 302, 303, 307, 308)

    try:
        async with httpx.AsyncClient(timeout=10.0, follow_redirects=False) as client:
            current = url
            for _ in range(max_redirects + 1):
                if not await _is_safe_fetch_target(current):
                    return None
                resp = await client.get(current, headers={"User-Agent": "Mozilla/5.0"})
                location = resp.headers.get("location")
                if resp.status_code in redirect_statuses and location:
                    # Location may be relative; resolve it against this hop.
                    current = urljoin(current, location)
                    continue
                break
            else:
                logger.debug(f"fetch {url} failed: too many redirects")
                return None

        if resp.status_code == 200:
            from bs4 import BeautifulSoup

            soup = BeautifulSoup(resp.text, "html.parser")
            # Remove non-content elements before extracting text.
            for tag in soup(["script", "style", "nav", "footer", "header"]):
                tag.decompose()
            text = soup.get_text(separator=" ", strip=True)
            text = re.sub(r"\s+", " ", text)[:3000]
            # Very short pages carry too little content to be useful.
            return text if len(text) > 100 else None
    except Exception as e:
        # Best-effort fetch: any failure simply yields no text.
        logger.debug(f"fetch {url} failed: {e}")
    return None
|