痛点:一个拖垮前端的 GraphQL 向量计算
我们的一个数据科学仪表盘项目遇到了性能瓶颈。前端使用 React 和 Apollo Client,通过 GraphQL 查询后端一个 Python 服务获取时序数据。这个 Python 服务底层依赖 NumPy 进行数据处理。问题出在一个特定的图表上,它需要对一个包含数万个点的原始信号向量进行滑动平均(Moving Average)计算。
最初,我们将这个计算放在了前端。GraphQL 查询返回原始的 Float 数组,前端拿到数据后在 JavaScript 中循环计算。当数据点超过 10,000 个时,浏览器 UI 会出现肉眼可见的卡顿,有时甚至会假死几秒钟。这是一个典型的计算密集型任务阻塞了浏览器主线程的案例。
// 前端计算的初步实现,性能极差
function movingAverage(data, windowSize) {
if (!data || data.length < windowSize) {
return [];
}
const result = [];
for (let i = 0; i <= data.length - windowSize; i++) {
let sum = 0;
for (let j = 0; j < windowSize; j++) {
sum += data[i + j];
}
result.push(sum / windowSize);
}
return result;
}
// 在 React 组件中
const { data, loading, error } = useQuery(GET_RAW_SIGNAL, {
variables: { sensorId: "sensor-001" },
});
// 每次重渲染或数据获取后都会进行昂贵的计算
const processedData = useMemo(
() => movingAverage(data?.rawSignal.values, 50),
[data]
);
这个方案很快被否决。第二个想法是将计算后移到后端。修改 GraphQL Schema,增加一个 movingAverageSignal 字段,并在 Python 的 Resolver 中调用 NumPy 实现。
# 后端 FastAPI + Strawberry (GraphQL)
import numpy as np
import strawberry
@strawberry.type
class Signal:
values: list[float]
def calculate_moving_average(data: list[float], window_size: int) -> list[float]:
if not data or len(data) < window_size:
return []
# NumPy 的实现非常高效
return np.convolve(np.array(data), np.ones(window_size), 'valid') / window_size
@strawberry.type
class Query:
@strawberry.field
def get_signal(self, sensor_id: str, window_size: int) -> Signal:
raw_data = fetch_raw_data_from_db(sensor_id) # 获取原始数据
processed_values = calculate_moving_average(raw_data, window_size)
return Signal(values=processed_values.tolist())
这个方案在功能上是可行的,NumPy 的性能也足够好。但它引入了一个新的架构问题。这个滑动平均计算是一个纯粹的、无状态的函数。把它强耦合在核心数据服务的业务逻辑中,我们遇到了一些麻烦:
- Resolver 膨胀: 未来可能会有傅里叶变换、滤波、降采样等更多计算需求,把它们全部加到 Resolver 中会让代码变得臃肿不堪。
- 资源占用: Python 因为 GIL 的存在,处理 CPU 密集型任务并不完美。当并发请求增多时,这些计算会显著增加 Gunicorn worker 的 CPU 负载,影响整个服务的吞吐量。
- 缓存失效: 原始信号数据是可以被有效缓存的,但增加了
window_size这样的计算参数后,GraphQL 的缓存变得复杂,缓存命中率急剧下降。
我们需要一个方案,既能避免在客户端进行重计算,又不会污染核心后端服务。计算任务应该被剥离出来。第一反应是再做一个“计算微服务”,但这会引入额外的网络开销和运维成本。
最终,我们将目光投向了我们的 API 网关——Kong。如果这个无状态的计算可以发生在网关层呢?请求到达 Kong,Kong 转发给后端获取原始数据,在响应返回给客户端的途中,Kong 动态地对数据进行处理。这听起来很完美。Kong 的插件机制是实现这一点的关键,而对于计算密集型任务,使用 Lua 编写插件性能可能不足。幸运的是,Kong 支持 WebAssembly (WASM),这让我们能够使用 Rust 或 C++ 这类高性能语言来编写插件。
我们的最终架构决策是:在 Kong 中使用一个基于 Rust 和 WASM 的自定义插件,拦截来自 NumPy 后端的 GraphQL 响应,执行向量计算,并用计算结果替换原始数据,最后再返回给 Apollo Client。
架构与流程设计
这个方案的核心在于请求-响应流的改造。
sequenceDiagram
participant Client as Apollo Client
participant Gateway as Kong API Gateway
participant WASM as WASM Plugin (Rust)
participant Backend as Python/NumPy Service
Client->>Gateway: GraphQL Query (for rawSignal)
Gateway->>Backend: Forward Request
Backend-->>Gateway: GraphQL Response (with rawSignal)
Gateway->>WASM: on_http_response_body
Note right of WASM: 1. Parse JSON response body
2. Extract rawSignal.values array
3. Perform moving average calculation
4. Create new JSON with processed data
WASM-->>Gateway: Modified Response Body
Gateway-->>Client: GraphQL Response (with processedSignal)
这个流程的优势在于:
- 关注点分离: 后端服务只负责提供纯粹的、可缓存的原始数据。前端应用也只负责消费最终数据。计算逻辑被封装在基础设施层。
- 高性能: Rust 编译为 WASM 后执行效率接近原生代码,对于数值计算这类任务远超 Lua 或 JavaScript。
- 透明性: 对于客户端和服务端,这个转换过程是完全透明的。它们都不知道中间发生了计算。
动手实现:从 Rust 插件到 Kong 配置
1. 后端 Python 服务准备
首先,我们确保后端服务只返回原始数据。这让服务逻辑变得极其简单和健壮。
# app.py
import strawberry
from fastapi import FastAPI
from strawberry.fastapi import GraphQLRouter
import numpy as np
import logging
# 配置日志
logging.basicConfig(level=logging.INFO)
logger = logging.getLogger(__name__)
# 模拟一个数据库或数据源
def fetch_raw_data_from_db(sensor_id: str) -> list[float]:
logger.info(f"Fetching raw data for sensor: {sensor_id}")
# 生成一个包含噪声的正弦波作为原始数据
np.random.seed(42)
time_steps = np.linspace(0, 10, 20000)
signal = np.sin(2 * np.pi * 1.5 * time_steps)
noise = np.random.normal(0, 0.3, len(time_steps))
return (signal + noise).tolist()
@strawberry.type
class RawSignal:
sensor_id: str
values: list[float]
@strawberry.type
class Query:
@strawberry.field
def rawSignal(self, sensor_id: str) -> RawSignal:
values = fetch_raw_data_from_db(sensor_id)
return RawSignal(sensor_id=sensor_id, values=values)
schema = strawberry.Schema(query=Query)
graphql_app = GraphQLRouter(schema)
app = FastAPI()
app.include_router(graphql_app, prefix="/graphql")
@app.get("/health")
def health_check():
return {"status": "ok"}
这个服务非常纯粹,它的 rawSignal 解析器只负责从数据源获取数据并返回。
2. 核心:构建 Kong WASM 插件 (Rust)
这是整个方案的技术核心。我们需要使用 proxy-wasm Rust SDK 来与 Kong 的代理生命周期挂钩。
项目初始化:
# 安装 wasm32-wasi target
rustup target add wasm32-wasi
# 创建一个新的 Rust 库项目
cargo new --lib kong-wasm-calculator
cd kong-wasm-calculator
添加依赖到 Cargo.toml:
[package]
name = "kong-wasm-calculator"
version = "0.1.0"
edition = "2021"
[lib]
crate-type = ["cdylib"]
[dependencies]
proxy-wasm = "0.2.1"
serde_json = "1.0"
log = "0.4"
编写插件代码 (src/lib.rs):
我们将实现 on_http_response_body 这个生命周期函数。它会在 Kong 收到来自上游服务(我们的 Python 后端)的响应体时被调用。
// src/lib.rs
use proxy_wasm::traits::{Context, HttpContext};
use proxy_wasm::types::{Action, LogLevel};
use serde_json::{Value, json};
use log::info;
// --- 插件的根上下文 ---
struct KongWasmCalculator;
#[no_mangle]
fn _start() {
proxy_wasm::set_log_level(LogLevel::Info);
proxy_wasm::set_http_context(|| Box::new(HttpHandler::new()));
}
impl Context for KongWasmCalculator {}
// --- HTTP 请求处理上下文 ---
struct HttpHandler {
window_size: usize,
}
impl HttpHandler {
fn new() -> Self {
HttpHandler { window_size: 50 } // 默认窗口大小,可以从配置中读取
}
}
impl Context for HttpHandler {}
impl HttpContext for HttpHandler {
fn on_http_request_headers(&mut self, _num_headers: usize, _end_of_stream: bool) -> Action {
// 尝试从请求头中获取窗口大小,提供动态配置能力
if let Some(window_str) = self.get_http_request_header("x-window-size") {
if let Ok(size) = window_str.parse::<usize>() {
if size > 0 {
self.window_size = size;
info!("Using custom window size from header: {}", size);
}
}
}
Action::Continue
}
fn on_http_response_body(&mut self, body_size: usize, end_of_stream: bool) -> Action {
// 我们只处理完整的响应体
if !end_of_stream {
return Action::Pause;
}
// 获取整个响应体
if let Some(body_bytes) = self.get_http_response_body(0, body_size) {
// 解析 JSON
let mut v: Value = match serde_json::from_slice(&body_bytes) {
Ok(val) => val,
Err(e) => {
log::warn!("Failed to parse response body as JSON: {}", e);
return Action::Continue;
}
};
// 定位到 `data.rawSignal.values`
if let Some(values_array) = v.pointer_mut("/data/rawSignal/values") {
if let Some(arr) = values_array.as_array() {
// 将 JSON 数字数组转换为 Rust 的 Vec<f64>
// 在生产环境中,需要更健壮的错误处理
let numbers: Vec<f64> = arr.iter()
.filter_map(|n| n.as_f64())
.collect();
info!("Successfully extracted {} data points for processing.", numbers.len());
// --- 核心计算逻辑 ---
let processed_values = self.calculate_moving_average(&numbers);
// --- 修改 JSON 结构 ---
// 1. 移除旧的 `rawSignal` 字段
if let Some(data_obj) = v.pointer_mut("/data").and_then(|d| d.as_object_mut()) {
data_obj.remove("rawSignal");
// 2. 添加新的 `processedSignal` 字段
data_obj.insert(
"processedSignal".to_string(),
json!({
"__typename": "ProcessedSignal",
"windowSize": self.window_size,
"values": processed_values
})
);
}
// 序列化修改后的 JSON 并替换响应体
match serde_json::to_vec(&v) {
Ok(new_body) => {
self.set_http_response_body(0, new_body.len(), &new_body);
// 更新 Content-Length 头,这在真实项目中至关重要
self.set_http_response_header("Content-Length", &new_body.len().to_string());
}
Err(e) => {
log::error!("Failed to serialize modified JSON: {}", e);
}
}
}
}
}
Action::Continue
}
}
// 分离出的计算函数,便于单元测试
impl HttpHandler {
fn calculate_moving_average(&self, data: &[f64]) -> Vec<f64> {
if data.is_empty() || self.window_size == 0 || data.len() < self.window_size {
return Vec::new();
}
let mut result = Vec::with_capacity(data.len() - self.window_size + 1);
let mut sum: f64 = data.iter().take(self.window_size).sum();
result.push(sum / self.window_size as f64);
for i in 0..(data.len() - self.window_size) {
sum = sum - data[i] + data[i + self.window_size];
result.push(sum / self.window_size as f64);
}
result
}
}
这段 Rust 代码做了几件关键的事情:
- 挂载生命周期: 通过
set_http_context和on_http_response_body将逻辑注入到响应处理流程中。 - 动态配置: 从请求头
x-window-size读取参数,使得插件更加灵活。 - JSON 处理: 使用
serde_json高效地解析和修改响应体。这是一个常见陷阱,直接操作字符串会非常脆弱。我们通过pointer_mut精准定位到需要修改的 JSON 字段。 - 核心计算: 实现了一个高效的滑动窗口算法。在真实项目中,这里可以使用
ndarray等更专业的科学计算库。 - 响应重构: 它不仅仅是替换了
values数组,而是整个重构了 GraphQL 的返回字段,从rawSignal变成了processedSignal,这对于客户端类型安全(如 Apollo Client 的缓存)至关重要。
编译为 WASM:
cargo build --target wasm32-wasi --release
编译成功后,target/wasm32-wasi/release/kong_wasm_calculator.wasm 就是我们需要的插件文件。
3. Kong 的配置与部署
我们使用 Docker Compose 来启动 Kong 和我们的 Python 后端服务,并采用声明式的 kong.yaml 来配置所有资源。
docker-compose.yml:
version: '3.8'
services:
kong:
image: kong:3.4
container_name: kong-gateway
environment:
KONG_DATABASE: 'off'
KONG_DECLARATIVE_CONFIG: /etc/kong/kong.yaml
KONG_PROXY_ACCESS_LOG: /dev/stdout
KONG_ADMIN_ACCESS_LOG: /dev/stdout
KONG_PROXY_ERROR_LOG: /dev/stderr
KONG_ADMIN_ERROR_LOG: /dev/stderr
KONG_ADMIN_LISTEN: 0.0.0.0:8001
KONG_PROXY_LISTEN: 0.0.0.0:8000
# 启用 wasm 插件并配置 wasmtime 作为执行引擎
KONG_PLUGINS: bundled,proxy-wasm
KONG_WASM: on
KONG_WASM_FILTERS_PATH: /usr/local/kong/wasm
ports:
- "8000:8000"
- "8001:8001"
volumes:
- ./kong.yaml:/etc/kong/kong.yaml:ro
# 将编译好的 wasm 文件挂载到 Kong 容器内
- ./kong-wasm-calculator/target/wasm32-wasi/release/kong_wasm_calculator.wasm:/usr/local/kong/wasm/calculator.wasm:ro
depends_on:
- backend
networks:
- kong-net
backend:
container_name: python-backend
build:
context: ./backend # 指向你的 FastAPI 项目目录
command: uvicorn app:app --host 0.0.0.0 --port 5000
networks:
- kong-net
networks:
kong-net:
driver: bridge
kong.yaml:
这是将所有部分粘合在一起的声明式配置。
_format_version: "3.0"
services:
- name: data-service
# 使用 Docker DNS 来解析后端服务
url: http://backend:5000
routes:
- name: data-route
paths:
- /graphql
plugins:
- name: proxy-wasm
config:
# filter_name 必须与 Rust 代码中的 `_start` 函数相关联
# 通常是 crate 的名字
filter_name: "kong_wasm_calculator"
# 指向我们挂载的 wasm 文件路径
vm_config:
runtime: "wasmtime"
path: "/usr/local/kong/wasm/calculator.wasm"
现在,启动整个栈:
# 构建后端服务镜像
docker-compose build backend
# 启动所有服务
docker-compose up
4. 前端 Apollo Client 适配
前端的修改非常简单。首先,GraphQL 查询语句需要改变,以请求新的 processedSignal 字段。同时,客户端的计算逻辑被完全移除。
新的 GraphQL 查询:
# New query for processed data
query GetProcessedSignal($sensorId: String!, $windowSize: Int!) {
processedSignal(sensorId: $sensorId, windowSize: $windowSize) {
windowSize
values
}
}
# The frontend thinks it's querying a field that the backend natively supports.
# But it's actually the original rawSignal query transformed by Kong.
Apollo Client 调用:
实际上,我们不需要修改查询,只需要调整期望的返回类型。Apollo Client 依然发送获取 rawSignal 的请求,但它会收到包含 processedSignal 的响应。为了类型安全,我们需要让 Apollo Client 知道这种可能性,或者直接修改前端代码以匹配 Kong 插件转换后的数据结构。
一个更优雅的方式是,前端依然请求 rawSignal,但 Kong 插件不改变字段名,只替换 values 的内容,并可能添加一个 processingInfo 字段。为了演示字段重构的能力,我们保持了当前的设计。
前端 React 组件现在变得非常干净:
import { gql, useQuery } from '@apollo/client';
import { useMemo }from 'react';
// 注意:这里的查询是发往 Kong 的
// Kong 会将其转发到后端,后端只认识 rawSignal
// 但返回给客户端时,已经被 WASM 插件修改
const GET_SIGNAL_DATA = gql`
query GetRawSignal($sensorId: String!) {
rawSignal(sensorId: $sensorId) {
sensorId
values
}
}
`;
function SignalChart({ sensorId }) {
const { data, loading, error } = useQuery(GET_SIGNAL_DATA, {
variables: { sensorId },
// 通过 context 将 x-window-size 头传给 Kong
context: {
headers: {
'x-window-size': 50,
}
}
});
if (loading) return <p>Loading...</p>;
if (error) return <p>Error: {error.message}</p>;
// data 的结构是 { "data": { "processedSignal": { ... } } }
// Apollo Client 在这里可能会因为类型不匹配而报警,
// 生产环境需要通过 typePolicies 来处理
const chartData = data?.processedSignal?.values || [];
// UI 渲染逻辑...
return (
<div>
<h2>Processed Signal (Window: {data?.processedSignal?.windowSize})</h2>
{/* Charting component here using chartData */}
<p>Data points: {chartData.length}</p>
</div>
);
}
测试一下效果:
curl -i -X POST \
-H "Content-Type: application/json" \
-H "x-window-size: 100" \
-d '{"query":"query { rawSignal(sensorId: \"sensor-001\") { values } }"}' \
http://localhost:8000/graphql
返回的 HTTP 响应体将是:
{
"data": {
"processedSignal": {
"__typename": "ProcessedSignal",
"values": [
/* ... 大量计算后的浮点数 ... */
],
"windowSize": 100
}
}
}
后端服务日志只会显示它被请求了 rawSignal,而前端则直接收到了计算好的数据,性能问题迎刃而解。
遗留问题与未来迭代
这个方案虽然优雅地解决了当前的性能瓶颈,但也引入了新的复杂性,在真实项目中需要考虑更多。
- 可观测性与调试: WASM 插件的调试是一个挑战。日志是我们目前主要的观测手段。在生产环境中,需要将 WASM 模块的日志、指标(如执行耗时)和 Tracing 集成到统一的可观测性平台中。
- 插件的生命周期管理: 如何对 WASM 插件进行版本控制、灰度发布和回滚?这需要一套围绕 API 网关的 CI/CD 流程。简单地通过挂载 volume 替换文件的方式在多节点集群中是不可靠的。
- 计算逻辑的通用性: 当前插件的计算逻辑是硬编码的。一个更高级的实现可能是,插件能够解析 GraphQL 查询中的特定指令(例如
@compute(type: "movingAverage", window: 50)),从而实现一个通用的、由客户端驱动的计算网关层。 - 安全边界: 在网关层执行任意代码需要非常谨慎。WASM 的沙箱机制提供了很好的安全保障,但插件本身的代码质量、资源消耗(内存/CPU)都需要被严格审查和限制,以防止恶意请求通过复杂的计算耗尽网关资源。
尽管存在这些挑战,将无状态、计算密集的任务从核心服务下沉到 API 网关的 WASM 层,为解决特定类型的性能问题提供了一个强有力的架构模式。它清晰地划分了数据提供与数据处理的界限,让系统的每一部分都更专注于自己的核心职责。