c6206787da
项目结构: - backend/ Python FastAPI 后端 - uni-app/ uni-app跨端前端 - docs/ 设计文档 - docker-compose.yml Docker编排 - nginx/scripts/systemd 运维配置 已完成功能: - 用户认证 (JWT) - 智能翻译 + 回复建议 - 营销素材生成 - 客户管理 + 沉默检测 - 报价单管理 - 产品库管理 - 汇率换算 - 推送通知 (uni-push) - WhatsApp Webhook框架 - Celery定时任务
100 lines
3.4 KiB
Python
100 lines
3.4 KiB
Python
from typing import Dict, Any, Optional
|
|
from sqlalchemy.ext.asyncio import AsyncSession
|
|
from sqlalchemy import select, func, and_
|
|
from app.models.user import Product
|
|
from datetime import datetime
|
|
import logging
|
|
|
|
logger = logging.getLogger(__name__)
|
|
|
|
|
|
class ProductService:
    """Per-user CRUD operations on ``Product`` rows over an async session.

    Every query is scoped by ``user_id``. Methods that modify data only
    ``flush()`` the session; nothing in this class commits.
    """

    # Attributes update_product() refuses to overwrite: identity, ownership
    # and creation time must not be changeable through a generic field patch.
    _PROTECTED_FIELDS = frozenset({"id", "user_id", "created_at"})

    def __init__(self, db: "AsyncSession"):
        """Keep a reference to the session used by all queries."""
        self.db = db

    @staticmethod
    def _page_window(page: int, size: int) -> tuple:
        """Normalize paging input and return ``(page, size, offset)``.

        ``page`` is clamped to at least 1 and ``size`` to at least 0 so the
        resulting OFFSET/LIMIT values can never be negative.
        """
        page = max(page, 1)
        size = max(size, 0)
        return page, size, (page - 1) * size

    @classmethod
    def _apply_updates(cls, product: Any, data: Dict[str, Any]) -> None:
        """Copy the updatable entries of ``data`` onto ``product``.

        An entry is skipped when its value is ``None``, when its key is a
        protected or underscore-prefixed name, or when ``product`` has no
        attribute of that name.
        """
        for key, value in data.items():
            if value is None:
                continue
            if key in cls._PROTECTED_FIELDS or key.startswith("_"):
                continue
            if hasattr(product, key):
                setattr(product, key, value)

    async def _get_owned(self, user_id: str, product_id: str) -> Optional["Product"]:
        """Return the product with ``product_id`` owned by ``user_id``, or ``None``.

        The lookup does not filter on ``is_active``.
        """
        result = await self.db.execute(
            select(Product).where(
                and_(Product.id == product_id, Product.user_id == user_id)
            )
        )
        return result.scalar_one_or_none()

    async def list_products(self, user_id: str, category: Optional[str] = None, page: int = 1, size: int = 20) -> Dict[str, Any]:
        """Return one page of the user's active products, newest update first.

        Args:
            user_id: Owner whose products are listed.
            category: When truthy, only products of this category are returned.
            page: 1-based page number; values below 1 are treated as 1.
            size: Page size; negative values are treated as 0.

        Returns:
            A dict with ``items`` (serialized products), ``total`` (count of
            all matching rows), and the effective ``page`` and ``size``.
        """
        page, size, offset = self._page_window(page, size)

        # SQLAlchemy builds the SQL comparison from ``==``; ``is True`` would not.
        filters = [Product.user_id == user_id, Product.is_active == True]  # noqa: E712
        if category:
            filters.append(Product.category == category)

        count_query = select(func.count()).select_from(Product).where(*filters)
        query = (
            select(Product)
            .where(*filters)
            .order_by(Product.updated_at.desc())
            .offset(offset)
            .limit(size)
        )

        total = await self.db.execute(count_query)
        result = await self.db.execute(query)
        products = result.scalars().all()

        return {
            "items": [self._to_dict(p) for p in products],
            "total": total.scalar(),
            "page": page,
            "size": size,
        }

    async def get_product(self, user_id: str, product_id: str) -> Optional[Dict]:
        """Return the serialized product owned by ``user_id``, or ``None`` if absent."""
        p = await self._get_owned(user_id, product_id)
        return self._to_dict(p) if p else None

    async def create_product(self, user_id: str, data: Dict[str, Any]) -> Dict:
        """Create a product for ``user_id`` from ``data`` and return it serialized.

        The explicit ``user_id`` argument always wins over a ``user_id`` key
        in ``data`` (which previously caused a duplicate-keyword ``TypeError``).
        The new row is added to the session and flushed.
        """
        fields = {**data, "user_id": user_id}
        p = Product(**fields)
        self.db.add(p)
        await self.db.flush()
        return self._to_dict(p)

    async def update_product(self, user_id: str, product_id: str, data: Dict[str, Any]) -> Optional[Dict]:
        """Patch a product owned by ``user_id`` and return it serialized.

        Entries whose value is ``None`` are ignored, as are protected fields
        (``id``, ``user_id``, ``created_at``), underscore-prefixed names and
        names that are not attributes of the product.

        Returns:
            The serialized product after flushing, or ``None`` when no product
            with that id belongs to the user.
        """
        p = await self._get_owned(user_id, product_id)
        if not p:
            return None

        self._apply_updates(p, data)

        await self.db.flush()
        return self._to_dict(p)

    async def delete_product(self, user_id: str, product_id: str) -> bool:
        """Soft-delete a product by setting ``is_active`` to ``False``.

        Returns:
            ``True`` when the product was found (and flagged), ``False`` when
            no product with that id belongs to the user.
        """
        p = await self._get_owned(user_id, product_id)
        if not p:
            return False
        p.is_active = False
        await self.db.flush()
        return True

    def _to_dict(self, p: "Product") -> Dict:
        """Serialize a product into a plain dict; return ``{}`` for ``None``.

        ``id`` is stringified, missing ``keywords``/``images`` become ``[]``,
        missing ``specifications`` becomes ``{}``, and timestamps are emitted
        via ``isoformat()`` (or ``None`` when unset).
        """
        if p is None:
            return {}
        return {
            "id": str(p.id),
            "name": p.name,
            "name_en": p.name_en,
            "description": p.description,
            "description_en": p.description_en,
            "category": p.category,
            "price": p.price,
            "price_unit": p.price_unit,
            "moq": p.moq,
            "keywords": p.keywords or [],
            "specifications": p.specifications or {},
            "images": p.images or [],
            "is_active": p.is_active,
            "created_at": p.created_at.isoformat() if p.created_at else None,
            "updated_at": p.updated_at.isoformat() if p.updated_at else None,
        }