跟着deepseek浅学分布式事务(2) - 两阶段提交(2PC)
文章目录
- 一、核心角色
- 二、流程详解
- 三、关键示例
- 四、致命缺点
- 五、改进方案
- 六、适用场景
- 七、伪代码
- 1. 参与者(Participant)
- 2. 协调者(Coordinator)
- 3. 模拟运行(Main Class)
- 4. 关键问题模拟
- 八、待改进问题
- 总结
一、核心角色
- 协调者(Coordinator):事务的发起者,负责决策提交或回滚。
- 参与者(Participants):实际执行事务的资源管理器(如数据库、服务)。
二、流程详解
阶段一:准备阶段(Prepare Phase)
- 协调者向所有参与者发送
Prepare
请求,附带事务内容(例如:“扣除A账户100元”)。 - 参与者执行本地事务操作(但不提交),记录Undo/Redo日志(为回滚或重试做准备)。
- 参与者反馈结果:
- 同意:回复
Yes
(表示已准备好,保证能提交)。 - 拒绝:回复
No
(可能因约束冲突、网络超时等)。
- 同意:回复
阶段二:提交/回滚阶段(Commit/Rollback Phase)
- Case 1: 所有参与者回复
Yes
- 协调者发送
Commit
命令。 - 参与者完成本地提交,释放锁资源。
- 参与者回复
Ack
确认。 - 协调者在收到所有
Ack
后标记事务完成。
- 协调者发送
- Case 2: 任一参与者回复
No
或超时- 协调者发送
Rollback
命令。 - 参与者利用Undo日志回滚,释放资源。
- 参与者回复
Ack
。 - 协调者标记事务终止。
- 协调者发送
三、关键示例
场景:从账户A向账户B转账100元(A和B位于不同数据库)。
- 协调者:转账服务。
- 参与者:数据库A(扣款)、数据库B(存款)。
执行流程:
- Prepare阶段
- 协调者问数据库A:“能否扣除100元?”
- A检查余额≥100,锁定该行,记录日志,回复
Yes
。
- A检查余额≥100,锁定该行,记录日志,回复
- 协调者问数据库B:“能否增加100元?”
- B检查账户有效,记录日志,回复
Yes
。
- B检查账户有效,记录日志,回复
- 协调者问数据库A:“能否扣除100元?”
- Commit阶段
- 协调者收到所有
Yes
,发送Commit
。 - A提交扣除操作,B提交增加操作。
- 协调者收到所有
若任一数据库在Prepare阶段失败(如A余额不足):
- 协调者发送
Rollback
,A/B回滚操作,释放锁。
四、致命缺点
a) 同步阻塞(Blocking)
- 参与者在
Prepare
后进入阻塞状态,等待协调者指令。若协调者宕机,参与者资源被长期锁定(可能导致系统僵死)。
b) 单点故障(SPOF)
- 协调者宕机时:
- 若在Prepare阶段:参与者无法得知最终决定,只能等待(或超时后自行回滚,破坏一致性)。
- 若在Commit阶段:部分参与者可能已提交,导致数据不一致。
c) 数据不一致风险
- 网络分区时:协调者仅收到部分
Yes
,可能误判为全部同意(需依赖超时机制补救)。 - 部分Commit失败:某些参与者收到
Commit
后崩溃,未执行提交。
五、改进方案
- 超时机制:参与者等待超时后自动回滚(但无法完全避免不一致)。
- 三阶段提交(3PC):引入
CanCommit
阶段和超时中断,减少阻塞时间(仍非完美)。 - 替代方案:TCC、SAGA等通过业务层补偿解决2PC的痛点。
六、适用场景
- 强一致性需求:如金融核心交易(容忍低性能,追求严格一致)。
- 短事务:长时间阻塞会加剧问题。
七、伪代码
1. 参与者(Participant)
public class Participant {private String id;private boolean prepared = false;private boolean committed = false;private Map<String, Integer> data; // 模拟数据库(账户余额)public Participant(String id) {this.id = id;this.data = new HashMap<>();this.data.put("balance", 100); // 初始余额100}/*** 准备阶段(Phase 1)** @param txId 事务ID* @param amount 操作金额(正数存款,负数扣款)* @return true/false 表示是否准备成功*/public boolean prepare(String txId, int amount) {try {// 检查业务约束(例如余额是否足够)if (data.get("balance") + amount >= 0) {prepared = true; // 锁定资源,记录undo日志(伪代码)System.out.println("[Participant " + id + "] Prepared for TX " + txId);return true;} else {return false;}} catch (Exception e) {return false; // 模拟失败(如超时、异常)}}/*** 提交阶段(Phase 2)*/public boolean commit(String txId, int amount) {if (!prepared) {return false; // 未准备成功,拒绝提交}data.put("balance", data.get("balance") + amount); // 执行提交committed = true;System.out.println("[Participant " + id + "] Committed TX " + txId);return true;}/*** 回滚阶段(Phase 2)*/public boolean rollback(String txId) {if (prepared && !committed) {prepared = false; // 释放资源(伪代码)System.out.println("[Participant " + id + "] Rolled back TX " + txId);return true;}return false;}public int getBalance() {return data.get("balance");}}
2. 协调者(Coordinator)
public class Coordinator {private List<Participant> participants = new ArrayList<>();public void addParticipant(Participant participant) {participants.add(participant);}/*** 执行两阶段提交** @param txId 事务ID* @param operations 参与者操作映射* @return true/false 表示事务是否成功*/public boolean execute2PC(String txId, Map<Participant, Integer> operations) {// ----------- Phase 1: Prepare -----------List<Participant> preparedParticipants = new ArrayList<>();for (Map.Entry<Participant, Integer> entry : operations.entrySet()) {Participant p = entry.getKey();int amount = entry.getValue();if (!p.prepare(txId, amount)) {rollbackAll(txId, preparedParticipants);return false;}preparedParticipants.add(p);}// ----------- Phase 2: Commit -----------for (Map.Entry<Participant, Integer> entry : operations.entrySet()) {Participant p = entry.getKey();int amount = entry.getValue();if (!p.commit(txId, amount)) {System.out.println("[ERROR] Partial commit failed!");return false; // 部分提交失败(需人工干预)}}System.out.println("[Coordinator] TX " + txId + " successfully committed!");return true;}/*** 回滚所有已准备的参与者*/private void rollbackAll(String txId, List<Participant> participants) {for (Participant p : participants) {p.rollback(txId);}}}
3. 模拟运行(Main Class)
public class Main {public static void main(String[] args) {// 初始化协调者和参与者(模拟两个数据库)Coordinator coordinator = new Coordinator();Participant dbA = new Participant("DB-A"); // 初始余额100Participant dbB = new Participant("DB-B"); // 初始余额100coordinator.addParticipant(dbA);coordinator.addParticipant(dbB);// 模拟转账:A向B转账50(A扣50,B加50)Map<Participant, Integer> map = new HashMap<>();map.put(dbA, -50); // A扣50map.put(dbB, 50); // B加50boolean success = coordinator.execute2PC("TX-001", map);if (success) {System.out.println("Transaction succeeded!");System.out.println("A余额:" + dbA.getBalance());System.out.println("B余额:" + dbB.getBalance());} else {System.out.println("Transaction failed!");}}}
4. 关键问题模拟
// 1. 全部成功
[Participant DB-A] Prepared for TX TX-001
[Participant DB-B] Prepared for TX TX-001
[Participant DB-A] Committed TX TX-001
[Participant DB-B] Committed TX TX-001
[Coordinator] TX TX-001 successfully committed!
Transaction succeeded!
// 2. Prepare阶段失败(DB-B余额不足)
// 修改DB-B的初始余额为0
Participant dbB = new Participant("DB-B");
dbB.data.put("balance", 0); // 余额不足,无法扣款[Participant DB-A] Prepared for TX TX-001
[Coordinator] Participant failed to prepare. Aborting TX TX-001
[Participant DB-A] Rolled back TX TX-001
Transaction failed!
八、待改进问题
- 协调者单点故障:需通过日志持久化 + 备用协调者恢复。
- 网络超时:添加超时机制,避免无限阻塞。
- 幂等性:参与者需支持重复提交/回滚(相同事务ID)。
- 锁优化:长时间Prepare会阻塞其他事务,需优化锁粒度。
总结
这些资料整理自DeepSeek,如有版权问题请联系我进行修改。