Files
trade-assistant/backend/app/services/wechat_pay.py
T
TradeMate Dev c397740748 feat: WeChat Pay integration, translation quota management, login UX fixes
- WeChat Pay APIv3 integration (JSAPI + Native) with cert-based auth
- TranslationQuota model + admin management UI (配额 tab)
- Alibaba MT provider now checks quota before translation
- Fix: admin tabs scrollable on mobile, remove header-card
- Fix: profile/login navigation - logout stays on profile, login returns to profile
- Fix: login form now visible by default (no extra click to show)
- Fix: home page translate link uses navigateTo (was switchTab to non-tabBar page)
- Add .coverage and apiclient_key.pem to gitignore
2026-05-20 18:30:12 +08:00

182 lines
7.2 KiB
Python

import json
import time
import logging
import uuid
import base64
from typing import Optional, Dict, Any
from pathlib import Path
import httpx
from cryptography.hazmat.primitives import serialization, hashes
from cryptography.hazmat.primitives.asymmetric import padding
from cryptography.hazmat.primitives.ciphers.aead import AESGCM
from cryptography.hazmat.backends import default_backend
from app.config import settings
logger = logging.getLogger(__name__)
class WeChatPayService:
def __init__(self):
self.mch_id = settings.WECHAT_PAY_MCH_ID
self.api_key = settings.WECHAT_PAY_API_KEY
self.serial_no = settings.WECHAT_PAY_SERIAL_NO
self.app_id = settings.WECHAT_APP_ID
self.api_base = settings.WECHAT_PAY_API_BASE
self.notify_url = settings.WECHAT_PAY_NOTIFY_URL
self._private_key = None
def _load_private_key(self) -> bytes:
if self._private_key:
return self._private_key
cert_dir = Path(settings.WECHAT_PAY_CERT_DIR)
key_path = cert_dir / "apiclient_key.pem"
if not key_path.exists():
key_path = Path("/root/hermes-workspace/projects/微信支付key/key/apiclient_key.pem")
with open(key_path, "rb") as f:
self._private_key = f.read()
return self._private_key
def _sign_rsa(self, sign_str: str) -> str:
private_key_data = self._load_private_key()
key = serialization.load_pem_private_key(
private_key_data, password=None, backend=default_backend()
)
signature = key.sign(
sign_str.encode("utf-8"),
padding.PKCS1v15(),
hashes.SHA256(),
)
return base64.b64encode(signature).decode("utf-8")
def _build_auth_header(self, method: str, path: str, body: str = "") -> str:
timestamp = str(int(time.time()))
nonce = uuid.uuid4().hex[:16]
sign_str = f"{method}\n{path}\n{timestamp}\n{nonce}\n{body}\n"
signature = self._sign_rsa(sign_str)
return (
f'WECHATPAY2-SHA256-RSA2048 '
f'mchid="{self.mch_id}",'
f'nonce_str="{nonce}",'
f'timestamp="{timestamp}",'
f'serial_no="{self.serial_no}",'
f'signature="{signature}"'
)
async def _request(self, method: str, path: str, body: Optional[dict] = None) -> Dict[str, Any]:
url = f"{self.api_base}{path}"
body_str = json.dumps(body, ensure_ascii=False, separators=(",", ":")) if body else ""
auth = self._build_auth_header(method, path, body_str)
async with httpx.AsyncClient() as client:
resp = await client.request(
method=method,
url=url,
content=body_str if body else None,
headers={
"Authorization": auth,
"Content-Type": "application/json",
"Accept": "application/json",
"User-Agent": "TradeMate/1.0",
},
)
data = resp.json() if resp.text else {}
if resp.status_code >= 400:
logger.error(f"WeChat Pay API error: {resp.status_code} {data}")
raise Exception(f"WeChat Pay error: {data.get('message', resp.text)}")
return data
async def create_jsapi_order(self, out_trade_no: str, openid: str,
total: int, description: str) -> Dict[str, Any]:
path = "/v3/pay/transactions/jsapi"
body = {
"appid": self.app_id,
"mchid": self.mch_id,
"description": description,
"out_trade_no": out_trade_no,
"notify_url": self.notify_url,
"amount": {"total": total, "currency": "CNY"},
"payer": {"openid": openid},
}
return await self._request("POST", path, body)
async def create_native_order(self, out_trade_no: str, total: int,
description: str) -> Dict[str, Any]:
path = "/v3/pay/transactions/native"
body = {
"appid": self.app_id,
"mchid": self.mch_id,
"description": description,
"out_trade_no": out_trade_no,
"notify_url": self.notify_url,
"amount": {"total": total, "currency": "CNY"},
}
return await self._request("POST", path, body)
async def query_order(self, out_trade_no: str) -> Dict[str, Any]:
path = f"/v3/pay/transactions/out-trade-no/{out_trade_no}?mchid={self.mch_id}"
return await self._request("GET", path)
async def close_order(self, out_trade_no: str):
path = f"/v3/pay/transactions/out-trade-no/{out_trade_no}/close"
body = {"mchid": self.mch_id}
await self._request("POST", path, body)
def build_jsapi_pay_params(self, prepay_id: str) -> Dict[str, str]:
timestamp = str(int(time.time()))
nonce = uuid.uuid4().hex[:16]
package = f"prepay_id={prepay_id}"
sign_str = f"{self.app_id}\n{timestamp}\n{nonce}\n{package}\n"
pay_sign = self._sign_rsa(sign_str)
return {
"appId": self.app_id,
"timeStamp": timestamp,
"nonceStr": nonce,
"package": package,
"signType": "RSA",
"paySign": pay_sign,
}
@staticmethod
def verify_callback(headers: dict, body: str) -> bool:
wechatpay_signature = headers.get("wechatpay-signature", "")
wechatpay_timestamp = headers.get("wechatpay-timestamp", "")
wechatpay_nonce = headers.get("wechatpay-nonce", "")
wechatpay_serial = headers.get("wechatpay-serial", "")
if not all([wechatpay_signature, wechatpay_timestamp, wechatpay_nonce, wechatpay_serial]):
logger.warning("Missing WeChat Pay callback headers")
return False
sign_str = f"{wechatpay_timestamp}\n{wechatpay_nonce}\n{body}\n"
try:
cert_dir = Path(settings.WECHAT_PAY_CERT_DIR)
cert_path = cert_dir / "pub_key.pem"
if not cert_path.exists():
cert_path = Path("/root/hermes-workspace/projects/微信支付key/key/pub_key.pem")
with open(cert_path, "rb") as f:
cert_data = f.read()
public_key = serialization.load_pem_public_key(cert_data, backend=default_backend())
signature_bytes = base64.b64decode(wechatpay_signature)
public_key.verify(
signature_bytes,
sign_str.encode("utf-8"),
padding.PKCS1v15(),
hashes.SHA256(),
)
return True
except Exception as e:
logger.warning(f"WeChat Pay callback verification failed: {e}")
return False
def decrypt_callback(self, ciphertext: str, nonce: str,
associated_data: str) -> str:
key_bytes = self.api_key.encode("utf-8")
nonce_bytes = base64.b64decode(nonce) if nonce else b""
associated_bytes = associated_data.encode("utf-8")
ciphertext_bytes = base64.b64decode(ciphertext)
aesgcm = AESGCM(key_bytes)
plaintext = aesgcm.decrypt(nonce_bytes, ciphertext_bytes, associated_bytes)
return plaintext.decode("utf-8")