c397740748
- WeChat Pay APIv3 integration (JSAPI + Native) with cert-based auth - TranslationQuota model + admin management UI (配额 tab) - Alibaba MT provider now checks quota before translation - Fix: admin tabs scrollable on mobile, remove header-card - Fix: profile/login navigation - logout stays on profile, login returns to profile - Fix: login form now visible by default (no extra click to show) - Fix: home page translate link uses navigateTo (was switchTab to non-tabBar page) - Add .coverage and apiclient_key.pem to gitignore
182 lines
7.2 KiB
Python
182 lines
7.2 KiB
Python
import json
|
|
import time
|
|
import logging
|
|
import uuid
|
|
import base64
|
|
from typing import Optional, Dict, Any
|
|
from pathlib import Path
|
|
import httpx
|
|
from cryptography.hazmat.primitives import serialization, hashes
|
|
from cryptography.hazmat.primitives.asymmetric import padding
|
|
from cryptography.hazmat.primitives.ciphers.aead import AESGCM
|
|
from cryptography.hazmat.backends import default_backend
|
|
from app.config import settings
|
|
|
|
logger = logging.getLogger(__name__)
|
|
|
|
|
|
class WeChatPayService:
    """Client for the WeChat Pay APIv3 endpoints used by this project.

    Covers JSAPI and Native order creation, order query/close, building the
    signed parameters for the front-end payment call, and verifying and
    decrypting payment callbacks. Merchant credentials, endpoints and the
    certificate directory are read from ``app.config.settings``.
    """

    # NOTE(review): deployment-specific directory that is consulted only when
    # the requested file is missing from ``settings.WECHAT_PAY_CERT_DIR``.
    # Kept for backward compatibility; it should eventually live in config.
    _LEGACY_CERT_DIR = "/root/hermes-workspace/projects/微信支付key/key"

    # Callback headers required for signature verification (lower-cased).
    _CALLBACK_HEADER_NAMES = (
        "wechatpay-signature",
        "wechatpay-timestamp",
        "wechatpay-nonce",
        "wechatpay-serial",
    )

    def __init__(self):
        """Copy the merchant configuration from ``settings`` onto the instance."""
        self.mch_id = settings.WECHAT_PAY_MCH_ID
        self.api_key = settings.WECHAT_PAY_API_KEY
        self.serial_no = settings.WECHAT_PAY_SERIAL_NO
        self.app_id = settings.WECHAT_APP_ID
        self.api_base = settings.WECHAT_PAY_API_BASE
        self.notify_url = settings.WECHAT_PAY_NOTIFY_URL
        self._private_key = None  # PEM bytes, read lazily from disk
        self._signing_key = None  # parsed private key object, built lazily

    # ------------------------------------------------------------------
    # Key material
    # ------------------------------------------------------------------

    @staticmethod
    def _resolve_cert_file(filename: str) -> Path:
        """Return the path of *filename* in the configured or legacy cert dir.

        The configured directory wins when the file exists there; otherwise
        the legacy directory is returned (without checking existence, so a
        missing file surfaces as an error when the caller reads it).
        """
        configured = Path(settings.WECHAT_PAY_CERT_DIR) / filename
        if configured.exists():
            return configured
        fallback = Path(WeChatPayService._LEGACY_CERT_DIR) / filename
        logger.warning(
            "WeChat Pay file %s not found; falling back to %s", configured, fallback
        )
        return fallback

    def _load_private_key(self) -> bytes:
        """Return the merchant private key as PEM bytes, reading it only once."""
        if self._private_key is not None:
            return self._private_key
        self._private_key = self._resolve_cert_file("apiclient_key.pem").read_bytes()
        return self._private_key

    def _get_signing_key(self):
        """Return the parsed private key, parsing the PEM on first use only."""
        # getattr keeps instances created without __init__ working.
        signing_key = getattr(self, "_signing_key", None)
        if signing_key is None:
            signing_key = serialization.load_pem_private_key(
                self._load_private_key(), password=None, backend=default_backend()
            )
            self._signing_key = signing_key
        return signing_key

    def _sign_rsa(self, sign_str: str) -> str:
        """Sign *sign_str* with RSA PKCS#1 v1.5 / SHA-256 and return base64 text."""
        signature = self._get_signing_key().sign(
            sign_str.encode("utf-8"),
            padding.PKCS1v15(),
            hashes.SHA256(),
        )
        return base64.b64encode(signature).decode("utf-8")

    # ------------------------------------------------------------------
    # Outgoing requests
    # ------------------------------------------------------------------

    def _build_auth_header(self, method: str, path: str, body: str = "") -> str:
        """Build the ``WECHATPAY2-SHA256-RSA2048`` Authorization header value.

        The signed message is ``method``, ``path``, timestamp, nonce and
        ``body``, each followed by a newline. ``path`` must be exactly what
        is sent (including any query string) and ``body`` exactly the bytes
        sent, otherwise the signature will not match.
        """
        timestamp = str(int(time.time()))
        nonce = uuid.uuid4().hex[:16]
        sign_str = f"{method}\n{path}\n{timestamp}\n{nonce}\n{body}\n"
        signature = self._sign_rsa(sign_str)
        return (
            f'WECHATPAY2-SHA256-RSA2048 '
            f'mchid="{self.mch_id}",'
            f'nonce_str="{nonce}",'
            f'timestamp="{timestamp}",'
            f'serial_no="{self.serial_no}",'
            f'signature="{signature}"'
        )

    async def _request(self, method: str, path: str, body: Optional[dict] = None) -> Dict[str, Any]:
        """Send a signed request and return the decoded JSON response.

        Args:
            method: HTTP method, also part of the signed message.
            path: URL path (plus query string) appended to ``self.api_base``.
            body: JSON-serialisable payload; a falsy value sends no body.

        Returns:
            The decoded JSON payload, or ``{}`` when the response is empty.

        Raises:
            Exception: when the HTTP status is 400 or above. The message
                carries the API ``message`` field or the raw response text.
            ValueError: when a successful response is not valid JSON.
        """
        url = f"{self.api_base}{path}"
        # Compact separators: the exact string that is signed is the one sent.
        body_str = json.dumps(body, ensure_ascii=False, separators=(",", ":")) if body else ""
        auth = self._build_auth_header(method, path, body_str)

        async with httpx.AsyncClient() as client:
            resp = await client.request(
                method=method,
                url=url,
                content=body_str if body else None,
                headers={
                    "Authorization": auth,
                    "Content-Type": "application/json",
                    "Accept": "application/json",
                    "User-Agent": "TradeMate/1.0",
                },
            )

        try:
            data = resp.json() if resp.text else {}
        except ValueError:
            # A non-JSON error body (e.g. a gateway HTML page) must not hide
            # the HTTP failure; a non-JSON *success* body is still an error.
            if resp.status_code < 400:
                raise
            data = {}

        if resp.status_code >= 400:
            logger.error(
                "WeChat Pay API error: %s %s", resp.status_code, data if data else resp.text
            )
            message = data.get("message", resp.text) if isinstance(data, dict) else resp.text
            raise Exception(f"WeChat Pay error: {message}")
        return data

    def _order_payload(self, out_trade_no: str, total: int, description: str) -> Dict[str, Any]:
        """Return the request fields shared by JSAPI and Native order creation."""
        return {
            "appid": self.app_id,
            "mchid": self.mch_id,
            "description": description,
            "out_trade_no": out_trade_no,
            "notify_url": self.notify_url,
            "amount": {"total": total, "currency": "CNY"},
        }

    async def create_jsapi_order(self, out_trade_no: str, openid: str,
                                 total: int, description: str) -> Dict[str, Any]:
        """Create a JSAPI order for the payer identified by *openid*.

        ``total`` is sent as ``amount.total`` with currency ``CNY``.
        Returns the decoded API response.
        """
        body = self._order_payload(out_trade_no, total, description)
        body["payer"] = {"openid": openid}
        return await self._request("POST", "/v3/pay/transactions/jsapi", body)

    async def create_native_order(self, out_trade_no: str, total: int,
                                  description: str) -> Dict[str, Any]:
        """Create a Native order and return the decoded API response."""
        body = self._order_payload(out_trade_no, total, description)
        return await self._request("POST", "/v3/pay/transactions/native", body)

    async def query_order(self, out_trade_no: str) -> Dict[str, Any]:
        """Look up an order by merchant order number and return the API response."""
        path = f"/v3/pay/transactions/out-trade-no/{out_trade_no}?mchid={self.mch_id}"
        return await self._request("GET", path)

    async def close_order(self, out_trade_no: str) -> None:
        """Close the order identified by *out_trade_no*; the response is discarded."""
        path = f"/v3/pay/transactions/out-trade-no/{out_trade_no}/close"
        await self._request("POST", path, {"mchid": self.mch_id})

    def build_jsapi_pay_params(self, prepay_id: str) -> Dict[str, str]:
        """Return the signed parameter dict the front end needs to start payment.

        The signed message is app id, timestamp, nonce and
        ``prepay_id=<prepay_id>``, each followed by a newline.
        """
        timestamp = str(int(time.time()))
        nonce = uuid.uuid4().hex[:16]
        package = f"prepay_id={prepay_id}"
        sign_str = f"{self.app_id}\n{timestamp}\n{nonce}\n{package}\n"
        pay_sign = self._sign_rsa(sign_str)

        return {
            "appId": self.app_id,
            "timeStamp": timestamp,
            "nonceStr": nonce,
            "package": package,
            "signType": "RSA",
            "paySign": pay_sign,
        }

    # ------------------------------------------------------------------
    # Incoming callbacks
    # ------------------------------------------------------------------

    @staticmethod
    def _callback_headers(headers: dict) -> tuple:
        """Return (signature, timestamp, nonce, serial) from *headers*.

        Header names are matched case-insensitively, so both a lower-cased
        mapping and a plain dict with ``Wechatpay-Signature`` style keys
        work. Missing or empty headers come back as ``""``.
        """
        lowered = {str(name).lower(): value for name, value in (headers or {}).items()}
        return tuple(
            lowered.get(name) or "" for name in WeChatPayService._CALLBACK_HEADER_NAMES
        )

    @staticmethod
    def _callback_message(timestamp: str, nonce: str, body) -> bytes:
        """Return the bytes covered by the callback signature.

        The message is timestamp, nonce and the raw request body, each
        followed by a newline. ``body`` may be ``str`` or ``bytes``; bytes
        are used as-is so the raw request body can be passed through.
        """
        if isinstance(body, (bytes, bytearray)):
            body_bytes = bytes(body)
        else:
            body_bytes = str(body).encode("utf-8")
        return f"{timestamp}\n{nonce}\n".encode("utf-8") + body_bytes + b"\n"

    @staticmethod
    def verify_callback(headers: dict, body: str) -> bool:
        """Verify the signature of a payment callback.

        Args:
            headers: request headers; names are matched case-insensitively.
            body: the raw, unmodified request body (``str`` or ``bytes``).

        Returns:
            ``True`` when the signature verifies against ``pub_key.pem``;
            ``False`` when a required header is missing or anything fails
            during verification (the reason is logged as a warning).
        """
        signature, timestamp, nonce, serial = WeChatPayService._callback_headers(headers)
        if not all([signature, timestamp, nonce, serial]):
            logger.warning("Missing WeChat Pay callback headers")
            return False

        try:
            message = WeChatPayService._callback_message(timestamp, nonce, body)
            cert_data = WeChatPayService._resolve_cert_file("pub_key.pem").read_bytes()
            public_key = serialization.load_pem_public_key(cert_data, backend=default_backend())
            public_key.verify(
                base64.b64decode(signature),
                message,
                padding.PKCS1v15(),
                hashes.SHA256(),
            )
        except Exception as e:
            # Deliberately broad: any failure (bad signature, unreadable key,
            # malformed base64) means the callback must not be trusted.
            logger.warning("WeChat Pay callback verification failed: %s", e)
            return False
        return True

    @staticmethod
    def _nonce_candidates(nonce: Optional[str]) -> tuple:
        """Return ``(spec_nonce, legacy_nonce)`` byte strings for decryption.

        ``spec_nonce`` is the nonce's UTF-8 bytes, which is what the WeChat
        Pay APIv3 samples feed to AES-GCM. ``legacy_nonce`` is the
        base64-decoded form this method historically used; it is ``None``
        when the text is not valid base64 or decodes to the same bytes.
        """
        spec_nonce = (nonce or "").encode("utf-8")
        try:
            legacy_nonce = base64.b64decode(nonce) if nonce else b""
        except ValueError:  # binascii.Error is a ValueError subclass
            legacy_nonce = None
        if legacy_nonce == spec_nonce:
            legacy_nonce = None
        return spec_nonce, legacy_nonce

    def decrypt_callback(self, ciphertext: str, nonce: str,
                         associated_data: str) -> str:
        """Decrypt the AES-GCM encrypted resource of a payment callback.

        Args:
            ciphertext: base64-encoded ciphertext (with the GCM tag appended).
            nonce: nonce string from the callback, used as raw UTF-8 bytes.
            associated_data: additional authenticated data; ``None`` or an
                empty string are treated as no associated data.

        Returns:
            The decrypted payload decoded as UTF-8 text.

        Raises:
            cryptography.exceptions.InvalidTag: authentication failed.
            ValueError: the key or nonce has an unsupported length, or the
                ciphertext is not valid base64.
        """
        # Local import: only this method needs the name.
        from cryptography.exceptions import InvalidTag

        aesgcm = AESGCM(self.api_key.encode("utf-8"))
        ciphertext_bytes = base64.b64decode(ciphertext)
        associated_bytes = (associated_data or "").encode("utf-8")
        spec_nonce, legacy_nonce = self._nonce_candidates(nonce)

        try:
            plaintext = aesgcm.decrypt(spec_nonce, ciphertext_bytes, associated_bytes)
        except InvalidTag as spec_error:
            if legacy_nonce is None:
                raise
            # Backward compatibility for callers that pass a base64-encoded
            # nonce. GCM authentication makes trying a second nonce safe: a
            # wrong nonce can only fail, never produce bogus plaintext.
            try:
                plaintext = aesgcm.decrypt(legacy_nonce, ciphertext_bytes, associated_bytes)
            except (InvalidTag, ValueError):
                raise spec_error from None
        return plaintext.decode("utf-8")
|