项目初期,我们依赖一个纯 Python 的 Flask 应用对外提供命名实体识别(NER)服务,其核心是基于 spaCy 的一个定制化模型。随着内部平台用户量的增长,这个服务的性能瓶颈日益凸显。Python 的全局解释器锁(GIL)在高并发场景下限制了 CPU 密集型任务的并行能力,单纯增加 uWSGI 的 worker 数量很快就遇到了收益递减的窘境。重写整个 NLP 模型到 Rust 是一项成本高昂且不切实际的任务,模型迭代和训练依然在 Python 生态中进行。
我们的痛点很明确:需要一个既能利用 Rust 高并发 I/O 处理能力,又能无缝集成现有 Python AI 模型,同时还要符合公司统一使用 GitHub SAML 进行单点登录(SSO)的安全规范。最终的技术方案是一个异构的服务架构:使用 Actix-web 作为面向外部的、处理高并发请求和 SAML 认证的网关,通过进程间通信(IPC)将核心 NLP 计算任务分发给一个后台的 Python/spaCy worker 池。
架构决策:为何选择 IPC 而非 HTTP
初步构想是在 Rust 和 Python 服务之间使用 RESTful API 进行通信。这是一个成熟且易于理解的方案,但经过评估,我们放弃了它。在真实项目中,尤其是在性能敏感的内部服务上,HTTP 调用引入的开销不可忽视:网络栈的序列化/反序列化、TCP 握手、HTTP 头解析等都会累积延迟。
考虑到 Rust 网关和 Python worker 将部署在同一台物理机或同一个 Kubernetes Pod 内,我们决定采用更底层的进程间通信方式。ZeroMQ(ZMQ)的 REQ-REP 模式成为了最终选择。它像 TCP 一样提供可靠的消息传递,但 API 更简单,性能开销极低,尤其适合这种紧密耦合的后台服务间通信。
下面是整个请求流程的架构图:
sequenceDiagram
participant User as 用户
participant Actix as Actix-web (Rust)
participant ZMQ as ZeroMQ Socket
participant spaCy as spaCy Worker (Python)
participant GitHub as GitHub IdP
User->>+Actix: 发起 /api/ner 请求 (无 Session)
Actix-->>User: 302 重定向到 GitHub SAML 登录
User->>+GitHub: 输入凭证
GitHub-->>-User: 返回 SAMLResponse (Base64)
User->>+Actix: POST /saml/acs (携带 SAMLResponse)
Actix->>Actix: 验证 SAML 断言 (签名, 时间戳等)
Actix-->>-User: 302 重定向到原始请求 /api/ner (携带 Session Cookie)
User->>+Actix: 再次发起 /api/ner 请求 (携带 Session)
Actix->>Actix: 中间件验证 Session
Actix->>+ZMQ: 发送 NER 请求文本
ZMQ->>+spaCy: 接收文本
spaCy->>spaCy: 执行 spaCy NER 处理
spaCy-->>-ZMQ: 返回处理结果 (JSON)
ZMQ-->>-Actix: 接收结果
Actix-->>-User: 返回 200 OK 与 JSON 结果
核心实现:Actix-web SAML 认证层
在 Rust 生态中处理 SAML 并非易事,不像 Java 或 C# 那样有大量成熟的官方库。我们选择 saml2_rs 这个 crate 来处理 SAML 断言的解析与验证。这部分是整个服务的安全基石,必须严谨处理。
1. 项目结构与依赖
Cargo.toml 需要包含 Actix-web、SAML 处理、配置、序列化和 ZMQ 相关的依赖。
[package]
name = "ner-gateway"
version = "0.1.0"
edition = "2021"
[dependencies]
actix-web = "4"
actix-session = { version = "0.7", features = ["cookie-session"] }
actix-cors = "0.6"
serde = { version = "1.0", features = ["derive"] }
serde_json = "1.0"
env_logger = "0.10"
log = "0.4"
config = "0.13"
zmq = "0.10"
tokio = { version = "1", features = ["full"] }
# SAML specific dependencies
saml2_rs = { version = "0.3", features = ["xml-schema", "reqwest_blocking"] }
base64 = "0.21"
chrono = "0.4"
url = "2.3"
openssl = { version = "0.10", features = ["vendored"] }
2. 配置管理
一个生产级的应用必须将配置外部化。我们使用 config crate 来管理 SAML Identity Provider (IdP) 的元数据。
config/default.toml 文件示例:
# config/default.toml
[saml]
# GitHub IdP 元数据,需要从 GitHub Enterprise 设置中获取
idp_entity_id = "https://github.com/enterprises/YOUR_ENTERPRISE"
idp_sso_url = "https://github.com/enterprises/YOUR_ENTERPRISE/saml/sso"
# IdP 的 X.509 证书 (PEM 格式,去除 -----BEGIN/END CERTIFICATE----- 和换行符)
idp_x509_cert_base64 = "MIID...your_cert...AQAB"
# Service Provider (我们的 Actix-web 应用) 的元数据
sp_entity_id = "https://your-service.com/saml/metadata"
sp_acs_url = "https://your-service.com/saml/acs"
[server]
bind_address = "127.0.0.1:8080"
zmq_socket_address = "ipc:///tmp/ner_service.ipc"
[session]
# 在生产环境中必须使用一个长且随机的密钥
secret_key_base64 = "ASEB2q8e/xN3A+1Tpl4Z8g2f7a6b8c9d0e1f2a3b4c5d6e7f8a9b0c1d2e3"
3. SAML 断言消费服务 (ACS) 端点
这是整个 SAML 流程的核心。当用户从 GitHub 认证成功后,浏览器会携带一个 SAMLResponse POST 到这个端点。我们需要解码、解析并验证它。
// src/saml_handler.rs
use actix_web::{web, post, HttpRequest, HttpResponse, Responder};
use actix_session::Session;
use saml2_rs::sp::{ServiceProvider, AcsRequest};
use saml2_rs::metadata::Metadata;
use saml2_rs::signature::Signature;
use serde::Deserialize;
use crate::config::SamlConfig;
#[derive(Deserialize)]
pub struct SamlAcsForm {
#[serde(rename = "SAMLResponse")]
saml_response: String,
#[serde(rename = "RelayState")]
relay_state: Option<String>,
}
#[post("/saml/acs")]
pub async fn assertion_consumer_service(
req: HttpRequest,
form: web::Form<SamlAcsForm>,
session: Session,
saml_config: web::Data<SamlConfig>,
) -> impl Responder {
log::info!("Received SAML response at ACS endpoint");
let idp_cert = match base64::decode(&saml_config.idp_x509_cert_base64) {
Ok(cert) => cert,
Err(e) => {
log::error!("Failed to decode IdP certificate: {}", e);
return HttpResponse::InternalServerError().body("IdP certificate configuration error.");
}
};
let service_provider = ServiceProvider {
metadata_url: Some(saml_config.sp_entity_id.clone()),
acs_url: Some(saml_config.sp_acs_url.clone()),
..Default::default()
};
let acs_request = AcsRequest {
// 在生产环境中,应该对请求进行更详细的检查
// 这里为了简化,我们仅传递必要信息
request: &req,
body: form.saml_response.as_bytes(),
relay_state: form.relay_state.clone(),
};
// 核心步骤:解析和验证 SAML 断言
match service_provider.parse_assertion(&idp_cert, acs_request) {
Ok(assertion) => {
log::info!("Successfully parsed and validated SAML assertion for user: {}", &assertion.subject.name_id);
// 在真实项目中,应该检查 assertion 的有效期、受众 (Audience) 等
// if assertion.conditions.not_before > now || assertion.conditions.not_on_or_after < now { ... }
// 认证成功,创建会话
// 我们将用户名存入 session
session.insert("user_id", &assertion.subject.name_id).unwrap();
session.renew();
log::info!("Session created for user: {}", &assertion.subject.name_id);
// 重定向到用户最初想访问的页面,或者首页
let redirect_url = form.relay_state.clone().unwrap_or_else(|| "/".to_string());
HttpResponse::Found()
.append_header(("Location", redirect_url))
.finish()
}
Err(err) => {
log::error!("SAML assertion validation failed: {:?}", err);
HttpResponse::Forbidden().body(format!("Invalid SAML response: {:?}", err))
}
}
}
4. 认证中间件
我们需要一个中间件来保护需要登录才能访问的 API。它会检查 session 中是否存在用户信息。如果不存在,则构建一个 SAML AuthnRequest 并重定向用户到 GitHub 的登录页面。
// src/auth_middleware.rs
use actix_web::{
dev::{forward_ready, Service, ServiceRequest, ServiceResponse, Transform},
Error, HttpMessage, HttpResponse,
};
use actix_session::SessionExt;
use futures::future::{ok, Ready, LocalBoxFuture};
use saml2_rs::sp::{AuthnRequest, ServiceProvider};
use url::Url;
use crate::config::SamlConfig;
use std::rc::Rc;
pub struct SamlAuth;
impl<S, B> Transform<S, ServiceRequest> for SamlAuth
where
S: Service<ServiceRequest, Response = ServiceResponse<B>, Error = Error> + 'static,
S::Future: 'static,
B: 'static,
{
type Response = ServiceResponse<B>;
type Error = Error;
type InitError = ();
type Transform = SamlAuthMiddleware<S>;
type Future = Ready<Result<Self::Transform, Self::InitError>>;
fn new_transform(&self, service: S) -> Self::Future {
ok(SamlAuthMiddleware { service: Rc::new(service) })
}
}
pub struct SamlAuthMiddleware<S> {
service: Rc<S>,
}
impl<S, B> Service<ServiceRequest> for SamlAuthMiddleware<S>
where
S: Service<ServiceRequest, Response = ServiceResponse<B>, Error = Error> + 'static,
S::Future: 'static,
B: 'static,
{
type Response = ServiceResponse<B>;
type Error = Error;
type Future = LocalBoxFuture<'static, Result<Self::Response, Self::Error>>;
forward_ready!(service);
fn call(&self, req: ServiceRequest) -> Self::Future {
// 对 /saml/acs 和 /saml/metadata 路径放行
if req.path().starts_with("/saml/") {
let fut = self.service.call(req);
return Box::pin(async move {
let res = fut.await?;
Ok(res)
});
}
let session = req.get_session();
let has_user = session.get::<String>("user_id").unwrap_or(None).is_some();
let service = self.service.clone();
Box::pin(async move {
if has_user {
let res = service.call(req).await?;
Ok(res)
} else {
log::info!("No session found, redirecting to SAML IdP.");
let saml_config = req.app_data::<web::Data<SamlConfig>>().unwrap();
let service_provider = ServiceProvider {
metadata_url: Some(saml_config.sp_entity_id.clone()),
acs_url: Some(saml_config.sp_acs_url.clone()),
..Default::default()
};
let authn_request = AuthnRequest::new(service_provider);
let original_uri = req.uri().to_string();
let redirect_url = authn_request.redirect_url(
&saml_config.idp_sso_url,
Some(&original_uri) // RelayState
).map_err(|e| actix_web::error::ErrorInternalServerError(e.to_string()))?;
Ok(req.into_response(
HttpResponse::Found()
.append_header(("Location", redirect_url.as_str()))
.finish()
))
}
})
}
}
桥接 Rust 与 Python:使用 ZMQ
ZMQ 的 REQ-REP 模式非常适合这种一对一的请求-响应场景。Rust 侧是 REQ(请求者),Python 侧是 REP(响应者)。
1. Rust ZMQ Client
我们在 Actix-web 的应用状态中持有一个 ZMQ REQ socket 的客户端池。由于 ZMQ socket 不是 Send 和 Sync 的,我们不能直接在 Actix 的多线程 worker 间共享它。一个常见的模式是为每个 Actix worker 创建一个 ZMQ socket。
// src/ner_client.rs
use zmq::{Context, Socket, REQ};
use std::cell::RefCell;
use crate::config::ServerConfig;
// 使用 thread_local! 为每个 Actix worker 线程创建一个独立的 ZMQ client
thread_local! {
static ZMQ_SOCKET: RefCell<Option<Socket>> = RefCell::new(None);
}
// 在应用启动时,为每个 worker 初始化 ZMQ socket
pub fn init_zmq_socket_for_worker(config: &ServerConfig) {
ZMQ_SOCKET.with(|socket_ref| {
if socket_ref.borrow().is_none() {
log::info!("Initializing ZMQ REQ socket for worker thread...");
let context = Context::new();
let socket = context.socket(REQ).expect("Failed to create ZMQ socket");
socket.connect(&config.zmq_socket_address).expect("Failed to connect to ZMQ socket");
*socket_ref.borrow_mut() = Some(socket);
}
});
}
pub fn request_ner(text: &str) -> Result<String, String> {
ZMQ_SOCKET.with(|socket_ref| {
if let Some(socket) = socket_ref.borrow().as_ref() {
// 设置超时,防止 Python worker 崩溃导致 Rust worker 永久阻塞
socket.set_rcvtimeo(5000).map_err(|e| e.to_string())?;
socket.set_sndtimeo(5000).map_err(|e| e.to_string())?;
match socket.send(text, 0) {
Ok(_) => {
match socket.recv_string(0) {
Ok(Ok(result)) => Ok(result),
Ok(Err(e)) => Err(format!("ZMQ recv non-UTF8 data: {}", e)),
Err(e) => Err(format!("ZMQ recv error: {}", e)),
}
},
Err(e) => Err(format!("ZMQ send error: {}", e)),
}
} else {
Err("ZMQ socket is not initialized for this thread.".to_string())
}
})
}
2. Python spaCy ZMQ Worker
Python worker 是一个简单的长时运行脚本。它在启动时加载昂贵的 spaCy 模型到内存中,然后进入一个循环,监听来自 ZMQ socket 的请求。
requirements.txt:
spacy==3.5.0
pyzmq==25.0.0
# and your spacy model, e.g. en_core_web_sm
worker.py:
import zmq
import spacy
import json
import logging
import os
# 配置日志
logging.basicConfig(level=logging.INFO, format='%(asctime)s - %(levelname)s - %(message)s')
def main():
"""
一个 ZMQ REP worker,用于加载 spaCy 模型并处理 NER 请求。
"""
socket_address = os.environ.get("ZMQ_SOCKET_ADDRESS", "ipc:///tmp/ner_service.ipc")
model_name = os.environ.get("SPACY_MODEL", "en_core_web_sm")
try:
logging.info(f"Loading spaCy model: {model_name}...")
nlp = spacy.load(model_name)
logging.info("Model loaded successfully.")
except Exception as e:
logging.error(f"Failed to load spaCy model: {e}")
return
context = zmq.Context()
socket = context.socket(zmq.REP)
socket.bind(socket_address)
logging.info(f"ZMQ worker bound to {socket_address}")
while True:
try:
message = socket.recv_string()
logging.info(f"Received request with text length: {len(message)}")
doc = nlp(message)
entities = [{
'text': ent.text,
'label': ent.label_,
'start_char': ent.start_char,
'end_char': ent.end_char
} for ent in doc.ents]
response = {
"entities": entities
}
socket.send_json(response)
except zmq.ZMQError as e:
logging.error(f"ZMQ error: {e}")
break # Exit on critical ZMQ errors
except Exception as e:
logging.error(f"Error processing request: {e}")
# 返回一个错误信息,而不是让请求方超时
error_response = {"error": str(e)}
socket.send_json(error_response)
if __name__ == "__main__":
main()
组合与部署
最后,我们将所有部分整合到 Actix-web 的 main.rs 中。
// src/main.rs
mod auth_middleware;
mod config;
mod ner_client;
mod saml_handler;
use actix_web::{web, App, HttpServer, Responder, get, post, HttpResponse};
use actix_session::{Session, SessionMiddleware, storage::CookieSessionStore};
use actix_web::cookie::Key;
use crate::config::AppConfig;
use crate::auth_middleware::SamlAuth;
use serde::Deserialize;
#[derive(Deserialize)]
struct NerRequest {
text: String,
}
#[post("/api/ner")]
async fn ner_endpoint(req_body: web::Json<NerRequest>, session: Session) -> impl Responder {
// 确认用户已登录
if session.get::<String>("user_id").unwrap_or(None).is_none() {
return HttpResponse::Unauthorized().finish();
}
// 这里的 web::block 用于将同步的 ZMQ 调用包装在异步函数中
// 这可以防止阻塞 Actix worker 线程
match web::block(|| ner_client::request_ner(&req_body.text)).await {
Ok(Ok(result)) => HttpResponse::Ok()
.content_type("application/json")
.body(result),
Ok(Err(e)) => {
log::error!("NER processing error: {}", e);
HttpResponse::InternalServerError().body(format!("Error during NER processing: {}", e))
},
Err(e) => {
log::error!("Blocking error: {}", e);
HttpResponse::InternalServerError().body(format!("Task execution failed: {}", e))
}
}
}
#[get("/")]
async fn index(session: Session) -> impl Responder {
if let Some(user_id) = session.get::<String>("user_id").unwrap_or(None) {
HttpResponse::Ok().body(format!("Welcome, {}!", user_id))
} else {
HttpResponse::Ok().body("Welcome! Please log in via /api/ner (or any protected route).")
}
}
#[actix_web::main]
async fn main() -> std::io::Result<()> {
env_logger::init_from_env(env_logger::Env::new().default_filter_or("info"));
let config = AppConfig::new().expect("Failed to load configuration");
let server_config = config.server.clone();
let saml_config_data = web::Data::new(config.saml.clone());
let secret_key = Key::from(&base64::decode(&config.session.secret_key_base64).expect("Invalid session secret key"));
HttpServer::new(move || {
// 在每个 worker 中初始化 ZMQ client
ner_client::init_zmq_socket_for_worker(&server_config);
App::new()
.app_data(saml_config_data.clone())
.wrap(SessionMiddleware::new(
CookieSessionStore::default(),
secret_key.clone(),
))
.service(saml_handler::assertion_consumer_service)
.service(
web::scope("")
.wrap(SamlAuth)
.service(ner_endpoint)
.service(index)
)
})
.bind(config.server.bind_address)?
.run()
.await
}
为了在本地运行,可以使用一个简单的 shell 脚本启动 Python worker 和 Rust 服务。
start.sh:
#!/bin/bash
# 清理旧的 socket 文件
rm -f /tmp/ner_service.ipc
# 在后台启动 Python worker
echo "Starting Python spaCy worker..."
export ZMQ_SOCKET_ADDRESS="ipc:///tmp/ner_service.ipc"
python3 worker.py &
PY_WORKER_PID=$!
# 等待一会确保 worker 启动并绑定 socket
sleep 3
# 启动 Rust Actix-web 网关
echo "Starting Rust Actix-web gateway..."
cargo run
# 当 Rust 服务停止时,清理 Python worker
kill $PY_WORKER_PID
echo "Services stopped."
局限性与未来展望
这个架构有效地解决了最初的性能问题,但它并非没有局限性。当前的设计将 Rust 网关和 Python worker 强耦合在同一台机器上,通过 IPC 文件进行通信。这使得水平扩展变得复杂:如果需要多个实例,我们不能简单地增加 Pod 数量,因为每个 Rust 实例都需要一个对应的 Python worker 池。
一个可行的迭代方向是将 ZMQ 的 IPC socket 替换为 TCP socket,并引入一个服务发现机制。这样,Rust 网关可以动态地发现并连接到一组 Python worker,从而实现各自独立的伸缩。
另一个优化点是 Python worker 的并发模型。目前的单进程 worker 无法利用多核优势。可以启动一个 worker 池,并在 Rust 端使用 ROUTER-DEALER 模式的 ZMQ socket 来实现对 worker 池的负载均衡。
最后,SAML session 当前存储在 Actix 的 cookie session 中,这意味着它是单实例的。如果部署多个 Actix-web 实例并使用负载均衡器,用户会话将丢失。一个健壮的解决方案是引入一个外部 session 存储,如 Redis,来共享 session 状态。