feat: WeChat Pay integration, translation quota management, login UX fixes
- WeChat Pay APIv3 integration (JSAPI + Native) with cert-based auth - TranslationQuota model + admin management UI (配额 tab) - Alibaba MT provider now checks quota before translation - Fix: admin tabs scrollable on mobile, remove header-card - Fix: profile/login navigation - logout stays on profile, login returns to profile - Fix: login form now visible by default (no extra click to show) - Fix: home page translate link uses navigateTo (was switchTab to non-tabBar page) - Add .coverage and apiclient_key.pem to gitignore
This commit is contained in:
@@ -0,0 +1,181 @@
|
||||
import json
|
||||
import time
|
||||
import logging
|
||||
import uuid
|
||||
import base64
|
||||
from typing import Optional, Dict, Any
|
||||
from pathlib import Path
|
||||
import httpx
|
||||
from cryptography.hazmat.primitives import serialization, hashes
|
||||
from cryptography.hazmat.primitives.asymmetric import padding
|
||||
from cryptography.hazmat.primitives.ciphers.aead import AESGCM
|
||||
from cryptography.hazmat.backends import default_backend
|
||||
from app.config import settings
|
||||
|
||||
logger = logging.getLogger(__name__)
|
||||
|
||||
|
||||
class WeChatPayService:
|
||||
def __init__(self):
|
||||
self.mch_id = settings.WECHAT_PAY_MCH_ID
|
||||
self.api_key = settings.WECHAT_PAY_API_KEY
|
||||
self.serial_no = settings.WECHAT_PAY_SERIAL_NO
|
||||
self.app_id = settings.WECHAT_APP_ID
|
||||
self.api_base = settings.WECHAT_PAY_API_BASE
|
||||
self.notify_url = settings.WECHAT_PAY_NOTIFY_URL
|
||||
self._private_key = None
|
||||
|
||||
def _load_private_key(self) -> bytes:
|
||||
if self._private_key:
|
||||
return self._private_key
|
||||
cert_dir = Path(settings.WECHAT_PAY_CERT_DIR)
|
||||
key_path = cert_dir / "apiclient_key.pem"
|
||||
if not key_path.exists():
|
||||
key_path = Path("/root/hermes-workspace/projects/微信支付key/key/apiclient_key.pem")
|
||||
with open(key_path, "rb") as f:
|
||||
self._private_key = f.read()
|
||||
return self._private_key
|
||||
|
||||
def _sign_rsa(self, sign_str: str) -> str:
|
||||
private_key_data = self._load_private_key()
|
||||
key = serialization.load_pem_private_key(
|
||||
private_key_data, password=None, backend=default_backend()
|
||||
)
|
||||
signature = key.sign(
|
||||
sign_str.encode("utf-8"),
|
||||
padding.PKCS1v15(),
|
||||
hashes.SHA256(),
|
||||
)
|
||||
return base64.b64encode(signature).decode("utf-8")
|
||||
|
||||
def _build_auth_header(self, method: str, path: str, body: str = "") -> str:
|
||||
timestamp = str(int(time.time()))
|
||||
nonce = uuid.uuid4().hex[:16]
|
||||
sign_str = f"{method}\n{path}\n{timestamp}\n{nonce}\n{body}\n"
|
||||
signature = self._sign_rsa(sign_str)
|
||||
return (
|
||||
f'WECHATPAY2-SHA256-RSA2048 '
|
||||
f'mchid="{self.mch_id}",'
|
||||
f'nonce_str="{nonce}",'
|
||||
f'timestamp="{timestamp}",'
|
||||
f'serial_no="{self.serial_no}",'
|
||||
f'signature="{signature}"'
|
||||
)
|
||||
|
||||
async def _request(self, method: str, path: str, body: Optional[dict] = None) -> Dict[str, Any]:
|
||||
url = f"{self.api_base}{path}"
|
||||
body_str = json.dumps(body, ensure_ascii=False, separators=(",", ":")) if body else ""
|
||||
auth = self._build_auth_header(method, path, body_str)
|
||||
|
||||
async with httpx.AsyncClient() as client:
|
||||
resp = await client.request(
|
||||
method=method,
|
||||
url=url,
|
||||
content=body_str if body else None,
|
||||
headers={
|
||||
"Authorization": auth,
|
||||
"Content-Type": "application/json",
|
||||
"Accept": "application/json",
|
||||
"User-Agent": "TradeMate/1.0",
|
||||
},
|
||||
)
|
||||
data = resp.json() if resp.text else {}
|
||||
if resp.status_code >= 400:
|
||||
logger.error(f"WeChat Pay API error: {resp.status_code} {data}")
|
||||
raise Exception(f"WeChat Pay error: {data.get('message', resp.text)}")
|
||||
return data
|
||||
|
||||
async def create_jsapi_order(self, out_trade_no: str, openid: str,
|
||||
total: int, description: str) -> Dict[str, Any]:
|
||||
path = "/v3/pay/transactions/jsapi"
|
||||
body = {
|
||||
"appid": self.app_id,
|
||||
"mchid": self.mch_id,
|
||||
"description": description,
|
||||
"out_trade_no": out_trade_no,
|
||||
"notify_url": self.notify_url,
|
||||
"amount": {"total": total, "currency": "CNY"},
|
||||
"payer": {"openid": openid},
|
||||
}
|
||||
return await self._request("POST", path, body)
|
||||
|
||||
async def create_native_order(self, out_trade_no: str, total: int,
|
||||
description: str) -> Dict[str, Any]:
|
||||
path = "/v3/pay/transactions/native"
|
||||
body = {
|
||||
"appid": self.app_id,
|
||||
"mchid": self.mch_id,
|
||||
"description": description,
|
||||
"out_trade_no": out_trade_no,
|
||||
"notify_url": self.notify_url,
|
||||
"amount": {"total": total, "currency": "CNY"},
|
||||
}
|
||||
return await self._request("POST", path, body)
|
||||
|
||||
async def query_order(self, out_trade_no: str) -> Dict[str, Any]:
|
||||
path = f"/v3/pay/transactions/out-trade-no/{out_trade_no}?mchid={self.mch_id}"
|
||||
return await self._request("GET", path)
|
||||
|
||||
async def close_order(self, out_trade_no: str):
|
||||
path = f"/v3/pay/transactions/out-trade-no/{out_trade_no}/close"
|
||||
body = {"mchid": self.mch_id}
|
||||
await self._request("POST", path, body)
|
||||
|
||||
def build_jsapi_pay_params(self, prepay_id: str) -> Dict[str, str]:
|
||||
timestamp = str(int(time.time()))
|
||||
nonce = uuid.uuid4().hex[:16]
|
||||
package = f"prepay_id={prepay_id}"
|
||||
sign_str = f"{self.app_id}\n{timestamp}\n{nonce}\n{package}\n"
|
||||
pay_sign = self._sign_rsa(sign_str)
|
||||
|
||||
return {
|
||||
"appId": self.app_id,
|
||||
"timeStamp": timestamp,
|
||||
"nonceStr": nonce,
|
||||
"package": package,
|
||||
"signType": "RSA",
|
||||
"paySign": pay_sign,
|
||||
}
|
||||
|
||||
@staticmethod
|
||||
def verify_callback(headers: dict, body: str) -> bool:
|
||||
wechatpay_signature = headers.get("wechatpay-signature", "")
|
||||
wechatpay_timestamp = headers.get("wechatpay-timestamp", "")
|
||||
wechatpay_nonce = headers.get("wechatpay-nonce", "")
|
||||
wechatpay_serial = headers.get("wechatpay-serial", "")
|
||||
|
||||
if not all([wechatpay_signature, wechatpay_timestamp, wechatpay_nonce, wechatpay_serial]):
|
||||
logger.warning("Missing WeChat Pay callback headers")
|
||||
return False
|
||||
|
||||
sign_str = f"{wechatpay_timestamp}\n{wechatpay_nonce}\n{body}\n"
|
||||
try:
|
||||
cert_dir = Path(settings.WECHAT_PAY_CERT_DIR)
|
||||
cert_path = cert_dir / "pub_key.pem"
|
||||
if not cert_path.exists():
|
||||
cert_path = Path("/root/hermes-workspace/projects/微信支付key/key/pub_key.pem")
|
||||
with open(cert_path, "rb") as f:
|
||||
cert_data = f.read()
|
||||
public_key = serialization.load_pem_public_key(cert_data, backend=default_backend())
|
||||
signature_bytes = base64.b64decode(wechatpay_signature)
|
||||
public_key.verify(
|
||||
signature_bytes,
|
||||
sign_str.encode("utf-8"),
|
||||
padding.PKCS1v15(),
|
||||
hashes.SHA256(),
|
||||
)
|
||||
return True
|
||||
except Exception as e:
|
||||
logger.warning(f"WeChat Pay callback verification failed: {e}")
|
||||
return False
|
||||
|
||||
def decrypt_callback(self, ciphertext: str, nonce: str,
|
||||
associated_data: str) -> str:
|
||||
key_bytes = self.api_key.encode("utf-8")
|
||||
nonce_bytes = base64.b64decode(nonce) if nonce else b""
|
||||
associated_bytes = associated_data.encode("utf-8")
|
||||
ciphertext_bytes = base64.b64decode(ciphertext)
|
||||
|
||||
aesgcm = AESGCM(key_bytes)
|
||||
plaintext = aesgcm.decrypt(nonce_bytes, ciphertext_bytes, associated_bytes)
|
||||
return plaintext.decode("utf-8")
|
||||
Reference in New Issue
Block a user