Files
TradeMate Dev 13e3992d4c fix: security and code quality improvements
Security fixes:
- Add file upload size limits (10MB) for customer and product imports
- Add XLSX file validation with row limits and magic byte checking
- Implement password validation (min 6 chars) in registration
- Add rate limiting for guest login (5 per IP per 15 minutes)
- Sanitize error messages to prevent information leakage
- Fix XSS vulnerability by removing unsafe v-html usage
- Enforce WhatsApp webhook signature verification
- Add SSRF protection with URL validation and IP blocking
- Fix marketing endpoints to use proper authentication

Code quality improvements:
- Create shared utility functions for UUID validation and string sanitization
- Remove duplicate UUID validation code from admin modules
- Remove dead code (pass statement in translation.py)
- Fix aliyun SDK import compatibility
2026-06-11 17:54:07 +08:00

189 lines
5.9 KiB
Python

from typing import Optional
from pydantic import BaseModel
from fastapi import APIRouter, Depends, HTTPException, Query
from sqlalchemy.ext.asyncio import AsyncSession
from sqlalchemy import select, delete
from app.database import get_db
from app.api.v1.deps import get_current_user
from app.models.search_provider import SearchProvider
from app.services.search import SearchService
router = APIRouter()
async def require_admin(current_user: dict = Depends(get_current_user)) -> dict:
if current_user.get("role") != "admin":
raise HTTPException(status_code=403, detail="Admin access required")
return current_user
class ProviderCreate(BaseModel):
name: str
provider_type: str
api_key: Optional[str] = None
api_endpoint: Optional[str] = None
extra_config: Optional[dict] = None
priority: int = 0
enabled: bool = True
class ProviderUpdate(BaseModel):
name: Optional[str] = None
api_key: Optional[str] = None
api_endpoint: Optional[str] = None
extra_config: Optional[dict] = None
priority: Optional[int] = None
enabled: Optional[bool] = None
@router.get("/search-providers")
async def list_providers(
page: int = Query(1, ge=1),
size: int = Query(50, ge=1, le=100),
_: dict = Depends(require_admin),
db: AsyncSession = Depends(get_db),
):
result = await db.execute(
select(SearchProvider).order_by(SearchProvider.priority).offset((page - 1) * size).limit(size)
)
providers = result.scalars().all()
total_result = await db.execute(select(SearchProvider))
total = len(total_result.scalars().all())
return {
"items": [
{
"id": str(p.id),
"name": p.name,
"provider_type": p.provider_type,
"api_key": p.api_key[:8] + "..." if p.api_key and len(p.api_key) > 8 else p.api_key,
"api_endpoint": p.api_endpoint,
"extra_config": p.extra_config,
"priority": p.priority,
"enabled": p.enabled,
"created_at": p.created_at.isoformat() if p.created_at else None,
"updated_at": p.updated_at.isoformat() if p.updated_at else None,
}
for p in providers
],
"total": total,
"page": page,
"size": size,
}
@router.post("/search-providers")
async def create_provider(
data: ProviderCreate,
_: dict = Depends(require_admin),
db: AsyncSession = Depends(get_db),
):
provider = SearchProvider(
name=data.name,
provider_type=data.provider_type,
api_key=data.api_key,
api_endpoint=data.api_endpoint,
extra_config=data.extra_config or {},
priority=data.priority,
enabled=data.enabled,
)
db.add(provider)
await db.flush()
return {
"id": str(provider.id),
"name": provider.name,
"provider_type": provider.provider_type,
"message": "Provider created",
}
@router.get("/search-providers/{provider_id}")
async def get_provider(
provider_id: str,
_: dict = Depends(require_admin),
db: AsyncSession = Depends(get_db),
):
_validate_uuid(provider_id)
result = await db.execute(select(SearchProvider).where(SearchProvider.id == provider_id))
p = result.scalar_one_or_none()
if not p:
raise HTTPException(status_code=404, detail="Provider not found")
return {
"id": str(p.id),
"name": p.name,
"provider_type": p.provider_type,
"api_key": p.api_key,
"api_endpoint": p.api_endpoint,
"extra_config": p.extra_config,
"priority": p.priority,
"enabled": p.enabled,
"created_at": p.created_at.isoformat() if p.created_at else None,
"updated_at": p.updated_at.isoformat() if p.updated_at else None,
}
@router.put("/search-providers/{provider_id}")
async def update_provider(
provider_id: str,
data: ProviderUpdate,
_: dict = Depends(require_admin),
db: AsyncSession = Depends(get_db),
):
_validate_uuid(provider_id)
result = await db.execute(select(SearchProvider).where(SearchProvider.id == provider_id))
p = result.scalar_one_or_none()
if not p:
raise HTTPException(status_code=404, detail="Provider not found")
if data.name is not None:
p.name = data.name
if data.api_key is not None:
p.api_key = data.api_key
if data.api_endpoint is not None:
p.api_endpoint = data.api_endpoint
if data.extra_config is not None:
p.extra_config = data.extra_config
if data.priority is not None:
p.priority = data.priority
if data.enabled is not None:
p.enabled = data.enabled
await db.flush()
return {"message": "Provider updated"}
@router.delete("/search-providers/{provider_id}")
async def delete_provider(
provider_id: str,
_: dict = Depends(require_admin),
db: AsyncSession = Depends(get_db),
):
_validate_uuid(provider_id)
result = await db.execute(delete(SearchProvider).where(SearchProvider.id == provider_id))
if result.rowcount == 0:
raise HTTPException(status_code=404, detail="Provider not found")
return {"message": "Provider deleted"}
@router.post("/search-providers/{provider_id}/test")
async def test_provider(
provider_id: str,
_: dict = Depends(require_admin),
db: AsyncSession = Depends(get_db),
):
_validate_uuid(provider_id)
result = await db.execute(select(SearchProvider).where(SearchProvider.id == provider_id))
p = result.scalar_one_or_none()
if not p:
raise HTTPException(status_code=404, detail="Provider not found")
try:
svc = SearchService(db)
results = await svc._search_provider(p, "test", 3)
return {"success": True, "results": results}
except Exception as e:
return {"success": False, "error": str(e)}
from app.core.utils import validate_uuid
def _validate_uuid(uuid_str: str):
validate_uuid(uuid_str)