import json import logging from fastapi import APIRouter, Depends, HTTPException, Request, Query, Header from sqlalchemy.ext.asyncio import AsyncSession from sqlalchemy import select from pydantic import BaseModel from typing import Optional from app.database import get_db from app.services.payment import PaymentService, GATEWAY_MAP from app.services.unified_pay import UnifiedPayService from app.models.payment_transaction import PaymentTransaction from app.api.v1.deps import get_current_user_id logger = logging.getLogger(__name__) router = APIRouter() class CreateOrderRequest(BaseModel): plan: str pay_type: str = "alipay" class RefundRequest(BaseModel): order_no: str reason: str = "" @router.get("/plans") async def get_plans(): svc = PaymentService(None) return await svc.get_plans() @router.get("/subscription") async def get_subscription( user_id: str = Depends(get_current_user_id), db: AsyncSession = Depends(get_db), ): svc = PaymentService(db) return await svc.get_current_subscription(user_id) @router.post("/create-order") async def create_order( data: CreateOrderRequest, user_id: str = Depends(get_current_user_id), db: AsyncSession = Depends(get_db), ): svc = PaymentService(db) try: return await svc.create_order(user_id, data.plan, data.pay_type) except ValueError as e: raise HTTPException(status_code=400, detail=str(e)) @router.get("/query/{order_no}") async def query_payment( order_no: str, user_id: str = Depends(get_current_user_id), db: AsyncSession = Depends(get_db), ): svc = PaymentService(db) try: return await svc.query_payment(user_id, order_no) except ValueError as e: raise HTTPException(status_code=404, detail=str(e)) @router.get("/transactions") async def list_transactions( page: int = Query(1, ge=1), size: int = Query(20, ge=1, le=100), user_id: str = Depends(get_current_user_id), db: AsyncSession = Depends(get_db), ): svc = PaymentService(db) return await svc.list_transactions(user_id, page, size) @router.post("/refund") async def refund( data: RefundRequest, user_id: str = Depends(get_current_user_id), db: AsyncSession = Depends(get_db), ): svc = PaymentService(db) try: return await svc.refund(user_id, data.order_no, data.reason) except ValueError as e: raise HTTPException(status_code=400, detail=str(e)) @router.post("/close-order") async def close_order( data: dict, user_id: str = Depends(get_current_user_id), db: AsyncSession = Depends(get_db), ): order_no = data.get("order_no", "") svc = PaymentService(db) try: return await svc.close_order(user_id, order_no) except ValueError as e: raise HTTPException(status_code=400, detail=str(e)) @router.get("/query-refund/{order_no}") async def query_refund( order_no: str, user_id: str = Depends(get_current_user_id), db: AsyncSession = Depends(get_db), ): svc = PaymentService(db) try: return await svc.query_refund(order_no, user_id=user_id) except ValueError as e: raise HTTPException(status_code=404, detail=str(e)) @router.post("/webhook") async def unified_webhook(request: Request, db: AsyncSession = Depends(get_db)): body = await request.body() body_str = body.decode("utf-8") gw = UnifiedPayService() if not gw.verify_callback(dict(request.headers), body_str): logger.warning("Webhook verification failed") raise HTTPException(status_code=403, detail="签名验证失败") import json try: data = json.loads(body_str) except json.JSONDecodeError: raise HTTPException(status_code=400, detail="无效的 JSON") event = data.get("event", "") pay_data = data.get("data", {}) merchant_order_id = pay_data.get("merchant_order_id", "") order_id = pay_data.get("order_id", "") transaction_id = pay_data.get("transaction_id", "") amount = pay_data.get("amount", 0) success = event in ("recharge.completed", "order.refunded") svc = PaymentService(db) await svc.handle_callback( merchant_order_id, order_id, transaction_id, success if event == "recharge.completed" else True, amount, body_str, ) return {"code": 0, "message": "OK"} @router.post("/stripe-webhook") async def stripe_webhook( request: Request, db: AsyncSession = Depends(get_db), ): body = await request.body() body_str = body.decode("utf-8") stripe_gw = GATEWAY_MAP.get("stripe") if not stripe_gw: raise HTTPException(status_code=501, detail="Stripe 未配置") if not stripe_gw.verify_callback(dict(request.headers), body_str): raise HTTPException(status_code=403, detail="Stripe 签名验证失败") parsed = stripe_gw.parse_callback(body_str, dict(request.headers)) if parsed.get("success"): svc = PaymentService(db) await svc.handle_callback( parsed["order_no"], parsed["gateway_order_id"], parsed["gateway_order_no"], True, parsed["amount"], body_str, ) return {"status": "ok"} @router.post("/paypal-capture") async def paypal_capture( request: Request, user_id: str = Depends(get_current_user_id), db: AsyncSession = Depends(get_db), ): body = await request.json() order_no = body.get("order_no", "") token = body.get("token", "") if not order_no or not token: raise HTTPException(status_code=400, detail="缺少参数") txn_result = await db.execute( select(PaymentTransaction).where( PaymentTransaction.order_no == order_no, PaymentTransaction.user_id == user_id, ) ) txn = txn_result.scalar_one_or_none() if not txn: raise HTTPException(status_code=404, detail="订单不存在") if txn.status != "pending": return {"status": "ok", "message": "已处理"} paypal_gw = GATEWAY_MAP.get("paypal") if not paypal_gw: raise HTTPException(status_code=501, detail="PayPal 未配置") try: result = await paypal_gw.capture_order(token) except ValueError as e: raise HTTPException(status_code=400, detail=str(e)) if result.get("completed"): capture_id = result.get("capture_id", token) svc = PaymentService(db) await svc.handle_callback( order_no, token, capture_id, True, txn.amount, json.dumps(result) ) return {"status": "completed", "order_no": order_no} raise HTTPException(status_code=400, detail=f"PayPal capture failed: {result.get('status')}") @router.post("/paypal-webhook") async def paypal_webhook( request: Request, db: AsyncSession = Depends(get_db), ): body = await request.body() body_str = body.decode("utf-8") paypal_gw = GATEWAY_MAP.get("paypal") if not paypal_gw: raise HTTPException(status_code=501, detail="PayPal 未配置") if not paypal_gw.verify_callback(dict(request.headers), body_str): raise HTTPException(status_code=403, detail="PayPal 签名验证失败") parsed = paypal_gw.parse_callback(body_str, dict(request.headers)) if parsed.get("success"): svc = PaymentService(db) await svc.handle_callback( parsed["order_no"], parsed["gateway_order_id"], parsed["gateway_order_no"], True, parsed["amount"], body_str, ) return {"status": "ok"} @router.post("/pingpong-webhook") async def pingpong_webhook( request: Request, db: AsyncSession = Depends(get_db), ): body = await request.body() body_str = body.decode("utf-8") pp_gw = GATEWAY_MAP.get("pingpong") if not pp_gw: raise HTTPException(status_code=501, detail="PingPong 未配置") if not pp_gw.verify_callback(dict(request.headers), body_str): raise HTTPException(status_code=403, detail="PingPong 签名验证失败") parsed = pp_gw.parse_callback(body_str, dict(request.headers)) if parsed.get("success"): svc = PaymentService(db) await svc.handle_callback( parsed["order_no"], parsed["gateway_order_id"], parsed["gateway_order_no"], True, parsed["amount"], body_str, ) return {"status": "ok"}