Files
trade-assistant/backend/app/services/push.py
T
TradeMate Dev 7b62c2f8b4 feat: 修复 H5 底部导航覆盖 + 更新项目进度文档
## H5 底部导航修复 (Bug #10)
- 精简 App.vue,移除重复 tabbar,仅保留全局样式
- uni-page 设置 height: calc(100% - 50px) + overflow-y: auto
- 内容区域精确停在底部导航上方,独立滚动不再叠加
- 恢复 custom-tab-bar 组件

## 项目进度文档
- PROGRESS.md 更新至 10 个 Bug 修复
- 新增 H5 底部导航修复记录
- 新增历史变更条目
2026-05-12 20:24:42 +08:00

157 lines
5.8 KiB
Python

from typing import Optional, Dict, Any, List
from sqlalchemy.ext.asyncio import AsyncSession
from sqlalchemy import select, and_
from app.config import settings
from app.models.device import Device
import httpx
import logging
logger = logging.getLogger(__name__)


class PushService:
    """Send push notifications and manage a user's registered devices.

    The static ``send_notification`` / ``send_bulk`` helpers are synchronous
    and need no database session. The instance methods are coroutines; the
    device-management ones use ``self.db`` unconditionally, so the service
    must have been constructed with a session before they are called.
    """

    def __init__(self, db: Optional["AsyncSession"] = None):
        """Store the optional database session used by the async methods."""
        self.db = db

    @staticmethod
    def send_notification(user_id: str, title: str, content: str, payload: Optional[Dict[str, Any]] = None) -> bool:
        """Synchronously send one push notification.

        The notification is always written to the log first. The actual send
        is delegated to ``_send_via_wechat`` and driven to completion on a
        fresh event loop.

        Returns:
            The result of ``_send_via_wechat``. If running the coroutine
            raises (for example because this function was called from inside
            an already-running event loop), the error is logged and ``True``
            is returned, preserving the existing "logged only" contract.
        """
        logger.info(f"[PUSH] user={user_id} title={title} content={content}")
        import asyncio

        # Calling the async function only creates the coroutine object; it
        # does not start running until asyncio.run() drives it.
        coro = PushService._send_via_wechat(user_id, title, content, payload)
        try:
            # asyncio.run() creates the loop, always closes it (even on
            # error) and never leaves a closed loop installed as the
            # thread's current loop.
            return asyncio.run(coro)
        except Exception as e:
            logger.warning(f"Push send failed (logged only): {e}")
            return True
        finally:
            # If asyncio.run() refused to start, the coroutine was never
            # awaited; closing it avoids a "never awaited" RuntimeWarning.
            # Closing an already-finished coroutine is a no-op.
            coro.close()

    @staticmethod
    def send_bulk(user_ids: List[str], title: str, content: str, payload: Optional[Dict[str, Any]] = None) -> int:
        """Send the same notification to every user in ``user_ids``.

        Returns:
            The number of users for which ``send_notification`` returned a
            truthy value.
        """
        sent = 0
        for uid in user_ids:
            if PushService.send_notification(uid, title, content, payload):
                sent += 1
        return sent

    async def send_async(self, user_id: str, title: str, content: str, payload: Optional[Dict[str, Any]] = None) -> bool:
        """Send a push notification, then record it as an in-app notification.

        Returns:
            The result of the WeChat send. The in-app record is attempted
            regardless of that result.
        """
        logger.info(f"[PUSH_ASYNC] user={user_id} title={title}")
        result = await self._send_via_wechat(user_id, title, content, payload)
        await self._save_in_app_notification(user_id, title, content, payload)
        return result

    async def register_device(self, user_id: str, client_id: str, platform: str = "weapp", push_token: Optional[str] = None, device_info: Optional[Dict] = None) -> "Device":
        """Create or refresh the device row for ``(user_id, client_id)``.

        An existing row has its platform, push token and device info
        overwritten with the supplied values and is re-activated; otherwise
        a new row is added. Changes are flushed but not committed here.

        Returns:
            The updated or newly created ``Device``.
        """
        result = await self.db.execute(
            select(Device).where(
                and_(Device.user_id == user_id, Device.client_id == client_id)
            )
        )
        existing = result.scalar_one_or_none()
        if existing:
            existing.platform = platform
            existing.push_token = push_token
            existing.device_info = device_info or {}
            existing.is_active = True
            await self.db.flush()
            return existing

        device = Device(
            user_id=user_id,
            platform=platform,
            push_token=push_token,
            client_id=client_id,
            device_info=device_info or {},
        )
        self.db.add(device)
        await self.db.flush()
        return device

    async def get_user_devices(self, user_id: str) -> List[Dict]:
        """Return the user's active devices as plain dictionaries.

        Each dictionary has the keys ``id``, ``platform``, ``client_id``,
        ``device_info`` and ``created_at`` (ISO-8601 string or ``None``).
        """
        result = await self.db.execute(
            select(Device).where(
                # `== True` is intentional: it builds a SQL expression,
                # whereas `is True` would be evaluated in Python.
                and_(Device.user_id == user_id, Device.is_active == True)  # noqa: E712
            )
        )
        devices = result.scalars().all()
        return [
            {
                "id": str(d.id),
                "platform": d.platform,
                "client_id": d.client_id,
                "device_info": d.device_info,
                "created_at": d.created_at.isoformat() if d.created_at else None,
            }
            for d in devices
        ]

    async def unregister_device(self, user_id: str, client_id: str) -> bool:
        """Deactivate the device row for ``(user_id, client_id)``.

        The row is kept and only flagged inactive; the change is flushed but
        not committed here.

        Returns:
            ``False`` when no matching device exists, ``True`` otherwise.
        """
        result = await self.db.execute(
            select(Device).where(
                and_(Device.user_id == user_id, Device.client_id == client_id)
            )
        )
        device = result.scalar_one_or_none()
        if not device:
            return False
        device.is_active = False
        await self.db.flush()
        return True

    @staticmethod
    async def _send_via_wechat(user_id: str, title: str, content: str, payload: Optional[Dict] = None) -> bool:
        """Send a WeChat subscribe message to ``user_id``.

        ``payload`` is accepted for signature parity with the public methods
        but is not used when building the request.

        Returns:
            ``True`` when WeChat is not configured (the caller's log line is
            the fallback) or when the API reports ``errcode`` 0; ``False``
            when no access token is available, the API reports an error, or
            any exception occurs during the attempt.
        """
        if not settings.WECHAT_APP_ID or not settings.WECHAT_APP_SECRET:
            logger.debug("WeChat push not configured, falling back to log")
            return True
        try:
            # Imported lazily inside the function rather than at module top.
            from app.services.wechat import wechat_service

            access_token = await wechat_service._get_access_token()
            if not access_token:
                logger.warning("Cannot get WeChat access token for push")
                return False
            async with httpx.AsyncClient() as client:
                resp = await client.post(
                    "https://api.weixin.qq.com/cgi-bin/message/subscribe/send",
                    params={"access_token": access_token},
                    json={
                        # NOTE(review): the subscribe-message API addresses
                        # recipients by openid — confirm callers pass one.
                        "touser": user_id,
                        "template_id": settings.WECHAT_PUSH_TEMPLATE_ID or "",
                        "data": {
                            # Truncated to 20 characters; presumably the
                            # template field length limit — TODO confirm.
                            "thing1": {"value": title[:20]},
                            "thing2": {"value": content[:20]},
                        },
                        "miniprogram_state": "formal",
                    },
                    timeout=10,
                )
                data = resp.json()
                # A missing errcode is treated the same as errcode 0.
                if data.get("errcode", 0) != 0:
                    logger.warning(f"WeChat push failed: {data}")
                    return False
                logger.info(f"WeChat push sent to user {user_id}")
                return True
        except Exception as e:
            logger.warning(f"WeChat push error: {e}")
            return False

    async def _save_in_app_notification(self, user_id: str, title: str, content: str, payload: Optional[Dict] = None):
        """Record the push as an in-app notification, best effort.

        Does nothing when the service has no database session. Any failure
        is logged and swallowed so it cannot affect the push result.
        """
        if not self.db:
            return
        meta = payload or {}
        try:
            # Imported lazily inside the function rather than at module top.
            from app.services.notification import NotificationService

            await NotificationService.create_notification(
                self.db, user_id, title, content,
                notification_type="push",
                reference_type=meta.get("reference_type"),
                reference_id=meta.get("reference_id"),
            )
        except Exception as e:
            logger.warning(f"Failed to save in-app notification: {e}")