"""Push-notification endpoints: device registration and simulated sending."""

from typing import Any, Dict, List, Optional

from fastapi import APIRouter, Depends, Header, HTTPException
from pydantic import BaseModel
from sqlalchemy import select
from sqlalchemy.ext.asyncio import AsyncSession

from app.core.security import decode_token
from app.database import get_db
from app.models.user import User

router = APIRouter()

# Scheme prefix expected in front of the token in the credential string.
_BEARER_PREFIX = "Bearer "


class DeviceRegister(BaseModel):
    """Request body for registering a client device."""

    client_id: str
    platform: Optional[str] = None
    device_info: Optional[dict] = None


class PushMessage(BaseModel):
    """Request body describing a push message to send."""

    title: str
    content: str
    payload: Optional[dict] = None
    # NOTE(review): target_type / target_value are accepted but not yet read
    # by any endpoint in this module.
    target_type: str = "all"
    target_value: Optional[str] = None


class PushResponse(BaseModel):
    """Result of a push attempt: success flag plus a message id or an error."""

    success: bool
    message_id: Optional[str] = None
    error: Optional[str] = None


# Simulated in-memory device store, keyed by the token's ``sub`` claim.
# A real deployment should persist this in the database instead.
devices_db: Dict[Any, List[dict]] = {}


def _authenticate(*candidates: Any) -> dict:
    """Return the decoded token payload for the first usable bearer credential.

    Args:
        *candidates: Credential strings in priority order. Anything that is
            not a ``str`` starting with ``"Bearer "`` is skipped; this also
            covers the un-resolved ``Header(...)`` default seen when an
            endpoint function is called directly rather than through FastAPI.

    Returns:
        The payload returned by ``decode_token``; it is guaranteed to carry a
        non-``None`` ``sub`` entry.

    Raises:
        HTTPException: 401 "Unauthorized" when no candidate is a bearer
            credential; 401 "Invalid token" when ``decode_token`` returns a
            falsy value or the payload has no ``sub``.
    """
    credential = next(
        (
            value
            for value in candidates
            if isinstance(value, str) and value.startswith(_BEARER_PREFIX)
        ),
        None,
    )
    if credential is None:
        # Raising (rather than returning ``body, 401``) is what makes FastAPI
        # actually answer with a 401 status code.
        raise HTTPException(
            status_code=401,
            detail="Unauthorized",
            headers={"WWW-Authenticate": "Bearer"},
        )

    payload = decode_token(credential[len(_BEARER_PREFIX):])
    # A payload without ``sub`` cannot be tied to a user; accepting it would
    # file every such caller's devices under the same ``None`` key.
    if not payload or payload.get("sub") is None:
        raise HTTPException(
            status_code=401,
            detail="Invalid token",
            headers={"WWW-Authenticate": "Bearer"},
        )
    return payload


@router.post("/register")
async def register_device(
    data: DeviceRegister,
    authorization: Optional[str] = None,
    db: AsyncSession = Depends(get_db),
    authorization_header: Optional[str] = Header(None, alias="Authorization"),
):
    """Register a device for the authenticated user.

    The credential is read from the ``Authorization`` header; the legacy
    ``authorization`` query parameter is still honoured as a fallback.
    Registering a ``client_id`` that is already stored for the user is a
    no-op, and the response is the same success body in both cases.

    Raises:
        HTTPException: 401 when the credential is missing or invalid.
    """
    payload = _authenticate(authorization_header, authorization)
    user_devices = devices_db.setdefault(payload["sub"], [])

    already_registered = any(
        device.get("client_id") == data.client_id for device in user_devices
    )
    if not already_registered:
        user_devices.append(
            {
                "client_id": data.client_id,
                "platform": data.platform,
                "device_info": data.device_info,
            }
        )

    return {"success": True, "message": "Device registered"}


@router.post("/send")
async def send_push(
    message: PushMessage,
    authorization: Optional[str] = None,
    db: AsyncSession = Depends(get_db),
    authorization_header: Optional[str] = Header(None, alias="Authorization"),
):
    """Send (simulated) a push message to the authenticated user's devices.

    Returns a ``PushResponse`` with ``success=False`` when the user has no
    registered devices, otherwise ``success=True`` with a generated
    ``message_id``.

    Raises:
        HTTPException: 401 when the credential is missing or invalid.
    """
    payload = _authenticate(authorization_header, authorization)
    user_id = payload["sub"]

    if not devices_db.get(user_id):
        return PushResponse(success=False, error="No devices registered")

    # A real deployment would call a push provider API here (uni-push,
    # JPush, ...); for now a successful send is simulated.
    # ``or 0`` also covers a token whose ``iat`` is present but None.
    issued_at = int(payload.get("iat") or 0)
    message_id = f"msg_{user_id}_{issued_at}"
    print(f"Push message to user {user_id}: {message.title} - {message.content}")

    return PushResponse(success=True, message_id=message_id)


@router.post("/send-to-customer")
async def send_to_customer(
    customer_id: str,
    title: str,
    content: str,
    payload: Optional[dict] = None,
    authorization: Optional[str] = None,
    authorization_header: Optional[str] = Header(None, alias="Authorization"),
):
    """Send (simulated) a push notification about a specific customer.

    Intended for alerts such as customer-inactivity or quote reminders.
    ``payload`` is accepted but not forwarded anywhere yet.

    Raises:
        HTTPException: 401 when the credential is missing or invalid.
    """
    token_payload = _authenticate(authorization_header, authorization)
    user_id = token_payload["sub"]

    # Customer-specific logic can be added here.
    print(f"Customer notification for user {user_id}, customer {customer_id}: {title}")

    return PushResponse(success=True, message_id=f"alert_{customer_id}")


@router.get("/devices")
async def list_devices(
    authorization: Optional[str] = None,
    authorization_header: Optional[str] = Header(None, alias="Authorization"),
):
    """List the devices registered by the authenticated user.

    Returns a dict with the stored device entries under ``devices`` and their
    number under ``count``.

    Raises:
        HTTPException: 401 when the credential is missing or invalid.
    """
    token_payload = _authenticate(authorization_header, authorization)
    user_devices = devices_db.get(token_payload["sub"], [])

    return {
        "devices": user_devices,
        "count": len(user_devices),
    }