构建基于 Pulsar CDC 与 Ktor 的 ChromaDB 实时向量同步服务


产品需求将我们的数据架构推到了一个临界点。原有的推荐系统,其核心的相似内容搜索功能依赖一个每晚通过批处理作业更新的向量索引。对于一个期望即时反馈的用户体验而言,长达24小时的数据延迟是完全无法接受的。用户的任何内容更新,比如修改一篇博客、更新一个商品描述,都应该在数秒内反映在搜索结果中,而不是等到第二天。这就要求我们必须构建一个从源数据库到向量数据库的实时同步管道。

我们的技术栈相对固定:后端服务以 JVM 为主,消息队列是 Apache Pulsar,而新的向量搜索场景选定了简单易用的 ChromaDB。问题是如何将这三者优雅地粘合起来,构建一个稳定、高效、可维护的实时数据流。

初步构想是利用变更数据捕获(Change Data Capture, CDC)技术。我们可以在主业务数据库(PostgreSQL)上启用 CDC,将所有 INSERTUPDATE 操作转化为事件流,推送到 Pulsar 中。然后,一个消费服务订阅这个 Pulsar topic,实时地处理这些变更事件:为变更的文本数据生成向量嵌入(embeddings),再将这些向量及其元数据 upsert 到 ChromaDB。

技术选型决策如下:

  1. 消息与流处理: Apache Pulsar。我们没有选择更常见的 Kafka,主要是看中了 Pulsar 的存算分离架构和内置的 Pulsar Functions/IO。特别是 Pulsar IO 提供的 Debezium Source Connector,可以让我们无需编写任何代码,仅通过配置就将 PostgreSQL 的 wal 日志实时捕获并发送到 Pulsar topic。这极大地简化了数据捕获端的实现。
  2. 消费服务框架: Ktor。这是一个基于 Kotlin Coroutines 构建的异步 Web 框架。选择它的理由是:轻量、高性能,并且与 Kotlin 的协程生态无缝集成。处理来自 Pulsar 的消息流本质上是一个 IO 密集型任务,协程能以极低的资源开销处理大量并发消息,避免了传统线程模型的上下文切换成本。
  3. 向量数据库: ChromaDB。在众多向量数据库中,ChromaDB 以其“batteries-included”的理念和简单的 API 脱颖而出。对于这个项目,我们不需要复杂的过滤或分片功能,只需要一个稳定可靠的向量存储和检索服务。ChromaDB 的 upsert 语义(如果 ID 已存在则更新,否则插入)天然地满足了我们处理 CDC 事件的需求。

整体架构图如下:

graph TD
    A[PostgreSQL] -- Debezium CDC --> B(Pulsar IO Connector);
    B -- change events --> C{Pulsar Topic: 'product-updates'};
    C -- messages --> D[Ktor Sync Service];
    subgraph Ktor Sync Service
        direction LR
        D1(Pulsar Consumer) -- event --> D2(Embedding Generator);
        D2 -- vector --> D3(ChromaDB Client);
    end
    D3 -- upsert --> E[ChromaDB];
    F(Search API) -- query --> D;
    D -- query --> E;
    E -- results --> D;
    D -- results --> F;

第一步:配置 Pulsar CDC Connector

在真实项目中,配置永远是第一步,也是最容易出问题的地方。我们需要部署一个 Pulsar Debezium Source Connector,让它连接到 PostgreSQL 数据库。假设我们的 Pulsar 集群已经运行,可以通过 pulsar-admin 工具提交这个 connector 配置。

debezium-postgres-source-config.yaml:

tenant: "public"
namespace: "default"
name: "cdc-postgres-products"
topicName: "persistent://public/default/product-updates"
archive: "connectors/pulsar-io-debezium-postgres-2.10.0.nar"

configs:
  # --- Debezium a.k.a. the plugin ---
  database.hostname: "postgres.host.internal"
  database.port: "5432"
  database.user: "debezium_user"
  database.password: "debezium_password"
  database.dbname: "products_db"
  database.server.name: "pg-server-1" # 逻辑服务名,必须唯一
  plugin.name: "pgoutput" # PostgreSQL 10+ 推荐的逻辑解码插件

  # --- 表白名单 ---
  table.include.list: "public.products"

  # --- Pulsar a.k.a. the sink ---
  key.converter: "org.apache.kafka.connect.json.JsonConverter"
  value.converter: "org.apache.kafka.connect.json.JsonConverter"
  key.converter.schemas.enable: "false" # 我们不需要复杂的 schema,简化处理
  value.converter.schemas.enable: "false"

  # --- Pulsar Service URL ---
  pulsar.service.url: "pulsar://pulsar.host.internal:6650"

这个配置的核心在于 database.*table.include.list。它指定了 Debezium 要监听哪个数据库的哪张表。plugin.name: "pgoutput" 是关键,它要求 PostgreSQL 数据库已经配置了逻辑复制。当 public.products 表发生变更时,Debezium 会捕获它,将其序列化为 JSON,并发送到 product-updates 这个 Pulsar topic。

第二步:构建 Ktor 消费同步服务

这是整个管道的核心。我们将创建一个 Ktor 应用程序,它在启动时初始化一个 Pulsar consumer,并持续地从 topic 中拉取消息进行处理。

首先是项目依赖 (build.gradle.kts):

plugins {
    kotlin("jvm") version "1.9.21"
    id("io.ktor.plugin") version "2.3.6"
    id("org.jetbrains.kotlin.plugin.serialization") version "1.9.21"
}

// ... repositories and other configs

dependencies {
    // Ktor Core
    implementation("io.ktor:ktor-server-core-jvm")
    implementation("io.ktor:ktor-server-netty-jvm")
    implementation("io.ktor:ktor-server-config-yaml")
    implementation("io.ktor:ktor-server-content-negotiation")
    implementation("io.ktor:ktor-serialization-kotlinx-json")

    // Pulsar Client
    implementation("org.apache.pulsar:pulsar-client:3.1.1")

    // ChromaDB Client (assuming a hypothetical official/community client)
    // For this example, we'll simulate it with a simple HTTP client.
    implementation("io.ktor:ktor-client-core:2.3.6")
    implementation("io.ktor:ktor-client-cio:2.3.6")
    implementation("io.ktor:ktor-client-content-negotiation:2.3.6")

    // Logging
    implementation("ch.qos.logback:logback-classic:1.4.11")

    // Coroutines
    implementation("org.jetbrains.kotlinx:kotlinx-coroutines-core:1.7.3")

    // For embedding generation (e.g., using a local model or API)
    // This is a placeholder for a real library.
    implementation("com.your-company:embedding-generator:1.0.0")
}

接下来是 Ktor 应用的配置文件 application.yaml

ktor:
  deployment:
    port: 8080

pulsar:
  serviceUrl: "pulsar://pulsar.host.internal:6650"
  topic: "persistent://public/default/product-updates"
  subscriptionName: "vector-sync-subscription"
  subscriptionType: "Shared" # 允许水平扩展 consumer

chroma:
  host: "http://chromadb.host.internal"
  port: 8000
  collectionName: "products"

embedding:
  model: "text-embedding-ada-002" # Example model name

现在,我们来编写核心的同步逻辑。我们将创建一个 SyncService,它封装了消费和处理消息的循环。

package com.yourapp.sync

import io.ktor.client.*
import io.ktor.client.engine.cio.*
import io.ktor.client.plugins.contentnegotiation.*
import io.ktor.client.request.*
import io.ktor.http.*
import io.ktor.serialization.kotlinx.json.*
import kotlinx.coroutines.*
import kotlinx.serialization.Serializable
import kotlinx.serialization.json.Json
import org.apache.pulsar.client.api.*
import org.slf4j.LoggerFactory
import java.util.concurrent.TimeUnit

// 数据模型,对应 Debezium 输出的 JSON 结构
@Serializable
data class CdcPayload(val payload: Payload)

@Serializable
data class Payload(val after: ProductData?) // 我们只关心变更后的数据

@Serializable
data class ProductData(val id: Int, val name: String, val description: String)

// 模拟的 Embedding 服务客户端
class EmbeddingClient {
    // 在真实项目中,这里会调用一个模型服务 API 或加载本地模型
    suspend fun generate(text: String): List<Float> {
        delay(50) // 模拟网络/计算延迟
        return List(1536) { Math.random().toFloat() }
    }
}

// ChromaDB 客户端
class ChromaDBClient(private val httpClient: HttpClient, private val host: String, private val port: Int) {
    private val collectionName = "products"
    private val json = Json { ignoreUnknownKeys = true; prettyPrint = true }

    @Serializable
    data class UpsertRequest(val ids: List<String>, val embeddings: List<List<Float>>, val metadatas: List<Map<String, String>>)

    suspend fun upsert(id: String, embedding: List<Float>, metadata: Map<String, String>) {
        val requestBody = UpsertRequest(listOf(id), listOf(embedding), listOf(metadata))
        val url = "$host:$port/api/v1/collections/$collectionName/upsert"
        
        // 这里的坑在于,ChromaDB客户端库可能没有内置重试机制
        // 在生产环境中,需要添加基于指数退避的重试逻辑
        httpClient.post(url) {
            contentType(ContentType.Application.Json)
            setBody(requestBody)
        }.let { response ->
            if (!response.status.isSuccess()) {
                val errorBody = response.status.description
                throw RuntimeException("Failed to upsert to ChromaDB. Status: ${response.status}. Body: $errorBody")
            }
        }
    }
}

class SyncService(
    private val pulsarClient: PulsarClient,
    private val chromaDBClient: ChromaDBClient,
    private val embeddingClient: EmbeddingClient,
    private val config: AppConfig
) {
    private val logger = LoggerFactory.getLogger(javaClass)
    private val json = Json { ignoreUnknownKeys = true }

    // 使用 CoroutineScope 来管理 consumer 循环的生命周期
    private val scope = CoroutineScope(Dispatchers.IO + SupervisorJob())

    fun start() {
        scope.launch {
            val consumer = createConsumer()
            logger.info("Pulsar consumer started on topic '${config.pulsar.topic}'. Waiting for messages...")
            
            // 消费循环:这是整个服务的核心
            while (isActive) {
                try {
                    // receive() 是一个挂起函数,如果没有消息它会在此等待
                    val message = consumer.receive()
                    
                    // 将消息处理放到一个新的协程中,避免阻塞消费循环
                    // 但这里有一个陷阱:如果不加限制,消息突增时会创建海量协程导致 OOM
                    // 更好的做法是使用 Flow 或 Channel 并发限制
                    launch {
                        processMessage(consumer, message)
                    }

                } catch (e: PulsarClientException.AlreadyClosedException) {
                    logger.warn("Consumer already closed. Exiting loop.")
                    break // 优雅退出
                } catch (e: Exception) {
                    logger.error("Error in consumer loop", e)
                    // 避免因为未知错误导致整个循环退出,可以增加延迟后重试
                    delay(1000)
                }
            }
        }
    }

    private suspend fun processMessage(consumer: Consumer<ByteArray>, message: Message<ByteArray>) {
        val messageId = message.messageId
        try {
            val rawJson = String(message.data)
            val cdcPayload = json.decodeFromString<CdcPayload>(rawJson)

            // Debezium 删除事件的 'after' 字段为 null,我们需要处理这种情况
            val productData = cdcPayload.payload.after
            if (productData == null) {
                logger.info("Received a DELETE event for messageId $messageId. Skipping for now.")
                // 可以在此添加删除 ChromaDB 中对应向量的逻辑
                consumer.acknowledge(messageId)
                return
            }

            logger.info("Processing product update: ID=${productData.id}, Name='${productData.name}'")

            // 1. 生成待 embedding 的文本
            val textToEmbed = "${productData.name}: ${productData.description}"

            // 2. 调用模型生成向量
            val embedding = embeddingClient.generate(textToEmbed)

            // 3. 准备元数据
            val metadata = mapOf("name" to productData.name)

            // 4. Upsert到ChromaDB,使用产品ID作为向量ID,这天然地保证了幂等性
            chromaDBClient.upsert(productData.id.toString(), embedding, metadata)
            
            // 5. 关键一步:只有在所有操作成功后,才确认消息
            // 这样,如果在处理过程中(比如ChromaDB宕机)服务崩溃,消息不会丢失
            // Pulsar会在超时后将它重新投递给另一个可用的 consumer
            consumer.acknowledge(messageId)
            logger.info("Successfully processed and acknowledged message $messageId for product ID ${productData.id}")

        } catch (e: Exception) {
            // 异常处理是生产级服务的关键
            logger.error("Failed to process message $messageId. It will be redelivered.", e)
            // 不确认消息,让Pulsar在ackTimeout后重投。
            // 在真实项目中,应该配置死信队列(Dead Letter Queue),防止有毒消息无限重试。
            consumer.negativeAcknowledge(messageId)
        }
    }

    private fun createConsumer(): Consumer<ByteArray> {
        return pulsarClient.newConsumer()
            .topic(config.pulsar.topic)
            .subscriptionName(config.pulsar.subscriptionName)
            .subscriptionType(SubscriptionType.valueOf(config.pulsar.subscriptionType))
            .ackTimeout(60, TimeUnit.SECONDS) // 增加超时时间,因为 embedding 可能耗时
            .deadLetterPolicy(
                DeadLetterPolicy.builder()
                    .maxRedeliverCount(10) // 最多重试10次
                    .deadLetterTopic("${config.pulsar.topic}-DLQ")
                    .build()
            )
            .subscribe()
    }

    fun stop() {
        scope.cancel() // 取消所有协程
    }
}

最后,在 Ktor 的主入口 Application.kt 中启动这个服务:

package com.yourapp

import com.yourapp.sync.ChromaDBClient
import com.yourapp.sync.EmbeddingClient
import com.yourapp.sync.SyncService
import io.ktor.server.application.*
import io.ktor.server.engine.*
import io.ktor.server.netty.*
import org.apache.pulsar.client.api.PulsarClient

// 用于加载配置的 data class
data class AppConfig(val pulsar: PulsarConfig, val chroma: ChromaConfig)
data class PulsarConfig(val serviceUrl: String, val topic: String, val subscriptionName: String, val subscriptionType: String)
data class ChromaConfig(val host: String, val port: Int, val collectionName: String)

fun main() {
    embeddedServer(Netty, port = 8080, host = "0.0.0.0", module = Application::module).start(wait = true)
}

fun Application.module() {
    // 从 application.yaml 加载配置
    val appConfig = AppConfig(
        pulsar = PulsarConfig(
            environment.config.property("pulsar.serviceUrl").getString(),
            environment.config.property("pulsar.topic").getString(),
            environment.config.property("pulsar.subscriptionName").getString(),
            environment.config.property("pulsar.subscriptionType").getString()
        ),
        chroma = ChromaConfig(
            environment.config.property("chroma.host").getString(),
            environment.config.property("chroma.port").getString().toInt(),
            environment.config.property("chroma.collectionName").getString()
        )
    )

    // 1. 初始化所有客户端
    val pulsarClient = PulsarClient.builder().serviceUrl(appConfig.pulsar.serviceUrl).build()
    val httpClient = HttpClient(CIO) {
        install(ContentNegotiation) { json() }
    }
    val chromaDBClient = ChromaDBClient(httpClient, appConfig.chroma.host, appConfig.chroma.port)
    val embeddingClient = EmbeddingClient()

    // 2. 创建并启动同步服务
    val syncService = SyncService(pulsarClient, chromaDBClient, embeddingClient, appConfig)
    
    // 注册 Ktor 生命周期钩子,在应用停止时优雅关闭资源
    environment.monitor.subscribe(ApplicationStarted) {
        syncService.start()
    }
    environment.monitor.subscribe(ApplicationStopping) {
        syncService.stop()
        pulsarClient.close()
        httpClient.close()
        log.info("Resources cleaned up.")
    }
    
    // 可以在此添加搜索 API endpoint
    // configureRouting(chromaDBClient, embeddingClient)
}

这段代码展示了一个生产级的消费服务应该具备的要素:

  • 配置驱动:所有连接信息和业务参数都通过配置文件管理。
  • 生命周期管理:利用 Ktor 的 ApplicationStartedApplicationStopping 事件来启动和关闭后台服务,确保资源被正确释放。
  • 协程与并发:使用 CoroutineScope 管理后台任务,并利用 Dispatchers.IO 将阻塞操作(如网络请求)调度到合适的线程池。
  • 健壮的错误处理:区分可恢复和不可恢复的错误。通过 negativeAcknowledge 和死信队列(DLQ)策略,防止“毒丸消息”阻塞整个管道。
  • 幂等性设计:利用数据库主键作为 ChromaDB 的 id,保证了 upsert 操作的幂等性,即使同一条 CDC 消息被重复消费,最终结果也是一致的。

潜在陷阱与优化路径

虽然当前方案解决了实时同步的核心问题,但在投入生产前,还有几个问题需要考虑。

首先,launch 一个新的协程来处理每条消息过于粗放。当上游数据发生批量变更时(例如,一次性导入10万条数据),会瞬间创建10万个协程,这可能导致服务内存溢出或压垮下游的 Embedding 服务和 ChromaDB。一个更稳健的实现是使用 Kotlin Flow 的 bufferconflate,或者使用 Channel 来控制并发度。例如,可以创建一个固定大小的协程池来处理这些任务。

// 改进的消费循环,使用 Channel 限制并发
val processingChannel = Channel<Message<ByteArray>>(capacity = 100) // 缓冲区大小

// 启动固定数量的 worker 协程
repeat(16) { // 并发数为 16
    scope.launch {
        for (message in processingChannel) {
            processMessage(consumer, message)
        }
    }
}

// 主循环只负责接收消息并发送到 channel
while (isActive) {
    val message = consumer.receive()
    processingChannel.send(message)
}

其次,Embedding 生成是整个流程中最耗时的部分,并且通常可以从批处理中受益。当前的实现是逐条生成,效率不高。可以将消息在内存中缓存一小段时间(例如100毫秒)或达到一定数量(例如32条),然后批量调用 Embedding 服务和 ChromaDB 的 upsert API。这能显著降低网络开销和提高模型推理的吞吐量。

最后,当前方案没有处理 Schema 演进。如果 products 表增加或删除了字段,Debezium 发送的 JSON 结构会变化,可能导致 Json.decodeFromString 失败。在生产环境中,消费端需要有更强的容错能力,或者使用 Avro/Protobuf 这类支持 Schema 注册和演进的序列化格式来代替纯 JSON。Pulsar 对此有原生支持。

这个架构的适用边界在于数据变更的频率和复杂性。对于变更极高频(每秒数万次)的场景,单个 Ktor 服务的处理能力可能成为瓶颈,需要部署多个实例并利用 Pulsar 的 Shared 订阅模式来分摊负载。同时,如果业务逻辑需要关联多个数据流或进行有状态的计算,简单的 consumer 模式就不够了,可能需要引入 Flink 或 Pulsar Functions 这样更专业的流处理引擎。


  目录