Building a Real-Time Microservice Topology Awareness System with Consul Service Discovery and RabbitMQ Message Tracing


Our team inherited a legacy vector-retrieval project composed of multiple microservices written in Python and Go. As the business iterated, the call relationships between services grew extremely complex, especially after we introduced asynchronous task processing based on RabbitMQ. A routine upgrade of a downstream ChromaDB embedding service unexpectedly triggered a cascading failure in an upstream service that appeared unrelated. The investigation took two full days, and the root of the problem was that we lacked a "living" architecture diagram: a dynamic topology that reflects the real dependencies between services in real time. The hand-maintained architecture documents were long out of date and useless for troubleshooting.

The pain point was clear: we needed an automated, real-time service topology awareness system. It had to discover every online service automatically and accurately map both the synchronous and the asynchronous call paths between them.

Initial Design and Technology Choices

At its core, the system has to answer two questions: node discovery (what services exist) and edge discovery (how they relate to each other).

  1. Node Discovery: Every service in the system is already registered with Consul for service registration and discovery, which gives us a natural, reliable source for the service catalog. We can simply poll Consul's API to get the full list of currently registered service instances; these become the "nodes" of our topology graph (a minimal query sketch follows right after this list).

  2. Edge Discovery: This is where the challenge lies.

    • Synchronous calls: HTTP-style synchronous calls can be captured with a service mesh (e.g., Istio) or an APM tool (e.g., SkyWalking). But introducing a service mesh would be far too invasive for the existing architecture and is not realistic in the short term.
    • Asynchronous calls: Our main pain point is the implicit dependencies created by RabbitMQ. One service publishes a message to an exchange and several services may consume it; this publish-subscribe relationship is decoupled at the code level but is a hard dependency at the system level.

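As a concrete illustration of the node-discovery side, here is a minimal Go sketch (separate from the aggregator code shown later) that polls Consul's standard catalog endpoint GET /v1/catalog/services and prints every registered service name with its tags. It assumes a local Consul agent on localhost:8500 and a fixed 10-second interval; the actual aggregator below uses the official Consul Go client instead.

package main

import (
	"encoding/json"
	"fmt"
	"net/http"
	"time"
)

func main() {
	// Poll Consul's catalog periodically; localhost:8500 is the default agent address.
	for range time.Tick(10 * time.Second) {
		resp, err := http.Get("http://localhost:8500/v1/catalog/services")
		if err != nil {
			fmt.Println("consul query failed:", err)
			continue
		}
		// The endpoint returns a JSON map of service name -> list of tags.
		var services map[string][]string
		if err := json.NewDecoder(resp.Body).Decode(&services); err == nil {
			for name, tags := range services {
				fmt.Printf("node: %s tags: %v\n", name, tags)
			}
		}
		resp.Body.Close()
	}
}
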
In the end we decided to focus the first phase on the hardest part: the asynchronous call paths. We can design a lightweight message-tracing protocol: when a service publishes a message to RabbitMQ, it injects its own service-id into the message headers. When a consumer processes the message, it parses this header and therefore knows exactly which service the message came from. The consumer then reports this dependency edge [producer -> consumer] to a centralized topology aggregation service.

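To make the tracing protocol concrete before touching the services, here is a small sketch of what the contract boils down to, written against the streadway/amqp client that the Go code below also uses. The helper names (InjectSource, BuildReport) and the package name are illustrative and not part of the actual services; the x-source-service header key and the report payload fields do match what the services implement later.

package tracing

import (
	"encoding/json"
	"time"

	"github.com/streadway/amqp"
)

// DependencyReport is the JSON payload that gets sent to the topology aggregator.
type DependencyReport struct {
	Source    string `json:"source"`
	Target    string `json:"target"`
	Type      string `json:"type"`
	Timestamp int64  `json:"timestamp"`
}

// InjectSource stamps the publishing service's name into the outgoing message headers.
func InjectSource(p *amqp.Publishing, serviceName string) {
	if p.Headers == nil {
		p.Headers = amqp.Table{}
	}
	p.Headers["x-source-service"] = serviceName
}

// BuildReport reads the header on the consumer side and returns the edge to report,
// or nil if the message carries no source header.
func BuildReport(d amqp.Delivery, consumerName string) []byte {
	src, _ := d.Headers["x-source-service"].(string)
	if src == "" {
		return nil
	}
	body, _ := json.Marshal(DependencyReport{
		Source:    src,
		Target:    consumerName,
		Type:      "rabbitmq_message",
		Timestamp: time.Now().Unix(),
	})
	return body
}
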
The architecture of this approach looks like this:

graph TD
    subgraph "Monitored Microservice Cluster"
        A[api-gateway] -- registers --> C(Consul)
        B[embedding-service] -- registers --> C
        D[ChromaDB] -- used by --> B
        A -- publishes msg w/ trace header --> R(RabbitMQ)
        R -- consumes msg --> B
        B -- reports dependency --> TA(Topology Aggregator)
    end

    subgraph "Topology Awareness System"
        TA -- queries services --> C
        TA -- serves data via WebSocket --> F(Svelte Frontend)
    end

    U[Developer] -- views --> F
  • Topology Aggregator: a standalone Go service. It periodically pulls the full service list from Consul and, at the same time, listens on a dedicated RabbitMQ queue for the dependency reports sent by the individual services. It maintains the complete topology graph in memory and pushes graph changes to the frontend in real time over WebSocket. Go was chosen for its excellent concurrency and low resource footprint, which suit this kind of long-running background service well.
  • Frontend: built with Svelte. Svelte's compile-time reactivity is a good fit for this kind of data-driven, frequently updated dynamic visualization. We will use a graph library to render the service topology.

Step-by-Step Implementation

First, we stand up the base environment with docker-compose, which is essential for local development and reproducibility.

docker-compose.yml

version: '3.8'

services:
  consul:
    image: consul:1.15
    container_name: consul
    ports:
      - "8500:8500"
    command: "agent -server -ui -node=server-1 -bootstrap-expect=1 -client=0.0.0.0"

  rabbitmq:
    image: rabbitmq:3.11-management
    container_name: rabbitmq
    ports:
      - "5672:5672"
      - "15672:15672"
    environment:
      - RABBITMQ_DEFAULT_USER=user
      - RABBITMQ_DEFAULT_PASS=password

  chromadb:
    image: chromadb/chroma:0.4.22
    container_name: chromadb
    ports:
      - "8000:8000"
    # In a real project, mount a volume here to persist the data
    # volumes:
    #   - chroma_data:/chroma/chroma

  # The services being monitored
  embedding-service:
    build:
      context: ./embedding-service
    container_name: embedding-service
    depends_on:
      - consul
      - rabbitmq
      - chromadb
    environment:
      - SERVICE_NAME=embedding-service
      - CONSUL_HTTP_ADDR=consul:8500
      - RABBITMQ_URL=amqp://user:password@rabbitmq:5672/
      - CHROMA_HOST=chromadb
      - TOPOLOGY_EXCHANGE=topology_events
      - TASK_QUEUE=embedding_tasks
    
  api-gateway:
    build:
      context: ./api-gateway
    container_name: api-gateway
    depends_on:
      - consul
      - rabbitmq
    environment:
      - SERVICE_NAME=api-gateway
      - PORT=8080
      - CONSUL_HTTP_ADDR=consul:8500
      - RABBITMQ_URL=amqp://user:password@rabbitmq:5672/
      - TOPOLOGY_EXCHANGE=topology_events
      - TASK_QUEUE=embedding_tasks

  # Backend of the topology awareness system
  topology-aggregator:
    build:
      context: ./topology-aggregator
    container_name: topology-aggregator
    ports:
      - "8888:8888" # WebSocket port
    depends_on:
      - consul
      - rabbitmq
    environment:
      - PORT=8888
      - CONSUL_HTTP_ADDR=consul:8500
      - RABBITMQ_URL=amqp://user:password@rabbitmq:5672/
      - TOPOLOGY_EXCHANGE=topology_events
      - TOPOLOGY_QUEUE=topology_reporting

# volumes:
#   chroma_data:

1. Instrumenting the Services: Injecting and Reporting Dependencies

We need to modify the existing services so they can "trace" and "report" as they send and receive messages. The embedding-service (Python) serves as the example here.

embedding-service/main.py

import os
import uuid
import json
import time
import logging
import pika
import consul
import chromadb
from threading import Thread

# --- Configuration ---
logging.basicConfig(level=logging.INFO, format='%(asctime)s - %(levelname)s - %(message)s')

SERVICE_NAME = os.getenv('SERVICE_NAME', 'embedding-service')
SERVICE_ID = f"{SERVICE_NAME}-{uuid.uuid4()}"
CONSUL_HOST, CONSUL_PORT = os.getenv('CONSUL_HTTP_ADDR', 'localhost:8500').split(':')
RABBITMQ_URL = os.getenv('RABBITMQ_URL', 'amqp://user:password@localhost:5672/')
CHROMA_HOST = os.getenv('CHROMA_HOST', 'localhost')
TOPOLOGY_EXCHANGE = os.getenv('TOPOLOGY_EXCHANGE', 'topology_events')
TASK_QUEUE = os.getenv('TASK_QUEUE', 'embedding_tasks')

# --- Register the service with Consul ---
def register_service():
    c = consul.Consul(host=CONSUL_HOST, port=CONSUL_PORT)
    c.agent.service.register(
        name=SERVICE_NAME,
        service_id=SERVICE_ID,
        address=SERVICE_ID,  # in the container network, use the service_id as a logical address
        port=5672,  # for a pure message-queue service this can simply be the protocol port
        tags=['python', 'rabbitmq-consumer'],
        check=consul.Check.ttl('15s')
    )
    logging.info(f"Service '{SERVICE_NAME}' registered with ID '{SERVICE_ID}'")
    return c

def update_ttl(c, check_id):
    while True:
        try:
            c.agent.check.ttl_pass(check_id, f"Service {SERVICE_ID} is alive")
            time.sleep(10)
        except Exception as e:
            logging.error(f"Failed to update TTL for {SERVICE_ID}: {e}")
            break

# --- RabbitMQ core logic ---
def report_dependency(source_service, target_service, channel):
    """
    Report a discovered dependency edge to the topology exchange.
    """
    if not source_service:
        return
    
    try:
        message = {
            "source": source_service,
            "target": target_service,
            "type": "rabbitmq_message",
            "timestamp": int(time.time())
        }
        channel.basic_publish(
            exchange=TOPOLOGY_EXCHANGE,
            routing_key='', # fanout exchange
            body=json.dumps(message),
            properties=pika.BasicProperties(content_type='application/json')
        )
        logging.info(f"Reported dependency: {source_service} -> {target_service}")
    except Exception as e:
        logging.error(f"Failed to report dependency: {e}")


def message_callback(ch, method, properties, body):
    try:
        data = json.loads(body)
        logging.info(f"Received task: {data.get('doc_id')}")

        # Core step: parse the source service from the message headers
        headers = properties.headers or {}
        source_service = headers.get('x-source-service')

        # Report the dependency
        report_dependency(source_service, SERVICE_NAME, ch)

        # Simulated interaction with ChromaDB
        # chroma_client = chromadb.HttpClient(host=CHROMA_HOST, port=8000)
        # collection = chroma_client.get_or_create_collection("docs")
        # collection.add(
        #     embeddings=[[1.1, 2.2, 3.3]],  # dummy embedding
        #     documents=[data.get('content', '')],
        #     ids=[data.get('doc_id')]
        # )
        # logging.info(f"Processed and stored doc {data.get('doc_id')} in ChromaDB")
        time.sleep(1)  # simulate the workload

        ch.basic_ack(delivery_tag=method.delivery_tag)

    except Exception as e:
        logging.error(f"Error processing message: {e}")
        ch.basic_nack(delivery_tag=method.delivery_tag, requeue=False)

def main():
    # 1. Register the service with Consul and keep its TTL check alive
    consul_client = register_service()
    check_id = f"service:{SERVICE_ID}"
    ttl_thread = Thread(target=update_ttl, args=(consul_client, check_id))
    ttl_thread.daemon = True
    ttl_thread.start()

    # 2. Connect to RabbitMQ and start consuming
    connection = pika.BlockingConnection(pika.URLParameters(RABBITMQ_URL))
    channel = connection.channel()

    # Declare the topology-events exchange used for dependency reports
    # (durable=True to match the aggregator's declaration of the same exchange)
    channel.exchange_declare(exchange=TOPOLOGY_EXCHANGE, exchange_type='fanout', durable=True)

    # Declare the task queue
    channel.queue_declare(queue=TASK_QUEUE, durable=True)
    channel.basic_qos(prefetch_count=1)
    channel.basic_consume(queue=TASK_QUEUE, on_message_callback=message_callback)

    logging.info("Waiting for messages. To exit press CTRL+C")
    channel.start_consuming()

if __name__ == '__main__':
    main()

The api-gateway service (which can be implemented in Go) needs the same treatment: whenever it publishes a message to TASK_QUEUE, it must inject the x-source-service header.

api-gateway/main.go (excerpt)

// ... RabbitMQ connection setup ...

// When publishing a task message
func publishTask(ch *amqp.Channel, taskQueueName string) {
    // ... build bodyBytes (the JSON-encoded task payload) ...
    headers := amqp.Table{
        "x-source-service": "api-gateway", // inject our own service name
    }

    err := ch.Publish(
        "",              // exchange
        taskQueueName,   // routing key
        false,           // mandatory
        false,           // immediate
        amqp.Publishing{
            ContentType: "application/json",
            Body:        bodyBytes,
            Headers:     headers,
        })
    // ... error handling
}

2. The Topology Aggregator (Go)

This is the hub of the system, responsible for collecting, processing, and distributing the topology data.

topology-aggregator/main.go

package main

import (
	"encoding/json"
	"log"
	"net/http"
	"os"
	"sync"
	"time"

	"github.com/gorilla/websocket"
	consulapi "github.com/hashicorp/consul/api"
	"github.com/streadway/amqp"
)

// --- Data structures ---
type Node struct {
	ID    string   `json:"id"`
	Label string   `json:"label"`
	Tags  []string `json:"tags"`
}

type Edge struct {
	From      string `json:"from"`
	To        string `json:"to"`
	Timestamp int64  `json:"timestamp"`
}

type Topology struct {
	Nodes map[string]Node `json:"nodes"`
	Edges map[string]Edge `json:"edges"`
}

type DependencyReport struct {
	Source    string `json:"source"`
	Target    string `json:"target"`
	Type      string `json:"type"`
	Timestamp int64  `json:"timestamp"`
}

// --- Global state ---
var (
	// mu guards concurrent access to the in-memory topology
	mu        sync.Mutex
	topology  = Topology{Nodes: make(map[string]Node), Edges: make(map[string]Edge)}
	// clientsMu guards the set of connected WebSocket clients
	clientsMu sync.Mutex
	clients   = make(map[*websocket.Conn]bool)
	broadcast = make(chan Topology)
	upgrader  = websocket.Upgrader{
		CheckOrigin: func(r *http.Request) bool { return true },
	}
)

// --- Consul service discovery ---
func pollConsulServices(consulClient *consulapi.Client) {
	ticker := time.NewTicker(10 * time.Second)
	defer ticker.Stop()

	for {
		<-ticker.C
		services, _, err := consulClient.Catalog().Services(nil)
		if err != nil {
			log.Printf("Error querying Consul services: %v", err)
			continue
		}

		mu.Lock()
		// Rebuild the node set from the current Consul catalog and detect changes.
		// In a real project this should look at healthy service instances rather than bare service names.
		nodesChanged := false
		newNodes := make(map[string]Node)
		for serviceName, tags := range services {
			if _, ok := topology.Nodes[serviceName]; !ok {
				log.Printf("Discovered new service: %s", serviceName)
				nodesChanged = true
			}
			newNodes[serviceName] = Node{ID: serviceName, Label: serviceName, Tags: tags}
		}
		// Any previously known service missing from the new catalog has been removed.
		for id := range topology.Nodes {
			if _, ok := newNodes[id]; !ok {
				log.Printf("Service removed: %s", id)
				nodesChanged = true
			}
		}
		topology.Nodes = newNodes
		mu.Unlock()
		
		if nodesChanged {
			broadcastTopology()
		}
	}
}

// --- RabbitMQ dependency-report consumer ---
func consumeDependencyReports(ch *amqp.Channel, queueName string) {
	msgs, err := ch.Consume(
		queueName, // queue
		"",        // consumer
		true,      // auto-ack
		false,     // exclusive
		false,     // no-local
		false,     // no-wait
		nil,       // args
	)
	if err != nil {
		log.Fatalf("Failed to register a consumer: %v", err)
	}

	for d := range msgs {
		var report DependencyReport
		if err := json.Unmarshal(d.Body, &report); err != nil {
			log.Printf("Error decoding dependency report: %v", err)
			continue
		}

		mu.Lock()
		edgeID := report.Source + "->" + report.Target
		// Only update and broadcast when the edge is new or the report is more recent
		if existingEdge, ok := topology.Edges[edgeID]; !ok || report.Timestamp > existingEdge.Timestamp {
			log.Printf("Updating edge: %s", edgeID)
			topology.Edges[edgeID] = Edge{
				From:      report.Source,
				To:        report.Target,
				Timestamp: report.Timestamp,
			}
			mu.Unlock()
			broadcastTopology()
		} else {
			mu.Unlock()
		}
	}
}

// --- WebSocket broadcast ---
func broadcastTopology() {
	mu.Lock()
	// Copy the topology so marshaling and sending happen outside the lock
	topoCopy := Topology{
		Nodes: make(map[string]Node),
		Edges: make(map[string]Edge),
	}
	for k, v := range topology.Nodes {
		topoCopy.Nodes[k] = v
	}
	for k, v := range topology.Edges {
		topoCopy.Edges[k] = v
	}
	mu.Unlock()
	
	broadcast <- topoCopy
}

func handleConnections(w http.ResponseWriter, r *http.Request) {
	ws, err := upgrader.Upgrade(w, r, nil)
	if err != nil {
		log.Printf("WebSocket upgrade failed: %v", err)
		return
	}
	defer ws.Close()

	clientsMu.Lock()
	clients[ws] = true
	clientsMu.Unlock()

	// Send the full topology immediately when a client connects
	mu.Lock()
	initialTopo, _ := json.Marshal(topology)
	mu.Unlock()
	ws.WriteMessage(websocket.TextMessage, initialTopo)

	for {
		// Keep the connection open; a real deployment should add heartbeat checks
		_, _, err := ws.ReadMessage()
		if err != nil {
			clientsMu.Lock()
			delete(clients, ws)
			clientsMu.Unlock()
			break
		}
	}
}

func handleMessages() {
	for {
		topo := <-broadcast
		msg, err := json.Marshal(topo)
		if err != nil {
			log.Printf("Error marshaling topology: %v", err)
			continue
		}
		clientsMu.Lock()
		for client := range clients {
			if err := client.WriteMessage(websocket.TextMessage, msg); err != nil {
				log.Printf("Websocket error: %v", err)
				client.Close()
				delete(clients, client)
			}
		}
		clientsMu.Unlock()
	}
}

func main() {
	// ... read environment variables ...
	consulAddr := os.Getenv("CONSUL_HTTP_ADDR")
	rabbitmqURL := os.Getenv("RABBITMQ_URL")
	topologyExchange := os.Getenv("TOPOLOGY_EXCHANGE")
	topologyQueue := os.Getenv("TOPOLOGY_QUEUE")

	// Initialize the Consul client
	consulConfig := consulapi.DefaultConfig()
	consulConfig.Address = consulAddr
	consulClient, err := consulapi.NewClient(consulConfig)
	if err != nil {
		log.Fatalf("Failed to create Consul client: %v", err)
	}
	go pollConsulServices(consulClient)

	// Initialize the RabbitMQ connection
	conn, err := amqp.Dial(rabbitmqURL)
	// ... error handling ...
	defer conn.Close()
	ch, err := conn.Channel()
	// ... error handling ...
	defer ch.Close()

	err = ch.ExchangeDeclare(topologyExchange, "fanout", true, false, false, false, nil)
	// ...
	q, err := ch.QueueDeclare(topologyQueue, true, false, false, false, nil)
	// ...
	err = ch.QueueBind(q.Name, "", topologyExchange, false, nil)
	// ...
	go consumeDependencyReports(ch, q.Name)

	// Start the WebSocket server
	http.HandleFunc("/ws", handleConnections)
	go handleMessages()

	log.Println("Topology aggregator started on :8888")
	err = http.ListenAndServe(":8888", nil)
	if err != nil {
		log.Fatal("ListenAndServe: ", err)
	}
}

3. Frontend Visualization (Svelte)

The frontend receives the topology data over WebSocket and renders it with the vis-network library.

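Before diving into the component, it helps to see the shape of the JSON the aggregator pushes over the WebSocket. The standalone Go sketch below mirrors the aggregator's Node/Edge/Topology types and prints one example snapshot; the service names are made up, but the nodes/edges structure is what the updateGraph function below consumes.

package main

import (
	"encoding/json"
	"fmt"
)

// These types mirror the aggregator's data structures.
type Node struct {
	ID    string   `json:"id"`
	Label string   `json:"label"`
	Tags  []string `json:"tags"`
}

type Edge struct {
	From      string `json:"from"`
	To        string `json:"to"`
	Timestamp int64  `json:"timestamp"`
}

type Topology struct {
	Nodes map[string]Node `json:"nodes"`
	Edges map[string]Edge `json:"edges"`
}

func main() {
	// One snapshot: two services and a single discovered dependency edge.
	snapshot := Topology{
		Nodes: map[string]Node{
			"api-gateway":       {ID: "api-gateway", Label: "api-gateway", Tags: []string{"go"}},
			"embedding-service": {ID: "embedding-service", Label: "embedding-service", Tags: []string{"python", "rabbitmq-consumer"}},
		},
		Edges: map[string]Edge{
			"api-gateway->embedding-service": {From: "api-gateway", To: "embedding-service", Timestamp: 1700000000},
		},
	}
	out, _ := json.MarshalIndent(snapshot, "", "  ")
	fmt.Println(string(out))
}
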
frontend/src/App.svelte

<script>
  import { onMount } from 'svelte';
  import { DataSet } from 'vis-data/peer';
  import { Network } from 'vis-network/peer';
  import 'vis-network/styles/vis-network.css';

  let container;
  let network;
  let status = 'Connecting...';

  // The DataSets that vis-network renders from
  const nodes = new DataSet([]);
  const edges = new DataSet([]);

  onMount(() => {
    const options = {
      nodes: {
        shape: 'dot',
        size: 20,
        font: {
          size: 14,
          color: '#ffffff'
        },
        borderWidth: 2
      },
      edges: {
        width: 2,
        arrows: 'to'
      },
      physics: {
        enabled: true,
        barnesHut: {
          gravitationalConstant: -10000,
          springConstant: 0.04,
          springLength: 150
        }
      },
      layout: {
        improvedLayout: false
      }
    };
    
    network = new Network(container, { nodes, edges }, options);

    const ws = new WebSocket('ws://localhost:8888/ws');

    ws.onopen = () => {
      status = 'Connected';
    };

    ws.onmessage = (event) => {
      const topology = JSON.parse(event.data);
      updateGraph(topology);
    };

    ws.onclose = () => {
      status = 'Disconnected. Attempting to reconnect...';
      // In production, add real reconnect logic here
    };

    ws.onerror = () => {
      // WebSocket error events carry no useful message, so just flag the failure
      status = 'WebSocket error';
    };
  });

  function updateGraph(topology) {
    status = `Connected. Last update: ${new Date().toLocaleTimeString()}`;

    // --- Node updates ---
    const newNodes = Object.values(topology.nodes || {});
    const existingNodeIds = nodes.getIds();
    const newNodeIds = newNodes.map(n => n.id);

    // Add or update nodes
    nodes.update(newNodes);

    // Remove nodes that no longer exist
    const nodesToRemove = existingNodeIds.filter(id => !newNodeIds.includes(id));
    if (nodesToRemove.length > 0) {
      nodes.remove(nodesToRemove);
    }

    // --- Edge updates ---
    const newEdges = Object.values(topology.edges || {});
    const existingEdgeIds = edges.getIds();
    const newEdgeIds = newEdges.map(e => `${e.from}->${e.to}`);

    // vis-network edges need a stable id
    const edgesWithIds = newEdges.map(e => ({
        id: `${e.from}->${e.to}`,
        from: e.from,
        to: e.to,
    }));
    edges.update(edgesWithIds);
    
    const edgesToRemove = existingEdgeIds.filter(id => !newEdgeIds.includes(id));
    if (edgesToRemove.length > 0) {
        edges.remove(edgesToRemove);
    }
  }

</script>

<main>
  <h1>Real-time Microservice Topology</h1>
  <div class="status">{status}</div>
  <div class="network-container" bind:this={container}></div>
</main>

<style>
  main {
    font-family: sans-serif;
    text-align: center;
    height: 100vh;
    display: flex;
    flex-direction: column;
  }
  .status {
    padding: 8px;
    background-color: #333;
    color: white;
  }
  .network-container {
    flex-grow: 1;
    border: 1px solid #ccc;
    background-color: #222;
  }
</style>

Open Issues and Future Iterations

The first version of this system addresses the core pain point, but it is still some distance from production-grade:

  1. Limits of dependency discovery: Only asynchronous calls over RabbitMQ are covered so far; direct HTTP API calls between services, database access, and other dependencies are not yet captured. The next step is to introduce OpenTelemetry and use its automatic instrumentation to capture a more complete call graph, exporting the trace data into our aggregation service.

  2. Topology persistence: The topology graph currently lives entirely in the aggregator's memory, so historical dependency data is lost when the service restarts. Persisting the topology (especially the edges) to a time-series database (such as Prometheus or M3DB) would both prevent data loss and make it possible to analyze how dependencies evolve over time.

  3. Edge weight and health: Today an "edge" only indicates that a dependency exists. A more valuable topology would also show each edge's health, traffic (QPS), and latency, which again requires deeper integration with a monitoring system (such as Prometheus) or a distributed tracing system.

  4. Richer frontend interaction: The current frontend can only display the graph. Later we can add node click-through, service metadata display (instance lists, tags, Consul health-check status), highlighting of specific call paths, filtering by service type, and other advanced interactions to turn it into a genuinely powerful operations tool.

Despite these limitations, this home-grown lightweight system gives our team a dynamic, real-time "battle map" of our microservices at a very low cost in invasiveness, and it has noticeably improved both our understanding of the system's complexity and our incident response speed.

