158 lines
7.4 KiB
Python
158 lines
7.4 KiB
Python
from typing import Dict, Any, Optional
|
||
import json
|
||
from app.ai.base import AIProvider
|
||
|
||
|
||
# System prompts keyed by task name. Each value is a single string assembled
# from adjacent literals; every prompt ends with an instruction to return only
# the payload so the model output can be used without post-processing.
SYSTEM_PROMPTS = {
    "translate": (
        "You are a professional translator specialized in foreign trade and e-commerce. "
        "Accurately translate business terms like MOQ, FOB, CIF, lead time, etc. "
        "Return ONLY the translated text, no explanations."
    ),
    "reply": (
        "You are an experienced foreign trade sales expert. Write professional, "
        "clear business replies. Be concise but warm. Include relevant details "
        "naturally. Return ONLY the reply text, no explanations."
    ),
    "marketing": (
        "You are a creative copywriter for international trade. Write compelling "
        "marketing content that drives action. Adapt to the target audience's culture. "
        "Return ONLY the copy, no explanations."
    ),
    "extract": (
        "You extract structured data from text. "
        "Return ONLY valid JSON matching the requested schema."
    ),
}
|
||
|
||
|
||
class OpenAIProvider(AIProvider):
    """AIProvider backed by the OpenAI chat-completions API.

    All task methods build a system prompt from ``SYSTEM_PROMPTS`` plus
    per-call details, send a single system+user exchange through ``_call``
    and wrap the returned text in a small result dict.
    """

    def __init__(self, api_key: str, model: str = "gpt-4o", base_url: Optional[str] = None):
        """Create the async client.

        Args:
            api_key: Key passed to ``AsyncOpenAI``.
            model: Default model name used for requests.
            base_url: Optional alternative endpoint; forwarded to the client
                only when truthy.

        Raises:
            ImportError: If the ``openai`` package cannot be imported.
        """
        try:
            # Imported lazily so the module can be loaded without openai installed.
            from openai import AsyncOpenAI
        except ImportError as err:
            raise ImportError(
                "openai>=1.0 is required for OpenAIProvider. "
                "Install it with: pip install 'openai>=1.0'"
            ) from err

        client_kwargs: Dict[str, Any] = {"api_key": api_key}
        if base_url:
            client_kwargs["base_url"] = base_url
        self.client = AsyncOpenAI(**client_kwargs)

        self.model = model
        self._name = f"openai-{model}"
        # Per-model price table read by cost_per_1k_tokens.
        # NOTE(review): units are presumably USD per 1k tokens - confirm.
        self._pricing = {
            "gpt-4o": {"input": 0.01, "output": 0.03},
            "gpt-4o-mini": {"input": 0.0015, "output": 0.006},
        }
        # Translation runs on the mini model when the default is gpt-4o;
        # any other configured model is used as-is.
        self._cheap_model = "gpt-4o-mini" if model == "gpt-4o" else model

    async def translate(self, text: str, source_lang: Optional[str], target_lang: str, context: Optional[str] = None) -> Dict[str, Any]:
        """Translate ``text`` into ``target_lang``.

        Args:
            text: Text to translate.
            source_lang: Source language; omitted from the prompt when falsy
                or equal to ``"auto"``.
            target_lang: Language named in the user prompt.
            context: Optional topic hint appended to the system prompt.

        Returns:
            Dict with ``translated_text``, ``provider`` and ``model`` (the
            model the request was actually sent to).
        """
        system = SYSTEM_PROMPTS["translate"]
        if context:
            system += f"\nContext: this is about {context}"
        if source_lang and source_lang != "auto":
            system += f"\nSource language: {source_lang}"

        used_model = self._cheap_model
        content = await self._call(system, f"Translate to {target_lang}:\n\n{text}", model=used_model)
        # Report the model that served the request, not the configured default.
        return {"translated_text": content, "provider": self.name, "model": used_model}

    async def reply(self, inquiry: str, context: Optional[Dict[str, Any]] = None, tone: str = "professional", preference_context: Optional[str] = None) -> Dict[str, Any]:
        """Draft a reply to a customer inquiry.

        Args:
            inquiry: The customer's message.
            context: Optional dict; the keys ``product``, ``price``,
                ``customer_history`` and ``conversation_history`` are added to
                the prompt when present and truthy.
            tone: Tone appended to the system prompt.
            preference_context: Optional user preference appended to the
                system prompt.

        Returns:
            Dict with ``reply``, ``provider`` and ``model``.
        """
        system = SYSTEM_PROMPTS["reply"] + f"\nTone: {tone}"
        if preference_context:
            system += f"\nUser preference: {preference_context}"

        # (context key, label shown to the model), in prompt order.
        labelled_fields = (
            ("product", "Product"),
            ("price", "Price"),
            ("customer_history", "Customer history"),
            ("conversation_history", "Previous messages"),
        )
        context_str = ""
        if context:
            context_str = "".join(
                f"{label}: {context[key]}\n"
                for key, label in labelled_fields
                if context.get(key)
            )

        prompt = f"{context_str}\nCustomer inquiry:\n{inquiry}\n\nWrite a reply:"
        content = await self._call(system, prompt)
        return {"reply": content, "provider": self.name, "model": self.model}

    async def generate_marketing(self, product_info: Dict[str, Any], target: str, style: str = "professional", language: str = "en", preference_context: Optional[str] = None) -> Dict[str, Any]:
        """Generate marketing copy for a product.

        Args:
            product_info: JSON-serializable product description, embedded in
                the prompt as pretty-printed JSON.
            target: Target audience named in the system prompt.
            style: Writing style named in the system prompt.
            language: Output language named in the system prompt.
            preference_context: Optional user preference appended to the
                system prompt.

        Returns:
            Dict with ``content``, ``provider`` and ``model``.
        """
        system = SYSTEM_PROMPTS["marketing"] + f"\nStyle: {style}\nTarget audience: {target}\nLanguage: {language}"
        if preference_context:
            system += f"\nUser preference: {preference_context}"

        product_str = json.dumps(product_info, ensure_ascii=False, indent=2)
        prompt = f"Product information:\n{product_str}\n\nGenerate marketing copy:"
        content = await self._call(system, prompt)
        return {"content": content, "provider": self.name, "model": self.model}

    async def extract_info(self, text: str, schema: Dict[str, Any]) -> Dict[str, Any]:
        """Extract structured data from ``text`` according to ``schema``.

        Args:
            text: Source text.
            schema: JSON-serializable schema embedded in the prompt.

        Returns:
            ``{"data": <parsed JSON>, "confidence": 0.9, "provider": ...}`` on
            success, or ``{"data": {}, "confidence": 0.0, "provider": ...,
            "error": "parse_failed"}`` when the model returned nothing or
            something that is not valid JSON.
        """
        system = SYSTEM_PROMPTS["extract"]
        # ensure_ascii=False keeps non-ASCII schema text readable in the
        # prompt, matching how generate_marketing serializes product info.
        schema_str = json.dumps(schema, ensure_ascii=False, indent=2)
        prompt = f"Schema:\n{schema_str}\n\nText:\n{text}\n\nExtracted JSON:"

        try:
            content = await self._call(system, prompt, response_format={"type": "json_object"})
        except Exception:
            # Deliberate best-effort: retry once without response_format when
            # the first request fails for any reason. A failure of the retry
            # propagates to the caller.
            content = await self._call(system, prompt)

        failure = {"data": {}, "confidence": 0.0, "provider": self.name, "error": "parse_failed"}
        if not content:
            # _call may return None; json.loads(None) would raise TypeError.
            return failure
        try:
            data = json.loads(self._strip_code_fence(content))
        except json.JSONDecodeError:
            return failure
        return {"data": data, "confidence": 0.9, "provider": self.name}

    @staticmethod
    def _strip_code_fence(content: str) -> str:
        """Return ``content`` without a surrounding Markdown code fence, if any."""
        stripped = content.strip()
        if not stripped.startswith("```"):
            return stripped
        # Drop the opening fence line (``` or ```json) and the closing fence.
        _, _, body = stripped.partition("\n")
        return body.rsplit("```", 1)[0].strip()

    async def _call(self, system: str, prompt: str, max_tokens: int = 3000, response_format: Optional[Dict] = None, model: Optional[str] = None) -> Optional[str]:
        """Send one system+user exchange and return the assistant text.

        Args:
            system: System message content.
            prompt: User message content.
            max_tokens: Completion token limit.
            response_format: Forwarded to the API only when truthy.
            model: Model override; defaults to ``self.model``.

        Returns:
            The first choice's message content. When that is ``None`` and the
            message carries a non-empty ``reasoning`` attribute, text
            recovered from the reasoning instead. ``None`` when neither
            yields anything.
        """
        request: Dict[str, Any] = {
            "model": model or self.model,
            "messages": [
                {"role": "system", "content": system},
                {"role": "user", "content": prompt},
            ],
            "max_tokens": max_tokens,
            "temperature": 0.7,
        }
        if response_format:
            request["response_format"] = response_format

        resp = await self.client.chat.completions.create(**request)
        message = resp.choices[0].message
        content = message.content

        if content is None:
            # NOTE(review): presumably some OpenAI-compatible backends put
            # their output in `reasoning` and leave `content` empty - confirm.
            reasoning = getattr(message, "reasoning", None)
            if reasoning:
                content = self._recover_from_reasoning(reasoning)
        return content

    @staticmethod
    def _recover_from_reasoning(reasoning: str) -> Optional[str]:
        """Pull a usable answer out of a reasoning trace.

        Tries, in order: a labelled "final output" section, the last
        substantial paragraph that is not a step header, and finally the
        whole trace with step-header lines removed. Returns ``None`` when
        nothing non-empty remains.
        """
        import re

        # ASCII colon or full-width colon (U+FF1A), which Chinese text uses.
        colon = r"[:\uFF1A]"
        # Capture lazily up to the first blank line or the end of the text.
        tail = r"\s*(.+?)(?:\n\n|$)"
        # Labels are tried in order; the Chinese ones mean "translation
        # result" and "final output".
        labels = ("Final Output Generation", "Final Output", "翻译结果", "最终输出")
        for label in labels:
            match = re.search(re.escape(label) + colon + tail, reasoning, re.DOTALL)
            if match:
                answer = match.group(1).strip()
                if answer:
                    return answer

        # Fall back to the last paragraph longer than 10 characters that does
        # not start with a step marker ("步骤" means "step").
        for paragraph in reversed(re.split(r"\n\n+", reasoning.strip())):
            paragraph = paragraph.strip()
            if len(paragraph) > 10 and not paragraph.startswith(("步骤", "Step")):
                return paragraph

        # Last resort: drop "步骤N:" / "Step N:" lines and collapse newlines.
        cleaned = re.sub(r"^(?:步骤|Step )\d+" + colon + r".*$", "", reasoning, flags=re.MULTILINE)
        cleaned = re.sub(r"\n+", "\n", cleaned).strip()
        return cleaned or None

    @property
    def name(self) -> str:
        """Provider identifier, ``"openai-<model>"`` as set in ``__init__``."""
        return self._name

    @property
    def cost_per_1k_tokens(self) -> float:
        """Mean of the input and output prices for the configured model.

        Models missing from the price table use the gpt-4o figures.
        """
        price = self._pricing.get(self.model, {"input": 0.01, "output": 0.03})
        return (price["input"] + price["output"]) / 2
|