RabbitMQ是实现了高级消息队列协议(AMQP)的开源消息代理软件(亦称面向消息的中间件)。RabbitMQ服务器是用Erlang语言编写的,而集群和故障转移是构建在开放电信平台框架上的。所有主要的编程语言均有与代理接口通讯的客户端库。
所谓RabbitMQ的路由模式(Routing),就是有选择地(Routing key)接收消息,发送消息到交换机并且要指定路由key ,消费者将队列绑定到交换机时需要指定路由key,仅消费指定路由key的消息
Exchange不再把消息交给每一个绑定的队列,而是根据消息的RoutingKey进行判断,只有队列的RoutingKey与消息的RoutingKey完全一致,才会接收消息,路由模式使用的Exchange类型为Direct类型
举个例子:如在商品库存中增加了1台iphone12,iphone12促销活动消费者指定routing key为iphone12,只有此促销活动会接收到消息,其它促销活动不关心也不会消费此routing key的消息
下面,我们用C#代码来具体实现一下,以下逻辑需要NuGet里的RabbitMQ.Client包支持,首先安装RabbitMQ.Client包
接下来,我们声明一个类库,用来存放消息对象ReceiverMsgData,为了通用,我特地申明了类型,操作,具体对象实体等内容,以下属性仅供参考,安装自己的具体业务需求设计即可
/// <summary>
/// 消息内容
/// </summary>
public class ReceiverMsg
{
/// <summary>
/// 消息类型
/// 10001->定时任务(队列:calculation_remind)
/// 23001->订单下单分佣计算(队列:calculation_orderaward)
/// 23005->订单支付成功计算报表(队列:calculation_orderoperate)
/// 23006->订单售后成功计算报表(队列:calculation_orderoperate)
/// </summary>
[Required(ErrorMessage = "{0} 必须填写")]
public int msg_type { get; set; }
/// <summary>
/// 操作人
/// </summary>
[Required(ErrorMessage = "{0} 必须填写")]
public long creator_id { get; set; }
/// <summary>
/// 单据id
/// </summary>
[Required(ErrorMessage = "{0} 必须填写")]
public int source_id { get; set; }
/// <summary>
/// 单据来源,定时任务专用(1订单)
/// </summary>
public int source_type { get; set; }
/// <summary>
/// 单据操作,定时任务专用 (1添加,2发货,3签收)
/// </summary>
public int action_type { get; set; }
/// <summary>
/// 定时任务执行的时间(无需填写,定时任务自动生成)
/// </summary>
public DateTime next_remind_time { get; set; }
/// <summary>
/// 错误消息系统编码(无需填写,自动重试用)
/// </summary>
public int error_msg_id { get; set; }
/// <summary>
/// 错误消息所在的队列名称(无需填写,自动重试用)
/// </summary>
public string error_msg_domain { get; set; }
/// <summary>
/// 消息附加内容
/// </summary>
public object msg_body { get; set; }
}
/// <summary>
/// 消息对象
/// UserName:admin
/// Password:Myun@123jx
/// VirtualHost:Publish
/// Port:5672
/// ExchangeName:calculation
/// </summary>
[Serializable]
public class ReceiverMsgData
{
/// <summary>
/// VirtualHost
/// </summary>
[Required(ErrorMessage = "{0} 必须填写")]
public string pattern { get; set; }
/// <summary>
/// Msgid
/// </summary>
public string id { get; set; }
/// <summary>
/// 消息内容
/// </summary>
[Required(ErrorMessage = "{0} 必须填写")]
public ReceiverMsg data { get; set; }
}
然后,我们生命一个生产者,用来产生消息
public class RabbitMQHelp
{
public static IConfigurationRoot configuration = new ConfigurationBuilder().SetBasePath(Directory.GetCurrentDirectory()).AddJsonFile("appsettings.json").Build();
private static string HostName = EncryptHelp.Decrypt(configuration.GetSection("RabbitMQ")["HostName"]); //RabbitMQ服务IP地址
private static string UserName = EncryptHelp.Decrypt(configuration.GetSection("RabbitMQ")["UserName"]); //RabbitMQ用户名
private static string Password = EncryptHelp.Decrypt(configuration.GetSection("RabbitMQ")["Password"]); //RabbitMQ密码
private static int Port = Convert.ToInt32(EncryptHelp.Decrypt(configuration.GetSection("RabbitMQ")["Port"]));//RabbitMQ端口 默认5672
private static string VirtualHost = EncryptHelp.Decrypt(configuration.GetSection("RabbitMQ")["VirtualHost"]);//Virtual Hosts名称
private static string ExchangeName = "calculation";//交换机名称
/// <summary>
/// 连接配置
/// </summary>
private static readonly ConnectionFactory rabbitMqFactory = new ConnectionFactory()
{
HostName = HostName,
UserName = UserName,
Password = Password,
Port = Port,
VirtualHost = VirtualHost
};
/// <summary>
/// 单点精确路由模式,重发消息
/// </summary>
public static void DirectExchangeSendMsg(string QueueName, string msgBody)
{
using (IConnection conn = rabbitMqFactory.CreateConnection())
{
using (IModel channel = conn.CreateModel())
{
channel.ExchangeDeclare(ExchangeName, ExchangeType.Direct, durable: true, autoDelete: false, arguments: null);
channel.QueueDeclare(QueueName, durable: true, autoDelete: false, exclusive: false, arguments: null);
channel.QueueBind(QueueName, ExchangeName, routingKey: QueueName);
var props = channel.CreateBasicProperties();
props.Persistent = true; //消息永久化硬盘上,打开会降低些许性能,但是可以防止服务器意外断电导致消息队列数据消失的情况
var BytmsgBody = Encoding.UTF8.GetBytes(msgBody);
channel.BasicPublish(exchange: ExchangeName, routingKey: QueueName, basicProperties: props, body: BytmsgBody);
}
}
}
}
最后,我们声明一个消费者,用来接收消息
/// <summary>
/// 消费者
/// </summary>
/// <param name="QueueName"></param>
public static void ReceiveWeatherInfo(string QueueName)
{
using (var connection = rabbitMqFactory.CreateConnection())
{
using (var channel = connection.CreateModel())
{
channel.ExchangeDeclare(ExchangeName, ExchangeType.Direct, durable: true, autoDelete: false, arguments: null);
channel.QueueDeclare(QueueName, durable: true, autoDelete: false, exclusive: false, arguments: null);
channel.QueueBind(QueueName, ExchangeName, routingKey: QueueName);
channel.BasicQos(0, 1, false);
var consumer = new EventingBasicConsumer(channel);
consumer.Received += (model, ea) =>
{
var message = Encoding.UTF8.GetString(ea.Body.ToArray());
Console.WriteLine($"队列{QueueName}收到的气象信息:{message}");
channel.BasicAck(ea.DeliveryTag, false);
};
channel.BasicConsume(queue: QueueName,autoAck: false,consumer: consumer);
Console.WriteLine(" Press [enter] to exit.");
Console.ReadLine();
}
}
}
下面,我们演示一下消息发送
//自动处理售后消息
public static void doDealReturnOrder(int Return_Id)
{
RabbitMQHelp.DirectExchangeSendMsg("calculation_orderoperate", JsonConvert.SerializeObject(new ReceiverMsgData
{
id = Guid.NewGuid().ToString(),
pattern = "Publish",
data = new ReceiverMsg()
{
source_id = Return_Id,
msg_type = 23019,
}
}));//重发消息
}
最后,只要由控制台的Main函数指定队列名称,调度这个消费者方法,就能接收指定队列的消息了,注意我这里,应为发送的消息是序列化的JSON,所以,接收消息需要反序列化一下