行为型模式(Behavioral Patterns):这类模式主要关注对象之间的通信。它们 分别是:
观察者模式是一种行为设计模式
,允许对象间存在一对多的依赖关系,当一个对象的状态发生改变时,所有依赖它的对象都会得到通知并自动更新。在这种模式中,发生状态改变的对象被称为“主题”(Subject),依赖它的对象被称为“观察者”(Observer)。所以观察者模式(Observer Design Pattern)也被称为发布订阅模式(Publish-Subscribe Design Pattern)。
被依赖的对象叫作被观察者(Observable),依赖的对象叫作观察者(Observer)。在实际的项目开发中,这两种对象的称呼是比较灵活的,有各种不同的叫法,比如:Subject-Observer、Publisher-Subscriber、Producer-Consumer等等。不管怎么称呼,只要应用场景符合刚刚给出的定义,都可以看作是观察者模式。
我们通过一个例子来实现观察者模式。假设我们有一个气象站(WeatherStation),需要向许多不同的显示设备(如手机App、网站、电子屏幕等)提供实时天气数据。
首先,我们需要创建一个Subject接口,表示被观察的主题:
/**
* 接口描述:被观察者应该提供注册、删除、通知观察者的能力
*
* @Author crysw
* @Version 1.0
* @Date 2024/1/1 21:43
*/
public interface Subject {
/**
* 注册观察者
*
* @param observer
*/
void registerObserver(Observer observer);
/**
* 删除观察者
*
* @param observer
*/
void removeObserver(Observer observer);
/**
* 通知观察者
*/
void notifyObservers();
}
接下来,我们创建一个Observer接口,表示依赖主题的观察者接口:
/**
* 接口描述:观察者接口
*
* @Author crysw
* @Version 1.0
* @Date 2024/1/1 21:50
*/
public interface Observer {
/**
* 更新新的天气气温
*
* @param temperature
*/
void update(float temperature);
}
创建一个具体的主题,如WeatherStation,实现Subject接口:
/**
* 类描述:气象站(被观察者)
*
* @Author crysw
* @Version 1.0
* @Date 2024/1/1 21:49
*/
public class WeatherStation implements Subject {
// 温度
private float temperature;
// 所有依赖的观察者
private List<Observer> observers = new ArrayList<>();
// 修改温度
public void changeTemperature(float temperature) {
this.temperature = temperature;
// 通知所有观察者更新温度
notifyObservers();
}
@Override
public void registerObserver(Observer observer) {
observers.add(observer);
}
@Override
public void removeObserver(Observer observer) {
observers.remove(observer);
}
@Override
public void notifyObservers() {
// 通知所有观察者更新温度
for (Observer observer : observers) {
observer.update(temperature);
}
}
}
最后,我们创建具体的观察者并实现Observer接口,如AppClient、WebClient:
/**
* 类描述:手机客户端
*
* @Author crysw
* @Version 1.0
* @Date 2024/1/1 21:55
*/
@Slf4j
public class AppClient implements Observer {
@Override
public void update(float temperature) {
log.info("app更新了温度,现在的温度是: {}", temperature);
}
}
/**
* 类描述:网页客户端
*
* @Author crysw
* @Version 1.0
* @Date 2024/1/1 21:57
*/
@Slf4j
public class WebClient implements Observer {
@Override
public void update(float temperature) {
log.info("网页客户端更新了气温,温度是:{}", temperature);
}
}
测试用例:
/**
* 类描述:观察者设计模式测试案例
*
* @Author crysw
* @Version 1.0
* @Date 2024/1/1 21:57
*/
public class ObserverTest {
@Test
public void test() {
// 定义气象站(主题-被观察者)
WeatherStation weatherStation = new WeatherStation();
// 定义观察者客户端(观察者)
Observer appClient = new AppClient();
Observer webClient = new WebClient();
// 注册观察者
weatherStation.registerObserver(appClient);
weatherStation.registerObserver(webClient);
// 更新温度
weatherStation.changeTemperature(20.05f);
}
}
测试结果:
[main] INFO cn.itcast.designPatterns.observer.weather.AppClient - app更新了温度,现在的温度是: 20.05
[main] INFO cn.itcast.designPatterns.observer.weather.WebClient - 网页客户端更新了气温,温度是:20.05
使用观察者模式的优点:
上面的小例子算是观察者模式的“模板代码”,可以反映该模式大体的设计思路。在真实的软件开发中,并不需要照搬上面的模板代码。观察者模式的实现方法各式各样,函数、类的命名等会根据业务场景的不同有很大的差别,比如 register 函数还可以叫作 attach,remove 函数还可以叫作 detach 等等。不过,万变不离其宗,设计思路都是差不多的。
了解了观察者设计模式的基本使用方式,我们接下来看看他的具体使用场景。
以下是一些使用观察者设计模式的例子:
股票行情应用:股票价格更新可以作为被观察者,投资者可以作为观察者。当股票价格发生变化时,所有订阅了该股票的投资者都会收到通知并更新自己的投资策略。
网络聊天室:聊天室服务器可以作为被观察者,用户可以作为察者。当有新消息时,聊天室服务器会通知所有在线用户更新聊天记录。
拍卖系统:拍卖系统可以作为被观察者,用户可以作为观察者。当出价发生变化时,所有关注该拍品的用户都会收到通知并更新自己的出价策略。
订阅系统:内容发布可以作为被观察者,用户可以作为观察者。当有新内容发布时,所有订阅了该内容的用户都会收到通知并获取最新内容。
电子邮件通知系统:任务状态更新可以作为被观察者,相关人员可以作为观察者。当任务状态发生变化时,所有关注该任务的人员都会收到通知并查看任务详情。
社交网络:被关注的用户可以作为被观察者,关注者可以作为观察者。当被关注的用户发布新动态时,所有关注者都会收到通知并查看动态。
发布-订阅模式和观察者模式都是用于实现对象间的松耦合通信的设计模式。尽管它们具有相似之处,但它们在实现方式和使用场景上存在一些关键区别。他们在概念上有一定的相似性,都是用于实现对象间的松耦合通信。可以将发布-订阅模式看作是观察者模式的一种变体或扩展。
观察者模式定义了一种一对多的依赖关系,当一个对象(被观察者)的状态发生变化时,所有依赖于它的对象(观察者)都会得到通知并自动更新。在这个模式中,被观察者和观察者之间存在直接的关联关系。观察者模式主要包括两类对象:被观察者(Subject)和观察者(Observer)。
发布-订阅模式(生产者和消费者)引入了第三方组件(通常称为消息代理或事件总线),该组件负责维护发布者和订阅者之间的关系。这意味着发布者和订阅者彼此不直接通信,而是通过消息代理进行通信。这种间接通信允许发布者和订阅者在运行时动态地添加或删除,从而提高了系统的灵活性和可扩展性。
Java中的发布-订阅模式示例:
// 订阅者接口
public interface Subscriber {
/**
* 接收事件通知
*
* @param event
*/
void onEvent(String event);
}
// 具体的订阅者
@Slf4j
public class ConcreteSubscriber implements Subscriber {
@Override
public void onEvent(String event) {
log.info("ConcreteSubscriber收到事件: {}", event);
}
}
// 具体的订阅者2
@Slf4j
public class ConcreteSubscriber2 implements Subscriber {
@Override
public void onEvent(String event) {
log.info("ConcreteSubscriber2收到事件: {}", event);
}
}
订阅者与事件发布之间通过消息总线(代理)来联系.
/**
* 类描述:消息总线(代理),实现主题(事件)发布,注册订阅者,删除订阅者等
*
* @Author crysw
* @Version 1.0
* @Date 2024/1/3 22:17
*/
public class EventBus {
// 使用一个map维护,消息类型和该消息的订阅者
private Map<String, List<Subscriber>> subscribers = new HashMap<>();
/**
* 订阅消息
*
* @param eventType 事件类型
* @param subscriber 订阅者
*/
public void subscribe(String eventType, Subscriber subscriber) {
List<Subscriber> subs = subscribers.get(eventType);
if (subs == null) {
subs = new ArrayList<>();
}
subs.add(subscriber);
subscribers.put(eventType, subs);
}
/**
* 删除订阅
*
* @param eventType
* @param subscriber
*/
public void unSubscribe(String eventType, Subscriber subscriber) {
List<Subscriber> subs = subscribers.get(eventType);
if (subs != null) {
subs.remove(subscriber);
}
}
/**
* 发布事件消息
*
* @param eventType
* @param event
*/
public void publish(String eventType, String event) {
List<Subscriber> subs = subscribers.get(eventType);
if (subs != null) {
for (Subscriber sub : subs) {
sub.onEvent(event);
}
}
}
}
发布订阅的测试案例
@Test
public void test2() {
// 创建消息代理
EventBus eventBus = new EventBus();
// 创建订阅者
Subscriber subscriber = new ConcreteSubscriber();
Subscriber subscriber2 = new ConcreteSubscriber2();
// 订阅事件
String eventType = "eventA";
eventBus.subscribe(eventType, subscriber);
eventBus.subscribe(eventType, subscriber2);
// 发布事件
eventBus.publish(eventType, "这是事件A发布的消息");
log.info("===============================================");
// 取消订阅
eventBus.unSubscribe(eventType, subscriber2);
// 再次发布事件
eventBus.publish(eventType, "事件A又来发布的消息again");
}
测试结果:
[main] INFO cn.itcast.designPatterns.observer.subscriber.ConcreteSubscriber - ConcreteSubscriber收到事件: 这是事件A发布的消息
[main] INFO cn.itcast.designPatterns.observer.subscriber.ConcreteSubscriber2 - ConcreteSubscriber2收到事件: 这是事件A发布的消息
[main] INFO cn.itcast.designPattern.ObserverTest - ===============================================
[main] INFO cn.itcast.designPatterns.observer.subscriber.ConcreteSubscriber - ConcreteSubscriber收到事件: 事件A又来发布的消息again
总结一下两者的区别:
发布-订阅模式的优点:
解耦:在发布-订阅模式中,发布者和订阅者之间没有直接关联,它们通过一个中间组件(消息代理或事件总线)进行通信。这种间接通信可以使发布者和订阅者在运行时动态地添加或删除,从而进一步降低了它们之间的耦合度。
可扩展性:发布-订阅模式更容易向系统中添加新的发布者和订阅者,而无需修改现有的代码。这使得系统在不同组件之间通信时具有更好的可扩展性。
模块化:由于发布者和订阅者之间的通信通过中间组件进行,可以将系统划分为更小、更独立的模块。这有助于提高代码的可维护性和可读性。
异步通信:发布-订阅模式通常支持异步消息传递,这意味着发布者和订阅者可以在不同的线程或进程中运行。这有助于提高系统的并发性能和响应能力。
消息过滤:在发布-订阅模式中,可以利用中间组件对消息进行过滤,使得订阅者只接收到感兴趣的消息。这可以提高系统的性能,减少不必要的通信开销。
发布-订阅模式也有一些缺点,例如增加了系统的复杂性,因为引入了额外的中间组件。根据具体的应用场景和需求来选择合适的设计模式。
java.util.Observable类实现了主题(Subject)的功能,而java.util.Observer接口则定义了观察者(Observer)的方法。
通过调用Observable对象的notifyObservers()方法,可以通知所有注册的Observer对象,让它们更新自己的状态。
案例:假设有一个银行账户类,它的余额是可变的。当余额发生变化时,需要通知所有的观察者(比如说银行客户),以便它们更新自己的显示信息。
使用观察者模式来实现银行客户对自己账户余额的实时监控。
首先创建主题类,实现存款,取款,获取余额,注册订阅者,删除订阅者等
package cn.itcast.designPatterns.observer.jdkimpl;
import java.util.Observable;
/**
* 类描述:银行账户类,实现Observable类
*
* @Author crysw
* @Version 1.0
* @Date 2024/1/9 21:37
*/
public class BankAccount extends Observable {
/**
* 余额
*/
private double balance;
public BankAccount(double balance) {
this.balance = balance;
}
/**
* 获取当前余额
*
* @return
*/
public double getBalance() {
return balance;
}
/**
* 存款操作
*
* @param amount
*/
public void deposit(double amount) {
balance += amount;
// 继承下来的方法,表示状态发生改变
setChanged();
// 继承下来的方法,通知所有观察者
notifyObservers();
}
/**
* 取款操作
*
* @param amount
*/
public void withdraw(double amount) {
balance -= amount;
setChanged();
notifyObservers();
}
}
再创建观察者类, 订阅者
/**
* 类描述:银行客户1-观察者
*
* @Author crysw
* @Version 1.0
* @Date 2024/1/9 21:45
*/
@Slf4j
public class ClientOberver implements Observer {
@Override
public void update(Observable observable, Object arg) {
log.info("客户1查看余额已更新为: {}", ((BankAccount) observable).getBalance());
}
}
/**
* 类描述:银行客户2-观察者
*
* @Author crysw
* @Version 1.0
* @Date 2024/1/9 21:45
*/
@Slf4j
public class ClientOberver2 implements Observer {
@Override
public void update(Observable observable, Object arg) {
log.info("客户2查看余额已更新为: {}", ((BankAccount) observable).getBalance());
}
}
测试用例:
/**
* 测试jdk中观察者模式的实现
*/
@Test
public void test3() {
// 创建发布事件(主题)对象
BankAccount bankAccount = new BankAccount(100.00);
log.info("银行账户余额是:{}", bankAccount.getBalance());
// 创建订阅者
ClientOberver clientOberver = new ClientOberver();
ClientOberver2 clientOberver2 = new ClientOberver2();
// 注册订阅者
bankAccount.addObserver(clientOberver);
bankAccount.addObserver(clientOberver2);
// 存款50
bankAccount.deposit(50.00);
log.info("================================");
// 取款25
bankAccount.withdraw(25.00);
log.info("================================");
// 删除订阅者
bankAccount.deleteObserver(clientOberver2);
log.info("================================");
// 取款20
bankAccount.withdraw(20.00);
}
测试结果:
[main] INFO cn.itcast.designPattern.ObserverTest - 银行账户余额是:100.0
[main] INFO cn.itcast.designPatterns.observer.jdkimpl.ClientOberver2 - 客户2查看余额已更新为: 150.0
[main] INFO cn.itcast.designPatterns.observer.jdkimpl.ClientOberver - 客户1查看余额已更新为: 150.0
[main] INFO cn.itcast.designPattern.ObserverTest - ================================
[main] INFO cn.itcast.designPatterns.observer.jdkimpl.ClientOberver2 - 客户2查看余额已更新为: 125.0
[main] INFO cn.itcast.designPatterns.observer.jdkimpl.ClientOberver - 客户1查看余额已更新为: 125.0
[main] INFO cn.itcast.designPattern.ObserverTest - ================================
[main] INFO cn.itcast.designPattern.ObserverTest - ================================
[main] INFO cn.itcast.designPatterns.observer.jdkimpl.ClientOberver - 客户1查看余额已更新为: 105.0
这个案例中,BankAccount类继承了java.util.Observable类,表示它是一个主题(Subject)。在存款或取款操作时,它会调用setChanged()方法表示状态已经改变,并调用notifyObservers()方法通知所有观察者(Observer)。
两个观察者(clientOberver和clientOberver2),它们分别实现了Observer接口的update()方法。当观察者收到更新通知时,它们会执行自己的业务逻辑,比如更新显示信息。
Guava 库中的 EventBus 类提供了一个简单的消息总线实现,可以帮助您在 Java应用程序中实现发布-订阅模式。以下是一个简单的示例,演示了如何使用 Guava 的EventBus 来实现一个简单的消息发布和订阅功能。
添加依赖项:
<dependency>
<groupId>com.google.guava</groupId>
<artifactId>guava</artifactId>
<version>30.1-jre</version>
</dependency>
定义一个事件消息类
/**
* 类描述:事件消息类
*
* @Author crysw
* @Version 1.0
* @Date 2024/1/9 22:02
*/
public class MessageEvent {
private String message;
public MessageEvent(String message) {
this.message = message;
}
public String getMessage() {
return message;
}
}
创建一个订阅者类。在订阅者类中,定义一个方法并使用 @Subscribe 注解标记该方法,以便 EventBus 能够识别该方法作为事件处理器:
/**
* 类描述:订阅者类
*
* @Author crysw
* @Version 1.0
* @Date 2024/1/9 22:03
*/
@Slf4j
public class MessageSubscriber {
@Subscribe
public void handleMessageEvent(MessageEvent event) {
log.info("收到消息: {}", event.getMessage());
}
}
测试用例:
/**
* 测试guava的发布订阅实现
*/
@Test
public void test4() {
// 创建 EventBus 事件实例
com.google.common.eventbus.EventBus eventBus = new com.google.common.eventbus.EventBus();
// 创建并注册订阅者
MessageSubscriber messageSubscriber = new MessageSubscriber();
eventBus.register(messageSubscriber);
// 发布事件
eventBus.post(new MessageEvent("Hello, EventBus!"));
// 删除订阅者
eventBus.unregister(messageSubscriber);
// 再次发布事件(此时订阅者已取消注册,将不会收到消息)
eventBus.post(new MessageEvent("Another message"));
}
在这个示例中,我们创建了一个 EventBus 实例,然后创建并注册了一个MessageSubscriber 类型的订阅者。当我们使用 eventBus.post() 方法发布一个 MessageEvent 事件时,订阅者的 handleMessageEvent 方法将被调用,并输出收到的消息。
注意,如果订阅者处理事件的方法抛出异常, EventBus 默认情况下不会对异常进行处理。如果需要处理异常,可以在创建 EventBus 实例时传入一个自定义的SubscriberExceptionHandler。
public class EventBus {
public EventBus(SubscriberExceptionHandler exceptionHandler) {
this("default", MoreExecutors.directExecutor(), Dispatcher.perThreadDispatchQueue(), exceptionHandler);
}
}
public interface SubscriberExceptionHandler {
void handleException(Throwable var1, SubscriberExceptionContext var2);
}
之前讲到的实现方式,是一种同步阻塞的实现方式。观察者和被观察者代码在同一个线程内执行,被观察者一直阻塞,直到所有的观察者代码都执行完成之后,才执行后续的代码。
如果注册接口是一个调用比较频繁的接口,对性能非常敏感,希望接口的响应时间尽可能短,那我们可以将同步阻塞的实现方式改为异步非阻塞的实现方式,以此来减少响应时间。
创建主题接口及实现
/**
* 接口描述:主题类(发布事件等)
*
* @Author crysw
* @Version 1.0
* @Date 2024/1/10 21:35
*/
public interface SyncObservable {
/**
* 注册观察者
*
* @param observer
*/
void addObserver(SyncObserver observer);
/**
* 移除观察者
*
* @param observer
*/
void removeObserver(SyncObserver observer);
/**
* 通知观察者
*
* @param message
*/
void notifyObservers(String message);
}
/**
* 类描述:具体的主题实现类(异步通知观察者)
*
* @Author crysw
* @Version 1.0
* @Date 2024/1/10 21:37
*/
public class SyncObservableImpl implements SyncObservable {
private List<SyncObserver> observers;
private ExecutorService executorService;
public SyncObservableImpl() {
observers = new ArrayList<>();
executorService = Executors.newCachedThreadPool();
}
/**
* 消息更新后,通知所有观察者
*
* @param message
*/
public void setMessage(String message) {
notifyObservers(message);
}
@Override
public void addObserver(SyncObserver observer) {
observers.add(observer);
}
@Override
public void removeObserver(SyncObserver observer) {
observers.remove(observer);
}
@Override
public void notifyObservers(String message) {
for (SyncObserver observer : observers) {
// 异步执行
executorService.submit(() -> observer.update(message));
}
}
}
创建观察者接口及实现
/**
* 接口描述:观察者接口
*
* @Author crysw
* @Version 1.0
* @Date 2024/1/10 21:34
*/
public interface SyncObserver {
void update(String message);
}
/**
* 类描述:
*
* @Author crysw
* @Version 1.0
* @Date 2024/1/10 21:36
*/
@Slf4j
public class SyncObserverImpl implements SyncObserver {
private String name;
public SyncObserverImpl(String name) {
this.name = name;
}
@Override
public void update(String message) {
log.info("{} received message: {}", name, message);
}
}
测试用例:
/**
* 测试异步通知
*/
@Test
public void test5() {
// 创建主题(发布事件)对象
SyncObservableImpl observable = new SyncObservableImpl();
// 创建观察者
SyncObserver observer = new SyncObserverImpl("crysw");
SyncObserver observer2 = new SyncObserverImpl("paanda");
// 注册观察者
observable.addObserver(observer);
observable.addObserver(observer2);
// 发布消息
observable.setMessage("放假了,放假了");
// 移除观察者
observable.removeObserver(observer);
// 再次更新消息
observable.setMessage("错了,继续搬砖");
}
不管是同步阻塞实现方式还是异步非阻塞实现方式,都是进程内的实现方式。如果用户注册成功之后,需要发送用户信息给大数据征信系统,而大数据征信系统是一个独立的系统,跟它之间的交互是跨不同进程的,那如何实现一个跨进程的观察者模式呢?
如果大数据征信系统提供了发送用户注册信息的 RPC 接口,我们仍然可以沿用之前的实现思路,在 notifyObservers() 函数中调用 RPC 接口来发送数据。但是,我们还有更加常用的一种实现方式,那就是基于消息队列(Message Queue)来实现。
当然,这种实现方式也有弊端,需要引入一个新的系统(消息队列),增加了维护成本。不过,它的好处也非常明显。在原来的实现方式中,观察者需要注册到被观察者中,被观察者需要依次遍历观察者来发送消息。而基于消息队列的实现方式,被观察者和观察者解耦更加彻底,两部分的耦合更小。被观察者完全不感知观察者,同理,观察者也完全不感知被观察者。被观察者只管发送消息到消息队列,观察者只管从消息队列中读取消息来执行相应的逻辑。