这篇文章上次修改于 338 天前,可能其部分内容已经发生变化,如有疑问可询问作者。

rabbitmq的四种exchange

  • 1、direct 直连交换机: 绑定队列的时候可以使用binding_key来作为消息的路由,binding_key是完全匹配的方式,当生产者发送消息的时候制定了具体的routing_key,消息会路由到绑定了相同的binding_key的队列中;如果绑定的所有队列都没有使用binding_key,并且生产者推送消息也没有指定routing_key,那么消息会被推送到所有绑定的队列中,和fanout交换机一样,然如果生产者推送消息指定了binding_key,由于匹配不到绑定响应key的队列,消息会被丢弃

  • 2、fanout 扇形交换机: 交换机不能使用routing_key,所有进入的消息都会被转发到与它绑定的所有队列中

  • 3、topic 主题交换机: 与direct 直连交换机类似,唯一不同就是routing_key支持使用通配符;
    通配符:

 * 表示一个单词
 # 表示任意数量(零个或多个)单词。

比如a,b 两个队列绑定是的binding_key 设置成log.*, 那么消息中有log.err, log.warn的routing_key的消息,都会被发送到a,b两个队列,但如果消息的routing_key是log.lvone.err 则匹配不到队列被丢弃

  • 4、headers 首部交换机: 此类型的交换机不使用binding_key绑定队列,而是在绑定的时候使用key:value形式的参数,消息发送的时候再header头中设置对应的key:value数据,如果匹配上就将消息发送到对应的队列中

说一下什么是channel,他的作用是什么

  • 信道是生产消费者与rabbit通信的渠道,生产者publish或者消费者消费一个队列都是需要通过信道来通信的。信道是建立在TCP上面的虚拟链接,目的是为了减少tcp链接的创建,达到复用tcp链接的作用,减少系统的消耗

如何保证数据不丢失

  • 数据丢失的原因有下面三种情况:

    • 1、生产者发送数据的时候丢失数据,可能因为网络问题或其他问题等
    • 2、rabbitmq自己弄丢了数据,数据还没有持久化,进程挂了
    • 3、消费者弄丢了数据,消费者拿到了数据,消费中进程挂了,导致数据丢失
  • 生产者发送数据丢失解决方案:

    • 开启rabbitmq事务,(但不推荐使用此方案,因为此方案是同步的,会堵塞IO导致性能下降),就是生产者发送数据之前开启 RabbitMQ 事务channel.txSelect,然后发送消息,如果消息没有成功被 RabbitMQ 接收到,那么生产者会收到异常报错,此时就可以回滚事务channel.txRollback,然后重试发送消息;如果收到了消息,那么可以提交事务channel.txCommit。
    // 开启事务
    channel.txSelect
    try {
      // 这里发送消息
    } catch (Exception e) {
      channel.txRollback
    // 这里再次重发这条消息
    
    }
    
    // 提交事务
    channel.txCommit
    
    
    • 使用confirm机制,confirm 机制是异步的,你发送个消息之后就可以发送下一个消息,然后那个消息 RabbitMQ 接收了之后会异步回调你的一个接口通知你这个消息接收到了。

if err := channel.Confirm(false); err != nil { return fmt.Errorf("Channel could not be put into confirm mode: %s", err) } confirms := channel.NotifyPublish(make(chan amqp.Confirmation, 1)) if confirmed := <-confirms; confirmed.Ack { log.Printf("confirmed delivery with delivery tag: %d", confirmed.DeliveryTag) } else { log.Printf("failed delivery of delivery tag: %d", confirmed.DeliveryTag) }
  • rabbitmq本身弄丢数据解决方案:

    • 1、消息持久化:
      rabbitMQ 的消息默认存放在内存上面,如果不特别声明设置,消息不会持久化保存到硬盘上面的,如果节点重启或者意外crash掉,消息就会丢失。
      所以就要对消息进行持久化处理。如何持久化,下面具体说明下:
      要想做到消息持久化,必须满足以下三个条件,缺一不可。
      1)Exchange 设置持久化
      2)Queue 设置持久化
      3)Message持久化发送:发送消息设置发送模式deliveryMode=2,代表持久化消息

    • 2、设置集群镜像模式,把需要的队列做成镜像队列,存在于多个节点,属于RabbitMQ的HA方案

    • 3、补偿机制,消息补偿机制需要建立在消息要写入DB日志,发送日志,接受日志,两者的状态必须记录。然后根据DB日志记录check 消息发送消费是否成功,不成功,进行消息补偿措施,重新发送消息处理。
  • 消费者弄丢数据解决方案:

    • 关闭自动ack,采用消费者手动ack机制,即消费端消费完成要通知服务端,服务端才把消息从内存删除
    deliveries, err := c.channel.Consume(
        queue.Name, // name
        c.tag,      // consumerTag,
        false,      // noAck
        false,      // exclusive
        false,      // noLocal
        false,      // noWait
        nil,        // arguments
    )
    if err != nil {
        return nil, fmt.Errorf("Queue Consume: %s", err)
    }

    go handle(deliveries, c.done)

func handle(deliveries <-chan amqp.Delivery, done chan error) {
    for d := range deliveries {
        log.Printf(
            "got %dB delivery: [%v] %q",
            len(d.Body),
            d.DeliveryTag,
            d.Body,
        )
        d.Ack(false)
    }
    log.Printf("handle: deliveries channel closed")
    done <- nil
}