C#路由方式调用RabbitMQ

RabbitMQ是实现了高级消息队列协议(AMQP)的开源消息代理软件(亦称面向消息的中间件)。RabbitMQ服务器是用Erlang语言编写的,而集群和故障转移是构建在开放电信平台框架上的。所有主要的编程语言均有与代理接口通讯的客户端库。

所谓RabbitMQ的路由模式(Routing),就是有选择地(Routing key)接收消息,发送消息到交换机并且要指定路由key ,消费者将队列绑定到交换机时需要指定路由key,仅消费指定路由key的消息

Exchange不再把消息交给每一个绑定的队列,而是根据消息的RoutingKey进行判断,只有队列的RoutingKey与消息的RoutingKey完全一致,才会接收消息,路由模式使用的Exchange类型为Direct类型

举个例子:如在商品库存中增加了1台iphone12,iphone12促销活动消费者指定routing key为iphone12,只有此促销活动会接收到消息,其它促销活动不关心也不会消费此routing key的消息

下面,我们用C#代码来具体实现一下,以下逻辑需要NuGet里的RabbitMQ.Client包支持,首先安装RabbitMQ.Client包

接下来,我们声明一个类库,用来存放消息对象ReceiverMsgData,为了通用,我特地申明了类型,操作,具体对象实体等内容,以下属性仅供参考,安装自己的具体业务需求设计即可

/// <summary>
/// 消息内容
/// </summary>
public class ReceiverMsg
{
    /// <summary>
    /// 消息类型
    /// 10001->定时任务(队列:calculation_remind)
    /// 23001->订单下单分佣计算(队列:calculation_orderaward)
    /// 23005->订单支付成功计算报表(队列:calculation_orderoperate)
    /// 23006->订单售后成功计算报表(队列:calculation_orderoperate)
    /// </summary>
    [Required(ErrorMessage = "{0} 必须填写")]
    public int msg_type { get; set; }

    /// <summary>
    /// 操作人
    /// </summary>
    [Required(ErrorMessage = "{0} 必须填写")]
    public long creator_id { get; set; }

    /// <summary>
    /// 单据id
    /// </summary>
    [Required(ErrorMessage = "{0} 必须填写")]
    public int source_id { get; set; }

    /// <summary>
    /// 单据来源,定时任务专用(1订单)
    /// </summary>
    public int source_type { get; set; }

    /// <summary>
    /// 单据操作,定时任务专用 (1添加,2发货,3签收)
    /// </summary>
    public int action_type { get; set; }

    /// <summary>
    /// 定时任务执行的时间(无需填写,定时任务自动生成)
    /// </summary>
    public DateTime next_remind_time { get; set; }

    /// <summary>
    /// 错误消息系统编码(无需填写,自动重试用)
    /// </summary>
    public int error_msg_id { get; set; }

    /// <summary>
    /// 错误消息所在的队列名称(无需填写,自动重试用)
    /// </summary>
    public string error_msg_domain { get; set; }

    /// <summary>
    /// 消息附加内容
    /// </summary>
    public object msg_body { get; set; }
}

/// <summary>
/// 消息对象
/// UserName:admin
/// Password:Myun@123jx
/// VirtualHost:Publish
/// Port:5672
/// ExchangeName:calculation
/// </summary>
[Serializable]
public class ReceiverMsgData
{
    /// <summary>
    /// VirtualHost
    /// </summary>
    [Required(ErrorMessage = "{0} 必须填写")]
    public string pattern { get; set; }

    /// <summary>
    /// Msgid
    /// </summary>
    public string id { get; set; }

    /// <summary>
    /// 消息内容
    /// </summary>
    [Required(ErrorMessage = "{0} 必须填写")]
    public ReceiverMsg data { get; set; }
}

然后,我们生命一个生产者,用来产生消息

public class RabbitMQHelp
{
    public static IConfigurationRoot configuration = new ConfigurationBuilder().SetBasePath(Directory.GetCurrentDirectory()).AddJsonFile("appsettings.json").Build();
    private static string HostName = EncryptHelp.Decrypt(configuration.GetSection("RabbitMQ")["HostName"]);  //RabbitMQ服务IP地址
    private static string UserName = EncryptHelp.Decrypt(configuration.GetSection("RabbitMQ")["UserName"]);  //RabbitMQ用户名
    private static string Password = EncryptHelp.Decrypt(configuration.GetSection("RabbitMQ")["Password"]);  //RabbitMQ密码
    private static int Port = Convert.ToInt32(EncryptHelp.Decrypt(configuration.GetSection("RabbitMQ")["Port"]));//RabbitMQ端口 默认5672
    private static string VirtualHost = EncryptHelp.Decrypt(configuration.GetSection("RabbitMQ")["VirtualHost"]);//Virtual Hosts名称
    private static string ExchangeName = "calculation";//交换机名称

    /// <summary>
    /// 连接配置
    /// </summary>
    private static readonly ConnectionFactory rabbitMqFactory = new ConnectionFactory()
    {
        HostName = HostName,
        UserName = UserName,
        Password = Password,
        Port = Port,
        VirtualHost = VirtualHost
    };

    /// <summary>
    /// 单点精确路由模式,重发消息
    /// </summary>
    public static void DirectExchangeSendMsg(string QueueName, string msgBody)
    {
        using (IConnection conn = rabbitMqFactory.CreateConnection())
        {
            using (IModel channel = conn.CreateModel())
            {
                channel.ExchangeDeclare(ExchangeName, ExchangeType.Direct, durable: true, autoDelete: false, arguments: null);
                channel.QueueDeclare(QueueName, durable: true, autoDelete: false, exclusive: false, arguments: null);
                channel.QueueBind(QueueName, ExchangeName, routingKey: QueueName);

                var props = channel.CreateBasicProperties();
                props.Persistent = true; //消息永久化硬盘上,打开会降低些许性能,但是可以防止服务器意外断电导致消息队列数据消失的情况
                var BytmsgBody = Encoding.UTF8.GetBytes(msgBody);
                channel.BasicPublish(exchange: ExchangeName, routingKey: QueueName, basicProperties: props, body: BytmsgBody);
            }
        }
    }
}

最后,我们声明一个消费者,用来接收消息

/// <summary>
/// 消费者
/// </summary>
/// <param name="QueueName"></param>
public static void ReceiveWeatherInfo(string QueueName)
{
    using (var connection = rabbitMqFactory.CreateConnection())
    {
        using (var channel = connection.CreateModel())
        {
            channel.ExchangeDeclare(ExchangeName, ExchangeType.Direct, durable: true, autoDelete: false, arguments: null);
            channel.QueueDeclare(QueueName, durable: true, autoDelete: false, exclusive: false, arguments: null);
            channel.QueueBind(QueueName, ExchangeName, routingKey: QueueName);
            channel.BasicQos(0, 1, false);
            var consumer = new EventingBasicConsumer(channel);

            consumer.Received += (model, ea) =>
            {
                var message = Encoding.UTF8.GetString(ea.Body.ToArray());
                Console.WriteLine($"队列{QueueName}收到的气象信息:{message}");
                channel.BasicAck(ea.DeliveryTag, false);
            };
            channel.BasicConsume(queue: QueueName,autoAck: false,consumer: consumer);
            Console.WriteLine(" Press [enter] to exit.");
            Console.ReadLine();
        }
    }
}

下面,我们演示一下消息发送

//自动处理售后消息
public static void doDealReturnOrder(int Return_Id)
{
    RabbitMQHelp.DirectExchangeSendMsg("calculation_orderoperate", JsonConvert.SerializeObject(new ReceiverMsgData
    {
        id = Guid.NewGuid().ToString(),
        pattern = "Publish",
        data = new ReceiverMsg()
        {
            source_id = Return_Id,
            msg_type = 23019,
        }
    }));//重发消息
}

最后,只要由控制台的Main函数指定队列名称,调度这个消费者方法,就能接收指定队列的消息了,注意我这里,应为发送的消息是序列化的JSON,所以,接收消息需要反序列化一下