事件驱动架构详解
序言
大家好,我是程序员木然, 这一章我将会带大家做一个新的轮子项目: 基于信号的事件驱动框架。
接下来,先介绍一些基础概念吧:
事件驱动架构(EDA)
事件驱动架构是一种软件设计架构,应用程序的行为由各种事件的发生和处理决定。在这种架构下,系统通过监听和响应事件来驱动执行流程。事件可以是用户的操作、外部系统的输入或定时器的触发等。
事件驱动架构的关键特性在于“解耦”,即事件的发起者和事件的处理者是独立的,彼此之间并不直接通信,而是通过一个中央机制来调度事件和响应。这种解耦使得系统能够更灵活地扩展和维护。
啊~ 差不多就是就是这样,相信不少的同学都会很懵 ———— 这讲的啥啊?啊?啥啊?
OK啊,深呼吸,懵逼是正常的。 这个概念对于没有接触过的同学来说,直接看概念是很抽象的,那么为了大家能够更好的理解,我来给大家讲一下这个架构的演变过程吧。
要谈这个架构,我们就得先说说他的初始核心 ———— 观察者模式 (很多的架构或者玩法都是由设计模式一步一步演变过来的, 不愧是前辈们总结的经验)
那么我们就先聊聊观察者模式吧(之前没有学过观察者模式的跪求不要划走,我下面马上让大家零基础入门观察者模式)。
简单的来说:
观察者模式: 是设计模式中的一种行为型模式,主要用于定义对象之间的一种 一对多依赖关系。当一个对象(被观察者/主题)发生状态变化时,它会自动通知所有依赖它的对象(观察者),让它们能够及时更新,从而保持数据或行为的一致性。
复杂的来说:
你有没有试过在群聊里说了句:“我失恋了”,然后几十个朋友瞬间弹出:“怎么回事?”、“别伤心啦~”、“要不喝点?”——
那么!这就叫:观察者模式!
给大家一个形象的比喻吧:
概念 | 现实类比 | Java 对应 |
---|---|---|
被观察者 Subject | 八卦群管理员 | Order 、 Subject 类 |
观察者 Observer | 吃瓜群众 | InventoryManager 类 |
事件 Event | 八卦内容 | 通知、方法调用 |
相信大家有一定的理解了,那么我们接下来还是直接上代码
核心思想
-
被观察者(Subject/Observable):维护一组观察者,当自身状态发生变化时,负责通知这些观察者。
-
观察者(Observer):订阅被观察者的变化,一旦接收到通知,就会执行相应的操作。
-
解耦:被观察者只需维护观察者的列表,并在状态变化时通知它们,而不需要关心观察者的具体实现逻辑。
代码示例
被观察者 :
/*** 被观察者: 重活一世的舔狗*/
public class Subject {
}
抽象观察者:
/*** 抽象观察者*/
public interface Observer {void receiveMessage(String message);
}
观察者1号 :
/*** 观察者: 好兄弟*/
public class BroObserver implements Observer{@Overridepublic void receiveMessage(String message) {System.out.println("兄弟说: 哟哟哟,薄冰哥~");}
}
观察者2号:
/*** 观察者: 坏女孩*/
public class BadGirl implements Observer{@Overridepublic void receiveMessage(String message) {System.out.println("女神: 这舔狗怎么回事");}
}
观察者3号:
/*** 观察者: 好女孩*/
public class GoodGirl implements Observer {@Overridepublic void receiveMessage(String message) {System.out.println("好女孩: 他咋啦啊");}
}
小舔狗:发个朋友圈看看
/*** 被观察者: 重活一世的舔狗*/
public class Subject {static List<Observer> observers = new ArrayList<>();public static void main(String[] args) {// 好兄弟开始观察小舔狗observers.add(new BroObserver());// 好女孩开始观察小舔狗observers.add(new GoodGirl());// 坏女孩开始观察小舔狗observers.add(new BadGirl());String message = "放下了,我这一生,如履薄冰";System.out.println("小舔狗发送了一条朋友圈: " + message );for (Observer observer : observers) {observer.receiveMessage(message);}}
}
运行结果:
小舔狗发送了一条朋友圈: 放下了,我这一生,如履薄冰
兄弟,你想开了,出去喝一杯
他终于醒悟了
欲擒故纵吗?有点意思
那么,这个案例应该很明显的展示观察者模式的玩法了。
从观察者模式到事件驱动架构
通过上面的例子,我们知道 观察者模式 的核心就是:
-
被观察者负责发消息
-
观察者负责接收并响应
这种设计的好处是解耦:小舔狗发朋友圈的时候,并不需要关心“谁”在看、“看了之后会做什么”,只需要把信息广播出去就好。
但是,随着系统越来越复杂,单纯的观察者模式就显得 捉襟见肘 了。
观察者模式的局限性
-
通知逻辑写死
在上面的例子里,小舔狗必须手动维护 observers 列表并遍历通知,这种方式比较硬编码。
如果观察者数量很多、或者不同事件对应不同的观察者,就会很难管理。 -
缺少事件抽象
我们的朋友圈消息本质上就是一个事件,但在观察者模式里,它就是个字符串,没有更强的语义和约束。 -
同步阻塞
目前的通知方式是同步的:小舔狗一发朋友圈,就必须等所有观察者都回复完才能继续执行。现实世界往往需要 异步解耦(比如发朋友圈后,你不会等朋友一个个回复完才能去睡觉, 当然除了等待女神点赞的各位🤡)。
事件驱动架构的引入
为了解决以上问题,事件驱动架构(EDA) 作为观察者模式的自然演进出现了。它在“发布-订阅”的基础上,加入了 事件总线(Event Bus) 或 信号机制(Signal),从而让系统具备更强的解耦性和扩展性。
- 事件(Event/Signal)
更加明确的抽象,不只是“字符串”,而是一个携带上下文的数据对象。
- 事件总线(Event Bus / Dispatcher)
被观察者不用维护一堆观察者列表,而是把“事件”丢到总线上,由总线统一分发给感兴趣的观察者。
- 异步机制
事件触发后,观察者可以异步处理,不会阻塞被观察者。
类比理解
如果说 观察者模式 就像“小舔狗建了个微信群,大家都在群里看他的动态”,
那么 事件驱动架构 就像“朋友圈系统”:
-
小舔狗只管发动态(事件),
-
微信后台(事件总线)会根据规则推送给相关好友(订阅者),
-
好友什么时候看、怎么看、要不要回复,完全解耦。
过渡总结
所以,我们可以认为:
-
观察者模式 是最简单的事件驱动思想实现。
-
事件驱动架构 是它的 进阶和扩展,解决了通知逻辑混乱、同步阻塞、缺乏抽象等问题。
接下来,我们就要一步一步从观察者模式的代码,逐渐演化出一个 基于信号的事件驱动框架。
接下来,升级为事件驱动!
1 事件与监听器定义
// 事件标记接口
public interface Event { }// 事件监听器(泛型约束具体事件类型)
public interface EventListener<E extends Event> {void onEvent(E event);
}
import java.time.LocalDateTime;// 具体事件:发朋友圈
public class MomentPosted implements Event {private final String author;private final String message;private final LocalDateTime time;public MomentPosted(String author, String message) {this.author = author;this.message = message;this.time = LocalDateTime.now();}public String getAuthor() { return author; }public String getMessage() { return message; }public LocalDateTime getTime() { return time; }
}
2 事件总线(支持同步/异步分发)
import java.util.*;
import java.util.concurrent.*;
import java.util.concurrent.atomic.AtomicBoolean;public class EventBus {private final Map<Class<? extends Event>, CopyOnWriteArrayList<EventListener<? extends Event>>> listeners = new ConcurrentHashMap<>();private final AtomicBoolean async = new AtomicBoolean(false);private ExecutorService executor;public EventBus() { }// 开启/关闭异步分发public void setAsync(boolean enable) {async.set(enable);if (enable && executor == null) executor = Executors.newCachedThreadPool();if (!enable && executor != null) {executor.shutdown();executor = null;}}// 订阅public <E extends Event> void subscribe(Class<E> type, EventListener<E> listener) {listeners.computeIfAbsent(type, k -> new CopyOnWriteArrayList<>()).add(listener);}// 取消订阅public <E extends Event> void unsubscribe(Class<E> type, EventListener<E> listener) {List<? extends EventListener<? extends Event>> list = listeners.get(type);if (list != null) list.remove(listener);}// 发布事件@SuppressWarnings("unchecked")public <E extends Event> void publish(E event) {List<EventListener<? extends Event>> list = listeners.getOrDefault(event.getClass(), new CopyOnWriteArrayList<>());if (list.isEmpty()) return;for (EventListener<? extends Event> l : list) {EventListener<E> listener = (EventListener<E>) l;if (async.get() && executor != null) {executor.submit(() -> listener.onEvent(event));} else {listener.onEvent(event);}}}
}
3 三位“好友”成为订阅者(观察者)
public class BroObserver implements EventListener<MomentPosted> {@Overridepublic void onEvent(MomentPosted event) {System.out.println("好兄弟:" + event.getAuthor() + " 想开了?今晚我请!");}
}public class BadGirl implements EventListener<MomentPosted> {@Overridepublic void onEvent(MomentPosted event) {System.out.println("坏女孩:这波是欲擒故纵吗?有点意思~");}
}public class GoodGirl implements EventListener<MomentPosted> {@Overridepublic void onEvent(MomentPosted event) {System.out.println("好女孩:别难过,我在呢~");}
}
4. 小舔狗只负责“发动态”(发布事件)
public class MomentsDemo {public static void main(String[] args) throws InterruptedException {EventBus bus = new EventBus();// 切换成异步分发试试(不需要就注释掉)bus.setAsync(true);// 订阅“发朋友圈”事件bus.subscribe(MomentPosted.class, new BroObserver());bus.subscribe(MomentPosted.class, new GoodGirl());bus.subscribe(MomentPosted.class, new BadGirl());// 小舔狗发布动态String msg = "放下了,我这一生,如履薄冰";System.out.println("小舔狗发布朋友圈:「" + msg + "」");bus.publish(new MomentPosted("薄冰哥", msg));// 演示异步时,给线程池一点时间输出(同步模式可去掉)Thread.sleep(200);}
}
可能输出(异步下顺序不定,同步下按订阅顺序):
小舔狗发布朋友圈:「放下了,我这一生,如履薄冰」
好女孩:别难过,我在呢~
坏女孩:这波是欲擒故纵吗?有点意思~
好兄弟:薄冰哥 想开了?今晚我请!
小结一下吧
特性 | 观察者模式 | 事件驱动架构 |
---|---|---|
通知机制 | 主动维护观察者列表 | 交给事件总线分发 |
事件模型 | 通常是方法调用/字符串 | 语义化的事件对象 |
执行方式 | 同步阻塞 | 支持异步解耦 |
适用场景 | 小型、单进程内交互 | 大型、跨服务、异步系统 |
EDA典型应用场景
-
电商系统
下单 → 触发 OrderCreated 事件 → 库存服务扣减库存、支付服务生成支付单、消息服务推送通知。
→ 好处:下单服务只负责发事件,不关心“库存、支付、消息”具体怎么处理。 -
游戏开发
玩家升级 → 发布 LevelUp 事件 → 背包系统加奖励、排行榜更新分数、成就系统点亮徽章。 -
前端/客户端
JS 的 addEventListener、Android 的 BroadcastReceiver,其实都是事件驱动的变体。
技术总结
那么,大家开发中实现事件驱动的技术有哪些呢?
本地级别:Guava EventBus、Spring ApplicationEvent。
分布式级别:Kafka、RabbitMQ、Pulsar、RocketMQ。
云原生级别:Knative Eventing、AWS EventBridge。
经过这一章,大家应该能理解到事件驱动的操作了,那么这一章圆满结束。
课后思考:
-
- 幂等性:事件可能重复消费,要保证处理一次和处理多次结果一致
-
- 有的场景必须保证先后,比如“先扣库存,再发货”。
-
- 事件如果丢了怎么办?要不要补偿机制?
更多资料,请关注Code百分百