RabbitMQ 要实现消息持久化,必须满足以下 4 个条件:
1、投递消息的时候durable设置为true,消息持久化,代码:channel.queueDeclare(x, true, false, false, null),第2个参数设置为true持久化;
2、设置投递模式deliveryMode设置为2(持久),代码:channel.basicPublish(x, x, MessageProperties.PERSISTENTTEXTPLAIN,x),第3个参数设置为存储纯文本到磁盘;
3、消息已经到达持久化交换器上;
4、消息已经到达持久化的队列。
消息持久化的缺点是很消耗性能,因为要写入硬盘要比写入内存性能较低很多,从而降低了服务器的吞吐量。可使用固态硬盘来提高读写速度,以达到缓解消息持久化的缺点。
RabbitMQ 消费类型也就是交换器(Exchange)类型有以下四种:
direct:轮询方式。
headers:轮询方式,允许使用header而非路由键匹配消息,性能差,几乎不用。
fanout:广播方式,发送给所有订阅者。
topic:匹配模式,允许使用正则表达式匹配消息。
RabbitMQ默认的是direct方式。
RabbitMQ使用ack消息确认的方式保证每个消息都能被消费,开发者可根据自己的实际业务,选择channel.basicAck()方法手动确认消息被消费。
RabbitMQ接收到消息后可以不消费,在消息确认消费之前,可以做以下两件事:
拒绝消息消费,使用channel.basicReject(消息编号, true) 方法,消息会被分配给其他订阅者;
设置为死信队列,死信队列是用于专门存放被拒绝的消息队列。
RabbitMQ的事务在autoAck=true也就是自动消费确认的时候,事务是无效的。
这是因为如果是自动消费确认,RabbitMQ会直接把消息从队列中移除,即使后面事务回滚也不能起到任何作用。
1、解耦
允许你独立的扩展或修改两边的处理过程,只要确保它们遵守同样的接口约束。
2、冗余
消息队列把数据进行持久化直到它们已经被完全处理,通过这一方式规避了数据 丢失风险。许多消息队列所采用的”插入-获取-删除”范式中,在把一个消息从队 列中删除之前,需要你的处理系统明确的指出该消息已经被处理完毕,从而确保 你的数据被安全的保存直到你使用完毕。
3、扩展性
因为消息队列解耦了你的处理过程,所以增大消息入队和处理的频率是很容易的, 只要另外增加处理过程即可。
4、灵活性 & 峰值处理能力
在访问量剧增的情况下,应用仍然需要继续发挥作用,但是这样的突发流量并不 常见。如果为以能处理这类峰值访问为标准来投入资源随时待命无疑是巨大的浪 费。使用消息队列能够使关键组件顶住突发的访问压力,而不会因为突发的超负 荷的请求而完全崩溃。
5、可恢复性
系统的一部分组件失效时,不会影响到整个系统。消息队列降低了进程间的耦合 度,所以即使一个处理消息的进程挂掉,加入队列中的消息仍然可以在系统恢复 后被处理。
6、顺序保证
在大多使用场景下,数据处理的顺序都很重要。大部分消息队列本来就是排序的, 并且能保证数据会按照特定的顺序来处理。(Kafka 保证一个 Partition 内的消 息的有序性)
7、缓冲
有助于控制和优化数据流经过系统的速度,解决生产消息和消费消息的处理速度 不一致的情况。
8、异步通信
很多时候,用户不想也不需要立即处理消息。消息队列提供了异步处理机制,允 许用户把一个消息放入队列,但并不立即处理它。想向队列中放入多少消息就放 多少,然后在需要的时候再去处理它们。
由于TCP连接的创建和销毁开销较大,且并发数受系统资源限制,会造成性能瓶颈。
RabbitMQ使用信道的方式来传输数据。
信道是建立在真实的TCP连接内的虚拟连接,且每条TCP连接上的信道数量没有限制。
1)中小型软件公司,建议选RabbitMQ。一方面,erlang语言天生具备高并发的特性,而且他的管理界面用起来十分方便。
2)大型软件公司,根据具体使用在rocketMq和kafka之间二选一。一方面,大型软件公司,具备足够的资金搭建分布式环境,也具备足够大的数据量。针对rocketMQ,大型软件公司也可以抽出人手对rocketMQ进行定制化开发,毕竟国内有能力改JAVA源码的人,还是相当多的。至于kafka,根据业务场景选择,如果有日志采集功能,肯定是首选kafka了。具体该选哪个,看使用场景。
生产者弄丢数据 消息队列弄丢数据 消费者弄丢数据
1)生产者丢数据
从生产者弄丢数据这个角度来看,RabbitMQ提供transaction和confirm模式来确保生产者不丢消息。
transaction机制就是说,发送消息前,开启事务(channel.txSelect()),然后发送消息,如果发送过程中出现什么异常,事务就会回滚(channel.txRollback()),如果发送成功则提交事务(channel.txCommit())。
2)消息队列丢数据
处理消息队列丢数据的情况,一般是开启持久化磁盘的配置。这个持久化配置可以和confirm机制配合使用,你可以在消息持久化磁盘后,再给生产者发送一个Ack信号。这样,如果消息持久化磁盘之前,rabbitMQ阵亡了,那么生产者收不到Ack信号,生产者会自动重发。
那么如何持久化呢,这里顺便说一下吧,其实也很容易,就下面两步。
第一步:将queue的持久化标识durable设置为true,则代表是一个持久的队列。
第二步:发送消息的时候将deliveryMode=2 这样设置以后,即使rabbitMQ挂了,重启后也能恢复数据。
3)消费者丢数据
消费者丢数据一般是因为采用了自动确认消息模式。这种模式下,消费者会自动确认收到信息。这时rabbitMQ会立即将消息删除,这种情况下,如果消费者出现异常而未能处理消息,就会丢失该消息。
解决消息队列过期失效问题,可以采取以下几种策略:
?