import hashlib import hmac import json import time import logging from typing import Optional, Dict, Any import httpx from app.config import settings from app.services.payment_gateway import PaymentGateway logger = logging.getLogger(__name__) EMPTY_SHA256 = hashlib.sha256(b"").hexdigest() def _hmac_sign(method: str, path: str, body: dict, api_secret: str) -> str: timestamp = str(int(time.time())) body_sha256 = hashlib.sha256( json.dumps(body, ensure_ascii=False, separators=(",", ":")).encode() ).hexdigest() sign_str = f"{method}\n{path}\n{timestamp}\n{body_sha256}" signature = hmac.new( api_secret.encode(), sign_str.encode(), hashlib.sha256 ).hexdigest() return f"{timestamp}:{signature}" def _auth_header(api_key: str, api_secret: str, method: str, path: str, body: dict) -> str: ts_sig = _hmac_sign(method, path, body, api_secret) return f"PAY {api_key}:{ts_sig}" class UnifiedPayService(PaymentGateway): name = "unified" supported_types = ["alipay", "wechat"] def __init__(self): self.api_key = settings.PAY_API_KEY or "" self.api_secret = settings.PAY_API_SECRET or "" self.base_url = settings.PAY_API_BASE_URL self.webhook_url = settings.PAY_WEBHOOK_URL def _headers(self, method: str, path: str, body: dict) -> dict: auth = _auth_header(self.api_key, self.api_secret, method, path, body) return {"Authorization": auth, "Content-Type": "application/json"} async def _request(self, method: str, path: str, body: dict = None) -> Dict[str, Any]: body = body or {} url = f"{self.base_url}{path}" headers = self._headers(method, path, body) async with httpx.AsyncClient() as client: resp = await client.request(method=method, url=url, json=body, headers=headers) result = resp.json() if result.get("code") != 0: raise ValueError(f"支付网关错误: {result.get('message', 'unknown')}") return result.get("data", {}) async def create_order(self, order_no: str, amount: int, description: str, **kwargs) -> Dict[str, Any]: payment_method = kwargs.get("pay_type", "alipay") if payment_method == "native": payment_method = "wechat" elif payment_method == "jsapi": payment_method = "wechat" elif payment_method == "pc": payment_method = "alipay" remark = kwargs.get("remark", "") body = { "merchant_order_id": order_no, "amount": amount / 100, "payment_method": payment_method, "subject": description or "TradeMate 会员充值", "notify_url": self.webhook_url, } if remark: body["remark"] = remark result = await self._request("POST", "/v1/pay/orders", body) out = { "gateway_order_id": result.get("gateway_order_id", ""), "merchant_order_id": result.get("merchant_order_id", order_no), "amount": result.get("amount", amount / 100), "payment_method": payment_method, "status": result.get("status", "pending"), } if payment_method == "alipay": out["pay_url"] = result.get("pay_url", "") else: out["code_url"] = result.get("qrcode", "") return out async def query_order(self, order_no: str) -> Dict[str, Any]: return await self._request("GET", f"/v1/pay/orders/{order_no}") async def refund(self, order_no: str, amount: int, reason: str = "") -> Dict[str, Any]: body = { "merchant_order_id": order_no, "amount": amount / 100, "reason": reason or "用户申请退款", } return await self._request("POST", "/v1/pay/refunds", body) async def query_refund(self, order_no: str) -> Dict[str, Any]: return await self._request("GET", f"/v1/pay/refunds/{order_no}") def verify_callback(self, headers: dict, body: str) -> bool: auth = headers.get("authorization", headers.get("Authorization", "")) if not auth.startswith("PAY "): logger.warning("Webhook missing PAY Authorization header") return False parts = auth[4:].strip().split(":") if len(parts) != 3: logger.warning("Webhook invalid Authorization format") return False api_key, timestamp, signature = parts if api_key != self.api_key: logger.warning("Webhook API key mismatch") return False now = int(time.time()) if abs(now - int(timestamp)) > 300: logger.warning("Webhook timestamp expired") return False body_sha256 = hashlib.sha256(body.encode()).hexdigest() sign_str = f"POST\n/api/v1/payment/webhook\n{timestamp}\n{body_sha256}" expected = hmac.new( self.api_secret.encode(), sign_str.encode(), hashlib.sha256 ).hexdigest() if not hmac.compare_digest(expected, signature): logger.warning("Webhook signature mismatch") return False return True def parse_callback(self, body: str, headers: dict) -> Dict[str, Any]: data = json.loads(body) event = data.get("event", "") payload = data.get("data", {}) return { "event": event, "order_no": payload.get("merchant_order_id", ""), "gateway_order_id": payload.get("order_id", ""), "gateway_order_no": payload.get("transaction_id", ""), "amount": payload.get("amount", 0), "success": event == "recharge.completed", "raw": payload, } async def close_order(self, order_no: str) -> Dict[str, Any]: return await self._request("POST", f"/v1/pay/orders/{order_no}/close")