一个高并发系统的性能瓶颈,往往最先出现在数据库的写操作上。当读请求与写请求在同一个TiDB事务领导者(leader)节点上发生激烈争抢时,系统的整体吞吐量和响应时间会急剧恶化。常规的解决方案是读写分离,将读流量引导至TiDB的追随者(follower)副本,但这在声明式、自动化的云原生环境中引入了新的挑战:如何管理路由配置?如何保证事务和会话级别的数据一致性?以及,如何避免配置变更导致的应用重启?
在真实项目中,任何需要手动介入修改配置文件的架构,都是潜在的故障点。我们追求的是一种由Git仓库驱动、变更可追溯、配置与应用部署生命周期解耦的声明式方案。
架构决策:应用层路由 vs. 基础设施层路由
在设计读写分离架构时,存在两种主流方案。
方案A:基础设施层路由
通过独立的中间件(如ProxySQL)或服务网格(Service Mesh)在应用外部进行流量路由。应用本身对数据库的物理拓扑无感知,只连接一个虚拟的数据库端点。
- 优势: 对应用代码透明,理论上可以做到零侵入。
- 劣势:
- 上下文缺失: 代理层无法感知应用的事务边界。一个事务内的多个读操作可能被路由到不同的副本,破坏事务的读一致性。
- 会话一致性难题: 用户写入数据后,紧接着的读请求如果被路由到一个有延迟的副本,会读到旧数据。解决此问题需要复杂的粘性会话配置,增加了基础设施的复杂度。
- 管理复杂性: 引入了新的高可用组件,其自身的配置、监控和故障排查都是额外的运维负担。
方案B:应用层路由
在应用程序的数据访问层内部实现路由逻辑,例如通过自定义的DataSource。
- 优势:
- 完全的上下文感知: 路由逻辑可以精确控制在事务之内或之外。可以轻松实现“事务内的所有读写都走主库,事务外的读走从库”的策略。
- 精细化控制: 能够基于方法、注解甚至请求参数来决定数据源,灵活性极高。
- 架构简化: 无需额外部署和维护数据库代理中间件。
- 劣势:
- 代码侵入: 需要在应用中编写路由逻辑,与特定框架(如MyBatis, Spring)耦合。
- 配置管理: 路由规则(如主库、从库地址)如果硬编码在代码或配置文件中,会变得僵化。
最终选择与理由
我们选择方案B:应用层路由。关键在于,数据一致性的控制权必须掌握在最了解业务逻辑的应用层手中。基础设施层路由的“透明性”实际上是一种假象,它隐藏了至关重要的一致性问题,最终会导致难以排查的业务数据错误。
而方案B的“配置管理”劣势,恰好可以被GitOps理念完美解决。我们将使用Flux CD来管理一个Kubernetes ConfigMap,该ConfigMap包含数据源的地址和路由策略。应用在启动时及运行时动态加载此ConfigMap,从而将路由逻辑与具体的环境配置解耦,实现了架构的声明式管理。
核心实现概览
整个系统的运转流程将通过GitOps来驱动。
graph TD
subgraph Git Repository
A[Git Commit: Update DB Replica URL]
end
subgraph Kubernetes Cluster
B(Flux CD Controller)
C(Kustomization)
D[ConfigMap: db-routing-config]
E[Pod: Java Application]
F[MyBatis RoutingDataSource]
G[Caddy Ingress]
H[TiDB Cluster: Leader]
I[TiDB Cluster: Follower 1]
J[TiDB Cluster: Follower 2]
K[Memcached]
end
A -->|Notifies| B
B -->|Applies| C
C -->|Updates| D
E -->|Mounts/Watches| D
D -- Contains JDBC URLs --> F
E -.->|Reloads Config| F
G -->|Routes HTTP Traffic| E
subgraph "User Request Flow"
User --> G
E --> F
F -- Write or @PrimaryRead --> H
F -- Read Operation --> I
F -- Read Operation --> J
F -- Session Consistency Check --> K
end
GitOps与Kubernetes资源定义
Flux CD通过Kustomization资源来同步Git仓库中的定义到集群。我们的Git仓库结构如下:
clusters/production/
├── flux-system/
│ ├── gotk-components.yaml
│ └── gotk-sync.yaml
└── apps/
├── base/
│ ├── deployment.yaml
│ ├── service.yaml
│ ├── ingress.yaml
│ └── kustomization.yaml
├── data-access-layer/
│ ├── db-configmap.yaml
│ └── kustomization.yaml
└── kustomization.yaml
核心是db-configmap.yaml,它由Flux CD管理,并被应用Pod挂载为文件或环境变量。
clusters/production/apps/data-access-layer/db-configmap.yaml
apiVersion: v1
kind: ConfigMap
metadata:
name: tidb-routing-config
namespace: my-app
data:
# 主库地址,指向TiDB Leader Service
primary.url: "jdbc:mysql://tidb-leader.data.svc.cluster.local:4000/appdb?useSSL=false&characterEncoding=utf8"
primary.username: "root"
# 从库地址列表,指向TiDB Follower Service,以逗号分隔
replicas.urls: "jdbc:mysql://tidb-follower.data.svc.cluster.local:4000/appdb?useSSL=false&characterEncoding=utf8,jdbc:mysql://tidb-follower-2.data.svc.cluster.local:4000/appdb?useSSL=false&characterEncoding=utf8"
replicas.username: "readonly_user"
# 路由策略配置
session.consistency.ttl.seconds: "30"
注意,这里使用了Kubernetes内部的Service DNS名称,如tidb-leader.data.svc.cluster.local。这要求TiDB Operator已经正确部署并创建了分离的Leader和Follower服务。
应用的deployment.yaml需要引用这个ConfigMap来注入密码(通过Secret)和配置。
clusters/production/apps/base/deployment.yaml (片段)
# ...
spec:
template:
spec:
containers:
- name: my-app
# ...
env:
- name: DB_PRIMARY_URL
valueFrom:
configMapKeyRef:
name: tidb-routing-config
key: primary.url
- name: DB_PRIMARY_USERNAME
valueFrom:
configMapKeyRef:
name: tidb-routing-config
key: primary.username
- name: DB_PRIMARY_PASSWORD
valueFrom:
secretKeyRef:
name: tidb-credentials
key: primary-password
- name: DB_REPLICAS_URLS
valueFrom:
configMapKeyRef:
name: tidb-routing-config
key: replicas.urls
- name: DB_REPLICAS_USERNAME
valueFrom:
configMapKeyRef:
name: tidb-routing-config
key: replicas.username
- name: DB_REPLICAS_PASSWORD
valueFrom:
secretKeyRef:
name: tidb-credentials
key: readonly-password
# ...
MyBatis动态数据源实现
这是整个架构的核心代码部分。我们需要实现一个能够根据上下文动态选择数据源的RoutingDataSource。
1. 定义数据源类型和上下文持有者
// src/main/java/com/myapp/config/db/DataSourceType.java
package com.myapp.config.db;
public enum DataSourceType {
PRIMARY,
REPLICA
}
// src/main/java/com/myapp/config/db/DataSourceContextHolder.java
package com.myapp.config.db;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
public final class DataSourceContextHolder {
private static final Logger log = LoggerFactory.getLogger(DataSourceContextHolder.class);
// 使用ThreadLocal确保每个线程的上下文独立
private static final ThreadLocal<DataSourceType> contextHolder = new ThreadLocal<>();
public static void set(DataSourceType dataSourceType) {
if (dataSourceType == null) {
throw new NullPointerException("DataSourceType cannot be null");
}
log.debug("Setting DataSource to [{}] for thread [{}]", dataSourceType, Thread.currentThread().getName());
contextHolder.set(dataSourceType);
}
public static DataSourceType get() {
// 默认返回主库
return contextHolder.get() == null ? DataSourceType.PRIMARY : contextHolder.get();
}
public static void clear() {
log.debug("Clearing DataSource for thread [{}]", Thread.currentThread().getName());
contextHolder.remove();
}
}
2. 自定义RoutingDataSource
// src/main/java/com/myapp/config/db/DynamicRoutingDataSource.java
package com.myapp.config.db;
import org.springframework.jdbc.datasource.lookup.AbstractRoutingDataSource;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.List;
import java.util.Map;
public class DynamicRoutingDataSource extends AbstractRoutingDataSource {
private final AtomicInteger counter = new AtomicInteger(0);
private List<Object> replicaDataSources;
@Override
protected Object determineCurrentLookupKey() {
// 从ThreadLocal中获取数据源类型
return DataSourceContextHolder.get();
}
@Override
public void setTargetDataSources(Map<Object, Object> targetDataSources) {
super.setTargetDataSources(targetDataSources);
// 分离出从库列表,用于后续的负载均衡
this.replicaDataSources = targetDataSources.values().stream()
.filter(ds -> !ds.equals(targetDataSources.get(DataSourceType.PRIMARY)))
.toList();
}
@Override
protected Object determineTargetDataSource() {
DataSourceType type = (DataSourceType) determineCurrentLookupKey();
if (type == DataSourceType.REPLICA) {
if (replicaDataSources == null || replicaDataSources.isEmpty()) {
// 如果没有配置从库,则降级到主库
logger.warn("No replica data sources configured, falling back to primary.");
return super.determineTargetDataSource();
}
// 简单的轮询负载均衡策略
int index = counter.getAndIncrement() % replicaDataSources.size();
if(counter.get() > 10000) { // 防止溢出
counter.set(0);
}
return replicaDataSources.get(Math.abs(index));
}
// 默认或指定PRIMARY时,调用父类方法获取主库
return super.determineTargetDataSource();
}
}
这里的 determineTargetDataSource 被重写以实现从库的轮询负载均衡。在真实的生产环境中,可能会采用更复杂的策略,如基于响应时间的加权轮询。
3. 使用AOP进行自动切换
我们不希望在业务代码中显式调用DataSourceContextHolder.set()。通过AOP,我们可以基于注解自动完成切换。
// src/main/java/com/myapp/config/db/ReadOnly.java
package com.myapp.config.db;
import java.lang.annotation.*;
@Target({ElementType.METHOD, ElementType.TYPE})
@Retention(RetentionPolicy.RUNTIME)
@Documented
public @interface ReadOnly {
}
// src/main/java/com/myapp/config/db/DataSourceAspect.java
package com.myapp.config.db;
import org.aspectj.lang.ProceedingJoinPoint;
import org.aspectj.lang.annotation.Around;
import org.aspectj.lang.annotation.Aspect;
import org.aspectj.lang.annotation.Pointcut;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.core.annotation.Order;
import org.springframework.stereotype.Component;
import org.springframework.transaction.support.TransactionSynchronizationManager;
@Aspect
@Component
@Order(-1) // 保证在事务切面之前执行
public class DataSourceAspect {
private static final Logger log = LoggerFactory.getLogger(DataSourceAspect.class);
// 切入点:所有被@ReadOnly注解标记的方法
@Pointcut("@annotation(com.myapp.config.db.ReadOnly)")
public void readOnlyPointcut() {}
@Around("readOnlyPointcut()")
public Object proceed(ProceedingJoinPoint proceedingJoinPoint) throws Throwable {
// 如果当前已在事务中,为保证一致性,强制走主库
if (TransactionSynchronizationManager.isActualTransactionActive()) {
log.warn("Method [{}] is marked as @ReadOnly but is executed within an active transaction. Forcing PRIMARY datasource.", proceedingJoinPoint.getSignature());
DataSourceContextHolder.set(DataSourceType.PRIMARY);
} else {
DataSourceContextHolder.set(DataSourceType.REPLICA);
}
try {
return proceedingJoinPoint.proceed();
} finally {
// 方法执行结束后,清理ThreadLocal,防止内存泄漏和状态污染
DataSourceContextHolder.clear();
}
}
}
这个切面非常关键。它确保了任何标记为@ReadOnly的方法,在没有事务包裹的情况下,都会路由到从库。如果存在于一个事务中(通常由@Transactional开启),则会打印警告并强制使用主库,这是保证事务完整性的必要措施。
4. Spring Boot配置
// src/main/java/com/myapp/config/DataSourceConfig.java
package com.myapp.config;
import com.myapp.config.db.DataSourceType;
import com.myapp.config.db.DynamicRoutingDataSource;
import com.zaxxer.hikari.HikariDataSource;
import org.springframework.beans.factory.annotation.Qualifier;
import org.springframework.boot.autoconfigure.jdbc.DataSourceProperties;
import org.springframework.boot.context.properties.ConfigurationProperties;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import org.springframework.context.annotation.Primary;
import org.springframework.util.StringUtils;
import javax.sql.DataSource;
import java.util.Arrays;
import java.util.HashMap;
import java.util.Map;
import java.util.stream.Collectors;
@Configuration
public class DataSourceConfig {
@Bean
@ConfigurationProperties("spring.datasource.primary")
public DataSourceProperties primaryDataSourceProperties() {
return new DataSourceProperties();
}
@Bean(name = "primaryDataSource")
public DataSource primaryDataSource() {
return primaryDataSourceProperties().initializeDataSourceBuilder()
.type(HikariDataSource.class).build();
}
@Bean
@ConfigurationProperties("spring.datasource.replicas")
public DataSourceProperties replicasDataSourceProperties() {
// 这个bean主要用于获取配置前缀,实际的URLs在下面处理
return new DataSourceProperties();
}
@Bean
@Primary
public DataSource routingDataSource(@Qualifier("primaryDataSource") DataSource primaryDataSource,
DataSourceProperties replicasDataSourceProperties) {
DynamicRoutingDataSource routingDataSource = new DynamicRoutingDataSource();
Map<Object, Object> targetDataSources = new HashMap<>();
targetDataSources.put(DataSourceType.PRIMARY, primaryDataSource);
String replicaUrls = replicasDataSourceProperties.getUrl();
if (StringUtils.hasText(replicaUrls)) {
String[] urls = replicaUrls.split(",");
for (int i = 0; i < urls.length; i++) {
DataSource replicaDS = createReplicaDataSource(replicasDataSourceProperties, urls[i].trim());
// 这里使用一个唯一的key来区分不同的从库实例
targetDataSources.put("REPLICA_" + i, replicaDS);
}
}
routingDataSource.setTargetDataSources(targetDataSources);
routingDataSource.setDefaultTargetDataSource(primaryDataSource);
routingDataSource.afterPropertiesSet(); // 手动调用初始化
return routingDataSource;
}
private DataSource createReplicaDataSource(DataSourceProperties baseProperties, String url) {
DataSourceProperties replicaProperties = new DataSourceProperties();
// 继承用户名、密码、驱动等通用配置
replicaProperties.setUsername(baseProperties.getUsername());
replicaProperties.setPassword(baseProperties.getPassword());
replicaProperties.setDriverClassName(baseProperties.getDriverClassName());
replicaProperties.setUrl(url);
// 可以为从库配置不同的连接池参数
HikariDataSource ds = replicaProperties.initializeDataSourceBuilder().type(HikariDataSource.class).build();
ds.setReadOnly(true); // 重要的优化
ds.setPoolName("HikariPool-Replica-" + url.hashCode());
return ds;
}
}
在application.yml中,我们将连接ConfigMap注入的环境变量。
# src/main/resources/application.yml
spring:
datasource:
primary:
url: ${DB_PRIMARY_URL}
username: ${DB_PRIMARY_USERNAME}
password: ${DB_PRIMARY_PASSWORD}
driver-class-name: com.mysql.cj.jdbc.Driver
type: com.zaxxer.hikari.HikariDataSource
hikari:
pool-name: HikariPool-Primary
maximum-pool-size: 20
replicas:
# 注意这里我们只读取一个URL属性,但它包含逗号分隔的多个URL
url: ${DB_REPLICAS_URLS}
username: ${DB_REPLICAS_USERNAME}
password: ${DB_REPLICAS_PASSWORD}
driver-class-name: com.mysql.cj.jdbc.Driver
type: com.zaxxer.hikari.HikariDataSource
hikari:
maximum-pool-size: 50
解决会话一致性问题
现在,一个棘手的问题浮出水面:用户更新个人资料后(写主库),立即刷新页面(读从库),可能会因为TiDB Raft协议的同步延迟,看到旧的资料。
我们的解决方案是利用Memcached实现一个短暂的“主库亲和性”标记。
// src/main/java/com/myapp/service/SessionConsistencyService.java
package com.myapp.service;
import net.spy.memcached.MemcachedClient;
import org.springframework.beans.factory.annotation.Value;
import org.springframework.stereotype.Service;
@Service
public class SessionConsistencyService {
private final MemcachedClient memcachedClient;
private final int ttlSeconds;
private static final String KEY_PREFIX = "session_consistency:";
public SessionConsistencyService(MemcachedClient memcachedClient,
@Value("${session.consistency.ttl.seconds:30}") int ttlSeconds) {
this.memcachedClient = memcachedClient;
this.ttlSeconds = ttlSeconds;
}
// 在任何写操作之后调用
public void maintainConsistency(String userId) {
// 设置一个带有TTL的键,值为任意非空字符串
memcachedClient.set(KEY_PREFIX + userId, ttlSeconds, "force_primary");
}
// 在读操作之前检查
public boolean shouldForcePrimary(String userId) {
return memcachedClient.get(KEY_PREFIX + userId) != null;
}
}
然后,我们修改DataSourceAspect来集成这个检查。
// src/main/java/com/myapp/config/db/DataSourceAspect.java (修改后)
// ... imports
import com.myapp.service.SessionConsistencyService;
import org.springframework.web.context.request.RequestContextHolder;
import org.springframework.web.context.request.ServletRequestAttributes;
@Aspect
@Component
@Order(-1)
public class DataSourceAspect {
// ... logger
private final SessionConsistencyService consistencyService;
public DataSourceAspect(SessionConsistencyService consistencyService) {
this.consistencyService = consistencyService;
}
@Pointcut("@annotation(com.myapp.config.db.ReadOnly)")
public void readOnlyPointcut() {}
@Around("readOnlyPointcut()")
public Object proceed(ProceedingJoinPoint proceedingJoinPoint) throws Throwable {
if (TransactionSynchronizationManager.isActualTransactionActive()) {
// ... 事务内逻辑不变
} else {
String currentUserId = getCurrentUserId(); // 需要实现获取当前用户ID的逻辑
if (currentUserId != null && consistencyService.shouldForcePrimary(currentUserId)) {
log.info("Forcing PRIMARY for user [{}] due to session consistency window.", currentUserId);
DataSourceContextHolder.set(DataSourceType.PRIMARY);
} else {
DataSourceContextHolder.set(DataSourceType.REPLICA);
}
}
try {
return proceedingJoinPoint.proceed();
} finally {
DataSourceContextHolder.clear();
}
}
private String getCurrentUserId() {
// 这是一个示例实现,具体取决于你的认证机制
// 例如,从Spring Security Context或请求头中获取
ServletRequestAttributes attributes = (ServletRequestAttributes) RequestContextHolder.getRequestAttributes();
if (attributes != null) {
return attributes.getRequest().getHeader("X-User-ID");
}
return null;
}
}
现在,在任何执行写操作的Service方法末尾,只需调用consistencyService.maintainConsistency(userId)即可。在接下来的30秒内,该用户的所有读请求(即使是@ReadOnly的)都会被强制路由到主库,从而解决了数据延迟问题。
Caddy Ingress配置
最后,Caddy作为集群的入口,负责TLS终止和将流量转发到我们的应用服务。它的配置极为简洁。如果使用caddy-ingress-controller,一个Ingress对象就足够了。
# clusters/production/apps/base/ingress.yaml
apiVersion: networking.k8s.io/v1
kind: Ingress
metadata:
name: my-app-ingress
namespace: my-app
annotations:
caddy.ingress.kubernetes.io/hostname: "app.mydomain.com"
spec:
rules:
- host: "app.mydomain.com"
http:
paths:
- path: /
pathType: Prefix
backend:
service:
name: my-app-service
port:
number: 8080
Caddy会自动从Let’s Encrypt获取app.mydomain.com的TLS证书,并处理HTTPS。
架构的局限性与未来路径
此方案通过将路由逻辑置于应用层,并结合GitOps进行声明式管理,优雅地解决了读写分离架构中的一致性与配置管理难题。然而,它并非没有权衡。
该架构的复杂性主要体现在应用代码内部,对开发团队的规范和测试能力提出了更高要求。@ReadOnly注解的正确使用、事务边界的清晰划分,以及对会话一致性机制的理解,都需要成为团队的共识。
对于TiDB这种原生分布式数据库,其自身的follower read特性已经提供了一定程度的读扩展能力。本方案更适用于读/写负载极度不均衡,且对读延迟和数据一致性有苛刻要求的场景。
未来的优化路径可能包括:
- 动态配置热加载: 当前方案在
ConfigMap变更后需要Pod重启才能加载新配置。可以引入Spring Cloud Kubernetes Config,实现对ConfigMap变更的监听和数据源连接池的动态、平滑重建,达到零停机更新路由配置的目标。 - 更智能的负载均衡: 从库的轮询策略可以升级为基于响应时间或连接数的加权动态负载均衡算法,进一步优化读性能。
- 自动化一致性标记: 通过AOP切入所有非
SELECT的SQL执行(例如,使用MyBatis拦截器),自动为当前用户添加会话一致性标记,而不是在业务代码中手动调用,减少开发人员的心智负担。