Files

281 lines
8.3 KiB
Python

import json
import logging
from fastapi import APIRouter, Depends, HTTPException, Request, Query, Header
from sqlalchemy.ext.asyncio import AsyncSession
from sqlalchemy import select
from pydantic import BaseModel
from typing import Optional
from app.database import get_db
from app.services.payment import PaymentService, GATEWAY_MAP
from app.services.unified_pay import UnifiedPayService
from app.models.payment_transaction import PaymentTransaction
from app.api.v1.deps import get_current_user_id
# Module-level logger, named after this module's import path.
logger = logging.getLogger(__name__)
# Router on which the payment routes defined in this module are registered.
router = APIRouter()
class CreateOrderRequest(BaseModel):
    """Request body accepted by the ``/create-order`` route."""

    # Plan identifier; forwarded unchanged to PaymentService.create_order.
    plan: str
    # Payment channel; forwarded unchanged, "alipay" when the client omits it.
    pay_type: str = "alipay"
class RefundRequest(BaseModel):
    """Request body accepted by the ``/refund`` route."""

    # Order number to refund; forwarded unchanged to PaymentService.refund.
    order_no: str
    # Free-text refund reason; empty string when the client omits it.
    reason: str = ""
@router.get("/plans")
async def get_plans():
    """Return whatever ``PaymentService.get_plans`` reports.

    The service is constructed with ``None`` in place of a database session.
    """
    return await PaymentService(None).get_plans()
@router.get("/subscription")
async def get_subscription(
    user_id: str = Depends(get_current_user_id),
    db: AsyncSession = Depends(get_db),
):
    """Return the current subscription of the authenticated user.

    Delegates to ``PaymentService.get_current_subscription``.
    """
    subscription = await PaymentService(db).get_current_subscription(user_id)
    return subscription
@router.post("/create-order")
async def create_order(
    data: CreateOrderRequest,
    user_id: str = Depends(get_current_user_id),
    db: AsyncSession = Depends(get_db),
):
    """Create a payment order for the authenticated user.

    Delegates to ``PaymentService.create_order`` with the requested plan and
    payment type.

    Raises:
        HTTPException: 400 carrying the service's message when it raises
            ``ValueError``.
    """
    service = PaymentService(db)
    try:
        order = await service.create_order(user_id, data.plan, data.pay_type)
    except ValueError as exc:
        raise HTTPException(status_code=400, detail=str(exc))
    return order
@router.get("/query/{order_no}")
async def query_payment(
    order_no: str,
    user_id: str = Depends(get_current_user_id),
    db: AsyncSession = Depends(get_db),
):
    """Look up the payment state of ``order_no`` for the authenticated user.

    Raises:
        HTTPException: 404 carrying the service's message when
            ``PaymentService.query_payment`` raises ``ValueError``.
    """
    service = PaymentService(db)
    try:
        payment = await service.query_payment(user_id, order_no)
    except ValueError as exc:
        raise HTTPException(status_code=404, detail=str(exc))
    return payment
@router.get("/transactions")
async def list_transactions(
    page: int = Query(1, ge=1),
    size: int = Query(20, ge=1, le=100),
    user_id: str = Depends(get_current_user_id),
    db: AsyncSession = Depends(get_db),
):
    """Return one page of the authenticated user's transactions.

    ``page`` starts at 1 and ``size`` is limited to 1..100 by the query
    validators; both are handed to ``PaymentService.list_transactions``.
    """
    return await PaymentService(db).list_transactions(user_id, page, size)
@router.post("/refund")
async def refund(
    data: RefundRequest,
    user_id: str = Depends(get_current_user_id),
    db: AsyncSession = Depends(get_db),
):
    """Request a refund of one of the authenticated user's orders.

    Raises:
        HTTPException: 400 carrying the service's message when
            ``PaymentService.refund`` raises ``ValueError``.
    """
    service = PaymentService(db)
    try:
        outcome = await service.refund(user_id, data.order_no, data.reason)
    except ValueError as exc:
        raise HTTPException(status_code=400, detail=str(exc))
    return outcome
@router.post("/close-order")
async def close_order(
    data: dict,
    user_id: str = Depends(get_current_user_id),
    db: AsyncSession = Depends(get_db),
):
    """Close one of the authenticated user's orders.

    The order number is read from the ``order_no`` key of the JSON body; an
    empty string is passed to the service when the key is absent.

    Raises:
        HTTPException: 400 carrying the service's message when
            ``PaymentService.close_order`` raises ``ValueError``.
    """
    target_order_no = data.get("order_no", "")
    service = PaymentService(db)
    try:
        outcome = await service.close_order(user_id, target_order_no)
    except ValueError as exc:
        raise HTTPException(status_code=400, detail=str(exc))
    return outcome
@router.get("/query-refund/{order_no}")
async def query_refund(
    order_no: str,
    user_id: str = Depends(get_current_user_id),
    db: AsyncSession = Depends(get_db),
):
    """Look up the refund state of ``order_no`` for the authenticated user.

    Raises:
        HTTPException: 404 carrying the service's message when
            ``PaymentService.query_refund`` raises ``ValueError``.
    """
    service = PaymentService(db)
    try:
        refund_info = await service.query_refund(order_no, user_id=user_id)
    except ValueError as exc:
        raise HTTPException(status_code=404, detail=str(exc))
    return refund_info
@router.post("/webhook")
async def unified_webhook(request: Request, db: AsyncSession = Depends(get_db)):
    """Receive an event notification from the unified payment gateway.

    The raw body is signature-checked via ``UnifiedPayService.verify_callback``
    and then parsed as JSON. Only ``recharge.completed`` and ``order.refunded``
    events are forwarded to ``PaymentService.handle_callback`` (with the
    success flag set); every other event is acknowledged without touching the
    order, the same way the other webhook routes in this module skip
    notifications that are not marked successful.

    Raises:
        HTTPException: 403 when signature verification fails; 400 when the
            body is not UTF-8, is not valid JSON, or is not a JSON object
            whose ``data`` member (if present) is an object.
    """
    raw_body = await request.body()
    try:
        body_str = raw_body.decode("utf-8")
    except UnicodeDecodeError:
        raise HTTPException(status_code=400, detail="无效的 JSON")

    gateway = UnifiedPayService()
    if not gateway.verify_callback(dict(request.headers), body_str):
        logger.warning("Webhook verification failed")
        raise HTTPException(status_code=403, detail="签名验证失败")

    try:
        payload = json.loads(body_str)
    except json.JSONDecodeError:
        raise HTTPException(status_code=400, detail="无效的 JSON")
    # Valid JSON may still be a list/number/string; .get() below needs a dict.
    if not isinstance(payload, dict):
        raise HTTPException(status_code=400, detail="无效的 JSON")

    event = payload.get("event", "")
    # An explicit null "data" is treated like a missing one.
    pay_data = payload.get("data") or {}
    if not isinstance(pay_data, dict):
        raise HTTPException(status_code=400, detail="无效的 JSON")

    # Previously the success flag evaluated to True for *every* event, so a
    # failure or unrelated notification was reported as a successful payment.
    # Only the events known to mean success are processed now.
    handled_events = ("recharge.completed", "order.refunded")
    if event not in handled_events:
        logger.info("Ignoring unified-pay webhook event: %s", event)
        return {"code": 0, "message": "OK"}

    await PaymentService(db).handle_callback(
        pay_data.get("merchant_order_id", ""),
        pay_data.get("order_id", ""),
        pay_data.get("transaction_id", ""),
        True,
        pay_data.get("amount", 0),
        body_str,
    )
    return {"code": 0, "message": "OK"}
@router.post("/stripe-webhook")
async def stripe_webhook(
    request: Request,
    db: AsyncSession = Depends(get_db),
):
    """Receive a Stripe notification and record it when it reports success.

    The gateway registered under ``"stripe"`` in ``GATEWAY_MAP`` verifies and
    parses the raw body. Notifications whose parsed ``success`` entry is falsy
    are acknowledged without further processing.

    Raises:
        HTTPException: 501 when no Stripe gateway is registered; 403 when
            signature verification fails.
    """
    raw_body = await request.body()
    payload_text = raw_body.decode("utf-8")

    gateway = GATEWAY_MAP.get("stripe")
    if not gateway:
        raise HTTPException(status_code=501, detail="Stripe 未配置")
    if not gateway.verify_callback(dict(request.headers), payload_text):
        raise HTTPException(status_code=403, detail="Stripe 签名验证失败")

    notification = gateway.parse_callback(payload_text, dict(request.headers))
    if not notification.get("success"):
        return {"status": "ok"}

    await PaymentService(db).handle_callback(
        notification["order_no"],
        notification["gateway_order_id"],
        notification["gateway_order_no"],
        True,
        notification["amount"],
        payload_text,
    )
    return {"status": "ok"}
@router.post("/paypal-capture")
async def paypal_capture(
    request: Request,
    user_id: str = Depends(get_current_user_id),
    db: AsyncSession = Depends(get_db),
):
    """Capture a PayPal order for one of the authenticated user's transactions.

    Expects a JSON object with ``order_no`` and ``token``. The transaction is
    looked up by order number and user; if it is no longer ``pending`` the
    request is acknowledged as already processed. Otherwise the PayPal gateway
    captures ``token`` and, when the result is marked ``completed``, the
    outcome is reported to ``PaymentService.handle_callback`` as successful.

    Returns:
        ``{"status": "ok", "message": "已处理"}`` for a non-pending transaction,
        or ``{"status": "completed", "order_no": ...}`` after a completed
        capture.

    Raises:
        HTTPException: 400 for an unparsable/non-object body, missing
            parameters, a ``ValueError`` from the gateway, or a capture that
            is not completed; 404 when the transaction is not found; 501 when
            no PayPal gateway is registered.
    """
    try:
        body = await request.json()
    except ValueError:
        # json.JSONDecodeError and UnicodeDecodeError both derive from
        # ValueError; a malformed body is a client error, not a server fault.
        raise HTTPException(status_code=400, detail="无效的 JSON")
    if not isinstance(body, dict):
        # Valid JSON that is not an object cannot carry the required keys.
        raise HTTPException(status_code=400, detail="缺少参数")

    order_no = body.get("order_no", "")
    token = body.get("token", "")
    if not order_no or not token:
        raise HTTPException(status_code=400, detail="缺少参数")

    txn_result = await db.execute(
        select(PaymentTransaction).where(
            PaymentTransaction.order_no == order_no,
            PaymentTransaction.user_id == user_id,
        )
    )
    txn = txn_result.scalar_one_or_none()
    if not txn:
        raise HTTPException(status_code=404, detail="订单不存在")
    if txn.status != "pending":
        return {"status": "ok", "message": "已处理"}

    paypal_gw = GATEWAY_MAP.get("paypal")
    if not paypal_gw:
        raise HTTPException(status_code=501, detail="PayPal 未配置")

    # NOTE(review): nothing visible in this handler ties the client-supplied
    # `token` to `txn`, and the amount reported below comes from the stored
    # transaction rather than from the capture result. Confirm that
    # capture_order/handle_callback verify the token and captured amount
    # belong to this order.
    try:
        result = await paypal_gw.capture_order(token)
    except ValueError as e:
        raise HTTPException(status_code=400, detail=str(e))

    if result.get("completed"):
        # Fall back to the token when the gateway returns no capture id.
        capture_id = result.get("capture_id", token)
        svc = PaymentService(db)
        await svc.handle_callback(
            order_no, token, capture_id, True, txn.amount, json.dumps(result)
        )
        return {"status": "completed", "order_no": order_no}
    raise HTTPException(status_code=400, detail=f"PayPal capture failed: {result.get('status')}")
@router.post("/paypal-webhook")
async def paypal_webhook(
    request: Request,
    db: AsyncSession = Depends(get_db),
):
    """Receive a PayPal notification and record it when it reports success.

    The gateway registered under ``"paypal"`` in ``GATEWAY_MAP`` verifies and
    parses the raw body. Notifications whose parsed ``success`` entry is falsy
    are acknowledged without further processing.

    Raises:
        HTTPException: 501 when no PayPal gateway is registered; 403 when
            signature verification fails.
    """
    raw_body = await request.body()
    payload_text = raw_body.decode("utf-8")

    gateway = GATEWAY_MAP.get("paypal")
    if not gateway:
        raise HTTPException(status_code=501, detail="PayPal 未配置")
    if not gateway.verify_callback(dict(request.headers), payload_text):
        raise HTTPException(status_code=403, detail="PayPal 签名验证失败")

    notification = gateway.parse_callback(payload_text, dict(request.headers))
    if not notification.get("success"):
        return {"status": "ok"}

    await PaymentService(db).handle_callback(
        notification["order_no"],
        notification["gateway_order_id"],
        notification["gateway_order_no"],
        True,
        notification["amount"],
        payload_text,
    )
    return {"status": "ok"}
@router.post("/pingpong-webhook")
async def pingpong_webhook(
    request: Request,
    db: AsyncSession = Depends(get_db),
):
    """Receive a PingPong notification and record it when it reports success.

    The gateway registered under ``"pingpong"`` in ``GATEWAY_MAP`` verifies
    and parses the raw body. Notifications whose parsed ``success`` entry is
    falsy are acknowledged without further processing.

    Raises:
        HTTPException: 501 when no PingPong gateway is registered; 403 when
            signature verification fails.
    """
    raw_body = await request.body()
    payload_text = raw_body.decode("utf-8")

    gateway = GATEWAY_MAP.get("pingpong")
    if not gateway:
        raise HTTPException(status_code=501, detail="PingPong 未配置")
    if not gateway.verify_callback(dict(request.headers), payload_text):
        raise HTTPException(status_code=403, detail="PingPong 签名验证失败")

    notification = gateway.parse_callback(payload_text, dict(request.headers))
    if not notification.get("success"):
        return {"status": "ok"}

    await PaymentService(db).handle_callback(
        notification["order_no"],
        notification["gateway_order_id"],
        notification["gateway_order_no"],
        True,
        notification["amount"],
        payload_text,
    )
    return {"status": "ok"}