from typing import Dict, Any, Optional, List from datetime import datetime, timedelta from sqlalchemy.ext.asyncio import AsyncSession from sqlalchemy import select, and_, or_, desc from app.models.followup import FollowupStrategy, FollowupLog from app.models.customer import Customer from app.models.notification import Notification from app.ai.router import get_ai_router from app.services.customer_health import CustomerHealthService import logging logger = logging.getLogger(__name__) DEFAULT_STRATEGIES = [ { "name": "温和提醒", "description": "沉默3-5天,健康分50-79 — 温和提醒", "trigger_condition": { "min_silence_days": 3, "max_silence_days": 5, "min_health_score": 50, "max_health_score": 79, "status_filter": ["lead", "negotiating"], }, "channel": "whatsapp", "ai_prompt_template": "You are a professional export sales assistant. Write a gentle follow-up message to a customer who hasn't responded in {silence_days} days. Customer name: {customer_name}. Tone: warm but professional. Keep under 100 words. Suggest checking if they need any further information about the product.", "priority": 1, }, { "name": "价值提供", "description": "沉默6-10天,健康分30-49 — 推送价值信息", "trigger_condition": { "min_silence_days": 6, "max_silence_days": 10, "min_health_score": 30, "max_health_score": 49, "status_filter": ["lead", "negotiating"], }, "channel": "email", "ai_prompt_template": "You are a professional export sales assistant. Write a follow-up email to a customer who hasn't responded in {silence_days} days. Customer: {customer_name}. Share some valuable industry news, new product catalog highlights, or certification updates to rekindle interest. Keep under 150 words.", "priority": 2, }, { "name": "重新激活", "description": "沉默11+天,健康分<30 — 紧急重新激活", "trigger_condition": { "min_silence_days": 11, "max_silence_days": 999, "min_health_score": 0, "max_health_score": 29, "status_filter": ["lead", "negotiating"], }, "channel": "email", "ai_prompt_template": "You are a professional export sales assistant. Write a re-engagement email to a customer who has been silent for {silence_days} days. Customer: {customer_name}. Offer a limited-time discount, new product launch info, or a holiday greeting. Create a sense of urgency without being pushy. Keep under 150 words.", "priority": 3, }, { "name": "促进决策", "description": "客户有回复但未成交,健康分60+ — 促进成交", "trigger_condition": { "min_silence_days": 2, "max_silence_days": 7, "min_health_score": 60, "max_health_score": 100, "status_filter": ["negotiating"], }, "channel": "whatsapp", "ai_prompt_template": "You are a professional export sales assistant. The customer {customer_name} has shown interest but hasn't placed an order yet. Write a message sharing a success story, a limited-time offer, or highlighting what makes your product different from competitors. Keep under 120 words. Tone: confident and helpful.", "priority": 0, }, ] class FollowupEngine: def __init__(self, db: AsyncSession): self.db = db self.ai = get_ai_router() self.health_service = CustomerHealthService(db) async def ensure_default_strategies(self): result = await self.db.execute( select(FollowupStrategy).limit(1) ) if result.scalar_one_or_none(): return for s in DEFAULT_STRATEGIES: strategy = FollowupStrategy( name=s["name"], description=s["description"], trigger_condition=s["trigger_condition"], channel=s["channel"], ai_prompt_template=s["ai_prompt_template"], priority=s["priority"], ) self.db.add(strategy) await self.db.flush() logger.info(f"Created {len(DEFAULT_STRATEGIES)} default followup strategies") async def get_strategies(self) -> List[Dict[str, Any]]: result = await self.db.execute( select(FollowupStrategy).order_by(FollowupStrategy.priority) ) strategies = result.scalars().all() return [ { "id": str(s.id), "name": s.name, "description": s.description, "trigger_condition": s.trigger_condition, "channel": s.channel, "priority": s.priority, "is_active": s.is_active, } for s in strategies ] async def evaluate_customer(self, user_id: str, customer: Customer) -> Optional[Dict[str, Any]]: health = await self.health_service.get_customer_health(user_id, str(customer.id)) if not health: return None silence_days = health["dimensions"]["silence"]["days"] health_score = health["total_score"] strategies_result = await self.db.execute( select(FollowupStrategy).where( and_( FollowupStrategy.is_active == True, ) ).order_by(FollowupStrategy.priority) ) strategies = strategies_result.scalars().all() for strategy in strategies: cond = strategy.trigger_condition if not cond: continue if silence_days < cond.get("min_silence_days", 0): continue if silence_days > cond.get("max_silence_days", 999): continue if health_score < cond.get("min_health_score", 0): continue if health_score > cond.get("max_health_score", 100): continue if cond.get("status_filter") and customer.status not in cond["status_filter"]: continue existing = await self.db.execute( select(FollowupLog).where( and_( FollowupLog.customer_id == customer.id, FollowupLog.strategy_id == strategy.id, FollowupLog.status.in_(["pending", "sent"]), FollowupLog.created_at > datetime.utcnow() - timedelta(days=7), ) ) ) if existing.scalar_one_or_none(): continue return { "strategy": strategy, "silence_days": silence_days, "health_score": health_score, } return None async def generate_followup_content(self, strategy: FollowupStrategy, customer: Customer, silence_days: int) -> str: try: prompt = strategy.ai_prompt_template.format( customer_name=customer.name, silence_days=silence_days, company=customer.company or "", ) result = await self.ai.execute("marketing", "generate_marketing", {"name": customer.name, "description": prompt}, customer.country or "US", "professional", "en" ) return result.get("content", "") except Exception as e: logger.warning(f"AI content generation failed: {e}") return f"Hi {customer.name}, just checking in to see if you need any further information about our products. Looking forward to hearing from you!" async def create_followup_log(self, user_id: str, customer: Customer, strategy: FollowupStrategy, silence_days: int, health_score: int, content: str) -> FollowupLog: log = FollowupLog( user_id=user_id, customer_id=customer.id, strategy_id=strategy.id, status="pending", channel=strategy.channel, ai_generated_content=content, content=content, health_score_at_time=health_score, silence_days_at_time=silence_days, ) self.db.add(log) await self.db.flush() return log async def scan_and_followup(self) -> Dict[str, Any]: await self.ensure_default_strategies() customers_result = await self.db.execute( select(Customer).where( Customer.status.in_(["lead", "negotiating"]) ) ) customers = customers_result.scalars().all() processed = 0 notifications_sent = 0 logs_created = 0 for customer in customers: try: result = await self.evaluate_customer(str(customer.user_id), customer) if not result: continue content = await self.generate_followup_content( result["strategy"], customer, result["silence_days"] ) log = await self.create_followup_log( str(customer.user_id), customer, result["strategy"], result["silence_days"], result["health_score"], content, ) title = f"跟进提醒: {customer.name}" notify_content = f"{result['strategy'].name} — {content[:80]}..." n = Notification( user_id=customer.user_id, title=title, content=notify_content, notification_type="followup", reference_type="customer", reference_id=str(customer.id), ) self.db.add(n) processed += 1 logs_created += 1 notifications_sent += 1 except Exception as e: logger.error(f"Followup scan failed for customer {customer.id}: {e}") continue if processed > 0: await self.db.flush() logger.info(f"Followup scan: {processed} customers matched, {logs_created} logs, {notifications_sent} notifications") return { "customers_scanned": len(customers), "followups_created": logs_created, "notifications_sent": notifications_sent, } async def get_pending_followups(self, user_id: str, page: int = 1, size: int = 20) -> Dict[str, Any]: query = select(FollowupLog).where( and_( FollowupLog.user_id == user_id, FollowupLog.status == "pending", ) ).order_by(FollowupLog.created_at.desc()).offset( (page - 1) * size ).limit(size) count_q = select(FollowupLog).where( and_( FollowupLog.user_id == user_id, FollowupLog.status == "pending", ) ) result = await self.db.execute(query) logs = result.scalars().all() count_result = await self.db.execute(count_q) total = len(count_result.scalars().all()) items = [] # Use join to avoid N+1 query problem query = select(FollowupLog, Customer).join( Customer, FollowupLog.customer_id == Customer.id, isouter=True ).where( FollowupLog.user_id == user_id ).order_by( FollowupLog.created_at.desc() ).offset((page - 1) * size).limit(size) result = await self.db.execute(query) rows = result.all() items = [] for log, customer in rows: items.append({ "id": str(log.id), "customer_id": str(log.customer_id), "customer_name": customer.name if customer else "Unknown", "strategy": "跟进", "channel": log.channel, "content": log.content, "ai_generated_content": log.ai_generated_content, "health_score": log.health_score_at_time, "silence_days": log.silence_days_at_time, "status": log.status, "created_at": log.created_at.isoformat() if log.created_at else None, }) return {"items": items, "total": total, "page": page, "size": size} async def get_followup_logs(self, user_id: str, page: int = 1, size: int = 20) -> Dict[str, Any]: query = select(FollowupLog).where( FollowupLog.user_id == user_id ).order_by(FollowupLog.created_at.desc()).offset( (page - 1) * size ).limit(size) count_q = select(FollowupLog).where(FollowupLog.user_id == user_id) result = await self.db.execute(query) logs = result.scalars().all() count_result = await self.db.execute(count_q) total = len(count_result.scalars().all()) items = [] for log in logs: customer_result = await self.db.execute( select(Customer).where(Customer.id == log.customer_id) ) customer = customer_result.scalar_one_or_none() items.append({ "id": str(log.id), "customer_id": str(log.customer_id), "customer_name": customer.name if customer else "Unknown", "channel": log.channel, "content": log.content, "ai_generated_content": log.ai_generated_content, "user_edited_content": log.user_edited_content, "status": log.status, "health_score": log.health_score_at_time, "silence_days": log.silence_days_at_time, "sent_at": log.sent_at.isoformat() if log.sent_at else None, "replied_at": log.replied_at.isoformat() if log.replied_at else None, "created_at": log.created_at.isoformat() if log.created_at else None, }) return {"items": items, "total": total, "page": page, "size": size} async def mark_sent(self, user_id: str, log_id: str) -> bool: result = await self.db.execute( select(FollowupLog).where( and_(FollowupLog.id == log_id, FollowupLog.user_id == user_id) ) ) log = result.scalar_one_or_none() if not log: return False log.status = "sent" log.sent_at = datetime.utcnow() await self.db.flush() return True async def mark_edited(self, user_id: str, log_id: str, edited_text: str) -> bool: result = await self.db.execute( select(FollowupLog).where( and_(FollowupLog.id == log_id, FollowupLog.user_id == user_id) ) ) log = result.scalar_one_or_none() if not log: return False log.user_edited_content = edited_text log.content = edited_text log.status = "sent" log.sent_at = datetime.utcnow() await self.db.flush() return True async def get_stats(self, user_id: str) -> Dict[str, Any]: logs_result = await self.db.execute( select(FollowupLog).where(FollowupLog.user_id == user_id) ) all_logs = logs_result.scalars().all() total = len(all_logs) pending = sum(1 for l in all_logs if l.status == "pending") sent = sum(1 for l in all_logs if l.status == "sent") replied = sum(1 for l in all_logs if l.status == "replied") return { "total_followups": total, "pending": pending, "sent": sent, "replied": replied, "completion_rate": round(sent / total * 100, 1) if total > 0 else 0, }