"""Push-notification service: WeChat subscribe messages and device registry."""

import asyncio
import logging
from typing import Any, Dict, List, Optional

import httpx
from sqlalchemy import and_, select
from sqlalchemy.ext.asyncio import AsyncSession

from app.config import settings
from app.models.device import Device

logger = logging.getLogger(__name__)

# Endpoint that `_send_via_wechat` posts subscribe messages to.
_WECHAT_SUBSCRIBE_SEND_URL = "https://api.weixin.qq.com/cgi-bin/message/subscribe/send"
# Title and content are truncated to this many characters before sending.
# NOTE(review): presumably WeChat's length limit for `thing` fields — confirm.
_WECHAT_THING_MAX_LEN = 20
# Timeout, in seconds, for the HTTP call to WeChat.
_WECHAT_REQUEST_TIMEOUT = 10


class PushService:
    """Send push notifications via WeChat and manage registered devices.

    The static ``send_notification`` / ``send_bulk`` helpers need no database
    session. The device-registry methods require one; ``send_async`` uses the
    session, when present, to also store an in-app notification.
    """

    def __init__(self, db: Optional[AsyncSession] = None):
        """Create the service.

        Args:
            db: Async SQLAlchemy session. Optional; only the device-registry
                methods fail without it.
        """
        self.db = db

    def _require_db(self) -> AsyncSession:
        """Return the session, raising a clear error when none was provided."""
        if self.db is None:
            raise RuntimeError(
                "PushService was created without a database session; "
                "this operation requires one"
            )
        return self.db

    @staticmethod
    def send_notification(
        user_id: str,
        title: str,
        content: str,
        payload: Optional[Dict[str, Any]] = None,
    ) -> bool:
        """Synchronously send one push notification (best effort).

        Runs ``_send_via_wechat`` on a fresh event loop. It therefore cannot
        do the send when the calling thread already has a running loop; use
        ``send_async`` from async code.

        Args:
            user_id: Recipient; passed to WeChat as ``touser``.
            title: Notification title.
            content: Notification body.
            payload: Passed through to ``_send_via_wechat``.

        Returns:
            The result of ``_send_via_wechat``. If running it raises, the
            error is logged and ``True`` is returned (failures here are
            deliberately non-fatal).
        """
        logger.info(
            "[PUSH] user=%s title=%s content_len=%d", user_id, title, len(content)
        )
        coro = PushService._send_via_wechat(user_id, title, content, payload)
        try:
            # asyncio.run creates the loop and always closes it, including on
            # error, and does not leave a closed loop installed as current.
            return asyncio.run(coro)
        except Exception as exc:
            # asyncio.run refuses to start inside a running loop; close the
            # never-started coroutine so it does not warn "never awaited".
            coro.close()
            logger.warning("Push send failed (logged only): %s", exc)
            return True

    @staticmethod
    def send_bulk(
        user_ids: List[str],
        title: str,
        content: str,
        payload: Optional[Dict[str, Any]] = None,
    ) -> int:
        """Send the same notification to several users, one after another.

        Returns:
            How many ``send_notification`` calls returned a truthy result.
        """
        return sum(
            1
            for uid in user_ids
            if PushService.send_notification(uid, title, content, payload)
        )

    async def send_async(
        self,
        user_id: str,
        title: str,
        content: str,
        payload: Optional[Dict[str, Any]] = None,
    ) -> bool:
        """Send a notification from async code and record it in-app.

        The in-app record is attempted regardless of the WeChat result.

        Returns:
            The result of the WeChat send.
        """
        logger.info("[PUSH_ASYNC] user=%s title=%s", user_id, title)
        result = await self._send_via_wechat(user_id, title, content, payload)
        await self._save_in_app_notification(user_id, title, content, payload)
        return result

    async def register_device(
        self,
        user_id: str,
        client_id: str,
        platform: str = "weapp",
        push_token: Optional[str] = None,
        device_info: Optional[Dict] = None,
    ) -> Device:
        """Create or refresh the device row for ``(user_id, client_id)``.

        An existing row has its platform, push token and device info
        overwritten and is re-activated; otherwise a new row is added.
        The session is flushed but not committed.

        Raises:
            RuntimeError: If the service has no database session.
        """
        db = self._require_db()
        result = await db.execute(
            select(Device).where(
                and_(Device.user_id == user_id, Device.client_id == client_id)
            )
        )
        existing = result.scalar_one_or_none()
        if existing:
            existing.platform = platform
            existing.push_token = push_token
            existing.device_info = device_info or {}
            existing.is_active = True
            await db.flush()
            return existing

        device = Device(
            user_id=user_id,
            platform=platform,
            push_token=push_token,
            client_id=client_id,
            device_info=device_info or {},
        )
        db.add(device)
        await db.flush()
        return device

    async def get_user_devices(self, user_id: str) -> List[Dict]:
        """Return the user's active devices as plain dictionaries.

        Each dict has the keys ``id``, ``platform``, ``client_id``,
        ``device_info`` and ``created_at`` (ISO string, or ``None``).

        Raises:
            RuntimeError: If the service has no database session.
        """
        db = self._require_db()
        result = await db.execute(
            select(Device).where(
                and_(
                    Device.user_id == user_id,
                    Device.is_active == True,  # noqa: E712 - SQL expression
                )
            )
        )
        return [
            {
                "id": str(d.id),
                "platform": d.platform,
                "client_id": d.client_id,
                "device_info": d.device_info,
                "created_at": d.created_at.isoformat() if d.created_at else None,
            }
            for d in result.scalars().all()
        ]

    async def unregister_device(self, user_id: str, client_id: str) -> bool:
        """Mark the device for ``(user_id, client_id)`` inactive.

        The row is kept, not deleted. The session is flushed but not
        committed.

        Returns:
            ``False`` if no matching device exists, ``True`` otherwise.

        Raises:
            RuntimeError: If the service has no database session.
        """
        db = self._require_db()
        result = await db.execute(
            select(Device).where(
                and_(Device.user_id == user_id, Device.client_id == client_id)
            )
        )
        device = result.scalar_one_or_none()
        if not device:
            return False
        device.is_active = False
        await db.flush()
        return True

    @staticmethod
    async def _send_via_wechat(
        user_id: str,
        title: str,
        content: str,
        payload: Optional[Dict] = None,
    ) -> bool:
        """Post a subscribe message to WeChat for ``user_id``.

        ``payload`` is accepted for signature parity but is not sent.

        Returns:
            ``True`` when WeChat is not configured (nothing is sent) or when
            the response's ``errcode`` is 0 or absent. ``False`` when no
            access token is available, WeChat reports an error, or any
            exception occurs. This method never raises ``Exception``.
        """
        if not settings.WECHAT_APP_ID or not settings.WECHAT_APP_SECRET:
            logger.debug("WeChat push not configured, falling back to log")
            return True

        try:
            # Imported lazily, as in the original.
            # NOTE(review): presumably to avoid a circular import — confirm.
            from app.services.wechat import wechat_service

            access_token = await wechat_service._get_access_token()
            if not access_token:
                logger.warning("Cannot get WeChat access token for push")
                return False

            async with httpx.AsyncClient() as client:
                resp = await client.post(
                    _WECHAT_SUBSCRIBE_SEND_URL,
                    params={"access_token": access_token},
                    json={
                        "touser": user_id,
                        "template_id": settings.WECHAT_PUSH_TEMPLATE_ID or "",
                        "data": {
                            "thing1": {"value": title[:_WECHAT_THING_MAX_LEN]},
                            "thing2": {"value": content[:_WECHAT_THING_MAX_LEN]},
                        },
                        "miniprogram_state": "formal",
                    },
                    timeout=_WECHAT_REQUEST_TIMEOUT,
                )
                data = resp.json()
                if data.get("errcode", 0) != 0:
                    logger.warning("WeChat push failed: %s", data)
                    return False
                logger.info("WeChat push sent to user %s", user_id)
                return True
        except Exception as exc:
            # Best effort: any failure is reported as False, not raised.
            logger.warning("WeChat push error: %s", exc)
            return False

    async def _save_in_app_notification(
        self,
        user_id: str,
        title: str,
        content: str,
        payload: Optional[Dict] = None,
    ) -> None:
        """Store the notification through ``NotificationService`` (best effort).

        Does nothing when the service has no database session. Reads
        ``reference_type`` and ``reference_id`` from ``payload`` when present.
        Failures are logged and swallowed.
        """
        if self.db is None:
            return

        refs = payload or {}
        try:
            # Imported lazily, as in the original.
            from app.services.notification import NotificationService

            await NotificationService.create_notification(
                self.db,
                user_id,
                title,
                content,
                notification_type="push",
                reference_type=refs.get("reference_type"),
                reference_id=refs.get("reference_id"),
            )
        except Exception as exc:
            logger.warning("Failed to save in-app notification: %s", exc)