我们团队之前的压力测试流程非常原始:在一台预置的服务器上手动运行wrk或JMeter,调整参数需要停止测试、修改脚本、然后重新启动。这种方式不仅效率低下,而且无法模拟真实世界中流量的动态变化,更无法在测试进行中调整策略。当需要模拟更大规模的并发时,就得手动去申请和配置更多机器,整个过程耗时耗力,且结果难以复现。痛点很明确:我们需要一个能够按需、动态、自动化地创建和销毁压测节点,并且能在测试运行时实时调整压测参数的平台。
初步构想与架构设计
目标是构建一个由中心控制平面(Control Plane)和多个动态压测节点(Agent)组成的分布式系统。
- 控制平面: 负责接收用户指令(开始/停止测试),与云平台API交互以创建和销毁Agent节点,作为配置中心(Nacos)的客户端实时获取并分发压测配置,同时聚合来自所有Agent的时序性能指标,并通过WebSocket推送给前端。
- 压测Agent: 无状态的执行单元。启动后,从控制平面获取压测任务配置,对目标服务发起负载,并定期将性能指标(RPS, 延迟分布, 错误率等)上报给控制平面。
- 配置中心: 使用Nacos。核心价值在于其动态配置推送能力,允许我们在不重启任何服务的情况下,修改压测的目标URL、并发数、持续时间等关键参数。
- 前端: 一个简单的仪表盘,用于触发测试和实时可视化展示时序性能数据。
整个系统的交互流程可以用下面的图来描述:
sequenceDiagram
participant FE as 前端仪表盘
participant CP as 控制平面 (Python FastAPI)
participant Nacos
participant DO as DigitalOcean API
participant Agents as 压测节点 (Droplets)
participant Target as 目标服务
FE->>+CP: 发起/停止压测 (HTTP API)
CP->>+Nacos: 订阅压测配置
Nacos-->>-CP: 推送初始/变更配置
CP->>+DO: 创建/销毁Droplets (Agents)
DO-->>-CP: 返回Droplet状态
Agents->>+CP: 启动后注册并请求配置
CP-->>-Agents: 分发压测配置
loop 压测执行循环
Agents->>+Target: 发起HTTP请求
Target-->>-Agents: 返回响应
Agents->>+CP: 上报时序指标 (HTTP)
end
CP-->>FE: 实时推送聚合指标 (WebSocket)
技术选型与理由
- 控制平面: 选择Python FastAPI。它的异步特性非常适合处理I/O密集型任务,如同时管理多个Agent的连接、与DigitalOcean API通信以及处理WebSocket流。开发效率高,性能也足以胜任控制平面的角色。
- 云平台: DigitalOcean。关键在于它提供了非常简洁清晰的RESTful API和成熟的Python SDK (
python-digitalocean),可以轻松地用代码创建、查询和销毁Droplets(虚拟机)。相比AWS的复杂性,DO在快速原型和中小型项目上更具优势。 - 压测Agent逻辑: 直接使用Python的
httpx库。它支持异步请求,性能优异,并且可以精确控制并发。虽然Locust是更专业的压测框架,但为了保持Agent的轻量和自定义能力,我们从httpx开始构建。 - 配置中心: Nacos。在真实项目中,配置的动态性至关重要。Nacos不仅能存储配置,其核心能力是“推送”,当配置变更时,服务端会主动通知客户端。这正是我们实现“运行时调参”的关键。相比之下,把配置写在代码或环境变量里完全无法满足需求。
- 前端: JavaScript (React) 配合
ECharts和 原生WebSocketAPI。React用于构建组件化UI,ECharts负责将复杂的时序数据转化为直观的图表,WebSocket则提供了与后端进行低延迟、双向通信的通道,是实现实时监控的基石。
核心实现:控制平面
控制平面是整个系统的大脑。它的代码需要处理几件核心事务:基础设施管理、配置管理和数据聚合。
1. 基础设施管理 (DigitalOcean API)
我们需要一个服务类来封装与DigitalOcean的交互。这里的代码必须是生产级的,包含错误处理和状态管理。
services/droplet_manager.py:
import os
import time
import logging
import digitalocean
# 配置日志
logging.basicConfig(level=logging.INFO, format='%(asctime)s - %(levelname)s - %(message)s')
class DropletManager:
"""
封装 DigitalOcean Droplet 的创建、销毁和状态查询逻辑。
在真实项目中,API Token 绝不能硬编码,必须通过环境变量或安全的密钥管理服务获取。
"""
def __init__(self, tag: str = "pressure-agent"):
self.token = os.getenv("DIGITALOCEAN_TOKEN")
if not self.token:
raise ValueError("DIGITALOCEAN_TOKEN 环境变量未设置")
self.manager = digitalocean.Manager(token=self.token)
self.tag = tag # 使用标签来管理所有 agent 节点
def create_agents(self, count: int, ssh_key_fingerprint: str) -> list[digitalocean.Droplet]:
"""
批量创建 agent Droplets。
user_data 用于在 Droplet 启动时自动执行脚本,例如安装依赖、启动 agent 程序。
"""
logging.info(f"准备创建 {count} 个 agent Droplets...")
# 这里的 user_data 是一个启动脚本,它会下载并运行 agent 代码
# 在生产环境中,更稳妥的方式是构建一个包含所有依赖的自定义镜像 (Snapshot)
user_data_script = """#!/bin/bash
apt-get update -y
apt-get install -y python3 python3-pip git
git clone https://your-repo/agent.git /opt/agent
pip3 install -r /opt/agent/requirements.txt
# 启动 agent,并将控制平面地址作为参数传入
export CONTROL_PLANE_URL='http://YOUR_CONTROL_PLANE_IP:8000'
python3 /opt/agent/agent.py
"""
droplets = []
names = [f"pressure-agent-{self.tag}-{int(time.time())}-{i}" for i in range(count)]
try:
for name in names:
droplet = digitalocean.Droplet(
token=self.token,
name=name,
region='nyc3', # 选择合适的区域
image='ubuntu-22-04-x64', # 基础镜像
size_slug='s-1vcpu-1gb', # 最小规格用于演示
ssh_keys=[ssh_key_fingerprint],
user_data=user_data_script,
tags=[self.tag]
)
droplets.append(droplet)
# DigitalOcean API 不直接支持批量创建,但我们可以并发请求
# 这里为简化,使用串行创建
created_droplets = []
for d in droplets:
d.create()
logging.info(f"Droplet {d.name} 创建请求已发送, ID: {d.id}")
created_droplets.append(d)
return created_droplets
except Exception as e:
logging.error(f"创建 Droplet 时发生严重错误: {e}")
# 如果部分创建成功,需要进行回滚清理
self.destroy_all_agents()
return []
def get_active_agents(self) -> list[digitalocean.Droplet]:
"""获取所有带特定标签且状态为 active 的 Droplets"""
try:
all_droplets = self.manager.get_all_droplets(tag_name=self.tag)
active_droplets = [d for d in all_droplets if d.status == 'active']
logging.info(f"发现 {len(active_droplets)} 个活跃的 agent 节点")
return active_droplets
except Exception as e:
logging.error(f"获取 agent 列表失败: {e}")
return []
def destroy_all_agents(self):
"""销毁所有带有特定标签的 Droplets,用于测试结束后的清理"""
logging.info(f"开始销毁所有标签为 '{self.tag}' 的 agent 节点...")
try:
droplets = self.manager.get_all_droplets(tag_name=self.tag)
if not droplets:
logging.warning("没有需要销毁的 agent 节点。")
return
for droplet in droplets:
droplet.destroy()
logging.info(f"已发送销毁请求 for Droplet: {droplet.name} (ID: {droplet.id})")
except Exception as e:
logging.error(f"销毁 Droplets 时出错: {e}")
2. 配置管理 (Nacos Client)
接下来是与Nacos的集成。我们需要一个配置服务,它不仅能在启动时拉取配置,还要能监听后续的变更。
services/config_manager.py:
import nacos
import json
import logging
from threading import Lock
# Nacos 服务器地址
NACOS_SERVER_ADDRESS = "http://YOUR_NACOS_SERVER:8848"
# Nacos 命名空间,用于环境隔离
NAMESPACE = "public"
# 我们的压测配置
DATA_ID = "pressure-test.json"
GROUP = "DEFAULT_GROUP"
class ConfigManager:
"""
管理与 Nacos 的交互,获取并监听压测配置。
这是一个单例模式的实现,确保在整个应用中只有一个 Nacos 客户端实例。
"""
_instance = None
_lock = Lock()
def __new__(cls, *args, **kwargs):
with cls._lock:
if not cls._instance:
cls._instance = super().__new__(cls)
return cls._instance
def __init__(self):
# 防止重复初始化
if hasattr(self, '_initialized'):
return
self.client = nacos.NacosClient(
server_addresses=NACOS_SERVER_ADDRESS,
namespace=NAMESPACE
)
self.config = {}
self._load_initial_config()
self.client.add_config_watcher(DATA_ID, GROUP, self._config_update_callback)
self._initialized = True
logging.info("Nacos 配置管理器已初始化并设置监听器。")
def _load_initial_config(self):
try:
content = self.client.get_config(DATA_ID, GROUP)
if content:
self.config = json.loads(content)
logging.info(f"成功加载初始配置: {self.config}")
else:
logging.warning("Nacos 中未找到初始配置,使用空配置。")
except Exception as e:
logging.error(f"从 Nacos 加载初始配置失败: {e}")
# 在生产环境中,这里可能需要一个备用(本地)默认配置
self.config = {}
def _config_update_callback(self, config):
"""Nacos 推送配置变更时调用的函数"""
logging.info("接收到 Nacos 配置更新...")
try:
new_config = json.loads(config['content'])
self.config = new_config
logging.info(f"配置已更新: {self.config}")
# 这里可以触发一个事件,通知系统的其他部分配置已变更
# 例如,通知所有 agent 更新它们的压测参数
except json.JSONDecodeError as e:
logging.error(f"解析更新的配置失败: {e}")
except Exception as e:
logging.error(f"处理配置更新时发生未知错误: {e}")
def get_current_config(self) -> dict:
return self.config.copy()
我们在Nacos中创建的pressure-test.json配置内容可能如下:
{
"target_url": "http://api.example.com/v1/data",
"concurrency": 100,
"duration_seconds": 300,
"agent_count": 5,
"request_headers": {
"X-Client-ID": "pressure-test-platform"
}
}
当我们在Nacos控制台把concurrency从100改为200时,_config_update_callback会被自动触发。
3. API端点与WebSocket
最后,我们将这些服务组合到FastAPI应用中。
main.py:
from fastapi import FastAPI, WebSocket, WebSocketDisconnect
from fastapi.responses import HTMLResponse
import asyncio
import logging
from collections import deque
from services.droplet_manager import DropletManager
from services.config_manager import ConfigManager
from services.websocket_manager import ConnectionManager # 一个简单的WebSocket连接管理类
app = FastAPI()
droplet_manager = DropletManager()
config_manager = ConfigManager()
ws_manager = ConnectionManager()
# 用于存储聚合后的时序数据,这里用一个简单的 deque,生产环境应换成时序数据库
metrics_storage = deque(maxlen=300) # 只保留最近300个数据点
@app.on_event("startup")
async def startup_event():
# 确保Nacos客户端在事件循环中运行
config_manager.client.start()
logging.info("FastAPI 应用启动,Nacos 客户端已启动。")
@app.on_event("shutdown")
def shutdown_event():
droplet_manager.destroy_all_agents()
config_manager.client.close()
logging.info("FastAPI 应用关闭,已清理资源。")
@app.post("/test/start")
async def start_test():
current_config = config_manager.get_current_config()
agent_count = current_config.get("agent_count", 1)
# SSH Key需要预先上传到DigitalOcean账户
ssh_key_fingerprint = os.getenv("DO_SSH_KEY_FINGERPRINT")
if not ssh_key_fingerprint:
return {"status": "error", "message": "DO_SSH_KEY_FINGERPRINT 未设置"}
droplet_manager.create_agents(agent_count, ssh_key_fingerprint)
return {"status": "ok", "message": f"正在启动 {agent_count} 个压测节点..."}
@app.post("/test/stop")
async def stop_test():
droplet_manager.destroy_all_agents()
return {"status": "ok", "message": "正在停止所有压测节点..."}
@app.post("/metrics")
async def receive_metrics(data: dict):
# Agent 上报数据的入口
# 在真实项目中,这里需要做数据校验和身份验证
metrics_storage.append(data)
# 将收到的新数据广播给所有连接的前端
await ws_manager.broadcast(data)
return {"status": "received"}
@app.websocket("/ws/metrics")
async def websocket_endpoint(websocket: WebSocket):
await ws_manager.connect(websocket)
try:
# 连接建立时,立即发送历史数据,让前端图表不是空的
if metrics_storage:
await websocket.send_json({"type": "history", "data": list(metrics_storage)})
while True:
# 保持连接开放
await asyncio.sleep(1)
except WebSocketDisconnect:
ws_manager.disconnect(websocket)
logging.info("一个前端客户端断开连接。")
压测Agent的实现
Agent必须足够简单、健壮。它的职责是执行压测并上报数据。
agent/agent.py:
import httpx
import asyncio
import time
import os
import socket
import logging
logging.basicConfig(level=logging.INFO, format='%(asctime)s - AGENT - %(levelname)s - %(message)s')
CONTROL_PLANE_URL = os.getenv("CONTROL_PLANE_URL", "http://localhost:8000")
AGENT_ID = socket.gethostname()
class StressAgent:
def __init__(self):
self.config = {}
self.active = False
self.stats = {
"requests": 0,
"success": 0,
"errors": 0,
"latencies": []
}
async def fetch_config(self):
# 实际上,这里应该从控制平面获取配置。
# 为了演示Nacos的动态性,一个更好的做法是agent也直连Nacos监听配置。
# 但这会增加agent的复杂性。折中方案是控制平面在配置变更后,
# 主动通知所有已注册的agent。
# 这里简化为agent定期轮询控制平面的一个配置接口。
# 我们暂时假设配置在启动时一次性获取。
# A more robust implementation would be needed here.
# For this example, we'll simulate it by fetching from a non-existent endpoint
# and using a hardcoded config.
self.config = {
"target_url": "http://httpbin.org/delay/0.1",
"concurrency": 50,
"duration_seconds": 600
}
logging.info(f"获取到压测配置: {self.config}")
async def run_worker(self, client: httpx.AsyncClient):
while self.active:
start_time = time.perf_counter()
try:
response = await client.get(self.config['target_url'])
self.stats["requests"] += 1
if 200 <= response.status_code < 300:
self.stats["success"] += 1
else:
self.stats["errors"] += 1
except httpx.RequestError:
self.stats["errors"] += 1
finally:
latency = time.perf_counter() - start_time
self.stats["latencies"].append(latency)
async def report_metrics(self):
while self.active:
await asyncio.sleep(5) # 每5秒上报一次
if not self.stats["latencies"]:
continue
# 计算性能指标
latencies = self.stats["latencies"]
p95 = sorted(latencies)[int(len(latencies) * 0.95)]
avg_latency = sum(latencies) / len(latencies)
payload = {
"timestamp": int(time.time()),
"agent_id": AGENT_ID,
"rps": self.stats["requests"] / 5.0, # requests per second
"error_rate": self.stats["errors"] / self.stats["requests"] if self.stats["requests"] > 0 else 0,
"latency_p95": p95,
"latency_avg": avg_latency
}
# 重置统计
self.stats = {"requests": 0, "success": 0, "errors": 0, "latencies": []}
try:
async with httpx.AsyncClient() as client:
await client.post(f"{CONTROL_PLANE_URL}/metrics", json=payload)
logging.info(f"上报指标: RPS={payload['rps']:.2f}, P95={payload['latency_p95']:.4f}s")
except Exception as e:
logging.error(f"上报指标失败: {e}")
async def start(self):
await self.fetch_config()
self.active = True
async with httpx.AsyncClient(timeout=10.0) as client:
workers = [self.run_worker(client) for _ in range(self.config['concurrency'])]
reporter = self.report_metrics()
await asyncio.gather(*workers, reporter)
if __name__ == "__main__":
agent = StressAgent()
asyncio.run(agent.start())
前端实时仪表盘
前端的核心是接收WebSocket推送的数据并更新图表。这里只展示关键的JavaScript逻辑。
components/Dashboard.js (React):
import React, { useState, useEffect, useRef } from 'react';
import ReactECharts from 'echarts-for-react';
const Dashboard = () => {
const [metrics, setMetrics] = useState([]);
const [isConnected, setIsConnected] = useState(false);
const ws = useRef(null);
useEffect(() => {
// WebSocket 的地址需要根据部署情况修改
ws.current = new WebSocket('ws://YOUR_CONTROL_PLANE_IP:8000/ws/metrics');
ws.current.onopen = () => {
console.log("WebSocket connected");
setIsConnected(true);
};
ws.current.onclose = () => {
console.log("WebSocket disconnected");
setIsConnected(false);
};
ws.current.onmessage = (event) => {
const message = JSON.parse(event.data);
if (message.type === 'history') {
// 处理历史数据
setMetrics(prev => [...message.data, ...prev]);
} else {
// 处理实时单点数据
setMetrics(prev => {
const newData = [...prev, message];
// 保持图表只显示最近 N 个点
return newData.length > 300 ? newData.slice(-300) : newData;
});
}
};
return () => {
ws.current.close();
};
}, []);
const getChartOptions = () => {
const timestamps = metrics.map(m => new Date(m.timestamp * 1000).toLocaleTimeString());
const rpsData = metrics.map(m => m.rps.toFixed(2));
const p95Data = metrics.map(m => (m.latency_p95 * 1000).toFixed(2)); // ms
return {
tooltip: { trigger: 'axis' },
legend: { data: ['RPS', 'P95 Latency (ms)'] },
xAxis: { type: 'category', data: timestamps },
yAxis: [
{ type: 'value', name: 'RPS', position: 'left' },
{ type: 'value', name: 'Latency (ms)', position: 'right' }
],
series: [
{ name: 'RPS', type: 'line', yAxisIndex: 0, data: rpsData, smooth: true },
{ name: 'P95 Latency (ms)', type: 'line', yAxisIndex: 1, data: p95Data, smooth: true }
]
};
};
return (
<div>
<h1>Real-time Performance Metrics</h1>
<p>Connection Status: {isConnected ? 'Connected' : 'Disconnected'}</p>
<ReactECharts option={getChartOptions()} style={{ height: '500px' }} />
</div>
);
};
export default Dashboard;
方案的局限性与未来迭代
这套系统虽然解决了最初的痛点,但作为一个V1版本,它在生产环境中的局限性也相当明显。
首先,数据持久化和查询能力是短板。控制平面使用内存中的deque来存储指标,这会导致服务重启后历史数据丢失,也无法对历史压测数据进行回顾分析。一个合理的演进方向是引入专门的时序数据库(TSDB),如VictoriaMetrics或InfluxDB。Agent直接将指标写入TSDB,控制平面和前端则从TSDB查询数据。
其次,Agent的调度和生命周期管理过于粗糙。直接使用DigitalOcean Droplet的启动时间较长(通常需要1-2分钟),不适合需要快速响应的弹性伸缩场景。更理想的架构是基于Kubernetes(例如DigitalOcean Kubernetes Service, DOKS),将Agent打包成容器。通过调整Deployment的replicas数量,可以在秒级完成扩缩容,效率远高于虚拟机。
最后,配置分发链路可以优化。目前配置变更需要控制平面中转,增加了系统的复杂性和故障点。可以让Agent直接连接Nacos监听配置变更,这样控制平面的职责会更纯粹,只负责任务编排和UI交互。但这要求为Agent配置Nacos的访问权限,需要权衡安全性。
未来的迭代路径很清晰:将Agent容器化并迁移到Kubernetes,引入专业的时序数据库和消息队列(用于指标收集,解耦Agent与后端),并可能将整个控制平面重构为一个Kubernetes Operator,以声明式的方式管理压测任务的整个生命周期。