在阿里云上构建基于 OpenSearch 与 OpenTelemetry 的跨平台全链路追踪架构


追踪一个用户请求的全生命周期,从 Flutter 应用的一次点击,到后端服务的复杂调用链,再到最终返回数据,这个过程中的性能瓶颈、错误节点定位,是所有分布式系统面临的共同难题。当我们的技术栈横跨移动端(Flutter)、Web管理端(Angular)、多种后端服务以及缓存(Redis)时,日志是分散的,指标是孤立的,追踪信息是断裂的。这种可观测性的“孤岛效应”在生产环境中是致命的,它直接导致故障排查时间被无限拉长。

核心问题在于:如何建立一个统一的、与具体厂商和语言无关的遥测数据管道,将来自异构环境(移动端、Web端、服务端)的 Trace、Log、Metric 数据关联起来,形成一个完整的调用链视图。

方案权衡:厂商全家桶 vs. 开源自建

在阿里云这个大环境下,我们首先评估了完全依赖云厂商的解决方案,例如阿里云的 ARMS(应用实时监控服务)和 SLS(日志服务)。

方案A: 阿里云 ARMS + SLS

  • 优势:
    • 开箱即用: 与阿里云生态(如 ACK, ECS, RDS)深度集成,自动化探针安装和数据采集相对简单。
    • 托管服务: 无需担心底层基础设施的运维、扩容和高可用问题。
    • 统一控制台: 在阿里云控制台内即可完成大部分监控和告警配置。
  • 劣势:
    • 厂商锁定: 这是最大的风险。一旦深度使用,所有的仪表盘、告警规则、数据模型都与特定平台绑定,未来迁移或混合云部署的成本极高。
    • 客户端支持局限: 对于 Flutter 这类非主流的后端开发框架,官方的自动探针支持通常较弱或缺失。虽然提供了 OpenTelemetry 协议的接入点,但其核心价值——自动化和深度集成——就打了折扣。
    • 数据所有权与成本: 数据存储在厂商的黑盒中,导出和进行深度二次分析的灵活性受限。长期来看,大规模遥测数据的存储和查询成本可能变得不可控。

方案B: OpenTelemetry + OpenSearch + 自建组件

  • 优势:
    • 开放标准: OpenTelemetry 是 CNCF 的项目,提供了标准化的 API、SDK 和协议,避免了厂商锁定。任何支持 OTLP (OpenTelemetry Protocol) 协议的后端都可以作为数据接收方。
    • 极致灵活性: 我们可以完全掌控数据采集、处理和存储的每一个环节。例如,在数据进入 OpenSearch 前,通过 OpenTelemetry Collector 进行采样、过滤、丰富和聚合,甚至可以引入 Redis 进行复杂的实时采样决策。
    • 生态广泛: 几乎所有主流语言和框架都有 OpenTelemetry 的官方或社区支持,包括 Flutter/Dart,这为我们统一跨平台数据采集提供了基础。
    • 成本可控: OpenSearch 作为开源项目,我们可以部署在自有的 ECS 或 ACK 集群上,对存储和计算成本有更精细的控制。结合阿里云的抢占式实例等策略,可以进一步优化成本。
  • 劣势:
    • 运维复杂度: 需要自行搭建和维护 OpenTelemetry Collector 集群、OpenSearch 集群以及可能引入的 Redis 集群。这要求团队具备相应的运维能力。
    • 初始投入高: 从零开始构建整个平台,包括数据模型的建立、仪表盘的创建、告警系统的集成,需要更多的时间和人力投入。

决策:选择方案 B

在真实项目中,长期的技术自主性和灵活性远比短期的便利性更重要。一个常见的错误是低估了厂商锁定的沉没成本。因此,我们选择方案B。它虽然前期投入更高,但构建的是一个属于我们自己的、可演进的、面向未来的可观测性平台。

核心架构设计与实现

我们的目标是追踪一次完整的用户操作,例如用户在 Flutter App 中搜索一个商品。

数据流转路径:

  1. Flutter客户端: 用户发起搜索。Flutter App 内的 OpenTelemetry SDK 生成一个 Root Span,并创建一个 Trace Context。
  2. API请求: App 通过 HTTP 请求后端 API Gateway。Trace Context (包含 trace_idspan_id) 被注入到 HTTP Header 中(通常是 traceparent)。
  3. 后端服务: Go 编写的后端服务接收到请求,从 Header 中解析出 Trace Context,并创建一个 Child Span,使其与客户端的 Span 关联起来。
  4. 缓存与数据库: 后端服务查询 Redis 缓存和 OpenSearch 数据。每次调用都会创建更深层次的 Span。
  5. 数据采集: 所有 Span 数据通过 OTLP 协议被发送到 OpenTelemetry Collector。
  6. 数据处理与路由: Collector 进行批处理和属性添加。在这里,我们引入 Redis 进行尾部采样决策,仅将包含错误或高延迟的、以及一定比例的正常 Trace 完整发送到 OpenSearch。
  7. 数据存储与可视化: OpenSearch 接收并索引 Trace 数据。Angular 编写的管理后台通过查询 OpenSearch API,将完整的调用链可视化。
graph TD
    subgraph Client
        A[Flutter App] -- OTLP --> E
        B[Angular Admin] -- OTLP --> E
    end

    subgraph Backend on Alibaba Cloud ECS/ACK
        A -- HTTP Request w/ traceparent --> C[API Gateway]
        B -- HTTP Request w/ traceparent --> C
        C --> D[Go Service]
        D -- Redis Query --> F[Redis for Cache]
        D -- Search Query --> G[OpenSearch for Data]
    end

    subgraph Observability Pipeline
        D -- OTLP --> E[OpenTelemetry Collector]
        E -- Sampling Decision --> H[Redis for Sampling]
        E -- Export Traces --> G
    end

    subgraph Visualization
        I[Angular Dashboard] -- Query API --> G
    end

    style G fill:#f9f,stroke:#333,stroke-width:2px

1. OpenTelemetry Collector 关键配置

Collector 是整个管道的中枢。我们将其部署在阿里云 ACK (Kubernetes) 上,以 Deployment 形式运行。它的配置决定了数据的接收、处理和导出方式。

otel-collector-config.yaml:

# /etc/otelcol/config.yaml

receivers:
  otlp:
    protocols:
      grpc:
        endpoint: 0.0.0.0:4317 # 接收 gRPC 格式的 OTLP 数据
      http:
        endpoint: 0.0.0.0:4318 # 接收 HTTP 格式的 OTLP 数据

processors:
  batch:
    # 批量处理可以显著提高吞吐量,减少对后端的请求次数
    send_batch_size: 8192
    timeout: 1s
  
  memory_limiter:
    # 生产环境必须配置,防止采集器因内存溢出而崩溃
    check_interval: 1s
    limit_mib: 2048 # 限制 collector 使用 2GB 内存
    spike_limit_mib: 512

  # 尾部采样处理器 (需要自定义构建或使用 contrib 版本)
  # 这里以简化的属性处理器为例,真实场景可能是 tail_sampling
  attributes:
    actions:
      - key: deployment.environment
        action: insert
        value: "production"

exporters:
  # 主要导出目标:OpenSearch
  opensearch:
    # 阿里云上的 OpenSearch 实例地址
    endpoints: ["https://es-xxx.elasticsearch.aliyuncs.com:9200"]
    # 使用环境变量传入用户名和密码,更安全
    user: ${OPENSEARCH_USER}
    password: ${OPENSEARCH_PASSWORD}
    # OpenTelemetry 的 traces 数据模型
    traces_index: "otel-v1-traces"
    logs_index: "otel-v1-logs"
    # 生产环境中必须开启,确保数据安全传输
    tls:
      insecure: false 

  # 用于调试,可以将遥测数据打印到控制台
  logging:
    loglevel: debug

service:
  pipelines:
    traces:
      receivers: [otlp]
      processors: [memory_limiter, batch, attributes]
      exporters: [opensearch, logging]
    logs:
      receivers: [otlp]
      processors: [memory_limiter, batch]
      exporters: [opensearch, logging]

生产考量:

  • 上述配置是基础。在真实项目中,processors 部分会更复杂,例如使用 tail_sampling 处理器,它会根据 Trace 的状态(如是否存在错误)、耗时等策略来决定是否采样。
  • tail_sampling 可以与 Redis 结合,实现更复杂的分布式采样决策,避免单点瓶颈。

2. Flutter 客户端深度埋点

在 Flutter 中集成 OpenTelemetry 的挑战在于其异步特性和 UI 框架的封装。我们需要确保 Trace Context 能在 FutureIsolate 之间正确传递。

lib/core/tracing_service.dart:

import 'package:opentelemetry/api.dart' as api;
import 'package:opentelemetry/sdk.dart' as sdk;
import 'package:opentelemetry/exporter.dart';
import 'package:opentelemetry/instrumentation.dart';
import 'package:http/http.dart' as http;

// 这是一个关键的自定义 Client,用于自动注入 traceparent header
class InstrumentedHttpClient extends http.BaseClient {
  InstrumentedHttpClient(this._inner, this._tracer);

  final http.Client _inner;
  final api.Tracer _tracer;

  
  Future<http.StreamedResponse> send(http.BaseRequest request) {
    // 创建一个新的 Span,代表这次 HTTP 调用
    final span = _tracer.startSpan('http.client.${request.method}',
        kind: api.SpanKind.client);

    // 将 trace context 注入到请求头中
    api.Context.current.inject(
        request.headers, B3Propagator(singleHeader: true));

    span.setAttribute(api.SemanticAttributes.httpMethod, request.method);
    span.setAttribute(api.SemanticAttributes.httpUrl, request.url.toString());

    return _inner.send(request).then((response) {
      span.setAttribute(
          api.SemanticAttributes.httpStatusCode, response.statusCode);
      if (response.statusCode >= 400) {
        span.setStatus(api.StatusCode.error,
            description: 'HTTP Error: ${response.statusCode}');
      }
      return response;
    }).catchError((e) {
      span.recordException(e);
      span.setStatus(api.StatusCode.error, description: e.toString());
      throw e;
    }).whenComplete(() {
      span.end();
    });
  }
}


class TracingService {
  static late final sdk.TracerProvider _tracerProvider;
  static late final api.Tracer _tracer;
  static late final http.Client httpClient;

  static Future<void> initialize() async {
    // OTLP gRPC Exporter 配置,指向我们的 Collector
    final collectorExporter = CollectorExporter(
      Uri.parse('http://your-collector-endpoint:4317'),
    );

    // 资源属性,用于标记所有从该 App 发出的遥测数据
    final resources = sdk.Resource([
      api.Attribute.fromString('service.name', 'flutter-app'),
      api.Attribute.fromString('service.version', '1.0.0'),
    ]);

    _tracerProvider = sdk.TracerProvider(
      processors: [sdk.BatchSpanProcessor(collectorExporter)],
      resource: resources,
    );

    // 设置全局 TracerProvider
    api.globalTracerProvider = _tracerProvider;
    _tracer = _tracerProvider.getTracer('com.example.flutter-app');

    // 封装 http.Client
    httpClient = InstrumentedHttpClient(http.Client(), _tracer);
  }

  // 暴露一个方法用于手动创建 Span
  static api.Span startSpan(String name) {
    return _tracer.startSpan(name);
  }
}

// 在 main.dart 中初始化
Future<void> main() async {
  WidgetsFlutterBinding.ensureInitialized();
  await TracingService.initialize(); // 应用启动时初始化
  runApp(MyApp());
}

// 使用示例
void searchProducts(String keyword) {
  final span = TracingService.startSpan('ui.searchProducts');
  span.setAttribute('search.keyword', keyword);

  try {
    // 使用我们封装的 httpClient,它会自动处理 Span 和 Header
    await TracingService.httpClient.post(
      Uri.parse('https://api.example.com/search'),
      body: {'query': keyword},
    );
    span.setStatus(api.StatusCode.ok);
  } catch (e) {
    span.recordException(e);
    span.setStatus(api.StatusCode.error, description: 'Search failed');
  } finally {
    span.end();
  }
}

生产考量:

  • InstrumentedHttpClient 是核心。它保证了所有出站请求都带有正确的追踪上下文。
  • 对于用户交互,比如按钮点击,也应该手动创建 Span (ui.button.click),这样才能将用户操作与后端行为关联起来。
  • 错误处理至关重要。recordException 会将异常堆栈附加到 Span 上,这在排查问题时非常有用。

3. Go 后端服务链路串联

Go 凭借其出色的性能和并发模型,非常适合作为后端服务。在 Go 中集成 OpenTelemetry 同样直接。

main.go:

package main

import (
	"context"
	"log"
	"net/http"
	"time"

	"github.com/go-redis/redis/v8"
	"go.opentelemetry.io/contrib/instrumentation/net/http/otelhttp"
	"go.opentelemetry.io/otel"
	"go.opentelemetry.io/otel/attribute"
	"go.opentelemetry.io/otel/exporters/otlp/otlptrace/otlptracegrpc"
	"go.opentelemetry.io/otel/propagation"
	"go.opentelemetry.io/otel/sdk/resource"
	sdktrace "go.opentelemetry.io/otel/sdk/trace"
	semconv "go.opentelemetry.io/otel/semconv/v1.17.0"
	"go.opentelemetry.io/otel/trace"
)

var (
	tracer    trace.Tracer
	redisClient *redis.Client
)

// 初始化 TracerProvider
func initTracerProvider() (*sdktrace.TracerProvider, error) {
	ctx := context.Background()

	// 配置服务资源信息
	res, err := resource.New(ctx,
		resource.WithAttributes(
			semconv.ServiceName("go-backend-service"),
		),
	)
	if err != nil {
		return nil, err
	}

	// 配置 OTLP Exporter,指向 Collector
	exporter, err := otlptracegrpc.New(ctx,
		otlptracegrpc.WithInsecure(), // 在生产中应使用 WithTLS()
		otlptracegrpc.WithEndpoint("otel-collector.observability:4317"),
	)
	if err != nil {
		return nil, err
	}

	// 创建一个批量处理器,提高性能
	bsp := sdktrace.NewBatchSpanProcessor(exporter)

	tp := sdktrace.NewTracerProvider(
		sdktrace.WithSampler(sdktrace.AlwaysSample()), // 在 Collector 中进行采样决策
		sdktrace.WithResource(res),
		sdktrace.WithSpanProcessor(bsp),
	)
	otel.SetTracerProvider(tp)
	otel.SetTextMapPropagator(propagation.NewCompositeTextMapPropagator(propagation.TraceContext{}, propagation.Baggage{}))

	return tp, nil
}

func main() {
	tp, err := initTracerProvider()
	if err != nil {
		log.Fatalf("failed to initialize tracer provider: %v", err)
	}
	defer func() {
		if err := tp.Shutdown(context.Background()); err != nil {
			log.Printf("Error shutting down tracer provider: %v", err)
		}
	}()

	tracer = otel.Tracer("main")

	// 初始化 Redis 客户端
	redisClient = redis.NewClient(&redis.Options{Addr: "redis.database:6379"})

	// 使用 otelhttp 中间件包装 http.Handler
	searchHandler := http.HandlerFunc(handleSearch)
	wrappedHandler := otelhttp.NewHandler(searchHandler, "HTTP /search")

	http.Handle("/search", wrappedHandler)
	log.Println("Listening on :8080")
	http.ListenAndServe(":8080", nil)
}

func handleSearch(w http.ResponseWriter, r *http.Request) {
	// otelhttp 中间件会自动从 header 中提取 context 并创建 span
	ctx := r.Context()
	
	// 手动创建子 Span,用于追踪业务逻辑
	_, span := tracer.Start(ctx, "business.logic.search")
	defer span.End()

	query := r.URL.Query().Get("query")
	span.SetAttributes(attribute.String("search.query", query))
	
	// 调用 Redis
	result, err := queryRedis(ctx, "product:"+query)
	if err != nil {
		span.RecordError(err)
		span.SetStatus(codes.Error, "Redis query failed")
		http.Error(w, "internal server error", http.StatusInternalServerError)
		return
	}

	w.Write([]byte(result))
}

// 包装 Redis 调用,为其创建 Span
func queryRedis(ctx context.Context, key string) (string, error) {
	// 获取当前上下文的 tracer
	// 注意:这里的 tracer 是全局的,但 Start 方法会从 ctx 中关联父 Span
	var span trace.Span
	ctx, span = tracer.Start(ctx, "cache.redis.get", trace.WithAttributes(
		semconv.DBSystemRedis,
		attribute.String("db.statement", "GET "+key),
	))
	defer span.End()

	// 模拟 Redis 查询
	time.Sleep(10 * time.Millisecond)
	val, err := redisClient.Get(ctx, key).Result()
	if err == redis.Nil {
		span.SetAttributes(attribute.Bool("cache.hit", false))
		return "", nil // 缓存未命中不是一个错误
	} else if err != nil {
		span.RecordError(err)
		return "", err
	}
	
	span.SetAttributes(attribute.Bool("cache.hit", true))
	return val, nil
}

生产考量:

  • otelhttp.NewHandler 是关键,它实现了服务端的上下文传播,自动将 Flutter 客户端的 Trace 和后端的 Trace 串联起来。
  • 对所有外部依赖(数据库、缓存、第三方API)的调用,都应该用一个新的 Span 包裹起来。这能帮助我们精确定位延迟来源。
  • 采样决策 sdktrace.AlwaysSample() 表明我们信任下游的 Collector 去做最终采样决定,这是一种常见且推荐的模式。

4. Angular 可视化前端

管理后台使用 Angular 构建,其核心任务是提供一个界面,输入 trace_id,然后从 OpenSearch 中拉取并展示整个调用链。

trace.service.ts:

import { Injectable } from '@angular/core';
import { HttpClient, HttpParams } from '@angular/common/http';
import { Observable } from 'rxjs';
import { map } from 'rxjs/operators';

export interface Span {
  traceId: string;
  spanId: string;
  parentSpanId?: string;
  name: string;
  startTimeUnixNano: number;
  endTimeUnixNano: number;
  duration: number; // in ms
  serviceName: string;
  attributes: { [key: string]: any };
  status: { code: string; message?: string };
  events: any[];
}

@Injectable({ providedIn: 'root' })
export class TraceService {
  // 阿里云 OpenSearch 实例的查询端点
  private openSearchUrl = 'https://es-xxx.elasticsearch.aliyuncs.com/otel-v1-traces/_search';

  constructor(private http: HttpClient) {}

  getTraceById(traceId: string): Observable<Span[]> {
    const query = {
      size: 1000, // 一个 trace 不太可能有超过1000个span
      query: {
        term: {
          traceId: {
            value: traceId,
          },
        },
      },
      sort: [
        {
          startTimeUnixNano: {
            order: 'asc',
          },
        },
      ],
    };

    // 在生产环境中,应该通过后端服务代理 OpenSearch 的请求以保证安全
    // 这里为简化直接调用,需要处理 CORS 和认证
    return this.http.post<any>(this.openSearchUrl, query).pipe(
      map(response => this.transformHits(response.hits.hits))
    );
  }

  private transformHits(hits: any[]): Span[] {
    if (!hits) {
      return [];
    }
    // 将 OpenSearch 的原始数据转换为我们前端需要的模型
    return hits.map(hit => {
      const source = hit._source;
      return {
        traceId: source.traceId,
        spanId: source.spanId,
        parentSpanId: source.parentSpanId,
        name: source.name,
        startTimeUnixNano: source.startTimeUnixNano,
        endTimeUnixNano: source.endTimeUnixNano,
        duration: (source.endTimeUnixNano - source.startTimeUnixNano) / 1e6, // ns to ms
        serviceName: source.resource.attributes['service.name'],
        attributes: source.attributes,
        status: source.status,
        events: source.events || [],
      };
    });
  }
}

生产考量:

  • 安全: 前端绝不能直接持有访问 OpenSearch 的高权限凭证。正确的做法是,Angular 调用一个后端 BFF (Backend for Frontend) 服务,由 BFF 服务去查询 OpenSearch。
  • 可视化: 将获取到的 Span[] 扁平列表数据,通过递归或迭代,构建成一棵树状结构,然后在界面上用甘特图的形式展示,是业界通用的调用链可视化方案。

架构的局限性与未来演进

这个自建架构虽然灵活且强大,但并非没有缺点。首先,运维 OpenSearch 集群本身就是一个复杂的任务,涉及容量规划、版本升级、快照备份等。在数据量激增时,索引性能和查询优化会成为新的瓶颈。

其次,我们当前实现的采样策略是基于 Collector 的,如果客户端遥测数据量本身就巨大(例如,在一个高日活的 App 中),可能会在网络出口和 Collector 入口造成拥堵。更优的策略是在客户端 SDK 层面实现动态的、基于头部的采样(Head-based Sampling),由一个中心化的配置服务(可以也用 Redis 实现)下发采样率。

未来的演进方向可以包括:

  1. 引入 Metrics 和 Logs: 当前架构只关注 Traces。可以通过在 Collector 中添加 Prometheus exporter,将 Span 数据转换为 Metrics,存入 VictoriaMetrics 或 M3DB 等时序数据库,用于聚合分析和告警。同时,将结构化日志也发送到 OpenSearch,并利用 trace_id 将 Logs 和 Traces 关联起来。
  2. 自动化仪表盘与告警: 利用 Grafana 对接 OpenSearch 和 Prometheus 数据源,创建自动化的业务和技术仪表盘。基于 SLI/SLO 定义告警规则。
  3. 拥抱 eBPF: 对于 ACK 环境中的服务间通信,可以探索使用基于 eBPF 的技术(如 Cilium/Tetragon)来实现零侵入式的网络层可观测性,作为应用层埋点的补充。

  目录