from typing import Dict, Any, Optional from sqlalchemy.ext.asyncio import AsyncSession from sqlalchemy import select, func, and_ from app.models.user import Product from datetime import datetime import logging logger = logging.getLogger(__name__) class ProductService: def __init__(self, db: AsyncSession): self.db = db async def list_products(self, user_id: str, category: Optional[str] = None, page: int = 1, size: int = 20) -> Dict[str, Any]: query = select(Product).where(Product.user_id == user_id, Product.is_active == True) count_query = select(func.count()).select_from(Product).where(Product.user_id == user_id, Product.is_active == True) if category: query = query.where(Product.category == category) count_query = count_query.where(Product.category == category) query = query.order_by(Product.updated_at.desc()).offset((page - 1) * size).limit(size) total = await self.db.execute(count_query) result = await self.db.execute(query) products = result.scalars().all() return { "items": [self._to_dict(p) for p in products], "total": total.scalar(), "page": page, "size": size, } async def get_product(self, user_id: str, product_id: str) -> Optional[Dict]: result = await self.db.execute( select(Product).where( and_(Product.id == product_id, Product.user_id == user_id) ) ) p = result.scalar_one_or_none() return self._to_dict(p) if p else None async def create_product(self, user_id: str, data: Dict[str, Any]) -> Dict: p = Product(user_id=user_id, **data) self.db.add(p) await self.db.flush() return self._to_dict(p) async def update_product(self, user_id: str, product_id: str, data: Dict[str, Any]) -> Optional[Dict]: result = await self.db.execute( select(Product).where( and_(Product.id == product_id, Product.user_id == user_id) ) ) p = result.scalar_one_or_none() if not p: return None for k, v in data.items(): if v is not None and hasattr(p, k): setattr(p, k, v) await self.db.flush() return self._to_dict(p) async def delete_product(self, user_id: str, product_id: str) -> bool: result = await self.db.execute( select(Product).where( and_(Product.id == product_id, Product.user_id == user_id) ) ) p = result.scalar_one_or_none() if not p: return False p.is_active = False await self.db.flush() return True def _to_dict(self, p: Product) -> Dict: if not p: return {} return { "id": str(p.id), "name": p.name, "name_en": p.name_en, "description": p.description, "description_en": p.description_en, "category": p.category, "price": p.price, "price_unit": p.price_unit, "moq": p.moq, "keywords": p.keywords or [], "specifications": p.specifications or {}, "images": p.images or [], "is_active": p.is_active, "created_at": p.created_at.isoformat() if p.created_at else None, "updated_at": p.updated_at.isoformat() if p.updated_at else None, }