集成 Keras Kubeflow 与 Solid.js 构建模型推理的实时可观测性前端


模型通过 Kubeflow 管道成功部署到生产环境,这通常被视为终点,但在我的团队里,这恰恰是问题的开始。一个已经部署的 Keras 模型就像一个黑盒,标准的 Kubernetes 指标(CPU、内存)告诉你容器是否存活,却无法回答关键业务问题:模型的预测延迟是否在 SLO 范围内?输入数据的分布是否开始偏离训练集,预示着模型腐化?哪一类请求导致了最多的预测失败?这些问题不解决,所谓的“MLOps”就只是一个昂贵的 CI/CD 流程。

我们的痛点很明确:需要一个轻量、实时、专注于模型本身行为的可观测性前端。它必须能够以亚秒级的延迟反映模型服务的核心指标,并且在数据量激增时不应冻结或拖垮用户的浏览器。

初步构想与技术选型

最初的想法是构建一个专门的仪表盘。它应该能展示几个核心视图:

  1. 实时 QPS 与错误率: 反映服务健康度的基本指标。
  2. 推理延迟直方图 (Latency Histogram): P95/P99 延迟是衡量用户体验的关键。
  3. 预测置信度分布: 监控模型输出的稳定性。
  4. 特征输入分布 (可选): 用于捕捉数据漂移的早期信号。

这个需求的技术栈选型过程充满了权衡。

  • 前端框架: 我们需要极致的渲染性能。传统的虚拟 DOM 框架在面对每秒多次更新的数十个独立数据点时,其 diffing 和 patching 开销可能会变得显著。这就是 Solid.js 进入视野的原因。它基于 Signal 的细粒度响应式模型,意味着数据更新只会直接操作相关的 DOM 节点,没有 VDOM 开销。对于一个需要持续重绘图表和指标的仪表盘来说,这种架构理论上是完美匹配。
  • 模型服务与指标暴露: Keras 模型本身无法直接提供网络服务。我们需要一个轻量的 Web 框架来包装它。FastAPI 因其高性能和与 Python 类型提示的良好集成而成为首选。更重要的是,通过 prometheus-client 库,我们可以极其方便地在服务中嵌入一个 /metrics 端点,以 Prometheus 格式暴露自定义指标。这是将模型内部状态“外化”的关键一步。
  • 部署与编排: 团队已经在使用 Kubeflow。因此,问题不是要不要用,而是如何将这个新的可观测性层无缝集成到现有的 Kubeflow Pipeline 中。我们需要在管道的最后一步,不仅部署模型服务,还要确保它的指标端点能被集群内的 Prometheus 实例正确发现和抓取。

最终的架构蓝图如下:

graph TD
    subgraph "Kubernetes Cluster"
        subgraph "Kubeflow Pipeline"
            A[Train Keras Model] --> B[Package Model Server];
            B --> C[Deploy to K8s];
        end

        subgraph "ML Service Pod"
            D[FastAPI Server] -- wraps --> E[Keras Model];
            D -- exposes --> F["/metrics (Prometheus)"];
            D -- handles --> G["/predict (Inference API)"];
        end

        subgraph "Monitoring"
            H[Prometheus] -- scrapes --> F;
        end
    end

    subgraph "User Browser"
        I[Solid.js Dashboard] -- PromQL queries --> H;
    end

    C -- creates --> D;
    User -- interacts --> I;
    ExternalClient -- sends data --> G;

步骤化实现:从模型到仪表盘

1. 深度instrumented Keras 模型服务

一切的起点是模型服务本身。如果它不产生高质量的遥测数据,再华丽的前端也无济于事。我们使用 FastAPI 创建一个服务,它不仅要能提供预测,更要像一个忠实的记录员,报告自己的一举一动。

model_server/main.py:

import os
import logging
import time
import numpy as np
from fastapi import FastAPI, Request, HTTPException
from fastapi.responses import JSONResponse
from prometheus_client import Counter, Histogram, Gauge, make_asgi_app
from tensorflow import keras

# -- 1. 日志配置 (Logging Configuration) --
# 在生产环境中,结构化日志是必须的。
logging.basicConfig(
    level=os.getenv("LOG_LEVEL", "INFO"),
    format='%(asctime)s - %(name)s - %(levelname)s - {"message": "%(message)s"}',
)
logger = logging.getLogger(__name__)

# -- 2. 加载模型 (Model Loading) --
# 将模型路径配置化,避免硬编码。
MODEL_PATH = os.getenv("MODEL_PATH", "models/iris_model.h5")
try:
    model = keras.models.load_model(MODEL_PATH)
    logger.info(f"Model loaded successfully from {MODEL_PATH}")
except Exception as e:
    logger.critical(f"Failed to load model from {MODEL_PATH}: {e}")
    # 在模型加载失败时,快速失败是更好的选择。
    exit(1)

# -- 3. 定义 Prometheus 指标 (Metrics Definition) --
# 一个好的命名规范是 `app_context_name_unit`
ML_APP_PREFIX = "iris_model_server"

# Counter: 记录请求总数和结果
REQUESTS_TOTAL = Counter(
    f"{ML_APP_PREFIX}_requests_total",
    "Total number of requests.",
    ["method", "endpoint", "status_code"]
)

# Histogram: 跟踪请求延迟,buckets 需要根据业务场景调整
REQUEST_LATENCY = Histogram(
    f"{ML_APP_PREFIX}_request_latency_seconds",
    "Request latency in seconds.",
    ["method", "endpoint"],
    buckets=[0.005, 0.01, 0.025, 0.05, 0.075, 0.1, 0.25, 0.5, 1.0]
)

# Gauge: 记录最后一个样本的预测置信度,用于观察模型输出的波动
LAST_PREDICTION_CONFIDENCE = Gauge(
    f"{ML_APP_PREFIX}_last_prediction_confidence",
    "Confidence of the last prediction."
)

# -- 4. FastAPI 应用 (FastAPI Application) --
app = FastAPI()

# 将 Prometheus 指标作为一个子应用挂载到 /metrics
metrics_app = make_asgi_app()
app.mount("/metrics", metrics_app)

@app.post("/predict")
async def predict(request: Request):
    start_time = time.time()
    endpoint = "/predict"
    method = "POST"
    
    try:
        # 4.1 输入验证 (Input Validation)
        data = await request.json()
        features = data.get("features")
        if not isinstance(features, list) or len(features) != 4:
            raise ValueError("Input 'features' must be a list of 4 numbers.")

        # 4.2 模型推理 (Model Inference)
        prediction_input = np.array([features])
        prediction = model.predict(prediction_input)
        confidence = float(np.max(prediction))
        predicted_class = int(np.argmax(prediction))

        # 4.3 更新指标 (Update Metrics)
        LAST_PREDICTION_CONFIDENCE.set(confidence)
        
        response_data = {
            "predicted_class": predicted_class,
            "confidence": confidence
        }
        
        # 记录成功指标
        status_code = 200
        REQUESTS_TOTAL.labels(method, endpoint, str(status_code)).inc()
        
        return JSONResponse(content=response_data, status_code=status_code)

    except ValueError as ve:
        status_code = 400
        REQUESTS_TOTAL.labels(method, endpoint, str(status_code)).inc()
        logger.warning(f"Bad Request: {ve}")
        raise HTTPException(status_code=status_code, detail=str(ve))
        
    except Exception as e:
        status_code = 500
        REQUESTS_TOTAL.labels(method, endpoint, str(status_code)).inc()
        logger.error(f"Internal Server Error: {e}", exc_info=True)
        raise HTTPException(status_code=status_code, detail="Internal Server Error")
        
    finally:
        # 无论成功失败,都记录延迟
        latency = time.time() - start_time
        REQUEST_LATENCY.labels(method, endpoint).observe(latency)

@app.get("/health")
def health_check():
    return {"status": "ok"}

这里的关键点在于,指标的定义和更新与业务逻辑紧密结合。我们不仅记录了延迟和请求总数,还通过标签(label)对它们进行了维度切分,这使得后续在前端进行复杂查询和下钻分析成为可能。

2. 服务容器化

下一步是将这个 FastAPI 应用打包成一个可移植的 Docker 镜像。

model_server/Dockerfile:

# 使用官方提供的包含 TensorFlow 的 Python 基础镜像
FROM tensorflow/tensorflow:2.11.0

# 设置工作目录
WORKDIR /app

# 复制依赖文件
COPY requirements.txt .

# 安装依赖,使用 --no-cache-dir 减小镜像体积
RUN pip install --no-cache-dir -r requirements.txt

# 复制应用代码和模型
COPY . .

# 暴露端口
EXPOSE 8000

# 定义环境变量的默认值
ENV MODEL_PATH="models/iris_model.h5"
ENV LOG_LEVEL="INFO"

# 启动命令
# 使用 uvicorn 作为 ASGI 服务器,workers 数量可以根据 CPU 核数调整
CMD ["uvicorn", "main:app", "--host", "0.0.0.0", "--port", "8000"]

model_server/requirements.txt:

fastapi==0.95.1
uvicorn[standard]==0.21.1
tensorflow==2.11.0
numpy==1.23.5
prometheus-client==0.16.0

这个 Dockerfile 很标准,但注意我们通过 ENV 将模型路径和日志级别暴露为可配置项,这在 CI/CD 流程中非常重要。

3. 集成到 Kubeflow Pipeline

现在,我们需要修改 Kubeflow Pipeline,让它在训练完模型后,自动构建并部署这个带监控的镜像。这里我们使用 kfp SDK v2 来定义管道。

pipeline.py:

from kfp import dsl
from kfp.dsl import Input, Model, Output, Dataset
from kfp.v2.dsl import component

# 这是一个模拟的训练组件,实际项目中会更复杂
@component(
    base_image="tensorflow/tensorflow:2.11.0",
    packages_to_install=["scikit-learn", "numpy"]
)
def train_iris_model(
    model_artifact: Output[Model]
):
    from sklearn.datasets import load_iris
    from sklearn.model_selection import train_test_split
    from tensorflow import keras
    from tensorflow.keras.layers import Dense
    from tensorflow.keras.models import Sequential
    
    iris = load_iris()
    X, y = iris.data, iris.target
    y_cat = keras.utils.to_categorical(y, num_classes=3)
    X_train, X_test, y_train, y_test = train_test_split(X, y_cat, test_size=0.2)

    model = Sequential([
        Dense(10, activation='relu', input_shape=(4,)),
        Dense(10, activation='relu'),
        Dense(3, activation='softmax')
    ])
    model.compile(optimizer='adam', loss='categorical_crossentropy', metrics=['accuracy'])
    model.fit(X_train, y_train, epochs=10, batch_size=5, verbose=0)
    
    # KFP 会自动处理这个路径
    model.save(model_artifact.path)


# 部署组件是关键
# 我们使用 Kaniko 在集群内安全地构建镜像,并使用 kubectl apply 部署
# 这需要 Kaniko 和 kubectl 已经安装在基础镜像中,并且有正确的 RBAC 权限
@component(
    base_image="gcr.io/google.com/cloudsdktool/cloud-sdk:372.0.0" # 包含 gcloud, kubectl
    # 在实际场景中,通常会使用一个包含 Kaniko、kubectl 等工具的自定义镜像
)
def deploy_model_server(
    model_artifact: Input[Model],
    image_name: str,
    deployment_name: str,
    namespace: str,
):
    import subprocess
    import os

    # 这里的代码块在 Kubeflow Pod 中执行
    # 假设部署所需的 YAML 文件和 Dockerfile 上下文已经准备好
    # 在真实项目中,这些文件通常会通过 Git 克隆或作为输入工件传递

    # 1. 准备构建上下文
    # Kaniko 需要一个包含 Dockerfile 和应用代码的目录
    # 此处为简化,我们假设 context 已被某种方式提供到 Pod 中
    # 例如:通过 PVC 或者在组件定义时就包含进来
    
    # ... 此处省略了准备构建上下文的复杂逻辑 ...
    # 假设模型文件和 model_server/ 目录已存在于 /workspace/
    
    model_source_path = model_artifact.path
    model_dest_path = f"/workspace/model_server/models/iris_model.h5"
    os.makedirs(os.path.dirname(model_dest_path), exist_ok=True)
    subprocess.run(f"cp {model_source_path} {model_dest_path}", shell=True, check=True)
    
    # 2. 使用 Kaniko 构建镜像 (示例,需要配置凭证)
    # kaniko_cmd = f"""
    # /kaniko/executor --dockerfile /workspace/model_server/Dockerfile \
    #                  --context dir:///workspace/model_server/ \
    #                  --destination {image_name}
    # """
    # subprocess.run(kaniko_cmd, shell=True, check=True)
    
    # 为简化演示,我们跳过 Kaniko 构建,假设镜像已存在
    print(f"Skipping Kaniko build for demo. Assuming image '{image_name}' exists.")

    # 3. 创建 K8s 部署和服务 YAML
    # 注意 annotations 部分,这是让 Prometheus 发现我们的服务的关键
    yaml_content = f"""
apiVersion: apps/v1
kind: Deployment
metadata:
  name: {deployment_name}
  namespace: {namespace}
  labels:
    app: {deployment_name}
spec:
  replicas: 2
  selector:
    matchLabels:
      app: {deployment_name}
  template:
    metadata:
      labels:
        app: {deployment_name}
    spec:
      containers:
      - name: model-server
        image: {image_name} # 使用我们构建的镜像
        ports:
        - containerPort: 8000
        env:
        - name: MODEL_PATH
          value: "/app/models/iris_model.h5"
---
apiVersion: v1
kind: Service
metadata:
  name: {deployment_name}-svc
  namespace: {namespace}
  labels:
    app: {deployment_name}
  annotations:
    prometheus.io/scrape: 'true'
    prometheus.io/path:   '/metrics'
    prometheus.io/port:   '8000'
spec:
  selector:
    app: {deployment_name}
  ports:
  - protocol: TCP
    port: 80
    targetPort: 8000
"""
    with open("/workspace/deployment.yaml", "w") as f:
        f.write(yaml_content)

    # 4. 应用部署
    subprocess.run(f"kubectl apply -f /workspace/deployment.yaml", shell=True, check=True)


@dsl.pipeline(
    name="iris-model-pipeline-with-observability",
    description="Train and deploy an iris model with a Prometheus-instrumented server."
)
def iris_pipeline(
    image_name: str = "your-repo/iris-model-server:latest",
    deployment_name: str = "iris-server",
    namespace: str = "ml-serving"
):
    train_task = train_iris_model()
    
    deploy_task = deploy_model_server(
        model_artifact=train_task.outputs["model_artifact"],
        image_name=image_name,
        deployment_name=deployment_name,
        namespace=namespace
    )

prometheus.io/scrape: 'true' 这个 annotation 是魔法所在。它告诉 Prometheus Operator,请来抓取这个 Service 暴露的指标。没有它,我们的监控数据就无处可去。

4. Solid.js 响应式前端

这是将所有数据串联起来并呈现给用户的最后一环。

项目结构:

src/
├── api/
│   └── prometheus.ts   # Prometheus API 查询逻辑
├── components/
│   ├── MetricCard.tsx  # 显示单个指标的卡片
│   └── LatencyChart.tsx # 显示延迟直方图的图表
├── App.tsx             # 主应用组件
└── index.tsx           # 入口

src/api/prometheus.ts:

import { createSignal, onCleanup } from "solid-js";

const PROMETHEUS_URL = "http://your-prometheus-server/api/v1";

// 这是一个关键的自定义 Hook,用于轮询 Prometheus
// 它返回一个响应式的 signal,并在组件卸载时自动清理定时器
export function createPrometheusQuery(query: string, pollInterval: number = 2000) {
  const [data, setData] = createSignal<any>(null);
  const [error, setError] = createSignal<Error | null>(null);
  const [loading, setLoading] = createSignal<boolean>(true);

  const fetchData = async () => {
    try {
      const response = await fetch(`${PROMETHEUS_URL}/query?query=${encodeURIComponent(query)}`);
      if (!response.ok) {
        throw new Error(`Prometheus API Error: ${response.statusText}`);
      }
      const json = await response.json();
      if (json.status !== "success") {
        throw new Error(`Prometheus Query Error: ${json.error}`);
      }
      setData(json.data.result);
    } catch (e: any) {
      setError(e);
      console.error("Failed to fetch Prometheus data:", e);
    } finally {
      setLoading(false);
    }
  };

  // 初始加载
  fetchData();

  // 设置定时轮询
  const intervalId = setInterval(fetchData, pollInterval);

  // Solid.js 的 onCleanup 会在 effect 作用域结束时运行,完美用于清理
  onCleanup(() => {
    clearInterval(intervalId);
  });

  return { data, error, loading };
}

src/components/MetricCard.tsx:

import { Component, Show } from "solid-js";
import { createPrometheusQuery } from "../api/prometheus";

interface MetricCardProps {
  title: string;
  query: string;
  unit?: string;
}

export const MetricCard: Component<MetricCardProps> = (props) => {
  const { data, loading, error } = createPrometheusQuery(props.query, 1500);

  // 从 Prometheus 返回的复杂结构中提取数值
  const formattedValue = () => {
    const result = data();
    if (!result || result.length === 0) return "N/A";
    const value = parseFloat(result[0].value[1]);
    // 根据数值大小决定小数位数
    return value < 1 ? value.toFixed(3) : value.toFixed(1);
  };

  return (
    <div class="bg-gray-800 p-4 rounded-lg shadow-lg text-white">
      <h3 class="text-gray-400 text-sm font-medium">{props.title}</h3>
      <Show when={!loading() && !error()} fallback={<div class="text-3xl font-bold mt-2 animate-pulse">...</div>}>
        <p class="text-3xl font-bold mt-2">
          {formattedValue()}
          <span class="text-lg ml-2 text-gray-400">{props.unit}</span>
        </p>
      </Show>
      <Show when={error()}>
        <p class="text-red-400 text-xs mt-1">Error fetching data.</p>
      </Show>
    </div>
  );
};

src/App.tsx:

import { Component } from 'solid-js';
import { MetricCard } from './components/MetricCard';
// LatencyChart 是一个更复杂的组件,这里省略其实现
// import { LatencyChart } from './components/LatencyChart';

const App: Component = () => {
  const DEPLOYMENT_NAME = "iris-server"; // 应该来自配置

  return (
    <div class="bg-gray-900 min-h-screen p-8 font-sans">
      <header class="mb-8">
        <h1 class="text-4xl font-bold text-white">Keras Model Observability</h1>
        <p class="text-gray-400">Target Deployment: {DEPLOYMENT_NAME}</p>
      </header>
      
      <main class="grid grid-cols-1 md:grid-cols-2 lg:grid-cols-4 gap-6">
        <MetricCard 
          title="QPS (5m)"
          query={`sum(rate(iris_model_server_requests_total{app="${DEPLOYMENT_NAME}"}[5m]))`}
          unit="req/s"
        />
        <MetricCard
          title="Error Rate (5m)"
          query={`
            sum(rate(iris_model_server_requests_total{app="${DEPLOYMENT_NAME}", status_code=~"5.."}[5m]))
            / 
            sum(rate(iris_model_server_requests_total{app="${DEPLOYMENT_NAME}"}[5m])) * 100
          `}
          unit="%"
        />
        <MetricCard
          title="P95 Latency"
          query={`histogram_quantile(0.95, sum(rate(iris_model_server_request_latency_seconds_bucket{app="${DEPLOYMENT_NAME}"}[5m])) by (le))`}
          unit="s"
        />
        <MetricCard
          title="Last Prediction Confidence"
          query={`iris_model_server_last_prediction_confidence{app="${DEPLOYMENT_NAME}"}`}
        />
        
        {/*
        <div class="col-span-1 md:col-span-2 lg:col-span-4 bg-gray-800 p-4 rounded-lg shadow-lg">
          <h3 class="text-gray-400 text-sm font-medium mb-4">Latency Distribution (5m)</h3>
          <LatencyChart 
            query={`sum(rate(iris_model_server_request_latency_seconds_bucket{app="${DEPLOYMENT_NAME}"}[5m])) by (le)`}
          />
        </div>
        */}
      </main>
    </div>
  );
};

export default App;

这个 Solid.js 应用的核心是 createPrometheusQuery hook。它封装了数据获取、状态管理和清理逻辑。每个 MetricCard 都是一个独立的响应式单元。当 Prometheus 数据更新时,只有卡片内的文本节点会变化,没有任何虚拟 DOM 的开销。对于一个需要同时展示几十个实时指标的复杂仪表盘,这种架构的性能优势会非常明显。

局限性与未来迭代路径

这个方案有效地解决了我们最初的痛点,但它并非完美。

  1. 轮询机制的局限: 基于 HTTP 轮询的机制存在延迟,并且会对 Prometheus 服务器产生持续的压力。对于需要更低延迟的场景,可以探索使用 WebSocket 或 Server-Sent Events (SSE) 结合一个推送网关来将指标实时推送到前端。
  2. 日志与指标的割裂: 目前的仪表盘只展示了指标。当发现错误率上升时,工程师仍然需要手动去日志系统(如 Loki 或 ELK)中搜索相关的错误日志。下一步是实现指标与日志的关联。例如,点击错误率图表,可以直接跳转到对应时间范围内的错误日志查询结果。这通常需要统一的标签体系和全链路追踪的支持,比如引入 OpenTelemetry。
  3. 数据漂移监控的深化: 我们只用一个 Gauge 指标来展示最后的置信度,这对于发现数据漂移是远远不够的。一个更高级的方案是在模型服务中定期计算输入特征的统计数据(均值、方差、分位数等),并作为指标暴露。前端则可以对比生产数据的分布与训练数据的基线分布,当差异超过阈值时进行告警。这会让我们的观测能力从“服务健康度”提升到“模型健康度”的层面。

  目录