Add admin-frontend and user-frontend standalone projects, certification/invoice/discovery features, fix auth header and theme consistency
This commit is contained in:
@@ -0,0 +1,272 @@
|
||||
import asyncio
|
||||
import json
|
||||
import logging
|
||||
from typing import Dict, Any, Optional
|
||||
|
||||
from app.ai.router import get_ai_router
|
||||
from app.services.search_web import search_companies, fetch_page_text
|
||||
from app.services.mcp_search_client import mcp_search
|
||||
|
||||
logger = logging.getLogger(__name__)
|
||||
|
||||
ANALYZE_MATCH_PROMPT = """你是外贸客户分析专家。分析目标公司的业务描述,判断其与用户产品的匹配度。
|
||||
|
||||
请以 JSON 格式返回(不要用 markdown 代码块标记):
|
||||
{
|
||||
"match_score": 0-100,
|
||||
"match_reason": "为什么匹配/不匹配",
|
||||
"company_summary": "这家公司的主要业务",
|
||||
"product_fit": "产品匹配度说明",
|
||||
"contact_info": {
|
||||
"emails": ["找到的邮箱"],
|
||||
"phones": ["找到的电话"],
|
||||
"social": ["LinkedIn等社媒链接"]
|
||||
}
|
||||
}
|
||||
|
||||
只返回 JSON,不要其他内容。"""
|
||||
|
||||
|
||||
class DiscoveryService:
|
||||
def __init__(self):
|
||||
ai_router = get_ai_router()
|
||||
self.ai = ai_router
|
||||
self._ai_available = len(ai_router.providers) > 0
|
||||
|
||||
async def search(self, product_description: str, target_market: str) -> Dict[str, Any]:
|
||||
queries = self._build_queries(product_description, target_market)
|
||||
all_results = await self._mcp_search_all(queries)
|
||||
if all_results:
|
||||
return {
|
||||
"companies": all_results[:15],
|
||||
"query": product_description,
|
||||
"market": target_market,
|
||||
"provider": "mcp_search",
|
||||
}
|
||||
|
||||
all_results = await self._google_search_all(queries)
|
||||
if all_results:
|
||||
return {
|
||||
"companies": all_results[:15],
|
||||
"query": product_description,
|
||||
"market": target_market,
|
||||
"provider": "web_search",
|
||||
}
|
||||
|
||||
logger.info("No real search results, using AI strategy")
|
||||
return await self._ai_strategy(product_description, target_market)
|
||||
|
||||
async def analyze(self, company_url: str, product_description: str) -> Dict[str, Any]:
|
||||
page_text = await fetch_page_text(company_url)
|
||||
company_info = {"url": company_url}
|
||||
if page_text:
|
||||
company_info["page_text"] = page_text[:2500]
|
||||
|
||||
if not self._ai_available:
|
||||
return self._template_analysis(company_url)
|
||||
|
||||
prompt = f"""用户的产品:{product_description}
|
||||
|
||||
目标公司信息:
|
||||
URL: {company_url}
|
||||
网页内容:{page_text[:2500] if page_text else "无法获取网页内容"}
|
||||
|
||||
请分析该公司的业务与用户产品的匹配度。"""
|
||||
try:
|
||||
result = await self.ai.chat(prompt, system_prompt=ANALYZE_MATCH_PROMPT)
|
||||
content = result.get("reply", "")
|
||||
parsed = self._extract_json(content)
|
||||
if parsed:
|
||||
parsed["url"] = company_url
|
||||
parsed["provider"] = result.get("provider_used", "unknown")
|
||||
return parsed
|
||||
except (json.JSONDecodeError, Exception) as e:
|
||||
logger.warning(f"Analysis AI parse failed: {e}")
|
||||
return self._template_analysis(company_url)
|
||||
|
||||
async def outreach(self, company_info: Dict[str, Any], product_info: Dict[str, Any]) -> Dict[str, Any]:
|
||||
if not self._ai_available:
|
||||
return self._template_outreach(company_info, product_info)
|
||||
|
||||
prompt = f"""目标公司信息:
|
||||
{json.dumps(company_info, ensure_ascii=False)}
|
||||
|
||||
我的产品信息:
|
||||
{json.dumps(product_info, ensure_ascii=False)}
|
||||
|
||||
请生成个性化触达文案。"""
|
||||
system = """你是外贸开发信专家。根据目标公司信息和你的产品,生成个性化触达文案。
|
||||
|
||||
请以 JSON 格式返回(不要用 markdown 代码块标记):
|
||||
{
|
||||
"subject": "邮件标题(如适用)",
|
||||
"linkedin_message": "LinkedIn 私信文案(150字以内)",
|
||||
"whatsapp_message": "WhatsApp 消息文案(100字以内)",
|
||||
"email_body": "邮件正文(含开头问候、自我介绍、价值主张、行动号召、签名)",
|
||||
"key_points": ["客户关注的3个要点"],
|
||||
"tips": ["发送时的建议"]
|
||||
}"""
|
||||
try:
|
||||
result = await self.ai.chat(prompt, system_prompt=system)
|
||||
content = result.get("reply", "")
|
||||
parsed = self._extract_json(content)
|
||||
if parsed:
|
||||
parsed["provider"] = result.get("provider_used", "unknown")
|
||||
return parsed
|
||||
except (json.JSONDecodeError, Exception) as e:
|
||||
logger.warning(f"Outreach AI parse failed: {e}")
|
||||
return self._template_outreach(company_info, product_info)
|
||||
|
||||
async def _mcp_search_all(self, queries: list) -> list:
|
||||
seen_urls = set()
|
||||
tasks = [asyncio.create_task(mcp_search(q, max_results=6)) for q in queries[:2]]
|
||||
all_results = []
|
||||
try:
|
||||
for coro in asyncio.as_completed(tasks, timeout=8):
|
||||
try:
|
||||
results = await coro
|
||||
for r in results:
|
||||
url = r.get("url", "").rstrip("/")
|
||||
if url and url not in seen_urls:
|
||||
seen_urls.add(url)
|
||||
all_results.append(r)
|
||||
except (asyncio.TimeoutError, Exception) as e:
|
||||
logger.debug(f"MCP search query failed: {e}")
|
||||
except asyncio.TimeoutError:
|
||||
logger.warning("MCP search overall timeout")
|
||||
finally:
|
||||
for t in tasks:
|
||||
if not t.done():
|
||||
t.cancel()
|
||||
await asyncio.gather(*tasks, return_exceptions=True)
|
||||
if all_results:
|
||||
return self._dedup_and_filter(all_results)[:15]
|
||||
return []
|
||||
|
||||
def _dedup_and_filter(self, results: list) -> list:
|
||||
seen = set()
|
||||
filtered = []
|
||||
for r in results:
|
||||
url = r.get("url", "").rstrip("/")
|
||||
title = r.get("title", "")
|
||||
if not url or url in seen:
|
||||
continue
|
||||
seen.add(url)
|
||||
s = url.split("/")[2] if "://" in url else url
|
||||
hostname = s.split(":")[0].lower() if ":" in s else s.lower()
|
||||
if any(tld in hostname for tld in [".cn", ".com.cn", ".edu", ".ac.", ".gov"]):
|
||||
continue
|
||||
if any(domain in hostname for domain in
|
||||
["sciencedirect", "mdpi", "springer", "wiley", "acm.org",
|
||||
"ieee.org", "researchgate", "nature.com", "oup.com",
|
||||
"sagepub", "tandfonline", "ncbi", "semanticscholar",
|
||||
"britannica", "dictionary", "cambridge", "iciba", "wikipedia"]):
|
||||
continue
|
||||
filtered.append(r)
|
||||
return filtered
|
||||
|
||||
async def _google_search_all(self, queries: list) -> list:
|
||||
all_results = []
|
||||
seen_urls = set()
|
||||
for q in queries[:3]:
|
||||
results = await search_companies(q, max_results=8)
|
||||
for r in results:
|
||||
url = r["url"].rstrip("/")
|
||||
if url not in seen_urls:
|
||||
seen_urls.add(url)
|
||||
all_results.append(r)
|
||||
if len(all_results) >= 15:
|
||||
break
|
||||
return self._dedup_and_filter(all_results)[:15]
|
||||
|
||||
def _build_queries(self, product: str, market: str) -> list:
|
||||
return [
|
||||
f"{product} importer {market}",
|
||||
f"{product} distributor {market}",
|
||||
f"{product} wholesale buyer {market}",
|
||||
f"{product} procurement {market}",
|
||||
f"{product} company {market}",
|
||||
f"buy {product} from {market}",
|
||||
f"{product} supply chain {market}",
|
||||
f"top {product} manufacturers {market}",
|
||||
f"{product} import export {market}",
|
||||
f"{product} trading company {market}",
|
||||
]
|
||||
|
||||
def _extract_json(self, text: str) -> Optional[dict]:
|
||||
text = text.strip()
|
||||
for prefix in ["```json", "```", "```JSON"]:
|
||||
if text.startswith(prefix):
|
||||
text = text[len(prefix):]
|
||||
for suffix in ["```"]:
|
||||
if text.endswith(suffix):
|
||||
text = text[:-len(suffix)]
|
||||
text = text.strip()
|
||||
try:
|
||||
return json.loads(text)
|
||||
except json.JSONDecodeError:
|
||||
import re
|
||||
brace = text.find("{")
|
||||
end = text.rfind("}")
|
||||
if brace >= 0 and end > brace:
|
||||
try:
|
||||
return json.loads(text[brace:end+1])
|
||||
except json.JSONDecodeError:
|
||||
pass
|
||||
return None
|
||||
|
||||
async def _ai_strategy(self, product: str, market: str) -> Dict[str, Any]:
|
||||
if not self._ai_available:
|
||||
return self._template_strategy(product, market)
|
||||
system = """你是外贸客户发现专家。根据用户的产品和目标市场,分析出潜在买家画像和获取策略。
|
||||
|
||||
请以 JSON 格式返回(不要用 markdown 代码块标记):
|
||||
{
|
||||
"buyer_personas": [{"type": "", "description": "", "channels": [], "search_queries": []}],
|
||||
"strategy": "",
|
||||
"tips": []
|
||||
}"""
|
||||
prompt = f"产品:{product}\n目标市场:{market}\n请分析潜在买家画像和获取策略。"
|
||||
try:
|
||||
result = await self.ai.chat(prompt, system_prompt=system)
|
||||
content = result.get("reply", "")
|
||||
parsed = self._extract_json(content)
|
||||
if parsed:
|
||||
parsed["provider"] = result.get("provider_used", "unknown")
|
||||
return parsed
|
||||
return self._template_strategy(product, market)
|
||||
except Exception as e:
|
||||
logger.warning(f"AI strategy failed: {e}")
|
||||
return self._template_strategy(product, market)
|
||||
|
||||
def _template_strategy(self, product: str, market: str) -> Dict[str, Any]:
|
||||
return {
|
||||
"buyer_personas": [
|
||||
{"type": "进口商/批发商", "description": f"从中国进口{product}并在{market}批发的贸易商", "channels": ["LinkedIn", "Google"], "search_queries": [f"{product} importer {market}"]},
|
||||
{"type": "品牌商/OEM买家", "description": f"在{market}销售自有品牌{product}的公司", "channels": ["LinkedIn", "行业展会"], "search_queries": [f"{product} manufacturer {market}"]},
|
||||
],
|
||||
"strategy": f"建议在 LinkedIn 和 Google 搜索 {market} 的 {product} 相关公司",
|
||||
"tips": ["使用多个搜索词", "找到公司后在 LinkedIn 找决策人"],
|
||||
"provider": "template",
|
||||
}
|
||||
|
||||
def _template_analysis(self, url: str) -> Dict[str, Any]:
|
||||
return {
|
||||
"match_score": 50,
|
||||
"match_reason": "无法获取网页内容进行分析,建议手动查看",
|
||||
"url": url,
|
||||
"provider": "template",
|
||||
}
|
||||
|
||||
def _template_outreach(self, company: Dict[str, Any], product: Dict[str, Any]) -> Dict[str, Any]:
|
||||
company_name = company.get("name", "")
|
||||
product_name = product.get("name", "")
|
||||
return {
|
||||
"subject": f"关于{product_name}的合作机会",
|
||||
"linkedin_message": f"您好!了解到贵司{company_name}在经营相关业务,我们专业生产{product_name},品质稳定,价格有竞争力。如有兴趣,我可以发详细资料供参考。",
|
||||
"whatsapp_message": f"Hello! We are a professional {product_name} manufacturer. Interested in exploring cooperation? Happy to share details.",
|
||||
"email_body": f"Dear {company_name} team,\n\nWe are a professional {product_name} manufacturer with competitive pricing and consistent quality. Would you be open to a quick chat to explore potential cooperation?\n\nBest regards,\n[Your Name]",
|
||||
"key_points": ["产品质量有保障", "价格有竞争力", "可定制"],
|
||||
"tips": ["发送前先了解对方背景", "LinkedIn 消息要简短"],
|
||||
"provider": "template",
|
||||
}
|
||||
Reference in New Issue
Block a user