在 Kong API 网关中通过 WebAssembly 插件实现 NumPy 向量化计算以加速 Apollo Client 的 GraphQL 响应


痛点:一个拖垮前端的 GraphQL 向量计算

我们的一个数据科学仪表盘项目遇到了性能瓶颈。前端使用 React 和 Apollo Client,通过 GraphQL 查询后端一个 Python 服务获取时序数据。这个 Python 服务底层依赖 NumPy 进行数据处理。问题出在一个特定的图表上,它需要对一个包含数万个点的原始信号向量进行滑动平均(Moving Average)计算。

最初,我们将这个计算放在了前端。GraphQL 查询返回原始的 Float 数组,前端拿到数据后在 JavaScript 中循环计算。当数据点超过 10,000 个时,浏览器 UI 会出现肉眼可见的卡顿,有时甚至会假死几秒钟。这是一个典型的计算密集型任务阻塞了浏览器主线程的案例。

// 前端计算的初步实现,性能极差
function movingAverage(data, windowSize) {
  if (!data || data.length < windowSize) {
    return [];
  }
  const result = [];
  for (let i = 0; i <= data.length - windowSize; i++) {
    let sum = 0;
    for (let j = 0; j < windowSize; j++) {
      sum += data[i + j];
    }
    result.push(sum / windowSize);
  }
  return result;
}

// 在 React 组件中
const { data, loading, error } = useQuery(GET_RAW_SIGNAL, {
  variables: { sensorId: "sensor-001" },
});

// 每次重渲染或数据获取后都会进行昂贵的计算
const processedData = useMemo(
  () => movingAverage(data?.rawSignal.values, 50),
  [data]
);

这个方案很快被否决。第二个想法是将计算后移到后端。修改 GraphQL Schema,增加一个 movingAverageSignal 字段,并在 Python 的 Resolver 中调用 NumPy 实现。

# 后端 FastAPI + Strawberry (GraphQL)
import numpy as np
import strawberry

@strawberry.type
class Signal:
    values: list[float]

def calculate_moving_average(data: list[float], window_size: int) -> list[float]:
    if not data or len(data) < window_size:
        return []
    # NumPy 的实现非常高效
    return np.convolve(np.array(data), np.ones(window_size), 'valid') / window_size

@strawberry.type
class Query:
    @strawberry.field
    def get_signal(self, sensor_id: str, window_size: int) -> Signal:
        raw_data = fetch_raw_data_from_db(sensor_id) # 获取原始数据
        processed_values = calculate_moving_average(raw_data, window_size)
        return Signal(values=processed_values.tolist())

这个方案在功能上是可行的,NumPy 的性能也足够好。但它引入了一个新的架构问题。这个滑动平均计算是一个纯粹的、无状态的函数。把它强耦合在核心数据服务的业务逻辑中,我们遇到了一些麻烦:

  1. Resolver 膨胀: 未来可能会有傅里叶变换、滤波、降采样等更多计算需求,把它们全部加到 Resolver 中会让代码变得臃肿不堪。
  2. 资源占用: Python 因为 GIL 的存在,处理 CPU 密集型任务并不完美。当并发请求增多时,这些计算会显著增加 Gunicorn worker 的 CPU 负载,影响整个服务的吞吐量。
  3. 缓存失效: 原始信号数据是可以被有效缓存的,但增加了 window_size 这样的计算参数后,GraphQL 的缓存变得复杂,缓存命中率急剧下降。

我们需要一个方案,既能避免在客户端进行重计算,又不会污染核心后端服务。计算任务应该被剥离出来。第一反应是再做一个“计算微服务”,但这会引入额外的网络开销和运维成本。

最终,我们将目光投向了我们的 API 网关——Kong。如果这个无状态的计算可以发生在网关层呢?请求到达 Kong,Kong 转发给后端获取原始数据,在响应返回给客户端的途中,Kong 动态地对数据进行处理。这听起来很完美。Kong 的插件机制是实现这一点的关键,而对于计算密集型任务,使用 Lua 编写插件性能可能不足。幸运的是,Kong 支持 WebAssembly (WASM),这让我们能够使用 Rust 或 C++ 这类高性能语言来编写插件。

我们的最终架构决策是:在 Kong 中使用一个基于 Rust 和 WASM 的自定义插件,拦截来自 NumPy 后端的 GraphQL 响应,执行向量计算,并用计算结果替换原始数据,最后再返回给 Apollo Client。

架构与流程设计

这个方案的核心在于请求-响应流的改造。

sequenceDiagram
    participant Client as Apollo Client
    participant Gateway as Kong API Gateway
    participant WASM as WASM Plugin (Rust)
    participant Backend as Python/NumPy Service

    Client->>Gateway: GraphQL Query (for rawSignal)
    Gateway->>Backend: Forward Request
    Backend-->>Gateway: GraphQL Response (with rawSignal)
    Gateway->>WASM: on_http_response_body
    Note right of WASM: 1. Parse JSON response body
2. Extract rawSignal.values array
3. Perform moving average calculation
4. Create new JSON with processed data WASM-->>Gateway: Modified Response Body Gateway-->>Client: GraphQL Response (with processedSignal)

这个流程的优势在于:

  • 关注点分离: 后端服务只负责提供纯粹的、可缓存的原始数据。前端应用也只负责消费最终数据。计算逻辑被封装在基础设施层。
  • 高性能: Rust 编译为 WASM 后执行效率接近原生代码,对于数值计算这类任务远超 Lua 或 JavaScript。
  • 透明性: 对于客户端和服务端,这个转换过程是完全透明的。它们都不知道中间发生了计算。

动手实现:从 Rust 插件到 Kong 配置

1. 后端 Python 服务准备

首先,我们确保后端服务只返回原始数据。这让服务逻辑变得极其简单和健壮。

# app.py
import strawberry
from fastapi import FastAPI
from strawberry.fastapi import GraphQLRouter
import numpy as np
import logging

# 配置日志
logging.basicConfig(level=logging.INFO)
logger = logging.getLogger(__name__)

# 模拟一个数据库或数据源
def fetch_raw_data_from_db(sensor_id: str) -> list[float]:
    logger.info(f"Fetching raw data for sensor: {sensor_id}")
    # 生成一个包含噪声的正弦波作为原始数据
    np.random.seed(42)
    time_steps = np.linspace(0, 10, 20000)
    signal = np.sin(2 * np.pi * 1.5 * time_steps)
    noise = np.random.normal(0, 0.3, len(time_steps))
    return (signal + noise).tolist()

@strawberry.type
class RawSignal:
    sensor_id: str
    values: list[float]

@strawberry.type
class Query:
    @strawberry.field
    def rawSignal(self, sensor_id: str) -> RawSignal:
        values = fetch_raw_data_from_db(sensor_id)
        return RawSignal(sensor_id=sensor_id, values=values)

schema = strawberry.Schema(query=Query)
graphql_app = GraphQLRouter(schema)

app = FastAPI()
app.include_router(graphql_app, prefix="/graphql")

@app.get("/health")
def health_check():
    return {"status": "ok"}

这个服务非常纯粹,它的 rawSignal 解析器只负责从数据源获取数据并返回。

2. 核心:构建 Kong WASM 插件 (Rust)

这是整个方案的技术核心。我们需要使用 proxy-wasm Rust SDK 来与 Kong 的代理生命周期挂钩。

项目初始化:

# 安装 wasm32-wasi target
rustup target add wasm32-wasi

# 创建一个新的 Rust 库项目
cargo new --lib kong-wasm-calculator
cd kong-wasm-calculator

添加依赖到 Cargo.toml:

[package]
name = "kong-wasm-calculator"
version = "0.1.0"
edition = "2021"

[lib]
crate-type = ["cdylib"]

[dependencies]
proxy-wasm = "0.2.1"
serde_json = "1.0"
log = "0.4"

编写插件代码 (src/lib.rs):

我们将实现 on_http_response_body 这个生命周期函数。它会在 Kong 收到来自上游服务(我们的 Python 后端)的响应体时被调用。

// src/lib.rs
use proxy_wasm::traits::{Context, HttpContext};
use proxy_wasm::types::{Action, LogLevel};
use serde_json::{Value, json};
use log::info;

// --- 插件的根上下文 ---
struct KongWasmCalculator;

#[no_mangle]
fn _start() {
    proxy_wasm::set_log_level(LogLevel::Info);
    proxy_wasm::set_http_context(|| Box::new(HttpHandler::new()));
}

impl Context for KongWasmCalculator {}

// --- HTTP 请求处理上下文 ---
struct HttpHandler {
    window_size: usize,
}

impl HttpHandler {
    fn new() -> Self {
        HttpHandler { window_size: 50 } // 默认窗口大小,可以从配置中读取
    }
}

impl Context for HttpHandler {}

impl HttpContext for HttpHandler {
    fn on_http_request_headers(&mut self, _num_headers: usize, _end_of_stream: bool) -> Action {
        // 尝试从请求头中获取窗口大小,提供动态配置能力
        if let Some(window_str) = self.get_http_request_header("x-window-size") {
            if let Ok(size) = window_str.parse::<usize>() {
                if size > 0 {
                    self.window_size = size;
                    info!("Using custom window size from header: {}", size);
                }
            }
        }
        Action::Continue
    }

    fn on_http_response_body(&mut self, body_size: usize, end_of_stream: bool) -> Action {
        // 我们只处理完整的响应体
        if !end_of_stream {
            return Action::Pause;
        }

        // 获取整个响应体
        if let Some(body_bytes) = self.get_http_response_body(0, body_size) {
            // 解析 JSON
            let mut v: Value = match serde_json::from_slice(&body_bytes) {
                Ok(val) => val,
                Err(e) => {
                    log::warn!("Failed to parse response body as JSON: {}", e);
                    return Action::Continue;
                }
            };
            
            // 定位到 `data.rawSignal.values`
            if let Some(values_array) = v.pointer_mut("/data/rawSignal/values") {
                if let Some(arr) = values_array.as_array() {
                    
                    // 将 JSON 数字数组转换为 Rust 的 Vec<f64>
                    // 在生产环境中,需要更健壮的错误处理
                    let numbers: Vec<f64> = arr.iter()
                        .filter_map(|n| n.as_f64())
                        .collect();

                    info!("Successfully extracted {} data points for processing.", numbers.len());

                    // --- 核心计算逻辑 ---
                    let processed_values = self.calculate_moving_average(&numbers);

                    // --- 修改 JSON 结构 ---
                    // 1. 移除旧的 `rawSignal` 字段
                    if let Some(data_obj) = v.pointer_mut("/data").and_then(|d| d.as_object_mut()) {
                        data_obj.remove("rawSignal");
                        
                        // 2. 添加新的 `processedSignal` 字段
                        data_obj.insert(
                            "processedSignal".to_string(),
                            json!({
                                "__typename": "ProcessedSignal",
                                "windowSize": self.window_size,
                                "values": processed_values
                            })
                        );
                    }
                    
                    // 序列化修改后的 JSON 并替换响应体
                    match serde_json::to_vec(&v) {
                        Ok(new_body) => {
                            self.set_http_response_body(0, new_body.len(), &new_body);
                            // 更新 Content-Length 头,这在真实项目中至关重要
                            self.set_http_response_header("Content-Length", &new_body.len().to_string());
                        }
                        Err(e) => {
                            log::error!("Failed to serialize modified JSON: {}", e);
                        }
                    }
                }
            }
        }

        Action::Continue
    }
}

// 分离出的计算函数,便于单元测试
impl HttpHandler {
    fn calculate_moving_average(&self, data: &[f64]) -> Vec<f64> {
        if data.is_empty() || self.window_size == 0 || data.len() < self.window_size {
            return Vec::new();
        }
        
        let mut result = Vec::with_capacity(data.len() - self.window_size + 1);
        let mut sum: f64 = data.iter().take(self.window_size).sum();
        result.push(sum / self.window_size as f64);

        for i in 0..(data.len() - self.window_size) {
            sum = sum - data[i] + data[i + self.window_size];
            result.push(sum / self.window_size as f64);
        }
        result
    }
}

这段 Rust 代码做了几件关键的事情:

  1. 挂载生命周期: 通过 set_http_contexton_http_response_body 将逻辑注入到响应处理流程中。
  2. 动态配置: 从请求头 x-window-size 读取参数,使得插件更加灵活。
  3. JSON 处理: 使用 serde_json 高效地解析和修改响应体。这是一个常见陷阱,直接操作字符串会非常脆弱。我们通过 pointer_mut 精准定位到需要修改的 JSON 字段。
  4. 核心计算: 实现了一个高效的滑动窗口算法。在真实项目中,这里可以使用 ndarray 等更专业的科学计算库。
  5. 响应重构: 它不仅仅是替换了 values 数组,而是整个重构了 GraphQL 的返回字段,从 rawSignal 变成了 processedSignal,这对于客户端类型安全(如 Apollo Client 的缓存)至关重要。

编译为 WASM:

cargo build --target wasm32-wasi --release

编译成功后,target/wasm32-wasi/release/kong_wasm_calculator.wasm 就是我们需要的插件文件。

3. Kong 的配置与部署

我们使用 Docker Compose 来启动 Kong 和我们的 Python 后端服务,并采用声明式的 kong.yaml 来配置所有资源。

docker-compose.yml:

version: '3.8'
services:
  kong:
    image: kong:3.4
    container_name: kong-gateway
    environment:
      KONG_DATABASE: 'off'
      KONG_DECLARATIVE_CONFIG: /etc/kong/kong.yaml
      KONG_PROXY_ACCESS_LOG: /dev/stdout
      KONG_ADMIN_ACCESS_LOG: /dev/stdout
      KONG_PROXY_ERROR_LOG: /dev/stderr
      KONG_ADMIN_ERROR_LOG: /dev/stderr
      KONG_ADMIN_LISTEN: 0.0.0.0:8001
      KONG_PROXY_LISTEN: 0.0.0.0:8000
      # 启用 wasm 插件并配置 wasmtime 作为执行引擎
      KONG_PLUGINS: bundled,proxy-wasm
      KONG_WASM: on
      KONG_WASM_FILTERS_PATH: /usr/local/kong/wasm
    ports:
      - "8000:8000"
      - "8001:8001"
    volumes:
      - ./kong.yaml:/etc/kong/kong.yaml:ro
      # 将编译好的 wasm 文件挂载到 Kong 容器内
      - ./kong-wasm-calculator/target/wasm32-wasi/release/kong_wasm_calculator.wasm:/usr/local/kong/wasm/calculator.wasm:ro
    depends_on:
      - backend
    networks:
      - kong-net

  backend:
    container_name: python-backend
    build:
      context: ./backend # 指向你的 FastAPI 项目目录
    command: uvicorn app:app --host 0.0.0.0 --port 5000
    networks:
      - kong-net

networks:
  kong-net:
    driver: bridge

kong.yaml:

这是将所有部分粘合在一起的声明式配置。

_format_version: "3.0"

services:
  - name: data-service
    # 使用 Docker DNS 来解析后端服务
    url: http://backend:5000
    routes:
      - name: data-route
        paths:
          - /graphql

    plugins:
      - name: proxy-wasm
        config:
          #  filter_name 必须与 Rust 代码中的 `_start` 函数相关联
          # 通常是 crate 的名字
          filter_name: "kong_wasm_calculator"
          # 指向我们挂载的 wasm 文件路径
          vm_config:
            runtime: "wasmtime"
            path: "/usr/local/kong/wasm/calculator.wasm"

现在,启动整个栈:

# 构建后端服务镜像
docker-compose build backend
# 启动所有服务
docker-compose up

4. 前端 Apollo Client 适配

前端的修改非常简单。首先,GraphQL 查询语句需要改变,以请求新的 processedSignal 字段。同时,客户端的计算逻辑被完全移除。

新的 GraphQL 查询:

# New query for processed data
query GetProcessedSignal($sensorId: String!, $windowSize: Int!) {
  processedSignal(sensorId: $sensorId, windowSize: $windowSize) {
    windowSize
    values
  }
}

# The frontend thinks it's querying a field that the backend natively supports.
# But it's actually the original rawSignal query transformed by Kong.

Apollo Client 调用:

实际上,我们不需要修改查询,只需要调整期望的返回类型。Apollo Client 依然发送获取 rawSignal 的请求,但它会收到包含 processedSignal 的响应。为了类型安全,我们需要让 Apollo Client 知道这种可能性,或者直接修改前端代码以匹配 Kong 插件转换后的数据结构。

一个更优雅的方式是,前端依然请求 rawSignal,但 Kong 插件不改变字段名,只替换 values 的内容,并可能添加一个 processingInfo 字段。为了演示字段重构的能力,我们保持了当前的设计。

前端 React 组件现在变得非常干净:

import { gql, useQuery } from '@apollo/client';
import { useMemo }from 'react';

// 注意:这里的查询是发往 Kong 的
// Kong 会将其转发到后端,后端只认识 rawSignal
// 但返回给客户端时,已经被 WASM 插件修改
const GET_SIGNAL_DATA = gql`
  query GetRawSignal($sensorId: String!) {
    rawSignal(sensorId: $sensorId) {
      sensorId
      values
    }
  }
`;

function SignalChart({ sensorId }) {
  const { data, loading, error } = useQuery(GET_SIGNAL_DATA, {
    variables: { sensorId },
    // 通过 context 将 x-window-size 头传给 Kong
    context: {
        headers: {
            'x-window-size': 50,
        }
    }
  });

  if (loading) return <p>Loading...</p>;
  if (error) return <p>Error: {error.message}</p>;

  // data 的结构是 { "data": { "processedSignal": { ... } } }
  // Apollo Client 在这里可能会因为类型不匹配而报警,
  // 生产环境需要通过 typePolicies 来处理
  const chartData = data?.processedSignal?.values || [];

  // UI 渲染逻辑...
  return (
    <div>
      <h2>Processed Signal (Window: {data?.processedSignal?.windowSize})</h2>
      {/* Charting component here using chartData */}
      <p>Data points: {chartData.length}</p>
    </div>
  );
}

测试一下效果:

curl -i -X POST \
  -H "Content-Type: application/json" \
  -H "x-window-size: 100" \
  -d '{"query":"query { rawSignal(sensorId: \"sensor-001\") { values } }"}' \
  http://localhost:8000/graphql

返回的 HTTP 响应体将是:

{
  "data": {
    "processedSignal": {
      "__typename": "ProcessedSignal",
      "values": [
        /* ... 大量计算后的浮点数 ... */
      ],
      "windowSize": 100
    }
  }
}

后端服务日志只会显示它被请求了 rawSignal,而前端则直接收到了计算好的数据,性能问题迎刃而解。

遗留问题与未来迭代

这个方案虽然优雅地解决了当前的性能瓶颈,但也引入了新的复杂性,在真实项目中需要考虑更多。

  1. 可观测性与调试: WASM 插件的调试是一个挑战。日志是我们目前主要的观测手段。在生产环境中,需要将 WASM 模块的日志、指标(如执行耗时)和 Tracing 集成到统一的可观测性平台中。
  2. 插件的生命周期管理: 如何对 WASM 插件进行版本控制、灰度发布和回滚?这需要一套围绕 API 网关的 CI/CD 流程。简单地通过挂载 volume 替换文件的方式在多节点集群中是不可靠的。
  3. 计算逻辑的通用性: 当前插件的计算逻辑是硬编码的。一个更高级的实现可能是,插件能够解析 GraphQL 查询中的特定指令(例如 @compute(type: "movingAverage", window: 50)),从而实现一个通用的、由客户端驱动的计算网关层。
  4. 安全边界: 在网关层执行任意代码需要非常谨慎。WASM 的沙箱机制提供了很好的安全保障,但插件本身的代码质量、资源消耗(内存/CPU)都需要被严格审查和限制,以防止恶意请求通过复杂的计算耗尽网关资源。

尽管存在这些挑战,将无状态、计算密集的任务从核心服务下沉到 API 网关的 WASM 层,为解决特定类型的性能问题提供了一个强有力的架构模式。它清晰地划分了数据提供与数据处理的界限,让系统的每一部分都更专注于自己的核心职责。


  目录