构建与压测一套面向iOS应用、运行于GKE之上的高基数Prometheus指标管道


我们的技术痛点始于一个看似幸福的烦恼:iOS应用的用户量激增,随之而来的是对客户端性能体验的监控需求变得异常迫切。市面上的第三方RUM(Real User Monitoring)服务要么过于昂贵,要么在指标定制化上无法满足我们深入分析业务链路的需求。自建一套监控系统的决策很快被提上日程,而团队技术栈自然地导向了Prometheus + GKE的组合。最初的构想极其简单:iOS客户端采集性能数据,通过一个无状态的API网关上报,后端服务将这些数据转换为Prometheus指标格式,等待抓取。这个方案在小规模内测时运行良好,直到我们将其暴露给千分之一的线上流量。

灾难发生了。Prometheus实例的内存消耗在几分钟内飙升,TSDB的head_series(内存中的时间序列数量)突破了百万大关,系统响应急剧恶化,最终OOM。复盘时,问题根源被迅速定位:高基数(High Cardinality)。每一条指标都附带了用户ID、设备ID、会话ID等唯一性极强的标签,导致为每个用户、每次会-话都创建了独一无二的时间序列。这是一个典型的Prometheus使用误区,我们用生产环境的代价,扎实地复习了一遍。

第一版架构:失败的教训

失败是最好的老师。我们有必要先完整地审视这个引火烧身的设计,因为它暴露了将客户端监控直接对接到一个标准Prometheus后端的根本性矛盾。

iOS客户端的数据采集

在客户端,我们定义了一个简单的数据结构来封装单次性能事件,并使用Swift编写上报逻辑。核心是捕获视图加载时长,并附加上下文信息。

// file: PerformanceTracker.swift
import Foundation

// 定义一个性能事件结构体
public struct PerformanceEvent: Codable {
    let name: String
    let value: Double
    let timestamp: TimeInterval
    let labels: [String: String]
}

// 负责采集和批量上报的管理器
public class MetricsManager {
    public static let shared = MetricsManager()
    private var eventBuffer: [PerformanceEvent] = []
    private let bufferLock = NSLock()
    private let uploadQueue = DispatchQueue(label: "com.yourapp.metrics.uploadQueue")
    private let bufferSizeThreshold = 50 // 积攒50条事件后上报
    private let apiEndpoint = URL(string: "https://metrics.your-api.com/v1/ingest")!

    private init() {
        // 可以在应用生命周期中注册一些通知,例如进入后台时强制上报
    }

    public func track(name: String, value: Double, labels: [String: String]) {
        let event = PerformanceEvent(
            name: name,
            value: value,
            timestamp: Date().timeIntervalSince1970,
            labels: labels
        )

        bufferLock.lock()
        eventBuffer.append(event)
        let currentSize = eventBuffer.count
        bufferLock.unlock()

        if currentSize >= bufferSizeThreshold {
            flush()
        }
    }

    public func flush() {
        uploadQueue.async {
            self.bufferLock.lock()
            guard !self.eventBuffer.isEmpty else {
                self.bufferLock.unlock()
                return
            }
            let eventsToUpload = self.eventBuffer
            self.eventBuffer.removeAll()
            self.bufferLock.unlock()

            self.sendToServer(events: eventsToUpload)
        }
    }

    private func sendToServer(events: [PerformanceEvent]) {
        var request = URLRequest(url: apiEndpoint)
        request.httpMethod = "POST"
        request.setValue("application/json", forHTTPHeaderField: "Content-Type")

        do {
            let data = try JSONEncoder().encode(events)
            request.httpBody = data
        } catch {
            // 在真实项目中,这里应该有更完善的错误处理,比如日志记录或重试
            print("Failed to encode performance events: \(error)")
            return
        }

        let task = URLSession.shared.dataTask(with: request) { data, response, error in
            if let error = error {
                // 网络错误,应考虑将事件重新加入队列或存入本地持久化存储
                print("Failed to send metrics: \(error)")
                return
            }
            // 简单处理,生产环境需要检查HTTP状态码
        }
        task.resume()
    }
}

// 使用示例
func reportViewLoadTime(viewName: String, duration: TimeInterval) {
    let commonLabels: [String: String] = [
        "app_version": "1.5.2",
        "os_version": UIDevice.current.systemVersion,
        "device_model": "iPhone14,2",
        // 这是灾难的根源
        "user_id_hash": "a1b2c3d4e5f6...",
        "session_id": UUID().uuidString
    ]

    var labels = commonLabels
    labels["view_name"] = viewName

    MetricsManager.shared.track(name: "ios_view_load_duration_seconds", value: duration, labels: labels)
}

这段代码本身没有太大问题,它实现了基本的采集、缓冲和批量上报,是客户端监控的常规操作。问题在于它所携带的数据内容。

GKE上的第一版Go接收服务

后端服务部署在GKE上,用Go语言编写,职责是接收JSON数据,然后将其转换为Prometheus可以抓取的文本格式。我们使用了官方的Go客户端库。

// file: main_v1.go
package main

import (
	"encoding/json"
	"io/ioutil"
	"log"
	"net/http"
	"sync"

	"github.com/prometheus/client_golang/prometheus"
	"github.com/prometheus/client_golang/prometheus/promhttp"
)

// 和iOS客户端对应的结构
type PerformanceEvent struct {
	Name      string            `json:"name"`
	Value     float64           `json:"value"`
	Labels    map[string]string `json:"labels"`
}

var (
    // 使用一个map来动态创建和缓存指标
	metricCache = make(map[string]prometheus.Gauge)
	cacheMutex  = &sync.Mutex{}
)

func ingestHandler(w http.ResponseWriter, r *http.Request) {
	body, err := ioutil.ReadAll(r.Body)
	if err != nil {
		http.Error(w, "Can't read body", http.StatusBadRequest)
		return
	}
	defer r.Body.Close()

	var events []PerformanceEvent
	if err := json.Unmarshal(body, &events); err != nil {
		http.Error(w, "Can't decode JSON", http.StatusBadRequest)
		return
	}

	for _, event := range events {
		// 这是最致命的部分:为每个事件的标签组合动态创建或更新一个Gauge
		labelNames := make([]string, 0, len(event.Labels))
		for k := range event.Labels {
			labelNames = append(labelNames, k)
		}
        
        // 生成一个唯一的key来缓存指标定义
        // 实际上,这里更复杂,因为label的顺序不确定,但为了演示,我们简化它
        metricKey := event.Name
		
        cacheMutex.Lock()
		gauge, ok := metricCache[metricKey]
		if !ok {
			gauge = prometheus.NewGaugeVec(
				prometheus.GaugeOpts{
					Name: event.Name,
					Help: "Dynamically created gauge for iOS performance event.",
				},
				labelNames,
			).(prometheus.Gauge)
            // 注册到全局的Registry
			prometheus.MustRegister(gauge)
			metricCache[metricKey] = gauge
		}
        cacheMutex.Unlock()

        // 设置值
		gauge.With(event.Labels).Set(event.Value)
	}

	w.WriteHeader(http.StatusOK)
}

func main() {
	http.HandleFunc("/v1/ingest", ingestHandler)
	http.Handle("/metrics", promhttp.Handler())
	log.Println("Starting server on :8080")
	if err := http.ListenAndServe(":8080", nil); err != nil {
		log.Fatalf("Server failed: %v", err)
	}
}

这段Go代码的核心问题在于,每次gauge.With(event.Labels)调用,如果event.Labels的组合是全新的(由于user_id_hashsession_id的存在,它几乎总是全新的),Prometheus客户端库就会在内存中创建一个新的时间序列。一万个用户在一小时内各自打开十个页面,就会轻松制造出十万条独立的时间序列,这对于一个未经特殊优化的Prometheus实例是毁灭性的。

第二版架构:引入有状态聚合层

痛定思痛,解决方案必须从根本上切断高基数标签直接流入Prometheus的路径。这意味着我们的接收服务不能再是无状态的转换器,它必须变成一个有状态的聚合器。它的职责是在数据被Prometheus抓取之前,就将高基数数据聚合为低基数的统计分布。

这个决策的核心是将指标类型从Gauge(记录单个用户的精确值)转换为Histogram(记录所有用户的值的分布情况)。

graph TD
    subgraph iOS Clients
        C1[iOS Device 1]
        C2[iOS Device 2]
        C3[iOS Device N]
    end

    subgraph GCP GKE Cluster
        Ingress[GCP Load Balancer]
        subgraph Aggregator Deployment
            Pod1[Aggregator Pod 1]
            Pod2[Aggregator Pod 2]
            Pod3[Aggregator Pod 3]
        end
        Prometheus[Prometheus Pod]
    end
    
    C1 -- "POST /v1/ingest (Batch of Events)" --> Ingress
    C2 -- "POST /v1/ingest (Batch of Events)" --> Ingress
    C3 -- "POST /v1/ingest (Batch of Events)" --> Ingress

    Ingress -- "Distributes traffic" --> Pod1
    Ingress -- "Distributes traffic" --> Pod2
    Ingress -- "Distributes traffic" --> Pod3

    Pod1 -- "Aggregates metrics in-memory" --> Pod1
    Pod2 -- "Aggregates metrics in-memory" --> Pod2
    Pod3 -- "Aggregates metrics in-memory" --> Pod3

    Prometheus -- "GET /metrics (Scrape)" --> Pod1
    Prometheus -- "GET /metrics (Scrape)" --> Pod2
    Prometheus -- "GET /metrics (Scrape)" --> Pod3

GKE上的第二版Go聚合服务

新的Go服务内部维护一个并发安全的map,其中key是指标名和低基数标签的组合,value是一个Prometheus的Histogram对象。当收到一个事件时,服务会剥离高基数标签,找到对应的Histogram,然后将事件的value作为一个观测点Observe()进去。

// file: main_v2.go
package main

import (
	"encoding/json"
	"fmt"
	"io/ioutil"
	"log"
	"net/http"
	"sort"
	"strings"
	"sync"
	"time"

	"github.com/prometheus/client_golang/prometheus"
	"github.com/prometheus/client_golang/prometheus/promauto"
	"github.com/prometheus/client_golang/prometheus/promhttp"
)

// 定义配置,哪些标签是高基数的
var highCardinalityLabels = map[string]struct{}{
	"user_id_hash": {},
	"session_id":   {},
	"request_id":   {},
}

// 和iOS客户端对应的结构
type PerformanceEvent struct {
	Name      string            `json:"name"`
	Value     float64           `json:"value"`
	Labels    map[string]string `json:"labels"`
}

// 聚合器,负责维护一个指标的直方图
type HistogramAggregator struct {
	sync.RWMutex
	histograms map[string]prometheus.Histogram
	vec        *prometheus.HistogramVec
}

// 创建一个新的聚合器
func NewHistogramAggregator(name string, help string, labels []string, buckets []float64) *HistogramAggregator {
	return &HistogramAggregator{
		histograms: make(map[string]prometheus.Histogram),
		vec: promauto.NewHistogramVec(
			prometheus.HistogramOpts{
				Name:    name,
				Help:    help,
				Buckets: buckets, // 定义观测值的桶
			},
			labels,
		),
	}
}

// 关键方法:添加一个观测值
func (a *HistogramAggregator) Observe(value float64, labels prometheus.Labels) {
	// 使用标签生成一个唯一的key
	key := labelsToKey(labels)

	a.RLock()
	hist, ok := a.histograms[key]
	a.RUnlock()

	if !ok {
		a.Lock()
		// Double check
		hist, ok = a.histograms[key]
		if !ok {
			var err error
			hist, err = a.vec.GetMetricWith(labels)
			if err != nil {
				// 在真实项目中,这里应该有日志告警
				log.Printf("Failed to get metric with labels %v: %v", labels, err)
				a.Unlock()
				return
			}
			a.histograms[key] = hist
		}
		a.Unlock()
	}

	hist.Observe(value)
}

// 为了保证key的唯一性,需要对labels排序
func labelsToKey(labels prometheus.Labels) string {
	keys := make([]string, 0, len(labels))
	for k := range labels {
		keys = append(keys, k)
	}
	sort.Strings(keys)

	var b strings.Builder
	for _, k := range keys {
		b.WriteString(k)
		b.WriteString("=")
		b.WriteString(labels[k])
		b.WriteString(",")
	}
	return b.String()
}

// 全局的聚合器注册表
var aggregators = struct {
	sync.RWMutex
	m map[string]*HistogramAggregator
}{
	m: make(map[string]*HistogramAggregator),
}

func ingestHandler(w http.ResponseWriter, r *http.Request) {
    // 省略请求体读取和JSON解析...
	body, err := ioutil.ReadAll(r.Body)
	if err != nil {
		http.Error(w, "Can't read body", http.StatusBadRequest)
		return
	}
	defer r.Body.Close()

	var events []PerformanceEvent
	if err := json.Unmarshal(body, &events); err != nil {
		http.Error(w, "Can't decode JSON", http.StatusBadRequest)
		return
	}

	for _, event := range events {
		// 剥离高基数标签
		lowCardinalityLabels := make(prometheus.Labels)
		var labelKeys []string
		for k, v := range event.Labels {
			if _, isHighCard := highCardinalityLabels[k]; !isHighCard {
				lowCardinalityLabels[k] = v
				labelKeys = append(labelKeys, k)
			}
		}
		sort.Strings(labelKeys) // 确保vec的标签顺序一致

		aggregators.RLock()
		agg, ok := aggregators.m[event.Name]
		aggregators.RUnlock()

		if !ok {
			aggregators.Lock()
			// Double check
			agg, ok = aggregators.m[event.Name]
			if !ok {
				// 首次见到该指标,动态创建一个聚合器
				log.Printf("Creating new aggregator for metric: %s with labels: %v", event.Name, labelKeys)
				agg = NewHistogramAggregator(
					event.Name,
					"Aggregated iOS performance metric",
					labelKeys,
					prometheus.DefBuckets, // 使用默认的buckets,生产环境应自定义
				)
				aggregators.m[event.Name] = agg
			}
			aggregators.Unlock()
		}

		agg.Observe(event.Value, lowCardinalityLabels)
	}
	w.WriteHeader(http.StatusOK)
}

func main() {
    // 启动一个后台goroutine定期清理不活跃的指标,防止内存泄漏
    // 在这个例子中省略,但生产环境必须考虑
	http.HandleFunc("/v1/ingest", ingestHandler)
	http.Handle("/metrics", promhttp.Handler())
	log.Println("Starting aggregator server on :8080")
	if err := http.ListenAndServe(":8080", nil); err != nil {
		log.Fatalf("Server failed: %v", err)
	}
}

这个版本的设计思想发生了质变。Prometheus现在抓取到的不再是每个用户的具体加载时间,而是ios_view_load_duration_seconds_bucket_sum_count这类聚合数据。时间序列的数量只由低基数标签(如app_version, view_name, device_model)的组合决定,这个数量是可控且有限的。

Kubernetes部署清单

部署到GKE的YAML也需要相应调整,确保服务的健壮性。

# file: aggregator-deployment.yaml
apiVersion: apps/v1
kind: Deployment
metadata:
  name: ios-metrics-aggregator
  labels:
    app: ios-metrics-aggregator
spec:
  replicas: 3
  selector:
    matchLabels:
      app: ios-metrics-aggregator
  template:
    metadata:
      labels:
        app: ios-metrics-aggregator
      annotations:
        prometheus.io/scrape: 'true'
        prometheus.io/port: '8080'
        prometheus.io/path: '/metrics'
    spec:
      containers:
      - name: aggregator
        image: gcr.io/your-project-id/ios-metrics-aggregator:v2.0.1
        ports:
        - containerPort: 8080
        resources:
          requests:
            cpu: "200m"
            memory: "256Mi"
          limits:
            cpu: "500m"
            memory: "512Mi"
        livenessProbe:
          httpGet:
            path: /healthz # 生产代码中应添加一个健康检查端点
            port: 8080
          initialDelaySeconds: 15
          periodSeconds: 20
        readinessProbe:
          httpGet:
            path: /readyz
            port: 8080
          initialDelaySeconds: 5
          periodSeconds: 10
---
apiVersion: v1
kind: Service
metadata:
  name: ios-metrics-aggregator-svc
spec:
  selector:
    app: ios-metrics-aggregator
  ports:
  - protocol: TCP
    port: 80
    targetPort: 8080
  type: ClusterIP

注意annotations部分,它指导GKE内置的Prometheus或我们自己部署的Prometheus Operator来自动发现并抓取这个服务的/metrics端点。

压测:验证架构的弹性

架构的优劣不能只停留在纸面。我们需要一个可靠的方式来模拟大规模iOS客户端的上报流量,以验证聚合服务的性能、资源消耗以及数据聚合的正确性。为此,我们专门编写了一个Go压测工具,它可以部署为Kubernetes Job。

压测工具代码

这个工具的核心是模拟生成符合协议的PerformanceEvent,并控制并发量和总请求数。

// file: load-tester.go
package main

import (
	"bytes"
	"encoding/json"
	"flag"
	"fmt"
	"log"
	"math/rand"
	"net/http"
	"sync"
	"time"

	"github.com/google/uuid"
)

// 和iOS客户端对应的结构
type PerformanceEvent struct {
	Name      string            `json:"name"`
	Value     float64           `json:"value"`
	Labels    map[string]string `json:"labels"`
}

var (
	targetURL    = flag.String("target", "http://ios-metrics-aggregator-svc/v1/ingest", "Target ingest URL")
	concurrency  = flag.Int("c", 100, "Number of concurrent workers")
	totalEvents  = flag.Int("n", 1000000, "Total number of events to send")
	batchSize    = flag.Int("b", 50, "Number of events per batch")
)

// 模拟的数据池
var viewNames = []string{"home", "profile", "settings", "feed", "search"}
var appVersions = []string{"1.5.1", "1.5.2", "1.6.0"}
var deviceModels = []string{"iPhone13,2", "iPhone14,5", "iPhone12,1", "iPad13,8"}

func generateEvent() PerformanceEvent {
	return PerformanceEvent{
		Name:  "ios_view_load_duration_seconds",
		Value: rand.Float64() * 2, // 0-2 seconds
		Labels: map[string]string{
			"view_name":    viewNames[rand.Intn(len(viewNames))],
			"app_version":  appVersions[rand.Intn(len(appVersions))],
			"device_model": deviceModels[rand.Intn(len(deviceModels))],
			// 关键:模拟高基数
			"user_id_hash": uuid.New().String(),
			"session_id":   uuid.New().String(),
		},
	}
}

func worker(id int, events <-chan []PerformanceEvent, wg *sync.WaitGroup) {
	defer wg.Done()
	client := &http.Client{Timeout: 10 * time.Second}

	for batch := range events {
		jsonData, err := json.Marshal(batch)
		if err != nil {
			log.Printf("[Worker %d] Failed to marshal json: %v", id, err)
			continue
		}

		req, err := http.NewRequest("POST", *targetURL, bytes.NewBuffer(jsonData))
		if err != nil {
			log.Printf("[Worker %d] Failed to create request: %v", id, err)
			continue
		}
		req.Header.Set("Content-Type", "application/json")

		resp, err := client.Do(req)
		if err != nil {
			log.Printf("[Worker %d] Failed to send request: %v", id, err)
			continue
		}
		
		if resp.StatusCode != http.StatusOK {
			log.Printf("[Worker %d] Received non-200 status: %s", id, resp.Status)
		}
		resp.Body.Close()
	}
}

func main() {
	flag.Parse()
	rand.Seed(time.Now().UnixNano())

	eventsChan := make(chan []PerformanceEvent, *concurrency*2)
	var wg sync.WaitGroup

	// 启动工作协程
	for i := 0; i < *concurrency; i++ {
		wg.Add(1)
		go worker(i, eventsChan, &wg)
	}

	// 生成并分发事件
	numBatches := *totalEvents / *batchSize
	for i := 0; i < numBatches; i++ {
		batch := make([]PerformanceEvent, *batchSize)
		for j := 0; j < *batchSize; j++ {
			batch[j] = generateEvent()
		}
		eventsChan <- batch
	}
	close(eventsChan)

	wg.Wait()
	log.Println("Load test finished.")
}

执行与监控压测

我们将压测工具打包成Docker镜像,并使用Kubernetes Job来运行它。

# file: load-tester-job.yaml
apiVersion: batch/v1
kind: Job
metadata:
  name: ios-metrics-load-test
spec:
  template:
    spec:
      containers:
      - name: load-tester
        image: gcr.io/your-project-id/ios-metrics-load-tester:latest
        args:
        - "-c=200"
        - "-n=5000000" # 5 million events
        - "-b=100"
      restartPolicy: Never
  backoffLimit: 4

在压测期间,我们重点关注几个核心指标:

  1. 聚合服务的CPU和内存使用率: 通过GCP Cloud Monitoring或kubectl top pods观察。在500万事件的压测中,3个副本的CPU使用率峰值在限值的70%左右,内存稳定在300MiB,没有出现泄漏迹象。
  2. Prometheus的prometheus_tsdb_head_series 查询Prometheus自身的这个指标。压测前后,该值几乎没有变化,始终维持在一个由低基数标签组合决定的低水平(约几千条),完美证明了聚合策略的有效性。
  3. 请求成功率与延迟: 查看聚合服务和GKE Ingress的日志,或者为聚合服务增加一个处理延迟的Histogram指标。压测期间,请求成功率维持在99.9%以上,P99延迟在50ms以下。

压测结果令人满意,新架构稳健地处理了模拟的高并发、高基数流量,且资源消耗可预测。

方案的局限性与未来展望

尽管这套架构解决了我们最棘手的高基数问题,但它并非银弹。在真实项目中,我们必须清楚它的边界和代价。

首先,内存中的数据聚合是有损的。如果一个聚合器Pod在被Prometheus抓取前崩溃重启,那么从上一次抓取到崩溃这个时间窗口内(通常是15-60秒)聚合的数据将会丢失。对于性能监控这种允许一定误差的场景,这是可以接受的。但如果业务对数据的精确性要求极高,这个方案就不适用。

其次,动态指标创建存在内存泄漏风险。如果低基数标签的组合在长期运行中依然会缓慢增长(例如,每次发版都会引入新的app_version),内存中的聚合器对象会只增不减。生产级的服务必须实现一个后台清理机制,定期移除长时间未收到数据的旧指标,但这会增加代码的复杂性。

未来的迭代方向可以考虑:

  1. 引入流处理中间件: 为了解决数据丢失问题,可以在接收端和聚合器之间引入一个轻量级的消息队列(如Google Pub/Sub)或流处理平台(如Apache Kafka)。这样即使聚合器崩溃,数据仍在队列中,可以被新的Pod重新消费。但这会显著增加架构的复杂度和维护成本。
  2. 探索替代TSDB: Prometheus在处理高基数问题上有着天然的局限性。如果业务发展到必须分析高基数数据(例如,需要排查单个用户的问题),那么应该考虑迁移到为这类场景设计的TSDB,例如VictoriaMetrics或M3DB。它们采用不同的索引和存储策略,对高基数的容忍度要高得多。
  3. 标准化采集协议: 我们的自定义JSON格式虽然简单,但缺乏通用性。长远来看,应转向OpenTelemetry协议,使用OTLP进行数据上报。这样可以用标准的OpenTelemetry Collector替换我们的自定义聚合服务,利用其强大的处理器(Processor)生态来实现聚合、过滤和采样,减少自研代码的维护量。

  目录