EDA(Event-Driven Architecture)是一种实现组件之间松耦合、易扩展的架构方式。一个最简单的EDA设计需要包含如下几个组件:
Events:需要被处理的数据。一个Event至少包含两个属性,类型和数据,类型决定了Event被哪个Handler处理,数据是Handler中待加工的材料。
Event Handlers:处理Events的方式方法。一般是一些方法操作。
Event Loop:维护Events和Event Handlers之间的交互流程。接收所有的Event,然后将其分配给合适的Handler处理。
无论是在同步还是异步的EDA中,Message(Event)都没有使用任何同步方式进行控制。根本原因是Event被设计成了不可变对象:Event在经过每一个Channel(Handler)的时候,都会创建一个全新的Event,因此多个线程之间不会出现资源竞争,也就不需要同步的保护。
同步方案代码:
/**
 * Contract for anything that can be sent through a router.
 *
 * <p>The value returned by {@link #getType()} is what the dispatchers in this
 * file use as the lookup key when choosing a channel.
 */
public interface Message {

    /**
     * Reports the type under which this message should be routed.
     *
     * @return the class object identifying this message's type
     */
    Class<? extends Message> getType();
}
/**
 * A destination that accepts messages of one particular kind.
 *
 * @param <E> the kind of message this channel accepts
 */
public interface Channel<E extends Message> {

    /**
     * Hands a message to this channel for processing.
     *
     * @param message the message to process
     */
    void dispatch(E message);
}
/**
 * Routes messages to channels that were registered per message type.
 *
 * @param <E> the most general kind of message this router handles
 */
public interface DynamicRouter<E extends Message> {

    /**
     * Associates a message type with the channel that should receive it.
     *
     * @param messageType the message class to route
     * @param channel     the channel that receives messages of that class
     */
    void registerChannel(Class<? extends E> messageType, Channel<? extends E> channel);

    /**
     * Forwards a message to the channel registered for it.
     *
     * @param message the message to route
     * @throws MessageMatcherException if the router cannot deliver the message; the
     *         implementations in this file throw it when no channel is registered
     *         for the message's type
     */
    void dispatch(E message) throws MessageMatcherException;
}
/**
 * Base {@link Message} whose routing type is simply its own runtime class.
 */
public class Event implements Message {

    /**
     * Returns the runtime class of this event.
     *
     * @return {@code getClass()} of the concrete event instance
     */
    @Override
    public Class<? extends Message> getType() {
        final Class<? extends Message> runtimeType = this.getClass();
        return runtimeType;
    }
}
import java.util.HashMap;
import java.util.Map;
/**
 * Synchronous {@link DynamicRouter}: {@link #dispatch(Message)} invokes the matching
 * channel directly on the calling thread.
 *
 * <p>The routing table is a plain {@link HashMap} and no synchronization is applied
 * in this class.
 */
public class EventDispatcher implements DynamicRouter<Message> {

    /** Message type to channel; parameterized instead of using the raw {@code Channel} type. */
    private final Map<Class<? extends Message>, Channel<? extends Message>> routerTable;

    /** Creates a dispatcher with an empty routing table. */
    public EventDispatcher() {
        this.routerTable = new HashMap<>();
    }

    /**
     * Registers (or replaces) the channel for a message type.
     *
     * @param messageType the message class to route
     * @param channel     the channel that receives messages of that class
     */
    @Override
    public void registerChannel(Class<? extends Message> messageType, Channel<? extends Message> channel) {
        this.routerTable.put(messageType, channel);
    }

    /**
     * Looks up the channel registered for {@code message.getType()} and passes the
     * message to it.
     *
     * @param message the message to route
     * @throws MessageMatcherException if no channel is registered for the message's type
     *         (this includes a type that was registered with a {@code null} channel)
     */
    @Override
    public void dispatch(Message message) throws MessageMatcherException {
        final Class<? extends Message> type = message.getType();

        // One lookup instead of containsKey + get. The cast is unchecked because the
        // table erases each channel's element type; registration keyed by message class
        // is what pairs a channel with the messages it is later handed.
        @SuppressWarnings("unchecked")
        final Channel<Message> channel = (Channel<Message>) this.routerTable.get(type);

        if (channel == null) {
            throw new MessageMatcherException("Can't match the channel for [" + type + "] type");
        }
        channel.dispatch(message);
    }
}
/**
 * Checked exception raised by the dispatchers in this file when a message has no
 * registered channel.
 */
public class MessageMatcherException extends Exception {

    /**
     * Creates the exception with a description of the routing failure.
     *
     * @param message the detail message, later available through {@link #getMessage()}
     */
    public MessageMatcherException(String message) {
        super(message);
    }
}
/**
 * Demonstrates the synchronous {@link EventDispatcher}: an {@code InputEvent} carrying
 * two integers is handled by adding them and dispatching a {@code ResultEvent}, whose
 * handler prints the sum.
 */
public class EventDispatcherExample {

    /** Event carrying the two operands. */
    static class InputEvent extends Event {

        private final int x;
        private final int y;

        /**
         * @param x first operand
         * @param y second operand
         */
        public InputEvent(int x, int y) {
            this.x = x;
            this.y = y;
        }

        /** @return the first operand */
        public int getX() {
            return this.x;
        }

        /** @return the second operand */
        public int getY() {
            return this.y;
        }
    }

    /** Event carrying the computed sum. */
    static class ResultEvent extends Event {

        private final int result;

        /** @param result the value to carry */
        public ResultEvent(int result) {
            this.result = result;
        }

        /** @return the carried value */
        public int getResult() {
            return this.result;
        }
    }

    /** Prints the value of a {@link ResultEvent} to standard output. */
    static class ResultEventHandler implements Channel<ResultEvent> {

        @Override
        public void dispatch(ResultEvent message) {
            final int value = message.getResult();
            System.out.println("The result is:" + value);
        }
    }

    /** Prints the operands, adds them, and dispatches the sum as a {@link ResultEvent}. */
    static class InputEventHandler implements Channel<InputEvent> {

        private final EventDispatcher router;

        /** @param dispatcher the dispatcher used to send the follow-up {@link ResultEvent} */
        public InputEventHandler(EventDispatcher dispatcher) {
            this.router = dispatcher;
        }

        @Override
        public void dispatch(InputEvent message) {
            final int left = message.getX();
            final int right = message.getY();
            System.out.printf("X:%d,Y:%d\n", left, right);

            final ResultEvent outcome = new ResultEvent(left + right);
            try {
                this.router.dispatch(outcome);
            } catch (MessageMatcherException e) {
                // Channel.dispatch declares no checked exception, so the failure is only reported.
                e.printStackTrace();
            }
        }
    }

    /**
     * Registers both handlers and dispatches {@code InputEvent(1, 2)}.
     *
     * @param args unused
     */
    public static void main(String[] args) {
        final EventDispatcher router = new EventDispatcher();
        final InputEventHandler inputHandler = new InputEventHandler(router);
        final ResultEventHandler resultHandler = new ResultEventHandler();

        router.registerChannel(InputEvent.class, inputHandler);
        router.registerChannel(ResultEvent.class, resultHandler);

        try {
            router.dispatch(new InputEvent(1, 2));
        } catch (MessageMatcherException e) {
            e.printStackTrace();
        }
    }
}
异步方案代码是在共用了部分同步代码之后形成的:
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
public abstract class AsyncChannel implements Channel<Event>{
private final ExecutorService executorService;
public AsyncChannel(ExecutorService executorService) {
this.executorService=executorService;
}
public AsyncChannel() {
this(Executors.newFixedThreadPool(Runtime.getRuntime().availableProcessors()*2));
}
public final void dispatch(Event message) {
this.executorService.submit(()->this.handle(message));
}
protected abstract void handle(Event message);
void stop() {
if(null!=this.executorService&&!this.executorService.isShutdown()) {
this.executorService.shutdown();
}
}
}
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;
/**
 * {@link DynamicRouter} that only accepts {@link AsyncChannel}s, so every dispatched
 * event is processed on the executor of its channel. The routing table is a
 * {@link ConcurrentHashMap}.
 */
public class AsyncEventDispatcher implements DynamicRouter<Event> {

    private final Map<Class<? extends Event>, AsyncChannel> routerTable;

    /** Creates a dispatcher with an empty routing table. */
    public AsyncEventDispatcher() {
        this.routerTable = new ConcurrentHashMap<>();
    }

    /**
     * Registers (or replaces) the channel for an event type.
     *
     * @param messageType the event class to route
     * @param channel     the channel for that class; must be an {@link AsyncChannel}
     * @throws IllegalArgumentException if {@code channel} is not an {@link AsyncChannel}
     *         (which includes {@code null})
     */
    @Override
    public void registerChannel(Class<? extends Event> messageType, Channel<? extends Event> channel) {
        if (!(channel instanceof AsyncChannel)) {
            throw new IllegalArgumentException("The channel must be AsyncChannel type.");
        }
        this.routerTable.put(messageType, (AsyncChannel) channel);
    }

    /**
     * Hands the event to the channel registered for {@code message.getType()}.
     *
     * @param message the event to route
     * @throws MessageMatcherException if no channel is registered for the event's type
     */
    @Override
    public void dispatch(Event message) throws MessageMatcherException {
        final Class<? extends Message> type = message.getType();

        // A single get() replaces the containsKey()/get() pair: one atomic read of the
        // concurrent map instead of a check-then-act sequence with two lookups.
        final AsyncChannel channel = this.routerTable.get(type);
        if (channel == null) {
            throw new MessageMatcherException("Can't match the channel for [" + type + "] type");
        }
        channel.dispatch(message);
    }

    /** Calls {@code stop()} on every registered channel. */
    public void shutdown() {
        this.routerTable.values().forEach(AsyncChannel::stop);
    }
}
import java.util.concurrent.TimeUnit;
/**
 * Demonstrates {@link AsyncEventDispatcher}: the same input/result flow as
 * {@link EventDispatcherExample}, but each handler runs on its channel's executor and
 * sleeps ten seconds to imitate slow work.
 */
public class AsyncEventDispatcherExample {

    /**
     * Sleeps for ten seconds to imitate slow processing.
     *
     * <p>If the sleep is interrupted, the stack trace is printed and the thread's
     * interrupt status is restored, so code further up the call stack can still
     * observe the interruption; the caller then continues with its work.
     */
    private static void simulateSlowWork() {
        try {
            TimeUnit.SECONDS.sleep(10);
        } catch (InterruptedException e) {
            e.printStackTrace();
            // sleep() cleared the flag when it threw; set it again instead of swallowing it.
            Thread.currentThread().interrupt();
        }
    }

    /** Prints the operands, waits, then dispatches their sum as a result event. */
    static class AsyncInputEventHandler extends AsyncChannel {

        private final AsyncEventDispatcher dispatcher;

        /** @param dispatcher the dispatcher used to send the follow-up result event */
        AsyncInputEventHandler(AsyncEventDispatcher dispatcher) {
            this.dispatcher = dispatcher;
        }

        /**
         * @param message expected to be an {@code EventDispatcherExample.InputEvent};
         *                any other event type fails the cast below
         */
        @Override
        protected void handle(Event message) {
            EventDispatcherExample.InputEvent inputEvent = (EventDispatcherExample.InputEvent) message;
            System.out.printf("X:%d,Y:%d\n", inputEvent.getX(), inputEvent.getY());

            simulateSlowWork();

            int result = inputEvent.getX() + inputEvent.getY();
            try {
                this.dispatcher.dispatch(new EventDispatcherExample.ResultEvent(result));
            } catch (MessageMatcherException e) {
                // handle() declares no checked exception, so the failure is only reported.
                e.printStackTrace();
            }
        }
    }

    /** Waits, then prints the value carried by a result event. */
    static class AsyncResultEventHandler extends AsyncChannel {

        /**
         * @param message expected to be an {@code EventDispatcherExample.ResultEvent};
         *                any other event type fails the cast below
         */
        @Override
        protected void handle(Event message) {
            EventDispatcherExample.ResultEvent resultEvent = (EventDispatcherExample.ResultEvent) message;
            simulateSlowWork();
            System.out.println("the result is:" + resultEvent.getResult());
        }
    }

    /**
     * Registers both asynchronous handlers and dispatches {@code InputEvent(1, 2)}.
     *
     * <p>NOTE(review): {@code dispatcher.shutdown()} is never called here, so the
     * channels' thread pools are left running after the result is printed; the JVM
     * will presumably not exit on its own — confirm whether that is intended.
     *
     * @param args unused
     */
    public static void main(String[] args) {
        AsyncEventDispatcher dispatcher = new AsyncEventDispatcher();
        dispatcher.registerChannel(EventDispatcherExample.InputEvent.class, new AsyncInputEventHandler(dispatcher));
        dispatcher.registerChannel(EventDispatcherExample.ResultEvent.class, new AsyncResultEventHandler());
        try {
            dispatcher.dispatch(new EventDispatcherExample.InputEvent(1, 2));
        } catch (MessageMatcherException e) {
            e.printStackTrace();
        }
    }
}