我们在学习多线程的时候,学习过生产者-消费者模型,为了实现解耦合和削峰填谷,引入了阻塞队列.
在实际的后端开发中,跨主机之间使用生产者消费者模型,也是非常普遍的需求,因此,阻塞队列会被封装成一个独立的服务器程序,实现更丰富的功能.这样的程序称为"消息队列" .
市面上成熟的消息队列非常多,有Kafka,RabbitMQ,RocketMQ,ActiveMQ...接下来我们要实现的MQ就是以RabbitMQ为蓝图的.
在实现MQ之前,我们先来认识几个核心概念.
1. 生产者(Producer) : 负责生产消息
2. 消费者(Consumer) : 负责消费消息
3. 中间人(Broker) :?消息队列
4. 发布(Publish) : 生产者生产消息并存放到消息队列中
5. 订阅(Subscribe) : 消费者预订某个Queue的消息,不是把消息从Broker中取出来,也不是消费消息
1. 虚拟机(VirtualHost) :类似于MySQL中的database,是一个逻辑上的集合,一个Broker上可以存在多个虚拟机
2. 交换机(Exchange) : 生产者先把消息发送到Broker的Exchange上,再由Exchange根据不同的规则转发给不同的Queue
3. 队列(Queue) : 真正存储消息的实体,每个消费者决定自己从哪个Queue上读取消息
4. 绑定(Binding) : Exchange 和 Queue之间的关联关系.Exchange 和 Queue之间可以理解成"多对多"的关系,每对关系可以用一个Binding来表示.
5. 消息(Message) ; 传递的内容
刚才我们提到,Exchange和Queue之间是"多对多"的关系, 其实是由交换机的类型决定的.
以RabbitMQ为例,交换机主要支持4种类型:
在我们实现的消息队列项目中,Broker是至关重要的,它要实现下列几个核心API,供生产者/消费者调用.
1. 创建队列(queueDeclare),这里的"创建"指的是没有则创建,有就啥也不干
2. 删除队列(queueDelete)
3. 创建交换机(exchangeDeclare),同上queueDeclare
4. 删除交换机(exchangeDelete)
5. 创建绑定(queueBind)
6. 解除绑定(queueUnbind)
7. 发布消息(basicPublish)
8. 订阅消息(basicConsume)
9. 确认消息(basicAck),消费者取走消息并不代表消费者处理了消息,因此需要让消费者告诉Broker,保证处理消息没有遗漏,在收到确认之前,Broker需要保存未确认的消息一段时间.
有的童靴可能会问,我们刚才提到的概念中,有"消费"这一概念,为啥没有提供相应的API呢?
很简单,因为RabbitMQ没有支持...
如果没有提供消费的API,Broker和Consumer的工作模式有两种:
1. Push,Broker主动把message发送给消费者,这种模式下MQ削峰填谷的作用不大(RabbitMQ支持)
2. Pull,消费者主动获取数据(这种模式下,message的时效性不高)
由于我们是以RabbitMQ为蓝图的,因此我们使用的也是第一种模式.
和阻塞队列不同,MessageQueue是为不同主机服务的.生产者和消费者都是客户端程序,Broker作为服务器,三者之间通过网络进行通信.
在网络通信的过程中,客户端要提供相应的API,来实现对服务器的操作.
也就是说,客户端的API只负责向服务器发送请求并接收服务器的响应,真正的业务由服务器端的API来做.这种远程调用的方式称为"RPC".
因此,客户端的API除了包含服务器提供的API外,还要有网络通信相关的API
1. 创建Connection,一个Connection对象代表一个TCP连接
2. 关闭Connection
3. 创建Channel,建立/销毁一个连接的成本比较高,Channel只是一个逻辑上的概念,一个Connection可以有多个Channel.
4. 关闭Channel
关于Connection和Channel,可以用下面这个栗子解释...
如果你到医院打针,要输三瓶药水,护士肯定只在你的胳膊上扎一次针,这就相当于建立了一个Connection,然后每换一瓶药水,不会重新扎针,而是用原来的针头,这就是一个Channel的创建.
5. 创建队列
6. 关闭队列
7. 创建交换机
8. 删除交换机
9. 创建绑定
10. 解除绑定
11. 发布消息
12. 订阅消息
13. 确认消息
应答模式分为两种.
1. 自动应答: 消费者消费了消息,就默认应答完毕,Broker直接将该消息删除.
2. 手动应答: 消费者手动调用应答接口,Broker收到应答请求之后,才会将该消息彻底删除.
对于一些不太重要的消息,可以采用第一种方式.RabbitMQ支持了两种应答方式,因此我们的项目也会实现这两种模式.
上面的一堆概念,想必童靴们听懂了但是没办法串联到一起,没关系,可以借助下面一张图来理解.
上面的图示中有几个问题需要解释:?
想必童靴们看完上面的概念之后还是很懵逼,下面我们把上面的模块拆开来实现.
我们先创建一个MetaDataManager类来封装对数据库的操作.在前面提到过,数据库是用来管理交换机,队列和绑定的,它们统称为"meta data".下面是MetaDataManager类提供的接口.
对于Exchange,Queue,Binding,我们使用SQLite数据库持久化保存.
相比于MySQL,SQLite更轻量,只需要一个.exe文件即可运行.因为我们创建的是SpringBoot项目,只需要引入相关依赖,这个项目就会自动加载jar包和dll动态库文件,就能直接运行SQLite数据库了.
另外,因为我们要使用MyBatis来操作数据库,所以在pom.xml中也要引入mybatis依赖
下面是application.yml的相关配置.
spring:
datasource:
#表示将数据库建在data目录下的meta.db文件中,基准目录是该项目所在的目录
url: jdbc:sqlite:./data/meta.db
# SQLite不需要用户名和密码,因为SQLite不是一个C/S结构的数据库,它只在本地运行,所以不需要验证
username:
password:
# 数据库的驱动类
driver-class-name: org.sqlite.JDBC
#mybatis的起手配置
mybatis:
mapper-locations: classpath:mapper/*Mapper.xml
configuration:
# log-impl: org.apache.ibatis.logging.stdout.StdOutImpl #此处不打印对数据库操作的日志了,测试的时候可以打印一下
map-underscore-to-camel-case: true
先来创建Queue,Exchange,Binding类.
@Data
public class QueueEntity {
//队列的唯一标识
private String name;
//是否可持久化
private boolean durable=false;
//如果没有消费者订阅这个队列,是否自动删除
private boolean autoDelete=false;
//是否为某个消费者私有
private boolean exclusive=false;
//额外参数
private Map<String,Object> arguments=new HashMap<>();
}
@Data
public class Exchange {
//交换机的唯一标识
private String name;
//交换机类型,默认是直接交换机
private ExchangeType type=ExchangeType.DIRECT;
//是否持久化
private boolean durable=false;
//如果没有队列绑定这个交换机,是否自动删除该交换机(暂时不实现)
private boolean autoDelete=false;
//额外参数
private Map<String,Object> arguments=new HashMap<>();
}
/**
* 定义交换机类型,1表示直接交换机,2表示扇出交换机,3表示主题交换机
*/
public enum ExchangeType{
DIRECT(1),
FANOUT(2),
TOPIC(3);
private final int type;
ExchangeType(int type){
this.type=type;
}
public int getType() {
return type;
}
}
@Data
public class Binding {
private String exchangeName;
private String queueName;
//交换机根据bindingKey确定要将消息转发给哪些队列
private String bindingKey;
}
?然后我们定义MetaMapper类进行数据库的ACUD操作.
@Mapper
public interface MetaMapper {
//创建相关的表
//因为涉及到服务器的重启操作,每次都要重新进行建库建表比较麻烦,所以调用代码来执行
@Insert("create table if not exists exchange(" +
"name varchar(50) primary key," +
"type int," +
"durable boolean," +
"auto_delete boolean," +
"arguments varchar(1024)"+
")")
void createExchangeTable();
@Insert("create table if not exists queue(" +
"name varchar(50) primary key," +
"durable boolean,"+
"auto_delete boolean," +
"exclusive boolean," +
"arguments varchar(1024)" +
")")
void createQueueTable();
@Insert("create table if not exists binding(" +
"exchangeName varchar(50)," +
"queueName varchar(50)," +
"binding_key varchar(100)" +
")")
void createBindingTable();
@Insert("insert into exchange values" +
"(#{name},#{type},#{durable},#{autoDelete},#{arguments})")
void insertExchange(Exchange exchange);
@Delete("delete from exchange where name=#{name}")
void deleteExchange(String name);
@Insert("insert into queue values" +
"(#{name},#{durable},#{autoDelete},#{exclusive},#{arguments})")
void insertQueue(QueueEntity queue);
@Delete("delete from queue where name=#{name}")
void deleteQueue(String name);
@Insert("insert into binding values" +
"(#{exchangeName},#{queueName},#{bindingKey})")
void insertBinding(Binding binding);
@Delete("delete from binding where exchangeName=#{exchangeName} and queueName=#{queueName}")
void deleteBinding(Binding binding);
@Select("select * from exchange")
List<Exchange> selectAllExchanges();
@Select("select * from queue")
List<QueueEntity> selectAllQueues();
@Select("select * from binding")
List<Binding> selectAllBindings();
}
因为Queue和Mapper中的Arguments是一个Map对象,没办法直接插入到数据库中,只能先转换成字符串.所以需要多写两个这两个类的get和set方法.
有些同学可能会疑惑,我们使用了@Data注解,自己再创建一个get和set,不会冲突吗?
如果我们自己创建了get和set,Lombok就不会再帮助我们生成了
//在Exchange和Queue类中添加如下方法
@SneakyThrows
public void setArguments(String jsonString){
ObjectMapper mapper=new ObjectMapper();
this.arguments=mapper.readValue(jsonString, new TypeReference<Map<String, Object>>() {});
}
@SneakyThrows
public String getArguments(){
ObjectMapper mapper=new ObjectMapper();
return mapper.writeValueAsString(arguments);
}
/**
* 为了方便测试
*/
public void setArguments(String key,Object value){
arguments.put(key,value);
}
public Object getArguments(String key){
return arguments.get(key);
}
/**
* 上层调用时可能会传入一个真正的Map作为额外参数
*/
public void setArguments(Map<String,Object> arguments){
this.arguments=arguments;
}
接下来就可以创建MetaManager类来进行对MetaMapper的封装了.
@Data
@Slf4j
public class MetaManager {
private MetaMapper metaMapper;
public void insertExchange(Exchange exchange){
metaMapper.insertExchange(exchange);
}
public void deleteExchange(String exchangeName){
metaMapper.deleteExchange(exchangeName);
}
public void insertQueue(QueueEntity queue){
metaMapper.insertQueue(queue);
}
public void deleteQueue(String queueName){
metaMapper.deleteQueue(queueName);
}
public void insertBinding(Binding binding){
metaMapper.insertBinding(binding);
}
public void deleteBinding(Binding binding){
metaMapper.deleteBinding(binding);
}
public List<Exchange> selectAllExchanges(){
return metaMapper.selectAllExchanges();
}
public List<QueueEntity> selectAllQueues(){
return metaMapper.selectAllQueues();
}
public List<Binding> selectAllBindings(){
return metaMapper.selectAllBindings();
}
/**
* 进行建库建表操作
* 1.建库,MyBatis进行数据库的操作的时候会自动建库
* 2.建表,需要手动完成
*/
public void init(){
//因为我们不需要将MetaManager交给Spring管理,但是如果想获取到MetaMapper对象就必须从Spring中取,所以采用下面的方式
this.metaMapper= DemoApplication.context.getBean(MetaMapper.class);
//如果数据库已经存在,就不需要再次建立
if(checkDBExists()){
log.info("数据库已经存在");
}else{
//数据库不存在,执行建表操作
createTables();
//进行数据的初始化
createDefaultData();
log.info("数据库初始化完成");
}
}
/**
*创建一个默认交换机,name="",type=DIRECT,durable=true
*/
private void createDefaultData() {
Exchange exchange=new Exchange();
exchange.setName("");
exchange.setDurable(true);
exchange.setType(ExchangeType.DIRECT);
exchange.setAutoDelete(false);
metaMapper.insertExchange(exchange);
}
private void createTables() {
metaMapper.createQueueTable();
metaMapper.createExchangeTable();
metaMapper.createBindingTable();
}
/**
* 判断数据库是否存在,就看meta.db是否存在即可
* @return
*/
private boolean checkDBExists() {
File file=new File("./data/meta.db");
return file.exists();
}
/**
* 每次测试数据之后进行删库操作,防止数据污染
*/
public void deleteDb(){
File file=new File("./data/meta.db");
file.delete();
}
}
因为MetaManager在初始化的时候需要用到DemoApplication类,所以对这个类的代码稍加修改.
@SpringBootApplication
public class DemoApplication {
public static ConfigurableApplicationContext context;
public static void main(String[] args) {
context=SpringApplication.run(DemoApplication.class, args);
}
}
?下面是这些类的目录结构,可做参考.
?单元测试是十分必要的,毕竟每个程序猿都不想自己一顿猛如虎的操作之后发现bug都有几千行!下面直接给出单元测试类的代码(仅供参考)
@SpringBootTest
class MetaManagerTest {
private MetaManager metaManager=new MetaManager();
@BeforeEach
void setUp() {
//因为metaManager初始化的时候需要用到DemoApplication.context,这里需要手动赋值
DemoApplication.context= SpringApplication.run(DemoApplication.class);
metaManager.init();
}
@Test
void init() {
//初始化完成后应该有一个默认交换机
List<Exchange> exchanges=metaManager.selectAllExchanges();
Assertions.assertEquals(1,exchanges.size());
Exchange exchange=exchanges.get(0);
Assertions.assertEquals("",exchange.getName());
Assertions.assertEquals(ExchangeType.DIRECT,exchange.getType());
Assertions.assertEquals(true,exchange.isDurable());
Assertions.assertEquals(false,exchange.isAutoDelete());
}
@AfterEach
void tearDown() {
//因为MyBatis正在使用数据库,所以必须把Spring项目关掉之后才可以删除数据库文件
DemoApplication.context.close();
metaManager.deleteDb();
}
@Test
void insertExchange() {
Exchange expectedExchange=new Exchange();
expectedExchange.setName("testExchange");
expectedExchange.setType(ExchangeType.TOPIC);
expectedExchange.setDurable(true);
expectedExchange.setAutoDelete(false);
expectedExchange.setArguments("aaa",1);
metaManager.insertExchange(expectedExchange);
List<Exchange> exchanges=metaManager.selectAllExchanges();
Assertions.assertEquals(2,exchanges.size());
Exchange actualExchange=exchanges.get(1);
Assertions.assertEquals("testExchange",actualExchange.getName());
Assertions.assertTrue(actualExchange.isDurable());
Assertions.assertFalse(actualExchange.isAutoDelete());
Assertions.assertEquals(ExchangeType.TOPIC,actualExchange.getType());
Assertions.assertEquals(1,actualExchange.getArguments("aaa"));
}
@Test
void insertQueue() {
QueueEntity expectedQueue=new QueueEntity();
expectedQueue.setName("testQueue");
expectedQueue.setDurable(false);
expectedQueue.setExclusive(false);
expectedQueue.setAutoDelete(false);
expectedQueue.setArguments("bbb",2);
metaManager.insertQueue(expectedQueue);
List<QueueEntity> queues=metaManager.selectAllQueues();
Assertions.assertEquals(1,queues.size());
QueueEntity actualQueue=queues.get(0);
Assertions.assertEquals("testQueue",actualQueue.getName());
Assertions.assertFalse(actualQueue.isAutoDelete());
Assertions.assertFalse(actualQueue.isDurable());
Assertions.assertFalse(actualQueue.isExclusive());
Assertions.assertEquals(2,actualQueue.getArguments("bbb"));
}
@Test
void insertBinding() {
Binding expectedBinding=new Binding();
expectedBinding.setExchangeName("testExchange");
expectedBinding.setQueueName("testQueue");
expectedBinding.setBindingKey("bindingKey");
metaManager.insertBinding(expectedBinding);
List<Binding> bindings=metaManager.selectAllBindings();
Assertions.assertEquals(1,bindings.size());
Binding actualBinding=bindings.get(0);
Assertions.assertEquals("testQueue",actualBinding.getQueueName());
Assertions.assertEquals("testExchange",actualBinding.getExchangeName());
Assertions.assertEquals("bindingKey",actualBinding.getBindingKey());
}
@Test
void deleteExchange() {
Exchange expectedExchange=new Exchange();
expectedExchange.setName("testExchange");
expectedExchange.setType(ExchangeType.TOPIC);
expectedExchange.setDurable(true);
expectedExchange.setAutoDelete(false);
expectedExchange.setArguments("aaa",1);
metaManager.insertExchange(expectedExchange);
List<Exchange> exchanges=metaManager.selectAllExchanges();
Assertions.assertEquals(2,exchanges.size());
metaManager.deleteExchange("testExchange");
exchanges=metaManager.selectAllExchanges();
Assertions.assertEquals(1,exchanges.size());
}
@Test
void deleteQueue() {
QueueEntity expectedQueue=new QueueEntity();
expectedQueue.setName("testQueue");
expectedQueue.setDurable(false);
expectedQueue.setExclusive(false);
expectedQueue.setAutoDelete(false);
expectedQueue.setArguments("bbb",2);
metaManager.insertQueue(expectedQueue);
List<QueueEntity> queues=metaManager.selectAllQueues();
Assertions.assertEquals(1,queues.size());
metaManager.deleteQueue("testQueue");
queues=metaManager.selectAllQueues();
Assertions.assertEquals(0,queues.size());
}
@Test
void deleteBinding() {
Binding expectedBinding=new Binding();
expectedBinding.setExchangeName("testExchange");
expectedBinding.setQueueName("testQueue");
expectedBinding.setBindingKey("bindingKey");
metaManager.insertBinding(expectedBinding);
List<Binding> bindings=metaManager.selectAllBindings();
Assertions.assertEquals(1,bindings.size());
Binding binding=new Binding();
binding.setExchangeName("testExchange");
binding.setQueueName("testQueue");
metaManager.deleteBinding(binding);
bindings=metaManager.selectAllBindings();
Assertions.assertEquals(0,bindings.size());
}
}
前面我们提到过,Message的持久化需要放进文件里,因为message的数量比较多,如果使用数据库来存储,性能会变慢; 并且message不需要复杂的增删改查,不需要用到数据库
既然要操作的是Message,我们肯定要定义一个Message类了.
@Data
public class Message implements Serializable {
//message的基本属性
private BasicProperties basicProperties;
//message存放的消息内容,因为涉及到网络传输和文件存储,使用字节数组存储比较合适
private byte[] body;
//辅助成员
//表示在文件中的起始/终止位置,方便把该消息读取/存放到文件里
//这两个属性没有必要存放在文件里,使用transient关键字,就不会被序列化
private transient long offsetBegin;
private transient long offsetEbd;
//表示该消息是否有效,1表示有效,0表示无效
private byte isValid=0x1;
//因为消息的数目比较多,所以使用工厂方法,messageId由系统自动生成,以"M-"作为前缀
public static Message createMessageWithId(String routingKey,BasicProperties properties,byte[] body){
Message message=new Message();
//一般properties传的参数都是null
if(properties!=null){
message.setBasicProperties(properties);
}
message.setMessageId(generateMessageId());
message.setRoutingKey(routingKey);
message.setBody(body);
return message;
}
private static String generateMessageId() {
return "M-"+ UUID.randomUUID();
}
/**
* 提供操作基本属性的接口,方便外界调用
* @param messageId
*/
public void setMessageId(String messageId){
basicProperties.setMessageId(messageId);
}
public String getMessageId(){
return basicProperties.getMessageId();
}
public void setRoutingKey(String routingKey){
basicProperties.setRoutingKey(routingKey);
}
public String getRoutingKey(){
return basicProperties.getRoutingKey();
}
public boolean isDelivery() {
return (basicProperties.getDelivery() == 0x2);
}
public void setDelivery(byte delivery){
basicProperties.setDelivery(delivery);
}
}
@Data
public class BasicProperties implements Serializable {
//消息的唯一标识
private String messageId;
//交换机根据routingKey转发该消息
private String routingKey;
//是否持久化 1表示非持久化 2表示持久化
private byte delivery=0x1;
}
?因为message是存放在Queue上的,所以我们需要为每个Queue创建一个目录目录名即为队列名,这个目录下有两个文件:
- queue_data.txt,用来存放这个队列的全部消息,以二进制的方式存储
- queue_stat.txt,用来统计这个队列的统计信息,方便进行GC操作
我们再来创建一个MessageFileManager类,用来操作message相关的文件.
?下面定义一下这个类要提供的接口
以队列为维度创建文件,每个队列都有一个目录,目录的名字为队列名,这个目录存放在data目录下,形如./data/testQeue,每个目录包含两个文件:
下面我们来规定一下这两种文件的格式.
现在看stat文件吧,它比较简单.
再来定义data文件
规定,当一个队列中的总消息数超过1500,且有效消息数低于50%时,就要进行GC操作,防止因为文件过大导致性能降低.
下面让我们一起来实现一下这个类,配合注释看更下饭哦~
@Data
@Slf4j
public class MessageFileManager {
//定义统计文件对应的类
public static class Stat{
public int totalCount;
public int validCount;
}
/**
* MessageFileManager的初始化方法,实际上啥也没干,为了方便代码扩展
*/
public void init(){}
/**
* @param queueName
* @return 返回队列对应的目录名
*/
private String getQueueDirPath(String queueName){
return "./data/"+queueName;
}
/**
* @param queueName
* @return 返回队列对应的data文件名
*/
private String getQueueDataPath(String queueName){
return getQueueDirPath(queueName)+"/queue_data.txt";
}
/**
* @param queueName
* @return 返回队列对应的统计文件名
*/
private String getQueueStatPath(String queueName){
return getQueueDirPath(queueName)+"/queue_stat.txt";
}
/**
* 创建队列对应的文件,因为如果有则不创建,所以不用加锁
* @param queueName
*/
public void createQueueFiles(String queueName) throws IOException {
File dir=new File(getQueueDirPath(queueName));
if(!dir.exists()){
boolean ok=dir.mkdirs();//有该目录则不创建,没有就创建
if(!ok){
throw new IOException("[MessageFileManager] 创建队列目录失败,queueName="+queueName);
}
}
File dataFile=new File(getQueueDataPath(queueName));
if(!dataFile.exists()){
boolean ok=dataFile.createNewFile();
if(!ok){
throw new IOException("[MessageFileManager] 创建队列的data文件失败,queue_data.txt="+getQueueDataPath(queueName));
}
}
File statFile=new File(getQueueStatPath(queueName));
if(!statFile.exists()){
boolean ok=statFile.createNewFile();
if(!ok){
throw new IOException("[MessageFileManager] 创建队列的stat文件失败,queue_stat.txt="+getQueueStatPath(queueName));
}
}
//给统计文件写入初始数据,后续操作stat文件时不用对空文件进行判定
Stat stat=new Stat();
stat.totalCount=0;
stat.validCount=0;
writeStat(queueName,stat);
}
/**
* 删除队列对应的文件,因为删除失败也没啥影响,所以就不加锁了
*/
public void deleteQueueFiles(String queueName) throws IOException {
File dataFile=new File(getQueueDataPath(queueName));
boolean ok1= dataFile.delete();
File statFile=new File(getQueueStatPath(queueName));
boolean ok2=statFile.delete();
File dir= new File(getQueueDirPath(queueName));
boolean ok3= dir.delete();
if(!ok1||!ok2||!ok3){
throw new IOException("[MessageFileManager] 删除队列文件失败,queueName="+queueName);
}
}
/**
* 对统计文件进行写操作
* 本来对文件进行读写操作是要加锁的,但是外层调用者加了,这里就不加了
* @param queueName
* @param stat
*/
private void writeStat(String queueName,Stat stat) {
try(OutputStream outputStream=new FileOutputStream(getQueueStatPath(queueName));
PrintWriter writer=new PrintWriter(outputStream)){
writer.write(stat.totalCount+"\t"+ stat.validCount);
writer.flush();
} catch (FileNotFoundException e) {
e.printStackTrace();
} catch (IOException e) {
e.printStackTrace();
}
}
/**
* 对统计文件进行读操作
* @param queueName
* @return
*/
private Stat readStat(String queueName){
Stat stat=null;
try(InputStream inputStream=new FileInputStream(getQueueStatPath(queueName));
Scanner scanner=new Scanner(inputStream)) {
stat=new Stat();
stat.totalCount=scanner.nextInt();
stat.validCount=scanner.nextInt();
return stat;
} catch (FileNotFoundException e) {
e.printStackTrace();
} catch (IOException e) {
e.printStackTrace();
}
return stat;
}
/**
* 发送消息
* 1. 得到消息的offsetBegin和offsetEnd
* 2. 将该消息写到data文件上
* 3. 更新stat文件
*/
public void sendMessage(QueueEntity queue, Message message) throws MqException, IOException {
//涉及到多线程,要进行加锁
synchronized (queue){
//先检查这个队列对应的文件是否存在
if(!isExists(queue.getName())){
throw new MqException("[MessageFileManager] sendMessage,要发送的队列文件不存在");
}
File dataFile=new File(getQueueDataPath(queue.getName()));
//先更新message的offsetBegin和offsetEnd
long fileLength=dataFile.length();
byte[] messageBinary= BinaryTool.toBytes(message);
message.setOffsetBegin(fileLength+4);
message.setOffsetEnd(fileLength+messageBinary.length+4);
//再进行文件写入
//outPutStream打开文件时会默认清空原有内容,需要将append参数设置为true
try(OutputStream outputStream=new FileOutputStream(dataFile,true);
DataOutputStream dataOutputStream=new DataOutputStream(outputStream)){
//先写入长度,使用DataOutputStream类,固定写4byte
dataOutputStream.writeInt(messageBinary.length);
//再写二进制的message
dataOutputStream.write(messageBinary);
}
//更新stat文件
Stat stat=readStat(queue.getName());
stat.totalCount++;
stat.validCount++;
writeStat(queue.getName(),stat);
}
log.info("硬盘已成功添加message,queueName="+queue.getName()+",messageId="+message.getMessageId());
}
/**
* 逻辑删除message: 1.将isValid字段置为0x0 2.把这个消息写回去 3.更新统计文件
* @param queue
* @param message 这个message是内存传过来的
*/
public void deleteMessage(QueueEntity queue,Message message) throws MqException, IOException, ClassNotFoundException {
synchronized (queue){
if(!isExists(queue.getName())){
throw new MqException("[MessageFileManager] deleteMessage,消息所在的队列文件不存在");
}
//根据消息的offsetBegin和offsetEnd字段找到这条消息
File dataFile=new File(getQueueDataPath(queue.getName()));
//采用随机读取的方法,读取方式为写和读
try(RandomAccessFile randomAccessFile=new RandomAccessFile(dataFile,"rw")){
//先找到该消息的起始位置
randomAccessFile.seek(message.getOffsetBegin());
//将这个对象的isValid设置为0x0
message.setIsValid((byte) 0x00);
//将message转换成字节数组
byte[] bodyDest=BinaryTool.toBytes(message);
//将这个数组写入文件
randomAccessFile.write(bodyDest);
}
Stat stat=readStat(queue.getName());
stat.validCount--;
writeStat(queue.getName(),stat);
}
log.info("已删除硬盘上的message,queueName="+queue.getName()+",messageId="+message.getMessageId());
}
/**
* 判断某个队列对应的文件是否存在
* @param queueName
* @return
*/
private boolean isExists(String queueName) {
File dir=new File(getQueueDirPath(queueName));
if(!dir.isDirectory()){
return false;
}
File dataFile=new File(getQueueDataPath(queueName));
if(!dataFile.isFile()){
return false;
}
File statFile=new File(getQueueStatPath(queueName));
if(!statFile.isFile()){
return false;
}
return true;
}
/**
* 从硬盘上加载queue存放的全部Message,服务器重启(只有一个线程)/GC操作(GC方法已经加过锁了)会调用该方法,所以可以不用加锁
* @param queueName
* @return 返回LinkedList,方便进行头删
*/
public LinkedList<Message> loadMessages(String queueName) throws MqException, IOException, ClassNotFoundException {
//synchronized (queue){
if(!isExists(queueName)) {
throw new MqException("[MessageFileManager] loadQueueMessages,要加载的队列文件不存在!");
}
LinkedList<Message> messages=new LinkedList<>();
//记录当前message的位置
long currentSet=0;
try(InputStream inputStream=new FileInputStream(getQueueDataPath(queueName));
DataInputStream dataInputStream=new DataInputStream(inputStream)){
while(true){
//先读取消息的长度
int expectedLength=dataInputStream.readInt();
byte[] body=new byte[expectedLength];
int actualLength=dataInputStream.read(body);
if(actualLength!=expectedLength){
throw new MqException("[MessageFileManager] 消息存储的格式不正确!");
}
//转换成消息对象
Message message=(Message) BinaryTool.fromBytes(body);
if(message.getIsValid()==0x0){
//这个消息是无效的
currentSet+=4+body.length;
continue;
}
//设置辅助属性
currentSet+=4;
message.setOffsetBegin(currentSet);
currentSet+=actualLength;
message.setOffsetEnd(currentSet);
messages.add(message);
}
}catch (EOFException e){
//dataInputStream读到文件结尾时,会抛出EOFException异常
log.info("成功加载该队列的全部消息,queueName="+queueName);
}
return messages;
//}
}
/**
* 判断是否要进行GC操作,由上层调用,所以修饰符为public
* total>1500&&valid<750时触发GC
*/
public boolean checkGC(String queueName){
//根据统计文件判断
Stat stat=readStat(queueName);
if(stat.totalCount>1500&&stat.validCount*1.0 / stat.totalCount<0.5){
return true;
}else {
return false;
}
}
/**
* 进行GC操作
* 1. 创建一个新的data文件,queue_data_new.txt
* 2. 读取旧文件的全部有效message
* 3. 将message加载到新文件上
* 4. 将新文件重命名
* 5. 修改统计文件
*/
public void GC(QueueEntity queue) throws MqException, IOException, ClassNotFoundException {
synchronized (queue){
//统计一下GC耗时
long start=System.currentTimeMillis();
File newDataFile=new File(getQueueNewDataPath(queue.getName()));
if(newDataFile.exists()){
//可能上一次GC的时候没删掉,抛异常
throw new MqException("[MessageFileManager] GC时新文件已存在,queue_data_new.txt="+getQueueNewDataPath(queue.getName()));
}
boolean ok=newDataFile.createNewFile();
if(!ok){
throw new IOException("[MessageFileManager] 无法创建新的data文件,queue_data_new,txt="+getQueueNewDataPath(queue.getName()));
}
//从旧文件中加载全部有效message
List<Message> messages=loadMessages(queue.getName());
//将加载的message写到新文件中
try(OutputStream outputStream=new FileOutputStream(getQueueNewDataPath(queue.getName()));
DataOutputStream dataOutputStream=new DataOutputStream(outputStream)){
for(Message message:messages){
byte[] body=BinaryTool.toBytes(message);
dataOutputStream.writeInt(body.length);
dataOutputStream.write(body);
}
}
//让新文件取代旧文件
File oldDataFile=new File(getQueueDataPath(queue.getName()));
ok=oldDataFile.delete();
if(!ok){
throw new IOException("[MessageFileManager] 无法删除旧文件,queue_data.txt="+getQueueDataPath(queue.getName()));
}
ok=newDataFile.renameTo(oldDataFile);
if(!ok){
throw new IOException("[MessageFileManager] 新文件重命名失败,queueName="+queue.getName());
}
//更新统计文件
Stat stat=new Stat();
stat.totalCount= messages.size();
stat.validCount= messages.size();
readStat(queue.getName());
long end=System.currentTimeMillis();
log.info("GC耗时: "+(end-start)+"ms");
}
}
private String getQueueNewDataPath(String queueName) {
return getQueueDirPath(queueName)+"/queue_data_new.txt";
}
}
下面是单元测试的代码.(仅供参考,童靴们可以写更多的测试案例)
@SpringBootTest
class MessageFileManagerTest {
private MessageFileManager fileManager=new MessageFileManager();
private final String queueName1="testQueue1";
private final String queueName2="testQueue2";
@BeforeEach
void setUp() throws IOException {
//创建队列文件,队列名为testQueue
fileManager.init();
fileManager.createQueueFiles(queueName1);
fileManager.createQueueFiles(queueName2);
}
@AfterEach
void tearDown() throws IOException {
//销毁队列对应的文件
fileManager.deleteQueueFiles(queueName1);
fileManager.deleteQueueFiles(queueName2);
}
@Test
void createQueueFiles() {
//检查两个队列的文件是否已经创建好
//利用反射调用isExists方法
Boolean ok1=ReflectionTestUtils.invokeMethod(fileManager,"isExists",queueName1);
Boolean ok2=ReflectionTestUtils.invokeMethod(fileManager,"isExists",queueName2);
Assertions.assertTrue(ok1);
Assertions.assertTrue(ok2);
}
private QueueEntity createQueue(String queueName){
QueueEntity queue=new QueueEntity();
queue.setName(queueName);
return queue;
}
private Message createMessage(String routingKey,String content){
Message message=Message.createMessageWithId(routingKey,null,content.getBytes());
return message;
}
@Test
void sendMessage() throws IOException, MqException, ClassNotFoundException {
QueueEntity queue=createQueue(queueName1);
Message message=createMessage("testRoutingKey","hello");
fileManager.sendMessage(queue,message);
LinkedList<Message> messages=fileManager.loadMessages(queueName1);
Assertions.assertEquals(1,messages.size());
Message message1=messages.get(0);
Assertions.assertEquals("testRoutingKey",message1.getRoutingKey());
Assertions.assertEquals(message.getMessageId(),message1.getMessageId());
Assertions.assertArrayEquals(message.getBody(),message1.getBody());
}
@Test
void loadMessages() throws IOException, MqException, ClassNotFoundException {
QueueEntity queue=createQueue(queueName1);
List<Message> exceptedMessages=new ArrayList<>();
for(int i=0;i<10;i++){
Message message=createMessage("testRoutingKey","content"+i);
exceptedMessages.add(message);
fileManager.sendMessage(queue,message);
}
for(int i=0;i<10;i+=2){
fileManager.deleteMessage(queue,exceptedMessages.get(i));
}
List<Message> actualMessages=fileManager.loadMessages(queueName1);
for(int i=0;i<actualMessages.size();i++){
Message exceptedMsg=exceptedMessages.get(i*2+1);
Message actualMsg=actualMessages.get(i);
Assertions.assertEquals(exceptedMsg.getMessageId(),actualMsg.getMessageId());
Assertions.assertEquals(exceptedMsg.getRoutingKey(),actualMsg.getRoutingKey());
Assertions.assertArrayEquals(exceptedMsg.getBody(),actualMsg.getBody());
Assertions.assertEquals(exceptedMsg.getOffsetBegin(),actualMsg.getOffsetBegin());
Assertions.assertEquals(exceptedMsg.getOffsetEnd(),actualMsg.getOffsetEnd());
}
}
@Test
void GC() throws IOException, MqException, ClassNotFoundException {
//先插入几个message
QueueEntity queue=createQueue(queueName1);
List<Message> messages=new ArrayList<>();
for(int i=0;i<10;i++){
Message message=createMessage("testRoutingKey","content"+i);
fileManager.sendMessage(queue,message);
messages.add(message);
}
//删除后五个message
for(int i=5;i<10;i++){
fileManager.deleteMessage(queue,messages.get(i));
}
//修改stat文件,因为是私有方法,这里使用反射
MessageFileManager.Stat stat=new MessageFileManager.Stat();
stat.totalCount=2000;
stat.validCount=5;
ReflectionTestUtils.invokeMethod(fileManager,"writeStat",queueName1,stat);
assertTrue(fileManager.checkGC(queueName1));
fileManager.GC(queue);
List<Message> lastMessages=fileManager.loadMessages(queueName1);
assertEquals(5,lastMessages.size());
for(int i=0;i<5;i++){
Message actualMsg=lastMessages.get(i);
Message expectedMsg=messages.get(i);
assertEquals(expectedMsg.getMessageId(),actualMsg.getMessageId());
assertEquals(expectedMsg.getRoutingKey(),actualMsg.getRoutingKey());
assertArrayEquals(expectedMsg.getBody(),actualMsg.getBody());
assertEquals(expectedMsg.getOffsetBegin(),actualMsg.getOffsetBegin());
assertEquals(expectedMsg.getOffsetEnd(),actualMsg.getOffsetEnd());
}
}
}
? 为了便于虚拟主机调用硬盘操作上的方法,我们对硬盘上的操作进行封装,分为对数据库的操作和对文件的操作.
需要创建一个DiskManager类,下面是这个类的源码.因为MessageFileManager和MetaManager类我们之前已经检测过了,DiskManager类只是对这两个类封装了一层,所以就不进行单元测试了.
@Slf4j
public class DiskManager {
private MessageFileManager messageFileManager=new MessageFileManager();
private MetaManager metaManager=new MetaManager();
public void init(){
metaManager.init();
messageFileManager.init();
}
//封装对交换机的操作
public void insertExchange(Exchange exchange){
metaManager.insertExchange(exchange);
log.info("硬盘成功添加交换机,exchangeName="+exchange.getName());
}
public void deleteExchange(String exchangeName){
metaManager.deleteExchange(exchangeName);
log.info("硬盘成功删除交换机,exchangeName="+exchangeName);
}
public List<Exchange> selectAllExchanges(){
return metaManager.selectAllExchanges();
}
//对队列的封装
public void insertQueue(QueueEntity queue) throws IOException {
metaManager.insertQueue(queue);
log.info("硬盘成功添加队列,queueName"+queue.getName());
//同时创建这个队列的文件
messageFileManager.createQueueFiles(queue.getName());
}
public void deleteQueue(String queueName) throws IOException {
metaManager.deleteQueue(queueName);
log.info("硬盘成功删除队列,queueName="+queueName);
//同时删除这个队列的文件
messageFileManager.deleteQueueFiles(queueName);
}
public List<QueueEntity> selectAllQueues(){
return metaManager.selectAllQueues();
}
//对绑定的封装
public void insertBinding(Binding binding){
metaManager.insertBinding(binding);
log.info("硬盘成功添加绑定,binding="+binding);
}
public void deleteBinding(Binding binding){
metaManager.deleteBinding(binding);
log.info("硬盘成功删除绑定,binding="+binding);
}
public List<Binding> selectAllBindings(){
return metaManager.selectAllBindings();
}
//对message的封装
public void sendMessage(QueueEntity queue,Message message) throws IOException, MqException {
messageFileManager.sendMessage(queue,message);
}
public void deleteMessage(QueueEntity queue,Message message) throws IOException, MqException, ClassNotFoundException {
messageFileManager.deleteMessage(queue,message);
//检查是否要进行GC操作
if(messageFileManager.checkGC(queue.getName())){
messageFileManager.GC(queue);
}
}
public LinkedList<Message> loadAllMessages(String queueName) throws IOException, MqException, ClassNotFoundException {
return messageFileManager.loadMessages(queueName);
}
}
在概念篇中提到过,对于Broker Server来说,内存是主要的存储介质,硬盘只是辅助存储,用来进行数据恢复的.下面就创建一个MemoryManager类,用于管理内存中的数据.????????
?下图是MemoryManager提供的API.
先来给出MemoryManager的成员变量,根据它们更容易理解为啥要提供这些API
public class MemoryManager {
//考虑到线程安全,用到哈希表这个结构时使用ConcurrentMap
//交换机的哈希表,key=exchangeName
private ConcurrentHashMap<String, Exchange> exchangesMap=new ConcurrentHashMap<>();
//队列的哈希表,key=queueName
private ConcurrentHashMap<String, QueueEntity> queuesMap=new ConcurrentHashMap<>();
//绑定的hash表,因为交换机需要根据routingKey和bindingKey之间的对应关系将message投递到指定队列,所以第一个Key=exchangeName,第二个key=queueName
private ConcurrentHashMap<String,ConcurrentHashMap<String, Binding>> bindingsMap=new ConcurrentHashMap<>();
//message的hash表,因为消息是以队列为维度进行存储的,所以一个队列对应存储的一串消息,key=queueName
//使用LinkedList,方便进行头删
private ConcurrentHashMap<String, LinkedList<Message>> queueMessagesMap=new ConcurrentHashMap<>();
//每个队列的消息被取出后,要等待客户端确认才能彻底删除,所以需要有一个数据结构专门存储未确认的消息
//为了方便找到未确认的消息并进行删除,使用hash结构,第一个key是queueName,第二个key是messageId
private ConcurrentHashMap<String,ConcurrentHashMap<String,Message>> queueMessagesWaitAckMap=new ConcurrentHashMap<>();
//消息中心,用来存放全部的消息,key=messageId
private ConcurrentHashMap<String,Message> messagesCenter=new ConcurrentHashMap<>();
}
下面一起来实现这个类吧.
?该说的话都放在注释里了~~
@Data
@Slf4j
public class MemoryManager {
//考虑到线程安全,用到哈希表这个结构时使用ConcurrentMap
//交换机的哈希表,key=exchangeName
private ConcurrentHashMap<String, Exchange> exchangesMap=new ConcurrentHashMap<>();
//队列的哈希表,key=queueName
private ConcurrentHashMap<String, QueueEntity> queuesMap=new ConcurrentHashMap<>();
//绑定的hash表,因为交换机需要根据routingKey和bindingKey之间的对应关系将message投递到指定队列,所以第一个Key=exchangeName,第二个key=queueName
private ConcurrentHashMap<String,ConcurrentHashMap<String, Binding>> bindingsMap=new ConcurrentHashMap<>();
//message的hash表,因为消息是以队列为维度进行存储的,所以一个队列对应存储的一串消息,key=queueName
//使用LinkedList,方便进行头删
private ConcurrentHashMap<String, LinkedList<Message>> queueMessagesMap=new ConcurrentHashMap<>();
//每个队列的消息被取出后,要等待客户端确认才能彻底删除,所以需要有一个数据结构专门存储未确认的消息
//为了方便找到未确认的消息并进行删除,使用hash结构,第一个key是queueName,第二个key是messageId
private ConcurrentHashMap<String,ConcurrentHashMap<String,Message>> queuesMessageWaitAckMap=new ConcurrentHashMap<>();
//消息中心,用来存放全部的消息,key=messageId
private ConcurrentHashMap<String,Message> messagesCenter=new ConcurrentHashMap<>();
//init方法啥也没干,为了方便扩展
public void init(){}
//queue的相关操作
public void insertQueue(QueueEntity queue){
queuesMap.put(queue.getName(),queue);
log.info("内存成功插入Queue,queueName="+queue.getName());
}
public void deleteQueue(String queueName){
queuesMap.remove(queueName);
log.info("内存成功删除Queue,queueName="+queueName);
}
public QueueEntity getQueue(String queueName){
return queuesMap.get(queueName);
}
//Exchange的相关操作
public void insertExchange(Exchange exchange){
exchangesMap.put(exchange.getName(),exchange);
log.info("内存成功添加Exchange,exchangeName="+exchange.getName());
}
public void deleteExchange(String exchangeName){
exchangesMap.remove(exchangeName);
log.info("内存成功删除Exchange,exchangeName="+exchangeName);
}
public Exchange getExchange(String exchangeName){
return exchangesMap.get(exchangeName);
}
//Binding的相关操作
public void insertBinding(Binding binding) throws MqException {
//先找到交换机的所有绑定的hash表
//如果没有交换机没有绑定,创建一个
ConcurrentHashMap<String,Binding> bindingMap=bindingsMap.computeIfAbsent(binding.getExchangeName(),c->new ConcurrentHashMap<>());
//查找这个绑定是否存在,如果存在,不能插入该绑定,因为如果是多线程插入绑定并且bindingKey不一样,就会覆盖
//因为既涉及查询又涉及写操作,要加锁变成操作
synchronized (bindingMap){
if(bindingMap.get(binding.getQueueName())!=null){
throw new MqException("[MemoryManager] 要插入的绑定已经存在!,binding="+binding);
}
//插入该绑定
bindingMap.put(binding.getQueueName(),binding);
log.info("内存插入绑定,binding="+binding);
}
}
//一般情况下,多线程操作,多删一次没啥副作用,所以删除不加锁
//但是如果删除失败,需要打印一下
public void deleteBinding(Binding binding){
//先查找交换机的所有绑定
ConcurrentHashMap<String,Binding> bindingMap=bindingsMap.get(binding.getExchangeName());
if(bindingMap==null){
log.info("内存删除绑定失败!该交换机没有绑定队列,exchangeName="+binding.getExchangeName());
return;
}
//找到这个绑定并删除
Binding toDelete=bindingMap.remove(binding.getQueueName());
if(toDelete==null){
log.info("内存删除绑定失败!没有找到该绑定,binding="+binding);
return;
}
log.info("内存成功删除绑定,binding="+binding);
}
public Binding getBinding(String queueName,String exchangeName){
ConcurrentHashMap<String,Binding> bindingMap=bindingsMap.get(exchangeName);
if(bindingMap==null){
return null;
}
return bindingMap.get(queueName);
}
//message的操作
public void insertMessage(Message message){
messagesCenter.put(message.getMessageId(),message);
}
public void deleteMessage(String messageId){
messagesCenter.remove(messageId);
}
public Message getMessage(String messageId){
return messagesCenter.get(messageId);
}
//消息和队列
public void sendMessageToQueue(String queueName,Message message){
//找到给队列存储消息的hash表,不存在则创建
LinkedList<Message> queueMessages=queueMessagesMap.computeIfAbsent(queueName,c->new LinkedList<>());
//给这个消息表中插入一条消息
synchronized (queueMessages){
queueMessages.add(message);
}
//消息中心也要保存一份
messagesCenter.put(message.getMessageId(),message);
log.info("成功向队列发送消息并保存到消息中心,queueName="+queueName+",messageId="+message.getMessageId());
}
/**
* 队列向消费者发送消息,同时从队列中删除这条消息,并加入到自己的未确认消息队列中
*/
public Message getMessageFromQueue(String queueName){
LinkedList<Message> queueMessages=queueMessagesMap.get(queueName);
if(queueMessages==null){
return null;
}
Message message=null;
synchronized (queueMessages){
if(queueMessages.size()==0){
return null;
}
message=queueMessages.pop();
log.info("已从队列中取出第一条消息,queueName="+queueName+",messageId="+message.getMessageId());
return message;
}
}
/**
* 添加一条未确认消息
*/
public void insertMessageWaitAck(String queueName,Message message){
ConcurrentHashMap<String,Message> queueMessageWaitAckMap=queuesMessageWaitAckMap.computeIfAbsent(queueName,c->new ConcurrentHashMap<>());
queueMessageWaitAckMap.put(message.getMessageId(),message);
log.info("消息进入待确认队列,queueName="+queueName+",messageId="+message.getMessageId());
}
/**
* 从待确认队列中删除一条消息
*/
public void deleteMessageWaitAck(String queueName,String messageId) {
ConcurrentHashMap<String, Message> queueMessageWaitAckMap = queuesMessageWaitAckMap.get(queueName);
if (queueMessageWaitAckMap == null) {
return;
}
queueMessageWaitAckMap.remove(messageId);
log.info("将消息从待确认队列中移除,queueName="+queueName+",messageId="+messageId);
}
/**
* 查询未确认消息
*/
public Message getMessageWaitAck(String queueName, String messageId){
ConcurrentHashMap<String,Message> queueMessageWaitAckMap=queuesMessageWaitAckMap.get(queueName);
if(queueMessageWaitAckMap==null){
return null;
}
return queueMessageWaitAckMap.get(messageId);
}
/**
* 从硬盘上恢复数据
*/
public void loadDiskData(DiskManager manager) throws IOException, MqException, ClassNotFoundException {
//恢复meta数据
List<Exchange> exchanges=manager.selectAllExchanges();
for(Exchange exchange:exchanges){
exchangesMap.put(exchange.getName(),exchange);
}
List<QueueEntity> queueEntities=manager.selectAllQueues();
for(QueueEntity queue:queueEntities){
queuesMap.put(queue.getName(),queue);
}
List<Binding> bindings=manager.selectAllBindings();
for(Binding binding:bindings){
ConcurrentHashMap<String,Binding> bindingMap=bindingsMap.computeIfAbsent(binding.getExchangeName(),c->new ConcurrentHashMap<>());
bindingMap.put(binding.getQueueName(),binding);
}
//恢复message
//不需要恢复待确认消息,因为持久化的消息会被重新加载到内存中,待确认消息也在里面,可以让消费者重新取一遍
for(QueueEntity queue:queueEntities){
LinkedList<Message> queueMessages=manager.loadAllMessages(queue.getName());
for(Message message:queueMessages){
sendMessageToQueue(queue.getName(),message);
}
}
log.info("从硬盘上恢复数据成功!");
}
/**
* 查看某个队列的消息数
*/
public int getQueueSize(QueueEntity queue){
synchronized (queue){
LinkedList<Message> messages=queueMessagesMap.get(queue.getName());
if(messages==null){
return 0;
}
return messages.size();
}
}
}
@SpringBootTest
class MemoryManagerTest {
private MemoryManager memoryManager=null;
@BeforeEach
void setUp() {
memoryManager=new MemoryManager();
memoryManager.init();
}
@AfterEach
void tearDown() {
memoryManager=null;
}
private QueueEntity createQueue(String queueName){
QueueEntity queue=new QueueEntity();
queue.setName(queueName);
return queue;
}
private Exchange createExchange(String exchangeName){
Exchange exchange=new Exchange();
exchange.setName(exchangeName);
return exchange;
}
private Binding createBinding(String queueName,String exchangeName){
Binding binding=new Binding();
binding.setExchangeName(exchangeName);
binding.setQueueName(queueName);
binding.setBindingKey("bidingKey");
return binding;
}
@Test
void testQueue() {
QueueEntity expectedQueue =createQueue("testQueue");
memoryManager.insertQueue(expectedQueue);
//因为内存中存放的是引用,直接比较两个引用是否是同一个即可
QueueEntity actualQueue=memoryManager.getQueue("testQueue");
Assertions.assertTrue(expectedQueue==actualQueue);
memoryManager.deleteQueue("testQueue");
actualQueue=memoryManager.getQueue("testQueue");
Assertions.assertNull(actualQueue);
}
@Test
void testExchange() {
Exchange expectedExchange=createExchange("testExchange");
memoryManager.insertExchange(expectedExchange);
Exchange actualExchange=memoryManager.getExchange("testExchange");
Assertions.assertTrue(expectedExchange==actualExchange);
memoryManager.deleteExchange("testExchange");
actualExchange=memoryManager.getExchange("testExchange");
Assertions.assertNull(actualExchange);
}
@Test
void insertBinding() throws MqException {
Binding expectedBinding=createBinding("testQueue","testExchange");
memoryManager.insertBinding(expectedBinding);
Binding actualBinding=memoryManager.getBinding("testQueue","testExchange");
Assertions.assertTrue(expectedBinding==actualBinding);
memoryManager.deleteBinding(expectedBinding);
actualBinding=memoryManager.getBinding("testQueue","testExchange");
Assertions.assertNull(actualBinding);
}
private Message createMessage(String content){
Message message=Message.createMessageWithId("routingKey",null,content.getBytes());
return message;
}
@Test
void testMessage() {
Message expectedMessage=createMessage("hello");
memoryManager.insertMessage(expectedMessage);
Message actualMessage=memoryManager.getMessage(expectedMessage.getMessageId());
Assertions.assertTrue(expectedMessage==actualMessage);
memoryManager.deleteMessage(expectedMessage.getMessageId());
actualMessage=memoryManager.getMessage(expectedMessage.getMessageId());
Assertions.assertNull(actualMessage);
}
@Test
void messageToQueue() {
Message expectedMessage=createMessage("hello");
QueueEntity queue=createQueue("testQueue");
memoryManager.sendMessageToQueue(queue.getName(),expectedMessage);
Message actualMessage=memoryManager.getMessageFromQueue("testQueue");
Assertions.assertEquals(expectedMessage,actualMessage);
int size=memoryManager.getQueueSize(queue);
Assertions.assertEquals(0,size);
}
@Test
void messageWaitAck() {
Message expectedMessage=createMessage("hello");
memoryManager.insertMessageWaitAck("testQueue",expectedMessage);
Message actualMessage=memoryManager.getMessageWaitAck("testQueue",expectedMessage.getMessageId());
Assertions.assertTrue(expectedMessage==actualMessage);
memoryManager.deleteMessageWaitAck("testQueue", expectedMessage.getMessageId());
actualMessage=memoryManager.getMessageWaitAck("testQueue", expectedMessage.getMessageId());
Assertions.assertNull(actualMessage);
}
@Test
void loadDiskData() throws IOException, MqException, ClassNotFoundException {
//先给硬盘添加数据
DemoApplication.context= SpringApplication.run(DemoApplication.class);
DiskManager diskManager=new DiskManager();
diskManager.init();
Binding binding=createBinding("testQueue","testExchange");
QueueEntity queue=createQueue("testQueue");
Exchange exchange=createExchange("testExchange");
diskManager.insertBinding(binding);
diskManager.insertExchange(exchange);
diskManager.insertQueue(queue);
LinkedList<Message> messages=new LinkedList<>();
for(int i=0;i<10;i++){
Message message=createMessage("hello"+i);
messages.add(message);
diskManager.sendMessage(queue,message);
}
for(int i=5;i<10;i++){
diskManager.deleteMessage(queue,messages.get(i));
}
//从硬盘上恢复数据
memoryManager.loadDiskData(diskManager);
Binding binding1=memoryManager.getBinding("testQueue","testExchange");
//因为这个对象是从硬盘上读出来的,所以引用指向的不是同一个对象
Assertions.assertEquals(binding.getBindingKey(),binding1.getBindingKey());
Assertions.assertEquals(binding.getQueueName(),binding1.getQueueName());
Assertions.assertEquals(binding.getExchangeName(),binding1.getExchangeName());
Exchange exchange1=memoryManager.getExchange("testExchange");
Assertions.assertEquals(exchange.getType(),exchange1.getType());
Assertions.assertEquals(exchange.getArguments(),exchange1.getArguments());
Assertions.assertEquals(exchange.isDurable(),exchange1.isDurable());
Assertions.assertEquals(exchange.isAutoDelete(),exchange1.isAutoDelete());
Assertions.assertEquals(exchange.getName(),exchange1.getName());
QueueEntity queue1=memoryManager.getQueue("testQueue");
Assertions.assertEquals(queue.getName(),queue1.getName());
Assertions.assertEquals(queue.getArguments(),queue1.getArguments());
Assertions.assertEquals(queue.isExclusive(),queue1.isExclusive());
Assertions.assertEquals(queue.isAutoDelete(),queue1.isAutoDelete());
Assertions.assertEquals(queue.isDurable(),queue1.isDurable());
int size=memoryManager.getQueueSize(queue1);
Assertions.assertEquals(5,size);
Message message=memoryManager.getMessageFromQueue(queue1.getName());
Assertions.assertArrayEquals("hello0".getBytes(),message.getBody());
//删除硬盘上的数据
diskManager.deleteQueue(queue.getName());
DemoApplication.context.close();
File file=new File("./data/meta.db");
file.delete();
}
}
虚拟主机,是一个逻辑上的集合,里面包含交换机,绑定,队列和消息.?
有没有童靴发现,我们写的代码里,除了EOFException(文件读到结尾是抛出的异常,属于正常读取文件产生的异常),其他的异常我们都是直接抛出去,交给上层调用者解决.这个上层调用者就是Virtual Host,它负责提供9个API,同时要解决所有可能出现的异常.
下面是VH的代码
@Data
@Slf4j
public class VirtualHost {
//管理硬盘数据
private DiskManager diskManager=new DiskManager();
//管理内存数据
private MemoryManager memoryManager=new MemoryManager();
//虚拟主机的唯一标识
private String virtualHost;
//多线程涉及到加锁操作
private Object exchangeLocker=new Object();
private Object queueLocker=new Object();
//Router类负责检查routingKey和bindingKey
private Router router=new Router();
public VirtualHost(){
//初始化硬盘数据
diskManager.init();
//初始化内存数据
memoryManager.init();
//将硬盘数据加载到内存里
try {
memoryManager.loadDiskData(diskManager);
} catch (Exception e) {
log.info("内存恢复数据失败!");
e.printStackTrace();
}
}
//没有则创建,有则不创建
public boolean exchangeDeclare(String exchangeName, ExchangeType type, boolean durable, boolean autoDelete, Map<String,Object> arguments){
//虚拟主机和交换机之间是1:n的关系,需要在exchangeName前面加上虚拟主机标识
exchangeName=virtualHost+exchangeName;
synchronized (exchangeLocker){
try{
if(memoryManager.getExchange(exchangeName)!=null){
//有这个交换机,直接返回true
log.info("要创建的交换机已经存在,exchangeName="+exchangeName);
return true;
}
Exchange exchange=new Exchange();
exchange.setName(exchangeName);
exchange.setDurable(durable);
exchange.setType(type);
exchange.setAutoDelete(autoDelete);
exchange.setArguments(arguments);
if(durable){
//将这个交换机持久化
diskManager.insertExchange(exchange);
}
memoryManager.insertExchange(exchange);
log.info("创建交换机完成.exchangeName="+exchangeName);
return true;
}catch (Exception e){
log.info("创建交换机失败.exchangeName="+exchangeName);
e.printStackTrace();
return false;
}
}
}
//删除操作,同样不加锁,因为多删一次没啥副作用
public boolean exchangeDelete(String exchangeName){
exchangeName=virtualHost+exchangeName;
try{
Exchange exchange=memoryManager.getExchange(exchangeName);
if(exchange==null){
//说明要删除的交换机不存在,打印一下日志
log.info("要删除的交换机不存在,exchangeName="+exchangeName);
return false;
}
if(exchange.isDurable()){
//硬盘上删除该交换机
diskManager.deleteExchange(exchangeName);
}
memoryManager.deleteExchange(exchangeName);
log.info("删除交换机成功,exchangeName="+exchangeName);
return true;
}catch (Exception e){
log.info("删除交换机失败,exchangeName="+exchangeName);
e.printStackTrace();
return false;
}
}
public boolean queueDeclare(String queueName,boolean durable,boolean autoDelete,boolean exclusive,Map<String,Object> arguments){
queueName=virtualHost+queueName;
synchronized (queueLocker){
try{
if(memoryManager.getQueue(queueName)!=null){
log.info("要创建的队列已存在");
return true;
}
QueueEntity queue=new QueueEntity();
queue.setName(queueName);
queue.setDurable(durable);
queue.setAutoDelete(autoDelete);
queue.setExclusive(exclusive);
queue.setArguments(arguments);
//先进行硬盘操作,硬盘操作更有可能抛异常
//如果先进行内存操作,但是硬盘操作抛异常,还要进行数据恢复
if(durable){
diskManager.insertQueue(queue);
}
memoryManager.insertQueue(queue);
log.info("创建队列成功,queueName="+queueName);
return true;
} catch (Exception e) {
log.info("创建队列失败,queueName="+queueName);
e.printStackTrace();
return false;
}
}
}
public boolean queueDelete(String queueName){
queueName=virtualHost+queueName;
try{
QueueEntity queue=memoryManager.getQueue(queueName);
if(queue==null){
log.info("要删除的队列不存在,queueName="+queueName);
return false;
}
if(queue.isDurable()){
diskManager.deleteQueue(queueName);
}
memoryManager.deleteQueue(queueName);
log.info("删除队列成功,queueName="+queueName);
return true;
}catch (Exception e){
log.info("删除队列失败,queueName="+queueName);
e.printStackTrace();
return false;
}
}
public boolean queueBind(String queueName,String exchangeName,String bindingKey){
queueName=virtualHost+queueName;
exchangeName=virtualHost+exchangeName;
synchronized (exchangeLocker){
synchronized (queueLocker){
try{
if(memoryManager.getBinding(queueName,exchangeName)!=null){
//这个绑定已经存在,不允许
throw new MqException("要创建的绑定已经存在,exchangeName="+exchangeName+",queueName="+queueName);
}
if(!router.checkBindingKey(bindingKey)){
throw new MqException("创建绑定失败,bindingKey非法,bindingKey="+bindingKey);
}
Binding binding=new Binding();
binding.setQueueName(queueName);
binding.setExchangeName(exchangeName);
binding.setBindingKey(bindingKey);
//交换机和队列都持久化时,绑定自然就持久化了
Exchange exchange=memoryManager.getExchange(exchangeName);
if(exchange==null){
throw new MqException("创建绑定失败,交换机不存在,exchangeName="+exchangeName);
}
QueueEntity queue=memoryManager.getQueue(queueName);
if(queue==null){
throw new MqException("创建绑定失败,队列不存在,queueName="+queueName);
}
if(exchange.isDurable()&&queue.isDurable()){
diskManager.insertBinding(binding);
}
memoryManager.insertBinding(binding);
log.info("创建绑定成功,binding="+binding);
return true;
} catch (Exception e) {
log.info("创建绑定失败,exchangeName="+exchangeName+",queueName="+queueName);
e.printStackTrace();
return false;
}
}
}
}
/**
* 解绑有两种情况: 1. 先删除了交换机和队列,再解除绑定 2.只解除了绑定
* 为了兼容第一种情况,不需要检验绑定的交换机和队列是否存在
* @param queueName
* @param exchangeName
* @return
*/
public boolean queueUnbind(String queueName,String exchangeName){
try{
Binding binding=memoryManager.getBinding(queueName,exchangeName);
if(binding==null){
log.info("解除绑定失败,绑定不存在,exchangeName="+exchangeName+",queueName="+queueName);
return false;
}
//即使绑定没有持久化,进行删除操作也没啥影响,否则还要判断队列和交换机是否持久化
diskManager.deleteBinding(binding);
memoryManager.deleteBinding(binding);
log.info("解除绑定成功,exchangeName"+exchangeName+",queueName="+queueName);
return true;
} catch (Exception e) {
log.info("解除绑定失败,exchangeName="+exchangeName+",queueName="+queueName);
e.printStackTrace();
return false;
}
}
/**
* 生产者调用该方法,指定发布消息到哪个交换机上
* 1. 找到交换机,判断类型
* 2. 创建消息,使用工厂方法创建
* 3. 根据交换机类型和消息的routingKey转发该消息到不同的队列中
* @return
*/
public boolean basicPublish(String exchangeName, String routingKey, BasicProperties properties,byte[] body){
try{
if(exchangeName==null){
//表示使用默认交换机
exchangeName="";
}else {
exchangeName=virtualHost+exchangeName;
}
Exchange exchange=memoryManager.getExchange(exchangeName);
if(exchange==null){
throw new MqException("发布消息失败,指定交换机不存在,exchangeName="+exchangeName);
}
Message message=Message.createMessageWithId(routingKey,properties,body);
//对交换机的类型进行判断,因为直接交换机的转发规则比较简单,放在vh中
//topic和fanout交换机放在router类中进行转发判断
if(exchange.getType()==ExchangeType.DIRECT){
//此时的routingKey就是要转发的队列名字
String queueName=virtualHost+routingKey;
QueueEntity queue=memoryManager.getQueue(queueName);
if(queue==null){
throw new MqException("要转发的队列不存在,queueName="+queueName);
}
//判断消息是否持久化
if(message.isDelivery()==0x2){
diskManager.sendMessage(queue,message);
}
memoryManager.sendMessageToQueue(queueName,message);
}else{
//找到交换机绑定的queue
ConcurrentHashMap<String,Binding> bindingMap=memoryManager.getBindingsMap().get(exchangeName);
if(bindingMap==null){
//这个交换机没有绑定任何队列,不用转发消息,直接返回
return true;
}
for(Map.Entry<String,Binding> bindingEntry:bindingMap.entrySet()){
//得到队列
QueueEntity queue=memoryManager.getQueue(bindingEntry.getKey());
if(queue==null){
//本来要抛异常,但是为了不影响其他队列的转发,这里只打印日志
log.info("交换机绑定的队列不存在");
continue;
}
//判断是否转发
Binding binding=bindingEntry.getValue();
if(router.route(exchange.getType(),message, binding)){
//转发该消息
//将消息写入硬盘
if(message.isDelivery()==0x2){
diskManager.sendMessage(queue,message);
}
//将消息写入内存
memoryManager.sendMessageToQueue(queue.getName(),message);
//TODO:通知订阅者取消息
}
}
}
log.info("成功发布消息,exchangeName="+exchangeName+",messageId="+message.getMessageId());
return true;
} catch (Exception e) {
log.info("发布消息失败,exchangeName="+exchangeName);
e.printStackTrace();
return false;
}
}
}
下面是Router类的判断方法,用于检查RoutingKey和BindingKey是否合法,以及二者的匹配(这是自己规定的,童靴们也可以创建自己的匹配规则)
@Data
public class Router {
/**
* BindingKey格式规定:
* 1. 由'.'作为各部分的分隔符
* 2. 各部分可由字母,数字,下划线组成
* 3. *表示匹配一个部分,#表示匹配0个或多个部分,通配符只能作为单独的一部分
* 4. *和# , #和#不能相邻
* @param bindingKey
* @return
*/
public boolean checkBindingKey(String bindingKey){
if(bindingKey==""){
//如果创建绑定时绑定的交换机是直接交换机,或者交换机是扇出交换机,
// 用不到bindingKey,会将bindingKey设置成""
return true;
}
char[] chs=bindingKey.toCharArray();
for(char ch:chs){
if(ch>='a'&&ch<='z'){
continue;
}
if(ch>='A'&&ch<='Z'){
continue;
}
if(ch>='0'&&ch<='9'){
continue;
}
if(ch=='.'||ch=='_'||ch=='*'||ch=='#'){
continue;
}
return false;
}
//通配符只能作为一个独立的部分
String[] strs=bindingKey.split(".");
for(String str:strs){
if(str.length()>1&&(str.contains("*")||str.contains("#"))){
return false;
}
}
//通配符相邻
for(int i=0;i<strs.length-1;i++){
if(strs[i].equals("*")&&strs[i+1].equals("#")){
return false;
}
if(strs[i].equals("#")&&strs[i+1].equals("#")){
return false;
}
if(strs[i].equals("#")&&strs[i+1].equals("*")){
return false;
}
}
return true;
}
/**
* RoutingKey格式规定;
* 1. 由'.'作为各部分的分隔符
* 2. 各部分可由字母,数字,下划线组成
* @param routingKey
* @return
*/
public boolean checkRoutingKey(String routingKey){
if(routingKey==""){
//如果使用的是扇出交换机,这个消息会被转发到该交换机绑定的全部队列上,用不到routingKey
return true;
}
char[] chs=routingKey.toCharArray();
for(char ch:chs){
if(ch>='a'&&ch<='z'){
continue;
}
if(ch>='A'&&ch<='Z'){
continue;
}
if (ch=='.'||ch=='_'){
continue;
}
if(ch>='0'&&ch<='9'){
continue;
}
return false;
}
return true;
}
public boolean route(ExchangeType type, Message message, Binding binding) throws MqException {
if(type==ExchangeType.FANOUT){
//与该交换机绑定的全部队列都要存放该消息
return true;
}else if(type==ExchangeType.TOPIC){
return topicRoute(message,binding);
}else {
//属于异常情况
throw new MqException("交换机类型非法!exchangeType="+type);
}
}
private boolean topicRoute(Message message, Binding binding) {
String routingKey= message.getRoutingKey();
String bindingKey= binding.getBindingKey();
int routIndex=0;
int bindIndex=0;
String[] routKeys=routingKey.split("\\.");
String[] bindKeys=bindingKey.split("\\.");
while (routIndex<routKeys.length&&bindIndex<bindKeys.length){
//System.out.println("routing[key]="+routKeys[routIndex]+",binding[key]="+bindingKey);
//有通配符的情况
if(bindKeys[bindIndex].equals("*")){
bindIndex++;
routIndex++;
} else if(bindKeys[bindIndex].equals("#")){
bindIndex++;
if(bindIndex==bindKeys.length){
//表明#匹配的是后面的全部
return true;
}
routIndex=getRoutIndex(routIndex,routKeys,bindKeys[bindIndex]);
if(routIndex==-1){
return false;
}
bindIndex++;
routIndex++;
}else{
//普通字符串匹配
if(!routKeys[routIndex].equals(bindKeys[bindIndex])){
return false;
}
bindIndex++;
routIndex++;
}
}
//任何一个索引没有到达终点
if(bindIndex<bindKeys.length||routIndex<routKeys.length){
return false;
}
//System.out.println("routKey="+routingKey+",bindingKey="+bindingKey);
return true;
}
private int getRoutIndex(int routIndex, String[] routKeys, String bindKey) {
for(int i=routIndex;i<routKeys.length;i++){
if(routKeys[i].equals(bindKey)){
return i;
}
}
return -1;
}
}
上面提供的Virtual Host类的代码中,没有实现basicConsume和basicAck方法,并且basicPublish方法也是不完整的.因为这个方法的实现比较复杂,需要另外创建一些类~~
Consume类,是一个函数式接口,里面是一个回调方法,消费者通过这个方法来处理消息.
/**
* 一个函数型接口,消费者进行消息处理的逻辑
*/
@FunctionalInterface
public interface Consume {
/**
* @param consumerTag 消费者标识
* @param properties message的基本属性
* @param body message的主体
*/
void handleDelivery(String consumerTag, BasicProperties properties,byte[] body)throws IOException, MqException;
}
?Consumer类,表示一个消费者.
/**
* 表示一个完整的消费者
*/
@Data
public class Consumer {
//消费者唯一标识
private String consumerTag;
//订阅的队列的名字
private String queueName;
//是否自动回复
private boolean autoAck;
//消费消息的方法
private Consume consume;
public Consumer(String consumerTag,String queueName,boolean autoAck,Consume consume){
this.consumerTag=consumerTag;
this.queueName=queueName;
this.autoAck=autoAck;
this.consume=consume;
}
}
为了建立消费者和队列之间的订阅关系,我们需要给QueueEntity类增加一个成员变量,用于管理该队列的订阅者.
//管理订阅者
private LinkedList<Consumer> subscribers=new LinkedList<>();
//采用轮询的方式向消费者投递消息,因此需要记录每次轮到哪个消费者了
//涉及到线程安全,使用原子类
private AtomicInteger consumeToken=new AtomicInteger(0);
//这个消息被送到哪个订阅者手里
public Consumer chooseConsumer(){
synchronized (subscribers){
if(subscribers.size()==0){
return null;
}
int index=consumeToken.get()% subscribers.size();
consumeToken.getAndIncrement();
return subscribers.get(index);
}
}
//添加订阅者
public void addSubscriber(Consumer consumer){
synchronized (subscribers){
subscribers.add(consumer);
}
}
虚拟主机需要调用ConsumerManager类的方法,管理消费者和队列之间的关系.
下面我们来分析一下这个类的实现.
那么问题来了--扫描线程怎么知道哪个队列收到了消息呢,需要遍历每个队列对应的存储消息的LinkedList吗?
这样做未免太复杂了,我们可以使用一个tokenQueue阻塞队列,当某个队列收到消息后,它的queueName会被放进阻塞队列中,扫描线程只需要检查阻塞队列中是否有元素即可.(参见下图)
?有了这个逻辑,我们就来实现一下ConsumerManager类.
@Slf4j
public class ConsumerManager {
//manager类需要执行向队列中添加订阅者,收到Ack后删除消息等操作,因此要有一个vh对象
private VirtualHost virtualHost;
//阻塞队列,每当queue收到一个消息后,它的queueName就会被放到阻塞队列中
private BlockingQueue<String> tokenQueue=new LinkedBlockingQueue<>();
//线程池,用于执行消费者的回调函数
private ExecutorService workThreadPool= Executors.newFixedThreadPool(10);
//扫描线程,用于检查阻塞队列中是否有queueName
private Thread scannerThread=null;
public ConsumerManager(VirtualHost virtualHost){
this.virtualHost=virtualHost;
scannerThread=new Thread(()->{
try {
while(true){
//阻塞等待tokenQueue的元素
String queueName=tokenQueue.take();
//log.info("扫描线程收到消息");
//队列收到消息后,将消息发给订阅者
QueueEntity queue=virtualHost.getMemoryManager().getQueue(queueName);
if(queue==null){
//队列不存在
throw new MqException("投递消息的队列不存在,queueName="+queueName);
}
//将消息投递给消费者
synchronized (queue){
consumeMessage(queue);
}
}
} catch (Exception e) {
e.printStackTrace();
}
},"scannerThread");
//将扫描线程设置为后台进程,服务器终止时线程也随之终止
scannerThread.setDaemon(true);
scannerThread.start();
}
//添加订阅者
public void addSubscriber(String queueName, String consumerTag, boolean autoAck, Consume consume) throws MqException {
QueueEntity queue=virtualHost.getMemoryManager().getQueue(queueName);
if(queue==null){
//没有该队列
throw new MqException("[ConsumerManager] 添加订阅者失败,没有该队列,queueName="+queueName);
}
//创建一个消费者
Consumer consumer=new Consumer(consumerTag,queueName,autoAck,consume);
synchronized (queue){
//该队列不能接收其它消息
//将这个消费者添加到队列的订阅者列表中
queue.addSubscriber(consumer);
//如果之前有消息,将积压的消息处理掉
int n=virtualHost.getMemoryManager().getQueueSize(queue);
for(int i=0;i<n;i++){
consumeMessage(queue);
}
}
}
//因为调用该方法时加锁了,所以这个方法就不加锁了
private void consumeMessage(QueueEntity queue) {
//选出一个订阅者
Consumer luckyDog=queue.chooseConsumer();
if(luckyDog==null){
//当前队列没有订阅者,不进行投递
log.info("当前队列没有订阅者,queueName="+queue.getName());
return;
}
//得到消息
Message message=virtualHost.getMemoryManager().getMessageFromQueue(queue.getName());
if(message==null){
//当前队列没有消息,这属于异常情况,不过问题不大,打印一下日志就行
log.info("当前队列没有消费者,queueName="+queue.getName());
return;
}
//使用多线程,执行订阅者的消费逻辑
workThreadPool.submit(()->{
try {
//先复制一份保存在未确认序列中
virtualHost.getMemoryManager().insertMessageWaitAck(queue.getName(),message);
//消息中心也保存一份
virtualHost.getMemoryManager().insertMessage(message);
//执行消费者的消费逻辑
luckyDog.getConsume().handleDelivery(luckyDog.getConsumerTag(),message.getBasicProperties(),message.getBody() );
//如果消费者是自动应答,消费完后直接删除该消息的副本
if(luckyDog.isAutoAck()){
if(message.isDelivery()==0x2){
//删除硬盘中的数据
virtualHost.getDiskManager().deleteMessage(queue,message);
}
//删除内存中的数据
virtualHost.getMemoryManager().deleteMessageWaitAck(queue.getName(), message.getMessageId());
virtualHost.getMemoryManager().deleteMessage(message.getMessageId());
log.info("收到Ack,删除消息成功,messageId="+message.getMessageId());
}
//如果不是,删除的逻辑放在basicAck里执行
} catch (Exception e) {
log.info("消费信息失败,consumerTag="+luckyDog.getConsumerTag()+",messageId="+message.getMessageId());
e.printStackTrace();
}
});
}
//每次添加一个消息,要通知扫描线程
public void notifyScanner(String queueName) throws InterruptedException {
tokenQueue.put(queueName);
}
}
实现完ConsumerManager类后,我们也就可以补全订阅/发布/确认消息的方法了.虚拟主机要添加一个ConsumerManager成员变量.
下面是这三个方法的实现.
/**
* 生产者调用该方法,指定发布消息到哪个交换机上
* 1. 找到交换机,判断类型
* 2. 创建消息,使用工厂方法创建
* 3. 根据交换机类型和消息的routingKey转发该消息到不同的队列中
* @return
*/
public boolean basicPublish(String exchangeName, String routingKey, BasicProperties properties,byte[] body){
try{
if(exchangeName==null){
//表示使用默认交换机
exchangeName="";
}else {
exchangeName=virtualHost+exchangeName;
}
Exchange exchange=memoryManager.getExchange(exchangeName);
if(exchange==null){
throw new MqException("发布消息失败,指定交换机不存在,exchangeName="+exchangeName);
}
Message message=Message.createMessageWithId(routingKey,properties,body);
//对交换机的类型进行判断,因为直接交换机的转发规则比较简单,放在vh中
//topic和fanout交换机放在router类中进行转发判断
if(exchange.getType()==ExchangeType.DIRECT){
//此时的routingKey就是要转发的队列名字
String queueName=virtualHost+routingKey;
QueueEntity queue=memoryManager.getQueue(queueName);
if(queue==null){
throw new MqException("要转发的队列不存在,queueName="+queueName);
}
//判断消息是否持久化
if(message.isDelivery()==0x2){
diskManager.sendMessage(queue,message);
}
memoryManager.sendMessageToQueue(queueName,message);
//通知订阅者取消息
consumerManager.notifyScanner(queue.getName());
}else{
//找到交换机绑定的queue
ConcurrentHashMap<String,Binding> bindingMap=memoryManager.getBindingsMap().get(exchangeName);
if(bindingMap==null){
//这个交换机没有绑定任何队列,不用转发消息,直接返回
return true;
}
for(Map.Entry<String,Binding> bindingEntry:bindingMap.entrySet()){
//得到队列
QueueEntity queue=memoryManager.getQueue(bindingEntry.getKey());
if(queue==null){
//本来要抛异常,但是为了不影响其他队列的转发,这里只打印日志
log.info("交换机绑定的队列不存在");
continue;
}
//判断是否转发
Binding binding=bindingEntry.getValue();
if(router.route(exchange.getType(),message, binding)){
//转发该消息
//将消息写入硬盘
if(message.isDelivery()==0x2){
diskManager.sendMessage(queue,message);
}
//将消息写入内存
memoryManager.sendMessageToQueue(queue.getName(),message);
//通知订阅者取消息
consumerManager.notifyScanner(queue.getName());
}
}
}
log.info("成功发布消息,exchangeName="+exchangeName+",messageId="+message.getMessageId());
return true;
} catch (Exception e) {
log.info("发布消息失败,exchangeName="+exchangeName);
e.printStackTrace();
return false;
}
}
/**
* 消费者通过该接口订阅一个队列的消息
*/
public boolean basicConsume(String queueName, String consumerTag, boolean autoAck, Consume consume){
queueName=virtualHost+queueName;
try{
consumerManager.addSubscriber(queueName,consumerTag,autoAck,consume);
log.info("成功添加订阅者,queueName="+queueName+",consumerTag="+consumerTag);
return true;
} catch (Exception e) {
log.info("添加订阅者失败,queueName="+queueName+",consumerTag="+consumerTag);
e.printStackTrace();
return false;
}
}
/**
* 消费者主动调用该接口,确认某个消息
*/
public boolean basicAck(String queueName,String messageId){
queueName=virtualHost+queueName;
try{
//找到该队列
QueueEntity queue=memoryManager.getQueue(queueName);
if(queue==null){
throw new MqException("[VirtualHost] 确认消息失败,消息所在的队列不存在,queueName="+queueName);
}
//找到这个消息
Message message=memoryManager.getMessage(messageId);
if(message==null){
throw new MqException("[VirtualHost] 确认消息失败,要确认的消息不存在,messageId="+messageId);
}
//删除硬盘数据
if(message.isDelivery()==0x2){
diskManager.deleteMessage(queue,message);
}
//从待确认队列中删除该消息
memoryManager.deleteMessageWaitAck(queueName,messageId);
//消息中心删除该消息
memoryManager.deleteMessage(messageId);
log.info("收到Ack,删除消息成功,messageId="+messageId);
return true;
}catch (Exception e){
log.info("收到Ack,删除消息失败,messageId="+messageId);
e.printStackTrace();
return false;
}
}
@SpringBootTest
class VirtualHostTest {
private VirtualHost virtualHost=null;
@BeforeEach
void setUp() {
DemoApplication.context=SpringApplication.run(DemoApplication.class);
virtualHost=new VirtualHost("default");
}
@AfterEach
void tearDown() {
virtualHost=null;
DemoApplication.context.close();
File file=new File("./data/meta.db");
file.delete();
}
@Test
void exchangeTest() {
virtualHost.exchangeDeclare("testExchange", ExchangeType.DIRECT,true,false,null);
Exchange exchange1=virtualHost.getMemoryManager().getExchange("defaulttestExchange");
List<Exchange> exchanges=virtualHost.getDiskManager().selectAllExchanges();
Assertions.assertNotNull(exchange1);
Assertions.assertEquals(2,exchanges.size());
virtualHost.exchangeDelete("testExchange");
exchange1=virtualHost.getMemoryManager().getExchange("testExchange");
exchanges=virtualHost.getDiskManager().selectAllExchanges();
Assertions.assertEquals(1,exchanges.size());
Assertions.assertNull(exchange1);
}
@Test
void queueTest() {
virtualHost.queueDeclare("testQueue",true,false,false,null);
QueueEntity queue=virtualHost.getMemoryManager().getQueue(virtualHost.getVirtualHost()+"testQueue");
List<QueueEntity> queueEntities=virtualHost.getDiskManager().selectAllQueues();
Assertions.assertNotNull(queue);
Assertions.assertEquals(1,queueEntities.size());
virtualHost.queueDelete("testQueue");
queue=virtualHost.getMemoryManager().getQueue(virtualHost.getVirtualHost()+"testQueue");
queueEntities=virtualHost.getDiskManager().selectAllQueues();
Assertions.assertNull(queue);
Assertions.assertEquals(0,queueEntities.size());
}
@Test
void queueBind() {
virtualHost.queueDeclare("testQueue",false,false,false,null);
virtualHost.exchangeDeclare("testExchange",ExchangeType.DIRECT,false,false,null);
Assertions.assertTrue(virtualHost.queueBind("testQueue","testExchange",""));
}
@Test
void queueUnbind() throws IOException {
virtualHost.queueDeclare("testQueue",true,false,false,null);
virtualHost.exchangeDeclare("testExchange",ExchangeType.DIRECT,true,false,null);
Assertions.assertTrue(virtualHost.queueBind("testQueue","testExchange",""));
Assertions.assertTrue(virtualHost.queueUnbind("testQueue","testExchange"));
virtualHost.getDiskManager().deleteQueue("defaulttestQueue");
}
@Test
void directPublish() {
virtualHost.queueDeclare("testQueue",false,false,false,null);
//表示交给默认交换机进行转发,RoutingKey就是队列名
Assertions.assertTrue(virtualHost.basicPublish(null,"testQueue",null,"hello".getBytes()));
//从队列中取消息
Message message=virtualHost.getMemoryManager().getMessageFromQueue(virtualHost.getVirtualHost()+"testQueue");
Assertions.assertArrayEquals("hello".getBytes(),message.getBody());
Assertions.assertEquals("testQueue",message.getRoutingKey());
}
@Test
void fanoutPublish() throws IOException {
virtualHost.exchangeDeclare("testExchange",ExchangeType.FANOUT,false,false,null);
virtualHost.queueDeclare("testQueue1",false,false,false,null);
virtualHost.queueDeclare("testQueue2",true,false,false,null);
//建立绑定,因为是fanout交换机,bindingKey没啥用
Assertions.assertTrue(virtualHost.queueBind("testQueue1","testExchange",""));
Assertions.assertTrue(virtualHost.queueBind("testQueue2","testExchange",""));
//向fanout交换机转发消息
Assertions.assertTrue(virtualHost.basicPublish("testExchange","",null,"hello".getBytes()));
//看看两个队列上是否都有这条消息
Message message1=virtualHost.getMemoryManager().getMessageFromQueue(virtualHost.getVirtualHost()+"testQueue1");
Assertions.assertArrayEquals("hello".getBytes(),message1.getBody());
Message message2=virtualHost.getMemoryManager().getMessageFromQueue(virtualHost.getVirtualHost()+"testQueue2");
Assertions.assertArrayEquals("hello".getBytes(),message2.getBody());
Assertions.assertEquals(message1.getMessageId(),message2.getMessageId());
virtualHost.getDiskManager().deleteQueue(virtualHost.getVirtualHost()+"testQueue2");
}
@Test
void topicPublish() throws IOException {
virtualHost.exchangeDeclare("testExchange",ExchangeType.TOPIC,false,false,null);
virtualHost.queueDeclare("testQueue1",false,false,false,null);
virtualHost.queueDeclare("testQueue2",false,false,false,null);
//建立绑定,两个bindingKey和routingKey都匹配
virtualHost.queueBind("testQueue1","testExchange","aaa.*.ccc");
virtualHost.queueBind("testQueue2","testExchange","aaa.#");
//发布消息
virtualHost.basicPublish("testExchange","aaa.bbb.ccc",null,"hello".getBytes());
//看看两个队列上是否都有这条消息
Message message1=virtualHost.getMemoryManager().getMessageFromQueue(virtualHost.getVirtualHost()+"testQueue1");
Assertions.assertArrayEquals("hello".getBytes(),message1.getBody());
Message message2=virtualHost.getMemoryManager().getMessageFromQueue(virtualHost.getVirtualHost()+"testQueue2");
Assertions.assertArrayEquals("hello".getBytes(),message2.getBody());
Assertions.assertEquals(message1.getMessageId(),message2.getMessageId());
//virtualHost.getDiskManager().deleteQueue(virtualHost.getVirtualHost()+"testQueue2");
}
//先发布再订阅,使用直接交换机,不需要建立绑定了
@Test
void basicConsume1() throws InterruptedException {
virtualHost.exchangeDeclare("testExchange",ExchangeType.DIRECT,false,false,null);
virtualHost.queueDeclare("testQueue",false,false,false,null);
//发布消息
virtualHost.basicPublish("testExchange","testQueue",null,"hello".getBytes());
Thread.sleep(200);
//订阅消息,使用自动回复,手动回复下面会进行测试
virtualHost.basicConsume("testQueue", "testConsumer", true, new Consume() {
@Override
public void handleDelivery(String consumerTag, BasicProperties properties, byte[] body) throws IOException, MqException {
Assertions.assertEquals("testConsumer",consumerTag);
Assertions.assertArrayEquals("hello".getBytes(),body);
}
});
}
//先订阅再发布,使用直接交换机,不需要建立绑定了
@Test
void basicConsume2() throws InterruptedException {
virtualHost.exchangeDeclare("testExchange",ExchangeType.DIRECT,false,false,null);
virtualHost.queueDeclare("testQueue",false,false,false,null);
//订阅消息,使用自动回复
virtualHost.basicConsume("testQueue", "testConsumer", true, new Consume() {
@Override
public void handleDelivery(String consumerTag, BasicProperties properties, byte[] body) throws IOException, MqException {
Assertions.assertEquals("testConsumer",consumerTag);
Assertions.assertArrayEquals("hello".getBytes(),body);
}
});
// Thread.sleep(200);
//发布消息
virtualHost.basicPublish("testExchange","testQueue",null,"hello".getBytes());
//留点时间等待消费者把消息处理掉
Thread.sleep(500);
}
@Test
void basicAck() throws InterruptedException {
virtualHost.exchangeDeclare("testExchange",ExchangeType.DIRECT,false,false,null);
virtualHost.queueDeclare("testQueue",false,false,false,null);
Message message=new Message();
virtualHost.basicConsume("testQueue", "testConsumer", false, new Consume() {
@Override
public void handleDelivery(String consumerTag, BasicProperties properties, byte[] body) throws IOException, MqException {
message.setMessageId(properties.getMessageId());
Assertions.assertEquals("testConsumer",consumerTag);
Assertions.assertArrayEquals("hello".getBytes(),body);
}
});
virtualHost.basicPublish("testExchange","testQueue",null,"hello".getBytes());
Thread.sleep(100);
//消费者手动回复
Assertions.assertTrue(virtualHost.basicAck("testQueue",message.getMessageId()));
}
}
前文做了那么多的铺垫,我们已经实现了Broker服务器内部的9个核心API,接下来我们就可以实现Broker服务器与客户端的交互了.
回顾一下交互模型.
左边的客户端是生产者,右边的客户端是消费者.它们与服务器之间都需要通过网络进行通信.
这里我们使用自定义的应用层协议,借助TCP协议进行客户端与服务器之间的通信.
客户端需要使用本地的API远程调用服务器的方法,也就是说,服务器提供的API和客户端本地提供的API是相互对应的.除了9个核心API之外,服务器还应该提供一些API.
学习网络通信的时候我们就谈到过,应用层协议是由程序猿自己定义的.而我们学过的http/https协议,是一些知名的应用层协议,并且是文本协议.
因为我们传输的消息是二进制的,不方便使用文本协议,我们可以自己定义二进制的应用层协议.
先来定义请求格式.
?下面是响应格式(和请求格式相比,不能说毫不相干,只能说一模一样)
?下面来规定一下type对应的API:
因为tcp是以字节为单位进行信息传输的,length部分是为了解决TCP协议的"粘包问题".
针对payLoad部分,如果是一个请求,payLoad里存放了调用方法需要的参数;如果是一个响应,payLoad里存放了方法的返回值.
首先建立Request和Response类,网络传输的主体就是它们.
/**
* 因为请求和响应需要在网络上进行传输,需要进行序列化
* 请求格式: type 4byte,length 4byte,payLoad
*/
@Data
public class Request implements Serializable {
private int type;
private int length;
private byte[] payLoad;
}
/**
* 响应格式: type 4byte,length 4byte,payLoad
*/
@Data
public class Response implements Serializable {
private int type;
private int length;
private byte[] payLoad;
}
针对payLoad部分,我们需要定义基本格式.
对于请求来说,payLoad部分需要有调用方法所需要的参数,还需要定义一个rid保证保证请求和响应之间的对应关系,同时也需要一个channelId用于标识是哪个通道调用的这个方法.
(先剧透一下,一个客户端和服务器建立连接后,就相当于建立了一个Connection,我们前面谈到过,一个Connection可以包含多个通道,每个通道发送请求后,在收到响应之前是阻塞状态,这个channelId就是用来唤醒阻塞的线程的)
/**
* 客户端发送请求时,要传的基本参数
*/
@Data
public class BasicArguments implements Serializable {
//与请求的rid形成对应关系
private String rid;
//标识一个通道
private String channelId;
}
因为方法的参数不同,要传输的对象也不同,我们需要针对每个方法分别定义参数类.
/**
* 针对basicPublish方法
*/
@Data
public class BasicPublishArguments extends BasicArguments implements Serializable {
private String exchangeName;
private String routingKey;
private BasicProperties properties;
private byte[] body;
}
/**
* 针对basicConsume方法
*/
@Data
public class BasicConsumeArguments extends BasicArguments implements Serializable {
private String queueName;
private String consumerTag;
private boolean autoAck;
//basicConsume方法还有一个参数是回调函数,没有办法通过网络传输
//对于Broker Server来说,它处理消息的逻辑就是将消息发送给客户端
}
/**
* 对应exchangeDeclare方法
*/
@Data
public class ExchangeDeclareArguments extends BasicArguments implements Serializable {
private String exchangeName;
private ExchangeType type;
private boolean durable;
private boolean autoDelete;
private Map<String, Object> arguments;
}
/**
* 对应exchangeDelete方法
*/
@Data
public class ExchangeDeleteArguments extends BasicArguments implements Serializable {
private String exchangeName;
}
/**
* 针对queueBind方法
*/
@Data
public class QueueBindArguments extends BasicArguments implements Serializable {
private String queueName;
private String exchangeName;
private String bindingKey;
}
/**
* 对应queueDeclare方法
*/
@Data
public class QueueDeclareArguments extends BasicArguments implements Serializable {
private String queueName;
private boolean durable;
private boolean autoDelete;
private boolean exclusive;
private Map<String,Object> arguments;
}
/**
* 针对queueDelete方法
*/
@Data
public class QueueDeleteArguments extends BasicArguments implements Serializable {
private String queueName;
}
/**
* 针对queueUnbind方法
*/
@Data
public class QueueUnbindArguments extends BasicArguments implements Serializable {
private String queueName;
private String exchangeName;
}
/**
* 针对basicAck方法
*/
@Data
public class BasicAckArguments extends BasicArguments implements Serializable {
private String queueName;
private String messageId;
}
/**
* 响应的基本格式
*/
@Data
public class BasicReturns implements Serializable {
private String rid;
private String channelId;
//调用9个api的返回值
private boolean isOk;
}
请求返回的payLoad基本上都是BasicReturns对象,除了服务器主动给客户端推送消息时,因此也需要一个额外的类来定义这个payLoad.
@Data
public class MessageReturns extends BasicReturns implements Serializable {
private String consumerTag;
private BasicProperties properties;
private byte[] body;
}
@Slf4j
public class BrokerServer {
//当前只实现一个虚拟主机,后续可自行进行扩展
private VirtualHost virtualHost=new VirtualHost("default_");
//key=channelId,value为socket对象,用于表示channel属于哪个TCP连接
private ConcurrentHashMap<String, Socket> channelsMap=new ConcurrentHashMap<>();
private ServerSocket serverSocket;
//给多个客户端提供服务时,需要用到多线程
private ExecutorService threadPool= Executors.newCachedThreadPool();
//用于关闭服务器,本来只需要kill掉进程即可,这里是为了方便进行单元测试
private volatile boolean run=true;
public BrokerServer(int port) throws IOException {
serverSocket=new ServerSocket(port);
}
public void start(){
try{
log.info("Broker Server启动服务!");
while (run){
//接收一个请求
Socket clientSocket=serverSocket.accept();
//多线程处理请求
threadPool.submit(()->{
processConnection(clientSocket);
});
}
} catch (IOException e) {
log.info("服务器关闭");
//e.printStackTrace();
}
}
public void stop() throws IOException {
run=false;
//关闭全部线程
threadPool.shutdownNow();
//关闭连接
serverSocket.close();
}
private void processConnection(Socket clientSocket) {
try(InputStream inputStream= clientSocket.getInputStream();
OutputStream outputStream=clientSocket.getOutputStream()) {
try (DataInputStream dataInputStream = new DataInputStream(inputStream);
DataOutputStream dataOutputStream = new DataOutputStream(outputStream)) {
//因为采用的是长连接,需要不断的读取请求
while (true) {
//接收请求并解析
Request request = readRequest(dataInputStream);
//处理请求
Response response = processRequest(request, clientSocket);
//将响应发送给客户端
writeResponse(dataOutputStream, response);
}
} catch (EOFException | SocketException e) {
log.info("信息传输结束,clientSocket=" + clientSocket.getRemoteSocketAddress());
}
}catch (IOException | ClassNotFoundException | MqException e) {
log.info("信息传输异常");
e.printStackTrace();
}finally {
//断开连接
try {
clientSocket.close();
//断开连接之后,这个TCP连接包含的channel应该清除
clearChannel(clientSocket);
log.info("与客户端断开连接,clientSocket="+clientSocket.getRemoteSocketAddress());
} catch (IOException e) {
e.printStackTrace();
}
}
}
private void clearChannel(Socket clientSocket) {
LinkedList<String> channels=new LinkedList<>();
for(Map.Entry<String,Socket> entry: channelsMap.entrySet()){
if(entry.getValue()==clientSocket){
//对于集合类,不能一边遍历一边清除元素
channels.add(entry.getKey());
}
}
for(String channelId:channels){
channelsMap.remove(channelId);
}
log.info("已清理客户端的全部channel,clientSocket="+clientSocket.getRemoteSocketAddress());
}
private void writeResponse(DataOutputStream dataOutputStream, Response response) throws IOException {
dataOutputStream.writeInt(response.getType());
dataOutputStream.writeInt(response.getLength());
dataOutputStream.write(response.getPayLoad());
//刷新缓冲区
dataOutputStream.flush();
}
private Request readRequest(DataInputStream dataInputStream) throws IOException {
Request request=new Request();
request.setType(dataInputStream.readInt());
int expectedLength=dataInputStream.readInt();
byte[] payLoad=new byte[expectedLength];
int actualLength=dataInputStream.read(payLoad);
if(expectedLength!=actualLength){
throw new IOException("[BrokerServer] 请求的格式不正确!expectedLength="+expectedLength+",actualLength="+actualLength);
}
request.setLength(actualLength);
request.setPayLoad(payLoad);
return request;
}
private Response processRequest(Request request, Socket clientSocket) throws MqException, IOException, ClassNotFoundException {
BasicArguments basicArguments= (BasicArguments) BinaryTool.fromBytes(request.getPayLoad());
//log.info("收到的arguments="+basicArguments);
boolean ok=true;
//分析响应类型
switch (request.getType()){
case 0x1:{
//创建channel
String channelId=basicArguments.getChannelId();
channelsMap.put(channelId,clientSocket);
log.info("成功创建channel");
break;
}
case 0x2:{
//销毁channel
String channelId= basicArguments.getChannelId();
channelsMap.remove(channelId);
log.info("销毁channel成功");
break;
}
case 0x3:{
//创建交换机
//先转换成API需要的参数对象
ExchangeDeclareArguments arguments=(ExchangeDeclareArguments) basicArguments;
ok=virtualHost.exchangeDeclare(arguments.getExchangeName(), arguments.getType(), arguments.isDurable(), arguments.isAutoDelete(),arguments.getArguments());
break;
}
case 0x4:{
//销毁交换机
ExchangeDeleteArguments arguments=(ExchangeDeleteArguments) basicArguments;
ok= virtualHost.exchangeDelete(arguments.getExchangeName());
break;
}
case 0x5:{
//创建队列
QueueDeclareArguments arguments=(QueueDeclareArguments) basicArguments;
ok= virtualHost.queueDeclare(arguments.getQueueName(), arguments.isDurable(), arguments.isAutoDelete(), arguments.isExclusive(),arguments.getArguments());
break;
}
case 0x6:{
//销毁队列
QueueDeleteArguments arguments=(QueueDeleteArguments) basicArguments;
ok= virtualHost.queueDelete(arguments.getQueueName());
break;
}
case 0x7:{
//创建绑定
QueueBindArguments arguments=(QueueBindArguments) basicArguments;
ok= virtualHost.queueBind(arguments.getQueueName(), arguments.getExchangeName(), arguments.getBindingKey());
break;
}
case 0x8:{
//解除绑定
QueueUnbindArguments arguments=(QueueUnbindArguments) basicArguments;
ok= virtualHost.queueUnbind(arguments.getQueueName(), arguments.getExchangeName());
break;
}
case 0x9:{
//发布消息
BasicPublishArguments arguments=(BasicPublishArguments) basicArguments;
ok= virtualHost.basicPublish(arguments.getExchangeName(), arguments.getRoutingKey(), arguments.getProperties(), arguments.getBody());
//log.info("消息内容,body="+new String(arguments.getBody()));
break;
}
case 0xa:{
//订阅队列
BasicConsumeArguments arguments=(BasicConsumeArguments) basicArguments;
ok= virtualHost.basicConsume(arguments.getQueueName(), arguments.getConsumerTag(), arguments.isAutoAck(), new Consume() {
//这个回调函数负责将收到的消息发送给客户端
@Override
public void handleDelivery(String consumerTag, BasicProperties properties, byte[] body) throws IOException, MqException {
//先找到要发给哪个客户端,consumerTag实际上就是channelId
Socket luckyDogSocket=channelsMap.get(consumerTag);
if(luckyDogSocket==null||luckyDogSocket.isClosed()){
//说明服务器和客户端的连接断开,属于异常情况
throw new MqException("[BrokerServer] 客户端与服务器连接异常,clientSocket="+luckyDogSocket.getRemoteSocketAddress());
}
//构造MessageReturns对象
MessageReturns messageReturns=new MessageReturns();
//因为这个响应是由服务器主动发起的,没有对应的rid
messageReturns.setConsumerTag(consumerTag);
messageReturns.setChannelId(consumerTag);
//messageReturns.setRid("");
messageReturns.setProperties(properties);
messageReturns.setBody(body);
//将这个对象变成字节数组,作为response的载荷
byte[] payLoad=BinaryTool.toBytes(messageReturns);
//构造一个Response
Response response=new Response();
response.setType(0x0c);
response.setLength(payLoad.length);
response.setPayLoad(payLoad);
//将响应发送给客户端
DataOutputStream outputStream=new DataOutputStream(luckyDogSocket.getOutputStream());
synchronized (luckyDogSocket){
//多线程发给同一个客户端时message会发生错误
writeResponse(outputStream,response);
}
}
});
break;
}
case 0xb:{
//发送ack
BasicAckArguments arguments=(BasicAckArguments) basicArguments;
ok=virtualHost.basicAck(arguments.getQueueName(), arguments.getMessageId());
break;
}
default:{
//异常情况
throw new MqException("[BrokerServer] 请求类型不正确,requestType="+request.getType());
}
}
//构造一个响应
//构造一个return对象
BasicReturns returns=new BasicReturns();
returns.setOk(ok);
returns.setChannelId(basicArguments.getChannelId());
returns.setRid(basicArguments.getRid());
byte[] payLoad=BinaryTool.toBytes(returns);
//构造一个响应
Response response=new Response();
response.setType(request.getType());
response.setLength(payLoad.length);
response.setPayLoad(payLoad);
log.info("处理得到响应,basicReturns="+returns);
return response;
}
}
因为网络通信不能只有服务器,所以我们要先实现客户端部分才能测试二者之间的通信
对于客户端来说,我们规定有两个类,Connection类负责TCP的连接和通信,Channel类负责逻辑上对TCP连接的细分.
/**
* 负责和服务器的通信
*/
@Data
@Slf4j
public class Connection {
//与服务器建立连接
private Socket socket;
//socket的输入流和输出流对象
private InputStream inputStream;
private OutputStream outputStream;
private DataInputStream dataInputStream;
private DataOutputStream dataOutputStream;
//记录当前TCP连接都包含了哪些channel,key=channelId
private ConcurrentHashMap<String,Channel> channelMap=new ConcurrentHashMap<>();
//使用线程池,进行响应的处理
private ExecutorService workThreadPool= Executors.newFixedThreadPool(10);
public Connection(String host, int port) throws IOException {
socket=new Socket(host,port);
inputStream=socket.getInputStream();
outputStream= socket.getOutputStream();
dataInputStream=new DataInputStream(inputStream);
dataOutputStream=new DataOutputStream(outputStream);
//创建一个扫描线程,用于接收响应
Thread scannerThread=new Thread(()->{
try {
while(!socket.isClosed()){
Response response=readResponse();
//log.info("收到响应,response="+response);
processResponse(response);
}
} catch (SocketException | EOFException e) {
log.info("信息传输完毕,连接断开,serverSocket="+socket.getRemoteSocketAddress());
//e.printStackTrace();
}catch (Exception e){
log.info("连接异常,serverSocket="+socket.getRemoteSocketAddress());
e.printStackTrace();
}
},"scannerThreadClient");
scannerThread.setDaemon(true);
scannerThread.start();
}
private void processResponse(Response response) {
//因为客户端处理响应比较耗时,采用多线程
workThreadPool.submit(()->{
try {
//解析payLoad
byte[] payLoad= response.getPayLoad();
//响应分为两种类型: 1. 请求收到的响应 2. 订阅的消息
if(response.getType()==0xc){
//订阅的消息,没有对应的rid
MessageReturns messageReturns=(MessageReturns) BinaryTool.fromBytes(payLoad);
//调用channel的handleDelivery方法
Channel channel=channelMap.get(messageReturns.getChannelId());
if(channel==null){
throw new MqException("[Connection] 接收message失败,消息对应的channel不存在,channelId="+messageReturns.getChannelId());
}
channel.getConsume().handleDelivery(messageReturns.getConsumerTag(), messageReturns.getProperties(), messageReturns.getBody());
}else{
//请求收到对应的响应
BasicReturns returns= (BasicReturns) BinaryTool.fromBytes(payLoad);
//将return加入到channel的map中
String channelId=returns.getChannelId();
Channel channel=channelMap.get(channelId);
if(channel==null){
//通道不存在,抛异常
throw new MqException("[Connection] 处理响应失败,响应的通道不存在,channelId="+channelId);
}
//唤醒等待响应的线程
channel.notifyWaitAck(returns);
}
} catch (IOException | ClassNotFoundException|MqException e) {
log.info("处理响应失败");
e.printStackTrace();
}
});
}
/**
* 创建一个channel
* channelId随机生成,以"C-"作为前缀
*/
public Channel createChannel() throws IOException, InterruptedException {
String channelId="C-"+ UUID.randomUUID();
Channel channel=new Channel(channelId,this);
//将channel添加到本地
channelMap.put(channelId,channel);
//服务器那边也要创建channel
boolean ok=channel.createChannel();
if(!ok){
log.info("服务器创建channel失败!");
}else{
log.info("创建channel成功!");
}
return channel;
}
public void deleteChannel(String channelId) throws IOException, InterruptedException {
//先找到本地的channel
Channel channel=channelMap.get(channelId);
if(channel==null){
log.info("删除channel失败,要删除的channel不存在,channelId="+channelId);
return;
}
//这个channel发送删除请求
boolean ok=channel.deleteChannel();
if(ok){
log.info("服务器删除channel成功");
}else{
log.info("服务器删除channel失败");
}
}
/**
* 读取响应
*/
public Response readResponse() throws IOException {
Response response=new Response();
response.setType(dataInputStream.readInt());
int expectedLength=dataInputStream.readInt();
byte[] payLoad=new byte[expectedLength];
int actualLength=dataInputStream.read(payLoad);
if(actualLength!=expectedLength){
throw new IOException("[Connection] 响应的格式不正确,actualLength="+actualLength+",expectedLength="+expectedLength+",type="+response.getType() );
}
response.setLength(actualLength);
response.setPayLoad(payLoad);
return response;
}
/**
* 输出请求
*/
public void writeRequest(Request request) throws IOException {
dataOutputStream.writeInt(request.getType());
dataOutputStream.writeInt(request.getLength());
dataOutputStream.write(request.getPayLoad());
dataOutputStream.flush();
}
//断开连接
public void close() throws IOException {
//关闭线程池
workThreadPool.shutdownNow();
//断开TCP连接
dataOutputStream.close();
dataInputStream.close();
socket.close();
channelMap=null;
}
}
@Data
@Slf4j
public class Channel {
//表示自己是属于哪个连接的
private Connection connection;
//唯一标识
private String channelId;
//每个channel对应一个消费逻辑
private Consume consume;
//每个channel管理自己的rid(发送请求后接收对应的响应)
//key为rid,value为响应的payLoad
private ConcurrentHashMap<String, BasicReturns> returnsMap=new ConcurrentHashMap<>();
public Channel(String channelId,Connection connection){
this.channelId=channelId;
this.connection=connection;
}
public boolean createChannel() throws IOException, InterruptedException {
//构造请求参数
BasicArguments arguments=new BasicArguments();
arguments.setChannelId(channelId);
arguments.setRid(generateRid());
byte[] payLoad= BinaryTool.toBytes(arguments);
//构造一个请求
Request request=new Request();
request.setType(0x1);
request.setLength(payLoad.length);
request.setPayLoad(payLoad);
//让connection去发送
connection.writeRequest(request);
//等待服务器的响应
BasicReturns returns=waitReturns(arguments.getRid());
return returns.isOk();
}
private BasicReturns waitReturns(String rid) throws InterruptedException {
synchronized (this){
while (true){
if(returnsMap.get(rid)==null){
wait();
}
return returnsMap.get(rid);
}
}
}
/**
* 请求的rid随机生成,以"r-"作为前缀
* @return
*/
private String generateRid() {
return "r-"+ UUID.randomUUID();
}
public boolean deleteChannel() throws IOException, InterruptedException{
//构造请求参数
BasicArguments arguments=new BasicArguments();
arguments.setRid(generateRid());
arguments.setChannelId(channelId);
byte[] payLoad=BinaryTool.toBytes(arguments);
//构造请求
Request request=new Request();
request.setType(0x2);
request.setLength(payLoad.length);
request.setPayLoad(payLoad);
//发送请求
connection.writeRequest(request);
BasicReturns returns=waitReturns(arguments.getRid());
return returns.isOk();
}
public boolean exchangeDeclare(String exchangeName, ExchangeType type, boolean durable, boolean autoDelete, Map<String,Object> argument) throws IOException, InterruptedException {
//构造请求参数
ExchangeDeclareArguments arguments=new ExchangeDeclareArguments();
arguments.setRid(generateRid());
arguments.setChannelId(channelId);
arguments.setExchangeName(exchangeName);
arguments.setType(type);
arguments.setDurable(durable);
arguments.setAutoDelete(autoDelete);
arguments.setArguments(argument);
byte[] payLoad=BinaryTool.toBytes(arguments);
//构造请求
Request request=new Request();
request.setType(0x3);
request.setLength(payLoad.length);
request.setPayLoad(payLoad);
//发送请求
connection.writeRequest(request);
//等待响应
BasicReturns returns=waitReturns(arguments.getRid());
return returns.isOk();
}
public boolean exchangeDelete(String exchangeName) throws IOException, InterruptedException {
//构造请求参数
ExchangeDeleteArguments arguments=new ExchangeDeleteArguments();
arguments.setRid(generateRid());
arguments.setChannelId(channelId);
arguments.setExchangeName(exchangeName);
byte[] payLoad=BinaryTool.toBytes(arguments);
//构造请求
Request request=new Request();
request.setType(0x4);
request.setLength(payLoad.length);
request.setPayLoad(payLoad);
connection.writeRequest(request);
BasicReturns returns=waitReturns(arguments.getRid());
return returns.isOk();
}
public boolean queueDeclare(String queueName,boolean durable,boolean autoDelete,boolean exclusive,Map<String,Object> argument) throws InterruptedException, IOException {
QueueDeclareArguments arguments=new QueueDeclareArguments();
arguments.setChannelId(channelId);
arguments.setRid(generateRid());
arguments.setArguments(argument);
arguments.setAutoDelete(autoDelete);
arguments.setExclusive(exclusive);
arguments.setQueueName(queueName);
arguments.setDurable(durable);
byte[] payLoad=BinaryTool.toBytes(arguments);
//构造请求
Request request=new Request();
request.setType(0x5);
request.setLength(payLoad.length);
request.setPayLoad(payLoad);
connection.writeRequest(request);
BasicReturns returns=waitReturns(arguments.getRid());
return returns.isOk();
}
public boolean queueDelete(String queueName) throws InterruptedException, IOException {
QueueDeleteArguments arguments=new QueueDeleteArguments();
arguments.setChannelId(channelId);
arguments.setRid(generateRid());
arguments.setQueueName(queueName);
byte[] payLoad=BinaryTool.toBytes(arguments);
Request request=new Request();
request.setType(0x6);
request.setLength(payLoad.length);
request.setPayLoad(payLoad);
connection.writeRequest(request);
BasicReturns returns=waitReturns(arguments.getRid());
return returns.isOk();
}
public boolean queueBind(String queueName,String exchangeName,String bindingKey) throws IOException, InterruptedException {
//构造请求参数
QueueBindArguments arguments=new QueueBindArguments();
arguments.setChannelId(channelId);
arguments.setRid(generateRid());
arguments.setQueueName(queueName);
arguments.setExchangeName(exchangeName);
arguments.setBindingKey(bindingKey);
byte[] payLoad=BinaryTool.toBytes(arguments);
//构造请求
Request request=new Request();
request.setType(0x7);
request.setLength(payLoad.length);
request.setPayLoad(payLoad);
//发送请求
connection.writeRequest(request);
//等待响应
BasicReturns returns=waitReturns(arguments.getRid());
return returns.isOk();
}
public boolean queueUnbind(String queueName,String exchangeName) throws IOException, InterruptedException {
//构造请求参数
QueueUnbindArguments arguments=new QueueUnbindArguments();
arguments.setRid(generateRid());
arguments.setChannelId(channelId);
arguments.setQueueName(queueName);
arguments.setExchangeName(exchangeName);
byte[] payLoad=BinaryTool.toBytes(arguments);
//构造请求
Request request=new Request();
request.setType(0x8);
request.setLength(payLoad.length);
request.setPayLoad(payLoad);
//发送请求
connection.writeRequest(request);
//等待响应
BasicReturns returns=waitReturns(arguments.getRid());
return returns.isOk();
}
public boolean basicPublish(String exchangeName, String routingKey, BasicProperties properties,byte[] body) throws IOException, InterruptedException {
//构造请求参数
BasicPublishArguments arguments=new BasicPublishArguments();
arguments.setChannelId(channelId);
arguments.setRid(generateRid());
arguments.setExchangeName(exchangeName);
arguments.setRoutingKey(routingKey);
arguments.setBody(body);
arguments.setProperties(properties);
byte[] payLoad=BinaryTool.toBytes(arguments);
//构造请求
Request request=new Request();
request.setType(0x9);
request.setLength(payLoad.length);
request.setPayLoad(payLoad);
connection.writeRequest(request);
//等待响应
BasicReturns returns=waitReturns(arguments.getRid());
return returns.isOk();
}
/**
* 对应basicConsume方法,本来还应该有一个参数是consumerTag,
* 但是因为consumerTag==channelId,所以就不作为参数传递了,直接在方法内部赋值
*/
public boolean basicConsume(String queueName,boolean autoAck,Consume consume) throws InterruptedException, IOException, MqException {
//一个channel只能有一个回调函数
if(consume!=null) {
if (this.consume == null) {
this.consume = consume;
} else {
throw new MqException("[Channel] 该通道已有消费逻辑!,channelId=" + channelId);
}
}
//构造请求参数
BasicConsumeArguments arguments=new BasicConsumeArguments();
arguments.setRid(generateRid());
arguments.setChannelId(channelId);
arguments.setConsumerTag(channelId);
arguments.setAutoAck(autoAck);
arguments.setQueueName(queueName);
byte[] payLoad=BinaryTool.toBytes(arguments);
//构造请求
Request request=new Request();
request.setType(0xa);
request.setLength(payLoad.length);
request.setPayLoad(payLoad);
connection.writeRequest(request);
BasicReturns returns=waitReturns(arguments.getRid());
return returns.isOk();
}
public boolean basicAck(String queueName,String messageId) throws IOException, InterruptedException {
//构造请求参数
BasicAckArguments arguments=new BasicAckArguments();
arguments.setRid(generateRid());
arguments.setChannelId(channelId);
arguments.setQueueName(queueName);
arguments.setMessageId(messageId);
byte[] payLoad=BinaryTool.toBytes(arguments);
//构造请求
Request request=new Request();
request.setPayLoad(payLoad);
request.setType(0xb);
request.setLength(payLoad.length);
connection.writeRequest(request);
BasicReturns returns=waitReturns(arguments.getRid());
return returns.isOk();
}
/**
* 因为Connection中使用了线程池来处理每个响应,所以应该唤醒多线程
* @param returns
*/
public void notifyWaitAck(BasicReturns returns) {
synchronized (this){
returnsMap.put(returns.getRid(),returns);
notifyAll();
}
}
}
可以另外建一个ConnectionFactory类,用来创建Connection对象.
@Data
public class ConnectionFactory {
//指定服务器的Ip和端口
private String host;
private int port;
Connection createConnection() throws IOException {
return new Connection(host,port);
}
}
下面给出博主进行测试的代码
@SpringBootTest
class ConnectionTest {
private BrokerServer server=null;
private ConnectionFactory factory=new ConnectionFactory();
//因为需要同时开启服务器和客户端,用到多线程
private Thread thread=null;
@BeforeEach
void setUp() {
DemoApplication.context= SpringApplication.run(DemoApplication.class);
thread=new Thread(()->{
try {
server=new BrokerServer(8060);
server.start();
} catch (IOException e) {
e.printStackTrace();
}
});
thread.start();
factory.setHost("127.0.0.1");
factory.setPort(8060);
}
@AfterEach
void tearDown() throws IOException {
server.stop();
DemoApplication.context.close();
//删文件
File dataDir=new File("./data");
for(File file:dataDir.listFiles()){
if(file.isDirectory()){
FileUtils.deleteDirectory(file);
}else if(file.isFile()){
file.delete();
}
}
}
@Test
void createConnection() throws IOException {
Connection connection=factory.createConnection();
Assertions.assertNotNull(connection);
}
@Test
void createChannel() throws IOException, InterruptedException {
Connection connection=factory.createConnection();
Assertions.assertNotNull(connection);
Channel channel=connection.createChannel();
Assertions.assertNotNull(channel);
}
@Test
void deleteChannel() throws IOException, InterruptedException {
Connection connection=factory.createConnection();
Assertions.assertNotNull(connection);
Channel channel=connection.createChannel();
Assertions.assertNotNull(channel);
connection.deleteChannel(channel.getChannelId());
connection.close();
}
@Test
void testQueue() throws IOException, InterruptedException {
Connection connection=factory.createConnection();
Assertions.assertNotNull(connection);
Channel channel=connection.createChannel();
Assertions.assertNotNull(channel);
boolean ok1=channel.queueDeclare("testQueue",true,true,false,null);
boolean ok=channel.queueDelete("testQueue");
channel.deleteChannel();
connection.close();
}
@Test
void testExchange() throws IOException, InterruptedException {
Connection connection=factory.createConnection();
Assertions.assertNotNull(connection);
Channel channel=connection.createChannel();
Assertions.assertNotNull(channel);
boolean ok=channel.exchangeDeclare("testExchange", ExchangeType.FANOUT,true,false,null);
Assertions.assertTrue(ok);
ok=channel.exchangeDelete("testExchange");
Assertions.assertTrue(ok);
channel.deleteChannel();
connection.close();
}
@Test
void testBind() throws IOException, InterruptedException, MqException {
Connection connection=factory.createConnection();
Assertions.assertNotNull(connection);
Channel channel=connection.createChannel();
Assertions.assertNotNull(channel);
boolean ok=channel.exchangeDeclare("testExchange", ExchangeType.TOPIC,true,false,null);
Assertions.assertTrue(ok);
ok=channel.queueDeclare("testQueue1",true,false,false,null);
Assertions.assertTrue(ok);
ok=channel.queueDeclare("testQueue2",true,false,false,null);
Assertions.assertTrue(ok);
ok=channel.queueBind("testQueue1","testExchange","#.222");
Assertions.assertTrue(ok);
ok=channel.queueBind("testQueue2","testExchange","*.*");
Assertions.assertTrue(ok);
ok=channel.basicPublish("testExchange","111.222",null,"hello".getBytes());
Assertions.assertTrue(ok);
channel.basicConsume("testQueue1", true, new Consume() {
@Override
public void handleDelivery(String consumerTag, BasicProperties properties, byte[] body) throws IOException, MqException {
Assertions.assertArrayEquals("hello".getBytes(),body);
// System.out.println(new String(body));
}
});
channel.basicConsume("testQueue2", true, null);
}
}
经过测试之后,我们的消费队列就算完成了,当然,也可以创建一个生产者和一个客户端来模拟一下.
先来修改一下DemoApplication类,因为我们要使用Spring来启动程序.
@SpringBootApplication
public class DemoApplication {
public static ConfigurableApplicationContext context;
public static void main(String[] args) throws IOException {
context=SpringApplication.run(DemoApplication.class, args);
BrokerServer server=new BrokerServer(9090);
server.start();
}
}
再来创建一个demo
public class DemoConsumer {
public static void main(String[] args) throws IOException, MqException, InterruptedException, IOException {
System.out.println("启动消费者!");
ConnectionFactory factory = new ConnectionFactory();
factory.setHost("127.0.0.1");
factory.setPort(9090);
Connection connection = factory.createConnection();
Channel channel = connection.createChannel();
channel.exchangeDeclare("testExchange", ExchangeType.DIRECT, true, false, null);
channel.queueDeclare("testQueue", true, false, false, null);
channel.basicConsume("testQueue", true, new Consume() {
@Override
public void handleDelivery(String consumerTag, BasicProperties basicProperties, byte[] body) throws
MqException, IOException {
System.out.println("[消费数据] 开始!");
System.out.println("consumerTag=" + consumerTag);
System.out.println("basicProperties=" + basicProperties);
String bodyString = new String(body, 0, body.length);
System.out.println("body=" + bodyString);
System.out.println("[消费数据] 结束!");
}
});
// 由于消费者也不知道生产者要生产多少, 就在这里通过这个循环模拟一直等待消费.
while (true) {
Thread.sleep(500);
}
}
}
public class DemoProducer {
public static void main(String[] args) throws IOException, InterruptedException, IOException {
System.out.println("启动生产者");
ConnectionFactory factory = new ConnectionFactory();
factory.setHost("127.0.0.1");
factory.setPort(9090);
Connection connection = factory.createConnection();
Channel channel = connection.createChannel();
// 创建交换机和队列
channel.queueDeclare("testQueue", true, false, false, null);
// 创建一个消息并发送
byte[] body = "hello".getBytes();
//使用默认交换机
boolean ok = channel.basicPublish("", "testQueue", null, body);
System.out.println("消息投递完成! ok=" + ok);
Thread.sleep(500);
channel.deleteChannel();
connection.close();
}
}
上面的过程,我们完成了从BrokerServer的内部实现,到客户端与服务器进行TCP通信的过程.当然,这只是一个简化版的消息队列,对于RabbitMQ来说,还有很多功能没有实现,同学们可自行参考它进行扩展.