Files
TradeMate Dev 9e9c7ac270 fix: additional code quality and performance improvements
Code quality:
- Replace empty except blocks with proper logging
- Create shared pagination utility function
- Remove duplicate UUID validation code
- Fix dead code in translation.py

Performance:
- Fix N+1 query in followup engine (use join instead of loop)
- Add eager loading for customer health scores
- Create database indexes for common query patterns:
  - customers: (user_id, status), (user_id, last_contact_at)
  - payment_transactions: (user_id, created_at)
  - followup_logs: (user_id, customer_id)
  - notifications: (user_id, is_read)

Configuration:
- Centralize magic numbers in config.py:
  - Payment prices
  - File upload limits
  - Rate limiting settings
  - Pagination defaults
- Update auth.py to use centralized rate limiting config
- Update customer/product imports to use centralized upload limits
- Update import_service.py to use centralized MAX_ROWS
2026-06-11 18:25:08 +08:00

125 lines
4.2 KiB
Python

from typing import Dict, Any, List, Optional, Tuple
import csv
import io
import logging
from datetime import datetime
logger = logging.getLogger(__name__)

# openpyxl is optional: remember whether the import succeeded so that code
# depending on it can check the flag instead of hitting a NameError.
try:
    import openpyxl
except ImportError:
    HAS_OPENPYXL = False
    logger.warning("openpyxl not installed, XLSX import disabled")
else:
    HAS_OPENPYXL = True

# Header names (lower-cased) that every import file must contain.
REQUIRED_COLUMNS = {"name"}

# Additional header names (lower-cased) recognised by the importer.
OPTIONAL_COLUMNS = {
    "company",
    "country",
    "phone",
    "email",
    "whatsapp_id",
    "source",
    "tags",
    "notes",
    "status",
    "estimated_value",
}
from app.config import settings
class ImportService:
    """Parse and validate tabular import files (XLSX and CSV).

    Every method is a ``@staticmethod``; the class only groups the helpers
    and carries the shared limits below.
    """

    # Maximum number of data rows (header excluded) accepted by parse_xlsx.
    MAX_ROWS = settings.MAX_EXCEL_ROWS

    # Status values accepted by validate_records.
    VALID_STATUSES = ("lead", "negotiating", "customer", "lost", "archived")

    @staticmethod
    def parse_xlsx(file_bytes: bytes) -> Tuple[List[Dict[str, Any]], List[str]]:
        """Parse the active sheet of an XLSX workbook into row dicts.

        The first row is the header; header cells are stripped and
        lower-cased. Fully blank rows are skipped. Cell values are converted
        to stripped strings (``None`` becomes ``""``). Only columns listed in
        ``REQUIRED_COLUMNS`` / ``OPTIONAL_COLUMNS`` are kept, matching
        ``parse_csv``.

        Args:
            file_bytes: Raw content of the uploaded file.

        Returns:
            ``(records, errors)``. File-level problems (openpyxl missing, bad
            signature, empty file, too many rows, missing required column,
            any exception while parsing) yield ``([], [message])``. Rows
            without a ``name`` are dropped and reported as ``"Row N: missing
            name"``, numbered so that the header row is row 1.
        """
        if not HAS_OPENPYXL:
            return [], ["openpyxl not installed"]

        workbook = None
        try:
            # XLSX is a ZIP container; reject anything that does not start
            # with the ZIP local-file-header signature.
            if len(file_bytes) < 4 or file_bytes[:4] != b'PK\x03\x04':
                return [], ["Invalid XLSX file format"]

            workbook = openpyxl.load_workbook(
                io.BytesIO(file_bytes), read_only=True, data_only=True
            )
            sheet = workbook.active

            # Stop reading as soon as the limit is exceeded instead of
            # materialising an arbitrarily large sheet first.
            row_limit = ImportService.MAX_ROWS + 1  # header + data rows
            rows = []
            for row in sheet.iter_rows(values_only=True):
                rows.append(row)
                if len(rows) > row_limit:
                    return [], [f"File too large. Max {ImportService.MAX_ROWS} data rows"]
            if not rows:
                return [], ["Empty file"]

            headers = [str(h).strip().lower() if h else "" for h in rows[0]]
            missing = REQUIRED_COLUMNS - set(headers)
            if missing:
                return [], [f"Missing required columns: {', '.join(sorted(missing))}"]

            allowed = REQUIRED_COLUMNS | OPTIONAL_COLUMNS
            records = []
            errors = []
            for row_number, row in enumerate(rows[1:], 2):
                if all(v is None or str(v).strip() == "" for v in row):
                    continue
                # zip() stops at the shorter sequence, so cells beyond the
                # header width are ignored.
                record = {
                    header: (str(value).strip() if value is not None else "")
                    for header, value in zip(headers, row)
                    if header in allowed
                }
                if not record.get("name"):
                    errors.append(f"Row {row_number}: missing name")
                    continue
                records.append(record)
            return records, errors
        except Exception as e:
            return [], [f"Parse error: {str(e)}"]
        finally:
            # Read-only workbooks keep their source open until closed.
            if workbook is not None:
                workbook.close()

    @staticmethod
    def parse_csv(file_bytes: bytes) -> Tuple[List[Dict[str, Any]], List[str]]:
        """Parse UTF-8 (optionally BOM-prefixed) CSV bytes into row dicts.

        Header names are stripped and lower-cased. Values are stripped;
        missing values become ``""``. Only columns listed in
        ``REQUIRED_COLUMNS`` / ``OPTIONAL_COLUMNS`` are kept.

        Args:
            file_bytes: Raw content of the uploaded file.

        Returns:
            ``(records, errors)``. File-level problems (no header, missing
            required column, any exception such as a decode failure) yield
            ``([], [message])``. Rows without a ``name`` are dropped and
            reported as ``"Row N: missing name"``, counting from 2 for the
            first data row.
        """
        try:
            text = file_bytes.decode("utf-8-sig")
            reader = csv.DictReader(io.StringIO(text))
            if not reader.fieldnames:
                return [], ["Empty or invalid CSV"]

            headers = [h.strip().lower() for h in reader.fieldnames]
            missing = REQUIRED_COLUMNS - set(headers)
            if missing:
                return [], [f"Missing required columns: {', '.join(sorted(missing))}"]

            allowed = REQUIRED_COLUMNS | OPTIONAL_COLUMNS
            records = []
            errors = []
            for row_number, row in enumerate(reader, 2):
                cleaned = {}
                for raw_key, value in row.items():
                    # DictReader stores cells beyond the header width under
                    # the key None (as a list); ignore them rather than
                    # failing the whole file on None.strip().
                    if raw_key is None:
                        continue
                    key = raw_key.strip().lower()
                    if key:
                        cleaned[key] = value.strip() if value else ""
                if not cleaned.get("name"):
                    errors.append(f"Row {row_number}: missing name")
                    continue
                records.append({k: v for k, v in cleaned.items() if k in allowed})
            return records, errors
        except Exception as e:
            return [], [f"Parse error: {str(e)}"]

    @staticmethod
    def validate_records(records: List[Dict]) -> Tuple[List[Dict], List[str]]:
        """Validate parsed records and fill in defaults.

        Records are modified in place and the accepted ones are returned.
        A record with a non-empty ``status`` outside ``VALID_STATUSES`` is
        rejected. For accepted records: a whitespace-only ``phone`` is
        removed, a missing or empty ``status`` becomes ``"lead"``, a missing
        or empty ``source`` becomes ``"import"``, and ``tags`` is turned into
        a list (a comma-separated string is split and stripped; a missing or
        ``None`` value becomes ``[]``).

        Args:
            records: Row dicts, e.g. as produced by ``parse_xlsx`` or
                ``parse_csv``.

        Returns:
            ``(valid, errors)`` where error messages use the 1-based position
            of the record in ``records``.
        """
        valid = []
        errors = []
        for position, record in enumerate(records, 1):
            status = record.get("status")
            if status and status not in ImportService.VALID_STATUSES:
                errors.append(f"Row {position}: invalid status '{status}'")
                continue

            if record.get("phone") and not record["phone"].strip():
                record.pop("phone", None)

            # The parsers emit "" for blank cells, so the key can exist with
            # an empty value; setdefault() would leave that "" in place.
            if not record.get("status"):
                record["status"] = "lead"
            if not record.get("source"):
                record["source"] = "import"

            tags = record.get("tags")
            if isinstance(tags, str):
                record["tags"] = [t.strip() for t in tags.split(",") if t.strip()]
            elif tags is None:
                record["tags"] = []

            valid.append(record)
        return valid, errors
# Module-level instance for callers to import; every ImportService method is
# a @staticmethod, so the instance carries no state of its own.
import_service = ImportService()