# Source listing metadata (from the file viewer): 113 lines, 4.4 KiB, Python.
import uuid
from datetime import datetime
from typing import Optional, Dict, Any

from sqlalchemy import select, desc, func
from sqlalchemy.ext.asyncio import AsyncSession

from app.models.certification import Certification, CertType, CertStatus


class CertificationService:
    """Submit, look up, list and review certification applications.

    Every method talks to the database through the session passed to the
    constructor. Writes are only flushed; this class never commits.
    """

    def __init__(self, db: "AsyncSession") -> None:
        """Store the session used for all queries.

        Args:
            db: Async SQLAlchemy session used for every query and flush.
        """
        self.db = db

    async def submit(self, user_id: str, data: Dict[str, Any]) -> Dict[str, Any]:
        """Create a new pending certification application for a user.

        Args:
            user_id: The applicant's id as a UUID string.
            data: Application payload. ``"cert_type"`` is required and is
                converted with ``CertType(...)``; ``"personal_name"``,
                ``"personal_id"``, ``"company_name"``, ``"tax_id"`` and
                ``"business_license_url"`` are optional and default to None.

        Returns:
            ``{"id": ..., "status": ...}`` for the new record, or
            ``{"error": ...}`` when the user already has a pending application.

        Raises:
            KeyError: If ``data`` has no ``"cert_type"`` key.
            ValueError: If ``user_id`` is not a valid UUID string (and,
                presumably, if ``cert_type`` is not a valid ``CertType`` value).
        """
        # Only one application may be under review per user at a time.
        if await self._get_pending(user_id):
            return {"error": "已有审核中的认证申请,请勿重复提交"}

        cert = Certification(
            user_id=uuid.UUID(user_id),
            cert_type=CertType(data["cert_type"]),
            personal_name=data.get("personal_name"),
            personal_id=data.get("personal_id"),
            company_name=data.get("company_name"),
            tax_id=data.get("tax_id"),
            business_license_url=data.get("business_license_url"),
            status=CertStatus.pending,
        )
        self.db.add(cert)
        # Flush so ``cert.id`` is available for the response below.
        await self.db.flush()
        return {"id": str(cert.id), "status": cert.status.value}

    async def get_user_cert(self, user_id: str) -> Optional[Dict[str, Any]]:
        """Return the user's most recently created certification, if any.

        Args:
            user_id: The user's id as a UUID string.

        Returns:
            A dict describing the newest certification (ordered by
            ``created_at`` descending), or None when the user has none.

        Raises:
            ValueError: If ``user_id`` is not a valid UUID string.
        """
        result = await self.db.execute(
            select(Certification)
            .where(Certification.user_id == uuid.UUID(user_id))
            .order_by(desc(Certification.created_at))
            .limit(1)
        )
        cert = result.scalar_one_or_none()
        if not cert:
            return None
        return self._to_detail(cert)

    async def list_all(self, page: int, size: int, status: Optional[str] = None) -> Dict[str, Any]:
        """Return one page of certifications, newest first, plus the total count.

        Args:
            page: 1-based page number; values below 1 are treated as 1 so the
                query never receives a negative OFFSET.
            size: Page size; negative values are treated as 0.
            status: Optional ``CertStatus`` value to filter by. A falsy value
                (None or an empty string) disables the filter.

        Returns:
            ``{"items": [...], "total": int, "page": int, "size": int}`` where
            ``total`` counts every row matching the filter, not just this page.

        Raises:
            ValueError: Presumably, if ``status`` is not a valid ``CertStatus``
                value (raised by the ``CertStatus(...)`` conversion).
        """
        page = max(page, 1)
        size = max(size, 0)

        page_query = select(Certification).order_by(desc(Certification.created_at))
        # Count in the database instead of loading every row just to len() it.
        count_query = select(func.count()).select_from(Certification)
        if status:
            wanted = CertStatus(status)
            page_query = page_query.where(Certification.status == wanted)
            count_query = count_query.where(Certification.status == wanted)

        page_result = await self.db.execute(
            page_query.offset((page - 1) * size).limit(size)
        )
        certs = page_result.scalars().all()

        count_result = await self.db.execute(count_query)
        total = count_result.scalar_one()

        return {
            "items": [self._to_list_item(c) for c in certs],
            "total": total,
            "page": page,
            "size": size,
        }

    async def review(self, cert_id: str, action: str, reason: Optional[str] = None) -> Optional[Dict[str, Any]]:
        """Approve or reject a certification.

        Args:
            cert_id: The certification's id as a UUID string.
            action: ``"approve"`` approves the certification; any other value
                rejects it.
            reason: Rejection reason stored on the record when rejecting.

        Returns:
            ``{"id": ..., "status": ...}`` after the update, or None when no
            certification with ``cert_id`` exists.

        Raises:
            ValueError: If ``cert_id`` is not a valid UUID string.
        """
        result = await self.db.execute(
            select(Certification).where(Certification.id == uuid.UUID(cert_id))
        )
        cert = result.scalar_one_or_none()
        if not cert:
            return None

        if action == "approve":
            cert.status = CertStatus.approved
            # Drop any reason left over from an earlier rejection so an
            # approved record never reports a stale reject_reason.
            cert.reject_reason = None
        else:
            # NOTE(review): every non-"approve" action is treated as a
            # rejection; kept as-is so existing callers keep working.
            cert.status = CertStatus.rejected
            cert.reject_reason = reason

        await self.db.flush()
        return {"id": str(cert.id), "status": cert.status.value}

    async def _get_pending(self, user_id: str) -> "Optional[Certification]":
        """Return one pending certification of the user, or None if there is none.

        Raises:
            ValueError: If ``user_id`` is not a valid UUID string.
        """
        result = await self.db.execute(
            select(Certification)
            .where(
                Certification.user_id == uuid.UUID(user_id),
                Certification.status == CertStatus.pending,
            )
            .limit(1)
        )
        return result.scalar_one_or_none()

    @staticmethod
    def _iso(moment: Optional[datetime]) -> Optional[str]:
        """Format a timestamp with ``isoformat()``, passing falsy values through as None."""
        return moment.isoformat() if moment else None

    @staticmethod
    def _to_detail(cert: "Certification") -> Dict[str, Any]:
        """Serialise a certification for the single-record (owner) view."""
        return {
            "id": str(cert.id),
            "cert_type": cert.cert_type.value,
            "personal_name": cert.personal_name,
            "personal_id": cert.personal_id,
            "company_name": cert.company_name,
            "tax_id": cert.tax_id,
            "business_license_url": cert.business_license_url,
            "status": cert.status.value,
            "reject_reason": cert.reject_reason,
            "created_at": CertificationService._iso(cert.created_at),
            "updated_at": CertificationService._iso(cert.updated_at),
        }

    @staticmethod
    def _to_list_item(cert: "Certification") -> Dict[str, Any]:
        """Serialise a certification for the paginated listing.

        Unlike the detail view this includes ``user_id`` and omits
        ``business_license_url`` and ``updated_at``.
        """
        return {
            "id": str(cert.id),
            "user_id": str(cert.user_id),
            "cert_type": cert.cert_type.value,
            "personal_name": cert.personal_name,
            "personal_id": cert.personal_id,
            "company_name": cert.company_name,
            "tax_id": cert.tax_id,
            "status": cert.status.value,
            "reject_reason": cert.reject_reason,
            "created_at": CertificationService._iso(cert.created_at),
        }