feat: 修复 H5 底部导航覆盖 + 更新项目进度文档

## H5 底部导航修复 (Bug #10)
- 精简 App.vue,移除重复 tabbar,仅保留全局样式
- uni-page 设置 height: calc(100% - 50px) + overflow-y: auto
- 内容区域精确停在底部导航上方,独立滚动不再叠加
- 恢复 custom-tab-bar 组件

## 项目进度文档
- PROGRESS.md 更新至 10 个 Bug 修复
- 新增 H5 底部导航修复记录
- 新增历史变更条目
This commit is contained in:
TradeMate Dev
2026-05-12 20:24:42 +08:00
parent 69e164dcae
commit 7b62c2f8b4
125 changed files with 19725 additions and 728 deletions
+2 -2
View File
@@ -17,8 +17,8 @@ class TestConfig:
assert "translate" in settings.AI_ROUTING
assert "reply" in settings.AI_ROUTING
assert "marketing" in settings.AI_ROUTING
assert settings.AI_ROUTING["translate"]["primary"] == "deepl"
assert settings.AI_ROUTING["reply"]["primary"] == "openai"
assert "extract" in settings.AI_ROUTING
assert "primary" in settings.AI_ROUTING["translate"]
def test_free_tier_limits(self):
assert settings.FREE_DAILY_TRANSLATE_CHARS == 5000
+273
View File
@@ -0,0 +1,273 @@
import pytest
from datetime import datetime, timedelta
from app.services.customer_health import CustomerHealthService
class TestSilenceScore:
def test_no_contact_returns_max_days(self):
assert CustomerHealthService.silence_days(None) == 999
def test_recent_contact_returns_0_days(self):
now = datetime.utcnow()
assert CustomerHealthService.silence_days(now) == 0
def test_3_days_ago(self):
dt = datetime.utcnow() - timedelta(days=3)
assert CustomerHealthService.silence_days(dt) == 3
def test_silence_score_0_days(self):
assert CustomerHealthService.calculate_silence_score(datetime.utcnow()) == 100
def test_silence_score_7_days(self):
dt = datetime.utcnow() - timedelta(days=7)
score = CustomerHealthService.calculate_silence_score(dt)
assert score == 50
def test_silence_score_14_days(self):
dt = datetime.utcnow() - timedelta(days=14)
score = CustomerHealthService.calculate_silence_score(dt)
assert score == 0
def test_silence_score_21_days_clamped(self):
dt = datetime.utcnow() - timedelta(days=21)
score = CustomerHealthService.calculate_silence_score(dt)
assert score == 0
class TestStatusWeight:
def test_customer_status(self):
assert CustomerHealthService.status_weight("customer") == 100
def test_negotiating_status(self):
assert CustomerHealthService.status_weight("negotiating") == 70
def test_lead_status(self):
assert CustomerHealthService.status_weight("lead") == 40
def test_lost_status(self):
assert CustomerHealthService.status_weight("lost") == 10
def test_unknown_status_defaults(self):
assert CustomerHealthService.status_weight("unknown") == 40
def test_none_status_defaults(self):
assert CustomerHealthService.status_weight(None) == 40
class TestGrade:
def test_active_grade(self):
assert CustomerHealthService.grade(100) == "active"
assert CustomerHealthService.grade(80) == "active"
assert CustomerHealthService.grade(85) == "active"
def test_watch_grade(self):
assert CustomerHealthService.grade(79) == "watch"
assert CustomerHealthService.grade(50) == "watch"
assert CustomerHealthService.grade(65) == "watch"
def test_critical_grade(self):
assert CustomerHealthService.grade(49) == "critical"
assert CustomerHealthService.grade(0) == "critical"
assert CustomerHealthService.grade(30) == "critical"
class TestResponseScore:
def test_both_none(self):
r = CustomerHealthService.calc_response_score(None, None)
assert r["score"] == 50
assert r["trend"] == "stable"
def test_only_recent_exists(self):
r = CustomerHealthService.calc_response_score(2.0, None)
assert r["score"] == 90
assert r["trend"] == "stable"
def test_improving_faster_response(self):
r = CustomerHealthService.calc_response_score(2.0, 10.0)
assert r["score"] == 90
assert r["trend"] == "improving"
def test_declining_slower_response(self):
r = CustomerHealthService.calc_response_score(10.0, 2.0)
assert r["trend"] == "declining"
def test_fast_response_high_score(self):
r = CustomerHealthService.calc_response_score(0.5, 5.0)
assert r["score"] >= 95
assert r["trend"] == "improving"
def test_very_slow_response_low_score(self):
r = CustomerHealthService.calc_response_score(48.0, 2.0)
assert r["score"] == 0
assert r["trend"] == "declining"
class TestSentimentScore:
def test_empty_messages_neutral(self):
r = CustomerHealthService.calc_sentiment_score([])
assert r["score"] == 50
assert r["label"] == "neutral"
def test_positive_message(self):
r = CustomerHealthService.calc_sentiment_score(["yes I'm interested thanks"])
assert r["score"] == 80
assert r["label"] == "positive"
def test_negative_message(self):
r = CustomerHealthService.calc_sentiment_score(["no not interested too expensive"])
assert r["score"] == 20
assert r["label"] == "negative"
def test_mixed_messages_neutral(self):
r = CustomerHealthService.calc_sentiment_score(["good quality", "but price is high"])
assert r["score"] == 50
assert r["label"] == "neutral"
def test_more_positive_than_negative(self):
r = CustomerHealthService.calc_sentiment_score([
"great product",
"yes please proceed",
"but shipping is expensive",
])
assert r["score"] == 80
assert r["label"] == "positive"
class TestInquiryDepthScore:
def test_empty_messages(self):
r = CustomerHealthService.calc_inquiry_depth_score([])
assert r["score"] == 0
assert r["signal_count"] == 0
def test_no_signals(self):
r = CustomerHealthService.calc_inquiry_depth_score(["hello", "how are you"])
assert r["score"] == 0
def test_one_signal(self):
r = CustomerHealthService.calc_inquiry_depth_score(["what is your price"])
assert r["score"] == 50
assert r["signal_count"] >= 1
def test_multiple_signals(self):
r = CustomerHealthService.calc_inquiry_depth_score([
"what is your MOQ and FOB price",
"do you have certification",
"what is the lead time",
])
assert r["score"] >= 75
assert r["signal_count"] >= 3
def test_deduplicates_signals(self):
r = CustomerHealthService.calc_inquiry_depth_score([
"what is the price",
"please send price and MOQ",
])
assert r["signal_count"] == 2
class TestBusinessValueScore:
def test_zero_value(self):
r = CustomerHealthService.calc_business_value_score(0)
assert r["score"] == 0
def test_small_value(self):
r = CustomerHealthService.calc_business_value_score(500)
assert r["score"] == 20
def test_medium_value(self):
r = CustomerHealthService.calc_business_value_score(5000)
assert r["score"] == 40
def test_large_value(self):
r = CustomerHealthService.calc_business_value_score(50000)
assert r["score"] == 80
def test_very_large_value(self):
r = CustomerHealthService.calc_business_value_score(200000)
assert r["score"] == 100
class TestTotalScore:
def test_perfect_health(self):
dims = {
"response_trend": {"score": 100},
"sentiment": {"score": 100},
"inquiry_depth": {"score": 100},
"silence": {"score": 100},
"business_value": {"score": 100},
}
r = CustomerHealthService.calc_total_score(dims)
assert r["total_score"] == 100
assert r["grade"] == "active"
def test_zero_health(self):
dims = {
"response_trend": {"score": 0},
"sentiment": {"score": 0},
"inquiry_depth": {"score": 0},
"silence": {"score": 0},
"business_value": {"score": 0},
}
r = CustomerHealthService.calc_total_score(dims)
assert r["total_score"] == 0
assert r["grade"] == "critical"
def test_mid_health(self):
dims = {
"response_trend": {"score": 60},
"sentiment": {"score": 50},
"inquiry_depth": {"score": 50},
"silence": {"score": 40},
"business_value": {"score": 50},
}
r = CustomerHealthService.calc_total_score(dims)
assert 45 <= r["total_score"] <= 55
class TestSuggestion:
def test_active_suggestion(self):
s = CustomerHealthService.suggestion("active", 1, "lead")
assert "良好" in s
def test_watch_with_silence(self):
s = CustomerHealthService.suggestion("watch", 5, "lead")
assert "5天" in s
assert "跟进" in s
def test_watch_no_silence(self):
s = CustomerHealthService.suggestion("watch", 1, "lead")
assert "关注" in s
def test_critical_lead(self):
s = CustomerHealthService.suggestion("critical", 10, "lead")
assert "10天" in s
assert "跟进" in s
def test_critical_lost(self):
s = CustomerHealthService.suggestion("critical", 20, "lost")
assert "重新激活" in s
class TestHealthOverview:
def test_overview_empty(self):
overview = CustomerHealthService._calculate_overview_static([])
assert overview["total"] == 0
assert overview["active"] == 0
assert overview["watch"] == 0
assert overview["critical"] == 0
def test_overview_mixed(self, monkeypatch):
class Row:
def __init__(self, status, days_ago):
self.status = status
self.last_contact_at = datetime.utcnow() - timedelta(days=days_ago)
rows = [
Row("customer", 1),
Row("lead", 7),
Row("negotiating", 14),
Row("lost", 30),
]
overview = CustomerHealthService._calculate_overview_static(rows)
assert overview["total"] == 4
assert overview["active"] == 1
+95
View File
@@ -0,0 +1,95 @@
import pytest
from httpx import AsyncClient
from app.core.security import create_access_token
from app.models.user import User
import uuid
class TestAdminAPI:
async def test_admin_dashboard_unauthorized(self, client: AsyncClient):
response = await client.get("/api/v1/admin/dashboard")
assert response.status_code == 401
async def test_admin_dashboard_forbidden_non_admin(self, client: AsyncClient, test_user):
token = create_access_token({"sub": str(test_user.id), "tier": "free", "role": "user"})
response = await client.get(
"/api/v1/admin/dashboard",
headers={"Authorization": f"Bearer {token}"},
)
assert response.status_code == 403
async def test_admin_dashboard_success(self, client: AsyncClient, test_user):
test_user.role = "admin"
token = create_access_token({"sub": str(test_user.id), "tier": "free", "role": "admin"})
response = await client.get(
"/api/v1/admin/dashboard",
headers={"Authorization": f"Bearer {token}"},
)
assert response.status_code == 200
data = response.json()
assert "total_users" in data
assert "paid_users" in data
async def test_admin_list_users(self, client: AsyncClient, test_user):
test_user.role = "admin"
token = create_access_token({"sub": str(test_user.id), "tier": "free", "role": "admin"})
response = await client.get(
"/api/v1/admin/users",
headers={"Authorization": f"Bearer {token}"},
)
assert response.status_code == 200
data = response.json()
assert "items" in data
assert "total" in data
async def test_admin_update_tier_forbidden_non_admin(self, client: AsyncClient, test_user):
target_id = str(uuid.uuid4())
token = create_access_token({"sub": str(test_user.id), "tier": "free", "role": "user"})
response = await client.patch(
f"/api/v1/admin/users/{target_id}/tier",
headers={"Authorization": f"Bearer {token}"},
json={"tier": "pro"},
)
assert response.status_code == 403
class TestRateLimit:
async def test_health_not_rate_limited(self, client: AsyncClient):
for _ in range(10):
response = await client.get("/health")
assert response.status_code == 200
async def test_rate_limit_headers_present(self, client: AsyncClient, auth_headers):
response = await client.get("/api/v1/customers", headers=auth_headers)
assert "X-RateLimit-Remaining" in response.headers
assert "X-RateLimit-Limit" in response.headers
class TestUserRole:
async def test_user_default_role(self, client: AsyncClient, test_user):
assert test_user.role == "user"
async def test_user_info_contains_role(self, client: AsyncClient, auth_headers):
response = await client.get("/api/v1/auth/me", headers=auth_headers)
assert response.status_code == 200
data = response.json()
assert "role" in data
assert data["role"] == "user"
class TestPrivacyTerms:
async def test_privacy_page_exists(self):
import os
path = os.path.join(
os.path.dirname(os.path.dirname(os.path.abspath(__file__))),
"uni-app", "src", "pages", "agreement", "privacy.vue",
)
assert os.path.exists(path), "privacy.vue not found"
async def test_terms_page_exists(self):
import os
path = os.path.join(
os.path.dirname(os.path.dirname(os.path.abspath(__file__))),
"uni-app", "src", "pages", "agreement", "terms.vue",
)
assert os.path.exists(path), "terms.vue not found"
+337
View File
@@ -0,0 +1,337 @@
import pytest
from httpx import AsyncClient
from unittest.mock import patch, AsyncMock
from app.models.customer import Conversation, Message
from app.models.quotation import Quotation, QuotationItem
from app.models.user import Product
from datetime import datetime
class TestTranslateAPI:
async def test_translate_unauthorized(self, client: AsyncClient):
response = await client.post(
"/api/v1/translate",
json={"text": "Hello", "target_lang": "zh"},
)
assert response.status_code == 401
async def test_translate_success(self, client: AsyncClient, auth_headers):
with patch("app.services.translation.TranslationService.translate") as mock:
mock.return_value = {
"translated_text": "你好",
"source_lang": "en",
"provider_used": "mock",
"from_cache": False,
}
response = await client.post(
"/api/v1/translate",
headers=auth_headers,
json={"text": "Hello", "target_lang": "zh"},
)
assert response.status_code == 200
data = response.json()
assert data["translated_text"] == "你好"
async def test_translate_with_context(self, client: AsyncClient, auth_headers):
with patch("app.services.translation.TranslationService.translate") as mock:
mock.return_value = {
"translated_text": "FOB 上海 价格",
"source_lang": "en",
"provider_used": "mock",
}
response = await client.post(
"/api/v1/translate",
headers=auth_headers,
json={"text": "FOB Shanghai price", "target_lang": "zh", "context": "trade"},
)
assert response.status_code == 200
class TestReplyAPI:
async def test_reply_unauthorized(self, client: AsyncClient):
response = await client.post(
"/api/v1/translate/reply",
json={"inquiry": "How much?", "tone": "professional"},
)
assert response.status_code == 401
async def test_reply_success(self, client: AsyncClient, auth_headers):
with patch("app.services.translation.TranslationService.generate_reply") as mock:
mock.return_value = [
{"reply": "Thank you for your inquiry.", "tone": "professional", "provider": "mock"},
{"reply": "Thanks for reaching out!", "tone": "friendly", "provider": "mock"},
]
response = await client.post(
"/api/v1/translate/reply",
headers=auth_headers,
json={"inquiry": "How much for 500 units?", "tone": "professional", "count": 2},
)
assert response.status_code == 200
data = response.json()
assert len(data["suggestions"]) == 2
assert data["count"] == 2
async def test_reply_with_context(self, client: AsyncClient, auth_headers):
with patch("app.services.translation.TranslationService.generate_reply") as mock:
mock.return_value = [{"reply": "Our price is $10/unit.", "tone": "professional", "provider": "mock"}]
response = await client.post(
"/api/v1/translate/reply",
headers=auth_headers,
json={
"inquiry": "Price?",
"tone": "professional",
"count": 1,
"context": {"product": "Widget X", "price": "$10"},
},
)
assert response.status_code == 200
class TestExtractAPI:
async def test_extract_unauthorized(self, client: AsyncClient):
response = await client.post(
"/api/v1/translate/extract",
json={"text": "I want 500pcs of red widgets FOB Shanghai", "extract_type": "inquiry"},
)
assert response.status_code == 401
async def test_extract_success(self, client: AsyncClient, auth_headers):
with patch("app.services.translation.TranslationService.extract_info") as mock:
mock.return_value = {
"intent": "purchase",
"product_interest": "widgets",
"quantity": "500",
}
response = await client.post(
"/api/v1/translate/extract",
headers=auth_headers,
json={"text": "I want 500pcs of red widgets FOB Shanghai", "extract_type": "inquiry"},
)
assert response.status_code == 200
data = response.json()
assert "extracted" in data
class TestTTSAPI:
async def test_tts_get_unauthorized(self, client: AsyncClient):
response = await client.get("/api/v1/translate/tts?text=hello&lang=en")
assert response.status_code == 401
async def test_tts_get_empty_text(self, client: AsyncClient, auth_headers):
response = await client.get("/api/v1/translate/tts?text=&lang=en", headers=auth_headers)
assert response.status_code == 400
class TestMarketingAPI:
async def test_marketing_unauthorized(self, client: AsyncClient):
response = await client.post(
"/api/v1/marketing/generate",
json={"product_name": "Widget", "description": "A great widget", "target": "US buyers"},
)
assert response.status_code == 401
async def test_marketing_success(self, client: AsyncClient, auth_headers):
with patch("app.services.marketing.MarketingService.generate") as mock:
mock.return_value = [
{"content": "Buy our widget!", "style": "professional", "provider": "mock"},
]
response = await client.post(
"/api/v1/marketing/generate",
headers=auth_headers,
json={
"product_name": "Widget X",
"description": "High quality widget",
"category": "tools",
"target": "US importers",
"style": "professional",
"count": 1,
},
)
assert response.status_code == 200
data = response.json()
assert data["count"] >= 1
assert "results" in data
async def test_marketing_keywords(self, client: AsyncClient, auth_headers):
with patch("app.services.marketing.MarketingService.generate_keywords") as mock:
mock.return_value = ["widget", "tool", "quality"]
response = await client.post(
"/api/v1/marketing/keywords",
headers=auth_headers,
json={"product_name": "Widget", "description": "A widget", "count": 5},
)
assert response.status_code == 200
assert "keywords" in response.json()
class TestProductAPI:
async def test_create_product(self, client: AsyncClient, auth_headers):
response = await client.post(
"/api/v1/products",
headers=auth_headers,
json={
"name": "Test Product",
"description": "A test product",
"category": "electronics",
"price": "10.50",
"price_unit": "USD",
},
)
assert response.status_code == 200
data = response.json()
assert data["name"] == "Test Product"
async def test_list_products(self, client: AsyncClient, auth_headers):
response = await client.get("/api/v1/products", headers=auth_headers)
assert response.status_code == 200
data = response.json()
assert "items" in data
async def test_update_product(self, client: AsyncClient, auth_headers, db_session, test_user):
product = Product(
user_id=test_user.id,
name="Old Name",
category="tools",
is_active=True,
)
db_session.add(product)
await db_session.commit()
response = await client.patch(
f"/api/v1/products/{product.id}",
headers=auth_headers,
json={"name": "New Name", "price": "20.00"},
)
assert response.status_code == 200
assert response.json()["name"] == "New Name"
async def test_delete_product(self, client: AsyncClient, auth_headers, db_session, test_user):
product = Product(user_id=test_user.id, name="To Delete")
db_session.add(product)
await db_session.commit()
pid = product.id
response = await client.delete(f"/api/v1/products/{pid}", headers=auth_headers)
assert response.status_code == 200
response = await client.get(f"/api/v1/products/{pid}", headers=auth_headers)
assert response.status_code == 404
class TestQuotationAPI:
async def test_create_quotation(self, client: AsyncClient, auth_headers, db_session, test_user):
from app.models.customer import Customer
customer = Customer(user_id=test_user.id, name="Test Buyer")
db_session.add(customer)
await db_session.commit()
response = await client.post(
"/api/v1/quotations",
headers=auth_headers,
json={
"customer_id": str(customer.id),
"title": "Test Quote",
"items": [
{"product_name": "Widget", "quantity": 100, "unit_price": 10.0},
],
"currency": "USD",
"payment_terms": "T/T",
},
)
assert response.status_code == 200
data = response.json()
assert data["title"] == "Test Quote"
assert len(data["items"]) == 1
async def test_list_quotations(self, client: AsyncClient, auth_headers):
response = await client.get("/api/v1/quotations", headers=auth_headers)
assert response.status_code == 200
assert "items" in response.json()
async def test_quotation_pdf_not_found(self, client: AsyncClient, auth_headers):
import uuid
response = await client.get(f"/api/v1/quotations/{uuid.uuid4()}/pdf", headers=auth_headers)
assert response.status_code == 404
async def test_quotation_status_update(self, client: AsyncClient, auth_headers, db_session, test_user):
from app.models.customer import Customer
customer = Customer(user_id=test_user.id, name="Status Test Buyer")
db_session.add(customer)
await db_session.commit()
q = Quotation(user_id=test_user.id, customer_id=customer.id, title="Status Test", status="draft")
db_session.add(q)
await db_session.commit()
response = await client.patch(
f"/api/v1/quotations/{q.id}/status",
headers=auth_headers,
json={"status": "sent"},
)
assert response.status_code == 200
assert response.json()["status"] == "sent"
class TestAnalyticsAPI:
async def test_overview(self, client: AsyncClient, auth_headers):
response = await client.get("/api/v1/analytics/overview", headers=auth_headers)
assert response.status_code == 200
data = response.json()
assert "customers" in data
assert "translations" in data
assert "quotations" in data
assert "messages" in data
assert "marketing" in data
async def test_customer_analytics(self, client: AsyncClient, auth_headers):
response = await client.get("/api/v1/analytics/customers", headers=auth_headers)
assert response.status_code == 200
async def test_marketing_analytics(self, client: AsyncClient, auth_headers):
response = await client.get("/api/v1/analytics/marketing", headers=auth_headers)
assert response.status_code == 200
data = response.json()
assert "total_events" in data
class TestOnboardingAPI:
async def test_onboarding_status(self, client: AsyncClient, auth_headers):
response = await client.get("/api/v1/onboarding/status", headers=auth_headers)
assert response.status_code == 200
data = response.json()
assert "completed" in data
assert "product_count" in data
async def test_onboarding_create_product(self, client: AsyncClient, auth_headers):
with patch("app.services.onboarding.OnboardingService.create_product") as mock:
mock.return_value = {
"id": "mock-id",
"name": "Onboarded Product",
"marketing_contents": [],
"keywords": [],
}
response = await client.post(
"/api/v1/onboarding/product",
headers=auth_headers,
json={
"name": "New Product",
"description": "Desc",
"category": "tools",
"target": "US buyers",
},
)
assert response.status_code == 200
assert response.json()["name"] == "Onboarded Product"
class TestExportAPI:
async def test_export_customers_csv(self, client: AsyncClient, auth_headers):
response = await client.get("/api/v1/customers/export/csv", headers=auth_headers)
assert response.status_code == 200
assert response.headers["content-type"] == "text/csv"
assert "customers.csv" in response.headers["content-disposition"]
async def test_export_quotations_csv(self, client: AsyncClient, auth_headers):
response = await client.get("/api/v1/quotations/export/csv", headers=auth_headers)
assert response.status_code == 200
assert response.headers["content-type"] == "text/csv"
+156
View File
@@ -0,0 +1,156 @@
import pytest
from httpx import AsyncClient
from app.models.notification import Notification
from app.models.feedback import Feedback
class TestNotificationAPI:
async def test_list_notifications_empty(self, client: AsyncClient, auth_headers):
response = await client.get("/api/v1/notifications", headers=auth_headers)
assert response.status_code == 200
data = response.json()
assert "items" in data
assert data["items"] == []
async def test_unread_count_zero(self, client: AsyncClient, auth_headers):
response = await client.get("/api/v1/notifications/unread-count", headers=auth_headers)
assert response.status_code == 200
assert response.json()["count"] == 0
async def test_create_and_list_notification(self, client: AsyncClient, auth_headers, db_session, test_user):
n = Notification(
user_id=test_user.id,
title="Test Title",
content="Test Content",
)
db_session.add(n)
await db_session.commit()
response = await client.get("/api/v1/notifications", headers=auth_headers)
assert response.status_code == 200
data = response.json()
assert len(data["items"]) >= 1
assert data["items"][0]["title"] == "Test Title"
async def test_mark_read(self, client: AsyncClient, auth_headers, db_session, test_user):
n = Notification(user_id=test_user.id, title="Read Test", content="Content")
db_session.add(n)
await db_session.commit()
response = await client.patch(
f"/api/v1/notifications/{n.id}/read",
headers=auth_headers,
)
assert response.status_code == 200
count_resp = await client.get("/api/v1/notifications/unread-count", headers=auth_headers)
assert count_resp.json()["count"] == 0
async def test_mark_all_read(self, client: AsyncClient, auth_headers, db_session, test_user):
for i in range(3):
db_session.add(Notification(user_id=test_user.id, title=f"Notif {i}", content="Content"))
await db_session.commit()
response = await client.post("/api/v1/notifications/read-all", headers=auth_headers)
assert response.status_code == 200
assert response.json()["count"] == 3
async def test_delete_notification(self, client: AsyncClient, auth_headers, db_session, test_user):
n = Notification(user_id=test_user.id, title="Delete Me", content="Content")
db_session.add(n)
await db_session.commit()
nid = n.id
response = await client.delete(f"/api/v1/notifications/{nid}", headers=auth_headers)
assert response.status_code == 200
list_resp = await client.get("/api/v1/notifications", headers=auth_headers)
ids = [item["id"] for item in list_resp.json()["items"]]
assert str(nid) not in ids
async def test_delete_not_found(self, client: AsyncClient, auth_headers):
import uuid
response = await client.delete(
f"/api/v1/notifications/{uuid.uuid4()}",
headers=auth_headers,
)
assert response.status_code == 404
async def test_unread_count_after_read(self, client: AsyncClient, auth_headers, db_session, test_user):
for i in range(2):
db_session.add(Notification(user_id=test_user.id, title=f"Unread {i}", content="C"))
await db_session.commit()
resp = await client.get("/api/v1/notifications/unread-count", headers=auth_headers)
assert resp.json()["count"] == 2
async def test_unread_only_filter(self, client: AsyncClient, auth_headers, db_session, test_user):
n1 = Notification(user_id=test_user.id, title="Read", content="C", is_read=True)
n2 = Notification(user_id=test_user.id, title="Unread", content="C")
db_session.add_all([n1, n2])
await db_session.commit()
response = await client.get(
"/api/v1/notifications?unread_only=true",
headers=auth_headers,
)
assert response.status_code == 200
for item in response.json()["items"]:
assert item["is_read"] is False
class TestFeedbackAPI:
async def test_submit_feedback(self, client: AsyncClient, auth_headers):
response = await client.post(
"/api/v1/feedback",
headers=auth_headers,
json={
"content": "Great app!",
"category": "feature",
"contact": "test@example.com",
},
)
assert response.status_code == 200
assert response.json()["status"] == "ok"
async def test_submit_feedback_minimal(self, client: AsyncClient, auth_headers):
response = await client.post(
"/api/v1/feedback",
headers=auth_headers,
json={"content": "Bug report"},
)
assert response.status_code == 200
assert response.json()["status"] == "ok"
async def test_submit_feedback_unauthorized(self, client: AsyncClient):
response = await client.post(
"/api/v1/feedback",
json={"content": "Test"},
)
assert response.status_code == 401
class TestPaymentAPI:
async def test_get_plans(self, client: AsyncClient, auth_headers):
response = await client.get("/api/v1/payment/plans", headers=auth_headers)
assert response.status_code == 200
data = response.json()
assert "plans" in data
assert len(data["plans"]) >= 3
async def test_get_subscription_free(self, client: AsyncClient, auth_headers):
response = await client.get("/api/v1/payment/subscription", headers=auth_headers)
assert response.status_code == 200
data = response.json()
assert "plan" in data
assert "status" in data
async def test_create_order(self, client: AsyncClient, auth_headers):
response = await client.post(
"/api/v1/payment/create-order",
headers=auth_headers,
json={"plan": "pro"},
)
assert response.status_code == 200
data = response.json()
assert "prepay_id" in data or "order_id" in data or "url" in data
+9
View File
@@ -0,0 +1,9 @@
import pytest
from httpx import AsyncClient
from unittest.mock import patch, AsyncMock
from app.models.customer import Conversation, Message, Customer
from app.models.preference import PreferenceAnalysis, MarketingEffect
from app.models.user import Product
from datetime import datetime
+139
View File
@@ -0,0 +1,139 @@
import pytest
from unittest.mock import patch, AsyncMock
from app.services.corpus_trainer import CorpusTrainer
from app.models.corpus import CorpusEntry
from datetime import datetime
class TestCorpusTrainer:
async def test_get_stats_empty(self, db_session):
trainer = CorpusTrainer(db_session)
stats = await trainer.get_stats()
assert stats["total_entries"] == 0
assert stats["with_embeddings"] == 0
async def test_get_stats_with_data(self, db_session):
entries = [
CorpusEntry(source_text="Hello", target_text="你好", task_type="translate", quality_score=0.8),
CorpusEntry(source_text="Goodbye", target_text="再见", task_type="translate", quality_score=0.6),
]
for e in entries:
db_session.add(e)
await db_session.commit()
trainer = CorpusTrainer(db_session)
stats = await trainer.get_stats()
assert stats["total_entries"] == 2
assert stats["by_task_type"]["translate"] == 2
assert stats["high_quality"] == 1
assert stats["low_quality"] == 0
async def test_score_entries(self, db_session):
entries = [
CorpusEntry(source_text="Hello world", target_text="你好世界", task_type="translate"),
CorpusEntry(source_text="Hi", target_text="", task_type="translate"),
]
for e in entries:
db_session.add(e)
await db_session.commit()
trainer = CorpusTrainer(db_session)
result = await trainer.score_entries(batch_size=10)
assert result["processed"] == 2
assert result["updated"] == 2
for e in entries:
await db_session.refresh(e)
assert e.quality_score is not None
assert 0.0 <= e.quality_score <= 1.0
async def test_deduplicate(self, db_session):
from datetime import datetime
e1 = CorpusEntry(
source_text="Duplicate text", target_text="重复文本",
task_type="translate", quality_score=0.8,
created_at=datetime.utcnow(),
)
e2 = CorpusEntry(
source_text="Duplicate text", target_text="重复文本",
task_type="translate", quality_score=0.7,
created_at=datetime.utcnow(),
)
db_session.add_all([e1, e2])
await db_session.commit()
trainer = CorpusTrainer(db_session)
result = await trainer.deduplicate()
assert result["duplicates_removed"] == 1
stats = await trainer.get_stats()
assert stats["total_entries"] == 1
async def test_prune_low_quality(self, db_session):
from datetime import timedelta
old = datetime.utcnow() - timedelta(days=100)
entry = CorpusEntry(
source_text="x", target_text="y",
task_type="translate", quality_score=0.1,
created_at=old, usage_count=0,
)
db_session.add(entry)
await db_session.commit()
trainer = CorpusTrainer(db_session)
result = await trainer.prune_low_quality(min_score=0.2, max_age_days=30)
assert result["pruned"] == 1
stats = await trainer.get_stats()
assert stats["total_entries"] == 0
async def test_run_pipeline(self, db_session):
trainer = CorpusTrainer(db_session)
result = await trainer.run_pipeline()
assert "deduplication" in result
assert "scoring" in result
assert "embeddings" in result
assert "pruning" in result
assert "stats" in result
def test_calculate_quality_score_with_rating(self, db_session):
trainer = CorpusTrainer(db_session)
entry = CorpusEntry(
source_text="Good source text with enough length",
target_text="Good target text with enough length",
task_type="translate",
user_rating=4,
)
score = trainer._calculate_quality_score(entry)
assert 0.7 <= score <= 1.0
def test_calculate_quality_score_short_text(self, db_session):
trainer = CorpusTrainer(db_session)
entry = CorpusEntry(
source_text="ab", target_text="cd",
task_type="translate",
)
score = trainer._calculate_quality_score(entry)
assert score < 0.5
def test_calculate_quality_score_with_usage(self, db_session):
trainer = CorpusTrainer(db_session)
entry = CorpusEntry(
source_text="Good source text here with proper length",
target_text="Good target text here with proper length",
task_type="translate",
usage_count=10,
)
score = trainer._calculate_quality_score(entry)
assert score >= 0.6
async def test_embedding_generation_skipped_without_key(self, db_session):
from app.config import settings
original = settings.OPENAI_API_KEY
settings.OPENAI_API_KEY = None
trainer = CorpusTrainer(db_session)
embedding = await trainer._generate_embedding("test")
assert embedding is None
settings.OPENAI_API_KEY = original