from typing import Dict, Any, Optional, List from datetime import datetime, timedelta from sqlalchemy.ext.asyncio import AsyncSession from sqlalchemy import select, func, and_ from app.models.customer import Customer, Conversation, Message import logging logger = logging.getLogger(__name__) class SilentPatternService: def __init__(self, db: AsyncSession): self.db = db async def analyze_silent_risk(self, user_id: str) -> List[Dict[str, Any]]: cutoff_3d = datetime.utcnow() - timedelta(days=3) cutoff_7d = datetime.utcnow() - timedelta(days=7) result = await self.db.execute( select(Customer).where( and_( Customer.user_id == user_id, Customer.status.in_(["lead", "negotiating"]), ) ) ) customers = result.scalars().all() risk_scores = [] for c in customers: score, reasons = await self._calculate_risk_score(c, cutoff_3d, cutoff_7d) if score > 0: risk_scores.append({ "customer_id": str(c.id), "name": c.name, "company": c.company, "country": c.country, "status": c.status, "estimated_value": c.estimated_value, "last_contact_at": c.last_contact_at.isoformat() if c.last_contact_at else None, "silence_days": (datetime.utcnow() - c.last_contact_at).days if c.last_contact_at else 0, "risk_score": score, "risk_level": self._risk_level(score), "reasons": reasons, }) risk_scores.sort(key=lambda x: x["risk_score"], reverse=True) return risk_scores async def _calculate_risk_score( self, customer: Customer, cutoff_3d: datetime, cutoff_7d: datetime ) -> tuple: score = 0 reasons = [] if not customer.last_contact_at: return (0, []) silence_days = (datetime.utcnow() - customer.last_contact_at).days if silence_days >= 7: score += 40 reasons.append(f"沉默超过7天") elif silence_days >= 3: score += 20 reasons.append(f"沉默超过3天") conv_query = await self.db.execute( select(Conversation).where( and_( Conversation.customer_id == customer.id, Conversation.user_id == customer.user_id, ) ).order_by(Conversation.created_at.desc()).limit(1) ) conv = conv_query.scalar_one_or_none() if not conv: return (score, reasons) msg_count_query = await self.db.execute( select(func.count(Message.id)).where( and_( Message.conversation_id == conv.id, Message.direction == "inbound", ) ) ) inbound_count = msg_count_query.scalar() or 0 if inbound_count >= 5 and silence_days >= 3: score += 20 reasons.append(f"前期沟通频繁({inbound_count}条)后突然沉默") if customer.status == "lead" and silence_days >= 3: score += 15 reasons.append("潜在客户阶段需及时跟进") if customer.status == "negotiating" and silence_days >= 2: score += 25 reasons.append("谈判阶段客户需保持热度") recent_query = await self.db.execute( select(Message).where( and_( Message.conversation_id == conv.id, Message.created_at >= cutoff_7d, ) ).order_by(Message.created_at.desc()).limit(3) ) recent_msgs = recent_query.scalars().all() if recent_msgs: last_inbound = None for m in recent_msgs: if m.direction == "inbound": last_inbound = m break if last_inbound and silence_days >= 1: content_lower = last_inbound.content.lower() closing_signals = ["i'll think", "let me check", "too expensive", "high price", "not now", "maybe later", "considering"] for signal in closing_signals: if signal in content_lower: score += 15 reasons.append(f"客户回复含消极信号: \"{signal}\"") break return (min(score, 100), reasons) def _risk_level(self, score: int) -> str: if score >= 70: return "high" elif score >= 40: return "medium" elif score >= 20: return "low" return "minimal" async def get_suggestions(self, user_id: str, customer_id: str) -> List[str]: score_result = await self.analyze_silent_risk(user_id) customer_scores = [s for s in score_result if s["customer_id"] == customer_id] if not customer_scores: return [] score = customer_scores[0] suggestions = [] silence_days = score["silence_days"] if silence_days >= 7: suggestions.extend([ f"客户{score['name']}已沉默{silence_days}天,建议发送产品更新或行业资讯重新激活", "考虑提供限时优惠或样品折扣打动客户", ]) elif silence_days >= 3: suggestions.extend([ f"客户{score['name']}沉默{silence_days}天,建议发送跟进消息询问是否有进一步需求", "可分享相关案例或成功故事保持客户兴趣", ]) if "negotiating" in score.get("status", ""): suggestions.append("谈判阶段客户,建议主动提供更多产品细节或定制方案") if "消极信号" in str(score.get("reasons", [])): suggestions.append("客户曾表达价格顾虑,建议重新审视报价或提供增值服务") if not suggestions: suggestions.append("客户状态良好,建议保持定期跟进节奏") return suggestions