7b62c2f8b4
## H5 底部导航修复 (Bug #10) - 精简 App.vue,移除重复 tabbar,仅保留全局样式 - uni-page 设置 height: calc(100% - 50px) + overflow-y: auto - 内容区域精确停在底部导航上方,独立滚动不再叠加 - 恢复 custom-tab-bar 组件 ## 项目进度文档 - PROGRESS.md 更新至 10 个 Bug 修复 - 新增 H5 底部导航修复记录 - 新增历史变更条目
113 lines
3.9 KiB
Python
113 lines
3.9 KiB
Python
from typing import Dict, Any, List, Optional, Tuple
|
|
import csv
|
|
import io
|
|
import logging
|
|
from datetime import datetime
|
|
|
|
logger = logging.getLogger(__name__)

# openpyxl is an optional dependency: record whether it is importable so that
# XLSX parsing can be refused with a clear message instead of failing later.
try:
    import openpyxl
except ImportError:
    HAS_OPENPYXL = False
    logger.warning("openpyxl not installed, XLSX import disabled")
else:
    HAS_OPENPYXL = True
|
|
|
|
|
|
# Column names (already stripped and lower-cased) that an import file must
# contain in its header row.
REQUIRED_COLUMNS = {"name"}

# Column names that are recognised when present in the header row.
OPTIONAL_COLUMNS = {
    "company",
    "country",
    "phone",
    "email",
    "whatsapp_id",
    "source",
    "tags",
    "notes",
    "status",
    "estimated_value",
}
|
|
|
|
|
|
class ImportService:
    """Parse and validate records imported from XLSX or CSV uploads.

    Every method is static. The parsers return a ``(records, errors)`` pair:
    ``records`` is a list of dicts keyed by normalised (stripped, lower-cased)
    column name with string values, and ``errors`` is a list of human-readable
    messages. A failure that affects the whole file yields ``([], [message])``.
    """

    # Status values accepted by ``validate_records``.
    VALID_STATUSES = ("lead", "negotiating", "customer", "lost", "archived")

    @staticmethod
    def parse_xlsx(file_bytes: bytes) -> Tuple[List[Dict[str, Any]], List[str]]:
        """Parse the active worksheet of an XLSX workbook.

        The first row is the header. Fully blank rows are skipped silently;
        rows without a ``name`` value are reported in ``errors`` and skipped.
        Only columns listed in ``REQUIRED_COLUMNS``/``OPTIONAL_COLUMNS`` are
        kept, matching what ``parse_csv`` does.

        Args:
            file_bytes: Raw content of the ``.xlsx`` file.

        Returns:
            ``(records, errors)``; row numbers in ``errors`` are 1-based
            worksheet rows (the header is row 1).
        """
        if not HAS_OPENPYXL:
            return [], ["openpyxl not installed"]

        try:
            wb = openpyxl.load_workbook(io.BytesIO(file_bytes), read_only=True)
            try:
                ws = wb.active
                # A workbook without an active worksheet has nothing to import.
                rows = [] if ws is None else list(ws.iter_rows(values_only=True))
            finally:
                # Read-only workbooks keep their source open until closed.
                wb.close()

            if not rows:
                return [], ["Empty file"]

            headers = [str(h).strip().lower() if h else "" for h in rows[0]]
            missing = REQUIRED_COLUMNS - set(headers)
            if missing:
                return [], [f"Missing required columns: {', '.join(missing)}"]

            allowed = REQUIRED_COLUMNS | OPTIONAL_COLUMNS
            # Blank out unknown headers so their cells are ignored below.
            columns = [h if h in allowed else "" for h in headers]

            records = []
            errors = []
            for i, row in enumerate(rows[1:], 2):
                if all(v is None or str(v).strip() == "" for v in row):
                    continue
                # zip() stops at the shorter sequence, so cells beyond the
                # header width are dropped.
                record = {
                    col: "" if val is None else str(val).strip()
                    for col, val in zip(columns, row)
                    if col
                }
                if not record.get("name"):
                    errors.append(f"Row {i}: missing name")
                    continue
                records.append(record)

            return records, errors

        except Exception as e:
            # Boundary handler: callers expect an error list, not an exception.
            return [], [f"Parse error: {str(e)}"]

    @staticmethod
    def parse_csv(file_bytes: bytes) -> Tuple[List[Dict[str, Any]], List[str]]:
        """Parse a UTF-8 (optionally BOM-prefixed) CSV file.

        The first row is the header. Rows without a ``name`` value are
        reported in ``errors`` and skipped. Only columns listed in
        ``REQUIRED_COLUMNS``/``OPTIONAL_COLUMNS`` are kept.

        Args:
            file_bytes: Raw content of the ``.csv`` file.

        Returns:
            ``(records, errors)``; row numbers in ``errors`` count the rows
            yielded by ``csv.DictReader`` starting at 2 (the header is row 1).
        """
        try:
            text = file_bytes.decode("utf-8-sig")
            reader = csv.DictReader(io.StringIO(text))
            if not reader.fieldnames:
                return [], ["Empty or invalid CSV"]

            headers = [h.strip().lower() for h in reader.fieldnames]
            missing = REQUIRED_COLUMNS - set(headers)
            if missing:
                return [], [f"Missing required columns: {', '.join(missing)}"]

            # Loop-invariant: build the accepted column set once.
            allowed = REQUIRED_COLUMNS | OPTIONAL_COLUMNS

            records = []
            errors = []
            for i, row in enumerate(reader, 2):
                cleaned = {}
                for k, v in row.items():
                    # DictReader stores cells beyond the header width under
                    # the key None (as a list); they belong to no column.
                    if k is None:
                        continue
                    key = k.strip().lower()
                    if key in allowed:
                        # v is None when the row is shorter than the header.
                        cleaned[key] = v.strip() if v else ""
                if not cleaned.get("name"):
                    errors.append(f"Row {i}: missing name")
                    continue
                records.append(cleaned)

            return records, errors

        except Exception as e:
            # Boundary handler: callers expect an error list, not an exception.
            return [], [f"Parse error: {str(e)}"]

    @staticmethod
    def validate_records(records: List[Dict]) -> Tuple[List[Dict], List[str]]:
        """Validate parsed records and fill in defaults.

        Records with a non-empty status outside ``VALID_STATUSES`` are
        rejected. For accepted records a missing or blank ``status`` becomes
        ``"lead"``, a missing or blank ``source`` becomes ``"import"``,
        ``tags`` is turned into a list (a comma-separated string is split and
        blank items are dropped), and a whitespace-only ``phone`` is removed.

        The dicts in ``records`` are modified in place; the returned ``valid``
        list holds the same dict objects.

        Args:
            records: Records as produced by ``parse_xlsx``/``parse_csv``.

        Returns:
            ``(valid, errors)``; row numbers in ``errors`` are 1-based
            positions within ``records``, not rows of the source file.
        """
        valid = []
        errors = []
        for i, r in enumerate(records, 1):
            status = r.get("status")
            if status and status not in ImportService.VALID_STATUSES:
                errors.append(f"Row {i}: invalid status '{status}'")
                continue

            phone = r.get("phone")
            if isinstance(phone, str) and phone and not phone.strip():
                r.pop("phone", None)

            # The parsers emit "" for blank cells, so check for emptiness
            # rather than key presence when applying defaults.
            if not status:
                r["status"] = "lead"
            if not r.get("source"):
                r["source"] = "import"

            tags = r.get("tags")
            if isinstance(tags, str):
                r["tags"] = [t.strip() for t in tags.split(",") if t.strip()]
            elif not tags:
                r["tags"] = []

            valid.append(r)
        return valid, errors
|
|
|
|
|
|
# Shared module-level instance. ImportService defines only static methods,
# so the instance carries no state of its own.
import_service = ImportService()
|