Java Operator 结合 Paxos 实现 Kubeflow 自定义高可用组件


在维护一个大规模 Kubeflow 集群时,我们遇到了一个棘手的状态管理问题。某些复杂的 Pipeline 需要一个轻量级、高可用的协调服务来存储关键元数据,例如当前执行阶段、动态生成的配置或是小规模的中间产物校验和。这些数据要求强一致性,并且服务本身不能成为单点故障。

最初的方案是引入一个外部的 etcd 或 ZooKeeper 集群。但在真实项目中,这意味着为 MLOps 平台引入一个沉重的外部依赖。运维团队需要维护它,网络策略需要为它开放,这增加了整个系统的复杂性和故障点。我们需要的是一个与 Kubeflow 生态无缝集成的、云原生的解决方案。

方案A,即外部依赖方案,其优劣非常明显。

  • 优势: 技术成熟,稳定性经过了大规模验证。社区活跃,问题容易定位。
  • 劣势:
    1. 运维成本: 引入了新的技术栈,需要独立的监控、备份和升级策略。
    2. 资源开销: 即便是一个最小化的3节点 etcd 集群,对于我们这个轻量级协调任务来说也显得过于臃肿。
    3. 集成复杂度: 应用需要处理与外部集群的连接、认证和网络分区问题,这些逻辑与我们的核心业务无关。

于是,我们转向方案B:构建一个内嵌的、由 Kubernetes Operator 自动管理的分布式状态组件。这个组件将以后端 Pod 的形式与 Kubeflow Pipeline 的其他部分一同部署,并利用分布式一致性协议保证自身的高可用。

  • 优势:
    1. 云原生: 完美契合 Kubernetes 的声明式API和自愈能力。用户只需一个 YAML 文件就能部署和管理整个高可用服务。
    2. 轻量级: 无需在集群之外引入额外的进程或虚拟机,资源占用按需分配。
    3. 自治性: Operator 负责服务的生命周期管理,包括部署、扩缩容、故障恢复,极大地降低了运维心智负担。
  • 劣势:
    1. 研发成本: 需要自行实现或集成一个分布式一致性算法,并开发一个功能完备的 Operator。
    2. 技术风险: 一致性算法的实现极易出错,需要严谨的设计和充分的测试。

权衡之下,我们决定选择方案B。长期来看,一个自治的、云原生的组件能更好地融入平台工程的理念,减少摩擦,提升研发效能。技术挑战虽然巨大,但其回报——一个完全自控、高度集成的高可用组件——是值得的。我们选择 Java 作为实现语言,因为它在企业级后端服务中拥有成熟的生态,并且有可靠的 Kubernetes Operator SDK。一致性协议则选择了 Paxos,因为它是一切共识算法的理论基石,深入理解并实现它对团队的技术成长大有裨益。

架构设计概览

我们的目标是创建一个名为 PaxosStateStore 的自定义资源(CRD)。用户可以通过一个 PaxosStateStore 对象来声明一个状态存储集群。我们的 Java Operator 会监听这些对象,并自动创建、管理一个对应的 StatefulSet。每个 StatefulSet 的 Pod 都运行着我们自己开发的 Java 应用,该应用内嵌了 Paxos 协议的实现,并相互通信构成一个一致性集群。

graph TD
    subgraph "Kubernetes API Server"
        A["User: kubectl apply -f paxos-cr.yaml"] --> B{"CRD: PaxosStateStore"}
    end

    subgraph "Control Plane (Our Operator)"
        C(Paxos Operator) --Watches--> B
        C --Creates/Updates--> D[StatefulSet]
        C --Creates/Updates--> E[Service]
    end

    subgraph "Worker Nodes"
        D --> F1(Pod-0: Java App + Paxos)
        D --> F2(Pod-1: Java App + Paxos)
        D --> F3(Pod-2: Java App + Paxos)
        E --Provides Stable Network ID--> F1
        E --> F2
        E --> F3
        F1 <--Inter-Pod Communication--> F2
        F2 <--Inter-Pod Communication--> F3
        F3 <--Inter-Pod Communication--> F1
    end

    G[Kubeflow Pipeline Pod] --Uses Service E--> F1

Paxos 核心实现 (Java)

我们实现的是经典的 Basic Paxos,专注于对单个值达成共识,这足以满足存储关键元数据的需求。我们将 Paxos 的三种角色(Proposer、Acceptor、Learner)组合在同一个 PaxosNode 类中,让每个节点同时承担全部角色(见下面的组合草图)。
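以下是 PaxosNode 的一个简化组合草图(假设性示意,原文未给出该类的完整定义;其中 Learner 的实现与 Network 接口均为示意命名,Network 是后文 Proposer 所依赖的抽象网络层):

// 假设性草图:同一进程内聚合 Proposer / Acceptor / Learner 三种角色。
public class PaxosNode {
    private final int selfId;
    private final Acceptor acceptor;
    private final Proposer proposer;
    private final Learner learner;
    // 简化的角色标记,仅供后文的诊断端点展示使用
    private volatile String currentRole = "Follower";

    public PaxosNode(int selfId, int clusterSize, Network network) {
        this.selfId = selfId;
        this.acceptor = new Acceptor(selfId);
        this.proposer = new Proposer(selfId, clusterSize, network);
        this.learner = new Learner();
    }

    public int getSelfId() { return selfId; }
    public String getCurrentRole() { return currentRole; }
    public Acceptor getAcceptor() { return acceptor; }
    public Proposer getProposer() { return proposer; }
    public Learner getLearner() { return learner; }
}

// 最小化的 Learner 草图:仅记录最终被多数派接受的值。
public class Learner {
    private volatile Object finalValue;

    public void onValueChosen(Object value) { this.finalValue = value; }
    public Object getFinalValue() { return finalValue; }
}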

1. 数据结构与消息体

首先定义协议中流转的消息。这里使用简单的 POJO,在实际项目中会使用 JSON 或 Protobuf 进行序列化。

// src/main/java/com/platform/paxos/model/ 下的消息类(每个 public 类各占一个同名文件,此处合并展示)
package com.platform.paxos.model;

import java.io.Serializable;

// Base class for all Paxos messages
public abstract class Message implements Serializable {
    private static final long serialVersionUID = 1L;
    private final int senderId;

    public Message(int senderId) {
        this.senderId = senderId;
    }

    public int getSenderId() {
        return senderId;
    }
}

// Phase 1a: Prepare Request
public class PrepareRequest extends Message {
    private static final long serialVersionUID = 1L;
    private final long proposalNumber;

    public PrepareRequest(int senderId, long proposalNumber) {
        super(senderId);
        this.proposalNumber = proposalNumber;
    }

    public long getProposalNumber() {
        return proposalNumber;
    }
}

// Phase 1b: Promise Response
public class PromiseResponse extends Message {
    private static final long serialVersionUID = 1L;
    private final boolean promised;
    private final long lastAcceptedProposal;
    private final Object lastAcceptedValue;

    public PromiseResponse(int senderId, boolean promised, long lastAcceptedProposal, Object lastAcceptedValue) {
        super(senderId);
        this.promised = promised;
        this.lastAcceptedProposal = lastAcceptedProposal;
        this.lastAcceptedValue = lastAcceptedValue;
    }
    // Getters...
}

// Phase 2a: Accept Request
public class AcceptRequest extends Message {
    private static final long serialVersionUID = 1L;
    private final long proposalNumber;
    private final Object value;

    public AcceptRequest(int senderId, long proposalNumber, Object value) {
        super(senderId);
        this.proposalNumber = proposalNumber;
        this.value = value;
    }
    // Getters...
}

// Phase 2b: Accepted Response
public class AcceptedResponse extends Message {
    private static final long serialVersionUID = 1L;
    private final boolean accepted;
    private final long proposalNumber;

    public AcceptedResponse(int senderId, boolean accepted, long proposalNumber) {
        super(senderId);
        this.accepted = accepted;
        this.proposalNumber = proposalNumber;
    }
    // Getters...
}
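正文提到实际项目会用 JSON 或 Protobuf 做序列化,这里补充一个假设使用 Jackson 的最小编解码草图(WireCodec 为示意命名;上述不可变 POJO 还需为构造函数添加 @JsonCreator/@JsonProperty 注解,或注册 ParameterNamesModule,才能正常反序列化):

// 假设性草图:基于 Jackson 的消息编解码。
import java.io.IOException;

import com.fasterxml.jackson.databind.ObjectMapper;

public class WireCodec {
    private static final ObjectMapper MAPPER = new ObjectMapper();

    public static byte[] encode(Message message) throws IOException {
        return MAPPER.writeValueAsBytes(message);
    }

    public static <T extends Message> T decode(byte[] bytes, Class<T> type) throws IOException {
        return MAPPER.readValue(bytes, type);
    }
}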

2. Acceptor 核心逻辑

Acceptor 是 Paxos 算法的核心,它负责承诺(Promise)和接受(Accept)提议。它的状态必须持久化,以防节点崩溃。在我们的实现中,暂时用内存变量模拟,真实项目中需要写入磁盘。

// src/main/java/com/platform/paxos/core/Acceptor.java
package com.platform.paxos.core;

import com.platform.paxos.model.*;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import java.util.concurrent.atomic.AtomicLong;
import java.util.concurrent.locks.ReentrantLock;

public class Acceptor {
    private static final Logger logger = LoggerFactory.getLogger(Acceptor.class);

    private final ReentrantLock lock = new ReentrantLock();
    // The highest proposal number this acceptor has promised.
    private long maxPromisedProposal = -1L;
    // The highest proposal number this acceptor has accepted.
    private long acceptedProposal = -1L;
    // The value associated with the highest accepted proposal.
    private Object acceptedValue = null;

    private final int selfId;

    public Acceptor(int selfId) {
        this.selfId = selfId;
    }

    /**
     * Handles a Prepare request from a Proposer.
     * This is Phase 1b of the Paxos algorithm.
     */
    public PromiseResponse onPrepare(PrepareRequest request) {
        lock.lock();
        try {
            long proposalNumber = request.getProposalNumber();
            logger.info("Node {}: Received Prepare request with proposal number {}", selfId, proposalNumber);

            if (proposalNumber > maxPromisedProposal) {
                maxPromisedProposal = proposalNumber;
                logger.info("Node {}: Promised for proposal {}. Replying with last accepted proposal {} and value {}",
                        selfId, proposalNumber, acceptedProposal, acceptedValue);
                // Return a promise, along with the previously accepted proposal and value (if any).
                return new PromiseResponse(selfId, true, acceptedProposal, acceptedValue);
            } else {
                logger.warn("Node {}: Rejected Prepare for proposal {} (current max promised is {})",
                        selfId, proposalNumber, maxPromisedProposal);
                // The proposal number is not high enough, reject.
                return new PromiseResponse(selfId, false, -1L, null);
            }
        } finally {
            lock.unlock();
        }
    }

    /**
     * Handles an Accept request from a Proposer.
     * This is Phase 2b of the Paxos algorithm.
     */
    public AcceptedResponse onAccept(AcceptRequest request) {
        lock.lock();
        try {
            long proposalNumber = request.getProposalNumber();
            logger.info("Node {}: Received Accept request for proposal {} with value {}", selfId, proposalNumber, request.getValue());

            // Accept the proposal if its number is greater than or equal to the highest one promised.
            if (proposalNumber >= maxPromisedProposal) {
                // This is a critical step: update both the promised and accepted numbers.
                maxPromisedProposal = proposalNumber;
                acceptedProposal = proposalNumber;
                acceptedValue = request.getValue();
                logger.info("Node {}: Accepted proposal {} with value {}. Notifying learners.", selfId, proposalNumber, request.getValue());
                return new AcceptedResponse(selfId, true, proposalNumber);
            } else {
                logger.warn("Node {}: Rejected Accept for proposal {} (current max promised is {})",
                        selfId, proposalNumber, maxPromisedProposal);
                return new AcceptedResponse(selfId, false, proposalNumber);
            }
        } finally {
            lock.unlock();
        }
    }
    // Read-only accessors used by the diagnostic endpoint shown later.
    public long getMaxPromisedProposal() {
        lock.lock();
        try {
            return maxPromisedProposal;
        } finally {
            lock.unlock();
        }
    }

    public long getAcceptedProposal() {
        lock.lock();
        try {
            return acceptedProposal;
        } finally {
            lock.unlock();
        }
    }
}

这里的锁机制至关重要,它确保了对 maxPromisedProposal、acceptedProposal 和 acceptedValue 的原子性更新,防止并发请求导致状态不一致。在真实项目中,这些状态变量必须持久化到磁盘(例如,写入一个预写日志 WAL),以保证节点重启后状态不丢失,如下面的草图所示。
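下面是一个最小化的持久化草图(假设:每次状态变更先落盘并 fsync,再向 Proposer 回复;AcceptorStateStore 为示意命名,非原文实现):

// 假设性草图:Acceptor 状态的简单落盘(写临时文件 + fsync + 原子重命名)。
import java.io.FileOutputStream;
import java.io.IOException;
import java.io.ObjectOutputStream;
import java.nio.file.Files;
import java.nio.file.Path;
import java.nio.file.StandardCopyOption;

public class AcceptorStateStore {
    private final Path stateFile;

    public AcceptorStateStore(Path stateFile) {
        this.stateFile = stateFile;
    }

    // 必须在向 Proposer 回复 Promise/Accepted 之前调用,否则节点崩溃重启后可能违背已做出的承诺。
    public synchronized void save(long maxPromised, long accepted, Object value) throws IOException {
        Path tmp = stateFile.resolveSibling(stateFile.getFileName() + ".tmp");
        try (FileOutputStream fos = new FileOutputStream(tmp.toFile());
             ObjectOutputStream out = new ObjectOutputStream(fos)) {
            out.writeLong(maxPromised);
            out.writeLong(accepted);
            out.writeObject(value);
            out.flush();
            fos.getFD().sync(); // fsync:确保数据真正写入磁盘
        }
        // 原子重命名,保证崩溃时不会留下半写的状态文件
        Files.move(tmp, stateFile, StandardCopyOption.ATOMIC_MOVE, StandardCopyOption.REPLACE_EXISTING);
    }
}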

3. Proposer 核心逻辑

Proposer 负责发起提议,它需要经历两个阶段,并处理多数派的响应。

// src/main/java/com/platform/paxos/core/Proposer.java
package com.platform.paxos.core;

import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import com.platform.paxos.model.*;

import java.util.Collection;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.atomic.AtomicLong;

public class Proposer {
    private static final Logger logger = LoggerFactory.getLogger(Proposer.class);

    private final int selfId;
    private final int clusterSize;
    private final int majority;
    private final Network network; // Abstract networking layer (see the sketch below)
    // Monotonically increasing round counter. Shifting the round left and OR-ing
    // in selfId yields proposal numbers that are unique across nodes -- simply
    // incrementing a timestamp-based number would let two nodes collide.
    private final AtomicLong roundCounter = new AtomicLong(System.currentTimeMillis());

    public Proposer(int selfId, int clusterSize, Network network) {
        this.selfId = selfId;
        this.clusterSize = clusterSize;
        this.majority = clusterSize / 2 + 1;
        this.network = network;
    }

    public CompletableFuture<Object> propose(Object value) {
        CompletableFuture<Object> consensusFuture = new CompletableFuture<>();
        
        // Use a thread to avoid blocking the caller
        new Thread(() -> {
            // Encode selfId into the low 16 bits so proposal numbers are
            // globally unique and totally ordered across the cluster.
            final long currentProposalNumber = (roundCounter.incrementAndGet() << 16) | selfId;
            
            // --- Phase 1: Prepare ---
            logger.info("Node {}: Starting proposal {} with value {}", selfId, currentProposalNumber, value);
            PrepareRequest prepareRequest = new PrepareRequest(selfId, currentProposalNumber);
            // We assume broadcastAndWait collects only positive replies (see the Network sketch below).
            Collection<PromiseResponse> promises = network.broadcastAndWait(prepareRequest, majority);

            if (promises.size() < majority) {
                logger.warn("Node {}: Proposal {} failed in Phase 1. Not enough promises received ({} < {})",
                        selfId, currentProposalNumber, promises.size(), majority);
                consensusFuture.completeExceptionally(new RuntimeException("Failed to get majority promise"));
                return;
            }

            // --- Analyze Promises ---
            long highestAcceptedProposal = -1L;
            Object valueToPropose = value;
            
            for (PromiseResponse promise : promises) {
                if (promise.getLastAcceptedProposal() > highestAcceptedProposal) {
                    highestAcceptedProposal = promise.getLastAcceptedProposal();
                    valueToPropose = promise.getLastAcceptedValue(); // A key rule of Paxos!
                }
            }

            if (highestAcceptedProposal != -1L) {
                logger.info("Node {}: A previous value was accepted (proposal {}). Proposing that value ({}) instead.",
                        selfId, highestAcceptedProposal, valueToPropose);
            }
            
            // --- Phase 2: Accept ---
            AcceptRequest acceptRequest = new AcceptRequest(selfId, currentProposalNumber, valueToPropose);
            Collection<AcceptedResponse> acceptances = network.broadcastAndWait(acceptRequest, majority);
            
            if (acceptances.size() >= majority) {
                logger.info("Node {}: Proposal {} with value {} has been accepted by a majority ({}). Consensus reached.",
                        selfId, currentProposalNumber, valueToPropose, acceptances.size());
                // In a full implementation, we'd wait for a Learner to confirm. For simplicity, we complete here.
                consensusFuture.complete(valueToPropose);
            } else {
                logger.warn("Node {}: Proposal {} failed in Phase 2. Not enough acceptances received ({} < {})",
                        selfId, currentProposalNumber, acceptances.size(), majority);
                consensusFuture.completeExceptionally(new RuntimeException("Failed to get majority acceptance"));
            }
        }).start();

        return consensusFuture;
    }
}

一个常见的错误是,Proposer 在收到 Promises 后,无论如何都继续提议自己最初的值。正确的做法是,检查所有 Promise 回复,如果其中有已经接受过的提议,必须选择那个拥有最大提议号的值,作为自己第二阶段要提议的值。这是 Paxos 算法保证安全性的关键。
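顺带一提,Proposer 依赖的 Network 抽象在上文中没有给出定义,下面是一个假设性的接口草图(约定 broadcastAndWait 只收集肯定答复,凑齐 quorum 或超时后返回;真实实现还需处理重试、超时与序列化):

// src/main/java/com/platform/paxos/core/Network.java(假设性草图)
package com.platform.paxos.core;

import java.util.Collection;

import com.platform.paxos.model.Message;

public interface Network {

    /**
     * Broadcasts a request to all peers and blocks until at least `quorum`
     * positive responses arrive, or a timeout elapses. Only positive replies
     * (promised == true / accepted == true) are collected, so callers can
     * compare the collection size against the majority directly.
     */
    <T extends Message> Collection<T> broadcastAndWait(Message request, int quorum);
}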

Kubernetes Operator 实现 (Java)

我们使用 java-operator-sdk 来简化 Operator 的开发。

1. CRD (Custom Resource Definition)

首先,定义 PaxosStateStore 资源。它描述了我们期望的状态,比如集群大小。

// src/main/java/com/platform/operator/crd/PaxosStateStore.java
package com.platform.operator.crd;

import io.fabric8.kubernetes.api.model.Namespaced;
import io.fabric8.kubernetes.client.CustomResource;
import io.fabric8.kubernetes.model.annotation.Group;
import io.fabric8.kubernetes.model.annotation.Version;

@Group("platform.com")
@Version("v1alpha1")
public class PaxosStateStore extends CustomResource<PaxosStateStoreSpec, PaxosStateStoreStatus> implements Namespaced {
}

// src/main/java/com/platform/operator/crd/PaxosStateStoreSpec.java
package com.platform.operator.crd;

public class PaxosStateStoreSpec {
    private int replicas;
    // other configs like version, resources...

    public int getReplicas() {
        return replicas;
    }

    public void setReplicas(int replicas) {
        this.replicas = replicas;
    }
}

// src/main/java/com/platform/operator/crd/PaxosStateStoreStatus.java
package com.platform.operator.crd;

public class PaxosStateStoreStatus {
    private int availableReplicas;
    private String phase; // e.g., "CREATING", "READY"

    // Getters and setters...
}

2. Reconciler 核心逻辑

Reconciler 是 Operator 的大脑,它的 reconcile 方法会在相关资源事件发生或到达重调度时间时被调用,以确保实际状态与 CR 中定义的目标状态一致。

// src/main/java/com/platform/operator/PaxosStateStoreReconciler.java
package com.platform.operator;

import java.util.Map;

import io.fabric8.kubernetes.api.model.apps.StatefulSet;
import io.fabric8.kubernetes.api.model.apps.StatefulSetBuilder;
import io.fabric8.kubernetes.client.KubernetesClient;
import io.javaoperatorsdk.operator.api.config.informer.InformerConfiguration;
import io.javaoperatorsdk.operator.api.reconciler.*;
import io.javaoperatorsdk.operator.processing.event.source.EventSource;
import io.javaoperatorsdk.operator.processing.event.source.informer.InformerEventSource;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import com.platform.operator.crd.PaxosStateStore;
import com.platform.operator.crd.PaxosStateStoreStatus;

@ControllerConfiguration
public class PaxosStateStoreReconciler implements Reconciler<PaxosStateStore>, EventSourceInitializer<PaxosStateStore> {

    private static final Logger logger = LoggerFactory.getLogger(PaxosStateStoreReconciler.class);
    private final KubernetesClient client;

    public PaxosStateStoreReconciler(KubernetesClient client) {
        this.client = client;
    }

    @Override
    public UpdateControl<PaxosStateStore> reconcile(PaxosStateStore resource, Context<PaxosStateStore> context) {
        final String namespace = resource.getMetadata().getNamespace();
        final String name = resource.getMetadata().getName();

        // The status sub-resource can be null on the very first reconciliation.
        if (resource.getStatus() == null) {
            resource.setStatus(new PaxosStateStoreStatus());
        }

        // 1. Check if StatefulSet exists.
        StatefulSet existingStatefulSet = client.apps().statefulSets().inNamespace(namespace).withName(name).get();

        if (existingStatefulSet == null) {
            logger.info("StatefulSet {} not found, creating.", name);
            StatefulSet newStatefulSet = createDesiredStatefulSet(resource);
            client.apps().statefulSets().inNamespace(namespace).create(newStatefulSet);
            // Update status
            resource.getStatus().setPhase("CREATING");
            return UpdateControl.updateStatus(resource);
        }

        // 2. Check if the number of replicas matches the spec.
        int currentReplicas = existingStatefulSet.getSpec().getReplicas();
        int desiredReplicas = resource.getSpec().getReplicas();

        if (currentReplicas != desiredReplicas) {
            logger.info("Replicas mismatch for {}. Current: {}, Desired: {}. Scaling...", name, currentReplicas, desiredReplicas);
            client.apps().statefulSets().inNamespace(namespace).withName(name).scale(desiredReplicas);
            resource.getStatus().setPhase("SCALING");
            return UpdateControl.updateStatus(resource);
        }

        // 3. Update status to Ready if everything is fine.
        int readyReplicas = existingStatefulSet.getStatus().getReadyReplicas() != null ? existingStatefulSet.getStatus().getReadyReplicas() : 0;
        resource.getStatus().setAvailableReplicas(readyReplicas);
        if (readyReplicas == desiredReplicas) {
            resource.getStatus().setPhase("READY");
        }
        
        logger.info("Reconciliation for {} complete. Current state is desired.", name);
        return UpdateControl.updateStatus(resource).rescheduleAfter(30000); // Re-check every 30s
    }

    private StatefulSet createDesiredStatefulSet(PaxosStateStore resource) {
        // Here we build the StatefulSet object programmatically.
        // This includes defining the container image, ports, volumes, etc.
        // It's critical to use ownerReferences to link the StatefulSet to our CR.
        // This ensures that when the CR is deleted, the StatefulSet is garbage collected.
        return new StatefulSetBuilder()
                .withNewMetadata()
                    .withName(resource.getMetadata().getName())
                    .withNamespace(resource.getMetadata().getNamespace())
                    .addNewOwnerReference()
                        .withApiVersion(resource.getApiVersion())
                        .withKind(resource.getKind())
                        .withName(resource.getMetadata().getName())
                        .withUid(resource.getMetadata().getUid())
                        .withController(true)
                    .endOwnerReference()
                .endMetadata()
                .withNewSpec()
                    .withReplicas(resource.getSpec().getReplicas())
                    .withServiceName(resource.getMetadata().getName() + "-headless")
                    .withNewSelector()
                        .addToMatchLabels("app", resource.getMetadata().getName())
                    .endSelector()
                    .withNewTemplate()
                        .withNewMetadata()
                            .addToLabels("app", resource.getMetadata().getName())
                        .endMetadata()
                        .withNewSpec()
                            .addNewContainer()
                                .withName("paxos-node")
                                .withImage("your-registry/paxos-state-store:latest") // The Java app
                                .addNewPort()
                                    .withContainerPort(8080)
                                    .withName("http")
                                .endPort()
                                // Environment variables to pass cluster info
                                .addNewEnv()
                                    .withName("CLUSTER_SIZE")
                                    .withValue(String.valueOf(resource.getSpec().getReplicas()))
                                .endEnv()
                                .addNewEnv()
                                     // This is crucial for pods to know their own identity
                                    .withName("POD_NAME")
                                    .withNewValueFrom()
                                        .withNewFieldRef()
                                            .withFieldPath("metadata.name")
                                        .endFieldRef()
                                    .endValueFrom()
                                .endEnv()
                            .endContainer()
                        .endSpec()
                    .endTemplate()
                .endSpec()
                .build();
    }
    
    @Override
    public Map<String, EventSource> prepareEventSources(EventSourceContext<PaxosStateStore> context) {
        // Watch StatefulSet events so that changes to a StatefulSet owned by
        // our CR also trigger reconciliation of that CR.
        InformerEventSource<StatefulSet, PaxosStateStore> statefulSetEventSource =
                new InformerEventSource<>(
                        InformerConfiguration.from(StatefulSet.class, context).build(), context);
        return EventSourceInitializer.nameEventSources(statefulSetEventSource);
    }
}
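补充一点:Operator 还需要一个启动入口。下面是一个假设性的引导草图(OperatorMain 为示意命名;java-operator-sdk 不同版本的引导 API 略有差异):

// src/main/java/com/platform/operator/OperatorMain.java(假设性草图)
package com.platform.operator;

import io.fabric8.kubernetes.client.KubernetesClient;
import io.fabric8.kubernetes.client.KubernetesClientBuilder;
import io.javaoperatorsdk.operator.Operator;

public class OperatorMain {
    public static void main(String[] args) {
        KubernetesClient client = new KubernetesClientBuilder().build();
        Operator operator = new Operator(client);
        operator.register(new PaxosStateStoreReconciler(client));
        operator.start();
    }
}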

这个 Reconciler 的逻辑是幂等的:无论被调用多少次,只要 PaxosStateStore 资源的状态不变,它最终都会驱动 Kubernetes 集群达到期望的状态。对应的单元测试思路是:模拟不同的 PaxosStateStore 资源状态(新建、更新副本数、删除)和 Kubernetes 集群的当前状态(StatefulSet 不存在、副本数不匹配),然后验证 reconcile 方法是否返回了正确的 UpdateControl 对象,以及是否正确调用了 Kubernetes client 的相应方法,如下面的测试草图所示。
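该草图假设使用 JUnit 5 与 Mockito 的深度桩(RETURNS_DEEP_STUBS)来模拟 fabric8 client 的链式调用,测试类与断言均为示意:

// src/test/java/com/platform/operator/PaxosStateStoreReconcilerTest.java(假设性草图)
package com.platform.operator;

import static org.junit.jupiter.api.Assertions.assertEquals;
import static org.mockito.ArgumentMatchers.any;
import static org.mockito.Mockito.RETURNS_DEEP_STUBS;
import static org.mockito.Mockito.mock;
import static org.mockito.Mockito.verify;
import static org.mockito.Mockito.when;

import io.fabric8.kubernetes.api.model.ObjectMetaBuilder;
import io.fabric8.kubernetes.api.model.apps.StatefulSet;
import io.fabric8.kubernetes.client.KubernetesClient;
import io.javaoperatorsdk.operator.api.reconciler.Context;
import org.junit.jupiter.api.Test;

import com.platform.operator.crd.PaxosStateStore;
import com.platform.operator.crd.PaxosStateStoreSpec;

class PaxosStateStoreReconcilerTest {

    @Test
    @SuppressWarnings("unchecked")
    void createsStatefulSetWhenMissing() {
        // Deep stubs let us stub the fluent fabric8 call chain in one line.
        KubernetesClient client = mock(KubernetesClient.class, RETURNS_DEEP_STUBS);
        when(client.apps().statefulSets().inNamespace("ns").withName("store").get())
                .thenReturn(null); // Simulate: the StatefulSet does not exist yet.

        PaxosStateStore cr = new PaxosStateStore();
        cr.setMetadata(new ObjectMetaBuilder().withName("store").withNamespace("ns").build());
        PaxosStateStoreSpec spec = new PaxosStateStoreSpec();
        spec.setReplicas(3);
        cr.setSpec(spec);

        new PaxosStateStoreReconciler(client)
                .reconcile(cr, (Context<PaxosStateStore>) mock(Context.class));

        // The reconciler should create the StatefulSet and move the CR to CREATING.
        verify(client.apps().statefulSets().inNamespace("ns")).create(any(StatefulSet.class));
        assertEquals("CREATING", cr.getStatus().getPhase());
    }
}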

诊断端点与前端观察 (JavaScript)

为了方便调试和观察集群状态,我们在 Java 应用中暴露一个简单的 HTTP 端点,返回当前节点看到的 Paxos 状态。

// In the Java application (e.g., using Spring Boot or a simple HTTP server)
@GetMapping("/status")
public Map<String, Object> getStatus() {
    Map<String, Object> status = new HashMap<>();
    status.put("nodeId", paxosNode.getSelfId());
    status.put("role", paxosNode.getCurrentRole()); // e.g., "Leader", "Follower"
    status.put("currentValue", paxosNode.getLearner().getFinalValue());
    status.put("maxPromisedProposal", paxosNode.getAcceptor().getMaxPromisedProposal());
    status.put("acceptedProposal", paxosNode.getAcceptor().getAcceptedProposal());
    return status;
}

然后,我们可以用一段简单的 JavaScript 从浏览器或命令行快速查询所有节点的状态,以判断集群是否健康、共识是否达成。

<!-- diagnostic.html -->
<!DOCTYPE html>
<html>
<head>
    <title>Paxos Cluster Status</title>
    <style>
        body { font-family: sans-serif; }
        .node { border: 1px solid #ccc; padding: 10px; margin: 10px; border-radius: 5px; }
        pre { background-color: #f4f4f4; padding: 10px; }
    </style>
</head>
<body>
    <h1>Paxos Cluster Status</h1>
    <div id="nodes"></div>
    <script>
        const CLUSTER_SIZE = 3;

        async function fetchStatus(nodeId) {
            // In a real k8s environment, you'd port-forward each pod to a local port, e.g.:
            //   kubectl port-forward paxos-statestore-0 8080:8080
            //   kubectl port-forward paxos-statestore-1 8081:8080
            //   kubectl port-forward paxos-statestore-2 8082:8080
            const url = `http://localhost:808${nodeId}/status`;
            try {
                const response = await fetch(url);
                if (!response.ok) {
                    return { error: `HTTP error! status: ${response.status}` };
                }
                return await response.json();
            } catch (e) {
                return { error: `Fetch failed: ${e.message}` };
            }
        }

        async function renderStatuses() {
            const container = document.getElementById('nodes');
            container.innerHTML = 'Loading...';
            let content = '';

            for (let i = 0; i < CLUSTER_SIZE; i++) {
                const status = await fetchStatus(i);
                content += `
                    <div class="node">
                        <h2>Node ${i} (paxos-statestore-${i})</h2>
                        <pre>${JSON.stringify(status, null, 2)}</pre>
                    </div>
                `;
            }
            container.innerHTML = content;
        }

        setInterval(renderStatuses, 5000);
        renderStatuses();
    </script>
</body>
</html>

这段 JavaScript 代码虽然简单,但在调试分布式系统时非常实用。它能直观地展示出每个节点内部的状态,帮助我们快速发现问题,比如某个节点是否落后,或者是否存在多个 Proposer 导致活锁。

局限性与未来迭代路径

这个实现虽然验证了方案的可行性,但距离生产级应用还有很长的路要走。

首先,我们实现的是 Basic Paxos,它只能对单个值达成一次共识。在真实场景中,我们需要一个能对一系列操作日志达成共识的协议,例如 Multi-Paxos 或其工程实现 Raft。这需要引入日志复制、领导者选举、心跳维持等更复杂的机制。

其次,缺乏日志压缩和快照机制。随着操作增多,日志会无限增长,导致节点重启恢复时间变长,并占用大量磁盘空间。生产系统必须有定期生成快照并将旧日志丢弃的能力。

最后,当前的 Operator 还比较初级。它没有处理复杂的升级流程(例如,滚动更新时如何保证集群的可用性和数据一致性),也没有实现备份恢复、自动修复(例如,检测到某个 Pod 数据损坏后自动剔除并重建)等高级运维操作。这些都是一个成熟的、生产级的 Operator 必须具备的功能。

