在一个典型的移动应用驱动的微服务架构中,我们很快就会遇到一个棘手的问题:如何维护跨多个服务的数据一致性。假设我们正在构建一个体验预订平台,用户通过Flutter应用预订活动。这个操作至少会触及两个服务:BookingService,负责将预订记录写入主数据库(如PostgreSQL);以及SearchService,负责更新Elasticsearch索引,以便其他用户能看到最新的可预订名额。
如果BookingService成功写入数据库,但在调用SearchService时网络发生抖动或SearchService暂时不可用,会发生什么?主数据和搜索数据立刻出现不一致。用户可能会在搜索结果中看到一个已经被预订的名额,导致糟糕的用户体验。传统的两阶段提交(2PC)在微服务架构中因其同步阻塞和对资源的高锁定性而基本不被采用。这正是Saga模式的用武之地。
我们最初的构想是在BookingService中加入重试逻辑,并在失败后记录日志,由人工或定时任务补偿。但这种方式非常脆弱。如果BookingService在写入数据库后、调用SearchService前发生崩溃,这次更新就永远丢失了。我们需要一个更可靠、可追踪的机制。
最终,我们决定采用编排式Saga(Orchestration-based Saga)。它引入了一个中心化的协调器——SagaOrchestrator,来管理整个分布式事务的流程。这种方式相比于编舞式Saga(Choreography-based Saga),其状态和失败逻辑都集中管理,对于调试和理解复杂流程更为直观。在真实项目中,清晰的故障排查路径往往比极致的解耦更有价值。
架构设计与流程
我们的目标是实现一个从Flutter客户端发起的预订流程,该流程能原子性地完成数据库写入和搜索引擎索引更新。
sequenceDiagram
participant FlutterApp as Flutter客户端
participant Gateway as API网关
participant Orchestrator as Saga编排器
participant BookingSvc as 预订服务 (Postgres)
participant SearchSvc as 搜索服务 (Elasticsearch)
FlutterApp->>+Gateway: POST /saga/bookings (发起预订)
Gateway->>+Orchestrator: startBookingSaga(bookingDetails)
Orchestrator->>Orchestrator: 创建Saga实例, state=PENDING
Orchestrator->>+BookingSvc: POST /bookings (创建预订)
BookingSvc->>-Orchestrator: 201 Created (bookingId)
Orchestrator->>Orchestrator: Saga state=BOOKING_CREATED, 记录bookingId
Orchestrator->>+SearchSvc: POST /indexes (更新索引)
Note right of SearchSvc: 假设此处调用失败
SearchSvc-->>-Orchestrator: 500 Internal Server Error
Orchestrator->>Orchestrator: Saga state=COMPENSATING
Orchestrator->>+BookingSvc: DELETE /bookings/{bookingId} (补偿操作)
BookingSvc->>-Orchestrator: 204 No Content
Orchestrator->>Orchestrator: Saga state=FAILED
Orchestrator-->>-Gateway: 202 Accepted (sagaId, status=FAILED)
Gateway-->>-FlutterApp: 返回Saga状态
这个流程图清晰地展示了失败路径。当SearchService调用失败时,编排器会启动补偿逻辑,调用BookingService的删除接口来回滚已经完成的操作,从而保证系统的最终一致性。
核心实现:Saga编排器
我们使用Node.js和TypeScript构建一个简单的内存Saga编排器。在生产环境中,Saga的状态必须持久化到数据库(如Redis或Postgres)中,以防编排器自身崩溃导致状态丢失。
saga.orchestrator.ts
import { randomUUID } from 'crypto';
import axios from 'axios';
// 在真实项目中,这个状态会存储在Redis或数据库中
const sagaStore: Map<string, SagaInstance> = new Map();
// 服务地址配置,从环境变量读取
const BOOKING_SERVICE_URL = process.env.BOOKING_SERVICE_URL || 'http://localhost:3001';
const SEARCH_SERVICE_URL = process.env.SEARCH_SERVICE_URL || 'http://localhost:3002';
enum SagaState {
PENDING,
CREATING_BOOKING,
UPDATING_INDEX,
COMPENSATING_BOOKING,
SUCCEEDED,
FAILED,
}
interface SagaContext {
bookingDetails: any;
bookingId?: string;
error?: string;
}
interface SagaInstance {
id: string;
state: SagaState;
context: SagaContext;
}
// 定义Saga的步骤
const sagaDefinition = {
[SagaState.CREATING_BOOKING]: {
async execute(context: SagaContext) {
console.log('[SAGA] Executing: Create Booking');
const response = await axios.post(`${BOOKING_SERVICE_URL}/bookings`, context.bookingDetails);
return { bookingId: response.data.id };
},
onSuccess: SagaState.UPDATING_INDEX,
onFailure: SagaState.FAILED, // 第一步失败,直接结束
},
[SagaState.UPDATING_INDEX]: {
async execute(context: SagaContext) {
console.log('[SAGA] Executing: Update Search Index');
// 这里的 payload 应该更复杂,包含需要索引的数据
await axios.post(`${SEARCH_SERVICE_URL}/indexes`, {
bookingId: context.bookingId,
...context.bookingDetails,
});
return {};
},
onSuccess: SagaState.SUCCEEDED,
onFailure: SagaState.COMPENSATING_BOOKING, // 失败时触发补偿
},
[SagaState.COMPENSATING_BOOKING]: {
async execute(context: SagaContext) {
console.log(`[SAGA] Compensating: Delete Booking ${context.bookingId}`);
if (!context.bookingId) {
throw new Error('Cannot compensate booking without bookingId');
}
await axios.delete(`${BOOKING_SERVICE_URL}/bookings/${context.bookingId}`);
return {};
},
onSuccess: SagaState.FAILED, // 补偿成功后,Saga最终状态为失败
onFailure: SagaState.FAILED, // 补偿失败需要告警和人工介入
},
};
class SagaOrchestrator {
public start(bookingDetails: any): SagaInstance {
const sagaId = randomUUID();
const instance: SagaInstance = {
id: sagaId,
state: SagaState.PENDING,
context: { bookingDetails },
};
sagaStore.set(sagaId, instance);
console.log(`[SAGA] Started Saga with ID: ${sagaId}`);
this.transition(sagaId, SagaState.CREATING_BOOKING);
return instance;
}
public getStatus(sagaId: string): SagaInstance | undefined {
return sagaStore.get(sagaId);
}
private async transition(sagaId: string, nextState: SagaState) {
const instance = sagaStore.get(sagaId);
if (!instance) {
console.error(`[SAGA] Saga instance not found: ${sagaId}`);
return;
}
instance.state = nextState;
sagaStore.set(sagaId, instance);
console.log(`[SAGA] Transitioned ${sagaId} to state: ${SagaState[nextState]}`);
const step = sagaDefinition[nextState];
if (step) {
try {
const result = await step.execute(instance.context);
// 更新上下文,为后续步骤提供数据
instance.context = { ...instance.context, ...result };
this.transition(sagaId, step.onSuccess);
} catch (error: any) {
console.error(`[SAGA] Step ${SagaState[nextState]} failed for ${sagaId}:`, error.message);
instance.context.error = error.message;
// 这里的坑在于:如果补偿操作本身也失败了,系统将处于一个不一致的“脏”状态。
// 生产级系统需要为补偿操作设计重试和告警机制。
this.transition(sagaId, step.onFailure);
}
}
}
}
export const orchestrator = new SagaOrchestrator();
这份代码实现了一个简单的状态机。transition函数是核心,它驱动Saga从一个状态转移到下一个。每个步骤定义了执行体(execute)、成功转移状态(onSuccess)和失败转移状态(onFailure)。注意,UPDATING_INDEX步骤失败后会转移到COMPENSATING_BOOKING,这就是Saga的回滚逻辑。
参与者服务:保证幂等性
Saga编排器可能会因为网络问题重试对参与者服务的调用。因此,参与者服务的接口必须是幂等的。
booking.service.ts
// 使用 Express.js 模拟 BookingService
import express from 'express';
const app = express();
app.use(express.json());
// 模拟数据库
const bookingsDB = new Map<string, any>();
// POST /bookings
// 一个常见的错误是没有处理重复请求。
// 这里的实现是简化的,真实项目需要基于请求ID或业务唯一标识来确保幂等。
app.post('/bookings', (req, res) => {
const bookingId = `bk_${Date.now()}`;
console.log(`[BookingService] Creating booking ${bookingId}`);
bookingsDB.set(bookingId, req.body);
res.status(201).json({ id: bookingId, status: 'created' });
});
// DELETE /bookings/:id
// 删除操作天然具有幂等性,删除一个不存在的资源不应该报错。
app.delete('/bookings/:id', (req, res) => {
const { id } = req.params;
if (bookingsDB.has(id)) {
console.log(`[BookingService] Deleting booking ${id}`);
bookingsDB.delete(id);
} else {
// 即使资源不存在,也应该返回成功,这是幂等性的关键。
console.log(`[BookingService] Booking ${id} not found, compensation considered successful.`);
}
res.status(204).send();
});
app.listen(3001, () => console.log('BookingService listening on port 3001'));
SearchService的实现类似,但为了测试失败场景,我们让它有一定概率失败。
search.service.ts
import express from 'express';
const app = express();
app.use(express.json());
// 模拟Elasticsearch索引
const searchIndex = new Map<string, any>();
app.post('/indexes', (req, res) => {
// 模拟服务不稳定的情况
if (Math.random() < 0.5) {
console.error('[SearchService] Failed to update index due to a transient error.');
return res.status(500).json({ error: 'Failed to connect to search cluster' });
}
const { bookingId } = req.body;
console.log(`[SearchService] Indexing document for booking ${bookingId}`);
searchIndex.set(bookingId, req.body);
res.status(200).json({ status: 'indexed' });
});
app.listen(3002, () => console.log('SearchService listening on port 3002'));
Flutter客户端:处理异步事务状态
Saga本质上是异步的。客户端发起一个Saga后,不能同步等待结果,因为整个流程可能需要几秒钟甚至更长时间。客户端的角色是:
- 发起Saga,并获得一个唯一的
sagaId。 - 使用
sagaId轮询编排器,获取Saga的最终状态。 - 根据状态更新UI,向用户提供明确的反馈。
我们使用Riverpod进行状态管理。
saga_service.dart
import 'dart:async';
import 'package:dio/dio.dart';
// 你的API网关或编排器地址
const String _baseUrl = 'http://10.0.2.2:3000';
enum SagaStatus { pending, succeeded, failed, unknown }
class SagaService {
final Dio _dio = Dio();
// 1. 发起Saga
Future<String> startBookingSaga(Map<String, dynamic> bookingDetails) async {
try {
final response = await _dio.post('$_baseUrl/saga/bookings', data: bookingDetails);
if (response.statusCode == 202 && response.data['sagaId'] != null) {
return response.data['sagaId'];
}
throw Exception('Failed to start Saga: Invalid response');
} catch (e) {
// 这里的错误处理至关重要,如果Saga启动失败,需要立即通知用户。
print('Error starting saga: $e');
rethrow;
}
}
// 2. 查询Saga状态
Future<SagaStatus> getSagaStatus(String sagaId) async {
try {
final response = await _dio.get('$_baseUrl/saga/status/$sagaId');
final statusString = response.data['status'];
switch (statusString) {
case 'SUCCEEDED':
return SagaStatus.succeeded;
case 'FAILED':
return SagaStatus.failed;
default:
return SagaStatus.pending;
}
} catch (e) {
print('Error getting saga status: $e');
return SagaStatus.unknown;
}
}
}
booking_provider.dart
import 'dart:async';
import 'package:flutter_riverpod/flutter_riverpod.dart';
import 'saga_service.dart'; // 假设SagaService在另一个文件
// 提供SagaService实例
final sagaServiceProvider = Provider((ref) => SagaService());
// 用于追踪特定Saga状态的Family Provider
final sagaStatusProvider = FutureProvider.family<SagaStatus, String>((ref, sagaId) async {
final sagaService = ref.watch(sagaServiceProvider);
// 使用轮询来检查状态。在生产应用中,WebSocket或SSE是更好的选择。
while (true) {
final status = await sagaService.getSagaStatus(sagaId);
if (status == SagaStatus.succeeded || status == SagaStatus.failed) {
return status;
}
// 等待2秒再查询,避免过于频繁的请求
await Future.delayed(const Duration(seconds: 2));
}
});
// 预订页面的状态
class BookingNotifier extends StateNotifier<AsyncValue<String?>> {
BookingNotifier(this.ref) : super(const AsyncValue.data(null));
final Ref ref;
Future<void> createBooking(Map<String, dynamic> details) async {
state = const AsyncValue.loading();
try {
final sagaService = ref.read(sagaServiceProvider);
final sagaId = await sagaService.startBookingSaga(details);
state = AsyncValue.data(sagaId);
} catch (e, st) {
state = AsyncValue.error(e, st);
}
}
}
final bookingNotifierProvider = StateNotifierProvider<BookingNotifier, AsyncValue<String?>>(
(ref) => BookingNotifier(ref),
);
booking_screen.dart (UI部分)
import 'package:flutter/material.dart';
import 'package:flutter_riverpod/flutter_riverpod.dart';
import 'booking_provider.dart';
import 'saga_service.dart';
class BookingScreen extends ConsumerWidget {
const BookingScreen({super.key});
Widget build(BuildContext context, WidgetRef ref) {
// 监听预订操作的发起状态
ref.listen<AsyncValue<String?>>(bookingNotifierProvider, (_, state) {
state.whenOrNull(
error: (err, st) => ScaffoldMessenger.of(context).showSnackBar(
SnackBar(content: Text('Failed to initiate booking: $err')),
),
);
});
final bookingState = ref.watch(bookingNotifierProvider);
final sagaId = bookingState.value;
return Scaffold(
appBar: AppBar(title: const Text('Saga Pattern in Flutter')),
body: Center(
child: Column(
mainAxisAlignment: MainAxisAlignment.center,
children: [
if (sagaId == null)
ElevatedButton(
onPressed: bookingState.isLoading
? null
: () {
// 模拟预订数据
final bookingDetails = {'userId': 'user123', 'activityId': 'act456'};
ref.read(bookingNotifierProvider.notifier).createBooking(bookingDetails);
},
child: const Text('Create Booking'),
)
else
// 一旦Saga启动,我们就监听它的最终状态
Consumer(builder: (context, ref, _) {
final sagaStatus = ref.watch(sagaStatusProvider(sagaId));
return sagaStatus.when(
data: (status) {
if (status == SagaStatus.succeeded) {
return const Column(children: [Icon(Icons.check_circle, color: Colors.green, size: 48), Text('Booking Successful!')]);
}
if (status == SagaStatus.failed) {
return const Column(children: [Icon(Icons.error, color: Colors.red, size: 48), Text('Booking Failed. Please try again.')]);
}
// 这段逻辑不应该被触及,因为provider会持续运行直到成功或失败
return const Text('Unexpected Status');
},
loading: () => const Column(children: [CircularProgressIndicator(), SizedBox(height: 16), Text('Processing booking...')]),
error: (err, st) => Text('Error polling status: $err'),
);
}),
],
),
),
);
}
}
通过FutureProvider.family,我们可以为每个sagaId创建一个独立的、可缓存的状态流。UI组件订阅这个Provider,便能响应式地展示”处理中”、”成功”或”失败”的状态,即使用户离开再返回这个页面(只要sagaId被持久化),也能看到正确的最终结果。
方案局限与未来路径
当前这套实现虽然验证了核心思想,但在生产环境中仍存在不足。首先,内存中的Saga编排器是单点故障,且无法水平扩展。一个健壮的实现需要将Saga实例的状态持久化到高可用的数据库中,并确保状态转换的事务性。
其次,Flutter客户端使用轮询来获取Saga状态,这会产生不必要的网络流量并带来延迟。在真实项目中,应采用WebSocket或Server-Sent Events (SSE) 等服务端推送技术,由编排器在Saga终态时主动通知客户端,以获得更好的实时性和效率。
最后,补偿逻辑的可靠性是Saga模式的基石。如果补偿操作自身失败(例如,BookingService的数据库宕机),整个系统将陷入不一致状态。因此,生产级的Saga实现必须对补偿操作设计重试机制,并在多次失败后触发明确的告警,以便人工介入处理。对于一些关键业务,甚至可能需要引入更复杂的策略,如记录失败的补偿任务到死信队列中进行后续处理。