团队的 CI 流水线越来越慢,其中一个瓶颈竟是代码格式化和静态检查步骤。每个作业都要在独立的 Docker 容器里 npm install 然后运行 Prettier,这在 Rust 为主的技术栈中显得格格不入且效率低下。我们需要一个集中式的、高性能的服务来解决这个问题:一个接收代码片段,返回格式化结果和基础代码分析的内部 API。这个决策引出了一系列技术选型:后端框架用什么?如何与 Prettier 这个 Node.js 生态的工具高效交互?
我们最终选择了 Rust 和 Axum 框架。原因很简单:极致的性能、内存安全,以及与 Tokio 生态系统的无缝集成,这对于处理 I/O 密集型任务(如进程间通信)至关重要。本文记录了从零开始构建这个服务的完整过程,重点在于解决核心的跨语言进程通信挑战,并在此基础上,集成简单的 NLP 分析,让服务超越一个单纯的格式化工具。
技术痛点与架构构想
直接在 Rust 中通过 std::process::Command 为每个请求启动一个 prettier 子进程是最直接的想法,但这在生产环境中是完全不可接受的。每次调用的进程创建和销毁开销、Node.js 运行时的启动延迟,都会在高并发下迅速摧毁性能。
一个更稳健的方案是,在我们的 Axum 服务启动时,就创建一个长期运行的 Prettier 子进程。服务通过子进程的 stdin 发送待处理的代码,并通过 stdout 接收格式化后的结果。这种“守护进程”模式将进程启动的开销平摊到整个服务生命周期中,使得单次请求的处理延迟极大降低。
接下来,我们需要为这种通信设计一个简单的协议。JSON-RPC 是一个不错的选择,但为了极致的简洁,我们决定采用更基础的行分割 JSON 协议。Rust 服务向子进程的 stdin 写入一行 JSON 字符串,Node.js 脚本读取这一行,处理后,向 stdout 写回一行结果 JSON。
整个架构的请求生命周期如下:
sequenceDiagram
participant Client
participant Axum Service
participant Prettier Worker (Node.js)
Client->>+Axum Service: POST /format (code, options)
Axum Service->>+Prettier Worker: Writes JSON to stdin
Note right of Axum Service: { "id": "req-1", "code": "...", "parser": "typescript" }
Prettier Worker-->>-Axum Service: Reads from stdin & formats
Note left of Prettier Worker: Formatting logic executes
Axum Service->>+Prettier Worker: Writes JSON to stdout
Note right of Axum Service: { "id": "req-1", "result": "..." } or { "id": "req-1", "error": "..." }
Prettier Worker-->>-Client: Returns formatted code & analysis
项目初始化与 Axum 基础搭建
首先是项目结构。我们需要一个 Rust 项目,以及一个存放 Node.js worker 脚本的地方。
# 创建 Rust 项目
cargo new code_formatter_service
cd code_formatter_service
# 创建 Node.js worker 目录
mkdir -p worker
touch worker/prettier-worker.js
touch worker/package.json
在 worker/package.json 中添加 Prettier 依赖:
{
"name": "prettier-worker",
"version": "1.0.0",
"private": true,
"dependencies": {
"prettier": "^3.0.0"
}
}
然后在 worker 目录运行 npm install。
接下来是 Cargo.toml 的配置,我们需要 axum, tokio, serde 作为基础,以及 tracing 用于日志记录。
[package]
name = "code_formatter_service"
version = "0.1.0"
edition = "2021"
[dependencies]
axum = "0.7"
tokio = { version = "1", features = ["full"] }
serde = { version = "1.0", features = ["derive"] }
serde_json = "1.0"
tracing = "0.1"
tracing-subscriber = { version = "0.3", features = ["env-filter"] }
anyhow = "1.0"
thiserror = "1.0"
uuid = { version = "1.6", features = ["v4"] }
我们的 main.rs 先从一个基础的 Axum 服务开始,包含日志和状态管理。服务的核心状态 AppState 将持有一个到 Prettier Worker 的通信句柄。
// src/main.rs
use axum::{
routing::post,
Router,
};
use std::net::SocketAddr;
use tracing_subscriber::{layer::SubscriberExt, util::SubscriberInitExt};
// 定义应用状态,稍后会填充
#[derive(Clone)]
struct AppState {}
#[tokio::main]
async fn main() {
// 初始化日志
tracing_subscriber::registry()
.with(
tracing_subscriber::EnvFilter::try_from_default_env()
.unwrap_or_else(|_| "code_formatter_service=debug,tower_http=debug".into()),
)
.with(tracing_subscriber::fmt::layer())
.init();
let state = AppState {};
let app = Router::new()
// 稍后实现 format_handler
// .route("/format", post(format_handler))
.with_state(state);
let addr = SocketAddr::from(([127, 0, 0, 1], 3000));
tracing::debug!("listening on {}", addr);
let listener = tokio::net::TcpListener::bind(addr).await.unwrap();
axum::serve(listener, app).await.unwrap();
}
核心:构建 Rust 与 Node.js 的通信桥梁
这是整个项目的技术核心。我们将创建一个 PrettierWorker 结构体来封装与子进程的所有交互。
首先,定义通信协议的数据结构。
// src/worker.rs (新文件)
use serde::{Deserialize, Serialize};
use uuid::Uuid;
#[derive(Serialize, Debug)]
pub struct WorkerRequest {
pub id: Uuid,
pub code: String,
pub parser: String,
}
#[derive(Deserialize, Debug)]
#[serde(untagged)]
pub enum WorkerResponse {
Success { id: Uuid, result: String },
Error { id: Uuid, error: String },
}
接下来是 prettier-worker.js 的实现。它必须从 stdin 逐行读取 JSON,调用 Prettier,然后将结果写回 stdout。
// worker/prettier-worker.js
const readline = require('readline');
const prettier = require('prettier');
const rl = readline.createInterface({
input: process.stdin,
output: process.stdout,
terminal: false
});
rl.on('line', async (line) => {
try {
const request = JSON.parse(line);
const { id, code, parser } = request;
if (!id || typeof code !== 'string' || !parser) {
throw new Error('Invalid request payload');
}
const formattedCode = await prettier.format(code, { parser, printWidth: 100 });
const response = {
id: id,
result: formattedCode
};
// 关键:必须加换行符,以便 Rust 端可以按行读取
process.stdout.write(JSON.stringify(response) + '\n');
} catch (e) {
// 捕获 Prettier 的格式化错误或 JSON 解析错误
const request = JSON.parse(line); // 尝试再次解析以获取ID
const response = {
id: request.id || 'unknown',
error: e.message
};
process.stdout.write(JSON.stringify(response) + '\n');
}
});
现在,我们来实现 Rust 端的 PrettierWorker。它需要管理子进程的生命周期,并提供一个异步方法来发送请求和接收响应。这里的坑在于,多个 Axum 请求可能会并发地与同一个子进程交互,因此我们需要一个 Mutex 来保证每次只有一个请求在写入和等待响应。同时,stdin 和 stdout 的读写必须是异步的。
// src/worker.rs (续)
use tokio::io::{AsyncBufReadExt, AsyncWriteExt, BufReader};
use tokio::process::{Child, ChildStdin, ChildStdout, Command};
use tokio::sync::{Mutex, oneshot};
use std::collections::HashMap;
use std::io;
use std::sync::Arc;
// 自定义错误类型
#[derive(thiserror::Error, Debug)]
pub enum WorkerError {
#[error("IO error: {0}")]
Io(#[from] io::Error),
#[error("JSON serialization/deserialization error: {0}")]
Json(#[from] serde_json::Error),
#[error("Worker process terminated unexpectedly")]
ProcessTerminated,
#[error("Request timed out")]
Timeout,
#[error("Worker returned an error: {0}")]
WorkerError(String),
}
// 负责与子进程通信的Actor
pub struct PrettierWorker {
stdin: Arc<Mutex<ChildStdin>>,
// 使用oneshot channel来匹配请求和响应
pending_requests: Arc<Mutex<HashMap<Uuid, oneshot::Sender<WorkerResponse>>>>,
}
impl PrettierWorker {
pub fn new() -> Result<Self, WorkerError> {
let mut child = Command::new("node")
.arg("worker/prettier-worker.js")
.stdin(std::process::Stdio::piped())
.stdout(std::process::Stdio::piped())
.stderr(std::process::Stdio::inherit()) // 将stderr重定向到父进程,方便调试
.spawn()?;
let stdin = child.stdin.take().ok_or_else(|| WorkerError::Io(io::Error::new(io::ErrorKind::Other, "Failed to get stdin")))?;
let stdout = child.stdout.take().ok_or_else(|| WorkerError::Io(io::Error::new(io::ErrorKind::Other, "Failed to get stdout")))?;
let pending_requests = Arc::new(Mutex::new(HashMap::new()));
let pending_requests_clone = Arc::clone(&pending_requests);
// 启动一个后台任务来持续读取stdout
tokio::spawn(async move {
Self::read_output(child, stdout, pending_requests_clone).await;
});
Ok(Self {
stdin: Arc::new(Mutex::new(stdin)),
pending_requests,
})
}
// 后台读取任务
async fn read_output(
mut child: Child,
stdout: ChildStdout,
pending_requests: Arc<Mutex<HashMap<Uuid, oneshot::Sender<WorkerResponse>>>>
) {
let mut reader = BufReader::new(stdout).lines();
loop {
tokio::select! {
// 等待子进程退出
_ = child.wait() => {
tracing::error!("Prettier worker process terminated unexpectedly.");
// 通知所有等待的请求
let mut pending = pending_requests.lock().await;
pending.clear(); // 发送者被drop时,接收者会收到错误
break;
},
// 读取一行输出
line_result = reader.next_line() => {
match line_result {
Ok(Some(line)) => {
if let Ok(response) = serde_json::from_str::<WorkerResponse>(&line) {
let id = match &response {
WorkerResponse::Success { id, .. } => *id,
WorkerResponse::Error { id, .. } => *id,
};
let mut pending = pending_requests.lock().await;
if let Some(tx) = pending.remove(&id) {
let _ = tx.send(response);
}
} else {
tracing::warn!("Failed to parse worker response: {}", line);
}
},
Ok(None) => {
// stdout关闭,进程已退出
tracing::info!("Prettier worker stdout closed.");
break;
}
Err(e) => {
tracing::error!("Error reading from worker stdout: {}", e);
break;
}
}
}
}
}
}
pub async fn format(&self, code: String, parser: String) -> Result<String, WorkerError> {
let request = WorkerRequest {
id: Uuid::new_v4(),
code,
parser,
};
let (tx, rx) = oneshot::channel();
{
let mut pending = self.pending_requests.lock().await;
pending.insert(request.id, tx);
}
let mut request_json = serde_json::to_string(&request)? + "\n";
{
let mut stdin_guard = self.stdin.lock().await;
stdin_guard.write_all(request_json.as_bytes()).await?;
stdin_guard.flush().await?;
}
// 设置一个超时
match tokio::time::timeout(tokio::time::Duration::from_secs(5), rx).await {
Ok(Ok(response)) => match response {
WorkerResponse::Success { result, .. } => Ok(result),
WorkerResponse::Error { error, .. } => Err(WorkerError::WorkerError(error)),
},
Ok(Err(_)) => Err(WorkerError::ProcessTerminated), // channel被关闭
Err(_) => Err(WorkerError::Timeout),
}
}
}
这段代码是系统的核心。我们不再为每个请求加锁整个进程,而是使用一个HashMap来追踪在途请求。每个请求在发送前,都会在map中注册一个oneshot::Sender。后台的读取任务在收到响应后,根据ID找到对应的Sender并发送结果。这种设计极大地提升了并发能力。
集成到 Axum Handler 并添加 NLP
现在,我们将 PrettierWorker 集成到 AppState 和 axum 的 handler 中。
// src/main.rs (更新)
mod worker; // 引入worker模块
use axum::{
extract::State,
http::StatusCode,
response::{IntoResponse, Response},
routing::post,
Json, Router,
};
use serde::{Deserialize, Serialize};
use std::net::SocketAddr;
use std::sync::Arc;
use tracing_subscriber::{layer::SubscriberExt, util::SubscriberInitExt};
use worker::PrettierWorker;
// 将Worker封装在Arc中,以便在多线程间共享
#[derive(Clone)]
struct AppState {
worker: Arc<PrettierWorker>,
}
#[derive(Deserialize)]
struct FormatRequest {
code: String,
parser: String,
}
#[derive(Serialize)]
struct FormatResponse {
formatted_code: String,
analysis: CodeAnalysis,
}
#[derive(Serialize)]
struct CodeAnalysis {
todo_comment_count: usize,
}
async fn format_handler(
State(state): State<AppState>,
Json(payload): Json<FormatRequest>,
) -> Result<Json<FormatResponse>, AppError> {
let formatted_code = state
.worker
.format(payload.code, payload.parser)
.await?;
// 在这里集成NLP逻辑
// 这是一个简单的例子:使用regex统计TODO注释的数量
let todo_regex = regex::Regex::new(r"(?i)//\s*TODO").unwrap();
let todo_count = todo_regex.find_iter(&formatted_code).count();
let response = FormatResponse {
formatted_code,
analysis: CodeAnalysis {
todo_comment_count: todo_count,
},
};
Ok(Json(response))
}
// 统一的错误处理
struct AppError(worker::WorkerError);
impl IntoResponse for AppError {
fn into_response(self) -> Response {
let (status, error_message) = match self.0 {
worker::WorkerError::WorkerError(msg) => (StatusCode::BAD_REQUEST, format!("Formatting error: {}", msg)),
worker::WorkerError::Timeout => (StatusCode::GATEWAY_TIMEOUT, "Request to worker timed out".to_string()),
_ => (StatusCode::INTERNAL_SERVER_ERROR, "An internal error occurred".to_string()),
};
(status, Json(serde_json::json!({ "error": error_message }))).into_response()
}
}
impl<E> From<E> for AppError where E: Into<worker::WorkerError> {
fn from(err: E) -> Self {
Self(err.into())
}
}
#[tokio::main]
async fn main() -> anyhow::Result<()> {
tracing_subscriber::registry()
.with(
tracing_subscriber::EnvFilter::try_from_default_env()
.unwrap_or_else(|_| "code_formatter_service=debug,tower_http=debug".into()),
)
.with(tracing_subscriber::fmt::layer())
.init();
// 还需要添加 regex 依赖
// cargo add regex
let worker = PrettierWorker::new()?;
let state = AppState {
worker: Arc::new(worker),
};
let app = Router::new()
.route("/format", post(format_handler))
.with_state(state);
let addr = SocketAddr::from(([127, 0, 0, 1], 3000));
tracing::debug!("listening on {}", addr);
let listener = tokio::net::TcpListener::bind(addr).await?;
axum::serve(listener, app).await?;
Ok(())
}
别忘了添加 regex 依赖到 Cargo.toml: cargo add regex -F unistring。
至此,一个功能完备的服务已经成型。它能接收代码,通过一个长期运行的 Node.js 进程高效地进行格式化,然后用 Rust 的原生性能进行额外的代码分析,最后将合并的结果返回给客户端。错误处理也得到了妥善的设计,无论是 Prettier 自身的格式化错误,还是进程通信的内部错误,都能以合适的 HTTP 状态码和信息返回。
生产环境考量与测试思路
在真实项目中,还有几点需要完善:
- 配置化: Prettier worker 的路径、服务器端口等应该通过配置文件或环境变量来管理,而不是硬编码。可以使用
config-rs或dotenv等库。 - 更健壮的 Worker 管理: 当前实现中,如果 Node.js 进程崩溃,整个服务将无法工作,并且不会自动重启。一个生产级的实现应该包含一个监控协程,使用
child.wait().await来监听子进程的退出,并在必要时尝试重启它,这通常被称为“看门狗”(watchdog)模式。 - 单元测试:
format_handler可以通过axum::body::to_bytes和http::Request来进行单元测试。对于PrettierWorker,可以编写一个模拟的 Node.js 脚本作为测试替身,该脚本仅回显请求或返回预设的错误,从而在不依赖真实 Prettier 的情况下测试通信逻辑。 - 集成测试: 编写一个测试用例,它实际启动整个 Axum 服务和子进程,然后通过 HTTP 客户端发送请求,断言响应是否符合预期。这能确保端到端的流程正确无误。
局限性与未来迭代方向
当前的设计虽然高效,但仍存在一个瓶颈:单一的 Prettier worker 进程。尽管内部通信是异步的,但在 Node.js 端,格式化任务本身是 CPU 密集型的,这意味着在同一时刻,只有一个格式化任务能在执行。
为了应对更高的并发量,未来的迭代可以探索以下路径:
- Worker 池: 启动一个固定大小的 Prettier worker 进程池。Axum 服务端维护一个队列,将请求分发给空闲的 worker。这需要更复杂的进程管理和请求路由逻辑,可以使用
tokio::sync::mpscchannel 来实现。 - WASM 替代方案: 对于某些语言的格式化器,存在或可以编译为 WebAssembly (WASM) 的版本。在 Rust 中使用像 Wasmer 或 Wasmtime 这样的运行时来执行 WASM 格式化器,可以完全消除跨语言进程通信的开销和 Node.js 依赖,实现纯 Rust 的解决方案,这将是性能上的终极形态。
尽管存在这些可优化的点,当前架构已经成功地解决的最初的问题,为团队提供了一个稳定、快速、且易于扩展的内部代码处理基础设施。它很好地展示了如何利用 Rust 和 Axum 的强大能力,去粘合和增强其他技术生态中的优秀工具。