Files
trade-assistant/backend/app/services/import_service.py
T
TradeMate Dev 13e3992d4c fix: security and code quality improvements
Security fixes:
- Add file upload size limits (10MB) for customer and product imports
- Add XLSX file validation with row limits and magic byte checking
- Implement password validation (min 6 chars) in registration
- Add rate limiting for guest login (5 per IP per 15 minutes)
- Sanitize error messages to prevent information leakage
- Fix XSS vulnerability by removing unsafe v-html usage
- Enforce WhatsApp webhook signature verification
- Add SSRF protection with URL validation and IP blocking
- Fix marketing endpoints to use proper authentication

Code quality improvements:
- Create shared utility functions for UUID validation and string sanitization
- Remove duplicate UUID validation code from admin modules
- Remove dead code (pass statement in translation.py)
- Fix aliyun SDK import compatibility
2026-06-11 17:54:07 +08:00

122 lines
4.2 KiB
Python

from typing import Dict, Any, List, Optional, Tuple
import csv
import io
import logging
from datetime import datetime
# Module-wide logger, named after this module.
logger = logging.getLogger(__name__)

# openpyxl is an optional dependency: remember whether it could be imported
# so that parse_xlsx can refuse XLSX uploads instead of crashing.
try:
    import openpyxl
except ImportError:
    HAS_OPENPYXL = False
    logger.warning("openpyxl not installed, XLSX import disabled")
else:
    HAS_OPENPYXL = True
# Column schema for imported records: every record must carry the REQUIRED
# columns and may carry any OPTIONAL one. Columns outside these two sets are
# dropped by both parsers.
REQUIRED_COLUMNS = {"name"}
OPTIONAL_COLUMNS = {
    "company", "country", "phone", "email", "whatsapp_id",
    "source", "tags", "notes", "status", "estimated_value",
}


class ImportService:
    """Parse uploaded XLSX/CSV files into record dicts and validate them.

    Both parsers return ``(records, errors)``. A file-level failure yields an
    empty record list and a single error message; a row-level failure only
    skips that row and appends a ``"Row N: ..."`` message.
    """

    # Maximum number of data rows (header excluded) accepted from an XLSX file.
    MAX_ROWS = 10000
    # Status values accepted by validate_records.
    VALID_STATUSES = ("lead", "negotiating", "customer", "lost", "archived")

    @staticmethod
    def parse_xlsx(file_bytes: bytes) -> Tuple[List[Dict[str, Any]], List[str]]:
        """Parse the active sheet of an XLSX workbook.

        The first row is the header (matched case-insensitively, whitespace
        trimmed). Fully blank rows are skipped silently; rows without a
        ``name`` are reported in the error list. Cell values are converted to
        stripped strings and only known columns are kept.

        Args:
            file_bytes: Raw content of the uploaded file.

        Returns:
            ``(records, errors)``; ``records`` is empty when the file as a
            whole is rejected.
        """
        if not HAS_OPENPYXL:
            return [], ["openpyxl not installed"]
        workbook = None
        try:
            # XLSX files are ZIP archives, so they start with the ZIP magic.
            if len(file_bytes) < 4 or file_bytes[:4] != b'PK\x03\x04':
                return [], ["Invalid XLSX file format"]
            workbook = openpyxl.load_workbook(
                io.BytesIO(file_bytes), read_only=True, data_only=True
            )
            sheet = workbook.active

            # Stop reading as soon as the limit is exceeded, so an oversized
            # sheet is rejected without being loaded into memory completely.
            row_cap = ImportService.MAX_ROWS + 1  # header + data rows
            rows = []
            for row in sheet.iter_rows(values_only=True):
                rows.append(row)
                if len(rows) > row_cap:
                    return [], [f"File too large. Max {ImportService.MAX_ROWS} data rows"]
            if not rows:
                return [], ["Empty file"]

            headers = [str(h).strip().lower() if h else "" for h in rows[0]]
            missing = REQUIRED_COLUMNS - set(headers)
            if missing:
                return [], [f"Missing required columns: {', '.join(sorted(missing))}"]

            allowed = REQUIRED_COLUMNS | OPTIONAL_COLUMNS
            records = []
            errors = []
            # Row numbers are 1-based spreadsheet rows; the header is row 1.
            for row_no, row in enumerate(rows[1:], 2):
                if all(v is None or str(v).strip() == "" for v in row):
                    continue
                # Keep only known columns, matching what parse_csv does.
                record = {
                    header: "" if val is None else str(val).strip()
                    for header, val in zip(headers, row)
                    if header in allowed
                }
                if not record.get("name"):
                    errors.append(f"Row {row_no}: missing name")
                    continue
                records.append(record)
            return records, errors
        except Exception as e:
            return [], [f"Parse error: {str(e)}"]
        finally:
            # Read-only workbooks keep the underlying archive open until closed.
            if workbook is not None:
                workbook.close()

    @staticmethod
    def parse_csv(file_bytes: bytes) -> Tuple[List[Dict[str, Any]], List[str]]:
        """Parse a UTF-8 CSV file (an optional BOM is tolerated).

        The first line is the header (matched case-insensitively, whitespace
        trimmed). Values are stripped, missing trailing cells become ``""``,
        cells beyond the header width are ignored, and only known columns are
        kept. Rows without a ``name`` are reported in the error list.

        Args:
            file_bytes: Raw content of the uploaded file.

        Returns:
            ``(records, errors)``; ``records`` is empty when the file as a
            whole is rejected.
        """
        try:
            text = file_bytes.decode("utf-8-sig")
            reader = csv.DictReader(io.StringIO(text))
            if not reader.fieldnames:
                return [], ["Empty or invalid CSV"]
            headers = [h.strip().lower() for h in reader.fieldnames]
            missing = REQUIRED_COLUMNS - set(headers)
            if missing:
                return [], [f"Missing required columns: {', '.join(sorted(missing))}"]

            allowed = REQUIRED_COLUMNS | OPTIONAL_COLUMNS
            records = []
            errors = []
            for row_no, row in enumerate(reader, 2):
                cleaned = {}
                for raw_key, raw_val in row.items():
                    # DictReader files cells beyond the header width under
                    # the key None; ignore them instead of failing the file.
                    if raw_key is None:
                        continue
                    key = raw_key.strip().lower()
                    if key in allowed:
                        cleaned[key] = raw_val.strip() if raw_val else ""
                if not cleaned.get("name"):
                    errors.append(f"Row {row_no}: missing name")
                    continue
                records.append(cleaned)
            return records, errors
        except Exception as e:
            return [], [f"Parse error: {str(e)}"]

    @staticmethod
    def validate_records(records: List[Dict]) -> Tuple[List[Dict], List[str]]:
        """Validate parsed records and fill in defaults.

        Records are modified in place. A record with a non-empty status that
        is not in ``VALID_STATUSES`` is rejected. For the remaining records a
        whitespace-only ``phone`` is removed, a missing or blank ``status``
        becomes ``"lead"``, a missing or blank ``source`` becomes
        ``"import"``, and ``tags`` becomes a list (a comma-separated string is
        split and trimmed).

        Args:
            records: Record dicts, e.g. as returned by the parsers.

        Returns:
            ``(valid, errors)`` where ``errors`` holds one message per
            rejected record, numbered by 1-based position in ``records``.
        """
        valid = []
        errors = []
        for position, rec in enumerate(records, 1):
            status = rec.get("status")
            if status and status not in ImportService.VALID_STATUSES:
                errors.append(f"Row {position}: invalid status '{status}'")
                continue
            if rec.get("phone") and not rec["phone"].strip():
                rec.pop("phone", None)
            # The parsers emit "" for blank cells, so the key usually exists;
            # treat blank values like missing ones when applying defaults.
            if not rec.get("status"):
                rec["status"] = "lead"
            if not rec.get("source"):
                rec["source"] = "import"
            if not rec.get("tags"):
                rec["tags"] = []
            if isinstance(rec["tags"], str):
                rec["tags"] = [t.strip() for t in rec["tags"].split(",") if t.strip()]
            valid.append(rec)
        return valid, errors
# Shared module-level instance for callers to import; ImportService defines
# only static methods, so the instance carries no state of its own.
import_service = ImportService()