团队里不同业务线的推荐场景越来越多,一个痛点也愈发明显:每个新场景,无论是商品推荐、文章推荐,还是最近的图片素材推荐,我们都在重复搭建相似的数据处理、向量化、召回流程。代码库里充斥着大量高度耦合、难以复用的脚本,每次需求变更都像是在雷区里排线。一个业务线的 embedding 模型更新,可能会意外影响另一个看似无关的召回任务。维护成本已经高到无法忽视。
我们需要一个标准化的解决方案,一个“推荐召回工具箱”(Kit),而不是一堆散乱的脚本。这个 Kit 的核心目标是解耦推荐召回链路中的各个组件:数据源、Embedding 模型、向量存储。理想状态下,产品经理提出一个新的推荐需求,我们只需要通过配置文件组合不同的插件,就能快速上线一个原型,而不是重写整个数据管道。
初步构想的核心是微内核与插件化架构。内核(RecommendationKit)负责编排整个流程,它不关心数据具体从哪里来(MySQL?CSV?),也不关心文本或图片如何被转换成向量,更不关心向量最终存入哪个数据库(Pinecone?Milvus?)。这些具体实现都由独立的、可插拔的插件来完成。
graph TD
subgraph RecommendationKit Core
A[Orchestrator]
end
subgraph Plugins
B1[DataSource: CSV]
B2[DataSource: PostgreSQL]
B3[...]
C1[Embedding: SentenceTransformer]
C2[Embedding: OpenAI CLIP]
C3[...]
D1[VectorStore: Pinecone]
D2[VectorStore: Local FAISS]
D3[...]
end
E[config.yaml] --> A
A -- "load_data()" --> B1
A -- "embed()" --> C1
A -- "upsert()" --> D1
style A fill:#f9f,stroke:#333,stroke-width:2px
style B1 fill:#ccf,stroke:#333,stroke-width:1px
style C1 fill:#cfc,stroke:#333,stroke-width:1px
style D1 fill:#fec,stroke:#333,stroke-width:1px
在技术选型上,向量数据库我们选择了 Pinecone。原因很简单:作为初创团队,维护一个自建的 Milvus 或 Weaviate 集群的人力成本太高。Pinecone 的 Serverless 模式提供了开箱即用的高可用性和弹性伸缩,让我们能专注于业务逻辑而非底层运维。它的性能在我们的测试中也完全满足要求。这个决定是基于成本和维护性的务实考量。
定义插件接口
一切始于接口定义。清晰、稳定的接口是插件化架构的基石。我们需要三个核心接口:DataSource、EmbeddingStrategy 和 VectorStore.
# rec_kit/interfaces.py
from abc import ABC, abstractmethod
from typing import List, Dict, Any, Iterator, Union
import numpy as np
# 数据载体,确保插件间传递的数据结构一致
class Document:
def __init__(self, doc_id: str, content: Union[str, Any], metadata: Dict[str, Any]):
self.id = doc_id
self.content = content
self.metadata = metadata
def __repr__(self):
return f"Document(id={self.id}, metadata={self.metadata})"
# 向量载体
class Vector:
def __init__(self, vec_id: str, values: List[float], metadata: Dict[str, Any]):
self.id = vec_id
self.values = values
self.metadata = metadata
def __repr__(self):
return f"Vector(id={self.id}, metadata={self.metadata})"
class DataSource(ABC):
"""
数据源接口,负责加载原始数据。
使用迭代器模式以支持大规模数据集,避免一次性加载到内存。
"""
@abstractmethod
def __init__(self, config: Dict[str, Any]):
pass
@abstractmethod
def stream_documents(self) -> Iterator[Document]:
"""以流式方式产生文档对象"""
pass
class EmbeddingStrategy(ABC):
"""
向量化策略接口,负责将文档内容转换为向量。
"""
@abstractmethod
def __init__(self, config: Dict[str, Any]):
pass
@abstractmethod
def embed(self, documents: List[Document]) -> List[Vector]:
"""批量将文档内容转换为向量"""
pass
@property
@abstractmethod
def dimension(self) -> int:
"""返回向量维度"""
pass
class VectorStore(ABC):
"""
向量存储接口,负责向量的存储和检索。
"""
@abstractmethod
def __init__(self, config: Dict[str, Any]):
pass
@abstractmethod
def create_index_if_not_exists(self, dimension: int):
"""确保索引存在,如果不存在则根据维度创建"""
pass
@abstractmethod
def upsert(self, vectors: List[Vector], batch_size: int = 100):
"""批量插入或更新向量"""
pass
@abstractmethod
def query(self, query_vector: List[float], top_k: int, filter_criteria: Dict[str, Any] = None) -> List[Dict[str, Any]]:
"""执行向量查询"""
pass
这里的关键设计在于:
- 统一数据结构:
Document和Vector作为标准的数据传输对象(DTO),确保了插件之间解耦。 - 流式处理:
DataSource的stream_documents返回一个迭代器,这对于处理G级别甚至T级别的数据至关重要,可以有效控制内存占用。 - 批量操作:
embed和upsert都设计为批量操作,这是在与外部服务(如模型API、向量数据库)交互时提升性能的常用手段。
实现核心插件:以 Pinecone 和 SentenceTransformer 为例
接口只是蓝图,现在需要具体的实现。我们先实现最常用的文本推荐场景所需的一套插件。
Pinecone 向量存储插件
这个插件是与 Pinecone 服务交互的封装。一个常见的错误是在每个操作中都初始化客户端,正确的做法是在 __init__ 中初始化一次并复用。配置和 API Key 必须通过外部传入,而不是硬编码。
# rec_kit/plugins/vector_stores/pinecone_store.py
import os
import logging
from typing import List, Dict, Any
from pinecone import Pinecone as PineconeClient, PodSpec
from tenacity import retry, stop_after_attempt, wait_random_exponential
from rec_kit.interfaces import VectorStore, Vector
# 配置日志
logging.basicConfig(level=logging.INFO, format='%(asctime)s - %(levelname)s - %(message)s')
logger = logging.getLogger(__name__)
class PineconeVectorStore(VectorStore):
def __init__(self, config: Dict[str, Any]):
api_key = os.getenv("PINECONE_API_KEY")
if not api_key:
raise ValueError("PINECONE_API_KEY environment variable not set.")
self.index_name = config.get("index_name")
if not self.index_name:
raise ValueError("`index_name` must be provided in config.")
self.environment = os.getenv("PINECONE_ENVIRONMENT", "gcp-starter") # 默认环境
# 生产级代码必须考虑客户端的初始化和复用
try:
self.client = PineconeClient(api_key=api_key)
self.index = None
except Exception as e:
logger.error(f"Failed to initialize Pinecone client: {e}")
raise
def create_index_if_not_exists(self, dimension: int):
if self.index_name not in self.client.list_indexes().names():
logger.info(f"Index '{self.index_name}' not found. Creating a new one with dimension {dimension}.")
try:
self.client.create_index(
name=self.index_name,
dimension=dimension,
metric="cosine", # 对于embedding,cosine相似度通常是好的选择
spec=PodSpec(environment=self.environment)
)
logger.info(f"Index '{self.index_name}' created successfully.")
except Exception as e:
logger.error(f"Failed to create Pinecone index: {e}")
raise
else:
logger.info(f"Index '{self.index_name}' already exists.")
self.index = self.client.Index(self.index_name)
# 增加重试机制,应对网络波动或Pinecone的临时不可用
@retry(wait=wait_random_exponential(min=1, max=60), stop=stop_after_attempt(5))
def upsert(self, vectors: List[Vector], batch_size: int = 100):
if not self.index:
raise RuntimeError("Index is not initialized. Call create_index_if_not_exists first.")
logger.info(f"Upserting {len(vectors)} vectors in batches of {batch_size}...")
for i in range(0, len(vectors), batch_size):
batch = vectors[i:i + batch_size]
vectors_to_upsert = [
{"id": v.id, "values": v.values, "metadata": v.metadata} for v in batch
]
try:
self.index.upsert(vectors=vectors_to_upsert)
except Exception as e:
logger.error(f"Error during batch upsert to Pinecone: {e}")
# 抛出异常以触发tenacity重试
raise
logger.info("Upsert completed.")
@retry(wait=wait_random_exponential(min=1, max=30), stop=stop_after_attempt(3))
def query(self, query_vector: List[float], top_k: int, filter_criteria: Dict[str, Any] = None) -> List[Dict[str, Any]]:
if not self.index:
raise RuntimeError("Index is not initialized.")
try:
results = self.index.query(
vector=query_vector,
top_k=top_k,
filter=filter_criteria,
include_metadata=True
)
return [
{"id": match.id, "score": match.score, "metadata": match.metadata}
for match in results.matches
]
except Exception as e:
logger.error(f"Error during query from Pinecone: {e}")
raise
SentenceTransformer 文本向量化插件
这个插件封装了一个本地的 embedding 模型,避免了对外部 API 的依赖和费用。
# rec_kit/plugins/embedding_strategies/sentence_transformer_strategy.py
from typing import List, Dict, Any
from sentence_transformers import SentenceTransformer
import numpy as np
import logging
from rec_kit.interfaces import EmbeddingStrategy, Document, Vector
logger = logging.getLogger(__name__)
class SentenceTransformerStrategy(EmbeddingStrategy):
def __init__(self, config: Dict[str, Any]):
model_name = config.get("model_name", "all-MiniLM-L6-v2")
logger.info(f"Loading SentenceTransformer model: {model_name}")
try:
self.model = SentenceTransformer(model_name)
self._dimension = self.model.get_sentence_embedding_dimension()
except Exception as e:
logger.error(f"Failed to load model {model_name}: {e}")
raise
def embed(self, documents: List[Document]) -> List[Vector]:
# 只处理content是字符串的情况
contents = [doc.content for doc in documents if isinstance(doc.content, str)]
if not contents:
return []
logger.info(f"Embedding {len(contents)} documents...")
# encode方法本身就是优化的批量操作
embeddings = self.model.encode(contents, show_progress_bar=False)
vectors = []
doc_idx = 0
for doc in documents:
if isinstance(doc.content, str):
vectors.append(
Vector(
vec_id=doc.id,
values=embeddings[doc_idx].tolist(),
metadata=doc.metadata
)
)
doc_idx += 1
return vectors
@property
def dimension(self) -> int:
return self._dimension
微内核:编排与动态加载
内核是整个 Kit 的大脑。它负责解析配置,动态加载并初始化所需的插件,然后按照 数据读取 -> 向量化 -> 存储 的顺序驱动整个流程。动态加载是关键,它使得我们可以在不修改内核代码的情况下,仅通过增加插件文件和修改配置来扩展系统功能。
# rec_kit/core.py
import yaml
import importlib
import logging
from typing import Dict, Any, List
from rec_kit.interfaces import DataSource, EmbeddingStrategy, VectorStore, Document
logger = logging.getLogger(__name__)
class RecommendationKit:
def __init__(self, config_path: str):
logger.info(f"Initializing RecommendationKit with config: {config_path}")
with open(config_path, 'r') as f:
self.config = yaml.safe_load(f)
self.data_source: DataSource = self._load_plugin("data_source")
self.embedding_strategy: EmbeddingStrategy = self._load_plugin("embedding_strategy")
self.vector_store: VectorStore = self._load_plugin("vector_store")
# 核心的校验步骤:确保向量存储的维度和模型维度匹配
self.vector_store.create_index_if_not_exists(self.embedding_strategy.dimension)
def _load_plugin(self, plugin_type: str):
"""
动态加载插件。
这里的实现比较简单,生产环境可能需要更复杂的插件发现机制。
"""
try:
plugin_config = self.config[plugin_type]
module_path = plugin_config["module"]
class_name = plugin_config["class"]
params = plugin_config.get("params", {})
logger.info(f"Loading plugin: {module_path}.{class_name}")
module = importlib.import_module(module_path)
plugin_class = getattr(module, class_name)
return plugin_class(params)
except (KeyError, ImportError, AttributeError) as e:
logger.error(f"Failed to load plugin of type '{plugin_type}': {e}")
raise ValueError(f"Configuration error for plugin '{plugin_type}'") from e
def run_indexing(self, batch_size: int = 64):
"""
执行完整的索引构建流程
"""
logger.info("Starting indexing process...")
doc_stream = self.data_source.stream_documents()
doc_batch: List[Document] = []
total_processed = 0
for document in doc_stream:
doc_batch.append(document)
if len(doc_batch) >= batch_size:
self._process_batch(doc_batch)
total_processed += len(doc_batch)
logger.info(f"Processed {total_processed} documents so far.")
doc_batch = []
# 处理最后一批不足batch_size的数据
if doc_batch:
self._process_batch(doc_batch)
total_processed += len(doc_batch)
logger.info(f"Indexing process finished. Total documents processed: {total_processed}")
def _process_batch(self, documents: List[Document]):
vectors = self.embedding_strategy.embed(documents)
if vectors:
self.vector_store.upsert(vectors)
def search(self, query_content: Any, top_k: int = 10, filter_criteria: Dict = None) -> List[Dict]:
"""
执行查询
"""
logger.info(f"Performing search for query content with top_k={top_k}")
# 将查询内容包装成Document,以便复用embed方法
query_doc = Document(doc_id="query", content=query_content, metadata={})
query_vector = self.embedding_strategy.embed([query_doc])[0]
return self.vector_store.query(query_vector.values, top_k, filter_criteria)
配置文件与实战
现在,所有组件都已就绪。我们可以通过一个简单的 YAML 文件来定义一个完整的推荐召回管道。
# config.yaml
data_source:
module: rec_kit.plugins.data_sources.csv_source
class: CsvDataSource
params:
file_path: "./data/sample_articles.csv"
id_column: "article_id"
content_column: "title"
metadata_columns: ["category", "publish_date"]
embedding_strategy:
module: rec_kit.plugins.embedding_strategies.sentence_transformer_strategy
class: SentenceTransformerStrategy
params:
model_name: "paraphrase-multilingual-MiniLM-L12-v2"
vector_store:
module: rec_kit.plugins.vector_stores.pinecone_store
class: PineconeVectorStore
params:
index_name: "article-recommendation-index"
为了让这个配置能跑起来,还需要一个 CsvDataSource 插件。
# rec_kit/plugins/data_sources/csv_source.py
import csv
from typing import Dict, Any, Iterator, List
from rec_kit.interfaces import DataSource, Document
import logging
logger = logging.getLogger(__name__)
class CsvDataSource(DataSource):
def __init__(self, config: Dict[str, Any]):
self.file_path = config["file_path"]
self.id_column = config["id_column"]
self.content_column = config["content_column"]
self.metadata_columns = config.get("metadata_columns", [])
logger.info(f"CSVDataSource initialized for file: {self.file_path}")
def stream_documents(self) -> Iterator[Document]:
try:
with open(self.file_path, mode='r', encoding='utf-8') as f:
reader = csv.DictReader(f)
for row in reader:
metadata = {col: row.get(col) for col in self.metadata_columns if col in row}
yield Document(
doc_id=row[self.id_column],
content=row[self.content_column],
metadata=metadata
)
except FileNotFoundError:
logger.error(f"CSV file not found at: {self.file_path}")
raise
except KeyError as e:
logger.error(f"Column not found in CSV: {e}. Check your config.")
raise
最后,一个执行入口脚本:
# main.py
import os
import logging
from dotenv import load_dotenv
from rec_kit.core import RecommendationKit
# 配置日志
logging.basicConfig(level=logging.INFO, format='%(asctime)s - %(levelname)s - %(message)s')
def setup_environment():
"""加载环境变量,这是管理密钥的最佳实践"""
load_dotenv()
if not os.getenv("PINECONE_API_KEY"):
logging.warning("PINECONE_API_KEY is not set. Please create a .env file.")
exit(1)
def run():
setup_environment()
# 实例化Kit,所有魔法都从这里开始
kit = RecommendationKit(config_path="config.yaml")
# 运行索引流程
# 在真实项目中,这会是一个离线的、定时的ETL任务
# kit.run_indexing()
# 执行在线查询
query_text = "最新的AI技术进展"
logging.info(f"\nSearching for: '{query_text}'")
# 示例:增加元数据过滤
filters = {"category": {"$eq": "Technology"}}
results = kit.search(query_text, top_k=5, filter_criteria=filters)
logging.info("Search Results:")
for res in results:
print(f" - ID: {res['id']}, Score: {res['score']:.4f}, Metadata: {res['metadata']}")
if __name__ == "__main__":
run()
现在,如果我们需要支持图片推荐,该怎么做?无需修改任何核心代码。我们只需要:
- 实现一个新的
ImageDataSource,从文件夹或数据库读取图片。 - 实现一个新的
CLIPEmbeddingStrategy,使用 CLIP 模型将图片转换为向量。 - 创建一个新的
config_image.yaml文件,将data_source和embedding_strategy指向新的插件实现。
这种扩展性正是我们最初追求的目标。
局限性与未来迭代
这个 Kit 目前只解决了召回(Recall)阶段的问题,一个完整的生产级推荐系统远比这复杂。它缺少了关键的排序(Ranking)和重排序(Re-ranking)阶段。当前的设计并没有为这些阶段留下接口。
其次,插件加载机制过于简单,直接依赖 importlib 和文件路径。在大型项目中,更健壮的方案是使用 Python 的 entry_points 机制,实现真正的插件自发现,让插件以独立的包形式存在和分发。
未来的迭代方向很明确:
- 增加排序器插件(Ranker): 在
search流程中引入一个可插拔的排序步骤,它可以是一个简单的基于业务规则的排序器,也可以是一个复杂的机器学习模型。 - 异步化改造: 对于在线查询,
embed步骤可能会成为瓶颈。可以考虑使用异步框架(如asyncio),将模型推理等 IO 密集型操作异步化,以提高查询接口的吞吐量。 - 特征工程与管理: 引入一个
FeatureStore插件的接口,用于在召回和排序阶段获取实时或离线特征,这对于个性化推荐至关重要。 - 更完善的错误处理与监控: 集成更详细的监控指标(例如,每个插件的处理延迟、成功率),并对插件执行失败提供更优雅的降级策略。