为了规范和约束业务方的使用方式,我们采用"接口 + 注解"的方式供业务方发布任务。业务方调用该接口时,我们会通过动态代理将调用转发到内部客户端真正的发布方法。
我们内部客户端有个Publisher接口,并由PublisherImpl实现该接口,用来进行任务发布。但是我们不希望业务方在调用时自行填写任务模板Template,而是希望其在接口上通过注解声明;我们通过注解扫描将其扫描进来并进行注册,然后在业务方调用时动态加入模板信息。
package com.xxx.arch.mw.nbp.client.publish;
import com.xxx.arch.mw.nbp.common.domain.*;
import com.xxx.arch.mw.nbp.common.exception.BlockException;
import com.xxx.arch.mw.nbp.common.exception.NbpException;
import com.xxx.arch.mw.nbp.common.exception.RpcException;
import com.xxx.arch.mw.nbp.common.extension.Filter;
import com.xxx.arch.mw.nbp.share.dto.*;
import com.xxx.commons.data.domain.CursorPageQuery;
import com.xxx.commons.data.domain.DefaultCursorPage;
import com.xxx.commons.data.domain.DefaultPage;
import com.xxx.commons.data.domain.Result;
import java.util.List;
import java.util.Map;
/**
 * Client-side publisher API: template registration, filter management and task publishing.
 * Every publish method takes the {@code Template} explicitly.
 */
public interface Publisher extends Bootable, Destroyable {
/**
* Registers a task publisher.
*
* @param templateCode the task template code
* @throws NbpException declared by the signature; the failure conditions are not visible here
*/
void register(final String templateCode) throws NbpException;
/**
* Unregisters a task publisher.
*
* @param templateCode the task template code
* @throws NbpException declared by the signature; the failure conditions are not visible here
*/
void unregister(final String templateCode) throws NbpException;
/**
* Adds a Filter.
*
* @param filter the filter to add
*/
void addFilter(final Filter filter);
/**
* Removes a Filter.
*
* @param filter the filter to remove
*/
void removeFilter(final Filter filter);
/**
* Publishes a single scheduled task.
*
* @param singleDetail the data package to publish
* @param template the publish template
* @return a {@code Result} carrying a {@code SingleDetail}
* @throws BlockException on flow control or degradation
* @throws NbpException on a business error
* @throws RpcException on timeout
*/
Result<SingleDetail> publish(SingleDetail singleDetail, Template template) throws NbpException;
/**
* Publishes a batch of scheduled tasks.
*
* @param multiDetail the data package to publish
* @param template the publish template
* @return a {@code Result} carrying a {@code MultiResult} of {@code SingleDetail}
* @throws BlockException on flow control or degradation
* @throws NbpException on a business error
* @throws RpcException on timeout
*/
Result<MultiResult<SingleDetail>> publish(MultiDetail multiDetail, Template template) throws NbpException;
}
为此,我们定义了一个PublishService接口,业务方只需要声明一个继承PublishService的接口即可。
package com.cainiao.arch.mw.nbp.client.publish;
import com.cainiao.arch.mw.nbp.common.domain.MultiDetail;
import com.cainiao.arch.mw.nbp.common.domain.SingleDetail;
import com.cainiao.arch.mw.nbp.common.exception.BlockException;
import com.cainiao.arch.mw.nbp.common.exception.NbpException;
import com.cainiao.arch.mw.nbp.common.exception.RpcException;
import com.cainiao.arch.mw.nbp.share.dto.MultiResult;
import com.cainiao.commons.data.domain.Result;
/**
* Publishing API whose methods take only the detail payload; unlike {@code Publisher},
* no {@code Template} parameter is passed by the caller.
*
* @author cainiao-inc
*/
public interface PublishService {
/**
* Publishes a single scheduled task.
*
* @param singleDetail the data package to publish
* @return a {@code Result} carrying a {@code SingleDetail}
* @throws BlockException on flow control or degradation
* @throws NbpException on a business error
* @throws RpcException on timeout
*/
Result<SingleDetail> publish(SingleDetail singleDetail) throws NbpException;
/**
* Publishes a batch of scheduled tasks.
*
* @param multiDetail the data package to publish
* @return a {@code Result} carrying a {@code MultiResult} of {@code SingleDetail}
* @throws BlockException on flow control or degradation
* @throws NbpException on a business error
* @throws RpcException on timeout
*/
Result<MultiResult<SingleDetail>> publish(MultiDetail multiDetail) throws NbpException;
}
业务在调用PublishService的方法Result<SingleDetail> publish(SingleDetail singleDetail) throws NbpException;时代理到调用内部PublisherImpl的实际方法Result<SingleDetail> publish(SingleDetail singleDetail, Template template) throws NbpException;其中Template是我们在注解时将其扫描进来,在调用时动态带上。
假设业务方声明了DemoPublisher接口,实际上在调用的时候,我们会将其代理到客户端内部的Publisher接口对应实现类
package com.xxx.arch.sample.spring;
import com.xxx.arch.mw.nbp.client.publish.PublishService;
import com.xxx.arch.mw.nbp.client.publish.annotation.DispatchPublisher;
import com.xxx.arch.mw.nbp.common.exception.NbpException;
import org.springframework.context.annotation.Description;
/**
* Sample business-side publisher: an empty interface that extends {@code PublishService}
* and declares its template through {@code @DispatchPublisher}, using a property
* placeholder as the template code.
*
* @author cainiao-inc
* @created 2022-08-15 10:16 AM
* @description:
*/
@Description("NBP已升级到新内核")
@DispatchPublisher(code = "${demo.nbp.template.code}", desc = "NBP示例")
public interface DemoPublisher extends PublishService {
}
业务方使用result = demoPublisher.publish(singleDetail); 来发布一个任务。
package com.xxx.arch;
import com.alibaba.fastjson.JSON;
import com.xxx.arch.mw.nbp.common.domain.SingleDetail;
import com.xxx.arch.mw.nbp.common.domain.Trigger;
import com.xxx.arch.mw.nbp.common.domain.Triggerable;
import com.xxx.arch.mw.nbp.common.exception.DegradeException;
import com.xxx.arch.mw.nbp.common.exception.FlowException;
import com.xxx.arch.mw.nbp.common.exception.NbpException;
import com.xxx.arch.mw.nbp.common.exception.RpcException;
import com.xxx.arch.sample.spring.DemoPublisher;
import com.xxx.arch.sample.spring.DemoObject;
import com.xxx.commons.data.domain.Result;
import com.xxx.commons.retry.backoff.*;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.context.annotation.DependsOn;
import org.springframework.stereotype.Controller;
import org.springframework.web.bind.annotation.GetMapping;
import org.springframework.web.bind.annotation.RequestMapping;
import org.springframework.web.bind.annotation.ResponseBody;
import java.util.concurrent.ThreadLocalRandom;
import java.util.concurrent.TimeUnit;
/**
 * Sample controller showing how a business application publishes a task through its
 * {@link DemoPublisher} interface without passing a template itself.
 *
 * @author chengxu
 */
@Controller
@RequestMapping("/v1/demo")
@DependsOn("demoPublisher")
public class DemoController {
    @Autowired
    private DemoPublisher demoPublisher;

    /**
     * Builds a demo payload and publishes it as a single task.
     *
     * @return the publish result, or {@code null} when publishing threw one of the
     *         exceptions handled below
     */
    @GetMapping("/publish")
    public @ResponseBody
    Result<SingleDetail> publish() {
        Result<SingleDetail> result = null;
        try {
            // Build the business object.
            DemoObject demoObject = buildDemoObject();
            // Alternative: "new SingleDetail()" builds a default single-task detail.
            // Here the detail is built with a specified bizKey (only supported when the
            // backend is Defylab, and it must be configured first).
            // The SingleDetail carries no Trigger: ISS dispatches the callback immediately;
            // Defylab dispatches according to the matched template rule, or immediately
            // when no rule matches.
            SingleDetail singleDetail = new SingleDetail(demoObject.getName());
            // Attach the user-defined business parameter.
            singleDetail.getUserContext().put("demoObject", JSON.toJSONString(demoObject));
            // Publish the task.
            result = demoPublisher.publish(singleDetail);
        } catch (RpcException rpcException) {
            // Timeout while publishing to NBP; the business side should retry on its own.
            // Specific exceptions are caught before NbpException so these handlers stay
            // reachable even if they are NbpException subclasses (hierarchy not visible here).
        } catch (FlowException | DegradeException blockException) {
            // Publishing was rejected by NBP flow-control rules; the business side should
            // retry on its own.
        } catch (NbpException nbpException) {
            // Business error: NBP judged the publish request invalid; the business side
            // needs to look into it and handle it.
        }
        return result;
    }

    /**
     * Creates a demo object with a random id below 1023, a name of the form
     * {@code demo_<id>} and sequence number 1.
     *
     * @return the populated demo object
     */
    public DemoObject buildDemoObject() {
        DemoObject demoObject = new DemoObject();
        long id = ThreadLocalRandom.current().nextLong(1023);
        demoObject.setId(String.valueOf(id));
        demoObject.setName(String.join("_", "demo", String.valueOf(id)));
        demoObject.setSeq(1);
        return demoObject;
    }
}
上述示例可以让业务不感知我们的任务模版,限制其一个接口只能发布对应任务模板的任务,避免发送到其它任务模板。那么我们来看下内部是如何实现的。
显然接口是无法直接使用的,最终得有个中间代理类。在初始化时,我们需要通过注解扫描找到带有对应注解的接口,为其生成代理并注入到Spring容器中,这样业务方在使用时就可以通过@Autowired注解将其声明的接口注入到对应的位置。而在接口调用时,再通过代理将方法调用转发到实际的Publisher的对应方法上去。
package com.xxx.arch.mw.nbp.client.spring;
import org.springframework.cglib.proxy.InvocationHandler;
import java.lang.reflect.Method;
/**
* Invocation handler contract for publisher proxies. It extends the
* {@code InvocationHandler} imported from {@code org.springframework.cglib.proxy}
* and redeclares {@code invoke} with the same signature.
*
* @author cainiao-inc
*/
public interface PublisherProxy extends InvocationHandler {
/**
* Handles a method call made on the proxy.
*
* @param proxy the proxy instance the method was called on
* @param method the invoked method
* @param args the call arguments
* @return the value to return from the proxied call
* @throws Throwable any failure raised while handling the call
*/
@Override
Object invoke(Object proxy, Method method, Object[] args) throws Throwable;
}
package com.xxx.arch.mw.nbp.client.spring;
import com.xxx.arch.mw.nbp.client.publish.Publisher;
import com.xxx.arch.mw.nbp.client.publish.annotation.DispatchPublisher;
import com.xxx.arch.mw.nbp.client.publish.annotation.DispatchTemplate;
import com.xxx.arch.mw.nbp.common.domain.MultiDetail;
import com.xxx.arch.mw.nbp.common.domain.NbpCode;
import com.xxx.arch.mw.nbp.common.domain.SingleDetail;
import com.xxx.arch.mw.nbp.common.domain.Template;
import com.xxx.arch.mw.nbp.common.exception.NbpException;
import java.lang.reflect.Method;
import java.util.HashSet;
import java.util.Set;
/**
 * Invocation handler that forwards calls made on a business publisher interface to the
 * internal {@link Publisher}, supplying the {@link Template} declared through annotations.
 *
 * <p>The template is taken from a method-level {@link DispatchTemplate} when it has a
 * non-empty code, otherwise from the class-level {@link DispatchPublisher}. The annotation
 * code is passed to {@code Template} as-is; no placeholder resolution happens in this class.
 */
public class PublisherProxyImpl implements PublisherProxy {
    /** Public methods of {@code Object}; these are never forwarded to the publisher. */
    private static final Set<Method> RESERVED_METHOD = new HashSet<>();

    static {
        for (Method method : Object.class.getMethods()) {
            RESERVED_METHOD.add(method);
        }
    }

    private final Publisher publisher;
    private final Class<?> clazz;
    /** Template from the class-level annotation, or {@code null} when absent or empty. */
    private final Template clazzTemplate;

    /**
     * Creates a handler with no class-level template.
     *
     * @param publisher the publisher that calls are forwarded to
     */
    public PublisherProxyImpl(Publisher publisher) {
        this(publisher, null);
    }

    /**
     * Creates a handler and reads the class-level template from {@code clazz}.
     *
     * @param publisher the publisher that calls are forwarded to
     * @param clazz the business interface, may be {@code null}
     */
    public PublisherProxyImpl(Publisher publisher, Class<?> clazz) {
        this.publisher = publisher;
        this.clazz = clazz;
        DispatchPublisher annotation = clazz == null ? null : clazz.getAnnotation(DispatchPublisher.class);
        if (annotation != null && !annotation.code().isEmpty()) {
            this.clazzTemplate = new Template(annotation.code(), annotation.desc());
        } else {
            this.clazzTemplate = null;
        }
    }

    /**
     * Dispatches a proxied call to the matching {@code Publisher.publish} overload.
     *
     * @param proxy the proxy instance the method was called on
     * @param method the invoked interface method
     * @param args the call arguments
     * @return the publisher's result, or the result of the {@code Object} method
     * @throws NbpException when no usable template is declared, or when the first
     *         parameter is neither {@code SingleDetail} nor {@code MultiDetail}
     * @throws Throwable anything thrown by the publisher or by the reflective call
     */
    @Override
    public Object invoke(Object proxy, Method method, Object[] args) throws Throwable {
        if (RESERVED_METHOD.contains(method)) {
            if ("equals".equals(method.getName())) {
                // Compare against the proxy itself; running equals on this handler would
                // make proxy.equals(proxy) return false.
                return proxy == args[0];
            }
            return method.invoke(this, args);
        }

        Template template = resolveTemplate(method);
        if (template == null) {
            throw new NbpException(NbpCode.ILLEGAL_PARAM.getCode(),
                method.getName() + "'s DispatchTemplate annotation and "
                    + "DispatchPublisher annotation can't be all empty or invalid!");
        }

        Class<?>[] parameterTypes = method.getParameterTypes();
        if (parameterTypes.length == 0) {
            throw new NbpException(NbpCode.UNSUPPORTED.getCode(), "Unsupported parameter type");
        }
        Class<?> firstParameterType = parameterTypes[0];
        if (firstParameterType == SingleDetail.class) {
            return this.publisher.publish((SingleDetail) args[0], template);
        }
        if (firstParameterType == MultiDetail.class) {
            return this.publisher.publish((MultiDetail) args[0], template);
        }
        throw new NbpException(NbpCode.UNSUPPORTED.getCode(),
            "Unsupported parameter type, parameter type must be SingleDetail or MultiDetail");
    }

    /**
     * Picks the template for a call: the method-level annotation wins when its code is
     * non-empty, otherwise the class-level template (possibly {@code null}) is used.
     */
    private Template resolveTemplate(Method method) {
        DispatchTemplate annotation = method.getAnnotation(DispatchTemplate.class);
        if (annotation != null && !annotation.code().isEmpty()) {
            return new Template(annotation.code(), annotation.desc());
        }
        return clazzTemplate;
    }
}
为了规范化,我们也使用BeanDefinition方式来初始化
package com.xxx.arch.mw.nbp.client.spring;
import com.xxx.arch.mw.nbp.client.constant.ClientConstants;
import org.springframework.beans.factory.config.BeanDefinition;
import org.springframework.beans.factory.support.BeanDefinitionBuilder;
import org.springframework.context.ApplicationContext;
import java.util.ArrayList;
import java.util.List;
/**
 * Fluent builder for the {@link BeanDefinition} of a {@link PublisherProxyFactoryBean}.
 *
 * <p>The built definition sets the {@code clazz} property, references the publisher bean
 * named by {@code ClientConstants.NBP_PUBLISHER_IMPL_BEAN_NAME}, and depends on that bean
 * plus every name collected through {@link #dependsOns(List)} and {@link #dependsOn(String)}.
 *
 * @created 2022-11-30 3:08 PM
 * @description:
 */
public class PublisherProxyBeanDefinitionBuilder {
    private Class<?> clazz;
    // Stored by context(...) but not read by build().
    private ApplicationContext context;
    private List<String> dependsOns = new ArrayList<>();

    PublisherProxyBeanDefinitionBuilder() {
    }

    /**
     * Sets the business interface the proxy will implement.
     *
     * @param clazz the interface class
     * @return this builder
     */
    PublisherProxyBeanDefinitionBuilder clazz(Class<?> clazz) {
        this.clazz = clazz;
        return this;
    }

    /**
     * Sets the application context.
     *
     * @param context the application context
     * @return this builder
     */
    PublisherProxyBeanDefinitionBuilder context(ApplicationContext context) {
        this.context = context;
        return this;
    }

    /**
     * Replaces the collected dependency names; a {@code null} argument is ignored.
     *
     * @param dependsOns the bean names to depend on
     * @return this builder
     */
    PublisherProxyBeanDefinitionBuilder dependsOns(List<String> dependsOns) {
        if (dependsOns != null) {
            // Copy so that a fixed-size or immutable caller list (e.g. Arrays.asList)
            // cannot break later dependsOn(String) calls.
            this.dependsOns = new ArrayList<>(dependsOns);
        }
        return this;
    }

    /**
     * Appends one dependency name; a {@code null} argument is ignored.
     *
     * @param dependsOn the bean name to depend on
     * @return this builder
     */
    PublisherProxyBeanDefinitionBuilder dependsOn(String dependsOn) {
        if (dependsOn != null) {
            this.dependsOns.add(dependsOn);
        }
        return this;
    }

    /**
     * Builds the bean definition from the collected settings.
     *
     * @return the definition for a {@link PublisherProxyFactoryBean}
     */
    BeanDefinition build() {
        BeanDefinitionBuilder builder = BeanDefinitionBuilder.genericBeanDefinition(PublisherProxyFactoryBean.class);
        builder.addPropertyValue("clazz", this.clazz);
        builder.addPropertyReference("publisher", ClientConstants.NBP_PUBLISHER_IMPL_BEAN_NAME);
        builder.addDependsOn(ClientConstants.NBP_PUBLISHER_IMPL_BEAN_NAME);
        for (String dependsOn : dependsOns) {
            builder.addDependsOn(dependsOn);
        }
        return builder.getBeanDefinition();
    }
}
package com.xxx.arch.mw.nbp.client.spring;
import com.xxx.arch.mw.nbp.client.publish.Publisher;
import com.xxx.arch.mw.nbp.common.domain.NbpCode;
import com.xxx.arch.mw.nbp.common.exception.NbpException;
import org.springframework.beans.factory.FactoryBean;
import org.springframework.beans.factory.InitializingBean;
import org.springframework.context.EnvironmentAware;
import org.springframework.core.env.ConfigurableEnvironment;
import org.springframework.core.env.Environment;
import java.lang.reflect.Constructor;
import java.lang.reflect.InvocationTargetException;
import java.lang.reflect.Proxy;
/**
 * Factory bean that exposes a JDK dynamic proxy of the business publisher interface.
 * Calls on the proxy are handled by a {@link PublisherProxyImpl} bound to the injected
 * {@link Publisher}.
 *
 * @created 2022-11-30 3:32 PM
 * @description:
 */
public class PublisherProxyFactoryBean implements FactoryBean<Object>, EnvironmentAware, InitializingBean {
    // Stored by setEnvironment(...) but not read anywhere in this class.
    private ConfigurableEnvironment environment;
    private Publisher publisher;
    private Class<?> clazz;

    public PublisherProxyFactoryBean() {
    }

    /**
     * Creates the proxy for {@code clazz}.
     *
     * <p>Failures are normalised: an {@code NbpException} is rethrown unchanged, an
     * {@code InvocationTargetException} is unwrapped to its target, and anything else is
     * wrapped in an {@code NbpException} with the {@code CLIENT_ERROR} code.
     *
     * @return a proxy implementing {@code clazz}
     */
    @Override
    public Object getObject() throws Exception {
        final PublisherProxyImpl handler = new PublisherProxyImpl(publisher, clazz);
        // Use the interface's own class loader: Proxy requires the interface to be visible
        // from the loader it is given, which this factory bean's loader does not guarantee.
        return Proxy.newProxyInstance(clazz.getClassLoader(), new Class<?>[]{clazz}, (proxy, method, args) -> {
            try {
                return handler.invoke(proxy, method, args);
            } catch (NbpException e) {
                throw e;
            } catch (InvocationTargetException e) {
                throw e.getTargetException();
            } catch (Throwable throwable) {
                throw new NbpException(NbpCode.CLIENT_ERROR.getCode(), throwable.getMessage(), throwable);
            }
        });
    }

    /**
     * Fails fast when the configured type cannot be proxied.
     *
     * @throws IllegalArgumentException when {@code clazz} is missing or is not an interface
     */
    @Override
    public void afterPropertiesSet() throws Exception {
        if (clazz == null) {
            throw new IllegalArgumentException("Property 'clazz' is required");
        }
        if (!clazz.isInterface()) {
            throw new IllegalArgumentException("Property 'clazz' must be an interface: " + clazz.getName());
        }
    }

    @Override
    public Class<?> getObjectType() {
        return clazz;
    }

    /**
     * Keeps the environment when it is a {@link ConfigurableEnvironment}; any other
     * implementation is ignored rather than failing on a cast.
     */
    @Override
    public void setEnvironment(Environment environment) {
        this.environment = environment instanceof ConfigurableEnvironment
            ? (ConfigurableEnvironment) environment : null;
    }

    @Override
    public boolean isSingleton() {
        return true;
    }

    public Publisher getPublisher() {
        return publisher;
    }

    public void setPublisher(Publisher publisher) {
        this.publisher = publisher;
    }

    public Class<?> getClazz() {
        return clazz;
    }

    public void setClazz(Class<?> clazz) {
        this.clazz = clazz;
    }
}
期间省略了注解扫描的实现(该实现后续文章补上)
/**
 * Registers a publisher-proxy bean definition for every scanned class.
 *
 * <p>For each class: resolves the bean name, rejects names the context already contains,
 * removes any definition currently registered under that name, and registers the proxy
 * definition. Names from a {@code @DependsOn} on the class are carried into the definition.
 *
 * @param registry the registry to remove from and register into
 * @param classSet the scanned classes
 * @throws IllegalArgumentException when the context already contains a bean with the name
 */
private void resolveRegistryPublisherProxyBeanDefinition(BeanDefinitionRegistry registry, Set<Class> classSet) {
    for (Class clazz : classSet) {
        String beanName = this.scanHelper.getComponentName(clazz);
        // Check for a clash first so nothing is built for a name that will be rejected.
        if (context.containsBean(beanName)) {
            throw new IllegalArgumentException("[NBP-CLIENT-STARTER] Spring context already has a bean named " + beanName
                + ", please change @DispatchPublisher field name.");
        }

        DependsOn dependsOnAnnotation = (DependsOn) clazz.getAnnotation(DependsOn.class);
        String[] dependOns = (dependsOnAnnotation == null) ? new String[0] : dependsOnAnnotation.value();

        BeanDefinition beanDefinition = new PublisherProxyBeanDefinitionBuilder()
            .context(context)
            .clazz(clazz)
            .dependsOns(Arrays.asList(dependOns))
            .build();

        // Remove any existing definition so other frameworks' scanning does not pick it up.
        // This is best-effort: a failure is only logged.
        try {
            LOGGER.debug("NBP-CLIENT-STARTER", "remove interface beanDefinition of {} in spring context.", beanName);
            registry.removeBeanDefinition(beanName);
        } catch (Exception e) {
            LOGGER.debug("NBP-CLIENT-STARTER", "remove interface beanDefinition of {} in spring context error with reason: {}",
                beanName, e.getMessage());
        }

        // Register the proxy definition under the same name.
        registry.registerBeanDefinition(beanName, beanDefinition);
        LOGGER.info("NBP-CLIENT-STARTER", "registered beanDefinition of {} in spring context.", beanName);
    }
}