When a model degrades in production, the cause is usually not the algorithm but the data. Our original MLOps platform was a black box on exactly this point. The pipeline that computed features and wrote them into TiDB appeared to run normally, but when the accuracy of our online fraud-detection model began to slip without warning, we realized we knew nothing about the state of the data itself. We could see TiDB's QPS and latency, yet we were completely blind to the freshness of key features and to drift in their value distributions. The problem was not a lack of monitoring; we were monitoring the wrong things.
The initial idea was simple: put data-quality metrics and system-performance metrics in one place. When the model-serving API's P99 latency spikes, I want to see immediately whether some user feature it depends on has gone more than five seconds without an update, or whether a feature's mean has drifted off its normal baseline over the past hour. If that kind of correlation analysis requires pulling logs across systems after the fact, the answer arrives far too late to be useful.
The technology choices fell into place quickly. Our feature data already lived in TiDB; as an HTAP database it serves the high-concurrency point lookups of online model serving (TP) while also handling the lightweight aggregate queries our monitoring agent needs (AP), so we could avoid bolting on a separate OLAP system and the architectural complexity that comes with it. The company's observability standard is Datadog, whose custom metrics and dashboarding were exactly what we needed. The glue, naturally, is Python, the primary language of our ML and data-engineering teams.
Our goal was an independent observability pipeline that runs alongside the main business flow, continuously samples data state from TiDB, turns it into observability metrics, and pushes them to Datadog.
Step 1: Feature Store Table Design in TiDB
Everything starts with how the data is organized. In a high-concurrency fraud-detection scenario, the feature table must be designed for point-lookup performance first, while also keeping the aggregate queries of the observability pipeline cheap.
We designed the following table for real-time user behavior features. Note the index design: the unique key on (user_id, feature_name) serves the high-frequency "fetch all features for a user" lookup pattern, while idx_feature_timestamp (feature_name, updated_at) is tailor-made for our observability agent, making "latest update time of a given feature" an efficient index lookup, which is the key input for freshness.
-- feature_store.realtime_user_features table structure
CREATE TABLE IF NOT EXISTS `realtime_user_features` (
  `id` BIGINT AUTO_RANDOM,
  `user_id` VARCHAR(255) NOT NULL COMMENT 'Unique user identifier',
  `feature_name` VARCHAR(255) NOT NULL COMMENT 'Feature name, e.g., last_30min_transaction_count',
  `feature_value` DOUBLE NOT NULL COMMENT 'Feature value',
  `updated_at` DATETIME(6) NOT NULL DEFAULT CURRENT_TIMESTAMP(6) ON UPDATE CURRENT_TIMESTAMP(6) COMMENT 'Feature update timestamp, microsecond precision',
  PRIMARY KEY (`id`),
  UNIQUE KEY `uk_user_feature` (`user_id`, `feature_name`),
  INDEX `idx_feature_timestamp` (`feature_name`, `updated_at`)
) DEFAULT CHARSET=utf8mb4 COLLATE=utf8mb4_bin;
In a real project, an AUTO_RANDOM primary key is essential for avoiding TiDB write hotspots, especially under high-throughput ingest: the randomly generated IDs scatter inserts across regions instead of appending every row to the same "hot" region. Note that TiDB only allows AUTO_RANDOM on the primary key column, which is why the business key (user_id, feature_name) is declared as a unique key rather than the primary key.
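For context, the serving-side read path this schema optimizes for looks roughly like the sketch below. It is a minimal sketch, assuming the same mysql-connector-python driver used later in this post; get_user_features is a hypothetical helper, and the real serving API adds connection pooling and caching on top.
# serving/feature_lookup.py (hypothetical sketch of the TP read path)
import mysql.connector

def get_user_features(conn, user_id: str) -> dict:
    """Fetch all features for one user: a point lookup served by uk_user_feature."""
    cursor = conn.cursor(dictionary=True)
    try:
        cursor.execute(
            "SELECT feature_name, feature_value "
            "FROM realtime_user_features WHERE user_id = %s",
            (user_id,),
        )
        return {row["feature_name"]: row["feature_value"] for row in cursor.fetchall()}
    finally:
        cursor.close()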
Step 2: Hardening the Feature Ingestion Service
Our feature ingestion service is a Python FastAPI microservice. It consumes raw transaction events from upstream Kafka, performs real-time computation (for example over sliding windows), and writes the resulting features into TiDB. To support later debugging and analysis, we hardened its logging and error handling.
The core code below is more than a simple database write: it includes robust error handling, retry logic, and structured logging.
# feature_ingestion/main.py
import os
import time
import logging
from typing import Dict, Any
import mysql.connector
from fastapi import FastAPI, status
from fastapi.responses import JSONResponse
from datadog.dogstatsd import DogStatsd  # from the `datadog` PyPI package
# --- Configuration ---
# In a real app, use Pydantic settings or a config management system
DB_HOST = os.getenv("TIDB_HOST", "127.0.0.1")
DB_PORT = int(os.getenv("TIDB_PORT", 4000))
DB_USER = os.getenv("TIDB_USER", "root")
DB_PASSWORD = os.getenv("TIDB_PASSWORD", "")
DB_DATABASE = os.getenv("TIDB_DATABASE", "feature_store")
DATADOG_AGENT_HOST = os.getenv("DATADOG_AGENT_HOST", "127.0.0.1")
DATADOG_AGENT_PORT = int(os.getenv("DATADOG_AGENT_PORT", 8125))
# --- Logging Setup ---
# Use structured logging for better parsing in Datadog Logs
logging.basicConfig(
level=logging.INFO,
format='%(asctime)s - %(name)s - %(levelname)s - %(message)s',
handlers=[logging.StreamHandler()]
)
logger = logging.getLogger(__name__)
# --- Datadog StatsD Client ---
statsd = DogStatsd(host=DATADOG_AGENT_HOST, port=DATADOG_AGENT_PORT, namespace="mlops.feature_ingestion")
# --- Database Connection Pool ---
# Using a simple connection function here. For production, use a robust pool like SQLAlchemy's.
def get_db_connection():
try:
conn = mysql.connector.connect(
host=DB_HOST,
port=DB_PORT,
user=DB_USER,
password=DB_PASSWORD,
database=DB_DATABASE,
autocommit=False # Important for transaction control
)
return conn
except mysql.connector.Error as err:
logger.error(f"Database connection failed: {err}")
statsd.increment("db.connection.errors", tags=["error:initial_connection"])
raise
app = FastAPI()
@app.post("/v1/ingest")
# Deliberately a sync (non-async) handler: FastAPI runs it in a threadpool, so the
# blocking mysql-connector calls and retry sleeps below don't stall the event loop.
def ingest_feature(payload: Dict[str, Any]):
"""
Receives a feature to be ingested into TiDB.
Payload example:
{
"user_id": "user-123",
"feature_name": "last_1h_avg_txn_amount",
"feature_value": 150.75,
"event_timestamp": "2023-10-27T10:00:00.123456Z"
}
"""
user_id = payload.get("user_id")
feature_name = payload.get("feature_name")
feature_value = payload.get("feature_value")
if not all([user_id, feature_name, feature_value is not None]):
return JSONResponse(
status_code=status.HTTP_400_BAD_REQUEST,
content={"message": "Missing required fields: user_id, feature_name, feature_value"}
)
    # A common mistake is to not wrap DB operations in try/except/finally
    conn = None
    cursor = None
    attempt = 0
    max_attempts = 3
    start_time = time.monotonic()
while attempt < max_attempts:
try:
conn = get_db_connection()
cursor = conn.cursor()
# Using ON DUPLICATE KEY UPDATE is atomic and efficient for this pattern
sql = """
INSERT INTO realtime_user_features (user_id, feature_name, feature_value)
VALUES (%s, %s, %s)
ON DUPLICATE KEY UPDATE feature_value = VALUES(feature_value), updated_at = CURRENT_TIMESTAMP(6)
"""
cursor.execute(sql, (user_id, feature_name, feature_value))
conn.commit()
# --- Datadog Instrumentation ---
latency = (time.monotonic() - start_time) * 1000
statsd.timing("db.upsert.latency", latency, tags=[f"feature:{feature_name}"])
statsd.increment("ingestion.events.success", tags=[f"feature:{feature_name}"])
logger.info(f"Successfully ingested feature '{feature_name}' for user '{user_id}'")
return {"status": "success"}
        except mysql.connector.Error as err:
            attempt += 1
            logger.warning(f"DB error on attempt {attempt}/{max_attempts}: {err}. Retrying...")
            if conn and conn.is_connected():
                conn.rollback()  # Roll back the failed transaction before retrying
            # This tag is critical for alerting on specific error types
            statsd.increment("db.operation.errors", tags=[f"error:{type(err).__name__}", "operation:upsert"])
            if attempt >= max_attempts:
                logger.error(f"Failed to ingest feature after {max_attempts} attempts for user '{user_id}'")
                statsd.increment("ingestion.events.failed", tags=[f"feature:{feature_name}"])
                return JSONResponse(
                    status_code=status.HTTP_503_SERVICE_UNAVAILABLE,
                    content={"message": "Database operation failed permanently"}
                )
            time.sleep(0.5 * (2 ** (attempt - 1)))  # Exponential backoff: 0.5s, then 1.0s
        finally:
            # Only close the cursor while its connection is still alive
            if conn and conn.is_connected():
                if cursor is not None:
                    cursor.close()
                conn.close()
    # Unreachable: every path in the loop returns, but FastAPI still expects a response here
    return JSONResponse(status_code=500, content={"message": "Internal Server Error"})
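Before wiring the service up to Kafka, it is worth a manual smoke test. A minimal sketch with the requests library; the localhost URL and the payload values are illustrative:
# scripts/smoke_test_ingest.py (illustrative; assumes the service runs locally on :8000)
import requests

resp = requests.post(
    "http://localhost:8000/v1/ingest",
    json={
        "user_id": "user-123",
        "feature_name": "last_1h_avg_txn_amount",
        "feature_value": 150.75,
        "event_timestamp": "2023-10-27T10:00:00.123456Z",
    },
    timeout=5,
)
print(resp.status_code, resp.json())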
Step 3: Building the Core Observability Agent
This is the soul of the whole design. We wrote a standalone Python script that runs as a daemon. Its only job: periodically query TiDB, compute the data-quality and data-state metrics we care about, and ship them to Datadog via dogstatsd.
We split the agent's responsibilities cleanly:
- FeatureFreshnessMonitor: monitors feature freshness.
- FeatureDriftMonitor: monitors statistical drift in feature distributions.
- FeatureVolumeMonitor: monitors feature write volume (omitted from the main listing; a sketch follows after the agent code below).
# observability_agent/agent.py
import os
import time
import logging
import schedule
import mysql.connector
from datadog.dogstatsd import DogStatsd  # from the `datadog` PyPI package
from datetime import datetime, timezone
# --- Configuration ---
DB_HOST = os.getenv("TIDB_HOST", "127.0.0.1")
DB_PORT = int(os.getenv("TIDB_PORT", 4000))
DB_USER = os.getenv("TIDB_USER", "root")
DB_PASSWORD = os.getenv("TIDB_PASSWORD", "")
DB_DATABASE = os.getenv("TIDB_DATABASE", "feature_store")
DATADOG_AGENT_HOST = os.getenv("DATADOG_AGENT_HOST", "127.0.0.1")
DATADOG_AGENT_PORT = int(os.getenv("DATADOG_AGENT_PORT", 8125))
# The list of critical features we want to monitor.
# In a production system, this list would come from a configuration service or a database.
CRITICAL_FEATURES = [
"last_1h_transaction_count",
"last_1h_avg_txn_amount",
"last_24h_failed_txn_count",
]
# --- Logging ---
logging.basicConfig(level=logging.INFO, format='%(asctime)s - %(name)s - %(levelname)s - %(message)s')
logger = logging.getLogger("ObservabilityAgent")
# --- Datadog StatsD Client ---
statsd = DogStatsd(host=DATADOG_AGENT_HOST, port=DATADOG_AGENT_PORT, namespace="mlops.feature_store")
def get_db_connection():
# It's better to have a single, long-lived connection for a daemon process
# with proper reconnection logic, but for simplicity, we reconnect each time.
try:
return mysql.connector.connect(
host=DB_HOST, port=DB_PORT, user=DB_USER,
password=DB_PASSWORD, database=DB_DATABASE
)
except mysql.connector.Error as err:
logger.error(f"Agent DB connection failed: {err}")
statsd.increment("agent.db.connection.errors")
return None
class FeatureFreshnessMonitor:
"""Calculates the staleness of features."""
def run(self, conn):
if not conn:
return
logger.info("Running FeatureFreshnessMonitor...")
try:
with conn.cursor(dictionary=True) as cursor:
# This query is optimized by the `idx_feature_timestamp` index.
# It finds the latest update time for each of our critical features.
sql = """
SELECT feature_name, MAX(updated_at) as last_update
FROM realtime_user_features
WHERE feature_name IN (%s)
GROUP BY feature_name
"""
# Create a placeholder string for the IN clause
placeholders = ', '.join(['%s'] * len(CRITICAL_FEATURES))
formatted_sql = sql % placeholders
cursor.execute(formatted_sql, tuple(CRITICAL_FEATURES))
                results = cursor.fetchall()
                now_utc = datetime.now(timezone.utc)
                for row in results:
                    feature_name = row['feature_name']
                    # The driver returns naive datetimes; we store UTC, so attach the zone before subtracting
                    last_update_utc = row['last_update'].replace(tzinfo=timezone.utc)
                    staleness_seconds = (now_utc - last_update_utc).total_seconds()
                    statsd.gauge(
                        "data.freshness.staleness_seconds",
                        staleness_seconds,
                        tags=[f"feature:{feature_name}"]
                    )
                    logger.info(f"Feature '{feature_name}' staleness: {staleness_seconds:.2f}s")
                # A critical feature with no rows at all would emit no gauge, so an alert
                # would never see it; log and count that case explicitly.
                for missing in set(CRITICAL_FEATURES) - {row['feature_name'] for row in results}:
                    logger.warning(f"No rows found for critical feature '{missing}'")
                    statsd.increment("data.freshness.missing_feature", tags=[f"feature:{missing}"])
except mysql.connector.Error as err:
logger.error(f"FreshnessMonitor failed: {err}")
statsd.increment("agent.monitor.errors", tags=["monitor:freshness"])
class FeatureDriftMonitor:
"""Calculates basic statistics for drift detection."""
def run(self, conn):
if not conn:
return
logger.info("Running FeatureDriftMonitor...")
try:
with conn.cursor(dictionary=True) as cursor:
                # With the (feature_name, updated_at) index this is a bounded range scan
                # over the last hour of rows rather than a full table scan: a typical
                # lightweight AP query, acceptable for a periodic asynchronous agent.
sql = """
SELECT feature_name, AVG(feature_value) as avg_val, STDDEV(feature_value) as std_val
FROM realtime_user_features
WHERE feature_name IN (%s) AND updated_at > NOW() - INTERVAL 1 HOUR
GROUP BY feature_name
"""
placeholders = ', '.join(['%s'] * len(CRITICAL_FEATURES))
formatted_sql = sql % placeholders
cursor.execute(formatted_sql, tuple(CRITICAL_FEATURES))
results = cursor.fetchall()
for row in results:
feature_name = row['feature_name']
avg_val = row['avg_val']
std_val = row['std_val']
if avg_val is not None:
statsd.gauge("data.distribution.avg", avg_val, tags=[f"feature:{feature_name}"])
if std_val is not None:
statsd.gauge("data.distribution.stddev", std_val, tags=[f"feature:{feature_name}"])
logger.info(f"Feature '{feature_name}' stats: avg={avg_val}, stddev={std_val}")
except mysql.connector.Error as err:
logger.error(f"DriftMonitor failed: {err}")
statsd.increment("agent.monitor.errors", tags=["monitor:drift"])
def run_all_monitors():
"""Main job function to be scheduled."""
start_time = time.monotonic()
logger.info("Starting monitor cycle...")
statsd.increment("agent.cycle.runs")
db_conn = get_db_connection()
if db_conn:
try:
# Run monitors
FeatureFreshnessMonitor().run(db_conn)
FeatureDriftMonitor().run(db_conn)
# Add other monitors here, like FeatureVolumeMonitor
finally:
if db_conn.is_connected():
db_conn.close()
latency = (time.monotonic() - start_time) * 1000
statsd.timing("agent.cycle.duration_ms", latency)
logger.info(f"Monitor cycle finished in {latency:.2f}ms")
if __name__ == "__main__":
logger.info("Starting MLOps Observability Agent...")
statsd.event("Agent Startup", "MLOps Observability Agent process has started.", alert_type="info")
# Schedule the job. In a real environment, you might use a more robust scheduler
# or run this within a Kubernetes CronJob.
schedule.every(30).seconds.do(run_all_monitors)
run_all_monitors() # Run once immediately on startup
while True:
schedule.run_pending()
time.sleep(1)
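The FeatureVolumeMonitor mentioned above was left out of the listing for brevity. A minimal sketch in the same style, living alongside the other monitors in agent.py (the five-minute window is an assumption; tune it to your ingest rate):
# observability_agent/agent.py (continued): write-volume monitor sketch
class FeatureVolumeMonitor:
    """Counts recent updates per feature to catch upstream pipelines that silently stop."""
    def run(self, conn):
        if not conn:
            return
        logger.info("Running FeatureVolumeMonitor...")
        try:
            with conn.cursor(dictionary=True) as cursor:
                placeholders = ', '.join(['%s'] * len(CRITICAL_FEATURES))
                sql = f"""
                    SELECT feature_name, COUNT(*) AS cnt
                    FROM realtime_user_features
                    WHERE feature_name IN ({placeholders})
                      AND updated_at > NOW() - INTERVAL 5 MINUTE
                    GROUP BY feature_name
                """
                cursor.execute(sql, tuple(CRITICAL_FEATURES))
                counts = {row['feature_name']: row['cnt'] for row in cursor.fetchall()}
                for feature_name in CRITICAL_FEATURES:
                    # Features with zero recent writes are reported explicitly as 0,
                    # so a "pipeline stopped" condition produces a visible data point.
                    statsd.gauge(
                        "data.volume.updates_5m",
                        counts.get(feature_name, 0),
                        tags=[f"feature:{feature_name}"]
                    )
        except mysql.connector.Error as err:
            logger.error(f"VolumeMonitor failed: {err}")
            statsd.increment("agent.monitor.errors", tags=["monitor:volume"])
It would be registered in run_all_monitors() next to the other two monitors.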
Step 4: Architecture Integration and Visualization
The full data flow forms a closed loop.
graph TD
A[Kafka: Raw Events] --> B{Feature Ingestion Service};
B -- Writes Features --> C[TiDB Cluster];
subgraph "Observability Pipeline"
D[Observability Agent]
D -- Periodically Queries (AP) --> C;
D -- Sends Custom Metrics --> E[Datadog Agent];
end
subgraph "Online Serving"
F[Model Serving API]
F -- High-Frequency Lookups (TP) --> C;
F -- Sends Performance Metrics --> E;
end
E --> G((Datadog Platform));
style B fill:#cde4ff
style D fill:#d5fada
style F fill:#fff2cd
In Datadog, we built a dedicated dashboard with several key charts side by side:
- Model-serving API P99 latency & QPS: emitted by the model service's own APM instrumentation.
- Feature write latency (mlops.feature_ingestion.db.upsert.latency): emitted by the feature ingestion service.
- Feature freshness (mlops.feature_store.data.freshness.staleness_seconds): emitted by our observability agent. We attach a threshold alert to this metric, for example paging the on-call engineer the moment any critical feature goes more than 60 seconds without an update.
- Feature mean drift (mlops.feature_store.data.distribution.avg): lets us see at hourly granularity whether a feature's value distribution is stable. Datadog's anomaly detection can learn the baseline automatically and highlight abnormal swings.
With this dashboard, when model-serving latency rises we can tell within a minute whether the cause is application code, database performance, or the more insidious case: feature data that has stopped updating, leaving the model scoring on stale inputs or timing out.
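The staleness alert mentioned above can also be managed as code rather than clicked together in the UI. A minimal sketch using datadogpy's monitor API; the query window, thresholds, and the @pagerduty-mlops handle are illustrative, and API/app keys are read from the environment:
# observability_agent/create_monitors.py (staleness alert as code, illustrative)
from datadog import initialize, api

initialize()  # picks up DD_API_KEY / DD_APP_KEY (or DATADOG_*) from the environment

api.Monitor.create(
    type="metric alert",
    query=(
        "max(last_5m):max:mlops.feature_store.data.freshness.staleness_seconds{*}"
        " by {feature} > 60"
    ),
    name="Feature staleness above 60s",
    message="Critical feature {{feature.name}} is stale. @pagerduty-mlops",
    tags=["team:mlops", "service:feature-store"],
    options={"thresholds": {"critical": 60}, "notify_no_data": True, "no_data_timeframe": 10},
)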
Limitations and Future Iterations
This design solved our original data-blindness problem, but it is far from perfect.
First, the current drift detection is rudimentary: it only computes the mean and standard deviation. A truly production-grade drift detector should run proper statistical tests, such as the Kolmogorov-Smirnov (KS) test, comparing the distribution in the current window against a stable baseline window.
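As a sketch of that direction, SciPy's two-sample KS test can compare an hourly sample against a baseline sample. The helper name, the sampling strategy, and the 0.05 significance threshold are assumptions, not our production logic:
# observability_agent/drift_ks.py (two-sample KS drift check, sketch)
from scipy import stats

def ks_drift_score(baseline_values, current_values):
    """Return the KS statistic and whether drift is significant at p < 0.05."""
    result = stats.ks_2samp(baseline_values, current_values)
    return result.statistic, result.pvalue < 0.05

# Usage: feed it two samples pulled from TiDB, e.g. the last hour vs. a frozen baseline week:
# statistic, drifted = ks_drift_score(baseline, current)
# statsd.gauge("data.distribution.ks_statistic", statistic, tags=[f"feature:{name}"])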
Second, the agent polls TiDB, which adds periodic read load to the database. TiDB's HTAP capacity can absorb it, but at very large scale a more elegant approach is TiDB's change data capture component, TiCDC. We could deploy a TiCDC node that streams changes on realtime_user_features into a message queue such as Kafka, and our observability agent would become a consumer doing streaming computation. This event-driven approach is both lower-latency and less intrusive to the source database.
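Assuming TiCDC were configured with the canal-json protocol and a Kafka topic named tidb-feature-changes (both assumptions to verify against your deployment), the consumer side could look roughly like this:
# observability_agent/cdc_consumer.py (streaming freshness from TiCDC, sketch)
# Topic name and canal-json field layout below are assumptions to check against
# your TiCDC version and changefeed configuration.
import json
import time
from kafka import KafkaConsumer  # pip install kafka-python

consumer = KafkaConsumer(
    "tidb-feature-changes",
    bootstrap_servers="kafka:9092",
    value_deserializer=lambda raw: json.loads(raw.decode("utf-8")),
)

last_seen = {}  # feature_name -> wall-clock time of the last observed change event

for message in consumer:
    event = message.value
    if event.get("isDdl") or event.get("type") not in ("INSERT", "UPDATE"):
        continue
    for row in event.get("data", []):
        last_seen[row["feature_name"]] = time.time()
    # Staleness becomes a simple in-memory computation instead of a SQL query:
    # statsd.gauge("data.freshness.staleness_seconds", time.time() - last_seen[name], ...)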
Finally, the current testing story covers only unit-level code logic. An observability pipeline needs integration tests: spin up a miniature TiDB instance and a mock Datadog agent in CI, then verify end to end that metrics are computed and emitted correctly. That is what protects the monitoring itself against configuration mistakes and changes to the SQL query logic.
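The "mock Datadog agent" half is simpler than it sounds, since dogstatsd is plain UDP. A minimal pytest sketch that binds a local UDP socket and asserts on the raw StatsD packet; the TiDB side (for example a tiup playground instance) is omitted here:
# tests/test_metrics_emission.py (verifying emitted StatsD packets, sketch)
import socket
from datadog.dogstatsd import DogStatsd

def test_staleness_gauge_is_emitted():
    # Bind an ephemeral UDP port to act as a fake Datadog agent
    sock = socket.socket(socket.AF_INET, socket.SOCK_DGRAM)
    sock.bind(("127.0.0.1", 0))
    sock.settimeout(2)
    port = sock.getsockname()[1]

    statsd = DogStatsd(host="127.0.0.1", port=port, namespace="mlops.feature_store")
    statsd.gauge("data.freshness.staleness_seconds", 12.5, tags=["feature:test"])

    packet = sock.recv(4096).decode("utf-8")
    assert packet.startswith("mlops.feature_store.data.freshness.staleness_seconds:12.5|g")
    assert "feature:test" in packet
    sock.close()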