使用 Actix-web 构建集成 Python spaCy 与 GitHub SAML 的异构 NLP 服务


项目初期,我们依赖一个纯 Python 的 Flask 应用对外提供命名实体识别(NER)服务,其核心是基于 spaCy 的一个定制化模型。随着内部平台用户量的增长,这个服务的性能瓶颈日益凸显。Python 的全局解释器锁(GIL)在高并发场景下限制了 CPU 密集型任务的并行能力,单纯增加 uWSGI 的 worker 数量很快就遇到了收益递减的窘境。重写整个 NLP 模型到 Rust 是一项成本高昂且不切实际的任务,模型迭代和训练依然在 Python 生态中进行。

我们的痛点很明确:需要一个既能利用 Rust 高并发 I/O 处理能力,又能无缝集成现有 Python AI 模型,同时还要符合公司统一使用 GitHub SAML 进行单点登录(SSO)的安全规范。最终的技术方案是一个异构的服务架构:使用 Actix-web 作为面向外部的、处理高并发请求和 SAML 认证的网关,通过进程间通信(IPC)将核心 NLP 计算任务分发给一个后台的 Python/spaCy worker 池。

架构决策:为何选择 IPC 而非 HTTP

初步构想是在 Rust 和 Python 服务之间使用 RESTful API 进行通信。这是一个成熟且易于理解的方案,但经过评估,我们放弃了它。在真实项目中,尤其是在性能敏感的内部服务上,HTTP 调用引入的开销不可忽视:网络栈的序列化/反序列化、TCP 握手、HTTP 头解析等都会累积延迟。

考虑到 Rust 网关和 Python worker 将部署在同一台物理机或同一个 Kubernetes Pod 内,我们决定采用更底层的进程间通信方式。ZeroMQ(ZMQ)的 REQ-REP 模式成为了最终选择。它像 TCP 一样提供可靠的消息传递,但 API 更简单,性能开销极低,尤其适合这种紧密耦合的后台服务间通信。

下面是整个请求流程的架构图:

sequenceDiagram
    participant User as 用户
    participant Actix as Actix-web (Rust)
    participant ZMQ as ZeroMQ Socket
    participant spaCy as spaCy Worker (Python)
    participant GitHub as GitHub IdP

    User->>+Actix: 发起 /api/ner 请求 (无 Session)
    Actix-->>User: 302 重定向到 GitHub SAML 登录
    User->>+GitHub: 输入凭证
    GitHub-->>-User: 返回 SAMLResponse (Base64)
    User->>+Actix: POST /saml/acs (携带 SAMLResponse)
    Actix->>Actix: 验证 SAML 断言 (签名, 时间戳等)
    Actix-->>-User: 302 重定向到原始请求 /api/ner (携带 Session Cookie)
    User->>+Actix: 再次发起 /api/ner 请求 (携带 Session)
    Actix->>Actix: 中间件验证 Session
    Actix->>+ZMQ: 发送 NER 请求文本
    ZMQ->>+spaCy: 接收文本
    spaCy->>spaCy: 执行 spaCy NER 处理
    spaCy-->>-ZMQ: 返回处理结果 (JSON)
    ZMQ-->>-Actix: 接收结果
    Actix-->>-User: 返回 200 OK 与 JSON 结果

核心实现:Actix-web SAML 认证层

在 Rust 生态中处理 SAML 并非易事,不像 Java 或 C# 那样有大量成熟的官方库。我们选择 saml2_rs 这个 crate 来处理 SAML 断言的解析与验证。这部分是整个服务的安全基石,必须严谨处理。

1. 项目结构与依赖

Cargo.toml 需要包含 Actix-web、SAML 处理、配置、序列化和 ZMQ 相关的依赖。

[package]
name = "ner-gateway"
version = "0.1.0"
edition = "2021"

[dependencies]
actix-web = "4"
actix-session = { version = "0.7", features = ["cookie-session"] }
actix-cors = "0.6"
serde = { version = "1.0", features = ["derive"] }
serde_json = "1.0"
env_logger = "0.10"
log = "0.4"
config = "0.13"
zmq = "0.10"
tokio = { version = "1", features = ["full"] }

# SAML specific dependencies
saml2_rs = { version = "0.3", features = ["xml-schema", "reqwest_blocking"] }
base64 = "0.21"
chrono = "0.4"
url = "2.3"
openssl = { version = "0.10", features = ["vendored"] }

2. 配置管理

一个生产级的应用必须将配置外部化。我们使用 config crate 来管理 SAML Identity Provider (IdP) 的元数据。

config/default.toml 文件示例:

# config/default.toml

[saml]
# GitHub IdP 元数据,需要从 GitHub Enterprise 设置中获取
idp_entity_id = "https://github.com/enterprises/YOUR_ENTERPRISE"
idp_sso_url = "https://github.com/enterprises/YOUR_ENTERPRISE/saml/sso"
# IdP 的 X.509 证书 (PEM 格式,去除 -----BEGIN/END CERTIFICATE----- 和换行符)
idp_x509_cert_base64 = "MIID...your_cert...AQAB"

# Service Provider (我们的 Actix-web 应用) 的元数据
sp_entity_id = "https://your-service.com/saml/metadata"
sp_acs_url = "https://your-service.com/saml/acs"

[server]
bind_address = "127.0.0.1:8080"
zmq_socket_address = "ipc:///tmp/ner_service.ipc"

[session]
# 在生产环境中必须使用一个长且随机的密钥
secret_key_base64 = "ASEB2q8e/xN3A+1Tpl4Z8g2f7a6b8c9d0e1f2a3b4c5d6e7f8a9b0c1d2e3"

3. SAML 断言消费服务 (ACS) 端点

这是整个 SAML 流程的核心。当用户从 GitHub 认证成功后,浏览器会携带一个 SAMLResponse POST 到这个端点。我们需要解码、解析并验证它。

// src/saml_handler.rs
use actix_web::{web, post, HttpRequest, HttpResponse, Responder};
use actix_session::Session;
use saml2_rs::sp::{ServiceProvider, AcsRequest};
use saml2_rs::metadata::Metadata;
use saml2_rs::signature::Signature;
use serde::Deserialize;
use crate::config::SamlConfig;

#[derive(Deserialize)]
pub struct SamlAcsForm {
    #[serde(rename = "SAMLResponse")]
    saml_response: String,
    #[serde(rename = "RelayState")]
    relay_state: Option<String>,
}

#[post("/saml/acs")]
pub async fn assertion_consumer_service(
    req: HttpRequest,
    form: web::Form<SamlAcsForm>,
    session: Session,
    saml_config: web::Data<SamlConfig>,
) -> impl Responder {
    log::info!("Received SAML response at ACS endpoint");

    let idp_cert = match base64::decode(&saml_config.idp_x509_cert_base64) {
        Ok(cert) => cert,
        Err(e) => {
            log::error!("Failed to decode IdP certificate: {}", e);
            return HttpResponse::InternalServerError().body("IdP certificate configuration error.");
        }
    };

    let service_provider = ServiceProvider {
        metadata_url: Some(saml_config.sp_entity_id.clone()),
        acs_url: Some(saml_config.sp_acs_url.clone()),
        ..Default::default()
    };

    let acs_request = AcsRequest {
        // 在生产环境中,应该对请求进行更详细的检查
        // 这里为了简化,我们仅传递必要信息
        request: &req, 
        body: form.saml_response.as_bytes(),
        relay_state: form.relay_state.clone(),
    };

    // 核心步骤:解析和验证 SAML 断言
    match service_provider.parse_assertion(&idp_cert, acs_request) {
        Ok(assertion) => {
            log::info!("Successfully parsed and validated SAML assertion for user: {}", &assertion.subject.name_id);
            
            // 在真实项目中,应该检查 assertion 的有效期、受众 (Audience) 等
            // if assertion.conditions.not_before > now || assertion.conditions.not_on_or_after < now { ... }

            // 认证成功,创建会话
            // 我们将用户名存入 session
            session.insert("user_id", &assertion.subject.name_id).unwrap();
            session.renew();
            log::info!("Session created for user: {}", &assertion.subject.name_id);

            // 重定向到用户最初想访问的页面,或者首页
            let redirect_url = form.relay_state.clone().unwrap_or_else(|| "/".to_string());
            HttpResponse::Found()
                .append_header(("Location", redirect_url))
                .finish()
        }
        Err(err) => {
            log::error!("SAML assertion validation failed: {:?}", err);
            HttpResponse::Forbidden().body(format!("Invalid SAML response: {:?}", err))
        }
    }
}

4. 认证中间件

我们需要一个中间件来保护需要登录才能访问的 API。它会检查 session 中是否存在用户信息。如果不存在,则构建一个 SAML AuthnRequest 并重定向用户到 GitHub 的登录页面。

// src/auth_middleware.rs
use actix_web::{
    dev::{forward_ready, Service, ServiceRequest, ServiceResponse, Transform},
    Error, HttpMessage, HttpResponse,
};
use actix_session::SessionExt;
use futures::future::{ok, Ready, LocalBoxFuture};
use saml2_rs::sp::{AuthnRequest, ServiceProvider};
use url::Url;
use crate::config::SamlConfig;
use std::rc::Rc;

pub struct SamlAuth;

impl<S, B> Transform<S, ServiceRequest> for SamlAuth
where
    S: Service<ServiceRequest, Response = ServiceResponse<B>, Error = Error> + 'static,
    S::Future: 'static,
    B: 'static,
{
    type Response = ServiceResponse<B>;
    type Error = Error;
    type InitError = ();
    type Transform = SamlAuthMiddleware<S>;
    type Future = Ready<Result<Self::Transform, Self::InitError>>;

    fn new_transform(&self, service: S) -> Self::Future {
        ok(SamlAuthMiddleware { service: Rc::new(service) })
    }
}

pub struct SamlAuthMiddleware<S> {
    service: Rc<S>,
}

impl<S, B> Service<ServiceRequest> for SamlAuthMiddleware<S>
where
    S: Service<ServiceRequest, Response = ServiceResponse<B>, Error = Error> + 'static,
    S::Future: 'static,
    B: 'static,
{
    type Response = ServiceResponse<B>;
    type Error = Error;
    type Future = LocalBoxFuture<'static, Result<Self::Response, Self::Error>>;

    forward_ready!(service);

    fn call(&self, req: ServiceRequest) -> Self::Future {
        // 对 /saml/acs 和 /saml/metadata 路径放行
        if req.path().starts_with("/saml/") {
             let fut = self.service.call(req);
             return Box::pin(async move {
                let res = fut.await?;
                Ok(res)
             });
        }

        let session = req.get_session();
        let has_user = session.get::<String>("user_id").unwrap_or(None).is_some();
        let service = self.service.clone();

        Box::pin(async move {
            if has_user {
                let res = service.call(req).await?;
                Ok(res)
            } else {
                log::info!("No session found, redirecting to SAML IdP.");
                let saml_config = req.app_data::<web::Data<SamlConfig>>().unwrap();
                
                let service_provider = ServiceProvider {
                    metadata_url: Some(saml_config.sp_entity_id.clone()),
                    acs_url: Some(saml_config.sp_acs_url.clone()),
                    ..Default::default()
                };

                let authn_request = AuthnRequest::new(service_provider);
                let original_uri = req.uri().to_string();

                let redirect_url = authn_request.redirect_url(
                    &saml_config.idp_sso_url,
                    Some(&original_uri) // RelayState
                ).map_err(|e| actix_web::error::ErrorInternalServerError(e.to_string()))?;
                
                Ok(req.into_response(
                    HttpResponse::Found()
                        .append_header(("Location", redirect_url.as_str()))
                        .finish()
                ))
            }
        })
    }
}

桥接 Rust 与 Python:使用 ZMQ

ZMQ 的 REQ-REP 模式非常适合这种一对一的请求-响应场景。Rust 侧是 REQ(请求者),Python 侧是 REP(响应者)。

1. Rust ZMQ Client

我们在 Actix-web 的应用状态中持有一个 ZMQ REQ socket 的客户端池。由于 ZMQ socket 不是 SendSync 的,我们不能直接在 Actix 的多线程 worker 间共享它。一个常见的模式是为每个 Actix worker 创建一个 ZMQ socket。

// src/ner_client.rs
use zmq::{Context, Socket, REQ};
use std::cell::RefCell;
use crate::config::ServerConfig;

// 使用 thread_local! 为每个 Actix worker 线程创建一个独立的 ZMQ client
thread_local! {
    static ZMQ_SOCKET: RefCell<Option<Socket>> = RefCell::new(None);
}

// 在应用启动时,为每个 worker 初始化 ZMQ socket
pub fn init_zmq_socket_for_worker(config: &ServerConfig) {
    ZMQ_SOCKET.with(|socket_ref| {
        if socket_ref.borrow().is_none() {
            log::info!("Initializing ZMQ REQ socket for worker thread...");
            let context = Context::new();
            let socket = context.socket(REQ).expect("Failed to create ZMQ socket");
            socket.connect(&config.zmq_socket_address).expect("Failed to connect to ZMQ socket");
            *socket_ref.borrow_mut() = Some(socket);
        }
    });
}

pub fn request_ner(text: &str) -> Result<String, String> {
    ZMQ_SOCKET.with(|socket_ref| {
        if let Some(socket) = socket_ref.borrow().as_ref() {
            // 设置超时,防止 Python worker 崩溃导致 Rust worker 永久阻塞
            socket.set_rcvtimeo(5000).map_err(|e| e.to_string())?;
            socket.set_sndtimeo(5000).map_err(|e| e.to_string())?;

            match socket.send(text, 0) {
                Ok(_) => {
                    match socket.recv_string(0) {
                        Ok(Ok(result)) => Ok(result),
                        Ok(Err(e)) => Err(format!("ZMQ recv non-UTF8 data: {}", e)),
                        Err(e) => Err(format!("ZMQ recv error: {}", e)),
                    }
                },
                Err(e) => Err(format!("ZMQ send error: {}", e)),
            }
        } else {
            Err("ZMQ socket is not initialized for this thread.".to_string())
        }
    })
}

2. Python spaCy ZMQ Worker

Python worker 是一个简单的长时运行脚本。它在启动时加载昂贵的 spaCy 模型到内存中,然后进入一个循环,监听来自 ZMQ socket 的请求。

requirements.txt:

spacy==3.5.0
pyzmq==25.0.0
# and your spacy model, e.g. en_core_web_sm

worker.py:

import zmq
import spacy
import json
import logging
import os

# 配置日志
logging.basicConfig(level=logging.INFO, format='%(asctime)s - %(levelname)s - %(message)s')

def main():
    """
    一个 ZMQ REP worker,用于加载 spaCy 模型并处理 NER 请求。
    """
    socket_address = os.environ.get("ZMQ_SOCKET_ADDRESS", "ipc:///tmp/ner_service.ipc")
    model_name = os.environ.get("SPACY_MODEL", "en_core_web_sm")

    try:
        logging.info(f"Loading spaCy model: {model_name}...")
        nlp = spacy.load(model_name)
        logging.info("Model loaded successfully.")
    except Exception as e:
        logging.error(f"Failed to load spaCy model: {e}")
        return

    context = zmq.Context()
    socket = context.socket(zmq.REP)
    socket.bind(socket_address)
    logging.info(f"ZMQ worker bound to {socket_address}")

    while True:
        try:
            message = socket.recv_string()
            logging.info(f"Received request with text length: {len(message)}")

            doc = nlp(message)
            
            entities = [{
                'text': ent.text,
                'label': ent.label_,
                'start_char': ent.start_char,
                'end_char': ent.end_char
            } for ent in doc.ents]

            response = {
                "entities": entities
            }

            socket.send_json(response)
            
        except zmq.ZMQError as e:
            logging.error(f"ZMQ error: {e}")
            break # Exit on critical ZMQ errors
        except Exception as e:
            logging.error(f"Error processing request: {e}")
            # 返回一个错误信息,而不是让请求方超时
            error_response = {"error": str(e)}
            socket.send_json(error_response)


if __name__ == "__main__":
    main()

组合与部署

最后,我们将所有部分整合到 Actix-web 的 main.rs 中。

// src/main.rs

mod auth_middleware;
mod config;
mod ner_client;
mod saml_handler;

use actix_web::{web, App, HttpServer, Responder, get, post, HttpResponse};
use actix_session::{Session, SessionMiddleware, storage::CookieSessionStore};
use actix_web::cookie::Key;
use crate::config::AppConfig;
use crate::auth_middleware::SamlAuth;
use serde::Deserialize;

#[derive(Deserialize)]
struct NerRequest {
    text: String,
}

#[post("/api/ner")]
async fn ner_endpoint(req_body: web::Json<NerRequest>, session: Session) -> impl Responder {
    // 确认用户已登录
    if session.get::<String>("user_id").unwrap_or(None).is_none() {
        return HttpResponse::Unauthorized().finish();
    }
    
    // 这里的 web::block 用于将同步的 ZMQ 调用包装在异步函数中
    // 这可以防止阻塞 Actix worker 线程
    match web::block(|| ner_client::request_ner(&req_body.text)).await {
        Ok(Ok(result)) => HttpResponse::Ok()
            .content_type("application/json")
            .body(result),
        Ok(Err(e)) => {
            log::error!("NER processing error: {}", e);
            HttpResponse::InternalServerError().body(format!("Error during NER processing: {}", e))
        },
        Err(e) => {
            log::error!("Blocking error: {}", e);
            HttpResponse::InternalServerError().body(format!("Task execution failed: {}", e))
        }
    }
}

#[get("/")]
async fn index(session: Session) -> impl Responder {
    if let Some(user_id) = session.get::<String>("user_id").unwrap_or(None) {
        HttpResponse::Ok().body(format!("Welcome, {}!", user_id))
    } else {
        HttpResponse::Ok().body("Welcome! Please log in via /api/ner (or any protected route).")
    }
}


#[actix_web::main]
async fn main() -> std::io::Result<()> {
    env_logger::init_from_env(env_logger::Env::new().default_filter_or("info"));

    let config = AppConfig::new().expect("Failed to load configuration");
    let server_config = config.server.clone();
    let saml_config_data = web::Data::new(config.saml.clone());
    
    let secret_key = Key::from(&base64::decode(&config.session.secret_key_base64).expect("Invalid session secret key"));

    HttpServer::new(move || {
        // 在每个 worker 中初始化 ZMQ client
        ner_client::init_zmq_socket_for_worker(&server_config);
        
        App::new()
            .app_data(saml_config_data.clone())
            .wrap(SessionMiddleware::new(
                CookieSessionStore::default(), 
                secret_key.clone(),
            ))
            .service(saml_handler::assertion_consumer_service)
            .service(
                web::scope("")
                    .wrap(SamlAuth)
                    .service(ner_endpoint)
                    .service(index)
            )
    })
    .bind(config.server.bind_address)?
    .run()
    .await
}

为了在本地运行,可以使用一个简单的 shell 脚本启动 Python worker 和 Rust 服务。

start.sh:

#!/bin/bash

# 清理旧的 socket 文件
rm -f /tmp/ner_service.ipc

# 在后台启动 Python worker
echo "Starting Python spaCy worker..."
export ZMQ_SOCKET_ADDRESS="ipc:///tmp/ner_service.ipc"
python3 worker.py &
PY_WORKER_PID=$!

# 等待一会确保 worker 启动并绑定 socket
sleep 3

# 启动 Rust Actix-web 网关
echo "Starting Rust Actix-web gateway..."
cargo run

# 当 Rust 服务停止时,清理 Python worker
kill $PY_WORKER_PID
echo "Services stopped."

局限性与未来展望

这个架构有效地解决了最初的性能问题,但它并非没有局限性。当前的设计将 Rust 网关和 Python worker 强耦合在同一台机器上,通过 IPC 文件进行通信。这使得水平扩展变得复杂:如果需要多个实例,我们不能简单地增加 Pod 数量,因为每个 Rust 实例都需要一个对应的 Python worker 池。

一个可行的迭代方向是将 ZMQ 的 IPC socket 替换为 TCP socket,并引入一个服务发现机制。这样,Rust 网关可以动态地发现并连接到一组 Python worker,从而实现各自独立的伸缩。

另一个优化点是 Python worker 的并发模型。目前的单进程 worker 无法利用多核优势。可以启动一个 worker 池,并在 Rust 端使用 ROUTER-DEALER 模式的 ZMQ socket 来实现对 worker 池的负载均衡。

最后,SAML session 当前存储在 Actix 的 cookie session 中,这意味着它是单实例的。如果部署多个 Actix-web 实例并使用负载均衡器,用户会话将丢失。一个健壮的解决方案是引入一个外部 session 存储,如 Redis,来共享 session 状态。


  目录