这篇文章上次修改于 338 天前,可能其部分内容已经发生变化,如有疑问可询问作者。
rabbitmq的四种exchange
1、direct 直连交换机: 绑定队列的时候可以使用binding_key来作为消息的路由,binding_key是完全匹配的方式,当生产者发送消息的时候制定了具体的routing_key,消息会路由到绑定了相同的binding_key的队列中;如果绑定的所有队列都没有使用binding_key,并且生产者推送消息也没有指定routing_key,那么消息会被推送到所有绑定的队列中,和fanout交换机一样,然如果生产者推送消息指定了binding_key,由于匹配不到绑定响应key的队列,消息会被丢弃
2、fanout 扇形交换机: 交换机不能使用routing_key,所有进入的消息都会被转发到与它绑定的所有队列中
3、topic 主题交换机: 与direct 直连交换机类似,唯一不同就是routing_key支持使用通配符;
通配符:
* 表示一个单词
# 表示任意数量(零个或多个)单词。
比如a,b 两个队列绑定是的binding_key 设置成log.*, 那么消息中有log.err, log.warn的routing_key的消息,都会被发送到a,b两个队列,但如果消息的routing_key是log.lvone.err 则匹配不到队列被丢弃
- 4、headers 首部交换机: 此类型的交换机不使用binding_key绑定队列,而是在绑定的时候使用key:value形式的参数,消息发送的时候再header头中设置对应的key:value数据,如果匹配上就将消息发送到对应的队列中
说一下什么是channel,他的作用是什么
- 信道是生产消费者与rabbit通信的渠道,生产者publish或者消费者消费一个队列都是需要通过信道来通信的。信道是建立在TCP上面的虚拟链接,目的是为了减少tcp链接的创建,达到复用tcp链接的作用,减少系统的消耗
如何保证数据不丢失
数据丢失的原因有下面三种情况:
- 1、生产者发送数据的时候丢失数据,可能因为网络问题或其他问题等
- 2、rabbitmq自己弄丢了数据,数据还没有持久化,进程挂了
- 3、消费者弄丢了数据,消费者拿到了数据,消费中进程挂了,导致数据丢失
生产者发送数据丢失解决方案:
- 开启rabbitmq事务,(但不推荐使用此方案,因为此方案是同步的,会堵塞IO导致性能下降),就是生产者发送数据之前开启 RabbitMQ 事务channel.txSelect,然后发送消息,如果消息没有成功被 RabbitMQ 接收到,那么生产者会收到异常报错,此时就可以回滚事务channel.txRollback,然后重试发送消息;如果收到了消息,那么可以提交事务channel.txCommit。
// 开启事务 channel.txSelect try { // 这里发送消息 } catch (Exception e) { channel.txRollback // 这里再次重发这条消息 } // 提交事务 channel.txCommit
- 使用confirm机制,confirm 机制是异步的,你发送个消息之后就可以发送下一个消息,然后那个消息 RabbitMQ 接收了之后会异步回调你的一个接口通知你这个消息接收到了。
if err := channel.Confirm(false); err != nil {
return fmt.Errorf("Channel could not be put into confirm mode: %s", err)
}
confirms := channel.NotifyPublish(make(chan amqp.Confirmation, 1))
if confirmed := <-confirms; confirmed.Ack {
log.Printf("confirmed delivery with delivery tag: %d", confirmed.DeliveryTag)
} else {
log.Printf("failed delivery of delivery tag: %d", confirmed.DeliveryTag)
}
rabbitmq本身弄丢数据解决方案:
1、消息持久化:
rabbitMQ 的消息默认存放在内存上面,如果不特别声明设置,消息不会持久化保存到硬盘上面的,如果节点重启或者意外crash掉,消息就会丢失。
所以就要对消息进行持久化处理。如何持久化,下面具体说明下:
要想做到消息持久化,必须满足以下三个条件,缺一不可。
1)Exchange 设置持久化
2)Queue 设置持久化
3)Message持久化发送:发送消息设置发送模式deliveryMode=2,代表持久化消息2、设置集群镜像模式,把需要的队列做成镜像队列,存在于多个节点,属于RabbitMQ的HA方案
- 3、补偿机制,消息补偿机制需要建立在消息要写入DB日志,发送日志,接受日志,两者的状态必须记录。然后根据DB日志记录check 消息发送消费是否成功,不成功,进行消息补偿措施,重新发送消息处理。
消费者弄丢数据解决方案:
- 关闭自动ack,采用消费者手动ack机制,即消费端消费完成要通知服务端,服务端才把消息从内存删除
deliveries, err := c.channel.Consume(
queue.Name, // name
c.tag, // consumerTag,
false, // noAck
false, // exclusive
false, // noLocal
false, // noWait
nil, // arguments
)
if err != nil {
return nil, fmt.Errorf("Queue Consume: %s", err)
}
go handle(deliveries, c.done)
func handle(deliveries <-chan amqp.Delivery, done chan error) {
for d := range deliveries {
log.Printf(
"got %dB delivery: [%v] %q",
len(d.Body),
d.DeliveryTag,
d.Body,
)
d.Ack(false)
}
log.Printf("handle: deliveries channel closed")
done <- nil
}
没有评论
博主关闭了评论...