我们最近交付的一个语义搜索服务在内部测试时表现尚可,但上线后,随着查询复杂度和并发量的增加,P95 延迟开始频繁触及告警阈值。服务本身并不复杂:一个 FastAPI 接口接收查询文本,使用 Scikit-learn 加载的 TfidfVectorizer 进行向量化,然后请求 Qdrant 向量数据库执行相似度搜索。问题在于,当一个请求耗时 200ms 时,我们完全不知道这 200ms 究竟消耗在了哪里。
是 TF-IDF 的 transform 方法在高并发下 CPU 竞争激烈?是 FastAPI 的依赖注入或中间件引入了额外开销?是到 Qdrant 的网络延迟?还是 Qdrant 内部的 HNSW 索引搜索慢了?每一个环节都是一个黑盒。在真实项目中,这种“凭感觉”的性能优化是不可接受的,它浪费时间且常常指向错误的方向。我们需要的是数据,是能够清晰展示请求完整生命周期中每个环节耗时的证据。这正是引入分布式追踪系统的典型场景。
我们的技术栈是 Python,因此 OpenTelemetry 成为了首选的标准化方案,后端则选择开源且易于部署的 Jaeger。目标很明确:将一次完整的 /search 请求,从进入 FastAPI 网关,到 Scikit-learn 处理,再到 Qdrant 客户端发出 gRPC 请求,最后返回结果的整个链路,以一个清晰的火焰图呈现在 Jaeger UI 中。
第一阶段:环境搭建与无观测的基线应用
在进行任何优化或改造之前,必须先建立一个可复现的基线环境。我们使用 docker-compose 来统一管理 Qdrant 和 Jaeger 实例,这对于本地开发和后续的 CI/CD 都至关重要。
docker-compose.yml:
version: '3.8'
services:
qdrant:
image: qdrant/qdrant:v1.6.0
container_name: qdrant_service
ports:
- "6333:6333" # gRPC
- "6334:6334" # REST
volumes:
- ./qdrant_storage:/qdrant/storage
# 在生产环境中,应通过配置文件挂载或环境变量来设置
# command: ["./qdrant", "--config-path", "/qdrant/config/production.yaml"]
jaeger:
image: jaegertracing/all-in-one:1.41
container_name: jaeger_service
ports:
- "6831:6831/udp" # Agent (Jaeger thrift)
- "16686:16686" # UI
- "14268:14268" # Collector (HTTP)
environment:
- COLLECTOR_OTLP_ENABLED=true # 启用 OTLP 接收器
这个配置启动了 Qdrant 和一个 all-in-one 模式的 Jaeger。关键点是 Jaeger 的环境变量 COLLECTOR_OTLP_ENABLED=true,这让它能够接收 OpenTelemetry 原生的 OTLP 格式数据,无需额外的转换。
接下来是我们的基线 Python 应用。项目结构如下:
.
├── app
│ ├── __init__.py
│ ├── main.py # FastAPI 应用入口
│ ├── services.py # 业务逻辑,与 Qdrant 和 Scikit-learn 交互
│ └── models.py # 数据模型和向量化器管理
├── docker-compose.yml
├── requirements.txt
└── scripts
└── ingest_data.py # 数据灌入脚本
requirements.txt 内容:
fastapi
uvicorn
scikit-learn
qdrant-client[fastembed] # 使用 fastembed 加速
numpy
opentelemetry-api
opentelemetry-sdk
opentelemetry-exporter-otlp-proto-grpc
opentelemetry-instrumentation-fastapi
opentelemetry-instrumentation-grpc
models.py 负责加载和管理 Scikit-learn 模型。在真实项目中,模型应通过 MLFlow 等工具进行版本管理和加载,这里我们简化为本地文件。
# app/models.py
import pickle
from pathlib import Path
from sklearn.feature_extraction.text import TfidfVectorizer
from sklearn.pipeline import Pipeline
MODEL_PATH = Path(__file__).parent.parent / "models_store" / "tfidf_pipeline.pkl"
MODEL_DIMENSION = 5000 # 假设 TF-IDF 输出维度
_pipeline: Pipeline = None
def get_vectorizer_pipeline() -> Pipeline:
"""
加载并缓存 Scikit-learn pipeline.
在生产环境中,这应该是一个更健壮的单例或依赖注入模式。
"""
global _pipeline
if _pipeline is None:
if not MODEL_PATH.exists():
# 在实际应用中,模型不存在应触发严重错误告警
# 这里为了演示,我们创建一个并保存
print("Model not found. Creating a dummy one.")
documents = [
"this is a sample document for vectorization",
"another document to build vocabulary"
]
pipeline = Pipeline([
('tfidf', TfidfVectorizer(max_features=MODEL_DIMENSION))
])
pipeline.fit(documents)
MODEL_PATH.parent.mkdir(exist_ok=True)
with open(MODEL_PATH, 'wb') as f:
pickle.dump(pipeline, f)
_pipeline = pipeline
else:
with open(MODEL_PATH, 'rb') as f:
_pipeline = pickle.load(f)
return _pipeline
def vectorize(text: str) -> list[float]:
"""使用加载的模型将文本向量化"""
pipeline = get_vectorizer_pipeline()
# transform 输入需要是可迭代对象
vector = pipeline.transform([text]).toarray()[0]
return vector.tolist()
services.py 封装了与 Qdrant 的交互。
# app/services.py
import logging
from qdrant_client import QdrantClient, models
from .models import vectorize, MODEL_DIMENSION
# 在生产中,这些配置应该来自环境变量
QDRANT_HOST = "localhost"
QDRANT_PORT = 6333
COLLECTION_NAME = "my_tech_docs"
# 使用单例模式管理 Qdrant client
_qdrant_client: QdrantClient = None
def get_qdrant_client() -> QdrantClient:
global _qdrant_client
if _qdrant_client is None:
try:
_qdrant_client = QdrantClient(host=QDRANT_HOST, port=QDRANT_PORT)
# 检查集合是否存在,不存在则创建
try:
_qdrant_client.get_collection(collection_name=COLLECTION_NAME)
except Exception: # 更精确的异常处理会更好
_qdrant_client.recreate_collection(
collection_name=COLLECTION_NAME,
vectors_config=models.VectorParams(size=MODEL_DIMENSION, distance=models.Distance.COSINE),
)
except Exception as e:
logging.error(f"Failed to connect to Qdrant: {e}")
raise
return _qdrant_client
def search_documents(query: str, limit: int = 5) -> list[dict]:
"""
执行向量搜索的核心逻辑.
1. 向量化查询
2. 在 Qdrant 中搜索
"""
query_vector = vectorize(query)
client = get_qdrant_client()
try:
search_result = client.search(
collection_name=COLLECTION_NAME,
query_vector=query_vector,
limit=limit,
with_payload=True # 获取原始数据
)
# 将结果格式化为业务需要的格式
results = [
{
"id": hit.id,
"score": hit.score,
"payload": hit.payload
} for hit in search_result
]
return results
except Exception as e:
logging.error(f"Qdrant search failed: {e}")
# 在真实项目中,这里应该抛出自定义的业务异常
return []
最后,main.py 将所有东西串联起来。
# app/main.py
import logging
from fastapi import FastAPI, HTTPException
from pydantic import BaseModel
from .services import search_documents
# 配置基础日志
logging.basicConfig(level=logging.INFO)
app = FastAPI(title="Semantic Search Service")
class SearchRequest(BaseModel):
query: str
limit: int = 5
class SearchResponse(BaseModel):
results: list[dict]
@app.post("/search", response_model=SearchResponse)
async def search(request: SearchRequest):
"""
语义搜索 API 端点
"""
if not request.query.strip():
raise HTTPException(status_code=400, detail="Query cannot be empty")
try:
results = search_documents(request.query, request.limit)
return SearchResponse(results=results)
except Exception as e:
# 笼统的异常处理,在生产中应更细化
logging.error(f"An unexpected error occurred during search: {e}", exc_info=True)
raise HTTPException(status_code=500, detail="Internal server error")
@app.get("/health")
def health_check():
return {"status": "ok"}
现在,启动应用 (uvicorn app.main:app --reload),我们可以通过 curl 调用它。但如前所述,它的性能表现是个黑盒。
第二阶段:引入 OpenTelemetry,点亮 FastAPI 入口
改造的第一步是让请求的入口点(FastAPI)能够生成和导出 Trace 信息。我们需要一个专门的模块来处理 OpenTelemetry 的所有初始化和配置工作,这避免了将观测性代码污染到业务逻辑中。
app/tracing.py:
# app/tracing.py
import logging
import os
from fastapi import FastAPI
from opentelemetry import trace
from opentelemetry.exporter.otlp.proto.grpc.trace_exporter import OTLPSpanExporter
from opentelemetry.instrumentation.fastapi import FastAPIInstrumentor
from opentelemetry.instrumentation.grpc import GrpcInstrumentorClient
from opentelemetry.sdk.resources import Resource
from opentelemetry.sdk.trace import TracerProvider
from opentelemetry.sdk.trace.export import BatchSpanProcessor
from opentelemetry.trace.propagation.tracecontext import TraceContextTextMapPropagator
def setup_tracing(app: FastAPI, service_name: str):
"""
为 FastAPI 应用配置 OpenTelemetry Tracing.
"""
# 从环境变量读取 Jaeger agent/collector 的地址,提供默认值
# 生产环境中,这个地址应该是 K8s service name 或者一个稳定的 DNS
jaeger_endpoint = os.getenv("JAEGER_ENDPOINT", "http://localhost:14268")
# Resource 是描述产生遥测数据的实体的一组属性
resource = Resource.create(attributes={
"service.name": service_name,
"service.instance.id": os.uname().nodename # 或者 pod name
})
# 创建一个 Tracer Provider
provider = TracerProvider(resource=resource)
# 创建一个 OTLP exporter,指向 Jaeger
# insecure=True 适用于本地开发,生产中应配置 TLS
otlp_exporter = OTLPSpanExporter(
endpoint=jaeger_endpoint,
insecure=True
)
# 使用 BatchSpanProcessor 异步批量导出 span
# 这在生产环境中是推荐的做法,可以减少对应用性能的影响
processor = BatchSpanProcessor(otlp_exporter)
provider.add_span_processor(processor)
# 将配置好的 provider 设置为全局 provider
trace.set_tracer_provider(provider)
# 自动 instrument FastAPI 应用
# 这会自动为每个请求创建 root span,并处理 W3C Trace Context 的传播
FastAPIInstrumentor.instrument_app(app)
# 自动 instrument gRPC 客户端
# 这对 Qdrant client 至关重要
GrpcInstrumentorClient().instrument()
logging.info(f"Tracing setup complete for service '{service_name}', exporting to '{jaeger_endpoint}'")
def shutdown_tracing():
"""
在应用关闭时优雅地关闭 Tracer Provider,确保所有缓冲的 span 都被发送。
"""
if trace.get_tracer_provider() and hasattr(trace.get_tracer_provider(), 'shutdown'):
logging.info("Shutting down tracer provider.")
trace.get_tracer_provider().shutdown()
这个 tracing.py 模块做了几件关键事情:
- 配置 Resource:
service.name是最重要的属性,它将是在 Jaeger UI 中看到的服务名。 - 配置 Exporter: 使用
OTLPSpanExporter将数据以 OTLP/gRPC 格式发送到 Jaeger。 - 配置 Processor:
BatchSpanProcessor性能更好,它会收集一段时间的 Spans 然后一次性发送。 - 自动 Instrumentation:
FastAPIInstrumentor和GrpcInstrumentorClient是精华所在。它们通过 monkey-patching 的方式,无侵入地为 FastAPI 的请求处理和 gRPC 的客户端调用添加了 tracing 能力。
现在,我们需要在 main.py 中调用它。
# app/main.py (修改后)
import logging
from fastapi import FastAPI, HTTPException
from pydantic import BaseModel
from .services import search_documents
from .tracing import setup_tracing, shutdown_tracing # 导入
logging.basicConfig(level=logging.INFO)
# 在 FastAPI 应用对象创建后立即设置 tracing
app = FastAPI(
title="Semantic Search Service",
on_startup=[lambda: setup_tracing(app, "semantic-search-service")],
on_shutdown=[shutdown_tracing]
)
# ... (其余代码不变)
通过 on_startup 和 on_shutdown 事件,我们将 tracing 的生命周期与应用的生命周期绑定。
重启应用并发起一次搜索请求后,打开 Jaeger UI (http://localhost:16686),选择 semantic-search-service,你应该能看到第一条 trace。它只有一个 Span,记录了整个 /search 请求的耗时。这是一个好的开始,但内部依然是黑盒。
第三阶段:手动埋点,深入业务逻辑
自动埋点只能覆盖框架层面,要了解业务逻辑内部的耗时,必须进行手动埋点。OpenTelemetry 提供了非常简单的 API 来创建自定义 Span。
我们的目标是为 services.py 中的两个关键步骤创建子 Span:
-
vectorize(query): Scikit-learn 的向量化过程。 -
client.search(...): Qdrant 客户端的 gRPC 调用。
修改 services.py:
# app/services.py (修改后)
import logging
from qdrant_client import QdrantClient, models
from .models import vectorize, MODEL_DIMENSION
# 导入 opentelemetry trace API
from opentelemetry import trace
# 获取一个 tracer 实例。最佳实践是使用模块名作为 tracer 名
tracer = trace.get_tracer(__name__)
# ... (get_qdrant_client 不变)
def search_documents(query: str, limit: int = 5) -> list[dict]:
# 使用 `start_as_current_span` 创建一个包裹整个函数的父 Span
# 这有助于将业务逻辑聚合在一起
with tracer.start_as_current_span("service.search_documents") as parent_span:
parent_span.set_attribute("search.query.length", len(query))
parent_span.set_attribute("search.limit", limit)
# 为向量化步骤创建一个子 Span
with tracer.start_as_current_span("ml.vectorize") as vector_span:
try:
query_vector = vectorize(query)
vector_span.set_attribute("ml.vector.dimension", len(query_vector))
except Exception as e:
# 在 span 中记录异常信息是排查问题的关键
vector_span.record_exception(e)
vector_span.set_status(trace.Status(trace.StatusCode.ERROR, "Vectorization failed"))
raise # 重新抛出异常,让上层处理
client = get_qdrant_client()
# 为 Qdrant 搜索创建一个子 Span
with tracer.start_as_current_span("db.qdrant.search") as db_span:
# 添加符合 OpenTelemetry 语义化约定的属性
db_span.set_attribute("db.system", "qdrant")
db_span.set_attribute("db.operation", "search")
db_span.set_attribute("db.collection.name", COLLECTION_NAME)
try:
search_result = client.search(
collection_name=COLLECTION_NAME,
query_vector=query_vector,
limit=limit,
with_payload=True
)
num_hits = len(search_result)
db_span.set_attribute("db.result.hits", num_hits)
db_span.set_status(trace.StatusCode.OK)
results = [
{"id": hit.id, "score": hit.score, "payload": hit.payload}
for hit in search_result
]
return results
except Exception as e:
logging.error(f"Qdrant search failed: {e}")
db_span.record_exception(e)
db_span.set_status(trace.Status(trace.StatusCode.ERROR, "Qdrant search failed"))
return []
这里的改动是核心:
- 获取 Tracer:
trace.get_tracer(__name__)是获取当前模块的 tracer 实例的标准方法。 - 创建 Spans:
with tracer.start_as_current_span("span_name")是最常用的方式。它创建一个新的 Span,并将其设置为当前的活动 Span。在这个with块内创建的任何新 Span 都会自动成为它的子 Span。 - 添加属性 (Attributes):
span.set_attribute()用于为 Span 添加键值对元数据。我们添加了查询长度、向量维度、Qdrant 集合名等,这些信息在分析时极具价值。遵循 OpenTelemetry 的语义约定(如db.system)能让各种可观测性后端更好地理解和展示数据。 - 记录异常 (Exceptions):
span.record_exception()和span.set_status()对于错误追踪至关重要。当异常发生时,它会被记录在 Span 中,并在 Jaeger UI 上高亮显示。
现在再次运行并请求,Jaeger 中的 Trace 会变得丰富得多。
gantt
title Search Request Trace Breakdown
dateFormat X
axisFormat %Lms
section FastAPI Request
POST /search: 0, 210
section Service Logic
service.search_documents: 5, 200
ml.vectorize (Scikit-learn): 10, 150
db.qdrant.search (gRPC Call): 165, 35
从这个(模拟的)甘特图可以看出,一次 210ms 的请求中,ml.vectorize 占了 150ms,而 Qdrant 的搜索本身只占 35ms。瓶颈一目了然:同步的、CPU 密集型的 TF-IDF 计算阻塞了 FastAPI 的事件循环。一个常见的错误是在异步框架中执行长时间的同步 CPU 计算,而分布式追踪清晰地暴露了这个问题。
值得注意的是 db.qdrant.search 这个 Span。由于我们在 tracing.py 中启用了 GrpcInstrumentorClient,它会自动为 qdrant-client 发出的 gRPC 请求创建 Span。我们手动创建的 db.qdrant.search Span 成为了它的父 Span,提供了更丰富的业务上下文。这就是手动埋点与自动埋点结合的威力。
第四阶段:生产级考量与日志关联
我们已经有了一个可用的追踪系统,但在投入生产前,还有几点需要完善。
日志与 Trace 的关联
排查问题时,我们常常需要在日志和 Trace 之间跳转。将 trace_id 和 span_id 注入到日志中可以实现这一点。我们可以通过自定义 logging 的 Formatter 来实现。
创建一个 logging_config.py:
# app/logging_config.py
import logging
from opentelemetry import trace
class TraceIdInjector(logging.Formatter):
def format(self, record):
span = trace.get_current_span()
if span != trace.INVALID_SPAN:
record.trace_id = f'{span.get_span_context().trace_id:032x}'
record.span_id = f'{span.get_span_context().span_id:016x}'
else:
record.trace_id = "N/A"
record.span_id = "N/A"
return super().format(record)
def setup_logging():
# 移除 uvicorn 的默认 handler
for handler in logging.root.handlers:
logging.root.removeHandler(handler)
# 创建我们自己的 handler
handler = logging.StreamHandler()
formatter = TraceIdInjector(
'%(asctime)s - %(name)s - %(levelname)s - [trace_id=%(trace_id)s, span_id=%(span_id)s] - %(message)s'
)
handler.setFormatter(formatter)
logging.root.addHandler(handler)
logging.root.setLevel(logging.INFO)
然后在 main.py 的启动事件中调用 setup_logging()。之后,你的应用日志会变成这样:
2023-10-27 11:00:00,123 - app.services - INFO - [trace_id=..., span_id=...] - Qdrant search successful.
有了 trace_id,你可以直接在 Jaeger UI 中搜索,定位到完整的请求链路。
采样 (Sampling)
在生产环境中,对每个请求都进行完整的追踪是非常昂贵的。OpenTelemetry 允许配置采样策略。默认的 AlwaysOnSampler 会记录所有 trace。对于高流量服务,应改为 TraceIdRatioBasedSampler。
在 tracing.py 的 TracerProvider 初始化中添加 sampler:
# app/tracing.py
from opentelemetry.sdk.trace.sampling import TraceIdRatioBased
# ...
# 采样率设置为 10%
sampler = TraceIdRatioBased(0.1)
provider = TracerProvider(resource=resource, sampler=sampler)
# ...
这意味着只有 10% 的请求会被完整记录和发送。如何选择采样率是一个权衡:太低可能错过稀有的错误,太高则成本增加。通常从一个较低的值开始,根据需要进行调整。更高级的策略,如尾部采样(tail-based sampling),允许在请求完成后根据其特征(如是否出错、耗时是否超长)来决定是否保留该 Trace,但这需要一个独立的采样代理服务。
局限性与未来展望
我们现在拥有一个对多阶段向量检索管线具备深度可观测性的系统。它能明确指出性能瓶颈是在 Scikit-learn 的计算阶段还是在 Qdrant 的 I/O 阶段。
但这个方案并非没有局限。首先,我们的追踪仅限于应用层面。我们看到了 gRPC 请求发往 Qdrant 的耗时,但 Qdrant 内部发生了什么依然是黑盒。一个更完整的可观测性体系需要 Qdrant 本身也支持 OpenTelemetry,这样客户端的 Span 就可以和 Qdrant 服务端的 Span 连接起来,形成一个跨进程的完整链路。
其次,对于 Scikit-learn 这种纯粹的计算库,我们的 ml.vectorize Span 内部也是一个整体。如果向量化本身非常复杂(例如,它是一个包含多个步骤的 pipeline),我们可能需要进一步在 vectorize 函数内部创建更细粒度的子 Span,来区分预处理、模型推理等步骤的耗时。
最后,当前的方案暴露了 Scikit-learn 同步阻塞的问题。下一步的架构迭代方向很明确:将 CPU 密集的向量化任务从主事件循环中剥离出去。可以采用 ThreadPoolExecutor 或者一个专门的、可水平扩展的微服务来处理向量化。而当我们这样做时,现有的追踪体系将再次发挥关键作用,因为它能帮助我们验证新架构是否真正解决了性能瓶颈,并追踪跨进程或跨服务调用的上下文。