ActiveMQ断线重连技巧,即通信高可用的配置

发布时间:2023年12月17日

最近在做一个内部应用的时候,应用到了ActiveMQ作为服务之间消息传递,解耦服务之间的关联,但是在应用的过程中遇到了连接断线无法重连的问题,下面基于这个问题,深入了解一下ActiveMQ的一些相关原理和知识。

一、前置知识

1.1 基础概念

ActiveMQ中有3个重要的角色:Broker、Producer、Consumer。
Broker为消息代理,它是ActiveMQ服务端角色,接收客户端的链接并提供消息通信的核心服务。
Producer是消息生产者,客户端角色
Consumer是消息消费者,客户端角色。
要保证Producer和Consumer正常通信,主要是通过Broker来实现的,Broker既代理了Producer同时也代理了Consumer,这样Broker才能知道哪些是生产者,哪些是消费者,不至于不同的消息被本不属于它的消费者给消费了。Broker内部的机制我们下一节学习。
那么生产者和消费者如何实现通信的呢?ActiveMQ定义的连接器(connector)就是用来约定ActiveMQ的节点之间如何通信的。

1.2 连接器

ActiveMQ通过网络连接器这种连接机制来实现客户端与服务端之间的通信。ActiveMQ提供了两种连接器:

  1. 传输连接器(transport connector):用于客户端和服务端之间( client-to-broker)的通信。
  2. 网络连接器(network connector):用户集群中多个服务端之间(broker-to-broker)的通信。

1.3 传输连接器包括的协议

  • tcp,默认使用的协议,符合大多数的使用场景。
  • udp,客户端使用udp协议和服务端通信,当客户端和服务端之间存在防火墙可以考虑使用udp协议。
  • vm,当客户端和服务端在同一个JVM中可以考虑使用。直接使用虚拟机本地方法调用,从而避免网络通信的开销。
  • nio,本质上还是tcp,只是使用了java NIO包,某些场景下可能性能更好。
  • ssl,基于tcp提供安全的通信。
  • http/https,允许客户端使用REST或Ajax的方式进行连接,可以通过JS给ActiveMQ发送消息。
  • multicast,客户端使用组播的方式连接到服务端。
  • websocket,可以通过HTML5中的websocket技术连接服务端。
  • amqp,高级消息队列协议,很多消息中间件都支持该协议。ActiveMQ5.8版本开始支持。
  • mqtt,MQTT是一个基于客户端-服务器的消息发布/订阅传输协议,主要应用在loT(物联网)。
  • stomp,STOMP是在WebSocket之上提供了一个基于帧的线路格式(frame-based wire format)层,用来定义消息的语义,就像HTTP在TCP套接字之上添加了请求-响应模型层一样。ActiveMQ5.6版本开始支持

二、ActiveMQ断线发生的场景

ActiveMQ的客户端与ActiveMQ的Broker(消息代理)之间的网络连接发生断开,如果未采用高可用的配置,那么Producer无法向MQ中生产对象,同理Customer也无法消费MQ中的消息,整个业务就会出现暂停。
默认的情况下如果ActiveMQ服务正常,那么所有Client服务启动,都会自动在Broker中进行注册,这样就能实现消息生产和消费。但是如果Client服务正常,ActiveMQ服务宕机了进行重启或当网络不稳定或出现故障导致连接断开时,ActiveMQ是不会主动实现Client与Broker进行重连的,此时所有服务都正常,由于连接未建立,所以整个业务也无法实现消息的生产和消费。

三、ActiveMQ断线重连的实现原理

ActiveMQ的断线重连机制的实现原理主要是基于网络通信和消息重试机制。当ActiveMQ的客户端与Broker之间的连接断开时,客户端会检测到这个事件,并尝试重新建立连接。,ActiveMQ的客户端会检测到连接中断事件,然后触发一个重连机制。客户端会尝试重新连接到一个或多个Broker的URL。在默认情况下,如果连接断开,客户端会新起一个线程,不断的从url参数中获取一个url来重试连接。这种重试机制通常会进行一定的次数限制,以避免无限制的重试导致资源浪费或其他问题。

另外,在重连过程中,ActiveMQ的客户端还会尝试恢复之前未发送成功的消息。这个过程主要是通过持久化消息存储来实现的。在连接断开时,客户端会将未发送成功的消息存储到持久化存储中,如数据库或文件系统等。当客户端成功连接到Broker后,会从持久化存储中恢复这些消息,并进行重新发送。

四、ActiveMQ断线重连的配置

ActiveMQ提供了客户端和服务器端通信高可用的配置,

failover,为客户端提供重连服务端的逻辑,允许配置多个上面介绍的不同协议的连接配置,并随机的从其中选择一个进行连接,如果失败则继续选择其他服务重试。failover的配置格式:failover:(tcp://ip1:61616,tcp://ip2:61616)?initialReconnectDelay=100。

fanout,采用复制的方式将消息发送给多个服务端,配置格式为:fanout:(tcp://localhost:61629,udp://localhost:61639,tcp://localhost:61649)

4.1 failover

failover是一种ActiveMQ提供的失效转移(也叫故障转移)的策略。其原理是如果服务先连接到tcp://ip1:61616这个消息队列,如果因为网络抖动或其他意外情况导致ip1无法连接,failover会自动切换到ip2:61616这个消息队列,实现了消息高可用,如果ip1的网路正常了,failover又会尝试连接回来。但是这与断线重连有什么关系呢?
经过我的验证,如果只配置了一个ActiveMQ,如:failover:(tcp://ip1:61616)?initialReconnectDelay=100 当ip1上的ActiveMQ出现了问题,此时failover无法进行故障转移,他就会在initialReconnectDelay定义的100毫秒后进行ip1的重连,从而导致Client与ip1的Broker重新建立链接,实现了断线重连的功能。

4.2 fanout

采用复制的方式将消息发送给多个服务端,这里面虽然没有断线重连,但是实现了消息发送的高可用,这里面需要注意一点,如果Customer在没有很好的处理消息的情况下,有可能Productor生产了一个消息,发送给多个消息队列,Customer消费了多次消息,导致数据重复,所以需要注意Customer消息消费逻辑的幂等性。

4.3 自定义函数实现断线重连

JMS提供了ExceptionListener接口用于侦听JMS消息链接异常,以下是基于JMS的ExceptionListener接口实现的断线自动重连的示例:

import java.util.Timer;
import java.util.TimerTask;

import javax.jms.ExceptionListener;
import javax.jms.JMSException;

/**
 * JMS 重连接实现<br>
 * 通过实现{@link ExceptionListener}接口侦听连接异常,
 * 使用定时任务迟延执行重连接尝试直至连接成功
 * @author guyadong
 * @since 2.3.8
 */
class AutoReconnectAdapter implements ExceptionListener,JmsConstants{
	private static long START_RECONNECTDELAY = 1;
	/**
	 * 用于执行自动重连的定时器对象
	 */
	private static final Timer reconnectTimer = new Timer("AMQP Reconnect"); 
	/**
	 * 定时重连的延迟时间(秒),从1秒开始,每次增加一倍,最大128
	 */
	private long reconnectDelay = START_RECONNECTDELAY; 
	/**
	 * 最大重连延迟时间
	 */
	private long maxReconnectDelay = 128;
	/**
	 * 应用层实现的重连回调接口
	 */
	private final JMSReconnectCallback jmsReconnectCallback;
	public AutoReconnectAdapter(JMSReconnectCallback jmsReconnectCallback) {
		this.jmsReconnectCallback = jmsReconnectCallback;
	}

	@Override
	public void onException(JMSException exception) {
		if(null != jmsReconnectCallback) {
			try {
				jmsReconnectCallback.onConnectionLost();
				scheduleReconnectCycle();
			} catch (Exception e) {
				logger.error(e.getMessage(),e);
			}
		}
	}
	/**
	 * 尝试将客户端重新连接到服务器。如果成功,它将确保不再计划重新连接。
	 * 但是,如果连接失败,延迟将增加一倍(最大128秒),并将在延迟后重新安排重新连接。
	 */
	private void attemptReconnect() {
		if(null != jmsReconnectCallback) {
			try {
				jmsReconnectCallback.tryReconnecting();
				// restore to default value
				reconnectDelay = START_RECONNECTDELAY;
			}	catch (Exception e) {
				if(e instanceof JMSException || e.getCause() instanceof JMSException ) {
					reconnectDelay = Math.min(reconnectDelay*2, maxReconnectDelay);
					scheduleReconnectCycle();
				}else {
					logger.error(e.getMessage(),e);
				}
			}
		}
	}
	/**
	 * 安排在{@link #reconnectDelay}指定的延迟时间后执行重连接尝试
	 */
	private void scheduleReconnectCycle() {
		logger.info("{} Scheduling reconnect timer, delay {} seconds",jmsReconnectCallback.ownerName(), reconnectDelay);
		reconnectTimer.schedule(new TimerTask() {
			@Override
			public void run() {
				attemptReconnect();
			}}, reconnectDelay*1000);
	}
}

为了适应应用层不同的重连接实现需要,通过定义JMSReconnectCallback接口,来让断连接和重连实现抽象化,应用层可以根据自己的需要,实现此接口,执行断开连接和重连的动作

/**
 * JMS 重连机制回调接口
 * @author guyadong
 * @since 2.3.8
 */
public interface JMSReconnectCallback{
	/**
	 * 连接异常侦听
	 * @throws Exception 
	 */
	public void onConnectionLost() throws Exception;
	/**
	 * 尝试重连动作
	 * @throws Exception 失败抛出异常 JMSException 或异常原因(cause)为JMSException 则继续异步执行重试
	 */
	public void tryReconnecting() throws Exception;
	/**
	 * 返回当前接口对象所属的模块名,用于日志输出
	 */
	public String ownerName();
}

完整代码参见码云仓库:https://gitee.com/l0km/simplemq/blob/dev/simplemq-jms/src/main/java/gu/simplemq/jms/AutoReconnectAdapter.java

五、ActiveMQ断线重连的优化策略

断线重连如果过于频繁,也会导致网络与服务器的压力,建议从以下几点进行断线重连的优化:

减少断线重连的频率, 适当提升断线重连中间的延时
优化网络质量, 比如Client与MQ都部署在内网
调整连接超时和重试次数
使用多线程处理连接失败
使用负载均衡和集群提高可用性
优化消息恢复和持久化性能, 确保未被正常处理的消息有恢复机制
选择合适的消息恢复策略
断线重连的日志监控和告警机制
日志监控工具的选择和使用
告警机制的建立和维护,检测到断线之后最好给运维人员发送一条消息

六、总结

消息队列在大型项目的建设过程中被广泛的应用,虽然能够实现削峰异步处理,但是消息队列的引入会增加服务运维的风险和成本。今天总结了Client与ActiveMQ出现了断开链接如何实现自动重连的机制和原理,以及相关高可用配置方法。文章中如有错误,请多多指正,您的指正是我们共同进步的基石。

文章来源:https://blog.csdn.net/Scalzdp/article/details/134870412
本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。