Building a Data Quality and Performance Observability Pipeline for a Real-Time Feature Platform with TiDB and Datadog


When a model degrades in production, the root cause is usually the data, not the algorithm. Our original MLOps platform was a black box in this respect. The pipeline that computed features and ingested them into TiDB appeared healthy, yet when the accuracy of our online fraud-detection model started slipping without warning, we realized we knew nothing about the state of the data itself. We could see TiDB's QPS and latency, but we were completely blind to the freshness of critical features and to drift in their value distributions. The problem was not a lack of monitoring; we were monitoring the wrong things.

The initial idea was simple: put data-quality metrics and system-performance metrics in the same place. When the model-serving API's P99 latency spikes, I want to see immediately whether a user feature it depends on has gone more than 5 seconds without an update, or whether a feature's mean has drifted from its baseline over the past hour. If that correlation has to be reconstructed after the fact by pulling logs from separate systems, it is far too late.

The technology choices fell into place quickly. Our feature data already lives in TiDB. As an HTAP database, it can serve both the high-concurrency point lookups of online model serving (TP) and the lightweight aggregate queries our monitoring agent needs (AP), so we avoid bolting on a separate OLAP system and the architectural complexity that comes with it. The company's standard observability platform is Datadog, whose custom metrics and dashboard capabilities are exactly what we need. The glue is naturally Python, the primary language of our ML and data engineering teams.

Our goal was a standalone observability pipeline that runs alongside the main business flow, continuously samples data state from TiDB, computes observability metrics, and pushes them to Datadog.

Step 1: Feature Storage Design in TiDB

Everything starts with how the data is organized. In a high-concurrency fraud-detection scenario, the feature table must be designed first and foremost for point-lookup performance, while also keeping the aggregate queries of our observability pipeline cheap.

We designed the following table for real-time user behavior features. Note the index design: PRIMARY KEY (user_id, feature_name) serves the high-frequency "fetch all features for a user" access pattern, while idx_feature_timestamp is tailor-made for our observability agent to efficiently find the latest update time of a given feature, which is the key input for freshness.

-- feature_store.realtime_user_features table structure
CREATE TABLE IF NOT EXISTS `realtime_user_features` (
    `user_id` VARCHAR(255) NOT NULL COMMENT 'Unique user identifier',
    `feature_name` VARCHAR(255) NOT NULL COMMENT 'Feature name, e.g., last_30min_transaction_count',
    `feature_value` DOUBLE NOT NULL COMMENT 'Feature value',
    `updated_at` DATETIME(6) NOT NULL DEFAULT CURRENT_TIMESTAMP(6) ON UPDATE CURRENT_TIMESTAMP(6) COMMENT 'Feature update timestamp, microsecond precision',
    PRIMARY KEY (`user_id`, `feature_name`) CLUSTERED,
    INDEX `idx_feature_timestamp` (`feature_name`, `updated_at` DESC)
) DEFAULT CHARSET=utf8mb4 COLLATE=utf8mb4_bin;

In real projects, avoiding TiDB write hotspots is critical under high write throughput. AUTO_RANDOM is the standard tool when a table uses a single auto-generated BIGINT primary key, but TiDB only allows it on the primary key column, so it cannot be combined with our composite key. Here the clustered key on (user_id, feature_name) spreads writes well on its own, because user IDs do not arrive in monotonically increasing order.
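
To make the two access patterns concrete, these are the query shapes the schema is built around (illustrative only; the feature names are examples from our own configuration):

-- Online serving (TP): point scan on the clustered primary key
SELECT feature_name, feature_value
FROM realtime_user_features
WHERE user_id = 'user-123';

-- Observability agent (AP): latest update per feature, served by idx_feature_timestamp
SELECT feature_name, MAX(updated_at) AS last_update
FROM realtime_user_features
WHERE feature_name IN ('last_1h_transaction_count', 'last_1h_avg_txn_amount')
GROUP BY feature_name;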

Step 2: Hardening the Feature Ingestion Service

Our feature ingestion service is a Python microservice built on FastAPI. It consumes raw transaction events from upstream Kafka, computes features in real time (for example over sliding windows), and writes them to TiDB. To support later debugging and analysis, we hardened its logging and exception handling.

The core code below is more than a simple database write: it includes robust error handling, retry logic, and structured logging.

# feature_ingestion/main.py

import os
import time
import logging
from typing import Dict, Any

import mysql.connector
from fastapi import FastAPI, status
from fastapi.responses import JSONResponse
from datadog.dogstatsd import DogStatsd  # DogStatsd ships in the official datadog package

# --- Configuration ---
# In a real app, use Pydantic settings or a config management system
DB_HOST = os.getenv("TIDB_HOST", "127.0.0.1")
DB_PORT = int(os.getenv("TIDB_PORT", 4000))
DB_USER = os.getenv("TIDB_USER", "root")
DB_PASSWORD = os.getenv("TIDB_PASSWORD", "")
DB_DATABASE = os.getenv("TIDB_DATABASE", "feature_store")
DATADOG_AGENT_HOST = os.getenv("DATADOG_AGENT_HOST", "127.0.0.1")
DATADOG_AGENT_PORT = int(os.getenv("DATADOG_AGENT_PORT", 8125))

# --- Logging Setup ---
# Use structured logging for better parsing in Datadog Logs
logging.basicConfig(
    level=logging.INFO,
    format='%(asctime)s - %(name)s - %(levelname)s - %(message)s',
    handlers=[logging.StreamHandler()]
)
logger = logging.getLogger(__name__)

# --- Datadog StatsD Client ---
statsd = DogStatsd(host=DATADOG_AGENT_HOST, port=DATADOG_AGENT_PORT, namespace="mlops.feature_ingestion")

# --- Database Connection Pool ---
# Using a simple connection function here. For production, use a robust pool like SQLAlchemy's.
def get_db_connection():
    try:
        conn = mysql.connector.connect(
            host=DB_HOST,
            port=DB_PORT,
            user=DB_USER,
            password=DB_PASSWORD,
            database=DB_DATABASE,
            autocommit=False # Important for transaction control
        )
        return conn
    except mysql.connector.Error as err:
        logger.error(f"Database connection failed: {err}")
        statsd.increment("db.connection.errors", tags=["error:initial_connection"])
        raise

app = FastAPI()

# Deliberately a sync (def) endpoint: mysql.connector and time.sleep are blocking,
# and FastAPI runs sync endpoints in a worker threadpool rather than on the event loop.
@app.post("/v1/ingest")
def ingest_feature(payload: Dict[str, Any]):
    """
    Receives a feature to be ingested into TiDB.
    Payload example:
    {
        "user_id": "user-123",
        "feature_name": "last_1h_avg_txn_amount",
        "feature_value": 150.75,
        "event_timestamp": "2023-10-27T10:00:00.123456Z"
    }
    """
    user_id = payload.get("user_id")
    feature_name = payload.get("feature_name")
    feature_value = payload.get("feature_value")

    if not user_id or not feature_name or feature_value is None:
        return JSONResponse(
            status_code=status.HTTP_400_BAD_REQUEST,
            content={"message": "Missing required fields: user_id, feature_name, feature_value"}
        )

    # Wrap DB operations in try/except/finally; forgetting this is a common mistake
    conn = None
    cursor = None
    attempt = 0
    max_attempts = 3
    
    start_time = time.monotonic()
    
    while attempt < max_attempts:
        try:
            conn = get_db_connection()
            cursor = conn.cursor()

            # Using ON DUPLICATE KEY UPDATE is atomic and efficient for this pattern
            sql = """
            INSERT INTO realtime_user_features (user_id, feature_name, feature_value)
            VALUES (%s, %s, %s)
            ON DUPLICATE KEY UPDATE feature_value = VALUES(feature_value), updated_at = CURRENT_TIMESTAMP(6)
            """
            cursor.execute(sql, (user_id, feature_name, feature_value))
            conn.commit()
            
            # --- Datadog Instrumentation ---
            latency = (time.monotonic() - start_time) * 1000
            statsd.timing("db.upsert.latency", latency, tags=[f"feature:{feature_name}"])
            statsd.increment("ingestion.events.success", tags=[f"feature:{feature_name}"])
            
            logger.info(f"Successfully ingested feature '{feature_name}' for user '{user_id}'")
            return {"status": "success"}

        except mysql.connector.Error as err:
            attempt += 1
            logger.warning(f"DB error on attempt {attempt}/{max_attempts}: {err}. Retrying...")
            if conn:
                conn.rollback() # Rollback on failure
            
            # This tag is critical for alerting on specific error types
            statsd.increment("db.operation.errors", tags=[f"error:{type(err).__name__}", "operation:upsert"])
            
            if attempt >= max_attempts:
                logger.error(f"Failed to ingest feature after {max_attempts} attempts for user '{user_id}'")
                statsd.increment("ingestion.events.failed", tags=[f"feature:{feature_name}"])
                return JSONResponse(
                    status_code=status.HTTP_503_SERVICE_UNAVAILABLE,
                    content={"message": "Database operation failed permanently"}
                )
            time.sleep(0.5 * attempt) # Linear backoff: 0.5s, then 1.0s
        
        finally:
            # Guard against a failed connect, where cursor/conn were never created
            if cursor is not None:
                cursor.close()
                cursor = None
            if conn and conn.is_connected():
                conn.close()
            conn = None

    # This part should ideally not be reached
    return JSONResponse(status_code=500, content={"message": "Internal Server Error"})
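
For a quick smoke test of the endpoint, here is a minimal sketch using FastAPI's TestClient; the module path feature_ingestion.main mirrors the file comment at the top of the listing, so adjust it to your project layout:

# tests/test_ingest_smoke.py (illustrative)

from fastapi.testclient import TestClient

from feature_ingestion.main import app

client = TestClient(app)

def test_ingest_roundtrip():
    resp = client.post("/v1/ingest", json={
        "user_id": "user-123",
        "feature_name": "last_1h_avg_txn_amount",
        "feature_value": 150.75,
    })
    # 200 against a reachable TiDB; without one, the endpoint degrades to 503
    assert resp.status_code in (200, 503)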

Step 3: Building the Core Observability Agent

This is the heart of the whole design. We wrote a standalone Python script that runs as a daemon. Its sole job: periodically query TiDB, compute the data-quality and data-state metrics we care about, and send them to Datadog via dogstatsd.

We split the agent's responsibilities cleanly:

  1. FeatureFreshnessMonitor: monitors feature freshness.
  2. FeatureDriftMonitor: monitors drift in the statistical distribution of feature values.
  3. FeatureVolumeMonitor: monitors feature write volume (omitted from the listing below for brevity; a sketch follows it).

# observability_agent/agent.py

import os
import time
import logging
import schedule
import mysql.connector
from datadog.dogstatsd import DogStatsd
from datetime import datetime, timezone

# --- Configuration ---
DB_HOST = os.getenv("TIDB_HOST", "127.0.0.1")
DB_PORT = int(os.getenv("TIDB_PORT", 4000))
DB_USER = os.getenv("TIDB_USER", "root")
DB_PASSWORD = os.getenv("TIDB_PASSWORD", "")
DB_DATABASE = os.getenv("TIDB_DATABASE", "feature_store")
DATADOG_AGENT_HOST = os.getenv("DATADOG_AGENT_HOST", "127.0.0.1")
DATADOG_AGENT_PORT = int(os.getenv("DATADOG_AGENT_PORT", 8125))

# The list of critical features we want to monitor.
# In a production system, this list would come from a configuration service or a database.
CRITICAL_FEATURES = [
    "last_1h_transaction_count",
    "last_1h_avg_txn_amount",
    "last_24h_failed_txn_count",
]

# --- Logging ---
logging.basicConfig(level=logging.INFO, format='%(asctime)s - %(name)s - %(levelname)s - %(message)s')
logger = logging.getLogger("ObservabilityAgent")

# --- Datadog StatsD Client ---
statsd = DogStatsd(host=DATADOG_AGENT_HOST, port=DATADOG_AGENT_PORT, namespace="mlops.feature_store")

def get_db_connection():
    # It's better to have a single, long-lived connection for a daemon process
    # with proper reconnection logic, but for simplicity, we reconnect each time.
    try:
        return mysql.connector.connect(
            host=DB_HOST, port=DB_PORT, user=DB_USER,
            password=DB_PASSWORD, database=DB_DATABASE
        )
    except mysql.connector.Error as err:
        logger.error(f"Agent DB connection failed: {err}")
        statsd.increment("agent.db.connection.errors")
        return None

class FeatureFreshnessMonitor:
    """Calculates the staleness of features."""
    def run(self, conn):
        if not conn:
            return
        
        logger.info("Running FeatureFreshnessMonitor...")
        try:
            # mysql.connector cursors don't reliably support the context manager
            # protocol across versions, so open and close the cursor explicitly.
            cursor = conn.cursor(dictionary=True)
            try:
                # This query is optimized by the `idx_feature_timestamp` index.
                # It finds the latest update time for each of our critical features.
                sql = """
                SELECT feature_name, MAX(updated_at) as last_update
                FROM realtime_user_features
                WHERE feature_name IN (%s)
                GROUP BY feature_name
                """
                # Create a placeholder string for the IN clause
                placeholders = ', '.join(['%s'] * len(CRITICAL_FEATURES))
                formatted_sql = sql % placeholders
                
                cursor.execute(formatted_sql, tuple(CRITICAL_FEATURES))
                results = cursor.fetchall()

                now_utc = datetime.now(timezone.utc)
                
                for row in results:
                    feature_name = row['feature_name']
                    # DATETIME comes back naive; we assume the session time_zone
                    # is UTC so the subtraction below is correct.
                    last_update_utc = row['last_update'].replace(tzinfo=timezone.utc)
                    
                    staleness_seconds = (now_utc - last_update_utc).total_seconds()
                    
                    statsd.gauge(
                        "data.freshness.staleness_seconds",
                        staleness_seconds,
                        tags=[f"feature:{feature_name}"]
                    )
                    logger.info(f"Feature '{feature_name}' staleness: {staleness_seconds:.2f}s")
            finally:
                cursor.close()

        except mysql.connector.Error as err:
            logger.error(f"FreshnessMonitor failed: {err}")
            statsd.increment("agent.monitor.errors", tags=["monitor:freshness"])

class FeatureDriftMonitor:
    """Calculates basic statistics for drift detection."""
    def run(self, conn):
        if not conn:
            return
        
        logger.info("Running FeatureDriftMonitor...")
        try:
            cursor = conn.cursor(dictionary=True)
            try:
                # This query aggregates every row written in the last hour for the
                # given features. Heavier than a point lookup, but acceptable for
                # an asynchronous agent that runs periodically. A typical AP query.
                sql = """
                SELECT feature_name, AVG(feature_value) as avg_val, STDDEV(feature_value) as std_val
                FROM realtime_user_features
                WHERE feature_name IN (%s) AND updated_at > NOW() - INTERVAL 1 HOUR
                GROUP BY feature_name
                """
                placeholders = ', '.join(['%s'] * len(CRITICAL_FEATURES))
                formatted_sql = sql % placeholders
                
                cursor.execute(formatted_sql, tuple(CRITICAL_FEATURES))
                results = cursor.fetchall()

                for row in results:
                    feature_name = row['feature_name']
                    avg_val = row['avg_val']
                    std_val = row['std_val']
                    
                    if avg_val is not None:
                        statsd.gauge("data.distribution.avg", avg_val, tags=[f"feature:{feature_name}"])
                    if std_val is not None:
                        statsd.gauge("data.distribution.stddev", std_val, tags=[f"feature:{feature_name}"])
                    
                    logger.info(f"Feature '{feature_name}' stats: avg={avg_val}, stddev={std_val}")
            finally:
                cursor.close()

        except mysql.connector.Error as err:
            logger.error(f"DriftMonitor failed: {err}")
            statsd.increment("agent.monitor.errors", tags=["monitor:drift"])

def run_all_monitors():
    """Main job function to be scheduled."""
    start_time = time.monotonic()
    logger.info("Starting monitor cycle...")
    statsd.increment("agent.cycle.runs")
    
    db_conn = get_db_connection()
    
    if db_conn:
        try:
            # Run monitors
            FeatureFreshnessMonitor().run(db_conn)
            FeatureDriftMonitor().run(db_conn)
            # Add other monitors here, like FeatureVolumeMonitor
        finally:
            if db_conn.is_connected():
                db_conn.close()

    latency = (time.monotonic() - start_time) * 1000
    statsd.timing("agent.cycle.duration_ms", latency)
    logger.info(f"Monitor cycle finished in {latency:.2f}ms")


if __name__ == "__main__":
    logger.info("Starting MLOps Observability Agent...")
    statsd.event("Agent Startup", "MLOps Observability Agent process has started.", alert_type="info")

    # Schedule the job. In a real environment, you might use a more robust scheduler
    # or run this within a Kubernetes CronJob.
    schedule.every(30).seconds.do(run_all_monitors)

    run_all_monitors() # Run once immediately on startup
    
    while True:
        schedule.run_pending()
        time.sleep(1)
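
The FeatureVolumeMonitor from the list above was left out of the listing; a minimal sketch in the same style might look like the following, wired into run_all_monitors() next to the other two. The metric name data.volume.active_rows_1h is our own convention, not a Datadog built-in.

# observability_agent/agent.py (continued, illustrative sketch)

class FeatureVolumeMonitor:
    """Counts (user, feature) rows touched in the last hour. Because ingestion
    upserts in place, this is 'distinct rows updated', not a raw write count;
    a collapse to zero still reliably signals a stalled upstream pipeline."""
    def run(self, conn):
        if not conn:
            return
        logger.info("Running FeatureVolumeMonitor...")
        try:
            cursor = conn.cursor(dictionary=True)
            try:
                placeholders = ', '.join(['%s'] * len(CRITICAL_FEATURES))
                sql = f"""
                SELECT feature_name, COUNT(*) AS row_count
                FROM realtime_user_features
                WHERE feature_name IN ({placeholders})
                  AND updated_at > NOW() - INTERVAL 1 HOUR
                GROUP BY feature_name
                """
                cursor.execute(sql, tuple(CRITICAL_FEATURES))
                for row in cursor.fetchall():
                    statsd.gauge(
                        "data.volume.active_rows_1h",
                        row['row_count'],
                        tags=[f"feature:{row['feature_name']}"]
                    )
            finally:
                cursor.close()
        except mysql.connector.Error as err:
            logger.error(f"VolumeMonitor failed: {err}")
            statsd.increment("agent.monitor.errors", tags=["monitor:volume"])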

Step 4: Architecture Integration and Visualization

The complete data flow forms a closed loop.

graph TD
    A[Kafka: Raw Events] --> B{Feature Ingestion Service};
    B -- Writes Features --> C[TiDB Cluster];
    
    subgraph "Observability Pipeline"
        D[Observability Agent]
        D -- "Periodically Queries (AP)" --> C;
        D -- Sends Custom Metrics --> E[Datadog Agent];
    end
    
    subgraph "Online Serving"
        F[Model Serving API]
        F -- "High-Frequency Lookups (TP)" --> C;
        F -- Sends Performance Metrics --> E;
    end
    
    E --> G((Datadog Platform));

    style B fill:#cde4ff
    style D fill:#d5fada
    style F fill:#fff2cd

In Datadog, we created a dedicated dashboard with several key charts laid out side by side:

  1. Model serving API P99 latency & QPS: emitted by the model service's own APM instrumentation.
  2. Feature write latency (mlops.feature_ingestion.db.upsert.latency): emitted by the feature ingestion service.
  3. Feature freshness (mlops.feature_store.data.freshness.staleness_seconds): emitted by our observability agent. We put a threshold alert on this metric, for example a metric monitor along the lines of avg(last_5m):max:mlops.feature_store.data.freshness.staleness_seconds{*} by {feature} > 60, so the on-call engineer is paged as soon as any critical feature goes more than 60 seconds without an update.
  4. Feature mean drift (mlops.feature_store.data.distribution.avg): lets us see, hour by hour, whether a feature's value distribution is stable. Datadog's anomaly detection can learn the baseline automatically and highlight abnormal swings.

With this dashboard, when model-serving latency rises we can tell within a minute whether the cause is application code, database performance, or the more insidious case: features that have stopped updating, leaving the model running on stale inputs or timing out.

Limitations and Future Iterations

This setup solved our original data-blindness problem, but it is far from perfect.

First, the drift detection is rudimentary: we only compute the mean and standard deviation. A genuinely production-grade drift detector should run proper statistical tests, such as the Kolmogorov-Smirnov (KS) test, comparing the current window's distribution against a stable baseline window.
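
A sketch of what that upgrade could look like, using scipy.stats.ks_2samp; the helper names and the baseline-loading step are assumptions, not existing code:

# Sketch: two-sample KS drift test (assumes scipy and numpy are available)
import numpy as np
from scipy.stats import ks_2samp

def ks_drift_score(current_values, baseline_values, alpha=0.01):
    """Compare a current sample of feature values against a baseline sample.

    Returns the KS statistic and whether the drift is statistically significant.
    """
    current = np.asarray(current_values, dtype=float)
    baseline = np.asarray(baseline_values, dtype=float)
    result = ks_2samp(baseline, current)
    # A small p-value means the two distributions are unlikely to be the same
    return result.statistic, result.pvalue < alpha

# Inside a monitor run, roughly (load_baseline_sample is hypothetical):
# stat, drifted = ks_drift_score(window_sample, load_baseline_sample(feature_name))
# statsd.gauge("data.distribution.ks_statistic", stat, tags=[f"feature:{feature_name}"])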

Second, the agent polls TiDB to sample data state, which adds periodic read load to the database. TiDB's HTAP capacity can absorb it, but at very large scale the more elegant approach is TiDB's change data capture component, TiCDC. We could deploy a TiCDC node that streams changes to the realtime_user_features table into a message queue such as Kafka, and the observability agent would become a consumer doing streaming computation. This event-driven design has lower latency and is less intrusive to the source database.
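
A rough sketch of that consumer, assuming a TiCDC changefeed configured to emit the canal-json protocol into a Kafka topic; the topic name tidb-feature-changes and the envelope handling are illustrative, so check the protocol docs for the exact field layout:

# Sketch: event-driven change consumer for a TiCDC changefeed (canal-json assumed)
import json

from kafka import KafkaConsumer  # pip install kafka-python
from datadog.dogstatsd import DogStatsd

statsd = DogStatsd(host="127.0.0.1", port=8125, namespace="mlops.feature_store")

consumer = KafkaConsumer(
    "tidb-feature-changes",            # illustrative topic name
    bootstrap_servers="localhost:9092",
    value_deserializer=lambda v: json.loads(v.decode("utf-8")),
)

for message in consumer:
    event = message.value
    # canal-json carries the changed rows under "data" for INSERT/UPDATE events
    if event.get("type") in ("INSERT", "UPDATE"):
        for row in event.get("data", []):
            feature = row.get("feature_name")
            if feature:
                # A counter per feature; freshness alerting then becomes a
                # "no data" monitor on this metric instead of a polled gauge.
                statsd.increment("data.changes", tags=[f"feature:{feature}"])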

Finally, our current tests only cover code logic at the unit level. An observability pipeline needs integration tests: spin up a miniature TiDB instance and a mock Datadog agent in CI, and verify end to end that metrics are computed and emitted correctly. This is what protects the monitoring itself from silently breaking when configuration or SQL query logic changes.
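
The Datadog side of such a test can be mocked with a plain UDP socket, since dogstatsd is just a text protocol over UDP. A sketch of the metric-capture half (the TiDB half, e.g. a disposable instance from tiup playground or a container, is assumed to exist separately):

# Sketch: a fake "Datadog agent" for CI integration tests
import socket

from datadog.dogstatsd import DogStatsd

def test_freshness_metric_is_emitted():
    # Bind a UDP socket on an ephemeral port to stand in for the agent
    sock = socket.socket(socket.AF_INET, socket.SOCK_DGRAM)
    sock.bind(("127.0.0.1", 0))
    sock.settimeout(5)
    port = sock.getsockname()[1]

    # Point a client at the fake agent and emit a metric the way the
    # real observability agent would
    statsd = DogStatsd(host="127.0.0.1", port=port, namespace="mlops.feature_store")
    statsd.gauge("data.freshness.staleness_seconds", 12.5,
                 tags=["feature:last_1h_transaction_count"])
    statsd.flush()  # the client buffers; force the packet out

    packet, _ = sock.recvfrom(4096)
    text = packet.decode("utf-8")
    assert "mlops.feature_store.data.freshness.staleness_seconds" in text
    assert "feature:last_1h_transaction_count" in text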

