.Net 6 下WorkerService+RabbitMq实现消息的异步发布订阅

发布时间:2024年01月17日

? ? ? ? 近期项目里有需要用到RabbitMq实现一些业务,学习整理之后在此记录一下,如有问题或者不对的地方,欢迎留言指正。

一、首先创建连接工厂

 public class RabbitMQProvider
    {
        private readonly string _ipAddress;
        private readonly int? _port;
        private readonly string _username;
        private readonly string _password;

        public RabbitMQProvider()
        {
            _ipAddress = ConfigurationHelper.GetKey("RabbitMQIPAddress") ?? throw new ArgumentException("IP地址未配置!");
            _username = ConfigurationHelper.GetKey("RabbitMQUserName") ?? throw new ArgumentException("用户名不能为空");
            _password = ConfigurationHelper.GetKey("RabbitMQPassword") ?? throw new ArgumentException("密码不能为空");

            var timeApan = new TimeSpan(0, 5, 0);
            if (ConnectionFactory == null)
            {
                ConnectionFactory = new ConnectionFactory//创建连接工厂对象
                {
                    HostName = _ipAddress,//IP地址
                    UserName = _username,//用户账号
                    Password = _password,//用户密码
                    //启用自动连接恢复
                    AutomaticRecoveryEnabled = true,
                    //VirtualHost = "/mqtest",//RabbitMQ中要请求的VirtualHost名称
                    ContinuationTimeout = timeApan,
                    HandshakeContinuationTimeout = timeApan,
                    RequestedConnectionTimeout = timeApan,
                    SocketReadTimeout = timeApan,
                    SocketWriteTimeout = timeApan,
                    //启用异步消费
                    DispatchConsumersAsync = true,
                    //RequestedChannelMax = 5000
                };

            }

        }

        public ConnectionFactory ConnectionFactory { get; }


        private static IConnection connection;
        /// <summary>
        /// 获取RabbitMQ连接对象方法(创建与RabbitMQ的连接)
        /// </summary>
        /// <returns></returns>
        public IConnection GetConnection()
        {
            if (connection == null || !connection.IsOpen)
            {
                //通过工厂创建连接对象
                connection = ConnectionFactory.CreateConnection();
            }
            return connection;
        }

        int times = 0;
        private static IModel Channel;
        public IModel GetChannel()
        {
            if (Channel != null)
                return Channel;
            else
            {
                //times++;
               // Console.WriteLine($"CreateModel{times}次");
                return GetConnection().CreateModel();
            }
        }


    }

二、消息发布

1、获取连接、交换机和队列

 public class RabbitMQPublisher : IPublisher
    {
        static int x_message_ttl;
        static RabbitMQPublisher()
        {
            int.TryParse(ConfigurationHelper.GetKey("RabbitMQ_x-message-ttl"), out x_message_ttl);
            x_message_ttl = x_message_ttl * 60 * 1000;
        }
        #region
        private readonly RabbitMQProvider _provider;
        private IConnection _connection;

        public RabbitMQPublisher(RabbitMQProvider provider)
        {
            try
            {
                _provider = provider;
                //if (_connection == null || !_connection.IsOpen)
                //{
                //    _connection = _provider.ConnectionFactory.CreateConnection();
                //}
                _connection = _provider.GetConnection();
                _channel = _provider.GetChannel();

            }
            catch (Exception ex)
            {
                //记录异常日志
                Util.LogError($"RabbitMQPublisher createConnection exception. Exception message:{ex.Message}");
            }
        }

        public IConnection Connection
        {
            get
            {
                
                if (_connection != null)
                    return _connection;
                return _connection = _provider.GetConnection(); ;
            }
            //get; set;
        }

        private IModel _channel;
        public IModel Channel
        {
            get
            {
                if (_channel != null)
                    return _channel;
                else
                {
                    //if (_connection == null || !_connection.IsOpen)
                    //{
                    //    _connection = _provider.GetConnection(); ;
                    //}

                    return _channel = _provider.GetChannel();
                }

            }
        }

        /// <summary>
        /// 释放资源
        /// </summary>
        public void Dispose()
        {
            if (Channel != null)
            {
                if (Channel.IsOpen)
                    Channel.Close();
                Channel.Abort();
                Channel.Dispose();
            }

            if (Connection != null)
            {
                if (Connection.IsOpen)
                    Connection.Close();
            }
        }
        #endregion
}

2、同步消息发送

 /// <summary>
        /// 发布(生产)消息
        /// </summary>
        /// <param name="message">消息内容</param>
        /// <param name="exchangeName">交换机名称</param>
        /// <param name="queueName">队列名称</param>
        /// <param name="exchangeType">交换机类型</param>
        /// <param name="routingKey">路由键</param>
        /// <param name="durable">是否持久化</param>
        /// <param name="autoDelete">是否自动删除</param>
        /// <param name="arguments">用于插件和代理特定功能,如消息TTL、队列长度限制等</param>
        /// 1.x-message-ttl             发送到队列的消息在丢弃之前可以存活多长时间(毫秒)。
        /// 2.x-expires                 队列在被自动删除(毫秒)之前可以使用多长时间。
        /// 3.x-max-length              队列在开始从头部删除之前可以包含多少就绪消息。
        /// 4.x-max-length-bytes        队列在开始从头部删除之前可以包含的就绪消息的总体大小。
        /// 5.x-dead-letter-exchange    设置队列溢出行为。这决定了在达到队列的最大长度时消息会发生什么。有效值为drop-head或reject-publish。交换的可选名称,如果消息被拒绝或过期,将重新发布这些名称。
        /// 6.x-dead-letter-routing-key 可选的替换路由密钥,用于在消息以字母为单位时使用。如果未设置,将使用消息的原始路由密钥。
        /// 7.x-max-priority            队列支持的最大优先级数; 如果未设置,队列将不支持消息优先级。
        /// 8.x-queue-mode              将队列设置为延迟模式,在磁盘上保留尽可能多的消息以减少内存使用;如果未设置,队列将保留内存缓存以尽快传递消息。
        /// 9.x-queue-master-locator    将队列设置为主位置模式,确定在节点集群上声明时队列主机所在的规则。
        private Task Publish(string message, string exchangeName, string queueName, string exchangeType, string routingKey = null, bool durable = true, bool autoDelete = false, IDictionary<string, object> arguments = null)
        {
            if (x_message_ttl > 0)
            {
                arguments = new Dictionary<string, object>();
                arguments.Add("x-message-ttl", x_message_ttl);
            }
            //声明交换机
            Channel.ExchangeDeclare(exchangeName, exchangeType, durable, autoDelete, arguments);
            //声明队列
            Channel.QueueDeclare(queueName, durable, false, autoDelete, arguments);
            Channel.QueueBind(queue: queueName, exchange: exchangeName, routingKey == null ? string.Empty : routingKey);
            var msgByte = Encoding.UTF8.GetBytes(message);
            //设置消息持久化
            var props = Channel.CreateBasicProperties();
            props.Persistent = true;
            try
            {
                Channel.TxSelect();
                Channel.BasicPublish
                (
                    exchange: exchangeName,
                    routingKey: routingKey == null ? string.Empty : routingKey,
                    mandatory: false,
                    basicProperties: props,
                    body: msgByte
                );
                Channel.TxCommit();
            }
            catch (Exception ex)
            {
                Channel.TxRollback();
                //记录异常日志
                Util.LogError($"RabbitMQPublisher publish message exception. Exception message:{ex.Message}");
            }
            return Task.FromResult(0);
        }

3、批量发布

 /// <summary>
        /// 批量发布
        /// </summary>
        /// <param name="message"></param>
        /// <param name="exchangeName"></param>
        /// <param name="queueName"></param>
        /// <param name="exchangeType"></param>
        /// <param name="routingKey"></param>
        /// <param name="durable"></param>
        /// <param name="autoDelete"></param>
        /// <param name="arguments"></param>
        /// <returns></returns>
        private async Task PublishAsyncBatch(List<string> message, string exchangeName, string queueName, string exchangeType, string routingKey = null, bool durable = true, bool autoDelete = false, IDictionary<string, object> arguments = null)
        {
            //using (var conn = _provider.ConnectionFactory.CreateConnection())
            //{
            using (var channel = Connection.CreateModel())
            {
                ///Console.WriteLine(1);
                if (x_message_ttl > 0)
                {
                    arguments = new Dictionary<string, object>();
                    arguments.Add("x-message-ttl", x_message_ttl);
                }
                //声明交换机
                Channel.ExchangeDeclare(exchangeName, exchangeType, durable, autoDelete, arguments);
                //声明队列
                Channel.QueueDeclare(queueName, durable, false, autoDelete, arguments);
                Channel.QueueBind(queue: queueName, exchange: exchangeName, routingKey == null ? string.Empty : routingKey);

                //设置消息持久化
                var props = Channel.CreateBasicProperties();
                props.Persistent = true;
                try
                {
                    Channel.TxSelect();
                    var basicPublishBatch = Channel.CreateBasicPublishBatch();

                    byte[] msgByte;
                    ReadOnlyMemory<byte> memory;
                    foreach (var msg in message)
                    {
                        msgByte = Encoding.UTF8.GetBytes(msg);
                        memory = new ReadOnlyMemory<byte>(msgByte);
                        basicPublishBatch.Add
                        (
                            exchange: exchangeName,
                            routingKey: routingKey == null ? string.Empty : routingKey,
                            mandatory: false,
                            properties: props,
                            body: memory
                        );
                    }
                    basicPublishBatch.Publish();

                    Channel.TxCommit();

                    await Task.Yield();
                }
                catch (Exception ex)
                {

                    Channel.TxRollback();
                    channel.Close(); channel.Dispose();
                    //conn.Close(); conn.Dispose();
                    //记录异常日志
                    Util.LogError($"RabbitMQPublisher publish message exception. Exception message:{ex.Message}");

                    Console.WriteLine("消息订阅时错误:" + ex.Message);
                }

            }
            
        }

注意:多线程消息发布时,应避免多个线程使用同一个IModel实例,必须保证Imodel被一个线程独享,如果必须要多个线程访问呢一个实例的话,则可以通过加锁来处理,详见:.NET/C# Client API Guide — RabbitMQ

IModel ch = RetrieveSomeSharedIModelInstance();
lock (ch) {
  ch.BasicPublish(...);
}

三、消息订阅

1、获取连接、交换机和队列同上消息发布,不再赘述

 private void QueueInitialization(string queueName, bool durable = true, bool autoDelete = false, IDictionary<string, object> arguments = null)
        {
            try
            {
                if (x_message_ttl > 0)
                {
                    arguments = new Dictionary<string, object>();
                    arguments.Add("x-message-ttl", x_message_ttl);
                }
                Channel.QueueDeclare(queueName, durable, false, autoDelete, arguments);
            }
            catch (Exception )
            {

            }
        }
/// <summary>
        /// 
        /// </summary>
        /// <param name="queueName"></param>
        /// <param name="callback"></param>
        /// <param name="autoAck"></param>
        /// <param name="consumPerTimes">每次消费的消息条数</param>
        /// <returns></returns>
        private async Task SubscribeAsync(string queueName, Func<string, bool> callback, bool autoAck, ushort consumPerTimes = 1)
        {
            try
            {
                QueueInitialization(queueName);
                //声明为手动确认,每次只消费1条消息。
                Channel.BasicQos(0, consumPerTimes, false);
                //定义消费者
                //var consumer = new EventingBasicConsumer(Channel);

                var consumer = new AsyncEventingBasicConsumer(Channel);

                //接收事件
                consumer.Received += async (eventSender, args) =>
                {
                    var message = args.Body.ToArray();//接收到的消息

                    var res = callback(Encoding.UTF8.GetString(message));
                    //返回消息确认
                    Channel.BasicAck(args.DeliveryTag, res);
                    await Task.Yield();
                };

                //开启监听 -- gai2023-11-1
                Channel.BasicConsume(queueName, autoAck, consumer);
                // await Task.Delay(1000);
            }
            catch (Exception e)
            {
                Console.WriteLine("消息订阅时错误:" + e.Message);
            }
        }

四、通过workerService订阅处理消息

internal class SubscribeWorker : BackgroundService
    {
        #region override
        public override Task StartAsync(CancellationToken cancellationToken)
        {
            try
            {
                //一些数据初始化
                _logger.LogInformation($"Settings 初始化完成");
            }
            catch (Exception ex)
            {
                _logger.LogError(ex, ex.Message);
            }


            return base.StartAsync(cancellationToken);
        }

        protected override async Task ExecuteAsync(CancellationToken stoppingToken)
        {
            //这里注意,不能写在while里,否则会一直进行重复订阅,会导致连接数一直增长
            await MainSubscribe();
            while (!stoppingToken.IsCancellationRequested)
            {
                try
                {
                    await Task.Delay(2000);
                }
                catch (Exception ex)
                {
                    _logger.LogError(ex, ex.Message);
                }
                
            }
        }

        /// <summary>
        /// 服务停止
        /// </summary>
        /// <param name="cancellationToken"></param>
        /// <returns></returns>
        public override Task StopAsync(CancellationToken cancellationToken)
        {
            Task.WaitAll();
            subscriber.Dispose();
            _logger.LogInformation("Worker stop at: {time}", DateTimeOffset.Now);
            return base.StopAsync(cancellationToken);
        }
        #endregion





    }

文章来源:https://blog.csdn.net/qq_34811513/article/details/134285767
本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。