from typing import Dict, Any, Optional, List from datetime import datetime from sqlalchemy.ext.asyncio import AsyncSession from sqlalchemy import select, func, and_, desc from app.models.customer import Message, Conversation from app.models.user import User from app.models.preference import PreferenceAnalysis import logging logger = logging.getLogger(__name__) class UserPreferenceService: def __init__(self, db: AsyncSession): self.db = db async def record_selection(self, user_id: str, message_id: str, selected_index: int) -> bool: result = await self.db.execute( select(Message).where(Message.id == message_id) ) msg = result.scalar_one_or_none() if not msg: return False msg.selected_suggestion = selected_index await self.db.flush() return True async def record_edit(self, user_id: str, message_id: str, edited_text: str) -> bool: result = await self.db.execute( select(Message).where(Message.id == message_id) ) msg = result.scalar_one_or_none() if not msg: return False msg.user_edited = edited_text await self.db.flush() return True async def analyze_preferences(self, user_id: str) -> Dict[str, Any]: user_conv_subq = select(Conversation.id).where( Conversation.user_id == user_id ).subquery() count_result = await self.db.execute( select(func.count(Message.id)).where( and_( Message.conversation_id.in_(select(user_conv_subq)), Message.selected_suggestion.isnot(None), ) ) ) total = count_result.scalar() or 0 if total < 3: return {"needs_more_data": True, "interaction_count": total} result = await self.db.execute( select(Message) .where( and_( Message.conversation_id.in_(select(user_conv_subq)), Message.selected_suggestion.isnot(None), ) ) .order_by(desc(Message.created_at)) .limit(100) ) messages = result.scalars().all() tone_counts = {} edit_count = 0 total_chars_saved = 0 greeting_patterns = [] signoff_patterns = [] for m in messages: suggestions = m.ai_suggestions or [] selected = m.selected_suggestion if suggestions and selected is not None and selected < len(suggestions): tone = suggestions[selected].get("tone", "unknown") tone_counts[tone] = tone_counts.get(tone, 0) + 1 if m.user_edited: edit_count += 1 if suggestions and selected is not None and selected < len(suggestions): original = suggestions[selected].get("reply", "") total_chars_saved += abs(len(original) - len(m.user_edited)) preferred_tone = max(tone_counts, key=tone_counts.get) if tone_counts else "professional" edit_ratio = edit_count / len(messages) if messages else 0 avg_edit_size = total_chars_saved / edit_count if edit_count > 0 else 0 greeting_style = self._extract_greeting_style(messages) sign_off_style = self._extract_sign_off_style(messages) preferences = { "preferred_tone": preferred_tone, "edit_ratio": edit_ratio, "avg_edit_size": avg_edit_size, "greeting_style": greeting_style, "sign_off_style": sign_off_style, "tone_distribution": tone_counts, "interaction_count": len(messages), "confidence": min(1.0, len(messages) / 20), } existing = await self.db.execute( select(PreferenceAnalysis).where(PreferenceAnalysis.user_id == user_id) ) analysis = existing.scalar_one_or_none() if analysis: analysis.preferred_tone = preferred_tone analysis.greeting_style = greeting_style analysis.sign_off_style = sign_off_style analysis.analysis_data = preferences analysis.interaction_count = len(messages) analysis.confidence = preferences["confidence"] analysis.last_analysis_at = datetime.utcnow() else: analysis = PreferenceAnalysis( user_id=user_id, task_type="reply", preferred_tone=preferred_tone, greeting_style=greeting_style, sign_off_style=sign_off_style, analysis_data=preferences, interaction_count=len(messages), confidence=preferences["confidence"], last_analysis_at=datetime.utcnow(), ) self.db.add(analysis) await self.db.flush() await self._update_user_settings(user_id, preferences) return preferences async def get_preference_context(self, user_id: str, task_type: str = "reply") -> Optional[str]: result = await self.db.execute( select(PreferenceAnalysis).where( and_( PreferenceAnalysis.user_id == user_id, PreferenceAnalysis.task_type == task_type, ) ) ) analysis = result.scalar_one_or_none() if not analysis or analysis.confidence < 0.3: return None parts = [] if analysis.preferred_tone: parts.append(f"user's preferred tone: {analysis.preferred_tone}") if analysis.greeting_style: parts.append(f"user's typical greeting: {analysis.greeting_style}") if analysis.sign_off_style: parts.append(f"user's typical sign-off: {analysis.sign_off_style}") if parts: return "This user prefers: " + "; ".join(parts) + "." return None async def get_analysis(self, user_id: str) -> Dict[str, Any]: result = await self.db.execute( select(PreferenceAnalysis).where(PreferenceAnalysis.user_id == user_id) ) analysis = result.scalar_one_or_none() if not analysis: return {"analyzed": False, "interaction_count": 0, "confidence": 0} return { "analyzed": True, "preferred_tone": analysis.preferred_tone, "greeting_style": analysis.greeting_style, "sign_off_style": analysis.sign_off_style, "interaction_count": analysis.interaction_count, "confidence": analysis.confidence, "last_analysis_at": analysis.last_analysis_at.isoformat() if analysis.last_analysis_at else None, } async def _update_user_settings(self, user_id: str, preferences: Dict[str, Any]): result = await self.db.execute(select(User).where(User.id == user_id)) user = result.scalar_one_or_none() if user: settings = dict(user.settings or {}) settings["preferred_tone"] = preferences.get("preferred_tone", settings.get("reply_tone", "professional")) settings["ai_learning"] = { "analyzed": True, "confidence": preferences.get("confidence", 0), "edit_ratio": preferences.get("edit_ratio", 0), "greeting_style": preferences.get("greeting_style", ""), "sign_off_style": preferences.get("sign_off_style", ""), } user.settings = settings await self.db.flush() def _extract_greeting_style(self, messages: List[Message]) -> str: for m in messages: text = m.user_edited or (m.ai_suggestions[m.selected_suggestion].get("reply", "") if m.selected_suggestion is not None and m.ai_suggestions and m.selected_suggestion < len(m.ai_suggestions) else "") if text: first_word = text.strip().split()[0] if text.strip() else "" if first_word in ["Dear", "Hi", "Hello", "Hey", "To"]: return first_word return "" def _extract_sign_off_style(self, messages: List[Message]) -> str: for m in messages: text = m.user_edited or (m.ai_suggestions[m.selected_suggestion].get("reply", "") if m.selected_suggestion is not None and m.ai_suggestions and m.selected_suggestion < len(m.ai_suggestions) else "") if text: words = text.strip().split() if len(words) >= 3: last_three = " ".join(words[-3:]) for signoff in ["Best regards", "Best wishes", "Sincerely", "Cheers", "Regards", "Yours"]: if signoff in last_three: return signoff return ""