While maintaining a large-scale Kubeflow cluster, we ran into a thorny state-management problem. Some complex Pipelines need a lightweight, highly available coordination service to store critical metadata, such as the current execution stage, dynamically generated configuration, or checksums of small intermediate artifacts. This data requires strong consistency, and the service itself must not become a single point of failure.
The initial idea was to bring in an external etcd or ZooKeeper cluster. In a real project, however, that means adding a heavy external dependency to the MLOps platform: the operations team has to maintain it, network policies have to be opened for it, and the whole system gains complexity and failure points. What we needed was a cloud-native solution that integrates seamlessly with the Kubeflow ecosystem.
Option A, the external-dependency approach, has clear-cut trade-offs.
- Pros: mature technology whose stability has been proven at scale; an active community, so problems are easy to track down.
- Cons:
  - Operational cost: it introduces a new technology stack that needs its own monitoring, backup, and upgrade strategy.
  - Resource overhead: even a minimal 3-node etcd cluster is oversized for this lightweight coordination task.
  - Integration complexity: the application has to handle connections, authentication, and network partitions against an external cluster, logic that has nothing to do with our core business.
So we turned to Option B: an embedded, distributed state component managed automatically by a Kubernetes Operator. It is deployed as backend Pods alongside the rest of the Kubeflow Pipeline stack and relies on a distributed consensus protocol for its own high availability.
- Pros:
  - Cloud-native: it fits Kubernetes' declarative API and self-healing model; a single YAML file is enough to deploy and manage the whole highly available service.
  - Lightweight: no extra process or VM overhead, and resources are allocated on demand.
  - Autonomous: the Operator owns the service lifecycle, including deployment, scaling, and failure recovery, which greatly reduces the operational burden.
- Cons:
  - Development cost: we have to implement or integrate a distributed consensus algorithm ourselves and build a fully functional Operator.
  - Technical risk: consensus implementations are notoriously easy to get wrong and need rigorous design and thorough testing.
Weighing the two, we chose Option B. In the long run, an autonomous, cloud-native component fits the platform-engineering mindset better, reduces friction, and improves engineering velocity. The technical challenge is significant, but the payoff, a fully self-controlled and tightly integrated high-availability component, is worth it. We picked Java as the implementation language because of its mature ecosystem for enterprise backend services and its solid Kubernetes Operator SDKs. For the consensus protocol we chose Paxos: it is the theoretical cornerstone of consensus algorithms, and understanding and implementing it in depth is valuable for the team's growth.
Architecture Overview
Our goal is a custom resource (CRD) named PaxosStateStore. A user declares a state-store cluster with a single PaxosStateStore object; our Java Operator watches these objects and automatically creates and manages a corresponding StatefulSet. Every Pod in that StatefulSet runs our own Java application, which embeds the Paxos protocol implementation and talks to its peers to form a consensus cluster.
graph TD
subgraph Kubernetes API Server
A[User: kubectl apply -f paxos-cr.yaml] --> B{CRD: PaxosStateStore}
end
subgraph "Control Plane (Our Operator)"
C(Paxos Operator) --Watches--> B
C --Creates/Updates--> D[StatefulSet]
C --Creates/Updates--> E[Service]
end
subgraph "Worker Nodes"
D --> F1(Pod-0: Java App + Paxos)
D --> F2(Pod-1: Java App + Paxos)
D --> F3(Pod-2: Java App + Paxos)
E --Provides Stable Network ID--> F1
E --> F2
E --> F3
F1 <--Inter-Pod Communication--> F2
F2 <--Inter-Pod Communication--> F3
F3 <--Inter-Pod Communication--> F1
end
G[Kubeflow Pipeline Pod] --Uses Service E--> F1
Core Paxos Implementation (Java)
We implement classic Basic Paxos, which reaches consensus on a single value; that is enough for storing our critical metadata. The Paxos roles (Proposer, Acceptor, Learner) are brought together in a single PaxosNode class using the State pattern.
1. Data Structures and Message Types
First, define the messages that flow through the protocol. Plain POJOs are used here; a real project would serialize them with JSON or Protobuf.
// src/main/java/com/platform/paxos/model/Message.java
package com.platform.paxos.model;
import java.io.Serializable;
// Base class for all Paxos messages
public abstract class Message implements Serializable {
private static final long serialVersionUID = 1L;
private final int senderId;
public Message(int senderId) {
this.senderId = senderId;
}
public int getSenderId() {
return senderId;
}
}
// src/main/java/com/platform/paxos/model/PrepareRequest.java
// (each message type lives in its own source file in this package; package/import lines are omitted below for brevity)
// Phase 1a: Prepare Request
public class PrepareRequest extends Message {
private static final long serialVersionUID = 1L;
private final long proposalNumber;
public PrepareRequest(int senderId, long proposalNumber) {
super(senderId);
this.proposalNumber = proposalNumber;
}
public long getProposalNumber() {
return proposalNumber;
}
}
// src/main/java/com/platform/paxos/model/PromiseResponse.java
// Phase 1b: Promise Response
public class PromiseResponse extends Message {
private static final long serialVersionUID = 1L;
private final boolean promised;
private final long lastAcceptedProposal;
private final Object lastAcceptedValue;
public PromiseResponse(int senderId, boolean promised, long lastAcceptedProposal, Object lastAcceptedValue) {
super(senderId);
this.promised = promised;
this.lastAcceptedProposal = lastAcceptedProposal;
this.lastAcceptedValue = lastAcceptedValue;
}
// Getters...
}
// src/main/java/com/platform/paxos/model/AcceptRequest.java
// Phase 2a: Accept Request
public class AcceptRequest extends Message {
private static final long serialVersionUID = 1L;
private final long proposalNumber;
private final Object value;
public AcceptRequest(int senderId, long proposalNumber, Object value) {
super(senderId);
this.proposalNumber = proposalNumber;
this.value = value;
}
// Getters...
}
// src/main/java/com/platform/paxos/model/AcceptedResponse.java
// Phase 2b: Accepted Response
public class AcceptedResponse extends Message {
private static final long serialVersionUID = 1L;
private final boolean accepted;
private final long proposalNumber;
public AcceptedResponse(int senderId, boolean accepted, long proposalNumber) {
super(senderId);
this.accepted = accepted;
this.proposalNumber = proposalNumber;
}
// Getters...
}
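As noted above, a real deployment would serialize these POJOs for the wire. Below is a minimal Jackson-based codec sketch; it is illustrative only and assumes jackson-databind is on the classpath and that the constructors above are annotated with @JsonCreator/@JsonProperty so the final fields can be deserialized.
// src/main/java/com/platform/paxos/model/MessageCodec.java (illustrative sketch)
package com.platform.paxos.model;
import com.fasterxml.jackson.databind.ObjectMapper;
public class MessageCodec {
    private static final ObjectMapper MAPPER = new ObjectMapper();
    // Serialize any Paxos message to a JSON byte array for transport.
    public static byte[] encode(Message message) throws Exception {
        return MAPPER.writeValueAsBytes(message);
    }
    // Deserialize a payload back into the expected message type.
    public static <T extends Message> T decode(byte[] payload, Class<T> type) throws Exception {
        return MAPPER.readValue(payload, type);
    }
}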
2. Acceptor Core Logic
The Acceptor is the heart of the Paxos algorithm: it promises and accepts proposals. Its state must be persisted so that it survives node crashes. In this implementation we simulate it with in-memory variables for now; a real project has to write it to disk.
// src/main/java/com/platform/paxos/core/Acceptor.java
package com.platform.paxos.core;
import com.platform.paxos.model.*;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import java.util.concurrent.atomic.AtomicLong;
import java.util.concurrent.locks.ReentrantLock;
public class Acceptor {
private static final Logger logger = LoggerFactory.getLogger(Acceptor.class);
private final ReentrantLock lock = new ReentrantLock();
// The highest proposal number this acceptor has promised.
private long maxPromisedProposal = -1L;
// The highest proposal number this acceptor has accepted.
private long acceptedProposal = -1L;
// The value associated with the highest accepted proposal.
private Object acceptedValue = null;
private final int selfId;
public Acceptor(int selfId) {
this.selfId = selfId;
}
/**
* Handles a Prepare request from a Proposer.
* This is Phase 1b of the Paxos algorithm.
*/
public PromiseResponse onPrepare(PrepareRequest request) {
lock.lock();
try {
long proposalNumber = request.getProposalNumber();
logger.info("Node {}: Received Prepare request with proposal number {}", selfId, proposalNumber);
if (proposalNumber > maxPromisedProposal) {
maxPromisedProposal = proposalNumber;
logger.info("Node {}: Promised for proposal {}. Replying with last accepted proposal {} and value {}",
selfId, proposalNumber, acceptedProposal, acceptedValue);
// Return a promise, along with the previously accepted proposal and value (if any).
return new PromiseResponse(selfId, true, acceptedProposal, acceptedValue);
} else {
logger.warn("Node {}: Rejected Prepare for proposal {} (current max promised is {})",
selfId, proposalNumber, maxPromisedProposal);
// The proposal number is not high enough, reject.
return new PromiseResponse(selfId, false, -1L, null);
}
} finally {
lock.unlock();
}
}
/**
* Handles an Accept request from a Proposer.
* This is Phase 2b of the Paxos algorithm.
*/
public AcceptedResponse onAccept(AcceptRequest request) {
lock.lock();
try {
long proposalNumber = request.getProposalNumber();
logger.info("Node {}: Received Accept request for proposal {} with value {}", selfId, proposalNumber, request.getValue());
// Accept the proposal if its number is greater than or equal to the highest one promised.
if (proposalNumber >= maxPromisedProposal) {
// This is a critical step: update both the promised and accepted numbers.
maxPromisedProposal = proposalNumber;
acceptedProposal = proposalNumber;
acceptedValue = request.getValue();
logger.info("Node {}: Accepted proposal {} with value {}. Notifying learners.", selfId, proposalNumber, request.getValue());
return new AcceptedResponse(selfId, true, proposalNumber);
} else {
logger.warn("Node {}: Rejected Accept for proposal {} (current max promised is {})",
selfId, proposalNumber, maxPromisedProposal);
return new AcceptedResponse(selfId, false, proposalNumber);
}
} finally {
lock.unlock();
}
}
}
The locking here is essential: it makes the updates to maxPromisedProposal, acceptedProposal, and acceptedValue atomic and keeps concurrent requests from corrupting the state. In a real project these state variables must also be persisted to disk (for example, to a write-ahead log, WAL) so that they survive a node restart.
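To make that requirement concrete, here is a minimal sketch of such persistence. DurableAcceptorState is a hypothetical helper, not part of the Acceptor above; it assumes the accepted value can be serialized as a string and that each Pod mounts a PersistentVolume. A production WAL would additionally handle checksums, torn writes, and log rotation.
// src/main/java/com/platform/paxos/core/DurableAcceptorState.java (illustrative sketch)
package com.platform.paxos.core;
import java.io.IOException;
import java.io.RandomAccessFile;
import java.nio.ByteBuffer;
import java.nio.channels.FileChannel;
import java.nio.charset.StandardCharsets;
import java.nio.file.Path;
public class DurableAcceptorState {
    private final Path file;
    public DurableAcceptorState(Path file) {
        this.file = file;
    }
    // Persist the three critical fields and fsync. Must be called while holding the
    // Acceptor's lock, BEFORE the Promise/Accepted response is sent back to the Proposer.
    public synchronized void persist(long maxPromisedProposal, long acceptedProposal, String acceptedValue) throws IOException {
        byte[] value = acceptedValue == null ? new byte[0] : acceptedValue.getBytes(StandardCharsets.UTF_8);
        ByteBuffer buf = ByteBuffer.allocate(8 + 8 + 4 + value.length);
        buf.putLong(maxPromisedProposal).putLong(acceptedProposal).putInt(value.length).put(value);
        buf.flip();
        try (RandomAccessFile raf = new RandomAccessFile(file.toFile(), "rw");
             FileChannel channel = raf.getChannel()) {
            channel.truncate(0);
            while (buf.hasRemaining()) {
                channel.write(buf);
            }
            channel.force(true); // a promise only counts once it is durable
        }
    }
}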
3. Proposer Core Logic
The Proposer initiates proposals. It runs the two phases of the protocol and processes the responses from a majority of Acceptors.
// src/main/java/com/platform/paxos/core/Proposer.java
package com.platform.paxos.core;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import com.platform.paxos.model.*;
import java.util.Collection;
import java.util.List;
import java.util.concurrent.CompletableFuture;
import java.util.stream.Collectors;
public class Proposer {
private static final Logger logger = LoggerFactory.getLogger(Proposer.class);
private final int selfId;
private final int clusterSize;
private final int majority;
private final Network aNetwork; // Abstract networking layer
private long proposalNumberCounter;
public Proposer(int selfId, int clusterSize, Network network) {
this.selfId = selfId;
this.clusterSize = clusterSize;
this.majority = clusterSize / 2 + 1;
this.aNetwork = network;
// A common practice to ensure unique proposal numbers across nodes.
this.proposalNumberCounter = System.currentTimeMillis() << 16 | selfId;
}
public CompletableFuture<Object> propose(Object value) {
CompletableFuture<Object> consensusFuture = new CompletableFuture<>();
// Use a thread to avoid blocking the caller
new Thread(() -> {
proposalNumberCounter += (1L << 16); // keep selfId in the low 16 bits so proposal numbers stay globally unique
final long currentProposalNumber = proposalNumberCounter;
// --- Phase 1: Prepare ---
logger.info("Node {}: Starting proposal {} with value {}", selfId, currentProposalNumber, value);
PrepareRequest prepareRequest = new PrepareRequest(selfId, currentProposalNumber);
Collection<PromiseResponse> prepareReplies = aNetwork.broadcastAndWait(prepareRequest, majority);
// Count only positive promises; Acceptors also reply with explicit rejections.
List<PromiseResponse> promises = prepareReplies.stream()
        .filter(PromiseResponse::isPromised)
        .collect(Collectors.toList());
if (promises.size() < majority) {
logger.warn("Node {}: Proposal {} failed in Phase 1. Not enough promises received ({} < {})",
selfId, currentProposalNumber, promises.size(), majority);
consensusFuture.completeExceptionally(new RuntimeException("Failed to get majority promise"));
return;
}
// --- Analyze Promises ---
long highestAcceptedProposal = -1L;
Object valueToPropose = value;
for (PromiseResponse promise : promises) {
if (promise.getLastAcceptedProposal() > highestAcceptedProposal) {
highestAcceptedProposal = promise.getLastAcceptedProposal();
valueToPropose = promise.getLastAcceptedValue(); // A key rule of Paxos!
}
}
if (highestAcceptedProposal != -1L) {
logger.info("Node {}: A previous value was accepted (proposal {}). Proposing that value ({}) instead.",
selfId, highestAcceptedProposal, valueToPropose);
}
// --- Phase 2: Accept ---
AcceptRequest acceptRequest = new AcceptRequest(selfId, currentProposalNumber, valueToPropose);
Collection<AcceptedResponse> acceptances = aNetwork.broadcastAndWait(acceptRequest, majority);
if (acceptances.size() >= majority) {
logger.info("Node {}: Proposal {} with value {} has been accepted by a majority ({}). Consensus reached.",
selfId, currentProposalNumber, valueToPropose, acceptances.size());
// In a full implementation, we'd wait for a Learner to confirm. For simplicity, we complete here.
consensusFuture.complete(valueToPropose);
} else {
logger.warn("Node {}: Proposal {} failed in Phase 2. Not enough acceptances received ({} < {})",
selfId, currentProposalNumber, acceptances.size(), majority);
consensusFuture.completeExceptionally(new RuntimeException("Failed to get majority acceptance"));
}
}).start();
return consensusFuture;
}
}
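The Network dependency is left abstract in this article. For reference, here is a sketch of the minimal contract the Proposer relies on; in our deployment it would be backed by HTTP calls between Pods addressed through the headless Service, but the transport is an implementation detail.
// src/main/java/com/platform/paxos/core/Network.java (sketch of the assumed contract)
package com.platform.paxos.core;
import com.platform.paxos.model.Message;
import java.util.Collection;
public interface Network {
    // Broadcast `request` to every node, wait until at least `quorum` replies of the
    // expected type have arrived or a timeout elapses, and return whatever replies
    // were collected (possibly fewer than `quorum` on timeout).
    <R extends Message> Collection<R> broadcastAndWait(Message request, int quorum);
}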
A common mistake is for the Proposer to go ahead and propose its own original value regardless of what the Promises contain. The correct behaviour is to inspect every Promise reply and, if any of them reports a previously accepted proposal, adopt the value of the highest-numbered one as the value to propose in Phase 2. This rule is the key to Paxos's safety guarantee.
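For completeness, the third role can be sketched as well. A Learner counts acceptances per proposal number and treats a value as chosen once a majority of Acceptors has accepted it. The class below is illustrative: it assumes Acceptors forward the accepted proposal number and value to Learners (the AcceptedResponse above only carries the number), and it is where the diagnostic endpoint shown later reads its final value from.
// src/main/java/com/platform/paxos/core/Learner.java (illustrative sketch)
package com.platform.paxos.core;
import java.util.Map;
import java.util.Set;
import java.util.concurrent.ConcurrentHashMap;
public class Learner {
    private final int majority;
    // proposalNumber -> acceptor ids that reported accepting it
    private final Map<Long, Set<Integer>> acceptances = new ConcurrentHashMap<>();
    // proposalNumber -> the value accepted under that proposal
    private final Map<Long, Object> values = new ConcurrentHashMap<>();
    private volatile Object finalValue;
    public Learner(int clusterSize) {
        this.majority = clusterSize / 2 + 1;
    }
    // Called when an Acceptor notifies this Learner that it accepted (proposalNumber, value).
    public void onAccepted(int acceptorId, long proposalNumber, Object value) {
        values.putIfAbsent(proposalNumber, value);
        Set<Integer> voters = acceptances.computeIfAbsent(proposalNumber, k -> ConcurrentHashMap.newKeySet());
        voters.add(acceptorId);
        if (voters.size() >= majority && finalValue == null) {
            finalValue = values.get(proposalNumber); // a majority accepted: the value is chosen
        }
    }
    public Object getFinalValue() {
        return finalValue;
    }
}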
Kubernetes Operator Implementation (Java)
We use the java-operator-sdk to simplify Operator development.
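Before going into the CRD and the Reconciler, this is roughly how the pieces are wired together at startup. It is only a sketch: constructor and registration details differ slightly between java-operator-sdk versions, and PaxosStateStoreReconciler is the class developed below.
// src/main/java/com/platform/operator/OperatorMain.java (bootstrap sketch)
package com.platform.operator;
import io.fabric8.kubernetes.client.KubernetesClient;
import io.fabric8.kubernetes.client.KubernetesClientBuilder;
import io.javaoperatorsdk.operator.Operator;
public class OperatorMain {
    public static void main(String[] args) {
        KubernetesClient client = new KubernetesClientBuilder().build();
        Operator operator = new Operator();
        operator.register(new PaxosStateStoreReconciler(client));
        operator.start(); // starts the informers and the reconciliation loop
    }
}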
1. CRD (Custom Resource Definition)
First, define the PaxosStateStore resource. It describes the state we want, such as the cluster size.
// src/main/java/com/platform/operator/crd/PaxosStateStore.java
package com.platform.operator.crd;
import io.fabric8.kubernetes.api.model.Namespaced;
import io.fabric8.kubernetes.client.CustomResource;
import io.fabric8.kubernetes.model.annotation.Group;
import io.fabric8.kubernetes.model.annotation.Version;
@Group("platform.com")
@Version("v1alpha1")
public class PaxosStateStore extends CustomResource<PaxosStateStoreSpec, PaxosStateStoreStatus> implements Namespaced {
}
// src/main/java/com/platform/operator/crd/PaxosStateStoreSpec.java
package com.platform.operator.crd;
public class PaxosStateStoreSpec {
private int replicas;
// other configs like version, resources...
public int getReplicas() {
return replicas;
}
public void setReplicas(int replicas) {
this.replicas = replicas;
}
}
// src/main/java/com/platform/operator/crd/PaxosStateStoreStatus.java
package com.platform.operator.crd;
public class PaxosStateStoreStatus {
private int availableReplicas;
private String phase; // e.g., "CREATING", "READY"
// Getters and setters...
}
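Once the CRD is installed in the cluster, a client (for example, a pipeline step) can create an instance either with kubectl and a YAML manifest or programmatically through the fabric8 client. A sketch of the latter; the resource name and namespace are illustrative.
// Illustrative: creating a PaxosStateStore CR from Java, equivalent to `kubectl apply -f paxos-cr.yaml`
package com.platform.operator.crd;
import io.fabric8.kubernetes.api.model.ObjectMetaBuilder;
import io.fabric8.kubernetes.client.KubernetesClient;
import io.fabric8.kubernetes.client.KubernetesClientBuilder;
public class CreateStateStoreExample {
    public static void main(String[] args) {
        PaxosStateStore store = new PaxosStateStore();
        store.setMetadata(new ObjectMetaBuilder()
                .withName("pipeline-coordination")
                .withNamespace("kubeflow")
                .build());
        PaxosStateStoreSpec spec = new PaxosStateStoreSpec();
        spec.setReplicas(3);
        store.setSpec(spec);
        try (KubernetesClient client = new KubernetesClientBuilder().build()) {
            // The @Group/@Version annotations on PaxosStateStore tell fabric8 which CRD this maps to.
            client.resources(PaxosStateStore.class)
                  .inNamespace("kubeflow")
                  .resource(store)
                  .create();
        }
    }
}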
2. Reconciler Core Logic
The Reconciler is the Operator's brain. Its reconcile method is invoked repeatedly to drive the actual cluster state toward the target state declared in the CR.
// src/main/java/com/platform/operator/PaxosStateStoreReconciler.java
package com.platform.operator;
import io.fabric8.kubernetes.api.model.apps.StatefulSet;
import io.fabric8.kubernetes.api.model.apps.StatefulSetBuilder;
import io.fabric8.kubernetes.client.KubernetesClient;
import io.javaoperatorsdk.operator.api.config.informer.InformerConfiguration;
import io.javaoperatorsdk.operator.api.reconciler.*;
import io.javaoperatorsdk.operator.processing.event.source.EventSource;
import io.javaoperatorsdk.operator.processing.event.source.informer.InformerEventSource;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import com.platform.operator.crd.PaxosStateStore;
import com.platform.operator.crd.PaxosStateStoreStatus;
import java.util.Map;
@ControllerConfiguration
public class PaxosStateStoreReconciler implements Reconciler<PaxosStateStore>, EventSourceInitializer<PaxosStateStore> {
private static final Logger logger = LoggerFactory.getLogger(PaxosStateStoreReconciler.class);
private final KubernetesClient client;
public PaxosStateStoreReconciler(KubernetesClient client) {
this.client = client;
}
@Override
public UpdateControl<PaxosStateStore> reconcile(PaxosStateStore resource, Context<PaxosStateStore> context) {
final String namespace = resource.getMetadata().getNamespace();
final String name = resource.getMetadata().getName();
// A freshly created CR has no status yet; initialize it before writing to it below.
if (resource.getStatus() == null) {
    resource.setStatus(new PaxosStateStoreStatus());
}
// 1. Check if StatefulSet exists.
StatefulSet existingStatefulSet = client.apps().statefulSets().inNamespace(namespace).withName(name).get();
if (existingStatefulSet == null) {
logger.info("StatefulSet {} not found, creating.", name);
StatefulSet newStatefulSet = createDesiredStatefulSet(resource);
client.apps().statefulSets().inNamespace(namespace).create(newStatefulSet);
// Update status
resource.getStatus().setPhase("CREATING");
return UpdateControl.updateStatus(resource);
}
// 2. Check if the number of replicas matches the spec.
int currentReplicas = existingStatefulSet.getSpec().getReplicas();
int desiredReplicas = resource.getSpec().getReplicas();
if (currentReplicas != desiredReplicas) {
logger.info("Replicas mismatch for {}. Current: {}, Desired: {}. Scaling...", name, currentReplicas, desiredReplicas);
client.apps().statefulSets().inNamespace(namespace).withName(name).scale(desiredReplicas);
resource.getStatus().setPhase("SCALING");
return UpdateControl.updateStatus(resource);
}
// 3. Update status to Ready if everything is fine.
int readyReplicas = existingStatefulSet.getStatus().getReadyReplicas() != null ? existingStatefulSet.getStatus().getReadyReplicas() : 0;
resource.getStatus().setAvailableReplicas(readyReplicas);
if (readyReplicas == desiredReplicas) {
resource.getStatus().setPhase("READY");
}
logger.info("Reconciliation for {} complete. Current state is desired.", name);
return UpdateControl.updateStatus(resource).rescheduleAfter(30000); // Re-check every 30s
}
private StatefulSet createDesiredStatefulSet(PaxosStateStore resource) {
// Here we build the StatefulSet object programmatically.
// This includes defining the container image, ports, volumes, etc.
// It's critical to use ownerReferences to link the StatefulSet to our CR.
// This ensures that when the CR is deleted, the StatefulSet is garbage collected.
return new StatefulSetBuilder()
.withNewMetadata()
.withName(resource.getMetadata().getName())
.withNamespace(resource.getMetadata().getNamespace())
.addNewOwnerReference()
.withApiVersion(resource.getApiVersion())
.withKind(resource.getKind())
.withName(resource.getMetadata().getName())
.withUid(resource.getMetadata().getUid())
.endOwnerReference()
.endMetadata()
.withNewSpec()
.withReplicas(resource.getSpec().getReplicas())
.withServiceName(resource.getMetadata().getName() + "-headless")
.withNewSelector()
.addToMatchLabels("app", resource.getMetadata().getName())
.endSelector()
.withNewTemplate()
.withNewMetadata()
.addToLabels("app", resource.getMetadata().getName())
.endMetadata()
.withNewSpec()
.addNewContainer()
.withName("paxos-node")
.withImage("your-registry/paxos-state-store:latest") // The Java app
.addNewPort()
.withContainerPort(8080)
.withName("http")
.endPort()
// Environment variables to pass cluster info
.addNewEnv()
.withName("CLUSTER_SIZE")
.withValue(String.valueOf(resource.getSpec().getReplicas()))
.endEnv()
.addNewEnv()
// This is crucial for pods to know their own identity
.withName("POD_NAME")
.withNewValueFrom()
.withNewFieldRef()
.withFieldPath("metadata.name")
.endFieldRef()
.endValueFrom()
.endEnv()
.endContainer()
.endSpec()
.endTemplate()
.endSpec()
.build();
}
@Override
public Map<String, EventSource> prepareEventSources(EventSourceContext<PaxosStateStore> context) {
    // Watch StatefulSet events so that changes to a StatefulSet owned by our CR
    // trigger reconciliation. (The exact InformerConfiguration API varies slightly
    // between java-operator-sdk versions.)
    InformerEventSource<StatefulSet, PaxosStateStore> statefulSetEventSource =
            new InformerEventSource<>(
                    InformerConfiguration.from(StatefulSet.class, context).build(), context);
    return EventSourceInitializer.nameEventSources(statefulSetEventSource);
}
}
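One gap worth calling out: the architecture diagram has the Operator also own a Service, and the StatefulSet above points its serviceName at <name>-headless, but the Reconciler shown here never creates it. Below is a sketch of the missing builder method; it would be created and kept in sync from reconcile just like the StatefulSet, and needs the io.fabric8.kubernetes.api.model.Service and ServiceBuilder imports.
private Service createDesiredHeadlessService(PaxosStateStore resource) {
    return new ServiceBuilder()
        .withNewMetadata()
            .withName(resource.getMetadata().getName() + "-headless")
            .withNamespace(resource.getMetadata().getNamespace())
            .addNewOwnerReference()
                .withApiVersion(resource.getApiVersion())
                .withKind(resource.getKind())
                .withName(resource.getMetadata().getName())
                .withUid(resource.getMetadata().getUid())
            .endOwnerReference()
        .endMetadata()
        .withNewSpec()
            // "None" makes the Service headless, giving each Pod a stable DNS name
            // (<name>-0.<name>-headless.<namespace>.svc), which the Paxos nodes use to reach each other.
            .withClusterIP("None")
            .addToSelector("app", resource.getMetadata().getName())
            .addNewPort()
                .withName("http")
                .withPort(8080)
            .endPort()
        .endSpec()
        .build();
}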
The Reconciler's logic is idempotent: no matter how many times it is invoked, as long as the PaxosStateStore resource stays the same, it drives the Kubernetes cluster toward the same desired state. The unit-testing approach follows directly from this: simulate different PaxosStateStore states (newly created, replica count changed, deleted) and different cluster states (StatefulSet missing, replica mismatch), then verify that reconcile returns the correct UpdateControl object and calls the expected Kubernetes client methods.
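A sketch of one such test case (the StatefulSet is missing, so the Reconciler should create it and move the phase to CREATING), using JUnit 5 and Mockito deep stubs in place of a real fabric8 client. It assumes the elided getters and setters on the spec and status classes exist.
// src/test/java/com/platform/operator/PaxosStateStoreReconcilerTest.java (sketch)
package com.platform.operator;
import com.platform.operator.crd.PaxosStateStore;
import com.platform.operator.crd.PaxosStateStoreSpec;
import com.platform.operator.crd.PaxosStateStoreStatus;
import io.fabric8.kubernetes.api.model.ObjectMeta;
import io.fabric8.kubernetes.api.model.apps.StatefulSet;
import io.fabric8.kubernetes.client.KubernetesClient;
import io.javaoperatorsdk.operator.api.reconciler.Context;
import org.junit.jupiter.api.Test;
import static org.junit.jupiter.api.Assertions.assertEquals;
import static org.mockito.ArgumentMatchers.any;
import static org.mockito.Mockito.*;
class PaxosStateStoreReconcilerTest {
    @Test
    void createsStatefulSetWhenMissing() {
        // Deep stubs let us stub and verify the fluent client.apps().statefulSets()... chain.
        KubernetesClient client = mock(KubernetesClient.class, RETURNS_DEEP_STUBS);
        when(client.apps().statefulSets().inNamespace("default").withName("demo").get())
                .thenReturn(null); // simulate: no StatefulSet exists yet
        PaxosStateStore cr = new PaxosStateStore();
        ObjectMeta meta = new ObjectMeta();
        meta.setName("demo");
        meta.setNamespace("default");
        cr.setMetadata(meta);
        PaxosStateStoreSpec spec = new PaxosStateStoreSpec();
        spec.setReplicas(3);
        cr.setSpec(spec);
        cr.setStatus(new PaxosStateStoreStatus());
        @SuppressWarnings("unchecked")
        Context<PaxosStateStore> context = mock(Context.class);
        new PaxosStateStoreReconciler(client).reconcile(cr, context);
        // The reconciler should have asked the client to create a StatefulSet and set the phase.
        verify(client.apps().statefulSets().inNamespace("default")).create(any(StatefulSet.class));
        assertEquals("CREATING", cr.getStatus().getPhase());
    }
}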
Diagnostic Endpoint and Frontend Observation (JavaScript)
To make debugging and observing the cluster easier, the Java application exposes a simple HTTP endpoint that returns the Paxos state as seen by the current node.
// In the Java application (e.g., using Spring Boot or a simple HTTP server)
@GetMapping("/status")
public Map<String, Object> getStatus() {
Map<String, Object> status = new HashMap<>();
status.put("nodeId", paxosNode.getSelfId());
status.put("role", paxosNode.getCurrentRole()); // e.g., "Leader", "Follower"
status.put("currentValue", paxosNode.getLearner().getFinalValue());
status.put("maxPromisedProposal", paxosNode.getAcceptor().getMaxPromisedProposal());
status.put("acceptedProposal", paxosNode.getAcceptor().getAcceptedProposal());
return status;
}
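The paxosNode object used above is assumed to be a thin aggregate of the three roles, one instance per Pod (the State-pattern dispatch mentioned earlier is omitted here). A sketch follows; getCurrentRole is purely informational since Basic Paxos has no stable leader, and the Acceptor would additionally need simple getters for its two counters.
// src/main/java/com/platform/paxos/core/PaxosNode.java (illustrative sketch)
package com.platform.paxos.core;
public class PaxosNode {
    private final int selfId;
    private final Acceptor acceptor;
    private final Proposer proposer;
    private final Learner learner;
    // Purely informational; Basic Paxos has no stable leader election.
    private volatile String currentRole = "Acceptor";
    public PaxosNode(int selfId, int clusterSize, Network network) {
        this.selfId = selfId;
        this.acceptor = new Acceptor(selfId);
        this.proposer = new Proposer(selfId, clusterSize, network);
        this.learner = new Learner(clusterSize);
    }
    public int getSelfId() { return selfId; }
    public String getCurrentRole() { return currentRole; }
    public Acceptor getAcceptor() { return acceptor; }
    public Proposer getProposer() { return proposer; }
    public Learner getLearner() { return learner; }
}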
We can then use a small piece of JavaScript, from a browser or the command line, to quickly query the status of every node and judge whether the cluster is healthy and whether consensus has been reached.
<!-- diagnostic.html -->
<!DOCTYPE html>
<html>
<head>
<title>Paxos Cluster Status</title>
<style>
body { font-family: sans-serif; }
.node { border: 1px solid #ccc; padding: 10px; margin: 10px; border-radius: 5px; }
pre { background-color: #f4f4f4; padding: 10px; }
</style>
</head>
<body>
<h1>Paxos Cluster Status</h1>
<div id="nodes"></div>
<script>
const CLUSTER_SIZE = 3;
const SERVICE_NAME = 'paxos-statestore-headless';
const NAMESPACE = 'default';
async function fetchStatus(nodeId) {
// In a real k8s environment, you'd use `kubectl port-forward` to access this.
// Example: kubectl port-forward paxos-statestore-0 8080:8080, paxos-statestore-1 8081:8080, ...
const url = `http://localhost:808${nodeId}/status`; // Assumes one port-forward per pod (8080, 8081, 8082)
try {
const response = await fetch(url);
if (!response.ok) {
return { error: `HTTP error! status: ${response.status}` };
}
return await response.json();
} catch (e) {
return { error: `Fetch failed: ${e.message}` };
}
}
async function renderStatuses() {
const container = document.getElementById('nodes');
container.innerHTML = 'Loading...';
let content = '';
for (let i = 0; i < CLUSTER_SIZE; i++) {
const status = await fetchStatus(i);
content += `
<div class="node">
<h2>Node ${i} (paxos-statestore-${i})</h2>
<pre>${JSON.stringify(status, null, 2)}</pre>
</div>
`;
}
container.innerHTML = content;
}
setInterval(renderStatuses, 5000);
renderStatuses();
</script>
</body>
</html>
This JavaScript is trivial, but it is very handy when debugging a distributed system: it shows each node's internal state at a glance and helps surface problems quickly, such as a node lagging behind or multiple Proposers causing livelock.
Limitations and Future Work
This implementation validates the approach, but it is still a long way from production grade.
First, we implemented Basic Paxos, which reaches consensus on a single value only once. Real scenarios need a protocol that agrees on a whole sequence of operations in a log, such as Multi-Paxos or its engineering counterpart Raft, which brings in log replication, leader election, and heartbeat maintenance.
Second, there is no log compaction or snapshotting. As operations accumulate, the log grows without bound, node recovery after a restart gets slower, and disk usage climbs. A production system must be able to take periodic snapshots and discard old log entries.
Finally, the current Operator is still rudimentary. It does not handle complex upgrade flows (for example, keeping the cluster available and consistent during a rolling update), nor does it implement backup and restore or self-healing (such as detecting a Pod with corrupted data, evicting it, and rebuilding it). All of these are required capabilities for a mature, production-grade Operator.