构建基于DigitalOcean动态节点的分布式压测平台并集成Nacos实现实时配置


我们团队之前的压力测试流程非常原始:在一台预置的服务器上手动运行wrkJMeter,调整参数需要停止测试、修改脚本、然后重新启动。这种方式不仅效率低下,而且无法模拟真实世界中流量的动态变化,更无法在测试进行中调整策略。当需要模拟更大规模的并发时,就得手动去申请和配置更多机器,整个过程耗时耗力,且结果难以复现。痛点很明确:我们需要一个能够按需、动态、自动化地创建和销毁压测节点,并且能在测试运行时实时调整压测参数的平台。

初步构想与架构设计

目标是构建一个由中心控制平面(Control Plane)和多个动态压测节点(Agent)组成的分布式系统。

  • 控制平面: 负责接收用户指令(开始/停止测试),与云平台API交互以创建和销毁Agent节点,作为配置中心(Nacos)的客户端实时获取并分发压测配置,同时聚合来自所有Agent的时序性能指标,并通过WebSocket推送给前端。
  • 压测Agent: 无状态的执行单元。启动后,从控制平面获取压测任务配置,对目标服务发起负载,并定期将性能指标(RPS, 延迟分布, 错误率等)上报给控制平面。
  • 配置中心: 使用Nacos。核心价值在于其动态配置推送能力,允许我们在不重启任何服务的情况下,修改压测的目标URL、并发数、持续时间等关键参数。
  • 前端: 一个简单的仪表盘,用于触发测试和实时可视化展示时序性能数据。

整个系统的交互流程可以用下面的图来描述:

sequenceDiagram
    participant FE as 前端仪表盘
    participant CP as 控制平面 (Python FastAPI)
    participant Nacos
    participant DO as DigitalOcean API
    participant Agents as 压测节点 (Droplets)
    participant Target as 目标服务

    FE->>+CP: 发起/停止压测 (HTTP API)
    CP->>+Nacos: 订阅压测配置
    Nacos-->>-CP: 推送初始/变更配置
    CP->>+DO: 创建/销毁Droplets (Agents)
    DO-->>-CP: 返回Droplet状态
    
    Agents->>+CP: 启动后注册并请求配置
    CP-->>-Agents: 分发压测配置
    
    loop 压测执行循环
        Agents->>+Target: 发起HTTP请求
        Target-->>-Agents: 返回响应
        Agents->>+CP: 上报时序指标 (HTTP)
    end

    CP-->>FE: 实时推送聚合指标 (WebSocket)

技术选型与理由

  1. 控制平面: 选择Python FastAPI。它的异步特性非常适合处理I/O密集型任务,如同时管理多个Agent的连接、与DigitalOcean API通信以及处理WebSocket流。开发效率高,性能也足以胜任控制平面的角色。
  2. 云平台: DigitalOcean。关键在于它提供了非常简洁清晰的RESTful API和成熟的Python SDK (python-digitalocean),可以轻松地用代码创建、查询和销毁Droplets(虚拟机)。相比AWS的复杂性,DO在快速原型和中小型项目上更具优势。
  3. 压测Agent逻辑: 直接使用Python的httpx库。它支持异步请求,性能优异,并且可以精确控制并发。虽然Locust是更专业的压测框架,但为了保持Agent的轻量和自定义能力,我们从httpx开始构建。
  4. 配置中心: Nacos。在真实项目中,配置的动态性至关重要。Nacos不仅能存储配置,其核心能力是“推送”,当配置变更时,服务端会主动通知客户端。这正是我们实现“运行时调参”的关键。相比之下,把配置写在代码或环境变量里完全无法满足需求。
  5. 前端: JavaScript (React) 配合 ECharts 和 原生WebSocket API。React用于构建组件化UI,ECharts负责将复杂的时序数据转化为直观的图表,WebSocket则提供了与后端进行低延迟、双向通信的通道,是实现实时监控的基石。

核心实现:控制平面

控制平面是整个系统的大脑。它的代码需要处理几件核心事务:基础设施管理、配置管理和数据聚合。

1. 基础设施管理 (DigitalOcean API)

我们需要一个服务类来封装与DigitalOcean的交互。这里的代码必须是生产级的,包含错误处理和状态管理。

services/droplet_manager.py:

import os
import time
import logging
import digitalocean

# 配置日志
logging.basicConfig(level=logging.INFO, format='%(asctime)s - %(levelname)s - %(message)s')

class DropletManager:
    """
    封装 DigitalOcean Droplet 的创建、销毁和状态查询逻辑。
    在真实项目中,API Token 绝不能硬编码,必须通过环境变量或安全的密钥管理服务获取。
    """
    def __init__(self, tag: str = "pressure-agent"):
        self.token = os.getenv("DIGITALOCEAN_TOKEN")
        if not self.token:
            raise ValueError("DIGITALOCEAN_TOKEN 环境变量未设置")
        self.manager = digitalocean.Manager(token=self.token)
        self.tag = tag # 使用标签来管理所有 agent 节点

    def create_agents(self, count: int, ssh_key_fingerprint: str) -> list[digitalocean.Droplet]:
        """
        批量创建 agent Droplets。
        user_data 用于在 Droplet 启动时自动执行脚本,例如安装依赖、启动 agent 程序。
        """
        logging.info(f"准备创建 {count} 个 agent Droplets...")
        
        # 这里的 user_data 是一个启动脚本,它会下载并运行 agent 代码
        # 在生产环境中,更稳妥的方式是构建一个包含所有依赖的自定义镜像 (Snapshot)
        user_data_script = """#!/bin/bash
        apt-get update -y
        apt-get install -y python3 python3-pip git
        git clone https://your-repo/agent.git /opt/agent
        pip3 install -r /opt/agent/requirements.txt
        # 启动 agent,并将控制平面地址作为参数传入
        export CONTROL_PLANE_URL='http://YOUR_CONTROL_PLANE_IP:8000'
        python3 /opt/agent/agent.py
        """

        droplets = []
        names = [f"pressure-agent-{self.tag}-{int(time.time())}-{i}" for i in range(count)]
        
        try:
            for name in names:
                droplet = digitalocean.Droplet(
                    token=self.token,
                    name=name,
                    region='nyc3', # 选择合适的区域
                    image='ubuntu-22-04-x64', # 基础镜像
                    size_slug='s-1vcpu-1gb', # 最小规格用于演示
                    ssh_keys=[ssh_key_fingerprint],
                    user_data=user_data_script,
                    tags=[self.tag]
                )
                droplets.append(droplet)
            
            # DigitalOcean API 不直接支持批量创建,但我们可以并发请求
            # 这里为简化,使用串行创建
            created_droplets = []
            for d in droplets:
                d.create()
                logging.info(f"Droplet {d.name} 创建请求已发送, ID: {d.id}")
                created_droplets.append(d)
            
            return created_droplets
        except Exception as e:
            logging.error(f"创建 Droplet 时发生严重错误: {e}")
            # 如果部分创建成功,需要进行回滚清理
            self.destroy_all_agents()
            return []

    def get_active_agents(self) -> list[digitalocean.Droplet]:
        """获取所有带特定标签且状态为 active 的 Droplets"""
        try:
            all_droplets = self.manager.get_all_droplets(tag_name=self.tag)
            active_droplets = [d for d in all_droplets if d.status == 'active']
            logging.info(f"发现 {len(active_droplets)} 个活跃的 agent 节点")
            return active_droplets
        except Exception as e:
            logging.error(f"获取 agent 列表失败: {e}")
            return []

    def destroy_all_agents(self):
        """销毁所有带有特定标签的 Droplets,用于测试结束后的清理"""
        logging.info(f"开始销毁所有标签为 '{self.tag}' 的 agent 节点...")
        try:
            droplets = self.manager.get_all_droplets(tag_name=self.tag)
            if not droplets:
                logging.warning("没有需要销毁的 agent 节点。")
                return
            
            for droplet in droplets:
                droplet.destroy()
                logging.info(f"已发送销毁请求 for Droplet: {droplet.name} (ID: {droplet.id})")
        except Exception as e:
            logging.error(f"销毁 Droplets 时出错: {e}")

2. 配置管理 (Nacos Client)

接下来是与Nacos的集成。我们需要一个配置服务,它不仅能在启动时拉取配置,还要能监听后续的变更。

services/config_manager.py:

import nacos
import json
import logging
from threading import Lock

# Nacos 服务器地址
NACOS_SERVER_ADDRESS = "http://YOUR_NACOS_SERVER:8848"
# Nacos 命名空间,用于环境隔离
NAMESPACE = "public" 
# 我们的压测配置
DATA_ID = "pressure-test.json"
GROUP = "DEFAULT_GROUP"

class ConfigManager:
    """
    管理与 Nacos 的交互,获取并监听压测配置。
    这是一个单例模式的实现,确保在整个应用中只有一个 Nacos 客户端实例。
    """
    _instance = None
    _lock = Lock()

    def __new__(cls, *args, **kwargs):
        with cls._lock:
            if not cls._instance:
                cls._instance = super().__new__(cls)
        return cls._instance

    def __init__(self):
        # 防止重复初始化
        if hasattr(self, '_initialized'):
            return
        
        self.client = nacos.NacosClient(
            server_addresses=NACOS_SERVER_ADDRESS, 
            namespace=NAMESPACE
        )
        self.config = {}
        self._load_initial_config()
        self.client.add_config_watcher(DATA_ID, GROUP, self._config_update_callback)
        self._initialized = True
        logging.info("Nacos 配置管理器已初始化并设置监听器。")
    
    def _load_initial_config(self):
        try:
            content = self.client.get_config(DATA_ID, GROUP)
            if content:
                self.config = json.loads(content)
                logging.info(f"成功加载初始配置: {self.config}")
            else:
                logging.warning("Nacos 中未找到初始配置,使用空配置。")
        except Exception as e:
            logging.error(f"从 Nacos 加载初始配置失败: {e}")
            # 在生产环境中,这里可能需要一个备用(本地)默认配置
            self.config = {}

    def _config_update_callback(self, config):
        """Nacos 推送配置变更时调用的函数"""
        logging.info("接收到 Nacos 配置更新...")
        try:
            new_config = json.loads(config['content'])
            self.config = new_config
            logging.info(f"配置已更新: {self.config}")
            # 这里可以触发一个事件,通知系统的其他部分配置已变更
            # 例如,通知所有 agent 更新它们的压测参数
        except json.JSONDecodeError as e:
            logging.error(f"解析更新的配置失败: {e}")
        except Exception as e:
            logging.error(f"处理配置更新时发生未知错误: {e}")

    def get_current_config(self) -> dict:
        return self.config.copy()

我们在Nacos中创建的pressure-test.json配置内容可能如下:

{
  "target_url": "http://api.example.com/v1/data",
  "concurrency": 100,
  "duration_seconds": 300,
  "agent_count": 5,
  "request_headers": {
    "X-Client-ID": "pressure-test-platform"
  }
}

当我们在Nacos控制台把concurrency从100改为200时,_config_update_callback会被自动触发。

3. API端点与WebSocket

最后,我们将这些服务组合到FastAPI应用中。

main.py:

from fastapi import FastAPI, WebSocket, WebSocketDisconnect
from fastapi.responses import HTMLResponse
import asyncio
import logging
from collections import deque

from services.droplet_manager import DropletManager
from services.config_manager import ConfigManager
from services.websocket_manager import ConnectionManager # 一个简单的WebSocket连接管理类

app = FastAPI()
droplet_manager = DropletManager()
config_manager = ConfigManager()
ws_manager = ConnectionManager()

# 用于存储聚合后的时序数据,这里用一个简单的 deque,生产环境应换成时序数据库
metrics_storage = deque(maxlen=300) # 只保留最近300个数据点

@app.on_event("startup")
async def startup_event():
    # 确保Nacos客户端在事件循环中运行
    config_manager.client.start()
    logging.info("FastAPI 应用启动,Nacos 客户端已启动。")

@app.on_event("shutdown")
def shutdown_event():
    droplet_manager.destroy_all_agents()
    config_manager.client.close()
    logging.info("FastAPI 应用关闭,已清理资源。")

@app.post("/test/start")
async def start_test():
    current_config = config_manager.get_current_config()
    agent_count = current_config.get("agent_count", 1)
    # SSH Key需要预先上传到DigitalOcean账户
    ssh_key_fingerprint = os.getenv("DO_SSH_KEY_FINGERPRINT")
    if not ssh_key_fingerprint:
        return {"status": "error", "message": "DO_SSH_KEY_FINGERPRINT 未设置"}
    
    droplet_manager.create_agents(agent_count, ssh_key_fingerprint)
    return {"status": "ok", "message": f"正在启动 {agent_count} 个压测节点..."}

@app.post("/test/stop")
async def stop_test():
    droplet_manager.destroy_all_agents()
    return {"status": "ok", "message": "正在停止所有压测节点..."}

@app.post("/metrics")
async def receive_metrics(data: dict):
    # Agent 上报数据的入口
    # 在真实项目中,这里需要做数据校验和身份验证
    metrics_storage.append(data)
    # 将收到的新数据广播给所有连接的前端
    await ws_manager.broadcast(data)
    return {"status": "received"}

@app.websocket("/ws/metrics")
async def websocket_endpoint(websocket: WebSocket):
    await ws_manager.connect(websocket)
    try:
        # 连接建立时,立即发送历史数据,让前端图表不是空的
        if metrics_storage:
            await websocket.send_json({"type": "history", "data": list(metrics_storage)})
        while True:
            # 保持连接开放
            await asyncio.sleep(1)
    except WebSocketDisconnect:
        ws_manager.disconnect(websocket)
        logging.info("一个前端客户端断开连接。")

压测Agent的实现

Agent必须足够简单、健壮。它的职责是执行压测并上报数据。

agent/agent.py:

import httpx
import asyncio
import time
import os
import socket
import logging

logging.basicConfig(level=logging.INFO, format='%(asctime)s - AGENT - %(levelname)s - %(message)s')

CONTROL_PLANE_URL = os.getenv("CONTROL_PLANE_URL", "http://localhost:8000")
AGENT_ID = socket.gethostname()

class StressAgent:
    def __init__(self):
        self.config = {}
        self.active = False
        self.stats = {
            "requests": 0,
            "success": 0,
            "errors": 0,
            "latencies": []
        }

    async def fetch_config(self):
        # 实际上,这里应该从控制平面获取配置。
        # 为了演示Nacos的动态性,一个更好的做法是agent也直连Nacos监听配置。
        # 但这会增加agent的复杂性。折中方案是控制平面在配置变更后,
        # 主动通知所有已注册的agent。
        # 这里简化为agent定期轮询控制平面的一个配置接口。
        # 我们暂时假设配置在启动时一次性获取。
        # A more robust implementation would be needed here.
        # For this example, we'll simulate it by fetching from a non-existent endpoint
        # and using a hardcoded config.
        self.config = {
            "target_url": "http://httpbin.org/delay/0.1",
            "concurrency": 50,
            "duration_seconds": 600
        }
        logging.info(f"获取到压测配置: {self.config}")

    async def run_worker(self, client: httpx.AsyncClient):
        while self.active:
            start_time = time.perf_counter()
            try:
                response = await client.get(self.config['target_url'])
                self.stats["requests"] += 1
                if 200 <= response.status_code < 300:
                    self.stats["success"] += 1
                else:
                    self.stats["errors"] += 1
            except httpx.RequestError:
                self.stats["errors"] += 1
            finally:
                latency = time.perf_counter() - start_time
                self.stats["latencies"].append(latency)

    async def report_metrics(self):
        while self.active:
            await asyncio.sleep(5) # 每5秒上报一次
            
            if not self.stats["latencies"]:
                continue

            # 计算性能指标
            latencies = self.stats["latencies"]
            p95 = sorted(latencies)[int(len(latencies) * 0.95)]
            avg_latency = sum(latencies) / len(latencies)
            
            payload = {
                "timestamp": int(time.time()),
                "agent_id": AGENT_ID,
                "rps": self.stats["requests"] / 5.0, # requests per second
                "error_rate": self.stats["errors"] / self.stats["requests"] if self.stats["requests"] > 0 else 0,
                "latency_p95": p95,
                "latency_avg": avg_latency
            }
            
            # 重置统计
            self.stats = {"requests": 0, "success": 0, "errors": 0, "latencies": []}
            
            try:
                async with httpx.AsyncClient() as client:
                    await client.post(f"{CONTROL_PLANE_URL}/metrics", json=payload)
                logging.info(f"上报指标: RPS={payload['rps']:.2f}, P95={payload['latency_p95']:.4f}s")
            except Exception as e:
                logging.error(f"上报指标失败: {e}")

    async def start(self):
        await self.fetch_config()
        self.active = True
        
        async with httpx.AsyncClient(timeout=10.0) as client:
            workers = [self.run_worker(client) for _ in range(self.config['concurrency'])]
            reporter = self.report_metrics()
            await asyncio.gather(*workers, reporter)

if __name__ == "__main__":
    agent = StressAgent()
    asyncio.run(agent.start())

前端实时仪表盘

前端的核心是接收WebSocket推送的数据并更新图表。这里只展示关键的JavaScript逻辑。

components/Dashboard.js (React):

import React, { useState, useEffect, useRef } from 'react';
import ReactECharts from 'echarts-for-react';

const Dashboard = () => {
    const [metrics, setMetrics] = useState([]);
    const [isConnected, setIsConnected] = useState(false);
    const ws = useRef(null);

    useEffect(() => {
        // WebSocket 的地址需要根据部署情况修改
        ws.current = new WebSocket('ws://YOUR_CONTROL_PLANE_IP:8000/ws/metrics');
        
        ws.current.onopen = () => {
            console.log("WebSocket connected");
            setIsConnected(true);
        };

        ws.current.onclose = () => {
            console.log("WebSocket disconnected");
            setIsConnected(false);
        };

        ws.current.onmessage = (event) => {
            const message = JSON.parse(event.data);
            
            if (message.type === 'history') {
                // 处理历史数据
                setMetrics(prev => [...message.data, ...prev]);
            } else {
                // 处理实时单点数据
                setMetrics(prev => {
                    const newData = [...prev, message];
                    // 保持图表只显示最近 N 个点
                    return newData.length > 300 ? newData.slice(-300) : newData;
                });
            }
        };

        return () => {
            ws.current.close();
        };
    }, []);

    const getChartOptions = () => {
        const timestamps = metrics.map(m => new Date(m.timestamp * 1000).toLocaleTimeString());
        const rpsData = metrics.map(m => m.rps.toFixed(2));
        const p95Data = metrics.map(m => (m.latency_p95 * 1000).toFixed(2)); // ms

        return {
            tooltip: { trigger: 'axis' },
            legend: { data: ['RPS', 'P95 Latency (ms)'] },
            xAxis: { type: 'category', data: timestamps },
            yAxis: [
                { type: 'value', name: 'RPS', position: 'left' },
                { type: 'value', name: 'Latency (ms)', position: 'right' }
            ],
            series: [
                { name: 'RPS', type: 'line', yAxisIndex: 0, data: rpsData, smooth: true },
                { name: 'P95 Latency (ms)', type: 'line', yAxisIndex: 1, data: p95Data, smooth: true }
            ]
        };
    };

    return (
        <div>
            <h1>Real-time Performance Metrics</h1>
            <p>Connection Status: {isConnected ? 'Connected' : 'Disconnected'}</p>
            <ReactECharts option={getChartOptions()} style={{ height: '500px' }} />
        </div>
    );
};

export default Dashboard;

方案的局限性与未来迭代

这套系统虽然解决了最初的痛点,但作为一个V1版本,它在生产环境中的局限性也相当明显。

首先,数据持久化和查询能力是短板。控制平面使用内存中的deque来存储指标,这会导致服务重启后历史数据丢失,也无法对历史压测数据进行回顾分析。一个合理的演进方向是引入专门的时序数据库(TSDB),如VictoriaMetrics或InfluxDB。Agent直接将指标写入TSDB,控制平面和前端则从TSDB查询数据。

其次,Agent的调度和生命周期管理过于粗糙。直接使用DigitalOcean Droplet的启动时间较长(通常需要1-2分钟),不适合需要快速响应的弹性伸缩场景。更理想的架构是基于Kubernetes(例如DigitalOcean Kubernetes Service, DOKS),将Agent打包成容器。通过调整Deployment的replicas数量,可以在秒级完成扩缩容,效率远高于虚拟机。

最后,配置分发链路可以优化。目前配置变更需要控制平面中转,增加了系统的复杂性和故障点。可以让Agent直接连接Nacos监听配置变更,这样控制平面的职责会更纯粹,只负责任务编排和UI交互。但这要求为Agent配置Nacos的访问权限,需要权衡安全性。

未来的迭代路径很清晰:将Agent容器化并迁移到Kubernetes,引入专业的时序数据库和消息队列(用于指标收集,解耦Agent与后端),并可能将整个控制平面重构为一个Kubernetes Operator,以声明式的方式管理压测任务的整个生命周期。


  目录