import json import logging from typing import Dict, Any, Optional, Union from app.ai.router import get_ai_router from app.services.search_web import search_companies, fetch_page_text from app.services.mcp_search_server import search_bing_batch logger = logging.getLogger(__name__) ANALYZE_MATCH_PROMPT = """你是外贸客户分析专家。分析目标公司的业务描述,判断其与用户产品的匹配度。 请以 JSON 格式返回(不要用 markdown 代码块标记): { "match_score": 0-100, "match_reason": "为什么匹配/不匹配", "company_summary": "这家公司的主要业务", "product_fit": "产品匹配度说明", "contact_info": { "emails": ["找到的邮箱"], "phones": ["找到的电话"], "social": ["LinkedIn等社媒链接"], "wechat": "找到的微信号", "whatsapp": "找到的 WhatsApp 号码" } } 只返回 JSON,不要其他内容。""" class DiscoveryService: def __init__(self): ai_router = get_ai_router() self.ai = ai_router self._ai_available = len(ai_router.providers) > 0 async def search(self, product_description: str, target_market: str) -> Dict[str, Any]: queries = self._build_queries(product_description, target_market) all_results = await self._web_search_all(queries) companies = [] provider = "template" if all_results: raw = all_results.get("results", []) companies = [self._to_company(r) for r in raw[:12]] provider = all_results.get("provider", "web_search") good_enough = [c for c in companies if self._looks_like_business(c)] if len(good_enough) < 3: logger.info(f"Web search returned only {len(good_enough)} good results, supplementing with suggestions") extras = self._suggest_companies(product_description, target_market) seen_names = set(c.get("name", "") for c in good_enough) for c in extras: if c.get("name") and c["name"] not in seen_names: seen_names.add(c["name"]) good_enough.append(c) return { "companies": good_enough[:15], "query": product_description, "market": target_market, "provider": provider, } async def analyze(self, company_url: str, product_description: str) -> Dict[str, Any]: page_text = await fetch_page_text(company_url) company_info = {"url": company_url} if page_text: company_info["page_text"] = page_text[:2500] if not self._ai_available: return self._template_analysis(company_url) prompt = f"""用户的产品:{product_description} 目标公司信息: URL: {company_url} 网页内容:{page_text[:2500] if page_text else "无法获取网页内容"} 请分析该公司的业务与用户产品的匹配度。""" try: result = await self.ai.chat(prompt, system_prompt=ANALYZE_MATCH_PROMPT) content = result.get("reply", "") parsed = self._extract_json(content) if parsed: parsed["url"] = company_url parsed["provider"] = result.get("provider_used", "unknown") return parsed except (json.JSONDecodeError, Exception) as e: logger.warning(f"Analysis AI parse failed: {e}") return self._template_analysis(company_url) async def outreach(self, company_info: Dict[str, Any], product_info: Dict[str, Any]) -> Dict[str, Any]: if not self._ai_available: return self._template_outreach(company_info, product_info) prompt = f"""目标公司信息: {json.dumps(company_info, ensure_ascii=False)} 我的产品信息: {json.dumps(product_info, ensure_ascii=False)} 请生成个性化触达文案。""" system = """你是外贸开发信专家。根据目标公司信息和你的产品,生成个性化触达文案。 请以 JSON 格式返回(不要用 markdown 代码块标记): { "subject": "邮件标题(如适用)", "linkedin_message": "LinkedIn 私信文案(150字以内)", "whatsapp_message": "WhatsApp 消息文案(100字以内)", "email_body": "邮件正文(含开头问候、自我介绍、价值主张、行动号召、签名)", "key_points": ["客户关注的3个要点"], "tips": ["发送时的建议"] }""" try: result = await self.ai.chat(prompt, system_prompt=system) content = result.get("reply", "") parsed = self._extract_json(content) if parsed: parsed["provider"] = result.get("provider_used", "unknown") return parsed except (json.JSONDecodeError, Exception) as e: logger.warning(f"Outreach AI parse failed: {e}") return self._template_outreach(company_info, product_info) async def _web_search_all(self, queries: list) -> dict: try: results = await search_bing_batch(queries[:3], max_per_query=4) if results: return {"results": self._dedup_and_filter(results)[:15], "provider": "bing"} except Exception as e: logger.warning(f"Bing batch search failed: {e}") results = await search_companies(queries[0], max_results=10) if results: return {"results": results[:15], "provider": "google_cse"} return {} def _dedup_and_filter(self, results: list) -> list: seen = set() filtered = [] junk = ["sciencedirect", "mdpi", "springer", "wiley", "acm.org", "ieee.org", "researchgate", "nature.com", "oup.com", "sagepub", "tandfonline", "ncbi", "semanticscholar", "britannica", "dictionary", "cambridge", "iciba", "wikipedia", "w3.org", "whatsapp.com", "wechat.com", "qq.com", "zhihu.com", "sogou.com", "163.com", "sohu.com", "sina.com", "taobao.com", "tmall.com", "alipay.com", "alibaba.com", "csdn.net", "blog.csdn", "jianshu.com", "36kr.com", "huxiu.com", "geekpark.net", "leiphone.com", "medium.com", "wordpress.com", "blogspot.com", "youtube.com", "facebook.com", "twitter.com", "instagram.com", "reddit.com", "quora.com"] for r in results: url = r.get("url", "").rstrip("/") if not url or url in seen: continue seen.add(url) s = url.split("/")[2] if "://" in url else url hostname = s.split(":")[0].lower() if ":" in s else s.lower() if any(tld in hostname for tld in [".edu", ".ac.", ".gov", ".edu.cn"]): continue if any(domain in hostname for domain in junk): continue filtered.append(r) return filtered def _to_company(self, r: dict) -> dict: url = r.get("url", "") title = r.get("title", url)[:60] snippet = r.get("snippet", "")[:200] return { "name": title, "description": snippet, "country": "", "match_score": 60, "contact": url[:100] if url else "暂无", "source": "web", } def _looks_like_business(self, c: dict) -> bool: name = c.get("name", "") snippet = c.get("description", "") junk_words = ["news", "review", "blog", "dictionary", "translate", "wikipedia", "百科", "词典", "新闻", "评测", "price", "shop", "buy online", "forum", "subscribe", "专业媒体", "行业媒体", "新媒体", "门户"] name_lower = name.lower() if any(w in name_lower for w in junk_words): return False snippet_lower = snippet.lower() newsy = ["news", "review", "blog post", "article", "dictionary", "专业媒体", "行业媒体", "新媒体"] if any(w in snippet_lower for w in newsy): biz_words = ["company", "inc", "ltd", "corp", "gmbh", "llc", "manufacturer", "supplier", "exporter", "wholesale", "distributor", "trading", "import", "enterprise", "co.", "factory", "industry", "electric", "solar", "energy", "automotive", "vehicle", "官网", "有限公司", "集团", "股份", "实业"] if not any(w in snippet_lower for w in biz_words): return False return True def _build_queries(self, product: str, market: str) -> list: import re has_cjk = bool(re.search(r'[\u4e00-\u9fff]', product)) queries = [ f"{product} importer {market}", f"{product} distributor {market}", f"{product} wholesale buyer {market}", f"{product} procurement {market}", f"{product} trading company {market}", f"buy {product} from {market}", f"{product} supply chain {market}", f"top {product} manufacturers {market}", ] if has_cjk: queries += [ f"{product} {market} 进口商", f"{product} {market} 经销商", f"{product} {market} 采购", f"{product} {market} 批发", ] b2b_queries = [ f"{product} buyer {market} alibaba", f"{product} supplier {market}", f"{product} wholesale price {market}", ] return queries + b2b_queries def _extract_json(self, text: str) -> Optional[Union[dict, list]]: text = text.strip() for prefix in ["```json", "```", "```JSON"]: if text.startswith(prefix): text = text[len(prefix):] for suffix in ["```"]: if text.endswith(suffix): text = text[:-len(suffix)] text = text.strip() try: return json.loads(text) except json.JSONDecodeError: import re brace = text.find("{") end = text.rfind("}") if brace >= 0 and end > brace: try: return json.loads(text[brace:end+1]) except json.JSONDecodeError: pass return None def _suggest_companies(self, product: str, market: str) -> list: return [ {"name": f"{product} Importers in {market}", "description": f"在 {market} 从事 {product} 进口和批发的贸易商和专业进口商", "country": market, "match_score": 80, "contact": f"在 LinkedIn 搜索 '{product} importer {market}'", "source": "建议"}, {"name": f"{product} Distributors in {market}", "description": f"在 {market} 分销 {product} 的分销渠道商和批发商", "country": market, "match_score": 75, "contact": f"在 Google/Bing 搜索 '{product} distributor {market}'", "source": "建议"}, {"name": f"{market} Trade Association", "description": f"联系 {market} 的相关行业协会获取会员企业名录", "country": market, "match_score": 70, "contact": f"搜索 '{market} {product} association'", "source": "建议"}, {"name": f"Alibaba {market} Buyers", "description": f"在 Alibaba.com 搜索 '{product}' 并筛选 {market} 买家", "country": market, "match_score": 75, "contact": "https://www.alibaba.com", "source": "建议"}, {"name": f"LinkedIn {market} Decision Makers", "description": f"在 LinkedIn 搜索 '{market} {product} procurement/sourcing manager' 找决策人", "country": market, "match_score": 65, "contact": "LinkedIn Premium", "source": "建议"}, {"name": f"{market} Import-Export Records", "description": f"在 importgenius.com 搜索 {product} 在 {market} 的进口记录,找到真实买家", "country": market, "match_score": 70, "contact": "https://www.importgenius.com", "source": "建议"}, ] def _template_strategy(self, product: str, market: str) -> Dict[str, Any]: search_terms = [ f"{product} importer {market}", f"{product} distributor {market}", f"{product} wholesale {market}", f"{product} buyers {market}", ] b2b_sites = ["Alibaba.com", "TradeIndia.com", "GlobalSources.com", "Made-in-China.com"] return { "companies": [ {"name": f"{product} Importers in {market}", "description": f"使用Google/Bing搜索 '{product} importer {market}' 可找到正在采购该产品的进口商", "country": market, "match_score": 80, "contact": "通过搜索结果获取", "source": "搜索建议"}, {"name": f"{product} Distributors in {market}", "description": f"使用Google/Bing搜索 '{product} distributor {market}' 可找到分销渠道商", "country": market, "match_score": 75, "contact": "通过搜索结果获取", "source": "搜索建议"}, {"name": f"{product} Wholesale Buyers in {market}", "description": f"使用Google/Bing搜索 '{product} wholesale {market}' 可找到批发采购商", "country": market, "match_score": 70, "contact": "通过搜索结果获取", "source": "搜索建议"}, {"name": f"{market} {product} Trade Partners", "description": f"在B2B平台({', '.join(b2b_sites[:3])})搜索 {market} 买家发布的采购需求", "country": market, "match_score": 65, "contact": "B2B平台站内信", "source": "B2B平台"}, ], "strategy": f"推荐搜索计划:\n1. 搜索 '{' '.join(search_terms[:2])}' 直接找客户\n2. 在 LinkedIn 搜索 '{market} {product} manager' 找决策人\n3. 在 {b2b_sites[0]} 和 {b2b_sites[1]} 查找 {market} 买家询盘\n4. 参加 {market} 相关行业展会获取名录", "tips": [ f"把搜索词 '{product} importer {market}' 改成当地语言效果更好", "找到公司后访问官网,在 About/Team 页面找决策人LinkedIn", "用 SimilarWeb 查看目标公司网站流量和来源", "在行业协会网站查找会员名录"], "provider": "template", "ai_generated": True, } def _template_analysis(self, url: str) -> Dict[str, Any]: return { "match_score": 50, "match_reason": "无法获取网页内容进行分析,建议手动查看", "url": url, "provider": "template", "contact_info": {"emails": [], "phones": [], "social": [], "wechat": "", "whatsapp": ""}, } def _template_outreach(self, company: Dict[str, Any], product: Dict[str, Any]) -> Dict[str, Any]: company_name = company.get("name", "") product_name = product.get("name", "") return { "subject": f"关于{product_name}的合作机会", "linkedin_message": f"您好!了解到贵司{company_name}在经营相关业务,我们专业生产{product_name},品质稳定,价格有竞争力。如有兴趣,我可以发详细资料供参考。", "whatsapp_message": f"Hello! We are a professional {product_name} manufacturer. Interested in exploring cooperation? Happy to share details.", "email_body": f"Dear {company_name} team,\n\nWe are a professional {product_name} manufacturer with competitive pricing and consistent quality. Would you be open to a quick chat to explore potential cooperation?\n\nBest regards,\n[Your Name]", "key_points": ["产品质量有保障", "价格有竞争力", "可定制"], "tips": ["发送前先了解对方背景", "LinkedIn 消息要简短"], "provider": "template", }