Files
trade-assistant/backend/app/services/paypal_pay.py
T

276 lines
11 KiB
Python

import base64
import hashlib
import hmac
import json
import logging
import time
from datetime import datetime
from typing import Optional, Dict, Any

import httpx

from app.config import settings
from app.services.payment_gateway import PaymentGateway
# Module-level logger, named after this module.
logger = logging.getLogger(__name__)
# Base URL of the PayPal REST API used when sandbox mode is off.
PAYPAL_API_BASE = "https://api-m.paypal.com"
# Base URL of the PayPal REST API used when sandbox mode is on.
PAYPAL_API_SANDBOX = "https://api-m.sandbox.paypal.com"
class PayPalPaymentService(PaymentGateway):
    """PayPal gateway built on the PayPal REST API (Orders v2 / Payments v2).

    Credentials, webhook id and the sandbox switch are read from
    ``settings`` at construction time.  Amounts passed to the public
    methods are integers in minor units (cents); they are sent to PayPal
    as USD decimal strings.
    """

    name = "paypal"
    supported_types = ["paypal", "card"]

    # Token lifetime (seconds) assumed when the OAuth response omits
    # ``expires_in``.
    _DEFAULT_TOKEN_TTL = 32400
    # A cached token is considered stale this many seconds before its
    # reported expiry, so it cannot expire in the middle of a request.
    _TOKEN_REFRESH_MARGIN = 60

    def __init__(self):
        """Read the PayPal configuration and select the API base URL."""
        self.client_id = settings.PAYPAL_CLIENT_ID or ""
        self.client_secret = settings.PAYPAL_CLIENT_SECRET or ""
        self.webhook_id = settings.PAYPAL_WEBHOOK_ID or ""
        self.sandbox = settings.PAYPAL_SANDBOX
        self._base_url = PAYPAL_API_SANDBOX if self.sandbox else PAYPAL_API_BASE
        # OAuth token cache: the token itself and its expiry as epoch seconds.
        self._access_token = None
        self._token_expires = 0

    # ------------------------------------------------------------------
    # Internal helpers
    # ------------------------------------------------------------------

    def _is_configured(self) -> bool:
        """Return True when both the client id and the client secret are set."""
        return bool(self.client_id and self.client_secret)

    def _cached_token(self) -> Optional[str]:
        """Return the cached OAuth token if it is still usable, else None."""
        deadline = self._token_expires - self._TOKEN_REFRESH_MARGIN
        if self._access_token and time.time() < deadline:
            return self._access_token
        return None

    def _store_token(self, data: Dict[str, Any]) -> str:
        """Cache the token from an OAuth response body and return it.

        Raises:
            KeyError: if the response has no ``access_token`` field.
        """
        self._access_token = data["access_token"]
        # time.time() is real epoch seconds; a naive utcnow().timestamp()
        # would be interpreted in the host's local timezone.
        ttl = data.get("expires_in", self._DEFAULT_TOKEN_TTL)
        self._token_expires = time.time() + ttl
        return self._access_token

    @staticmethod
    def _auth_headers(token: str) -> Dict[str, str]:
        """Build the bearer-token JSON headers used by the API calls."""
        return {
            "Authorization": f"Bearer {token}",
            "Content-Type": "application/json",
        }

    @staticmethod
    def _format_usd(amount: int) -> str:
        """Convert an amount in cents to a two-decimal string, e.g. 1050 -> "10.50"."""
        return f"{amount / 100:.2f}"

    @staticmethod
    def _find_link(links, rel: str) -> str:
        """Return the ``href`` of the first link whose ``rel`` matches, or ""."""
        return next(
            (link.get("href", "") for link in links if link.get("rel") == rel),
            "",
        )

    async def _get_access_token(self) -> str:
        """Return a valid OAuth access token, fetching a new one if needed.

        Raises:
            ValueError: if the token endpoint does not answer with HTTP 200.
        """
        cached = self._cached_token()
        if cached is not None:
            return cached
        async with httpx.AsyncClient() as client:
            resp = await client.post(
                f"{self._base_url}/v1/oauth2/token",
                auth=(self.client_id, self.client_secret),
                data={"grant_type": "client_credentials"},
                headers={"Accept": "application/json"},
            )
        if resp.status_code != 200:
            raise ValueError(f"PayPal OAuth failed: {resp.text}")
        return self._store_token(resp.json())

    # ------------------------------------------------------------------
    # PaymentGateway operations
    # ------------------------------------------------------------------

    async def create_order(self, order_no: str, amount: int, description: str,
                           **kwargs) -> Dict[str, Any]:
        """Create a PayPal order with intent CAPTURE and return its approval URL.

        Args:
            order_no: Merchant order number; sent as the purchase unit's
                ``reference_id`` and as the ``PayPal-Request-Id`` header.
            amount: Amount in cents (USD).
            description: Order description; truncated to 127 characters.
            **kwargs: Optional ``success_url`` and ``cancel_url`` overriding
                the default return/cancel URLs.

        Returns:
            A dict with ``gateway_order_id``, ``merchant_order_id``,
            ``session_url`` (empty string when PayPal returned no approval
            link), ``session_id``, ``amount`` (USD, float) and ``status``.

        Raises:
            ValueError: if the gateway is not configured or PayPal rejects
                the request.
        """
        if not self._is_configured():
            raise ValueError("PayPal 未配置")
        token = await self._get_access_token()
        success_url = kwargs.get(
            "success_url",
            "https://trade.yuzhiran.com/workspace/credits?paypal=success",
        )
        cancel_url = kwargs.get(
            "cancel_url",
            "https://trade.yuzhiran.com/workspace/credits?paypal=cancel",
        )
        value = self._format_usd(amount)
        short_description = description[:127]
        payload = {
            "intent": "CAPTURE",
            "purchase_units": [{
                "reference_id": order_no,
                "description": short_description,
                "amount": {
                    "currency_code": "USD",
                    "value": value,
                    "breakdown": {
                        "item_total": {"currency_code": "USD", "value": value},
                    },
                },
                "items": [{
                    "name": short_description,
                    "unit_amount": {"currency_code": "USD", "value": value},
                    "quantity": "1",
                    "category": "DIGITAL_GOODS",
                }],
            }],
            "payment_source": {
                "paypal": {
                    "experience_context": {
                        "payment_method_preference": "IMMEDIATE_PAYMENT_REQUIRED",
                        "landing_page": "LOGIN",
                        "user_action": "PAY_NOW",
                        "return_url": success_url,
                        "cancel_url": cancel_url,
                    },
                },
            },
        }
        headers = self._auth_headers(token)
        headers["PayPal-Request-Id"] = order_no
        async with httpx.AsyncClient() as client:
            resp = await client.post(
                f"{self._base_url}/v2/checkout/orders",
                json=payload,
                headers=headers,
            )
        if resp.status_code not in (200, 201):
            raise ValueError(f"PayPal create order failed: {resp.text}")
        data = resp.json()
        links = data.get("links", [])
        # Prefer the "payer-action" link; fall back to "approve" so that a
        # response carrying only the latter still yields a usable URL.
        approval_url = (
            self._find_link(links, "payer-action")
            or self._find_link(links, "approve")
        )
        return {
            "gateway_order_id": data["id"],
            "merchant_order_id": order_no,
            "session_url": approval_url,
            "session_id": data["id"],
            "amount": round(amount / 100, 2),
            "status": data["status"],
        }

    async def query_order(self, order_no: str) -> Dict[str, Any]:
        """Look up an order and report its status and amount.

        ``order_no`` is placed directly in the ``/v2/checkout/orders/{id}``
        URL.  NOTE(review): that endpoint presumably expects the PayPal
        order id rather than the merchant order number; confirm with callers.

        Returns:
            ``{"status": "unknown"}`` when the gateway is not configured or
            the lookup does not return HTTP 200; otherwise a dict with
            ``status``, ``payment_status``, ``amount`` and ``currency``.
        """
        if not self._is_configured():
            return {"status": "unknown"}
        token = await self._get_access_token()
        async with httpx.AsyncClient() as client:
            resp = await client.get(
                f"{self._base_url}/v2/checkout/orders/{order_no}",
                headers=self._auth_headers(token),
            )
        if resp.status_code != 200:
            return {"status": "unknown"}
        data = resp.json()
        status = data.get("status")
        # "or" also covers a present-but-empty purchase_units list, which
        # would otherwise make the [0] lookup raise IndexError.
        units = data.get("purchase_units") or [{}]
        amount_info = units[0].get("amount") or {}
        return {
            "status": data.get("status", "unknown"),
            "payment_status": (
                "completed" if status == "COMPLETED" else data.get("status", "")
            ),
            "amount": float(amount_info.get("value", 0)),
            "currency": amount_info.get("currency_code", "USD"),
        }

    async def refund(self, order_no: str, amount: int, reason: str = "") -> Dict[str, Any]:
        """Refund a captured payment.

        Args:
            order_no: Identifier placed in the
                ``/v2/payments/captures/{id}/refund`` URL.  NOTE(review):
                presumably this must be the PayPal capture id; confirm
                with callers.
            amount: Refund amount in cents (USD).
            reason: Note shown to the payer; defaults to "Refund".

        Returns:
            A dict with the refund ``status`` and ``refund_id``.

        Raises:
            ValueError: if the gateway is not configured or PayPal rejects
                the refund.
        """
        if not self._is_configured():
            raise ValueError("PayPal 未配置")
        token = await self._get_access_token()
        body = {
            "amount": {"currency_code": "USD", "value": self._format_usd(amount)},
            "note_to_payer": reason or "Refund",
        }
        async with httpx.AsyncClient() as client:
            resp = await client.post(
                f"{self._base_url}/v2/payments/captures/{order_no}/refund",
                json=body,
                headers=self._auth_headers(token),
            )
        if resp.status_code not in (200, 201):
            raise ValueError(f"PayPal refund failed: {resp.text}")
        data = resp.json()
        return {
            "status": data.get("status", "COMPLETED"),
            "refund_id": data.get("id", ""),
        }

    async def query_refund(self, order_no: str) -> Dict[str, Any]:
        """Placeholder: refund lookup is not implemented for PayPal."""
        return {"status": "not_implemented"}

    def verify_callback(self, headers: dict, body: str) -> bool:
        """Verify a webhook via PayPal's verify-webhook-signature endpoint.

        Header names are matched case-insensitively.  Any failure (missing
        webhook id, missing headers, HTTP or parsing error) yields False.

        NOTE(review): this method performs synchronous HTTP requests; if
        it is called from inside a running event loop it will block it.

        Args:
            headers: HTTP headers of the webhook request.
            body: Raw JSON body of the webhook request.

        Returns:
            True only when PayPal reports ``verification_status == "SUCCESS"``.
        """
        if not self.webhook_id:
            logger.warning("PayPal webhook ID not configured")
            return False
        # Normalise header names so a plain dict with canonical casing
        # ("PayPal-Transmission-Id") works as well as a lowercase one.
        lowered = {str(key).lower(): val for key, val in (headers or {}).items()}
        transmission_id = lowered.get("paypal-transmission-id", "")
        transmission_time = lowered.get("paypal-transmission-time", "")
        cert_url = lowered.get("paypal-cert-url", "")
        actual_sig = lowered.get("paypal-transmission-sig", "")
        auth_algo = lowered.get("paypal-auth-algo", "")
        if not all([transmission_id, transmission_time, cert_url, actual_sig, auth_algo]):
            logger.warning("PayPal webhook missing required headers")
            return False
        try:
            # Reuse the cached OAuth token when it is still valid instead
            # of requesting a fresh one for every webhook.
            token = self._cached_token()
            if token is None:
                token_resp = httpx.post(
                    f"{self._base_url}/v1/oauth2/token",
                    auth=(self.client_id, self.client_secret),
                    data={"grant_type": "client_credentials"},
                    headers={"Accept": "application/json"},
                    timeout=10,
                )
                if token_resp.status_code != 200:
                    logger.warning(
                        "PayPal OAuth failed during webhook verification: HTTP %s",
                        token_resp.status_code,
                    )
                    return False
                token = self._store_token(token_resp.json())
            resp = httpx.post(
                f"{self._base_url}/v1/notifications/verify-webhook-signature",
                json={
                    "auth_algo": auth_algo,
                    "cert_url": cert_url,
                    "transmission_id": transmission_id,
                    "transmission_sig": actual_sig,
                    "transmission_time": transmission_time,
                    "webhook_id": self.webhook_id,
                    "webhook_event": json.loads(body),
                },
                headers=self._auth_headers(token),
                timeout=10,
            )
            if resp.status_code != 200:
                logger.warning(
                    "PayPal webhook verification returned HTTP %s",
                    resp.status_code,
                )
                return False
            result = resp.json()
            return result.get("verification_status") == "SUCCESS"
        except Exception as e:
            # Deliberately broad: verification must fail closed on any
            # network, JSON or response-shape problem.
            logger.error("PayPal webhook verification failed: %s", e)
            return False

    def parse_callback(self, body: str, headers: dict) -> Dict[str, Any]:
        """Extract the order fields from a webhook event body.

        Args:
            body: Raw JSON body of the webhook request.
            headers: Request headers (not used by this method).

        Returns:
            A dict with ``event`` (event type), ``order_no`` (the first
            purchase unit's ``reference_id`` or ""), ``gateway_order_id``
            (the resource ``id``), ``gateway_order_no`` (first capture id,
            falling back to the resource id), ``amount`` (0 when absent),
            ``success`` and ``raw`` (the event's resource).

        Raises:
            ValueError: if ``body`` is not valid JSON or the amount is not
                numeric.
        """
        event = json.loads(body)
        event_type = event.get("event_type", "")
        raw_resource = event.get("resource", {})
        # A JSON null (or other non-object) resource is treated as empty.
        resource = raw_resource if isinstance(raw_resource, dict) else {}

        order_id = resource.get("id", "")
        purchase_units = resource.get("purchase_units") or []
        first_unit = purchase_units[0] if purchase_units else {}

        # Amount: prefer the resource-level amount ("value", else "total"),
        # then fall back to the first purchase unit's amount.
        raw_amount = None
        amt_field = resource.get("amount")
        if isinstance(amt_field, dict):
            raw_amount = amt_field.get("value") or amt_field.get("total")
        if not raw_amount:
            raw_amount = (first_unit.get("amount") or {}).get("value")
        amount = float(raw_amount) if raw_amount else 0

        captures = (resource.get("payments") or {}).get("captures") or []
        gateway_order_no = captures[0].get("id", order_id) if captures else order_id

        return {
            "event": event_type,
            "order_no": first_unit.get("reference_id", ""),
            "gateway_order_id": order_id,
            "gateway_order_no": gateway_order_no,
            "amount": amount,
            # NOTE(review): CHECKOUT.ORDER.APPROVED is counted as success;
            # presumably the caller still captures the order afterwards.
            # Confirm that funds are not credited on approval alone.
            "success": event_type in (
                "CHECKOUT.ORDER.APPROVED",
                "PAYMENT.CAPTURE.COMPLETED",
                "CHECKOUT.ORDER.COMPLETED",
            ),
            "raw": raw_resource,
        }

    async def capture_order(self, order_id: str) -> Dict[str, Any]:
        """Capture an order and report the resulting status.

        Args:
            order_id: Order id placed in the
                ``/v2/checkout/orders/{id}/capture`` URL.

        Returns:
            A dict with ``status``, ``capture_id`` (id of the first capture
            whose status is COMPLETED, or "") and ``completed``.

        Raises:
            ValueError: if the gateway is not configured or PayPal rejects
                the capture.
        """
        if not self._is_configured():
            raise ValueError("PayPal 未配置")
        token = await self._get_access_token()
        async with httpx.AsyncClient() as client:
            resp = await client.post(
                f"{self._base_url}/v2/checkout/orders/{order_id}/capture",
                headers=self._auth_headers(token),
            )
        if resp.status_code not in (200, 201):
            raise ValueError(f"PayPal capture failed: {resp.text}")
        data = resp.json()
        status = data.get("status", "")
        # Take the first completed capture across all purchase units; a
        # later unit must not overwrite an id that was already found.
        capture_id = next(
            (
                cap.get("id", "")
                for unit in data.get("purchase_units", [])
                for cap in (unit.get("payments") or {}).get("captures", [])
                if cap.get("status") == "COMPLETED"
            ),
            "",
        )
        return {
            "status": status,
            "capture_id": capture_id,
            "completed": status == "COMPLETED",
        }

    async def close_order(self, order_no: str) -> Dict[str, Any]:
        """No-op: makes no API call and always returns ``{"status": "ok"}``."""
        return {"status": "ok"}