Files
trade-assistant/backend/app/services/discovery.py
T
TradeMate Dev bed5c7abef Add landing page, referral system, usage quotas, search API management, and yearly pricing
- Separate workspace landing from login for better UX
- Referral system rewards both parties with Pro days
- Quota enforcement prevents abuse without breaking endpoints
- 7-day free trial with auto-downgrade on expiry
- Admin-managed search provider config (SearXNG, Bing)
- 15% discount on annual subscriptions
- MCP search server wrapping opencode search
- Fix discovery module field name mismatch causing 422
2026-05-26 11:40:13 +08:00

285 lines
12 KiB
Python
Raw Blame History

This file contains ambiguous Unicode characters
This file contains Unicode characters that might be confused with other characters. If you think that this is intentional, you can safely ignore this warning. Use the Escape button to reveal them.
import asyncio
import json
import logging
from typing import Dict, Any, Optional
from app.ai.router import get_ai_router
from app.services.search_web import search_companies, fetch_page_text
from app.services.mcp_search_client import mcp_search
# Module-level logger, named after this module.
logger = logging.getLogger(__name__)
# System prompt passed (as ``system_prompt``) to the AI router by
# DiscoveryService.analyze(). Written in Chinese: it casts the model as a
# foreign-trade customer analyst and asks for a bare JSON object (no markdown
# fences) with the keys match_score, match_reason, company_summary,
# product_fit and contact_info (emails / phones / social).
ANALYZE_MATCH_PROMPT = """你是外贸客户分析专家。分析目标公司的业务描述,判断其与用户产品的匹配度。
请以 JSON 格式返回(不要用 markdown 代码块标记):
{
"match_score": 0-100,
"match_reason": "为什么匹配/不匹配",
"company_summary": "这家公司的主要业务",
"product_fit": "产品匹配度说明",
"contact_info": {
"emails": ["找到的邮箱"],
"phones": ["找到的电话"],
"social": ["LinkedIn等社媒链接"]
}
}
只返回 JSON,不要其他内容。"""
class DiscoveryService:
    """Find, score and draft outreach for potential buyers of a product.

    Search falls back through three sources in order: the MCP search client,
    the web search helper, and finally an AI-generated (or canned template)
    list of companies. Analysis and outreach use the AI router when at least
    one provider is configured and fall back to canned templates otherwise.
    """

    # Maximum number of companies returned by any search path.
    _MAX_COMPANIES = 15
    # Overall deadline (seconds) shared by the concurrent MCP searches.
    _MCP_TIMEOUT_SECONDS = 8
    # Number of characters of fetched page text forwarded to the model.
    _PAGE_TEXT_LIMIT = 2500

    # Hostname labels (anywhere except the left-most label) that mark a site
    # as Chinese, academic or governmental. "govt" keeps NZ-style
    # "*.govt.nz" hosts excluded, as the former ".gov" substring test did.
    _BLOCKED_TLD_LABELS = frozenset({"cn", "edu", "gov", "govt"})
    # Publisher domains matched exactly or as a parent domain. A plain
    # substring test would wrongly drop e.g. "abcgroup.com" (contains
    # "oup.com") or "signature.com" (contains "nature.com").
    _BLOCKED_DOMAINS = ("acm.org", "ieee.org", "nature.com", "oup.com")
    # Keywords deliberately matched as substrings of the hostname.
    _BLOCKED_KEYWORDS = (
        "sciencedirect", "mdpi", "springer", "wiley", "researchgate",
        "sagepub", "tandfonline", "ncbi", "semanticscholar",
        "britannica", "dictionary", "cambridge", "iciba", "wikipedia",
    )

    # System prompt used by outreach().
    _OUTREACH_SYSTEM_PROMPT = """你是外贸开发信专家。根据目标公司信息和你的产品,生成个性化触达文案。
请以 JSON 格式返回(不要用 markdown 代码块标记):
{
"subject": "邮件标题(如适用)",
"linkedin_message": "LinkedIn 私信文案(150字以内)",
"whatsapp_message": "WhatsApp 消息文案(100字以内)",
"email_body": "邮件正文(含开头问候、自我介绍、价值主张、行动号召、签名)",
"key_points": ["客户关注的3个要点"],
"tips": ["发送时的建议"]
}"""

    # System prompt used by _ai_strategy().
    _STRATEGY_SYSTEM_PROMPT = """你是外贸客户发现专家。根据用户的产品和目标市场,列出15家有可能采购该产品的潜在公司。
请以 JSON 格式返回(不要用 markdown 代码块标记):
{
"companies": [
{"name": "公司名称", "description": "公司业务简介", "country": "所在国家", "match_score": 匹配度0-100, "contact": "联系方式(有就写,没有写'需进一步查找'", "source": "推荐来源说明"}
],
"strategy": "整体获取策略建议",
"tips": ["搜索建议1", "搜索建议2"]
}
要求:
- 公司名称要真实感,不要编造知名大公司
- 公司业务要与产品相关
- 匹配度要有区分度,60-95之间
- 至少返回10家
- 只返回 JSON,不要其他内容"""

    def __init__(self):
        """Bind the shared AI router and record whether it has providers."""
        ai_router = get_ai_router()
        self.ai = ai_router
        # Truthiness also treats a missing (None) provider list as "no AI".
        self._ai_available = bool(ai_router.providers)

    async def search(self, product_description: str, target_market: str) -> Dict[str, Any]:
        """Return candidate buyer companies for a product in a market.

        Tries MCP search first, then web search; the first source that yields
        any company wins. If both are empty the AI strategy (or its template
        fallback) is returned instead.

        Returns:
            A dict with ``companies`` plus either ``query``/``market``/
            ``provider`` (real search) or the keys produced by
            ``_ai_strategy``.
        """
        queries = self._build_queries(product_description, target_market)
        sources = (
            ("mcp_search", self._mcp_search_all),
            ("web_search", self._google_search_all),
        )
        for provider, run_search in sources:
            companies = await run_search(queries)
            if companies:
                return {
                    "companies": companies[: self._MAX_COMPANIES],
                    "query": product_description,
                    "market": target_market,
                    "provider": provider,
                }
        logger.info("No real search results, using AI strategy")
        return await self._ai_strategy(product_description, target_market)

    async def analyze(self, company_url: str, product_description: str) -> Dict[str, Any]:
        """Score how well the company at ``company_url`` fits the product.

        The page text is fetched and, when an AI provider is available, sent
        to the model together with the product description. Any AI or parsing
        failure degrades to ``_template_analysis``.
        """
        page_text = await fetch_page_text(company_url)
        if not self._ai_available:
            return self._template_analysis(company_url)
        page_excerpt = page_text[: self._PAGE_TEXT_LIMIT] if page_text else "无法获取网页内容"
        prompt = (
            f"用户的产品:{product_description}\n"
            "目标公司信息:\n"
            f"URL: {company_url}\n"
            f"网页内容:{page_excerpt}\n"
            "请分析该公司的业务与用户产品的匹配度。"
        )
        try:
            result = await self.ai.chat(prompt, system_prompt=ANALYZE_MATCH_PROMPT)
            parsed = self._extract_json(result.get("reply", ""))
            if parsed:
                parsed["url"] = company_url
                parsed["provider"] = result.get("provider_used", "unknown")
                return parsed
        except Exception as e:
            # Best effort: an AI outage must not break the endpoint.
            logger.warning("Analysis AI parse failed: %s", e)
        return self._template_analysis(company_url)

    async def outreach(self, company_info: Dict[str, Any], product_info: Dict[str, Any]) -> Dict[str, Any]:
        """Generate outreach copy (email, LinkedIn, WhatsApp) for a company.

        Uses the AI router when available; otherwise, or when the reply cannot
        be parsed into a JSON object, returns ``_template_outreach``.
        """
        if not self._ai_available:
            return self._template_outreach(company_info, product_info)
        prompt = (
            "目标公司信息:\n"
            f"{json.dumps(company_info, ensure_ascii=False)}\n"
            "我的产品信息:\n"
            f"{json.dumps(product_info, ensure_ascii=False)}\n"
            "请生成个性化触达文案。"
        )
        try:
            result = await self.ai.chat(prompt, system_prompt=self._OUTREACH_SYSTEM_PROMPT)
            parsed = self._extract_json(result.get("reply", ""))
            if parsed:
                parsed["provider"] = result.get("provider_used", "unknown")
                return parsed
        except Exception as e:
            # Best effort: fall through to the canned template.
            logger.warning("Outreach AI parse failed: %s", e)
        return self._template_outreach(company_info, product_info)

    async def _mcp_search_all(self, queries: list) -> list:
        """Run the first two queries concurrently through the MCP client.

        All queries share one overall deadline. Queries still running at the
        deadline are cancelled; queries that raised are logged and skipped.
        Results are merged in query order (so the output is deterministic),
        then de-duplicated and filtered.
        """
        tasks = [asyncio.create_task(mcp_search(q, max_results=6)) for q in queries[:2]]
        if not tasks:
            # asyncio.wait() rejects an empty collection.
            return []
        timed_out = False
        try:
            _, pending = await asyncio.wait(tasks, timeout=self._MCP_TIMEOUT_SECONDS)
            timed_out = bool(pending)
        finally:
            # Also reached when this coroutine itself is cancelled: never
            # leave orphaned search tasks running in the background.
            for task in tasks:
                if not task.done():
                    task.cancel()
            await asyncio.gather(*tasks, return_exceptions=True)
        if timed_out:
            logger.warning("MCP search overall timeout")

        collected = []
        for task in tasks:
            if task.cancelled():
                continue
            error = task.exception()
            if error is not None:
                logger.debug("MCP search query failed: %s", error)
                continue
            results = task.result()
            if isinstance(results, (list, tuple)):
                collected.extend(results)
        return self._dedup_and_filter(collected)[: self._MAX_COMPANIES]

    def _dedup_and_filter(self, results: list) -> list:
        """Drop duplicate URLs and hosts that are not commercial prospects.

        Entries without a usable string ``url`` are skipped. URLs are compared
        with trailing slashes removed. Hosts that are Chinese, academic,
        governmental, journal publishers or reference sites are excluded.
        The surviving entries are returned unchanged, in input order.
        """
        seen = set()
        filtered = []
        for entry in results:
            raw_url = entry.get("url") if isinstance(entry, dict) else None
            if not isinstance(raw_url, str):
                continue
            url = raw_url.rstrip("/")
            if not url or url in seen:
                continue
            seen.add(url)
            if self._is_blocked_host(self._hostname_of(url)):
                continue
            filtered.append(entry)
        return filtered

    @staticmethod
    def _hostname_of(url: str) -> str:
        """Return the lower-cased host of ``url`` (scheme optional)."""
        authority = url.split("://", 1)[-1]
        # Cut path, query and fragment so scheme-less URLs work too.
        for delimiter in "/?#":
            authority = authority.split(delimiter, 1)[0]
        authority = authority.rsplit("@", 1)[-1]  # drop user:password@
        return authority.split(":", 1)[0].rstrip(".").lower()

    @classmethod
    def _is_blocked_host(cls, hostname: str) -> bool:
        """Tell whether ``hostname`` belongs to an excluded category."""
        labels = hostname.split(".")
        # ".ac." (e.g. "ox.ac.uk") is already label-bounded on both sides.
        if ".ac." in hostname or not cls._BLOCKED_TLD_LABELS.isdisjoint(labels[1:]):
            return True
        if any(hostname == domain or hostname.endswith("." + domain)
               for domain in cls._BLOCKED_DOMAINS):
            return True
        return any(keyword in hostname for keyword in cls._BLOCKED_KEYWORDS)

    async def _google_search_all(self, queries: list) -> list:
        """Run up to three queries sequentially through the web search helper.

        Stops early once enough *usable* (de-duplicated, filtered) companies
        have been found. A failing query is logged and skipped so that the
        remaining queries, and the caller's AI fallback, still run.
        """
        collected = []
        filtered = []
        for query in queries[:3]:
            try:
                results = await search_companies(query, max_results=8)
            except Exception as e:
                logger.warning("Web search query failed: %s", e)
                continue
            if isinstance(results, (list, tuple)):
                collected.extend(results)
            filtered = self._dedup_and_filter(collected)
            if len(filtered) >= self._MAX_COMPANIES:
                break
        return filtered[: self._MAX_COMPANIES]

    def _build_queries(self, product: str, market: str) -> list:
        """Return the ten search phrases for ``product`` in ``market``."""
        return [
            f"{product} importer {market}",
            f"{product} distributor {market}",
            f"{product} wholesale buyer {market}",
            f"{product} procurement {market}",
            f"{product} company {market}",
            f"buy {product} from {market}",
            f"{product} supply chain {market}",
            f"top {product} manufacturers {market}",
            f"{product} import export {market}",
            f"{product} trading company {market}",
        ]

    def _extract_json(self, text: str) -> Optional[dict]:
        """Parse a JSON object out of a model reply.

        Tolerates a surrounding markdown code fence (with or without a
        ``json`` tag in any letter case) and prose around the object. Returns
        ``None`` when no JSON *object* can be recovered; arrays and scalars
        are rejected so callers can safely treat the result as a dict.
        """
        if not isinstance(text, str):
            return None
        text = text.strip()
        if text.startswith("```"):
            text = text[3:]
            if text[:4].lower() == "json":
                text = text[4:]
        if text.endswith("```"):
            text = text[:-3]
        text = text.strip()

        candidates = [text]
        start, end = text.find("{"), text.rfind("}")
        if 0 <= start < end:
            # Fallback: the outermost {...} span inside surrounding prose.
            candidates.append(text[start:end + 1])
        for candidate in candidates:
            try:
                parsed = json.loads(candidate)
            except json.JSONDecodeError:
                continue
            if isinstance(parsed, dict):
                return parsed
        return None

    async def _ai_strategy(self, product: str, market: str) -> Dict[str, Any]:
        """Ask the model for likely buyers when real search found nothing.

        Falls back to ``_template_strategy`` when no AI provider is available,
        when the call fails, or when the reply lacks a ``companies`` key.
        """
        if not self._ai_available:
            return self._template_strategy(product, market)
        prompt = f"产品:{product}\n目标市场:{market}\n请列出在该市场可能采购该产品的公司。"
        try:
            result = await self.ai.chat(prompt, system_prompt=self._STRATEGY_SYSTEM_PROMPT)
            parsed = self._extract_json(result.get("reply", ""))
            if parsed and "companies" in parsed:
                parsed["provider"] = result.get("provider_used", "unknown")
                parsed["ai_generated"] = True
                return parsed
        except Exception as e:
            logger.warning("AI strategy failed: %s", e)
        return self._template_strategy(product, market)

    def _template_strategy(self, product: str, market: str) -> Dict[str, Any]:
        """Return a canned two-entry strategy used when AI is unavailable."""
        return {
            "companies": [
                {"name": f"{product} Importers in {market} (示例)", "description": f"{market}从事{product}进口和批发的贸易商,建议在LinkedIn上搜索相关关键词", "country": market, "match_score": 75, "contact": "需进一步查找", "source": "AI推荐"},
                {"name": f"{product} Distributors in {market} (示例)", "description": f"{market}分销{product}的渠道商,建议通过Google搜索关键词", "country": market, "match_score": 70, "contact": "需进一步查找", "source": "AI推荐"},
            ],
            "strategy": f"建议在 LinkedIn 和 Google 搜索 {market}{product} 相关公司,使用导入商、批发商、经销商等关键词组合",
            "tips": ["使用多个搜索词组合", "找到公司后在 LinkedIn 找决策人", "查看公司网站了解其业务范围"],
            "provider": "template",
            "ai_generated": True,
        }

    def _template_analysis(self, url: str) -> Dict[str, Any]:
        """Return a neutral (score 50) analysis used when AI is unavailable."""
        return {
            "match_score": 50,
            "match_reason": "无法获取网页内容进行分析,建议手动查看",
            "url": url,
            "provider": "template",
        }

    def _template_outreach(self, company: Dict[str, Any], product: Dict[str, Any]) -> Dict[str, Any]:
        """Return canned outreach copy built from the ``name`` of each dict."""
        company_name = company.get("name", "")
        product_name = product.get("name", "")
        return {
            "subject": f"关于{product_name}的合作机会",
            "linkedin_message": f"您好!了解到贵司{company_name}在经营相关业务,我们专业生产{product_name},品质稳定,价格有竞争力。如有兴趣,我可以发详细资料供参考。",
            "whatsapp_message": f"Hello! We are a professional {product_name} manufacturer. Interested in exploring cooperation? Happy to share details.",
            "email_body": f"Dear {company_name} team,\n\nWe are a professional {product_name} manufacturer with competitive pricing and consistent quality. Would you be open to a quick chat to explore potential cooperation?\n\nBest regards,\n[Your Name]",
            "key_points": ["产品质量有保障", "价格有竞争力", "可定制"],
            "tips": ["发送前先了解对方背景", "LinkedIn 消息要简短"],
            "provider": "template",
        }