前面的文章:
1、1 一个测试驱动的Spring Boot应用程序开发
2、2 使用React构造前端应用
3、3 试驱动的Spring Boot应用程序开发数据层示例
4、4 向微服务架构转变
后面的文章:
6、6 使用网关模式进行负载均衡
经过前面的示例,了解了微服务之间的接口的紧耦合关系,微服务Multiplication调用微服务Gamification,变成了流程的协调器。如果还有其他服务也需要查询每次尝试的数据,需要额外增加Multiplication对这些服务的调用,从而创建一个具有中央处理的分布式整体。这很显然会带来更多的问题。
现在,以Publish-Subscribe模式,重新设计这些接口,这种方式称为事件驱动的架构(Event-driven Architecture)。发布者(Publisher)不将数据传递到特定的目的地,而是不知道订阅者(Subscriber,即系统中接收数据的一方)的情况下,对事件进行分类和发送。这些事件使用者同样不需要知道发布者的逻辑。这种范式的变化使系统的耦合变得松散且可扩展,同时带来新的挑战。
事件驱动架构的要素是消息代理,系统组件与消息代理通信,而不是彼此直接连接,通过这种方式来保持彼此间的松散耦合。
消息代理通常包含理由功能,可以创建多个“通道(Channel)”,能根据需求划分消息。一个或多个发布者可以在每个通道中生成消息,这些消息可以被一个或多个订阅者(甚至没有订阅者)使用。下面是消息代理使用场景的概念视图。
这些概念不是新概念,经验丰富的开发人员肯定会在企业服务总线(ESB)架构中发现类似的模式。总线模式促进了系统不同部分之间的通信,提供了数据转换、映射、消息队列、排序、理由等功能。
企业服务总线(Enterprise Service Bus,ESB)的概念是从面向服务架构(Service Oriented Architecture, SOA)发展而来。ESB——企业服务总线,像一根管道,用来连接各个节点。为了集成不同系统,不同协议的服务,ESB做了消息的转换、解释与路由等工作,让不同的服务互联互通。
ESB是一种在松散耦合的服务和应用之间标准的集成方式。它可以作用于:
ESB就是在SOA架构中实现服务间智能化集成与管理的中介。
ESB架构和基于消息代理的架构之间的区别,还有一些争议。ESB中,通道本身在系统中具有更重要的意义。服务总线为通信设置协议标准、转换数据并将数据理由到特定目标。有些实现可以处理分布式事务。某些情况下,甚至有一个复杂的UI对业务流程进行建模,并将这些规则转换为配置和代码。ESB架构倾向于将绝大部分系统业务逻辑集中在总线内,构成系统的业务流程层。如图所示:
将所有业务逻辑放到同一组件中且系统中有中央协调器的软件架构,往往是容易失败的软件架构模式。采用这种路线的系统只有一个失败点,因为整个系统都依赖核心部分(总线),随着时间的推移,会变得很难维护和扩展。嵌入总线的逻辑往往会变得一团糟。这也是ESB架构遭人诟病的原因之一。
基于这样的原因,逐渐放弃了这种集中协调的、过于智能的消息传送通道,更倾向于使用消息代理实现一种更简单的方式:只用它与其他不同组件进行通信。
可以将ESB视为复杂通道,而将消息代理视为简单通道,想要划清界限并不容易。一方面,可以使用ESB平台,但要适当隔离业务逻辑;另一方面,某些新的消息传递平台(如Kafka)提供了一些工具,可以在通道中嵌入一些逻辑。如果必要,也可以使用包含业务逻辑的函数来转换消息。也可以像使用数据库一样在通道中查询数据,并根据需要处理输出。因此,可在不同架构(ESB/消息代理)的相关工具之间进行切换,用类似的方式使用。也就是要了解模式,然后选择符合需求的工具。
建议:尽量避免在通信通道中包含业务逻辑,把业务流程保留在每个微服务中。
在事件驱动架构中,一个事件表明在系统中发生了某些事情。业务逻辑拥有发生这些事件的领域,可以将事件发布到消息通道(如消息代理)。构架中的其他组件如果到给定事件类型感兴趣,则订阅该通道,以消费所有后续事件实例。事件与发布-订阅模式有关,也与消息代理或总线关联,可以使用消息代理实现事件驱动架构,下面就来看看。
消息是一个更通用的术语。有人将消息视为直接指向系统组件的元素,将事件视为反映了给定领域中发生的事实的信息片段,而且没有专门指向某个系统组件,以此将两者区分开来。从技术角度看,通过消息代理发送事件时,事件实际上是一条消息。因此,在设计事件驱动架构时,就用“事件”来指代消息,而“消息”则指进入消息代理的通用消息。
这样,可对事件进行建模,使用REST API发送事件(类似于前面示例中的操作)。但是,生产者需要发现消费者,才能将事件发送给它们,这对降低耦合毫无帮助。
将事件与消息代理一起使用,可以隔离软件架构中的所有组件。发布者和订阅者不需要知道彼此存在,这非常适合微服务架构。使用这种策略,可引入新的微服务,它们来消费通道中的事件,不需要修改发布这些事件的微服务,也不需要修改其他订阅者。
引入消息代理和一些Event类,并不能直接变成“事件驱动架构”。必须在设计软件时考虑到事件,需要付出努力。
假设创建了一个Gamification API给用户分配分数和徽章,然后,微服务Multiplication调用接口updateScore,不仅发现了这个微服务,还成为这部分业务逻辑(通过为成功的尝试分配分数)的所有者。这是刚使用微服务架构时常犯的错误,源自命令式编程风格,倾向于在微服务之间用API调用替换方法调用,实现远程过程调用(RPC)模式,有时甚至不会注意到这一点。如图所示:
为降低微服务之间的耦合,可以引入消息代理,将REST API调用替换成一条指向微服务Gamification的消息,即UpdateScore消息。但能否做到不改变系统呢?不能。因为消息仍然有一个目的地,它不能被任何新的微服务调用。此外,系统的两个部分仍然是紧耦合的,并且产生了一个副作用,用异步接口替换了同步接口,增加了额外的复杂性。
基于当前的实现,如图所示:
将一个ChallengeSolvedDTO对象从Multiplication传递到Gamification,维护了域边界。不再在Multiplication服务中包含Gamification的逻辑,但是,仍然需要明确配置Gamification的地址连接,紧耦合依然存在。
通过引入消息代理,可以解决这个问题。微服务Multiplication可将ChallengeSolvedDTO发布到通用的通道,然后继续执行接下来的业务逻辑。第二个微服务可以订阅这个通道并处理消息(从概念上讲,这是一个事件),以计算相应的分数和徽章。如果新加入系统中的微服务也对ChallengeSolvedDTO消息感兴趣,例如生成报告或向用户发送通知,也可以透明地订阅该通道。
第一个场景实现了一种命令模式,其中微服务Multiplication对微服务Gamification执行的操作进行了指引(也称为编制)。第二个场景通过发送关于已发生的事件的通知以及上下文数据,实现了事件模式,消费者将处理此数据,这可能触发业务逻辑,也可能触发其他事件,这种方法称为编排,与编制相反。将软件架构建立在事件驱动设计的基础上时,称其为事件驱动架构。
要实现真正事件驱动架构,必须重新思考哪些可能以命令式表达的业务流程,重新定义动作和事件,不仅仅使用DDD来定义域,还应该将它们之间的交互作为事件。
不需要为了遵循事件驱动架构而更改系统中的每个通信接口。在某些事件驱动模式不适用的情况下,可能相应命令和请求/响应模式。不要试图将只适合命令模式的业务需求包装成事件。
引入消息代理作为构建事件驱动架构的工具,也就接纳了异步消息传递。发布者发送事件,不需要等待任何事件消费者的回复,将使架构保持松耦合并具有可扩展性。如图所示:
当然,也可以使用消息代理来保持进程的同步。前面的例子中,计划用消息代理替换REST API接口,创建两个通道来传递事件,而不是只创建一个通道,使用第二个通道接收来自微服务Gamification的响应。可以阻塞请求的线程,并在继续处理之前等待确认。如图所示:
这实际上是基于消息代理的请求/响应模式,这种组合在某些情况下可能很有用,但不建议在事件驱动方法中使用。主要原因是:微服务Multiplication需要知道订阅者及订阅者的数量,以确保收到所有响应,系统组件会因此而紧耦合。但可从中得到好处,例如可伸缩性;也可以运用其他模式来提高可伸缩性,如负载均衡器。在进程要求必须同步的情况下,可以考虑使用一种更简单的同步接口,例如REST API。下表给出了一种建议,请注意,具体如何使用要具体分析。
模式 | 类型 | 实现方式 |
---|---|---|
请求/响应 | 同步 | REST API |
需要阻塞的命令 | 同步 | REST API |
不需要阻塞的命令 | 异步 | 消息代理 |
事件 | 异步 | 消息代理 |
需要注意的是,尽管端到端采用了异步通信,但应用程序与消息代理之间用的是同步接口。当发布消息时,确保代理在继续其他操作之前已收到消息。这同样适用于订阅者,在订阅者消费了消息后,代理需要一个确保已将该消息标记为已处理,然后移到下一个消息。这两个步骤对保证数据的安全和系统的可靠性至关重要。
响应式系统是将其描述为一套应用于软件架构的设计原则,从而使系统响应敏捷(及时响应)、具有弹性(在出现故障时保持响应)、可灵活伸缩(适应在不同工作负载下响应)和基于消息驱动(确保松散耦合和边界隔离)。如果遵循遵循范式构建系统,可以称为响应式系统。
另外,响应式编程指的是一组在编程语言中使用的技术,围绕如下范式,如futures(或promise)、reactive streams、backpressure等。Java中,有一些流行的库,如Reactor或RxJava,可以实现这些范式。使用响应式编程,可将逻辑切分成一组较小的块,这些逻辑块可以异步运行,然后合成或转换结果。这也带来了并发性的改进,在并行执行任务时,可以更快完成任务。
使用响应式编程并不会让架构编程响应式架构。它们在不同的层面工作,响应式编程有利于在组件内和并发性方面做出改进,响应式系统是这些组件之间在更高层面上的变化,有助于构建松耦合、富有弹性和可伸缩性的系统。
前面的微服务架构,获得了灵活性和可伸缩性,但也面临挑战,如最终一致性、容错性和部分更新等。使用消息代理模式进行事件驱动有助于应对这些挑战。
这里使用异步的过程来简单地通知其他系统组件,避免了创建阻塞的、命令式的流程,这需要一种不同的思维方式,要接受一种观念,即数据的状态可能不会在所有微服务中保持一致。
此外,随着消息代理的引入,需要添加一个新组件,因为不能断定消息代理一定不会出错,就必须让系统准备好以应对潜在的错误。
事件驱动系统的另一个缺点是追溯变得更加困难。调用一个REST API,可能触发事件;然后,可能会有一些组件对这些事件做出响应,随后发布其他事件,继续延长这个链条。当只有几个分布式进程时,了解不同事件在不同的微服务中引起的动作可能问题不大,但随着系统的发展,要全面了解这些事件和操作链是一项巨大的挑战。需要这个视图对错误操作进行调试,并找出触发指定进程的原因。有一些工具可以实现分布式跟踪,将事件和动作链接起来,并可视化为一系列动作/响应,如Spring Cloud Sleuth,可在日志中自动注入标识符的工具,在发出/接收HTTP调用时,通过RabbitMQ发布/消费消息以及其他时候,将这些标识符一直传播下去,然后,如果使用集中式日志记录,可使用这些标识符链接所有的进程。
要根据需要运用消息传递平台的几种模式,如图所示:
这种模式下,不同的订阅者将接收相同消息的副本。例如,系统中可能有多个对ChallengeSolvedEvent感兴趣的组件,如微服务Gamification和微服务Reporting。这种情况下,重要的是配置订阅者,使其接收相同的消息。每个订阅者将以不同的目的处理事件,以免引起重复操作。
这种模式更适合事件,不适用于发送到特定服务的消息。
这种模式也称为竞争消费者模式,这种情况下,可以将工作拆分到同一个应用程序的多个实例之间。
如上图所示,同一个微服务有多个副本,目的是平衡它们之间的工作负载,每个实例将消费不同的消息、处理它们,将结果存储在数据库中。
注意:同一组件的多个副本应共享同一数据层,便于安全地分割工作。
一种常见的情况是,一些订阅者对发布到一个通道的所有消息都感兴趣,而其他一些订阅者则只对其中一部分感兴趣。上图中的订阅者2就是这种情况。最简单的选择就是基于应用程序的过滤逻辑,在消息被消费后,立即丢弃掉。
一些消息代理也提供现成的过滤功能,系统组件可以使用给定的过滤器将自己注册为订阅者。
如果代理可以持久化消息,订阅者就不需要一直保持运行来消费所有数据。每个订阅者在消息代理中都有一个相关联的标记,以便知道最后消费的消息是什么。如果不能在指定的时间点获取消息,稍后数据流会从离开的节点重新传递。
即使所有订阅者检索到特定消息后,也可能想将其存储在消息代理中一段时间。如果新的订阅者想要获得其上线之前的消息,这就很有用了。如果要对某个订阅者进行重置,使所有消息重新得到处理,则持久化指定时间段的所有消息会有所帮助。
在一个将所有操作建模为事件的系统中,可以从中获益。想象一下,误删除了现有数据库中的所有数据。理论上,可以从头开始重播所有事件并重新建立相同的状态。因此,根本不需要在数据库中保留指定实体的最后状态,可将其视为多个事件的聚合,这就是事件追溯的核心概念。
当订阅者的运算不幂等时,可能很危险。
多年来,出现了一些与消息代理有关的消息传递协议和标准:
下面是实现了一些协议和标准或有自己的协议和标准的流行软件工具:
任何情况下,如果需要在不同的工具中进行抉择,就应该熟悉它们,分析其功能是否能满足需求。在Java和Spring Boot中,RabbitMQ和Kafka是构建事件驱动架构的常用工具。
这里,使用RabbitMQ和AMQP协议,原因是它们提供了多种配置,可了解大多数选项,也可在其他消息传递平台重用这些知识。
前面说过,发布者是系统中向消息代理发布消息的组件或应用程序,消费者(或订阅者)接收并处理这些消息。AMQP还定义了交换(exchange)、队列(queue)和绑定(binding),如图所示:
交换是消息被发送到的实体,根据交换类型和规则定义的逻辑(称为绑定)路由到队列。交换可以是持久化的,即使消息代理重新启动,仍然存在;如果不存在,也是暂时的。
队列是AMQP中存储将被消费的消息的对象。一个队列可能有0个、1个或多个消费者,可以是持久或临时的,要注意,持久化的队列不意味着它的所有消息都是持久的。要使消息在消息代理重启后仍然存在,必须将其作为持久消息发布。
绑定是将发布到交换的消息路由到特定队列的规则,就是将队列绑定到找到的交换。一些交换支持通过一个可选的绑定键(binding key)来决定哪些发布到交换的消息最终应该到达指定的队列。从这个意义上,可以将绑定看作过滤器。
发布者可以在发送消息时指定路由键(routing key),如果使用这些配置,可以基于绑定键对其进行正确筛选。路由键由点分隔的单词组成,例如attempts.correct,绑定键也有类似的格式,还可能包含模式匹配器,具体取决于交换的类型。
RabbitMQ有几种交换类型可选,如图所示,通过绑定键定义的不同路由策略和每条消息对应的路由键进行组合。
前面介绍的发布/订阅和过滤器模式都适用于这些场景。图中的直连交换可能看起来像是工作队列模式,实际上并非如此。在AMQP协议中,负载均衡发生在同一队列的消费者之间,而不是队列之间。
要实现工作队列模式,通常会多次订阅同一队列,如图所示。
AMQP协议为消费者应用程序定义了两种不同的确认模式。由于消费者发送确认后,消息将从队列中删除,因此理解这两种确认模式很重要。
第一个可选项是自动确认,这种策略,消息在发送到应用程序时将被视为已交付。第二个选项称为显式确认,会等待应用程序发送确认信号。相比之下,第二个选项更好,可以确保所有消息都得到处理。
消费者可以读取信息、运行一些业务逻辑、将相关数据持久化,甚至可以在向消息代理发送确认信号之前触发后续的事件。这种场景下,只有当消息已被完全处理时,才会将其从队列中删除。如果消费者在发送信号之前宕机(或出现错误),那么消息代理将尝试把消息传递给另一个消费者。如果没有错误,将保持等待,直到有一个消费者开用。
消费者也可以拒绝消息。假设某个消费者实例由于网络错误而无法访问数据库:消费者可以拒绝该消息,并指定该消息重新回到队列或将其丢弃。请注意,如果导致拒绝消息的错误持续存在了一段时间,且没有其他消费者能够成功地处理它,最终可能陷入无限的“重入队列/拒绝消息”的循环之中。
可以到RabbitMQ的官网去下载(RabbitMQ)。安装之后启动代理即可,这里不再一一介绍。
下面介绍一下Docker下安装RabbitMQ的方法。
使用docker pull rabbitmq:management
命令,推荐使用带有management
版本的。
使用docker volume create rabbitmq-home
命令,创建一个数据卷,专门用于持久化RabbitMQ的所有数据,方便管理。
使用命令
docker run -id --name=rabbitmq
-v rabbitmq-home:/var/lib/rabbitmq
-p 15672:15672 -p 5672:5672
-e RABBITMQ_DEFAULT_USER=admin
-e RABBITMQ_DEFAULT_PASS=123456
rabbitmq:management
这里除了挂载数据卷rabbitmq-home之外,还暴露了两个端口,以及设定了两个环境变量:
这样容器就部署完成了!启动容器后,在浏览器中访问服务器地址:15672即可访问到RabbitMQ的管理界面,用户名和密码即为刚刚指定的环境变量的配置值。
RabbitMQ容器是通过指定环境变量的方式进行配置的,这比修改配置文件便捷得多,还有更多的配置用的环境变量,大家可以参考官方文档。
在docker中安装带有management的版本,即可管理RabbitMQ,在浏览器中输入:http://localhost:15672/,可看到登录界面,使用自己设置的用户名和密码登录即可进入管理界面,如下所示:
通过这个界面,可监控队列中的消息、处理速率、不同注册节点的统计信息等,还有其他特性,如对队列和交换的监管,甚至可以创建或删除实体。
前面使用Spring Boot构建微服务,也会使用Spring模块连接到RabbitMQ消息代理。这需要Spring AMQP,它包含两个工件:spring-rabbit,是一组与RabbitMQ代理一起使用的工具;spring-amqp,包括所有AMQP抽象,可使实现独立于供应商。当前,Spring提供AMQP协议的RabbitMQ实现。
Spring Boot为AMQP提供了一个启动程序,带有自动配置:spring-boot-starter-amqp,包含了上面的两个工件。
下图对要构建的功能进行了说明,从微服务Multiplication到客户端的响应可能在Gamification微服务处理消息之前发生,这是一个异步的、最终一致的流程。这里将创建一个Topic类型的Attempts交换,在类似的事件驱动架构中,能灵活地使用特定的路由键发送事件,允许消费者订阅所有事件或在队列中设置自己的过滤器。
从概念上讲,Multiplication微服务拥有Attempts交换,将使用它来发布与用户尝试有关的事件,原则上,将发布正确和错误的事件,因为对消费者的逻辑一无所知。另一方面,Gamification微服务使用满足其要求的绑定键来声明队列,路由键用作过滤器,仅接收正确的尝试。如上图所示,可能有多个Gamification微服务实例从同一队列中消费,那么,代理将平衡所有实例之间的负载。
假设另一种微服务也对ChallengeSolvedEvent感兴趣,该微服务需要声明自己的队列来消费相同的消息。例如,引入Reports微服务,该微服务将创建reports队列并使用绑定键attempt.*(或#)来消费正确和错误的尝试。
可以结合发布/订阅和工作队列模式,以便多个微服务可以处理相同的消息,且同一微服务的多个实例可以共享负载。此外,让发布者负责交换和订阅者负责队列,可构建事件驱动的微服务架构,通过引入消息代理实现了二者之间的松耦合。
要实现事件驱动架构改变,需要完成以下工作:
在Spring Boot应用程序中添加AMQP和RabbitMQ支持,需要添加相应的启动项,在pom.xml中添加相应的依赖,如下所示:
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-amqp</artifactId>
</dependency>
该启动项包括了spring-rabbit和spring-amqp,通过spring-boot-autoconfigure,RabbitAutoConfiguration类提供连接RabbitMQ服务的默认配置,方便使用。
RabbitAutoConfiguration类使用在RabbitProperties类中定义的一组属性,可以在application.properties文件中覆盖这些属性。这里可以找到预定义的主机名(localhost)、端口(5672)、用户名(guest)和密码(guest)等。自动配置类为RabbitTemplate对象构建连接工厂和配置重新,可用它们接收消息或发送消息到RabbitMQ。也可以使用抽象接口AmqpTemplate。自动配置还包括一些使用其他机制接收消息的默认配置,如RabbitListener注解等。
在微服务Multiplication在添加了消息代理支持,就可以发布事件,进行配置了。
另外,可删除指向Gamification服务的属性,现在不再需要了,对application.properties修改如下:
# 配置交换
amqp.exchange.attempts=attempts.topic
# 配置日志记录,显式交换、队列、绑定等声明信息
logging.level.org.springframework.amqp.rabbit.core.RabbitAdmin=DEBUG
接着,将Exchange声明添加到AMQP的独立配置文件中,可使用Spring的交换生成器:ExchangeBuilder,在配置中添加代理中声明的主题类型的Bean,配置使用JSON进行序列化的消息转换器。AMQPConfiguration类代码如下:
package cn.zhangjuli.multiplication.configuration;
import org.springframework.amqp.core.ExchangeBuilder;
import org.springframework.amqp.core.TopicExchange;
import org.springframework.amqp.support.converter.Jackson2JsonMessageConverter;
import org.springframework.beans.factory.annotation.Value;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
/**
* 配置RabbitMQ,通过AMQP抽象在应用程序中使用事件
* @author Juli Zhang, <a href="mailto:zhjl@lut.edu.cn">Contact me</a> <br>
*/
@Configuration
public class AMQPConfiguration {
/**
* 配置主题交换
* @param exchangeName 交换名称,从配置中获取
* @return TopicExchange,使用配置的交换名称构建主题交换
*/
@Bean
public TopicExchange challengesTopicExchange(
@Value("${amqp.exchange.attempts}") final String exchangeName
) {
return ExchangeBuilder.topicExchange(exchangeName).durable(true).build();
}
/**
* 配置消息转换器,用JSON对象序列化程序覆盖默认的Java对象序列化程序,以避免Java对象序列化的缺陷。
* @return 消息转换器
*/
@Bean
public Jackson2JsonMessageConverter producerJackson2MessageConverter() {
return new Jackson2JsonMessageConverter();
}
}
用JSON对象序列化程序覆盖默认的Java对象序列化程序,可以避免Java对象序列化的缺陷。
发布者端,Jackson2JsonMessageConverter 使用预先配置的ObjectMapper,然后,RabbitTemple实现将使用这个Bean,该类将序列化对象并将其作为AMQP消息发送给代理。订阅者端,可从JSON格式中受益,从而可使用任何编程语言对内容进行反序列化,还可以使用自己的对象表示形式,忽略消费者端不需要的属性,从而减少微服务之间的耦合。如果发布者在有效载荷中包括新字段,订阅者也不必更改任何内容。
JSON不是Spring AMQP消息转换器支持的唯一格式,还可以使用XML或Google的协议缓冲区(即protobuf)。在性能至关重要的实际系统中,应考虑二进制格式(如protobuf)。
下面,删除GamificationServiceClient类,同时需要删除ChallengeServiceImpl类中的相关引用(只有有限的几行)。然后,将ChallengeSolvedDTO重命名为ChallengeSolvedEvent,不需要修改任何字段,仅此而已。ChallengeSolvedEvent类如下:
package cn.zhangjuli.multiplication.challenge;
import lombok.Value;
/**
* @author Juli Zhang, <a href="mailto:zhjl@lut.edu.cn">Contact me</a> <br>
*/
@Value
public class ChallengeSolvedEvent {
long attemptId;
boolean correct;
int factorA;
int factorB;
long userId;
String userAlias;
}
注意:使用显式的命名约定是一种良好的实践。
下面,在服务层创建一个新组件以发布事件,等效于已经删除的REST API客户端,但它与消息代理进行通信。ChallengeEventPublisher 类的代码如下:
package cn.zhangjuli.multiplication.challenge;
import org.springframework.amqp.core.AmqpTemplate;
import org.springframework.beans.factory.annotation.Value;
import org.springframework.stereotype.Service;
/**
* @author Juli Zhang, <a href="mailto:zhjl@lut.edu.cn">Contact me</a> <br>
*/
@Service
public class ChallengeEventPublisher {
private final AmqpTemplate amqpTemplate;
private final String challengesTopicExchange;
public ChallengeEventPublisher(AmqpTemplate amqpTemplate,
@Value("${amqp.exchange.attempts}")
String challengesTopicExchange) {
this.amqpTemplate = amqpTemplate;
this.challengesTopicExchange = challengesTopicExchange;
}
/**
* 发布事件,设置路由键,然后发送到代理。
* @param challengeAttempt 消息源,需要转换为发布的事件。
*/
public void challengesSolved(final ChallengeAttempt challengeAttempt) {
ChallengeSolvedEvent event = buildEvent(challengeAttempt);
// 路由键是 attempt.correct 或 attempt.wrong
String routingKey = "attempt." + (event.isCorrect() ? "correct" : "wrong");
amqpTemplate.convertAndSend(challengesTopicExchange, routingKey, event);
}
/**
* 将ChallengeAttempt转换为ChallengeSolvedEvent 对象
* @param challengeAttempt 消息源
* @return 事件,要发布的数据。
*/
private ChallengeSolvedEvent buildEvent(final ChallengeAttempt challengeAttempt) {
return new ChallengeSolvedEvent(challengeAttempt.getId(),
challengeAttempt.isCorrect(), challengeAttempt.getFactorA(),
challengeAttempt.getFactorB(), challengeAttempt.getUser().getId(),
challengeAttempt.getUser().getAlias());
}
}
还记得在删除GamificationServiceClient类时,要在ChallengeServiceImpl类中删除的引用吗?这是关联点,现在要使用ChallengeEventPublisher替换GamificationServiceClient功能,即可将消息发布到RabbitMQ代理上,这样,感兴趣的任何组件就可以使用这些消息了。修改ChallengeServiceImpl类以发送新事件,代码如下:
package cn.zhangjuli.multiplication.challenge;
import cn.zhangjuli.multiplication.user.User;
import cn.zhangjuli.multiplication.user.UserRepository;
import lombok.RequiredArgsConstructor;
import lombok.extern.slf4j.Slf4j;
import org.springframework.stereotype.Service;
import java.util.List;
/**
* @author Juli Zhang, <a href="mailto:zhjl@lut.edu.cn">Contact me</a> <br>
*/
@Service
@RequiredArgsConstructor
@Slf4j
public class ChallengeServiceImpl implements ChallengeService {
private final UserRepository userRepository;
private final ChallengeAttemptRepository attemptRepository;
private final ChallengeEventPublisher challengeEventPublisher;
@Override
public ChallengeAttempt verifyAttempt(ChallengeAttemptDTO attemptDTO) {
// Check if the attempt is correct
boolean isCorrect =
attemptDTO.getGuess() == attemptDTO.getFactorA() * attemptDTO.getFactorB();
// 检查alias用户是否存在,不存在就创建
User user = userRepository.findByAlias(attemptDTO.getUserAlias())
.orElseGet(() -> {
log.info("Creating new user with alias {}", attemptDTO.getUserAlias());
return userRepository.save(
new User(attemptDTO.getUserAlias())
);
});
// Builds the domain object. Null id for now.
ChallengeAttempt checkedAttempt = new ChallengeAttempt(null,
user,
attemptDTO.getFactorA(),
attemptDTO.getFactorB(),
attemptDTO.getGuess(),
isCorrect);
// Stores the attempt
ChallengeAttempt storedAttempt = attemptRepository.save(checkedAttempt);
log.info("attempt: {}", storedAttempt);
// 发布事件来通知潜在的感兴趣的订阅者
challengeEventPublisher.challengesSolved(storedAttempt);
return storedAttempt;
}
@Override
public List<ChallengeAttempt> getStatisticsForUser(final String userAlias) {
return attemptRepository.findTop10ByUserAliasOrderByIdDesc(userAlias);
}
}
同样地,修改ChallengeServiceTest 类以测试发送新事件,代码如下:
package cn.zhangjuli.multiplication.challenge;
import cn.zhangjuli.multiplication.user.User;
import cn.zhangjuli.multiplication.user.UserRepository;
import org.junit.jupiter.api.BeforeEach;
import org.junit.jupiter.api.Test;
import org.junit.jupiter.api.extension.ExtendWith;
import org.mockito.Mock;
import org.mockito.junit.jupiter.MockitoExtension;
import java.util.List;
import java.util.Optional;
import static org.assertj.core.api.BDDAssertions.then;
import static org.mockito.AdditionalAnswers.returnsFirstArg;
import static org.mockito.ArgumentMatchers.any;
import static org.mockito.BDDMockito.given;
import static org.mockito.Mockito.never;
import static org.mockito.Mockito.verify;
/**
* @author Juli Zhang, <a href="mailto:zhjl@lut.edu.cn">Contact me</a> <br>
*/
@ExtendWith(MockitoExtension.class)
public class ChallengeServiceTest {
private ChallengeService challengeService;
// 使用Mockito进行模拟
@Mock
private UserRepository userRepository;
@Mock
private ChallengeAttemptRepository attemptRepository;
@Mock
private ChallengeEventPublisher challengeEventPublisher;
@BeforeEach
public void setUp() {
challengeService = new ChallengeServiceImpl(
userRepository,
attemptRepository,
challengeEventPublisher
);
}
@Test
public void checkCorrectAttemptTest() {
// given
// 这里希望save方法什么都不做,只返回第一个(也是唯一一个)传递的参数,这样不必调用真实的存储库即可测试该层。
given(attemptRepository.save(any()))
.will(returnsFirstArg());
ChallengeAttemptDTO attemptDTO = new ChallengeAttemptDTO(50, 60, "noise", 3000);
// when
ChallengeAttempt resultAttempt = challengeService.verifyAttempt(attemptDTO);
// then
then(resultAttempt.isCorrect()).isTrue();
verify(attemptRepository).save(resultAttempt);
verify(challengeEventPublisher).challengesSolved(resultAttempt);
}
@Test
public void checkWrongAttemptTest() {
// given
given(attemptRepository.save(any()))
.will(returnsFirstArg());
ChallengeAttemptDTO attemptDTO = new ChallengeAttemptDTO(50, 60, "noise", 5000);
// when
ChallengeAttempt resultAttempt = challengeService.verifyAttempt(attemptDTO);
// then
then(resultAttempt.isCorrect()).isFalse();
verify(userRepository).save(new User("noise"));
verify(attemptRepository).save(resultAttempt);
verify(challengeEventPublisher).challengesSolved(resultAttempt);
}
@Test
public void checkExistingUserTest() {
// given
given(attemptRepository.save(any()))
.will(returnsFirstArg());
User existingUser = new User(1L, "john_doe");
given(userRepository.findByAlias("john_doe"))
.willReturn(Optional.of(existingUser));
ChallengeAttemptDTO attemptDTO = new ChallengeAttemptDTO(50, 60, "john_doe", 5000);
// when
ChallengeAttempt resultAttempt = challengeService.verifyAttempt(attemptDTO);
// then
then(resultAttempt.isCorrect()).isFalse();
then(resultAttempt.getUser()).isEqualTo(existingUser);
verify(userRepository, never()).save(any());
verify(attemptRepository).save(resultAttempt);
verify(challengeEventPublisher).challengesSolved(resultAttempt);
}
@Test
public void retrieveStatisticsTest() {
// given
User user = new User("john_doe");
ChallengeAttempt attempt1 = new ChallengeAttempt(1L, user, 50, 60, 3010, false);
ChallengeAttempt attempt2 = new ChallengeAttempt(2L, user, 50, 60, 3051, false);
List<ChallengeAttempt> lastAttempts = List.of(attempt1, attempt2);
given(attemptRepository.findTop10ByUserAliasOrderByIdDesc("john_doe"))
.willReturn(lastAttempts);
// when
List<ChallengeAttempt> latestAttemptsResult = challengeService.getStatisticsForUser("john_doe");
// then
then(latestAttemptsResult).isEqualTo(lastAttempts);
}
}
在ChallengeServiceTest 类中,并没有模拟AmqpTemplate的行为。需要一个新的ChallengeEventPublisherTest类,使用Mockit的ArgumentCaptor类捕获传递给Mock的参数,来模拟AmqpTemplate的路由键和事件对象调用。代码如下:
package cn.zhangjuli.multiplication.challenge;
import cn.zhangjuli.multiplication.user.User;
import org.junit.jupiter.api.BeforeEach;
import org.junit.jupiter.api.extension.ExtendWith;
import org.junit.jupiter.params.ParameterizedTest;
import org.junit.jupiter.params.provider.ValueSource;
import org.mockito.ArgumentCaptor;
import org.mockito.Mock;
import org.mockito.junit.jupiter.MockitoExtension;
import org.springframework.amqp.core.AmqpTemplate;
import static org.assertj.core.api.BDDAssertions.then;
import static org.mockito.Mockito.verify;
/**
* @author Juli Zhang, <a href="mailto:zhjl@lut.edu.cn">Contact me</a> <br>
*/
@ExtendWith(MockitoExtension.class)
public class ChallengeEventPublisherTest {
private ChallengeEventPublisher challengeEventPublisher;
@Mock
private AmqpTemplate amqpTemplate;
@BeforeEach
public void setUp() {
challengeEventPublisher = new ChallengeEventPublisher(amqpTemplate, "test.topic");
}
@ParameterizedTest
@ValueSource(booleans = {true, false})
public void sendsAttempt(boolean correct) {
// given
ChallengeAttempt challengeAttempt = createTestChallengeAttempt(correct);
// when
challengeEventPublisher.challengesSolved(challengeAttempt);
// then
var exchangeCaptor = ArgumentCaptor.forClass(String.class);
var routingKeyCaptor = ArgumentCaptor.forClass(String.class);
var eventCaptor = ArgumentCaptor.forClass(ChallengeSolvedEvent.class);
verify(amqpTemplate).convertAndSend(exchangeCaptor.capture(),
routingKeyCaptor.capture(), eventCaptor.capture());
then(exchangeCaptor.getValue()).isEqualTo("test.topic");
then(routingKeyCaptor.getValue()).isEqualTo("attempt." +
(correct ? "correct" : "wrong"));
then(eventCaptor.getValue()).isEqualTo(solvedEvent(correct));
}
private ChallengeSolvedEvent solvedEvent(boolean correct) {
return new ChallengeSolvedEvent(1L, correct, 30, 40, 10L, "john_doe");
}
private ChallengeAttempt createTestChallengeAttempt(boolean correct) {
return new ChallengeAttempt(1L,
new User(10L, "john_doe"),
30,
40,
correct ? 1200 : 1300,
correct);
}
}
已经在Multiplication微服务中发布了事件,那么Gamification微服务中怎么订阅事件呢?和Multiplication类似,需要改变事件的使用方式,要替换现有的接受事件订阅者的控制器。
首先,在微服务Gamification的pom.xml中添加消息代理支持,就可以订阅事件,进行配置了。
在application.properties配置这些信息,修改如下:
# 交换名称
amqp.exchange.attempts=attempts.topic
# 队列名称
amqp.queue.gamification=gamification.queue
# 配置日志记录,显式交换、队列、绑定等声明信息
logging.level.org.springframework.amqp.rabbit.core.RabbitAdmin=DEBUG
实际上,生产者将消息发送到 Exchange (交换器:“X”),由交换器将消息路由到一个或者多个队列中,消费者再去消费消息。如果路由不到,或许会返回给生产者,或许直接丢弃。要明白两个概念:
注意,消费端也应该声明交换,当声明队列时,交换必须存在。交换使用持久化,这仅在第一次声明时适用。但在微服务中声明所需的交换和队列是一种良好的编程习惯。另外,RabbitMQ实体的声明是幂等操作;如果实体存在,则该操作无效。
同样,需要在AMQPConfiguration类中进行配置,以使用JSON进行序列化的消息转换器,而不是默认消息转换器提供的格式。AMQPConfiguration类代码如下:
package cn.zhangjuli.gamification.configuration;
import com.fasterxml.jackson.annotation.JsonCreator;
import com.fasterxml.jackson.module.paramnames.ParameterNamesModule;
import org.springframework.amqp.core.*;
import org.springframework.amqp.rabbit.annotation.RabbitListenerConfigurer;
import org.springframework.beans.factory.annotation.Value;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import org.springframework.messaging.converter.MappingJackson2MessageConverter;
import org.springframework.messaging.handler.annotation.support.DefaultMessageHandlerMethodFactory;
import org.springframework.messaging.handler.annotation.support.MessageHandlerMethodFactory;
/**
* @author Juli Zhang, <a href="mailto:zhjl@lut.edu.cn">Contact me</a> <br>
*/
@Configuration
public class AMQPConfiguration {
/**
* 声明交换
* @param exchangeName 交换名
* @return 主题交换
*/
@Bean
public TopicExchange challengesTopicExchange(
@Value("${amqp.exchange.attempts}") final String exchangeName
) {
return ExchangeBuilder.topicExchange(exchangeName).durable(true).build();
}
/**
* 声明队列
* @param queueName 队列名
* @return 队列
*/
@Bean
public Queue gamificationQueue(
@Value("${amqp.queue.gamification}") final String queueName
) {
return QueueBuilder.durable(queueName).build();
}
/**
* 声明绑定,绑定队列到交换器上
* @param gamificationQueue 队列
* @param attemptsExchange 交换器
* @return 绑定
*/
@Bean
public Binding correctAttemptsBuilding(
final Queue gamificationQueue,
final TopicExchange attemptsExchange
) {
return BindingBuilder.bind(gamificationQueue)
.to(attemptsExchange)
.with("attempt.correct");
}
/**
* 设置MessageHandlerMethodFactory替换默认Bean,使用默认工厂为基准,将其消息转换器替换为
* MappingJackson2MessageConverter实例,处理从JSON到Java类的消息反序列化。并对其包含的
* ObjectMapper进行微调,添加ParameterNamesModule以避免必须为事件类使用空的构造方法。
* @return 设置后的MessageHandlerMethodFactory工厂
*/
@Bean
public MessageHandlerMethodFactory messageHandlerMethodFactory() {
DefaultMessageHandlerMethodFactory factory = new DefaultMessageHandlerMethodFactory();
final MappingJackson2MessageConverter jsonConverter = new MappingJackson2MessageConverter();
jsonConverter.getObjectMapper().registerModule(new ParameterNamesModule(JsonCreator.Mode.PROPERTIES));
factory.setMessageConverter(jsonConverter);
return factory;
}
/**
* 设置RabbitListenerConfigurer,以使监听器使用JSON反序列化,使用MessageHandlerMethodFactory的
* 实现来覆盖RabbitListenerConfigurer。
* @param messageHandlerMethodFactory 工厂
* @return RabbitListenerConfigurer
*/
@Bean
public RabbitListenerConfigurer rabbitListenerConfigurer(
final MessageHandlerMethodFactory messageHandlerMethodFactory
) {
return (c) -> c.setMessageHandlerMethodFactory(messageHandlerMethodFactory);
}
}
这里不使用AmqpTemplate来接收消息,因为这是基于轮询的,会消耗网络资源;而想使用代理在有消息时通知订阅者,会使用异步通信,AMQP抽象不支持这些功能,spring-rabbit则提供了异步消费消息的机制。
下面将ChallengeSolvedDTO重构为ChallengeSolvedEvent,从技术上讲,不需要改名,因为JSON格式仅指定字段名和值,但应养成良好的编程习惯,使用合适的名称来命名,更容易找到相关的类。ChallengeSolvedEvent代码如下:
package cn.zhangjuli.gamification.challenge;
import lombok.Value;
/**
* 定义了微服务Multiplication和微服务Gamification之间的契约,为保持独立性,在两个项目中都需要创建。
* @author Juli Zhang, <a href="mailto:zhjl@lut.edu.cn">Contact me</a> <br>
*/
// 表明该类是不可变类
@Value
public class ChallengeSolvedEvent {
long attemptId;
boolean correct;
int factorA;
int factorB;
long userId;
String userAlias;
}
遵循领域驱动的设计实践,可以调整此事件的反序列化字段。例如,不需要userAlias作为Gamification的业务逻辑,可以将其从消费事件中删除。因为Spring Boot默认情况下将ObjectMapper配置为忽略未知属性,所以无需其他配置即可工作。
最好不要在微服务之间共享代码,因为要支持松耦合、向后兼容和独立部署。想象一下,微服务Multiplication将进一步发展并存储额外的数据,假设这是更艰巨挑战的额外因素。然后,这些额外因素将被添加到已发布事件的代码中,好的一面是,通过在每个领域中使用事件的不同表示形式并将映射器配置为忽略未知属性,Gamification微服务仍然有效,而不必更新其事件表示形式。
现在,就来消费事件,使用@RabbitListener注解,使其在消息到达时充当消息的处理逻辑。这里,只需要指定要订阅的队列名称,因为已经在单独的配置类中声明了RabbitMQ实体。GameEventHandler 类代码如下:
package cn.zhangjuli.gamification.game;
import cn.zhangjuli.gamification.challenge.ChallengeSolvedEvent;
import lombok.RequiredArgsConstructor;
import lombok.extern.slf4j.Slf4j;
import org.springframework.amqp.AmqpRejectAndDontRequeueException;
import org.springframework.amqp.rabbit.annotation.RabbitListener;
import org.springframework.stereotype.Service;
/**
* @author Juli Zhang, <a href="mailto:zhjl@lut.edu.cn">Contact me</a> <br>
*/
@Service
@RequiredArgsConstructor
@Slf4j
public class GameEventHandler {
private final GameService gameService;
/**
* 实现RabbitMQ订阅者代码很少,可将队列传递给@RabbitListener注解。
* 以ChallengeSolvedEvent对象为期望的输入,Spring自动配置一个反序列化器,将消息从代理转换为该对象类型。
* 因为AMQPConfiguration中的配置,将使用JSON。
* 默认情况下,当方法最终正常完成时,基于@RabbitListener注解构建的逻辑会将确认发送给代理,
* 在Spring Rabbit中,称为AUTO确认模式。
* 如果想在处理之前就发送ACK信号,可将其更改为NONE;如果想完全控制,则可将其修改为MANUAL。
* 可在工厂级别(全局配置)或监听器级别(通过将额外的参数传递给@RabbitListener注解)设置此参数和其他配置值。
* 这里使用默认的错误策略:AUTO,捕获任何可能的异常,记录错误,然后重新抛出AmqpRejectAndDontRequeueException。
* 这是Spring AMQP的快捷方式,用于拒绝该消息并告诉代理不要重新排队,
* 这意味着,如果消费者逻辑中出现意外错误,将丢失消息。这里是可以接受的。
* 如果要避免这种情况,还可以设置代码以多次重试,方法是重新抛出InstantRequeueAmqpException,
* 或使用Spring AMQP中提供的一些工具(如错误处理程序或邮件恢复程序)来处理这些无效的消息。
* @param event 期望的输入,事件
*/
@RabbitListener(queues = "${amqp.queue.gamification}")
void handleMultiplicationSolved(final ChallengeSolvedEvent event) {
log.info("已接收到成功挑战事件:{}", event.getAttemptId());
try {
gameService.newAttemptForUser(event);
} catch (final Exception e) {
log.error("在处理ChallengeSolvedEvent时发生错误:", e);
// 避免事件重新排队和处理
throw new AmqpRejectAndDontRequeueException(e);
}
}
}
可使用@RabbitListener注解做很多事:
启动应用程序,在RabbitMQ中发布消息,就可以在控制台接收到消息,如图所示:
修改了订阅者的逻辑,就可以删除GameController类了。需要重构GameService的接口及其实现GameServiceImpl,来处理ChallengeSolvedEvent,基本逻辑不变。代码如下:
package cn.zhangjuli.gamification.game;
import cn.zhangjuli.gamification.challenge.ChallengeSolvedEvent;
import cn.zhangjuli.gamification.game.badgeprocessors.BadgeProcessor;
import cn.zhangjuli.gamification.game.domain.BadgeCard;
import cn.zhangjuli.gamification.game.domain.BadgeType;
import cn.zhangjuli.gamification.game.domain.ScoreCard;
import lombok.RequiredArgsConstructor;
import lombok.extern.slf4j.Slf4j;
import org.springframework.stereotype.Service;
import java.util.Collections;
import java.util.List;
import java.util.Optional;
import java.util.Set;
import java.util.stream.Collectors;
/**
* @author Juli Zhang, <a href="mailto:zhjl@lut.edu.cn">Contact me</a> <br>
*/
@Service
@Slf4j
@RequiredArgsConstructor
public class GameServiceImpl implements GameService {
private final ScoreRepository scoreRepository;
private final BadgeRepository badgeRepository;
private final List<BadgeProcessor> badgeProcessorList;
@Override
public GameResult newAttemptForUser(ChallengeSolvedEvent challenge) {
if (challenge.isCorrect()) {
ScoreCard scoreCard = new ScoreCard(challenge.getUserId(), challenge.getAttemptId());
scoreRepository.save(scoreCard);
log.info("用户 {} 的尝试 {} 得分 {}",
challenge.getUserAlias(), challenge.getAttemptId(), scoreCard.getScore());
List<BadgeCard> badgeCards = processForBadges(challenge);
return new GameResult(scoreCard.getScore(),
badgeCards.stream().map(BadgeCard::getBadgeType).collect(Collectors.toList()));
} else {
log.info("尝试 {} 不正确。用户 {} 没有得分。",
challenge.getAttemptId(), challenge.getUserAlias());
return new GameResult(0, List.of());
}
}
/**
* 检查总分和不同的得分来得到徽章
* @param challengeSolved 新的尝试
* @return 徽章的列表
*/
private List<BadgeCard> processForBadges(final ChallengeSolvedEvent challengeSolved) {
Optional<Integer> optionalTotalScore =
scoreRepository.getTotalScoreForUser(challengeSolved.getUserId());
if (optionalTotalScore.isEmpty()) {
return Collections.emptyList();
}
int totalScore = optionalTotalScore.get();
// 得到用户的总分和徽章
List<ScoreCard> scoreCardList =
scoreRepository.findByUserIdOrderByScoreTimestampDesc(challengeSolved.getUserId());
Set<BadgeType> alreadyGetBadges =
badgeRepository.findByUserIdOrderByBadgeTimestampDesc(challengeSolved.getUserId())
.stream()
.map(BadgeCard::getBadgeType)
.collect(Collectors.toSet());
// 调用徽章处理器来处理还没有得到的徽章
List<BadgeCard> newBadgeCards = badgeProcessorList.stream()
.filter(badgeProcessor -> !alreadyGetBadges.contains(badgeProcessor.badgeType()))
.map(badgeProcessor -> badgeProcessor.processForOptionalBadge(totalScore,
scoreCardList, challengeSolved))
.flatMap(Optional::stream)
.map(badgeType -> new BadgeCard(challengeSolved.getUserId(), badgeType))
.collect(Collectors.toList());
badgeRepository.saveAll(newBadgeCards);
return newBadgeCards;
}
}
注意,将ChallengeSolvedDTO修改为ChallengeSolvedEvent,会影响多个类,使用IDEA环境将自动进行重构,如果不能,需要人工干预。
至此,完成了如下工作:
因为没有改变开放的前端交互的API,所以,前端界面不需要改变。
现在,系统已经实现了事件驱动的架构,就来看看使用消息代理带来的优势。系统基本逻辑如图所示。
下面就启动系统,来验证整个系统逻辑:
先来看看Gamification微服务的启动日志,会看到下面一些与RabbitMQ有关的信息,如下所示:
2023-12-10T09:10:56.980+08:00 INFO 32272 --- [ main] o.s.a.r.c.CachingConnectionFactory : Attempting to connect to: [localhost:5672]
2023-12-10T09:10:57.010+08:00 INFO 32272 --- [ main] o.s.a.r.c.CachingConnectionFactory : Created new connection: rabbitConnectionFactory#4603845b:0/SimpleConnection@383c94ed [delegate=amqp://admin@127.0.0.1:5672/, localPort=53842]
2023-12-10T09:10:57.013+08:00 DEBUG 32272 --- [ main] o.s.amqp.rabbit.core.RabbitAdmin : Initializing declarations
2023-12-10T09:10:57.023+08:00 DEBUG 32272 --- [ main] o.s.amqp.rabbit.core.RabbitAdmin : declaring Exchange 'attempts.topic'
2023-12-10T09:10:57.025+08:00 DEBUG 32272 --- [ main] o.s.amqp.rabbit.core.RabbitAdmin : declaring Queue 'gamification.queue'
2023-12-10T09:10:57.027+08:00 DEBUG 32272 --- [ main] o.s.amqp.rabbit.core.RabbitAdmin : Binding destination [gamification.queue (QUEUE)] to exchange [attempts.topic] with routing key [attempt.correct]
2023-12-10T09:10:57.029+08:00 DEBUG 32272 --- [ main] o.s.amqp.rabbit.core.RabbitAdmin : Declarations finished
使用Spring AMQP时,会记录前面两行,表明已成功连接到代理。另外的日志信息,是因为开启了RabbitAdmin类的日志记录级别为DEBUG。
Multiplication微服务端,还没有RabbitMQ日志,原因是连接和交换的声明仅在发布第一条消息时发生。当前端界面有重试到来时,就启动连接,控制台日志就会显示相关日志。如下所示:
2023-12-09T15:38:48.050+08:00 INFO 59044 --- [nio-8080-exec-1] o.s.a.r.c.CachingConnectionFactory : Attempting to connect to: [localhost:5672]
2023-12-09T15:38:48.081+08:00 INFO 59044 --- [nio-8080-exec-1] o.s.a.r.c.CachingConnectionFactory : Created new connection: rabbitConnectionFactory#15f229e8:0/SimpleConnection@3da194a4 [delegate=amqp://guest@127.0.0.1:5672/, localPort=65533]
2023-12-09T15:38:48.083+08:00 DEBUG 59044 --- [nio-8080-exec-1] o.s.amqp.rabbit.core.RabbitAdmin : Initializing declarations
2023-12-09T15:38:48.095+08:00 DEBUG 59044 --- [nio-8080-exec-1] o.s.amqp.rabbit.core.RabbitAdmin : declaring Exchange 'attempts.topic'
2023-12-09T15:38:48.099+08:00 DEBUG 59044 --- [nio-8080-exec-1] o.s.amqp.rabbit.core.RabbitAdmin : Declarations finished
可以通过RabbitMQ管理界面,了解当前状态。在Connections选项卡中,可看到微服务创建的连接,如图所示:
切换到Exchanges选项卡,会看到topic类型的attempts.topic交换,声明为durable(D),如图所示:
单击交换名称进入详细信息页面,可以看到相关信息,可以绑定队列和设置路由键,还可以发布消息,如图所示:
Queues选项卡显示了队列,该队列也配置为durable(D)。如图所示:
Channels选项卡显示了通道信息,如图所示:
了解了基本情况后,就来看看发送重试信息后的响应,使用HTTPie产生正确的尝试挑战,如下所示:
> http POST :8080/attempts factorA=50 factorB=60 userAlias=noise guess=3000
在Multiplication日志中,可以看到如何连接代理并声明交换的(第一次可见,如前面的日志所示,这里已经存在,就无效了),可以在控制台看到发布消息的输出,如下所示:
2023-12-10T09:49:49.270+08:00 INFO 59044 --- [nio-8080-exec-5] c.z.m.challenge.ChallengeServiceImpl : attempt: ChallengeAttempt(id=1756, user=User(id=1, alias=noise), factorA=50, factorB=60, resultAttempt=3000, correct=true)
Gamification微服务将反映事件的消费情况并更新分数,日志如下:
2023-12-10T09:49:49.304+08:00 INFO 32272 --- [ntContainer#0-1] c.z.gamification.game.GameEventHandler : 已接收到成功挑战事件:1756
2023-12-10T09:49:49.351+08:00 INFO 32272 --- [ntContainer#0-1] c.z.gamification.game.GameServiceImpl : 用户 noise 的尝试 1756 得分 10
2023-12-10T10:02:25.740+08:00 INFO 32272 --- [ntContainer#0-1] c.z.gamification.game.GameEventHandler : 已接收到成功挑战事件:1757
2023-12-10T10:02:25.741+08:00 INFO 32272 --- [ntContainer#0-1] c.z.gamification.game.GameServiceImpl : 用户 noise 的尝试 1757 得分 10
多发布几次消息,可以从RabbitMQ管理界面中,从Queues选项卡,单击想要查看的队列名称,即可看到队列的整体情况,如图所示:
当然,也可以使用前端界面来进行尝试,如图所示:
前面的微服务实现具有弹性,即使Gamification微服务不可用,也不会失效。但是会错过这段时间发送的成功尝试,现在,引入了消息代理之后呢?
停止Gamification微服务,使用HTTPie发送尝试,看看如何:
> http POST :8080/attempts factorA=50 factorB=60 userAlias=noise1 guess=3000
RabbitMQ的“队列详细信息”页面将显示排队消息,如图所示:
这些消息仍然存在,但没有消费者消费。检查Multiplication微服务日志,也没有错误。它将消息发布到代理,并向API客户端返回OK响应,实现了松耦合,Multiplication不需要知道消费者是否可用。整个过程是异步的,基于事件驱动的。
再次回到Gamification微服务,启动后将在日志中看到它会从代理接收所有事件消息。然后,会触发其业务逻辑,更新分数。现在就没有丢失任何数据。如下所示:
2023-12-10T11:00:59.801+08:00 INFO 59708 --- [ntContainer#0-1] c.z.gamification.game.GameEventHandler : 已接收到成功挑战事件:1781
2023-12-10T11:00:59.861+08:00 INFO 59708 --- [ntContainer#0-1] c.z.gamification.game.GameServiceImpl : 用户 noise1 的尝试 1781 得分 10
2023-12-10T11:00:59.896+08:00 INFO 59708 --- [ntContainer#0-1] c.z.gamification.game.GameEventHandler : 已接收到成功挑战事件:1782
2023-12-10T11:00:59.898+08:00 INFO 59708 --- [ntContainer#0-1] c.z.gamification.game.GameServiceImpl : 用户 noise1 的尝试 1782 得分 10
2023-12-10T11:00:59.905+08:00 INFO 59708 --- [ntContainer#0-1] c.z.gamification.game.GameEventHandler : 已接收到成功挑战事件:1783
2023-12-10T11:00:59.909+08:00 INFO 59708 --- [ntContainer#0-1] c.z.gamification.game.GameServiceImpl : 用户 noise1 的尝试 1783 得分 10
2023-12-10T11:00:59.916+08:00 INFO 59708 --- [ntContainer#0-1] c.z.gamification.game.GameEventHandler : 已接收到成功挑战事件:1784
2023-12-10T11:00:59.918+08:00 INFO 59708 --- [ntContainer#0-1] c.z.gamification.game.GameServiceImpl : 用户 noise1 的尝试 1784 得分 10
2023-12-10T11:00:59.924+08:00 INFO 59708 --- [ntContainer#0-1] c.z.gamification.game.GameEventHandler : 已接收到成功挑战事件:1785
2023-12-10T11:00:59.926+08:00 INFO 59708 --- [ntContainer#0-1] c.z.gamification.game.GameServiceImpl : 用户 noise1 的尝试 1785 得分 10
2023-12-10T11:00:59.931+08:00 INFO 59708 --- [ntContainer#0-1] c.z.gamification.game.GameEventHandler : 已接收到成功挑战事件:1786
2023-12-10T11:00:59.931+08:00 INFO 59708 --- [ntContainer#0-1] c.z.gamification.game.GameServiceImpl : 用户 noise1 的尝试 1786 得分 10
2023-12-10T11:00:59.938+08:00 INFO 59708 --- [ntContainer#0-1] c.z.gamification.game.GameEventHandler : 已接收到成功挑战事件:1787
2023-12-10T11:00:59.939+08:00 INFO 59708 --- [ntContainer#0-1] c.z.gamification.game.GameServiceImpl : 用户 noise1 的尝试 1787 得分 10
还可用用更新的分数来验证排行榜。系统不仅有弹性,还可用在故障后恢复。RabbitMQ界面的队列详细信息也显示了排队消息的计数为0,因为已经消费完了。
想象一下,如果RabbitMQ可以在丢弃消息之前配置将消息保留在队列中的时间(生存时间,TTL),还可以配置队列的最大长度,就可以根据用户需要进行调整了。
例如,可以将前面Queue配置进行设置,来提供相关的配置,可以配置队列使其具有6小时TTL和最大长度为25000条消息,代码如下:
@Bean
public Queue gamificationQueue(
@Value("${amqp.queue.gamification}") final String queueName
) {
return QueueBuilder.durable(queueName)
.ttl((int) Duration.ofHours(6).toMillis())
.maxLength(25000L)
.build();
}
在队列有待传递的消息时关闭代理呢?要测试这种情况,按照如下步骤操作:
和前面一样,手动测试后,代理关闭前,发送的尝试被RabbitMQ放入队列,但没有得到处理。停止RabbitMQ后,再次执行测试,将获得HTTP错误响应,日志如下:
2023-12-10T15:07:21.148+08:00 INFO 59044 --- [nio-8080-exec-6] o.s.a.r.c.CachingConnectionFactory : Attempting to connect to: [localhost:5672]
2023-12-10T15:07:21.153+08:00 ERROR 59044 --- [nio-8080-exec-6] o.a.c.c.C.[.[.[/].[dispatcherServlet] : Servlet.service() for servlet [dispatcherServlet] in context with path [] threw exception [Request processing failed: org.springframework.amqp.AmqpConnectException: java.net.ConnectException: Connection refused: no further information] with root cause
java.net.ConnectException: Connection refused: no further information
//...其余内容省略
可添加一个try/catch子句,来抑制错误,更好的方法是实现自定义HTTP错误处理程序,以返回特定的错误响应,如:503服务不可达,指示与代理断开时系统无法运行,可有多种选择。
再次启动Gamification微服务后,仍然可以运行,但会不断重试连接,日志如下:
2023-12-10T15:14:05.528+08:00 WARN 21044 --- [ntContainer#0-2] o.s.a.r.l.SimpleMessageListenerContainer : Consumer raised exception, processing can restart if the connection factory supports it. Exception summary: org.springframework.amqp.AmqpConnectException: java.net.ConnectException: Connection refused: no further information
2023-12-10T15:14:05.528+08:00 INFO 21044 --- [ntContainer#0-2] o.s.a.r.l.SimpleMessageListenerContainer : Restarting Consumer@52903e9b: tags=[[]], channel=null, acknowledgeMode=AUTO local queue size=0
2023-12-10T15:14:05.529+08:00 INFO 21044 --- [ntContainer#0-3] o.s.a.r.c.CachingConnectionFactory : Attempting to connect to: [localhost:5672]
2023-12-10T15:14:05.530+08:00 ERROR 21044 --- [ntContainer#0-3] o.s.a.r.l.SimpleMessageListenerContainer : Failed to check/redeclare auto-delete queue(s).
2023-12-10T15:14:05.530+08:00 INFO 21044 --- [ntContainer#0-3] o.s.a.r.c.CachingConnectionFactory : Attempting to connect to: [localhost:5672]
而且,新的尝试请求到来时,Multiplication微服务也会执行相同的操作。再次启动RabbitMQ代理时,两个微服务恢复连接。当Gamification重新连接到RabbitMQ后,就会收到排队的事件,进行后续的处理。这是因为声明了持久化的交换和队列,而且Spring实现在发布消息时使用了持久化传递模式。如果使用RabbitTemplate(而不是AmqpTemplate)来发布消息,可以自己设置消息的属性,将消息传递模式修改为非持久化示例如下:
MessageProperties properties = MessagePropertiesBuilder.newInstance()
.setDeliveryMode(MessageDeliveryMode.NON_PERSISTENT)
.build();
rabbitTemplate.getMessageConverter().toMessage(challengeAttempt, properties);
rabbitTemplate.convertAndSend(challengesTopicExchange, routingKey, event);
这个例子告诉大家应该了解所使用的工具的配置项。以持久性方式发送所有消息带来了不错的优势,但会带来额外的性能开销。如果配置正确分布的RabbitMQ集群,宕机的可能性会很小,也可能愿意接受潜在的消息丢失以提高性能,这取决于需求。
前面的测试暴露了一个意外情况,代理关闭后发送尝试,将收到服务器错误,错误代码为500,给客户端造成了一种印象,未正常处理该消息,实际上已经处理了一部分了。
停止代理,运行Multiplication,再次发送尝试,将得到错误响应,例如:
> http POST :8080/attempts factorA=50 factorB=60 userAlias=noise3 guess=3000
HTTP/1.1 500
Connection: close
Content-Type: application/json
Date: Sun, 10 Dec 2023 07:44:59 GMT
Transfer-Encoding: chunked
Vary: Origin, Access-Control-Request-Method, Access-Control-Request-Headers
{
"error": "Internal Server Error",
"path": "/attempts",
"status": 500,
"timestamp": "2023-12-10T07:44:59.980+00:00"
}
但是,查看数据库,进入Multiplication数据库的H2控制台,查看ChallengeAttempt数据表,可以看到最后添加了1行尝试,这意味着,即使收到了错误响应,该信息也已经存储了。如果代码在尝试将消息发送给代理之前持久化对象,这就无法避免。
可将服务方法verifyAttempt中包含的整个逻辑视为一个事务,可回滚数据库事务(不执行)。即使在调用存储库中的save方法之后仍然出现错误,仍然可以回滚。使用Spring框架来实现该操作很容易,只需要将@Transactional注解添加到方法上即可,代码如下:
@Transactional
@Override
public ChallengeAttempt verifyAttempt(ChallengeAttemptDTO attemptDTO) {
// ...
}
如果在这段代码中存在异常,则事务将回滚。如果要求服务中的所有方法都具有事务性,可以在类级别添加该注解。
导致 @Transactional 失效的常见场景有以下 5 个:
修改后重试相同的步骤:重启Multiplication微服务,关闭代理发送尝试,查看数据库存储,发现没有存储。由于抛出异常,Spring回滚数据库操作,就像未执行过。
Spring还支持发布者和订阅者双方的RabbitMQ事务,在@Transactional注解的方法范围内使用AmqpTemplate或RabbitTemplate实现发送消息,当在通道(RabbitMQ)中启动事务性时,即使在发送这些消息的方法调用之后发生异常,这些消息也不会到达代理。在消费端,也可以使用事务拒绝已处理的消息。这种情况下,需要设置队列以重新排列被拒绝的消息(这是默认情况)。
类似情况下,可简化事务处理策略,将其限制在数据库中:
微服务内的本地事务性对保持数据一致性并避免部分流程完成至关重要,当业务逻辑涉及多个步骤并可能与数据库或消息代理之类的外部组件交互时,应该考虑到业务逻辑可能出错。
微服务架构的优势之一是可以独立扩展系统的各个部分,在消息代理中引入事件驱动方法的好处,可以透明地添加更多的发布者和订阅者,目前为止一直在运行微服务的单个实例,不能确定架构还支持为每个微服务添加更多的副本。
不能使用多个微服务实例的第一个原因是:数据库。当水平扩展微服务时,所有副本都应共享数据层,并将数据存储在一个公共位置,而不是每个实例都隔离。微服务必须是无状态的,原因是不同的请求或消息可能最终出现在不同的微服务实例中。例如,不应该假定同一Multiplication实例将处理来自同一用户发送的两次尝试,因此,不能在尝试中保持任何内存状态,如图所示:
好的一面是,微服务是无状态的,可以独立处理每个请求或消息,结果最终存储在数据库中。但有一个技术问题,如果在端口9080上启动第二个实例,将无法启动,因为它试图创建新的数据库实例。
下面就来看看这个问题,先正常运行应该8080端口的Multiplication实例。然后,启动第二个实例,覆盖server.port参数为9080,以避免端口冲突。可执行如下命令:
./mvnw spring-boot:run -D"spring-boot.run.arguments"="--server.port=9080"
当启动第二个实例时,日志会提示错误:
org.h2.jdbc.JdbcSQLNonTransientConnectionException: Database may be already in use: "~/multiplication.mv.db". Possible solutions: close all other connection(s); use the server mode [90020-214]
发生错误的原因是H2数据库引擎,该引擎默认情况下被设计成嵌入式进程,而不是服务器。需要改变的是使用H2数据库服务器的模式,以便可以使用多个实例进行连接,在application.properties中添加配置如下:
spring.datasource.url=jdbc:h2:file:~/multiplication1;DB_CLOSE_ON_EXIT=FALSE;AUTO_SERVER=true;
注意,AUTO_SAVE=true可能不起作用,可以换成MySQL来试试。
第二个挑战是负载均衡。如果启动多个实例,如何从用户界面连接到它们?下面看看消息代理是如何实现RabbitMQ消息订阅者之间的负载均衡的。如图所示:
RabbitMQ支持多种来源的消息发布,这意味着可以有多个Multiplication微服务副本将事件发布到同一主题交换,这是透明的,这些实例打开不同的连接,声明交换(仅在第一次创建),然后发布数据而不必知道还有其他发布者。在订阅者端,由多个消费者共享,当启动多个Gamification微服务实例时,所有实例都声明相同的队列和绑定,代理足够智能,可在它们之间进行负载均衡。这就在消息级别解决了负载均衡的问题。
启动每个微服务的实例、前端和RabbitMQ服务。在两个单独的终端运行另外的两个微服务实例,让Multiplication和Gamification都有两个副本,注意端口不同:
> ./mvnw spring-boot:run -D"spring-boot.run.arguments"="--server.port=9080"
> ./mvnw spring-boot:run -D"spring-boot.run.arguments"="--server.port=9081"
一旦启动并运行了所有实例,在前端重试尝试,这些尝试会使用Multiplication实例1的服务,但事件消费在两个Gamification副本之间平衡。检查日志可以验证Gamification实例如何处理事件的。另外,由于数据库在各个实例之间共享,因此前端可以从运行的端口8081的实例请求排行榜,此实例将聚合所有副本存储的得分和徽章。如图所示:
还可以验证是否有多个发布者一起使用命令行将正确的尝试发送到Multiplication微服务的第二个实例,使用命令行向位于端口9080的实例发送调用,并检查,可以看到,消息在各个订阅者之间是平衡的,命令行如下:
> http POST :8080/attempts factorA=50 factorB=60 userAlias=noise6 guess=3000
现在,通过消息代理可以实现良好的系统可伸缩性,在多个订阅者之间实现负载均衡,也提高了系统的弹性。停止一个Gamification实例,代理会自动将消息定向到另一个实例。经过测试可以发现整个系统仍然能够正常工作。这里的问题是,前端无法进行负载均衡,或检测到一个副本已关闭。
文章介绍了事件驱动架构的实现过程,了解了如何通过消息代理实现微服务之间的松耦合,事件模式不针对特定目标,仅表示在特定领域中发生的事实,通过对不同的消息类型进行建模,从而实现松耦合,结合RabbitMQ和AMQP提供的方案,实现消息发布和交换、订阅消息的队列和路由绑定,配置相关参数,适应功能性和非功能性需求。使用Spring Boot实现AMQP和RabbitMQ的集成,很容易构建事件驱动架构的应用程序。通过示例,了解事件驱动架构的使用场景,实现了良好的系统可伸缩性,很容易在多个订阅者之间实现负载均衡,也提高了系统的弹性。不同的工具,可能有不同的利弊,应根据需要来选择。
前面的文章:
1、1 一个测试驱动的Spring Boot应用程序开发
2、2 使用React构造前端应用
3、3 试驱动的Spring Boot应用程序开发数据层示例
4、4 向微服务架构转变
后面的文章:
6、6 使用网关模式进行负载均衡