构建基于 Serverless 与 Celery 的异步任务处理架构并由 Lit 实现前端状态同步


项目的需求是在一个高度动态的前端界面上触发一系列计算密集型任务,这些任务的执行时间从30秒到15分钟不等,例如生成复杂的PDF报告、数据批量清洗或小规模的模型训练。一个直接的HTTP请求-响应模型在这里显然是行不通的,任何超过30秒的同步等待都会导致网关超时和极差的用户体验。在生产环境中,维持成百上千个长连接来等待任务完成,不仅资源消耗巨大,而且架构本身也十分脆弱。

最初的构想是部署一个传统的后端服务集群来接收任务并异步处理。但这引出了另一个问题:任务的发起量具有极端的峰值特性。系统可能在一天中的某个小时内接收上万个请求,而在其余时间则几乎闲置。为峰值流量准备常驻的计算资源,意味着在大部分时间里,服务器成本被白白浪费。这正是 Serverless 架构最擅长解决的场景。

然而,Serverless 函数(如 AWS Lambda)本身也存在执行时长的限制,通常是15分钟。虽然我们的任务大部分能在此时间内完成,但总有边缘情况会超出。更重要的是,将重量级的依赖(如某些数据科学库)打包进 Lambda 函数会使其变得臃肿,冷启动问题也会愈发严重。

于是,一个混合架构的轮廓开始清晰起来:

  1. API 入口层: 使用 Serverless (AWS Lambda + API Gateway) 来处理前端的HTTP请求。它只负责请求校验、认证和任务的分派,完成后立即响应。这完美地利用了 Serverless 的弹性伸缩和按需付费的优势。
  2. 任务执行层: 使用一个稳定、强大的分布式任务队列——Celery。Celery workers 可以部署在更稳定的计算环境(如EC2或ECS)上,它们是为长时间运行和处理重型计算而设计的。
  3. 前端交互层: 采用轻量级的 Web Components 库 Lit。它的响应式特性和组件化思想,非常适合构建一个需要实时反馈任务状态的UI。
  4. 状态通知机制: 这是整个架构的粘合剂。前端发起任务后,如何得知任务已完成?轮询(Polling)是一种简单粗暴的方案,但在大规模应用中会给后端带来巨大压力且实时性差。我们需要一个服务端的推送机制。

最终的技术选型决策是:AWS Lambda 接收请求,将任务元数据推送到 SQS 队列(作为 Celery 的 Broker),EC2/ECS 上的 Celery Worker 消费队列中的任务并执行。任务完成后,Celery Worker 通过 AWS IoT Core (MQTT) 将结果推送回前端。Lit 组件负责订阅特定的 MQTT 主题,并根据收到的消息实时更新UI状态。这种事件驱动的方案彻底解耦了系统的各个部分。

架构流程设计

在深入代码之前,我们先用图表明确整个数据流和组件交互。

sequenceDiagram
    participant LitClient as Lit 前端
    participant APIGateway as API Gateway
    participant Lambda as Lambda 触发函数
    participant SQS as SQS 任务队列
    participant CeleryWorker as Celery Worker
    participant IoTCore as AWS IoT Core (MQTT)

    LitClient->>+APIGateway: POST /tasks (含任务参数)
    APIGateway->>+Lambda: 触发函数
    Lambda->>Lambda: 1. 校验参数
    Lambda->>Lambda: 2. 生成 correlation_id
    Lambda-->>-LitClient: Response: { "correlation_id": "..." }
    Lambda->>+SQS: 发送任务消息 (含参数和 correlation_id)
    SQS-->>-Lambda: 确认消息接收

    Note right of CeleryWorker: Worker 持续监听 SQS 队列
    CeleryWorker->>+SQS: 拉取任务消息
    SQS-->>-CeleryWorker: 返回消息
    CeleryWorker->>CeleryWorker: 执行耗时任务...

    Note left of LitClient: 前端使用 correlation_id
订阅特定 MQTT 主题 LitClient->>+IoTCore: Subscribe to 'tasks/results/{correlation_id}' CeleryWorker->>+IoTCore: Publish to 'tasks/results/{correlation_id}'
(含任务结果或错误) IoTCore-->>-CeleryWorker: 确认消息发布 IoTCore-->>-LitClient: 推送任务结果消息 LitClient->>LitClient: 更新UI状态 (成功/失败/进度)

这个流程的核心在于 correlation_id。它由 Lambda 在任务创建时生成,并同时返回给前端和传递给 Celery Worker。它就像一张票根,让前端能够精确订阅只属于自己的那条结果通知,避免了广播风暴。

第一步:配置 Celery 与任务执行器

我们的 Celery Worker 需要能够从 SQS 队列中消费任务,并且在任务完成后有权限向 IoT Core 发布消息。

项目结构

celery_worker/
├── tasks.py         # Celery 任务定义
├── celery_app.py    # Celery 应用实例与配置
├── requirements.txt # Python 依赖
└── Dockerfile       # Worker 容器化配置

配置 Celery 应用 (celery_app.py)

在真实项目中,配置应该从环境变量加载,这里为了清晰起见直接硬编码。我们将使用 SQS 作为 Broker,但不配置 Result Backend,因为结果将通过 MQTT 主动推送。

# celery_app.py
import os
from celery import Celery
from kombu.utils.url import quote

# AWS 凭证和区域需要通过环境变量或 IAM 角色提供给 Celery Worker
# os.environ['AWS_ACCESS_KEY_ID'] = 'YOUR_KEY'
# os.environ['AWS_SECRET_ACCESS_KEY'] = 'YOUR_SECRET'
# os.environ['AWS_DEFAULT_REGION'] = 'us-east-1'

# SQS 队列名称
CELERY_QUEUE_NAME = "MyCeleryTaskQueue"

# 使用 SQS 作为 broker,需要对 AWS 凭证中的特殊字符进行编码
# 如果使用 IAM Role,则 access_key 和 secret_key 部分可以留空
aws_access_key = quote(os.environ.get("AWS_ACCESS_KEY_ID", ""), safe="")
aws_secret_key = quote(os.environ.get("AWS_SECRET_ACCESS_KEY", ""), safe="")
broker_url = f"sqs://{aws_access_key}:{aws_secret_key}@"

app = Celery(
    "long_tasks",
    broker=broker_url,
    # 不使用 Celery 的 result backend,我们将自己实现结果通知
    backend=None, 
    include=["tasks"]
)

app.conf.update(
    # Broker 设置
    broker_transport_options={
        "region": os.environ.get("AWS_DEFAULT_REGION", "us-east-1"),
        # visibility_timeout 必须大于最长任务的执行时间
        "visibility_timeout": 3600, 
        "polling_interval": 1,
        # 使用 long polling,降低 SQS API 调用成本
        "wait_time_seconds": 10,
        "queue_name_prefix": "celery-",
    },
    # 明确指定默认队列
    task_default_queue=CELERY_QUEUE_NAME,
    # 任务设置
    task_acks_late=True, # 任务执行成功后再确认,防止 worker 崩溃导致任务丢失
    worker_prefetch_multiplier=1, # 每个 worker 一次只取一个任务,适合长任务
    task_serializer="json",
    accept_content=["json"],
)

这里的 task_acks_late=True 是生产环境中的关键配置。它确保只有当我们的任务代码成功执行完毕(没有抛出异常),消息才会从 SQS 队列中删除。如果 Worker 在处理过程中崩溃,消息会在 visibility_timeout 之后重新可见,被其他 Worker 领取执行。

定义任务 (tasks.py)

这个文件包含了实际的业务逻辑。任务函数接收 correlation_id,并在完成后使用 boto3 向 IoT Core 发布结果。

# tasks.py
import time
import json
import boto3
import logging
from celery.utils.log import get_task_logger
from celery_app import app

# 获取 Celery 的 logger
logger = get_task_logger(__name__)
logger.setLevel(logging.INFO)

# 初始化 boto3 客户端。在生产环境中,IAM 角色是更安全的选择。
# 这里假设运行环境已配置好 AWS 凭证和区域
IOT_DATA_CLIENT = boto3.client("iot-data")
# 你的 AWS IoT Core endpoint,可以在 AWS 控制台找到
IOT_ENDPOINT_URL = "a1b2c3d4e5f6g7-ats.iot.us-east-1.amazonaws.com"

# 结果发布的主题模板
RESULT_TOPIC_TEMPLATE = "tasks/results/{}"

def publish_result(correlation_id: str, payload: dict):
    """封装向 AWS IoT Core 发布消息的逻辑"""
    topic = RESULT_TOPIC_TEMPLATE.format(correlation_id)
    try:
        logger.info(f"Publishing result to topic: {topic}")
        IOT_DATA_CLIENT.publish(
            topic=topic,
            qos=1, # Quality of Service 1: 至少一次交付
            payload=json.dumps(payload)
        )
    except Exception as e:
        logger.error(f"Failed to publish result for {correlation_id} to IoT Core: {e}", exc_info=True)
        # 在真实项目中,这里可能需要重试逻辑或记录到死信队列
        raise

@app.task(bind=True)
def generate_complex_report(self, user_id: str, report_params: dict, correlation_id: str):
    """一个模拟的耗时任务"""
    task_id = self.request.id
    logger.info(f"Task {task_id} received for user {user_id} with correlation_id {correlation_id}.")

    try:
        # 模拟任务执行前的状态更新
        publish_result(correlation_id, {"status": "PROCESSING", "progress": 0})

        # 模拟长时间的计算过程
        logger.info("Step 1/3: Fetching data...")
        time.sleep(10)
        publish_result(correlation_id, {"status": "PROCESSING", "progress": 33, "message": "Data fetched."})

        logger.info("Step 2/3: Processing data...")
        time.sleep(15)
        
        # 模拟一个可能发生的业务错误
        if report_params.get("simulate_error"):
            raise ValueError("Invalid report parameter detected.")

        publish_result(correlation_id, {"status": "PROCESSING", "progress": 66, "message": "Processing complete."})
        
        logger.info("Step 3/3: Generating report file...")
        time.sleep(10)

        # 任务成功完成
        result_payload = {
            "status": "SUCCESS",
            "correlation_id": correlation_id,
            "data": {
                "report_url": f"s3://my-reports-bucket/{user_id}/{correlation_id}.pdf",
                "generated_at": time.time()
            }
        }
        publish_result(correlation_id, result_payload)
        logger.info(f"Task {task_id} completed successfully.")
        return result_payload

    except Exception as e:
        logger.error(f"Task {task_id} failed: {e}", exc_info=True)
        # 任务失败,发布错误通知
        error_payload = {
            "status": "FAILURE",
            "correlation_id": correlation_id,
            "error": {
                "type": type(e).__name__,
                "message": str(e)
            }
        }
        publish_result(correlation_id, error_payload)
        # 重新抛出异常,Celery 会将其标记为失败
        raise

这个任务不仅模拟了耗时操作,还实现了进度的分步通知和统一的异常处理。任何异常都会被捕获,并以固定的格式发布到 MQTT 主题,确保前端能收到明确的失败反馈。

第二步:创建 Serverless 触发器

我们使用 AWS SAM (Serverless Application Model) 来定义 Lambda 函数和 API Gateway。

项目结构

lambda_trigger/
├── app.py           # Lambda 函数代码
├── requirements.txt # Python 依赖 (celery, boto3)
└── template.yaml    # SAM 模板

SAM 模板 (template.yaml)

这个模板定义了所有需要的 AWS 资源:一个 API Gateway、一个 Lambda 函数以及它们所需的 IAM 权限。

AWSTemplateFormatVersion: '2010-09-09'
Transform: AWS::Serverless-2016-10-31
Description: >
  A serverless API to trigger long-running Celery tasks.

Parameters:
  CeleryQueueName:
    Type: String
    Default: MyCeleryTaskQueue

Resources:
  TaskTriggerFunction:
    Type: AWS::Serverless::Function
    Properties:
      CodeUri: .
      Handler: app.lambda_handler
      Runtime: python3.9
      Architectures:
        - x86_64
      Timeout: 30
      MemorySize: 128
      Environment:
        Variables:
          CELERY_QUEUE_NAME: !Ref CeleryQueueName
      Policies:
        # 授予 Lambda 向指定 SQS 队列发送消息的权限
        - SQSSenderPolicy:
            QueueName: !GetAtt CeleryTaskQueue.QueueName
      Events:
        CreateTask:
          Type: Api
          Properties:
            Path: /tasks
            Method: post
  
  # 定义 Celery 使用的 SQS 队列
  CeleryTaskQueue:
    Type: AWS::SQS::Queue
    Properties:
      QueueName: !Ref CeleryQueueName
      VisibilityTimeout: 3600 # 与 Celery 配置保持一致

Outputs:
  ApiEndpoint:
    Description: "API Gateway endpoint URL for Prod stage"
    Value: !Sub "https://${ServerlessRestApi}.execute-api.${AWS::Region}.amazonaws.com/Prod/tasks"

Lambda 函数代码 (app.py)

这个函数非常轻量,它的唯一职责是接收请求,生成 correlation_id,然后将任务委托给 Celery。

# app.py
import json
import os
import uuid
from celery import Celery

# 从环境变量中获取队列名称
CELERY_QUEUE_NAME = os.environ.get("CELERY_QUEUE_NAME")

# 创建一个临时的 Celery app 实例,仅用于发送任务
# 注意:这里的 broker 配置与 worker 处的配置逻辑需要保持一致
celery_app = Celery(
    broker=f"sqs://", # 使用 IAM role 认证时,URL 中无需凭证
    backend=None,
)
celery_app.conf.update(
    broker_transport_options={
        "region": os.environ.get("AWS_DEFAULT_REGION"),
        "queue_name_prefix": "celery-",
    }
)

def lambda_handler(event, context):
    try:
        body = json.loads(event.get('body', '{}'))
        user_id = event['requestContext']['authorizer']['claims']['sub'] # 假设使用 Cognito Authorizer
        report_params = body.get('report_params')

        if not all([user_id, report_params]):
            return {
                "statusCode": 400,
                "body": json.dumps({"error": "Missing user_id or report_params"}),
            }

        # 生成唯一的关联 ID
        correlation_id = str(uuid.uuid4())

        # 发送任务到 Celery 队列
        # 'tasks.generate_complex_report' 对应 tasks.py 中的任务函数
        celery_app.send_task(
            'tasks.generate_complex_report',
            args=[user_id, report_params, correlation_id],
            queue=CELERY_QUEUE_NAME
        )
        
        # 立即返回 correlation_id 给前端
        return {
            "statusCode": 202, # 202 Accepted 表示请求已被接受,将异步处理
            "headers": {
                "Content-Type": "application/json"
            },
            "body": json.dumps({
                "message": "Task accepted for processing.",
                "correlation_id": correlation_id
            }),
        }

    except json.JSONDecodeError:
        return {"statusCode": 400, "body": json.dumps({"error": "Invalid JSON in request body"})}
    except Exception as e:
        # 记录关键错误
        print(f"Error submitting task: {e}")
        return {
            "statusCode": 500,
            "body": json.dumps({"error": "Internal server error while queueing the task."}),
        }

返回 202 Accepted 状态码是 RESTful API 设计中处理异步操作的最佳实践。

第三步:前端 Lit 组件实现

最后,我们来构建前端。Lit 的响应式系统能让我们非常优雅地处理来自 MQTT 的异步消息。

依赖安装

我们需要一个 MQTT 客户端库。mqtt 是一个流行的选择。
npm install lit mqtt @aws-sdk/credential-provider-from-cognito-identity @aws-sdk/client-iot

创建 task-manager.ts 组件

这个组件封装了所有逻辑:调用 API、连接 MQTT、处理消息和渲染 UI。

// task-manager.ts
import { LitElement, html, css } from 'lit';
import { customElement, state } from 'lit/decorators.js';
import * as mqtt from 'mqtt';
import { fromCognitoIdentityPool } from "@aws-sdk/credential-provider-from-cognito-identity";
import { IoTClient, AttachPolicyCommand } from "@aws-sdk/client-iot";

// AWS Cognito 和 IoT 配置 - 在真实应用中应从配置文件或服务中获取
const AWS_REGION = 'us-east-1';
const IOT_ENDPOINT = 'a1b2c3d4e5f6g7-ats.iot.us-east-1.amazonaws.com';
const COGNITO_IDENTITY_POOL_ID = 'us-east-1:xxxxxxxx-xxxx-xxxx-xxxx-xxxxxxxxxxxx';
const IOT_POLICY_NAME = 'FrontendClientIoTPolicy'; // 预先创建好的 IoT Policy

@customElement('task-manager')
export class TaskManager extends LitElement {
  @state()
  private taskStatus: string = 'IDLE';

  @state()
  private taskResult: any = null;

  @state()
  private correlationId: string | null = null;
  
  @state()
  private errorMessage: string | null = null;

  @state()
  private progress: number = 0;

  private mqttClient: mqtt.MqttClient | null = null;

  static styles = css`
    /* ... 一些样式 ... */
    .progress-bar { width: 100%; background-color: #f1f1f1; }
    .progress { height: 24px; background-color: #4CAF50; text-align: center; color: white; }
  `;

  async triggerTask() {
    this.resetState();
    this.taskStatus = 'SUBMITTING';
    
    try {
      // 假设已通过 Cognito 获取了 idToken
      const idToken = '...'; // 从你的认证流程中获取
      const response = await fetch('YOUR_API_GATEWAY_ENDPOINT/tasks', {
        method: 'POST',
        headers: {
          'Content-Type': 'application/json',
          'Authorization': `Bearer ${idToken}`
        },
        body: JSON.stringify({ report_params: { format: "pdf" } }),
      });

      if (response.status !== 202) {
        throw new Error(`Server responded with ${response.status}`);
      }
      
      const data = await response.json();
      this.correlationId = data.correlation_id;
      this.taskStatus = 'PENDING';
      
      // 连接到 MQTT Broker
      this.connectToMqtt();

    } catch (error) {
      this.taskStatus = 'FAILED';
      this.errorMessage = error instanceof Error ? error.message : 'Unknown error';
    }
  }

  async connectToMqtt() {
    if (!this.correlationId) return;

    // 1. 获取临时 AWS 凭证
    const credentials = await fromCognitoIdentityPool({
      clientConfig: { region: AWS_REGION },
      identityPoolId: COGNITO_IDENTITY_POOL_ID,
    })();
    
    // 2. 为当前身份附加策略 (可选但更安全)
    // 这一步确保用户只能访问自己的资源,通常在后端完成
    // const iotClient = new IoTClient({ region: AWS_REGION, credentials });
    // await iotClient.send(new AttachPolicyCommand({
    //   policyName: IOT_POLICY_NAME,
    //   target: credentials.identityId,
    // }));

    // 3. 构造 WebSocket URL for MQTT
    const url = `wss://${IOT_ENDPOINT}/mqtt?X-Amz-Security-Token=${encodeURIComponent(credentials.sessionToken)}`;
    
    // 4. 连接
    this.mqttClient = mqtt.connect(url, {
      clientId: `frontend-${this.correlationId}-${Date.now()}`,
      accessKeyId: credentials.accessKeyId,
      secretAccessKey: credentials.secretAccessKey,
      protocol: 'wss',
    });

    this.mqttClient.on('connect', () => {
      console.log('Connected to AWS IoT Core');
      const topic = `tasks/results/${this.correlationId}`;
      this.mqttClient?.subscribe(topic, { qos: 1 }, (err) => {
        if (err) {
          console.error('Subscription error:', err);
          this.taskStatus = 'FAILED';
          this.errorMessage = 'Failed to subscribe to task updates.';
        } else {
          console.log(`Subscribed to ${topic}`);
        }
      });
    });

    this.mqttClient.on('message', (topic, message) => {
      const payload = JSON.parse(message.toString());
      console.log('Received message:', payload);
      this.handleTaskUpdate(payload);
    });

    this.mqttClient.on('error', (error) => {
      console.error('MQTT Connection Error:', error);
      this.taskStatus = 'FAILED';
      this.errorMessage = 'Connection to update service lost.';
    });
  }

  handleTaskUpdate(payload: any) {
    this.taskStatus = payload.status;
    if (payload.status === 'PROCESSING') {
        this.progress = payload.progress || this.progress;
    } else if (payload.status === 'SUCCESS') {
        this.taskResult = payload.data;
        this.progress = 100;
        this.disconnectMqtt();
    } else if (payload.status === 'FAILURE') {
        this.errorMessage = payload.error.message;
        this.disconnectMqtt();
    }
  }
  
  disconnectMqtt() {
      this.mqttClient?.end();
      this.mqttClient = null;
  }
  
  resetState() {
    this.taskStatus = 'IDLE';
    this.taskResult = null;
    this.correlationId = null;
    this.errorMessage = null;
    this.progress = 0;
    this.disconnectMqtt();
  }
  
  disconnectedCallback() {
    super.disconnectedCallback();
    this.disconnectMqtt(); // 组件销毁时断开连接
  }

  render() {
    return html`
      <div>
        <button @click=${this.triggerTask} .disabled=${this.taskStatus !== 'IDLE' && this.taskStatus !== 'FAILED'}>
          Generate Report
        </button>
        
        <div id="status-display">
          <p>Correlation ID: ${this.correlationId || 'N/A'}</p>
          <p>Status: <strong>${this.taskStatus}</strong></p>
          
          ${this.taskStatus === 'PROCESSING' ? html`
            <div class="progress-bar">
              <div class="progress" style="width: ${this.progress}%">${this.progress}%</div>
            </div>
          ` : ''}

          ${this.taskStatus === 'SUCCESS' ? html`
            <div>
              <h3>Report Ready!</h3>
              <pre>${JSON.stringify(this.taskResult, null, 2)}</pre>
            </div>
          ` : ''}

          ${this.taskStatus === 'FAILED' ? html`
            <div>
              <h3>Task Failed!</h3>
              <p>Reason: ${this.errorMessage}</p>
            </div>
          ` : ''}
        </div>
      </div>
    `;
  }
}

这个 Lit 组件通过 @state 装饰器实现了响应式。任何对 taskStatus, taskResult 等属性的修改都会自动触发组件的重新渲染,UI 的更新因此变得非常简单和声明式。其中,通过 Cognito Identity Pool 获取临时 AWS 凭证来安全连接 AWS IoT Core 是生产级的关键实践。

当前方案的局限性与未来展望

这个架构虽然优雅地解决了异步长任务的挑战,但也引入了新的复杂性。

首先,可观测性变得更具挑战。一个请求的完整生命周期横跨了 API Gateway、Lambda、SQS、EC2/ECS 和 IoT Core。若要进行端到端的链路追踪,必须引入像 AWS X-Ray 或 OpenTelemetry 这样的工具,并确保 correlation_id (或 trace id) 在所有组件间正确传递。

其次,成本考量。Celery Worker 运行在 EC2/ECS 上,如果任务量波动极大,固定的实例数仍然会造成资源浪费。一个优化方向是使用 KEDA (Kubernetes-based Event Driven Autoscaling) 或基于 SQS 队列深度的自定义伸缩策略,来动态调整 Celery Worker 的数量,使其更具弹性。甚至可以探索在 Fargate Serverless 计算类型上运行 Celery Worker,进一步向 Serverless 靠拢。

第三,结果通知机制。AWS IoT Core 是一个功能强大且稳定的选择,但对于某些简单场景可能有些过重。可以评估 API Gateway WebSocket API 作为替代方案。它的集成可能更直接,但需要自己处理连接管理、授权和扩展性问题,而 IoT Core 则将这些都托管了。

最后,错误处理与重试。当前 Celery 任务失败后会通知前端,但没有自动重试。对于可恢复的错误,可以在 Celery 任务中实现指数退避重试逻辑。对于不可恢复的错误,需要配置死信队列(Dead-Letter Queue),将失败的任务信息存入其中,以便后续进行人工排查和干预。


  目录