import base64
import json
import logging
import time
import uuid
from pathlib import Path
from typing import Any, Dict, Optional

import httpx
from cryptography.hazmat.backends import default_backend
from cryptography.hazmat.primitives import hashes, serialization
from cryptography.hazmat.primitives.asymmetric import padding
from cryptography.hazmat.primitives.ciphers.aead import AESGCM

from app.config import settings

logger = logging.getLogger(__name__)

# Hard-coded directory that is consulted when a key file is missing from
# settings.WECHAT_PAY_CERT_DIR.
# NOTE(review): this looks like a developer-machine path; consider removing it
# once every deployment sets WECHAT_PAY_CERT_DIR correctly.
_FALLBACK_CERT_DIR = Path("/root/hermes-workspace/projects/微信支付key/key")


class WeChatPayError(Exception):
    """Raised when the WeChat Pay API answers with an HTTP status >= 400."""


def _resolve_cert_path(filename: str) -> Path:
    """Return the path of *filename* in the configured or fallback cert dir.

    The configured directory (``settings.WECHAT_PAY_CERT_DIR``) wins when the
    file exists there; otherwise the path inside ``_FALLBACK_CERT_DIR`` is
    returned without checking that it exists.
    """
    configured = Path(settings.WECHAT_PAY_CERT_DIR) / filename
    if configured.exists():
        return configured
    return _FALLBACK_CERT_DIR / filename


def _get_header(headers: dict, name: str) -> str:
    """Return header *name* (given in lower case) from *headers*, or ``""``.

    The direct lookup is tried first so case-insensitive mappings keep their
    own semantics; a case-insensitive scan is the fallback so that a plain
    ``dict`` with keys such as ``Wechatpay-Signature`` also works.
    """
    value = headers.get(name)
    if value:
        return value
    for key, candidate in headers.items():
        if isinstance(key, str) and key.lower() == name and candidate:
            return candidate
    return ""


class WeChatPayService:
    """Client for the WeChat Pay v3 REST endpoints used by this application.

    Covers request signing, order creation/query/close, building the JSAPI
    parameters for the front end, and verifying/decrypting callbacks.
    """

    def __init__(self):
        """Copy the WeChat Pay settings onto the instance; no I/O happens here."""
        self.mch_id = settings.WECHAT_PAY_MCH_ID
        self.api_key = settings.WECHAT_PAY_API_KEY
        self.serial_no = settings.WECHAT_PAY_SERIAL_NO
        self.app_id = settings.WECHAT_APP_ID
        self.api_base = settings.WECHAT_PAY_API_BASE
        self.notify_url = settings.WECHAT_PAY_NOTIFY_URL
        # PEM bytes of the merchant private key, read lazily on first use.
        self._private_key: Optional[bytes] = None

    def _load_private_key(self) -> bytes:
        """Return the PEM bytes of ``apiclient_key.pem``, cached after first read.

        Raises:
            OSError: if the key file cannot be opened.
        """
        if self._private_key:
            return self._private_key
        with open(_resolve_cert_path("apiclient_key.pem"), "rb") as f:
            self._private_key = f.read()
        return self._private_key

    def _sign_rsa(self, sign_str: str) -> str:
        """Sign *sign_str* with RSA PKCS#1 v1.5 / SHA-256 and return base64 text."""
        key = serialization.load_pem_private_key(
            self._load_private_key(),
            password=None,
            backend=default_backend(),
        )
        signature = key.sign(
            sign_str.encode("utf-8"),
            padding.PKCS1v15(),
            hashes.SHA256(),
        )
        return base64.b64encode(signature).decode("utf-8")

    def _build_auth_header(self, method: str, path: str, body: str = "") -> str:
        """Build the ``Authorization`` header value for one request.

        The signed message is method, path, timestamp, nonce and body, each
        followed by a newline. *path* must be exactly what is requested
        (including any query string) and *body* exactly the bytes sent.
        """
        timestamp = str(int(time.time()))
        nonce = uuid.uuid4().hex[:16]
        sign_str = f"{method}\n{path}\n{timestamp}\n{nonce}\n{body}\n"
        signature = self._sign_rsa(sign_str)
        return (
            f'WECHATPAY2-SHA256-RSA2048 '
            f'mchid="{self.mch_id}",'
            f'nonce_str="{nonce}",'
            f'timestamp="{timestamp}",'
            f'serial_no="{self.serial_no}",'
            f'signature="{signature}"'
        )

    async def _request(
        self, method: str, path: str, body: Optional[dict] = None
    ) -> Dict[str, Any]:
        """Send a signed request and return the decoded JSON response.

        Args:
            method: HTTP method, e.g. ``"GET"`` or ``"POST"``.
            path: Request path (with query string) appended to ``api_base``.
            body: JSON-serialisable payload; a falsy value sends no body.

        Returns:
            The decoded response, or ``{}`` when the response body is empty.

        Raises:
            WeChatPayError: if the HTTP status is >= 400.
            ValueError: if a successful response carries a non-JSON body.
        """
        url = f"{self.api_base}{path}"
        # Compact separators: the exact string that is signed is also sent.
        body_str = (
            json.dumps(body, ensure_ascii=False, separators=(",", ":"))
            if body
            else ""
        )
        auth = self._build_auth_header(method, path, body_str)
        async with httpx.AsyncClient() as client:
            resp = await client.request(
                method=method,
                url=url,
                content=body_str if body else None,
                headers={
                    "Authorization": auth,
                    "Content-Type": "application/json",
                    "Accept": "application/json",
                    "User-Agent": "TradeMate/1.0",
                },
            )

        failed = resp.status_code >= 400
        try:
            data = resp.json() if resp.text else {}
        except ValueError:
            # A non-JSON error page must not mask the HTTP failure itself;
            # on a success status the decode error is still surfaced.
            if not failed:
                raise
            data = {}

        if failed:
            logger.error(
                "WeChat Pay API error: %s %s", resp.status_code, data or resp.text
            )
            message = (
                data.get("message", resp.text)
                if isinstance(data, dict)
                else resp.text
            )
            raise WeChatPayError(f"WeChat Pay error: {message}")
        return data

    async def create_jsapi_order(
        self, out_trade_no: str, openid: str, total: int, description: str
    ) -> Dict[str, Any]:
        """Create a JSAPI order for payer *openid* and return the API response.

        *total* is sent unchanged as ``amount.total`` with currency ``CNY``
        (presumably in fen - confirm against callers).
        """
        path = "/v3/pay/transactions/jsapi"
        body = {
            "appid": self.app_id,
            "mchid": self.mch_id,
            "description": description,
            "out_trade_no": out_trade_no,
            "notify_url": self.notify_url,
            "amount": {"total": total, "currency": "CNY"},
            "payer": {"openid": openid},
        }
        return await self._request("POST", path, body)

    async def create_native_order(
        self, out_trade_no: str, total: int, description: str
    ) -> Dict[str, Any]:
        """Create a Native order and return the API response.

        *total* is sent unchanged as ``amount.total`` with currency ``CNY``.
        """
        path = "/v3/pay/transactions/native"
        body = {
            "appid": self.app_id,
            "mchid": self.mch_id,
            "description": description,
            "out_trade_no": out_trade_no,
            "notify_url": self.notify_url,
            "amount": {"total": total, "currency": "CNY"},
        }
        return await self._request("POST", path, body)

    async def query_order(self, out_trade_no: str) -> Dict[str, Any]:
        """Look up an order by merchant order number and return the response."""
        path = f"/v3/pay/transactions/out-trade-no/{out_trade_no}?mchid={self.mch_id}"
        return await self._request("GET", path)

    async def close_order(self, out_trade_no: str):
        """Close the order identified by *out_trade_no*; returns ``None``."""
        path = f"/v3/pay/transactions/out-trade-no/{out_trade_no}/close"
        body = {"mchid": self.mch_id}
        await self._request("POST", path, body)

    def build_jsapi_pay_params(self, prepay_id: str) -> Dict[str, str]:
        """Return the signed parameter dict for a JSAPI payment.

        The signed message is app id, timestamp, nonce and
        ``prepay_id=<prepay_id>``, each followed by a newline.
        """
        timestamp = str(int(time.time()))
        nonce = uuid.uuid4().hex[:16]
        package = f"prepay_id={prepay_id}"
        sign_str = f"{self.app_id}\n{timestamp}\n{nonce}\n{package}\n"
        pay_sign = self._sign_rsa(sign_str)
        return {
            "appId": self.app_id,
            "timeStamp": timestamp,
            "nonceStr": nonce,
            "package": package,
            "signType": "RSA",
            "paySign": pay_sign,
        }

    @staticmethod
    def verify_callback(headers: dict, body: str) -> bool:
        """Check the RSA signature of a callback against ``pub_key.pem``.

        Args:
            headers: Request headers; the ``wechatpay-*`` names are matched
                case-insensitively.
            body: Raw request body exactly as received.

        Returns:
            ``True`` when the signature verifies. ``False`` when a header is
            missing or anything goes wrong (unreadable key, bad base64,
            invalid signature); the reason is logged as a warning.
        """
        wechatpay_signature = _get_header(headers, "wechatpay-signature")
        wechatpay_timestamp = _get_header(headers, "wechatpay-timestamp")
        wechatpay_nonce = _get_header(headers, "wechatpay-nonce")
        wechatpay_serial = _get_header(headers, "wechatpay-serial")

        if not all(
            [wechatpay_signature, wechatpay_timestamp, wechatpay_nonce, wechatpay_serial]
        ):
            logger.warning("Missing WeChat Pay callback headers")
            return False

        # NOTE(review): the serial header is only required to be present, and
        # the timestamp is not checked for freshness; add both checks if
        # replay protection is needed.
        sign_str = f"{wechatpay_timestamp}\n{wechatpay_nonce}\n{body}\n"

        try:
            with open(_resolve_cert_path("pub_key.pem"), "rb") as f:
                cert_data = f.read()
            public_key = serialization.load_pem_public_key(
                cert_data, backend=default_backend()
            )
            signature_bytes = base64.b64decode(wechatpay_signature)
            public_key.verify(
                signature_bytes,
                sign_str.encode("utf-8"),
                padding.PKCS1v15(),
                hashes.SHA256(),
            )
            return True
        except Exception as e:
            # Deliberately broad: any failure means "do not trust the callback".
            logger.warning("WeChat Pay callback verification failed: %s", e)
            return False

    def decrypt_callback(self, ciphertext: str, nonce: str, associated_data: str) -> str:
        """Decrypt a callback ``resource`` with AES-GCM and return the plaintext.

        Args:
            ciphertext: Base64-encoded ciphertext (with the GCM tag appended).
            nonce: Nonce exactly as delivered in the callback. It is a plain
                string, so its UTF-8 bytes are used directly; it must not be
                base64-decoded.
            associated_data: Additional authenticated data; ``None`` or ``""``
                is treated as empty.

        Raises:
            ValueError: if the nonce is empty or the key has an invalid length.
            cryptography.exceptions.InvalidTag: if authentication fails.
        """
        key_bytes = self.api_key.encode("utf-8")
        nonce_bytes = (nonce or "").encode("utf-8")
        associated_bytes = (associated_data or "").encode("utf-8")
        ciphertext_bytes = base64.b64decode(ciphertext)

        plaintext = AESGCM(key_bytes).decrypt(
            nonce_bytes, ciphertext_bytes, associated_bytes
        )
        return plaintext.decode("utf-8")