Files
trade-assistant/backend/app/services/discovery.py
T
TradeMate Dev c1638db6b2 Add discovery search history with auto-save, fix timeout causing search failure
- Save every search result to DB for later review
- Add '搜索历史' tab with timeline view, load/delete records
- Raise discovery search timeout from 30s to 120s (Bing Puppeteer needs ~40s)
- Reduce search queries from 4 to 3 for faster response
- New model: DiscoveryRecord (user_id, product, market, companies JSON)
- API: POST/GET/DELETE /api/v1/discovery/records
- Migration: discovery_records table
2026-05-27 15:54:50 +08:00

312 lines
16 KiB
Python

import json
import logging
import re
from typing import Dict, Any, Optional, Union
from urllib.parse import urlsplit

from app.ai.router import get_ai_router
from app.services.search_web import search_companies, fetch_page_text
from app.services.mcp_search_server import search_bing_batch
# Module-level logger, named after this module.
logger = logging.getLogger(__name__)
# System prompt passed to the AI router by DiscoveryService.analyze().
# It is written in Chinese and asks the model to judge how well a target
# company's business matches the user's product, replying with a bare JSON
# object (no markdown code fence) with the keys match_score (0-100),
# match_reason, company_summary, product_fit and contact_info
# (emails / phones / social / wechat / whatsapp).
# The text is sent to the model verbatim, so editing it changes behavior.
ANALYZE_MATCH_PROMPT = """你是外贸客户分析专家。分析目标公司的业务描述,判断其与用户产品的匹配度。
请以 JSON 格式返回(不要用 markdown 代码块标记):
{
"match_score": 0-100,
"match_reason": "为什么匹配/不匹配",
"company_summary": "这家公司的主要业务",
"product_fit": "产品匹配度说明",
"contact_info": {
"emails": ["找到的邮箱"],
"phones": ["找到的电话"],
"social": ["LinkedIn等社媒链接"],
"wechat": "找到的微信号",
"whatsapp": "找到的 WhatsApp 号码"
}
}
只返回 JSON,不要其他内容。"""
class DiscoveryService:
    """Discover, analyze and draft outreach for prospective trade customers.

    Public coroutines:

    * ``search``   - run web searches and return a list of candidate companies.
    * ``analyze``  - fetch a company page and ask the AI router for a match score.
    * ``outreach`` - ask the AI router for personalized outreach copy.

    Every path degrades to a static template result when the AI router has no
    providers, when a remote call fails, or when the reply is not usable JSON.
    """

    # Domains that are never prospective customers (publishers, portals,
    # social networks, marketplaces). Matched on label boundaries so that
    # "oup.com" does not also swallow "abcgroup.com".
    _JUNK_DOMAINS = (
        "acm.org", "ieee.org", "nature.com", "oup.com",
        "w3.org", "whatsapp.com", "wechat.com", "qq.com",
        "zhihu.com", "sogou.com", "163.com", "sohu.com", "sina.com",
        "taobao.com", "tmall.com", "alipay.com", "alibaba.com",
        "csdn.net", "blog.csdn", "jianshu.com", "36kr.com",
        "huxiu.com", "geekpark.net", "leiphone.com",
        "medium.com", "wordpress.com", "blogspot.com",
        "youtube.com", "facebook.com", "twitter.com", "instagram.com",
        "reddit.com", "quora.com",
    )
    # Bare keywords matched anywhere in the hostname (e.g. "dictionary" also
    # catches "thefreedictionary.com").
    _JUNK_HOST_KEYWORDS = (
        "sciencedirect", "mdpi", "springer", "wiley",
        "researchgate", "sagepub", "tandfonline", "ncbi", "semanticscholar",
        "britannica", "dictionary", "cambridge", "iciba", "wikipedia",
    )
    # A result whose title contains any of these is rejected outright.
    _JUNK_NAME_WORDS = (
        "news", "review", "blog", "dictionary", "translate",
        "wikipedia", "百科", "词典", "新闻", "评测",
        "price", "shop", "buy online", "forum", "subscribe",
        "专业媒体", "行业媒体", "新媒体", "门户",
    )
    # A snippet containing any of these looks like media coverage ...
    _NEWSY_WORDS = (
        "news", "review", "blog post", "article", "dictionary",
        "专业媒体", "行业媒体", "新媒体",
    )
    # ... unless it also contains one of these business markers.
    _BUSINESS_WORDS = (
        "company", "inc", "ltd", "corp", "gmbh", "llc",
        "manufacturer", "supplier", "exporter",
        "wholesale", "distributor", "trading",
        "import", "enterprise", "co.", "factory",
        "industry", "electric", "solar", "energy",
        "automotive", "vehicle", "官网", "有限公司",
        "集团", "股份", "实业",
    )
    _CJK_RE = re.compile(r"[\u4e00-\u9fff]")
    # Opening markdown fence, optionally tagged "json" in any letter case.
    _FENCE_OPEN_RE = re.compile(r"^```(?:json)?", re.IGNORECASE)
    # System prompt for outreach(); sent to the model verbatim.
    _OUTREACH_SYSTEM_PROMPT = (
        "你是外贸开发信专家。根据目标公司信息和你的产品,生成个性化触达文案。\n"
        "请以 JSON 格式返回(不要用 markdown 代码块标记):\n"
        "{\n"
        '"subject": "邮件标题(如适用)",\n'
        '"linkedin_message": "LinkedIn 私信文案(150字以内)",\n'
        '"whatsapp_message": "WhatsApp 消息文案(100字以内)",\n'
        '"email_body": "邮件正文(含开头问候、自我介绍、价值主张、行动号召、签名)",\n'
        '"key_points": ["客户关注的3个要点"],\n'
        '"tips": ["发送时的建议"]\n'
        "}"
    )

    def __init__(self):
        """Bind the shared AI router and record whether it has any provider."""
        ai_router = get_ai_router()
        self.ai = ai_router
        self._ai_available = len(ai_router.providers) > 0

    async def search(self, product_description: str, target_market: str) -> Dict[str, Any]:
        """Search the web for companies that may buy the product.

        Args:
            product_description: Free-text description of the user's product.
            target_market: Target country or region.

        Returns:
            A dict with ``companies`` (at most 15 entries), ``query``,
            ``market`` and ``provider``. When fewer than three results look
            like real businesses, generic search suggestions are appended.
        """
        queries = self._build_queries(product_description, target_market)
        all_results = await self._web_search_all(queries)

        companies = []
        provider = "template"
        if all_results:
            raw = all_results.get("results", [])
            companies = [self._to_company(r) for r in raw[:12]]
            provider = all_results.get("provider", "web_search")

        good_enough = [c for c in companies if self._looks_like_business(c)]
        if len(good_enough) < 3:
            logger.info(
                "Web search returned only %d good results, supplementing with suggestions",
                len(good_enough),
            )
            seen_names = {c.get("name", "") for c in good_enough}
            for extra in self._suggest_companies(product_description, target_market):
                name = extra.get("name")
                if name and name not in seen_names:
                    seen_names.add(name)
                    good_enough.append(extra)

        return {
            "companies": good_enough[:15],
            "query": product_description,
            "market": target_market,
            "provider": provider,
        }

    async def analyze(self, company_url: str, product_description: str) -> Dict[str, Any]:
        """Score how well the company behind ``company_url`` fits the product.

        Args:
            company_url: URL of the company's website.
            product_description: Free-text description of the user's product.

        Returns:
            The JSON object produced by the model, with ``url`` and
            ``provider`` added, or the template analysis when no AI provider
            is available, the call fails, or the reply is not a JSON object.
        """
        # Check AI availability first: without a provider the page text would
        # never be used, so fetching it is wasted network time.
        if not self._ai_available:
            return self._template_analysis(company_url)

        try:
            page_text = await fetch_page_text(company_url)
        except Exception as exc:
            # The prompt already has a "could not fetch" wording, so a failed
            # fetch degrades to that instead of failing the whole analysis.
            logger.warning("Fetching %s for analysis failed: %s", company_url, exc)
            page_text = ""

        page_excerpt = page_text[:2500] if page_text else "无法获取网页内容"
        prompt = (
            f"用户的产品:{product_description}\n"
            "目标公司信息:\n"
            f"URL: {company_url}\n"
            f"网页内容:{page_excerpt}\n"
            "请分析该公司的业务与用户产品的匹配度。"
        )
        try:
            result = await self.ai.chat(prompt, system_prompt=ANALYZE_MATCH_PROMPT)
            parsed = self._extract_json(result.get("reply", ""))
            # Only a non-empty JSON object can carry the extra keys.
            if isinstance(parsed, dict) and parsed:
                parsed["url"] = company_url
                parsed["provider"] = result.get("provider_used", "unknown")
                return parsed
        except Exception as exc:
            logger.warning("Analysis AI parse failed: %s", exc)
        return self._template_analysis(company_url)

    async def outreach(self, company_info: Dict[str, Any], product_info: Dict[str, Any]) -> Dict[str, Any]:
        """Generate outreach copy (email, LinkedIn, WhatsApp) for a company.

        Args:
            company_info: JSON-serializable description of the target company.
            product_info: JSON-serializable description of the user's product.

        Returns:
            The JSON object produced by the model with ``provider`` added, or
            the template outreach when no AI provider is available, the call
            fails, or the reply is not a JSON object.
        """
        if not self._ai_available:
            return self._template_outreach(company_info, product_info)

        prompt = (
            "目标公司信息:\n"
            f"{json.dumps(company_info, ensure_ascii=False)}\n"
            "我的产品信息:\n"
            f"{json.dumps(product_info, ensure_ascii=False)}\n"
            "请生成个性化触达文案。"
        )
        try:
            result = await self.ai.chat(prompt, system_prompt=self._OUTREACH_SYSTEM_PROMPT)
            parsed = self._extract_json(result.get("reply", ""))
            if isinstance(parsed, dict) and parsed:
                parsed["provider"] = result.get("provider_used", "unknown")
                return parsed
        except Exception as exc:
            logger.warning("Outreach AI parse failed: %s", exc)
        return self._template_outreach(company_info, product_info)

    async def _web_search_all(self, queries: list) -> dict:
        """Run the batch search, falling back to the single-query search.

        Only the first three queries go to the batch search and only the first
        query goes to the fallback.

        Returns:
            ``{"results": [...], "provider": ...}`` or ``{}`` when nothing
            usable was found.
        """
        if not queries:
            return {}

        try:
            results = await search_bing_batch(queries[:3], max_per_query=4)
            filtered = self._dedup_and_filter(results) if results else []
            # Fall through to the second engine when every hit was junk.
            if filtered:
                return {"results": filtered[:15], "provider": "bing"}
        except Exception as exc:
            logger.warning("Bing batch search failed: %s", exc)

        try:
            results = await search_companies(queries[0], max_results=10)
        except Exception as exc:
            logger.warning("Fallback company search failed: %s", exc)
            return {}
        if results:
            return {"results": results[:15], "provider": "google_cse"}
        return {}

    @staticmethod
    def _hostname_of(url: str) -> str:
        """Return the lower-cased hostname of ``url``, or ``""`` if unparseable."""
        try:
            # Without a scheme urlsplit() treats the host as a path, so prefix
            # "//" to make it a network location.
            parts = urlsplit(url if "://" in url else "//" + url)
        except ValueError:
            return ""
        return (parts.hostname or "").lower()

    @classmethod
    def _is_junk_host(cls, hostname: str) -> bool:
        """Tell whether ``hostname`` is an academic, government or media site."""
        if not hostname:
            return False
        labels = hostname.split(".")
        # edu/gov as any label after the first (mit.edu, x.edu.cn, www.gov.uk);
        # "ac" only as an inner label (ox.ac.uk), so the .ac TLD is not hit.
        if any(label in ("edu", "gov") for label in labels[1:]) or "ac" in labels[1:-1]:
            return True
        dotted = f".{hostname}."
        if any(f".{domain}." in dotted for domain in cls._JUNK_DOMAINS):
            return True
        return any(keyword in hostname for keyword in cls._JUNK_HOST_KEYWORDS)

    def _dedup_and_filter(self, results: list) -> list:
        """Drop duplicate URLs and results hosted on non-customer sites.

        URLs are compared after stripping trailing slashes. Results with an
        empty or missing ``url`` are dropped. Order is preserved.
        """
        seen = set()
        filtered = []
        for r in results:
            url = (r.get("url") or "").rstrip("/")
            if not url or url in seen:
                continue
            seen.add(url)
            if self._is_junk_host(self._hostname_of(url)):
                continue
            filtered.append(r)
        return filtered

    def _to_company(self, r: dict) -> dict:
        """Convert one raw search hit (url / title / snippet) to a company dict.

        A missing or empty title falls back to the URL; the title is cut to 60
        characters, the snippet to 200 and the contact URL to 100.
        """
        url = r.get("url") or ""
        title = (r.get("title") or url)[:60]
        snippet = (r.get("snippet") or "")[:200]
        return {
            "name": title,
            "description": snippet,
            "country": "",
            "match_score": 60,
            "contact": url[:100] if url else "暂无",
            "source": "web",
        }

    def _looks_like_business(self, c: dict) -> bool:
        """Heuristically tell a company page from news, blogs or shops.

        Rejects entries whose name contains a junk word, and entries whose
        description looks like media coverage without any business marker.
        """
        name = (c.get("name") or "").lower()
        snippet = (c.get("description") or "").lower()
        if any(word in name for word in self._JUNK_NAME_WORDS):
            return False
        if any(word in snippet for word in self._NEWSY_WORDS):
            return any(word in snippet for word in self._BUSINESS_WORDS)
        return True

    def _build_queries(self, product: str, market: str) -> list:
        """Build search queries for ``product`` in ``market``.

        Returns eight English buyer-intent queries, then four Chinese ones
        when the product contains CJK characters, then three B2B queries.
        NOTE(review): _web_search_all() only consumes the first three, so the
        Chinese and B2B queries are currently never sent - confirm intent.
        """
        queries = [
            f"{product} importer {market}",
            f"{product} distributor {market}",
            f"{product} wholesale buyer {market}",
            f"{product} procurement {market}",
            f"{product} trading company {market}",
            f"buy {product} from {market}",
            f"{product} supply chain {market}",
            f"top {product} manufacturers {market}",
        ]
        if self._CJK_RE.search(product):
            queries += [
                f"{product} {market} 进口商",
                f"{product} {market} 经销商",
                f"{product} {market} 采购",
                f"{product} {market} 批发",
            ]
        b2b_queries = [
            f"{product} buyer {market} alibaba",
            f"{product} supplier {market}",
            f"{product} wholesale price {market}",
        ]
        return queries + b2b_queries

    def _extract_json(self, text: str) -> Optional[Union[dict, list]]:
        """Parse a JSON object or array out of a model reply.

        Handles replies wrapped in a markdown fence (with or without a "json"
        tag in any letter case) and replies that surround the JSON with prose.

        Returns:
            The decoded dict or list, or ``None`` when ``text`` is not a
            string, holds no decodable JSON, or decodes to a scalar.
        """
        if not isinstance(text, str):
            return None
        cleaned = self._FENCE_OPEN_RE.sub("", text.strip(), count=1)
        if cleaned.endswith("```"):
            cleaned = cleaned[:-3]
        cleaned = cleaned.strip()

        # Try the whole reply first, then the outermost {...} / [...] spans,
        # whichever opens earlier in the text first.
        spans = []
        for opener, closer in (("{", "}"), ("[", "]")):
            start = cleaned.find(opener)
            end = cleaned.rfind(closer)
            if 0 <= start < end:
                spans.append((start, end))
        candidates = [cleaned]
        candidates.extend(cleaned[start:end + 1] for start, end in sorted(spans))

        for candidate in candidates:
            try:
                value = json.loads(candidate)
            except json.JSONDecodeError:
                continue
            if isinstance(value, (dict, list)):
                return value
        return None

    @staticmethod
    def _company_entry(name: str, description: str, country: str,
                       match_score: int, contact: str, source: str) -> Dict[str, Any]:
        """Build one company dict in the shape returned by ``search``."""
        return {
            "name": name,
            "description": description,
            "country": country,
            "match_score": match_score,
            "contact": contact,
            "source": source,
        }

    def _suggest_companies(self, product: str, market: str) -> list:
        """Return six generic "where to look" suggestions as company dicts."""
        rows = [
            (f"{product} Importers in {market}",
             f"{market} 从事 {product} 进口和批发的贸易商和专业进口商",
             80, f"在 LinkedIn 搜索 '{product} importer {market}'"),
            (f"{product} Distributors in {market}",
             f"{market} 分销 {product} 的分销渠道商和批发商",
             75, f"在 Google/Bing 搜索 '{product} distributor {market}'"),
            (f"{market} Trade Association",
             f"联系 {market} 的相关行业协会获取会员企业名录",
             70, f"搜索 '{market} {product} association'"),
            (f"Alibaba {market} Buyers",
             f"在 Alibaba.com 搜索 '{product}' 并筛选 {market} 买家",
             75, "https://www.alibaba.com"),
            (f"LinkedIn {market} Decision Makers",
             f"在 LinkedIn 搜索 '{market} {product} procurement/sourcing manager' 找决策人",
             65, "LinkedIn Premium"),
            # A space separates product and market; they used to be rendered
            # run together.
            (f"{market} Import-Export Records",
             f"在 importgenius.com 搜索 {product} {market} 的进口记录,找到真实买家",
             70, "https://www.importgenius.com"),
        ]
        return [
            self._company_entry(name, description, market, score, contact, "建议")
            for name, description, score, contact in rows
        ]

    def _template_strategy(self, product: str, market: str) -> Dict[str, Any]:
        """Return a static search strategy with four suggested company groups."""
        search_terms = [
            f"{product} importer {market}",
            f"{product} distributor {market}",
            f"{product} wholesale {market}",
            f"{product} buyers {market}",
        ]
        b2b_sites = ["Alibaba.com", "TradeIndia.com", "GlobalSources.com", "Made-in-China.com"]
        platforms = ", ".join(b2b_sites[:3])
        first_terms = " ".join(search_terms[:2])
        rows = [
            (f"{product} Importers in {market}",
             f"使用Google/Bing搜索 '{product} importer {market}' 可找到正在采购该产品的进口商",
             80, "通过搜索结果获取", "搜索建议"),
            (f"{product} Distributors in {market}",
             f"使用Google/Bing搜索 '{product} distributor {market}' 可找到分销渠道商",
             75, "通过搜索结果获取", "搜索建议"),
            (f"{product} Wholesale Buyers in {market}",
             f"使用Google/Bing搜索 '{product} wholesale {market}' 可找到批发采购商",
             70, "通过搜索结果获取", "搜索建议"),
            (f"{market} {product} Trade Partners",
             f"在B2B平台({platforms})搜索 {market} 买家发布的采购需求",
             65, "B2B平台站内信", "B2B平台"),
        ]
        return {
            "companies": [
                self._company_entry(name, description, market, score, contact, source)
                for name, description, score, contact, source in rows
            ],
            # Step 3 separates the two site names with "、"; they used to be
            # rendered run together ("Alibaba.comTradeIndia.com").
            "strategy": (
                "推荐搜索计划:\n"
                f"1. 搜索 '{first_terms}' 直接找客户\n"
                f"2. 在 LinkedIn 搜索 '{market} {product} manager' 找决策人\n"
                f"3. 在 {b2b_sites[0]}、{b2b_sites[1]} 查找 {market} 买家询盘\n"
                f"4. 参加 {market} 相关行业展会获取名录"
            ),
            "tips": [
                f"把搜索词 '{product} importer {market}' 改成当地语言效果更好",
                "找到公司后访问官网,在 About/Team 页面找决策人LinkedIn",
                "用 SimilarWeb 查看目标公司网站流量和来源",
                "在行业协会网站查找会员名录",
            ],
            "provider": "template",
            # NOTE(review): this template is not AI output, yet the flag is
            # True - kept as is because consumers may rely on it; confirm.
            "ai_generated": True,
        }

    def _template_analysis(self, url: str) -> Dict[str, Any]:
        """Return the neutral analysis used when no AI analysis is possible."""
        return {
            "match_score": 50,
            "match_reason": "无法获取网页内容进行分析,建议手动查看",
            "url": url,
            "provider": "template",
            "contact_info": {"emails": [], "phones": [], "social": [], "wechat": "", "whatsapp": ""},
        }

    def _template_outreach(self, company: Dict[str, Any], product: Dict[str, Any]) -> Dict[str, Any]:
        """Return fixed outreach copy filled in with the company and product names."""
        company_name = company.get("name", "")
        product_name = product.get("name", "")
        return {
            "subject": f"关于{product_name}的合作机会",
            "linkedin_message": (
                f"您好!了解到贵司{company_name}在经营相关业务,我们专业生产{product_name},"
                "品质稳定,价格有竞争力。如有兴趣,我可以发详细资料供参考。"
            ),
            "whatsapp_message": (
                f"Hello! We are a professional {product_name} manufacturer. "
                "Interested in exploring cooperation? Happy to share details."
            ),
            "email_body": (
                f"Dear {company_name} team,\n\n"
                f"We are a professional {product_name} manufacturer with competitive pricing "
                "and consistent quality. Would you be open to a quick chat to explore "
                "potential cooperation?\n\n"
                "Best regards,\n[Your Name]"
            ),
            "key_points": ["产品质量有保障", "价格有竞争力", "可定制"],
            "tips": ["发送前先了解对方背景", "LinkedIn 消息要简短"],
            "provider": "template",
        }