Active Objects设计模式

发布时间:2023年12月18日

? ?Active是主动的意思,Active Object是主动对象的意思。主动对象就是拥有自己的独立线程。 Active Object模式不仅有自己的独立线程,还可以接受异步消息,并能返回处理结果。从标准的Active Objects设计入手,将一个接口的方法调用转换成可接收异步消息的主动对象,也就是说方法的执行和方法的调用是在不同的线程中进行的,接口方法的参数以及具体的实现封装成特定的Message告诉执行线程,接口方法需要返回值,必须以Future形式返回。

? ?第一种方法:当某个线程调用OrderService接口的findOrderDetails方法时,是会发送一个包含findOrderDetails方法参数以及OrderService具体实现的Message到Message队列,执行线程通过从队列中获取Message来调用具体的实现,接口的方法的调用和接口方法的执行分别处于不同的线程中,因此称该接口为Active Objects(可接受异步消息的主动对象)。 具体样例代码如下:

public interface OrderService {
Future<String> findOrderDetails(long orderId);
void order(String account,long orderId);
}
public class OrderServiceImpl implements OrderService{
@Override
public Future<String> findOrderDetails(long orderId) {
return FutureService.<Long,String>newService().submit(input->{
try {
System.out.println("process the orderId->"+orderId);
TimeUnit.SECONDS.sleep(10);
} catch (InterruptedException e) {
// TODO Auto-generated catch block
e.printStackTrace();
}
return "The order details Information";
},orderId);
}

@Override
public void order(String account, long orderId) {
try {
System.out.println("process the orderId->"+orderId+" , account->"+account);
TimeUnit.SECONDS.sleep(10);
} catch (InterruptedException e) {
// TODO Auto-generated catch block
e.printStackTrace();
}
}

}
public class OrderServiceProxy implements OrderService{
private final OrderService orderService;
private final ActiveMessageQueue activeMessageQueue;

public OrderServiceProxy(OrderService orderService,ActiveMessageQueue activeMessageQueue) {
this.orderService=orderService;
this.activeMessageQueue=activeMessageQueue;
}

@Override
public Future<String> findOrderDetails(long orderId) {
final ActiveFuture<String> activeFuture=new ActiveFuture<>();
Map<String,Object> params=new HashMap<>();
params.put("orderId", orderId);
params.put("activeFuture", activeFuture);
MethodMessage message=new FindOrderDetailsMessage(params,orderService);
activeMessageQueue.offer(message);
return activeFuture;
}

@Override
public void order(String account, long orderId) {
Map<String,Object> params=new HashMap<>();
params.put("account", account);
params.put("orderId", orderId);
MethodMessage message=new OrderMessage(params,orderService);
System.out.println("processing in OrderServicePoxy.order method");
activeMessageQueue.offer(message);
}

}
public class ActiveFuture<T> extends FutureTask<T>{
@Override
public void finish(T result) {
super.finish(result);
}
}
import java.util.Map;

public abstract class MethodMessage {
protected final Map<String,Object> params;
protected final OrderService orderService;

public MethodMessage(Map<String,Object> params,OrderService orderService) {
this.params=params;
this.orderService=orderService;
}

public abstract void execute();
}
public class FindOrderDetailsMessage extends MethodMessage{

public FindOrderDetailsMessage(Map<String, Object> params, OrderService orderService) {
super(params, orderService);
}

@Override
public void execute() {
Future<String> realFuture=orderService.findOrderDetails((Long) params.get("orderId"));
ActiveFuture<String> activeFuture=(ActiveFuture<String>)params.get("activeFuture");
try {
String result=realFuture.get();
activeFuture.finish(result);
} catch (InterruptedException e) {
// TODO Auto-generated catch block
e.printStackTrace();
}
}

}
import java.util.Map;

public class OrderMessage extends MethodMessage{

public OrderMessage(Map<String, Object> params, OrderService orderService) {
super(params, orderService);
}

@Override
public void execute() {
String account=(String)params.get("account");
long orderId=(long)params.get("orderId");
orderService.order(account, orderId);
}

}
import java.util.LinkedList;

public class ActiveMessageQueue {
private final LinkedList<MethodMessage> message=new LinkedList<>();

public ActiveMessageQueue() {
System.out.println("active Object Thread is build");
new ActiveDaemonThread(this).start();
}

public void offer(MethodMessage methodMessage) {
synchronized(this) {
message.add(methodMessage);
System.out.println("processing in ActiveMessageQueue.offer method");
this.notify();
}
}

protected MethodMessage take() {
synchronized(this) {
while(message.isEmpty()) {
try {
this.wait();
} catch (InterruptedException e) {
// TODO Auto-generated catch block
e.printStackTrace();
}
}
System.out.println("processing in ActiveMessageQueue.take method");
return message.removeFirst();
}
}
}
public class ActiveDaemonThread extends Thread{
private final ActiveMessageQueue queue;

public ActiveDaemonThread(ActiveMessageQueue queue) {
super("ActiveDaemonThread");
this.queue=queue;
this.setDaemon(true);
}

@Override
public void run() {
for(;;) {
System.out.println(" active daemon thread is running");
MethodMessage methodMessage=this.queue.take();
methodMessage.execute();
}
}

}
public class OrderServiceFactory {
private final static ActiveMessageQueue activeMessageQueue=new ActiveMessageQueue();

private OrderServiceFactory() {}

public static OrderService toActiveObject(OrderService orderService) {
return new OrderServiceProxy(orderService,activeMessageQueue);
}

}
public class AOtest {

public static void main(String[] args) {
OrderService orderService=OrderServiceFactory.toActiveObject(new OrderServiceImpl());
orderService.order("aACC", 5);
Future<String> f=orderService.findOrderDetails(50);
try {
System.out.println("future result is "+f.get());
} catch (InterruptedException e) {
// TODO Auto-generated catch block
e.printStackTrace();
}
System.out.println("Return immedately");
try {
Thread.currentThread().join();
} catch (InterruptedException e) {
// TODO Auto-generated catch block
e.printStackTrace();
}
}

}

? ? 第二种方法:第一种方法在接口方法非常多的情况下,会需要封装成很多的Message类。基于JDK动态代理的方式,可以实现一种更加通用的Active Objects。这种方式下,可以将任意接口方法转换w Active Objects,如果接口方法有返回值,必须返回Future类型才可以,否则会抛出IllegalActiveMethod异常。示例代码如下:

public class IllegalActivedException extends Exception{
public IllegalActivedException(String message) {
super(message);
}
}
public interface OrderService {
Future<String> findOrderDetails(long orderId);
void order(String account,long orderId);
}
public class OrderServiceImpl implements OrderService{

@ActiveMethod
@Override
public Future<String> findOrderDetails(long orderId) {
return FutureService.<Long,String>newService().submit(input->{
try {
System.out.println("process the orderId->"+orderId);
TimeUnit.SECONDS.sleep(10);
} catch (InterruptedException e) {
// TODO Auto-generated catch block
e.printStackTrace();
}
return "The order details Information";
},orderId);
}

@ActiveMethod
@Override
public void order(String account, long orderId) {
try {
System.out.println("process the orderId->"+orderId+" , account->"+account);
TimeUnit.SECONDS.sleep(10);
} catch (InterruptedException e) {
// TODO Auto-generated catch block
e.printStackTrace();
}
}

}
import static java.lang.annotation.ElementType.METHOD;
import static java.lang.annotation.RetentionPolicy.RUNTIME;

import java.lang.annotation.Retention;
import java.lang.annotation.Target;

@Retention(RUNTIME)
@Target(METHOD)
public @interface ActiveMethod {

}
public class ActiveMessage {
private final Object[] objects;
private final Method method;
private final ActiveFuture<Object> future;
private final Object service;

private ActiveMessage(Builder builder) {
this.objects=builder.objects;
this.method=builder.method;
this.future=builder.future;
this.service=builder.service;
}

public void execute() {
Object result;
try {
result = method.invoke(service, objects);
if(future!=null) {
Future<?> realFuture= (Future<?>)result;
Object realResult=realFuture.get();
future.finish(realResult);
}
} catch (Exception e) {
if(future!=null) {
future.finish(null);
}
e.printStackTrace();
}
}

static class Builder{
private Object[] objects;
private Method method;
private ActiveFuture<Object> future;
private Object service;

public Builder useMethod(Method method) {
this.method=method;
return this;
}

public Builder returnFuture(ActiveFuture<Object> future) {
this.future=future;
return this;
}

public Builder withObjects(Object[] objects) {
this.objects=objects;
return this;
}

public Builder forService(Object service) {
this.service=service;
return this;
}

public ActiveMessage build() {
return new ActiveMessage(this);
}

}

}
import java.lang.reflect.InvocationHandler;
import java.lang.reflect.Method;
import java.lang.reflect.Proxy;

import org.multithread.future.Future;

public class ActiveServiceFactory {
private final static ActiveMessageQueue queue=new ActiveMessageQueue();

public static <T> T active(T instance) {
Object proxy=Proxy.newProxyInstance(instance.getClass().getClassLoader(),
instance.getClass().getInterfaces(),
new ActiveInvocationHandler<>(instance));
return (T)proxy;
}

private static class ActiveInvocationHandler<T> implements InvocationHandler{
private final T instance;

ActiveInvocationHandler(T instance){
this.instance=instance;
}

private void checkMethod(Method method) throws IllegalActivedException{
if(!isReturnVoidType(method)&&!isReturnFutureType(method)) {
throw new IllegalActivedException("the method ["+method.getName()+"] return type must be void/Future");
}
}

private boolean isReturnVoidType(Method method) {
return method.getReturnType().equals(Void.TYPE);
}

private boolean isReturnFutureType(Method method) {
return method.getReturnType().isAssignableFrom(Future.class);
}

@Override
public Object invoke(Object proxy, Method method, Object[] args) throws Throwable {
if(method.isAnnotationPresent(ActiveMethod.class)) {
this.checkMethod(method);
ActiveMessage.Builder builder=new ActiveMessage.Builder();
builder.useMethod(method).withObjects(args).forService(instance);
Object result=null;
if(this.isReturnFutureType(method)) {
result=new ActiveFuture<>();
builder.returnFuture((ActiveFuture) result);
}
queue.offer(builder.build());
return result;
}else {
return method.invoke(instance, args);
}
}

}

}
import java.util.LinkedList;

public class ActiveMessageQueue {
private final LinkedList<ActiveMessage> activeMessages=new LinkedList<>();

public ActiveMessageQueue() {
System.out.println("active Object Thread is build");
new ActiveDaemonThread(this).start();
}

public void offer(ActiveMessage activeMessage) {
synchronized(this) {
this.activeMessages.add(activeMessage);
System.out.println("processing in ActiveMessageQueue.offer method");
this.notify();
}
}

public ActiveMessage takeActive() {
synchronized(this) {
while(this.activeMessages.isEmpty()) {
try {
this.wait();
} catch (InterruptedException e) {
// TODO Auto-generated catch block
e.printStackTrace();
}
}
return this.activeMessages.removeFirst();
}
}

}
public class ActiveDaemonThread extends Thread{
private final ActiveMessageQueue queue;

public ActiveDaemonThread(ActiveMessageQueue queue) {
super("ActiveDaemonThread");
this.queue=queue;
this.setDaemon(true);
}

@Override
public void run() {
for(;;) {
System.out.println(" active daemon thread is running");
ActiveMessage activeMessage=this.queue.takeActive();
activeMessage.execute();
}
}
}
public class AOtest {

public static void main(String[] args) {
ActiveServiceFactory activeInstance=new ActiveServiceFactory();
OrderService orderService=activeInstance.active(new OrderServiceImpl());
orderService.order("aACC", 5);
Future<String> f=orderService.findOrderDetails(150);
try {
System.out.println("future result is "+f.get());
} catch (InterruptedException e) {
// TODO Auto-generated catch block
e.printStackTrace();
}
System.out.println("Return immedately");
try {
Thread.currentThread().join();
} catch (InterruptedException e) {
// TODO Auto-generated catch block
e.printStackTrace();
}
}

}

文章来源:https://blog.csdn.net/seacean2000/article/details/134966270
本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。