RabbitMQ

AMQP

AMQP是一种使用广泛的独立于语言的消息协议,它的全称是Advanced Message Queuing Protocol,即高级消息队列协议,它定义了一种二进制格式的消息流,任何编程语言都可以实现该协议。实际应用最广泛的AMQP服务器是使用Erlang编写的RabbitMQ

在JMS中,有两种类型的消息通道:

  1. 点对点的Queue,即Producer发送消息到指定的Queue,接收方从Queue收取消息;

  2. 一对多的Topic,即Producer发送消息到指定的Topic,任意多个在线的接收方均可从Topic获得一份完整的消息副

但是AMQP协议比JMS要复杂一点,它只有Queue,没有Topic,并且引入了Exchange的概念。当Producer想要发送消息的时候,它将消息发送给Exchange,由Exchange将消息根据各种规则投递到一个或多个Queue:

1
2
3
4
5
6
7
8
9
10
11
                                      ┌───────┐
┌───>│Queue-1│
┌──────────┐ │ └───────┘
┌──>│Exchange-1│───┤
┌──────────┐ │ └──────────┘ │ ┌───────┐
│Producer-1│──┤ ├───>│Queue-2│
└──────────┘ │ ┌──────────┐ │ └───────┘
└──>│Exchange-2│───┤
└──────────┘ │ ┌───────┐
└───>│Queue-3│
└───────┘

如果某个Exchange总是把消息发送到固定的Queue,那么这个消息通道就相当于JMS的Queue。如果某个Exchange把消息发送到多个Queue,那么这个消息通道就相当于JMS的Topic。和JMS的Topic相比,Exchange的投递规则更灵活,比如一个“登录成功”的消息被投递到Queue-1和Queue-2,而“登录失败”的消息则被投递到Queue-3。这些路由规则称之为Binding,通常都在RabbitMQ的管理后台设置。

重要概念

AMQP协议是具有现代特征的二进制协议。是一个提供统一消息服务的应用层标准高级消息队列协议,是应用层协议的一个开放标准,为面向消息的中间件设计。以下是AMQP协议中间的几个重要概念:

  • Server:接收客户端的连接,实现AMQP实体服务。
  • Connection:连接,应用程序与Server的网络连接,TCP连接。
  • Channel:信道,消息读写等操作在信道中进行。客户端可以建立多个信道,每个信道代表一个会话任务。
  • Message:消息,应用程序和服务器之间传送的数据,消息可以非常简单,也可以很复杂。由Properties和Body组成。Properties为外包装,可以对消息进行修饰,比如消息的优先级、延迟等高级特性;Body就是消息体内容。
  • Virtual Host:虚拟主机,用于逻辑隔离。一个虚拟主机里面可以有若干个Exchange和Queue,同一个虚拟主机里面不能有相同名称的Exchange或Queue。
  • Exchange:交换器,接收消息,按照路由规则将消息路由到一个或者多个队列。如果路由不到,或者返回给生产者,或者直接丢弃。RabbitMQ常用的交换器常用类型有direct、topic、fanout、headers四种,后面详细介绍。
  • Binding:绑定,交换器和消息队列之间的虚拟连接,绑定中可以包含一个或者多个RoutingKey。
  • RoutingKey:路由键,生产者将消息发送给交换器的时候,会发送一个RoutingKey,用来指定路由规则,这样交换器就知道把消息发送到哪个队列。路由键通常为一个“.”分割的字符串,例如“com.rabbitmq”。
  • Queue:消息队列,用来保存消息,供消费者消费。

RabbitMQ概念

img

  1. Message
    消息,消息是不具名的,它由消息头和消息体组成。消息体是不透明的,而消息头则由一系列的可选属性组成,这些属性包括routing-key(路由键)、priority(相对于其他消息的优先权)、delivery-mode(指出该消息可能需要持久性存储)等。
  2. Publisher
    消息的生产者,也是一个向交换器发布消息的客户端应用程序。
  3. Exchange
    交换器,用来接收生产者发送的消息并将这些消息路由给服务器中的队列。
  4. Binding
    绑定,用于消息队列和交换器之间的关联。一个绑定就是基于路由键将交换器和消息队列连接起来的路由规则,所以可以将交换器理解成一个由绑定构成的路由表。
  5. Queue
    消息队列,用来保存消息直到发送给消费者。它是消息的容器,也是消息的终点。一个消息可投入一个或多个队列。消息一直在队列里面,等待消费者连接到这个队列将其取走。
  6. Connection
    网络连接,比如一个TCP连接。
  7. Channel
    信道,多路复用连接中的一条独立的双向数据流通道。信道是建立在真实的TCP连接内地虚拟连接,AMQP 命令都是通过信道发出去的,不管是发布消息、订阅队列还是接收消息,这些动作都是通过信道完成。因为对于操作系统来说建立和销毁 TCP 都是非常昂贵的开销,所以引入了信道的概念,以复用一条 TCP 连接。
  8. Consumer
    消息的消费者,表示一个从消息队列中取得消息的客户端应用程序。
  9. Virtual Host
    虚拟主机,表示一批交换器、消息队列和相关对象。虚拟主机是共享相同的身份认证和加密环境的独立服务器域。每个 vhost 本质上就是一个 mini 版的 RabbitMQ 服务器,拥有自己的队列、交换器、绑定和权限机制。vhost 是 AMQP 概念的基础,必须在连接时指定,RabbitMQ 默认的 vhost 是 。
  10. Broker
    表示消息队列服务器进程的实体 ,包括Exchange 和 Queue。

RabbitMQ常用的交换器类型

有direct、topic、fanout、headers四种

Direct:

消息中的路由键(routing key)如果和 Binding 中的 binding key 一致, 交换器就将消息发到对应的队列中。路由键与队列名完全匹配,如果一个队列绑定到交换机要求路由键为“dog”,则只转发 routing key 标记为“dog”的消息,不会转发 “dog.puppy” ,也不会转发“dog.guard”等等。它是完全匹配、单播的模式。

img

fanout:

每个发到 fanout 类型交换器的消息都会分到所有绑定的队列上去。fanout 交换器不处理路由键,只是简单的将队列绑定到交换器上,每个发送到交换器的消息都会被转发到与该交换器绑定的所有队列上。很像子网广播,每台子网内的主机都获得了一份复制的消息。fanout 类型转发消息是最快的。

img

topic:(通配符模式)

Topic交换器按照正则表达式模糊匹配:用消息的Routing Key与 Exchange和Queue 之间的Binding Key进行模糊匹配,如果匹配成功,将消息分发到该Queue。Routing Key是一个句点号“. ”分隔的字符串(我们将被句点号“. ”分隔开的每一段独立的字符串称为一个单词)。Binding Key与Routing Key一样也是句点号“. ”分隔的字符串。Binding Key中可以存在两种特殊字符“ * ”与“#”,用于做模糊匹配

* 符号:有且只匹配一个词。比如 a.*可以匹配到”a.b”、”a.c”,但是匹配不了”a.b.c”。

# 符号:匹配一个或多个词。比如”rabbit.#”既可以匹配到”rabbit.a.b”、”rabbit.a”,也可以匹配到”rabbit.a.b.c”。

通配符模式

img

RabbitMQ 集群

RabbitMQ 最优秀的功能之一就是内建集群,这个功能设计的目的是允许消费者和生产者在节点崩溃的情况下继续运行,以及通过添加更多的节点来线性扩展消息通信吞吐量。RabbitMQ 内部利用 Erlang 提供的分布式通信框架 OTP 来满足上述需求,使客户端在失去一个 RabbitMQ 节点连接的情况下,还是能够重新连接到集群中的任何其他节点继续生产、消费消息。

MQ三大功能:

1. 流量削峰:

一个图书系统,最多每秒能同时接受一万次访问,否则就宕机了。

解决:

通过MQ 消息队列,对访问的人员进行【排队】,达到削峰。

好处: 不会宕机 坏处:等待时间边长

2. 应用解耦:

原本的订单系统,它下面的子系统,付款系统,下单系统等系统,这种耦合的系统,只要其中一个系统,比如付款系统

出现故障,那么其他与它耦合的系统都会出现故障,导致其他系统不能正常运行。

订单系统和子系统之间有一个消息队列,订单系统这个系统完成某个需求以后需要转到付款系统执行付款,这个时候如果

订单系统发生故障,付款系统不会被影响。

因为订单系统只有完成了某个需求以后才会发消息给消息队列,然后队列就会分配消息给连接它的系统,直至连接它的所有系统完成任务。

3. 异步处理

Work Queues

工作队列(又名:任务队列)背后的主要思想是避免立即执行资源密集型任务,并且必须等待它完成。相反,我们将任务安排在以后完成。我们将任务封装为消息并将其发送到队列。在后台运行的工作进程将弹出任务并最终执行作业。当您运行许多工作线程时,任务将在它们之间共享。

默认情况下,RabbitMQ 将按顺序将每条消息发送给下一个使用者。平均而言,每个消费者将获得相同数量的消息。这种分发消息的方式称为轮循机制。尝试与三个或更多工人一起尝试。

消息应答

概念

消费者完成一个任务可能需要一段时间,如果其中一个消费者处理一个长的任务并仅只完成 了部分突然它挂掉了,会发生什么情况。RabbitMQ 一旦向消费者传递了一条消息,便立即将该消 息标记为删除。在这种情况下,突然有个消费者挂掉了,我们将丢失正在处理的消息。以及后续 发送给该消费这的消息,因为它无法接收到。 为了保证消息在发送过程中不丢失,rabbitmq 引入消息应答机制,消息应答就是:消费者在接收 到消息并且处理该消息之后,告诉 rabbitmq 它已经处理了,rabbitmq 可以把该消息删除了。

消息发送后立即被认为已经传送成功,这种模式需要在高吞吐量和数据传输安全性方面做权 衡,因为这种模式如果消息在接收到之前,消费者那边出现连接或者 channel 关闭,那么消息就丢失 了,当然另一方面这种模式消费者那边可以传递过载的消息,没有对传递的消息数量进行限制,当 然这样有可能使得消费者这边由于接收太多还来不及处理的消息,导致这些消息的积压,最终使 得内存耗尽,最终这些消费者线程被操作系统杀死,所以这种模式仅适用在消费者可以高效并以 某种速率能够处理这些消息的情况下使用。

消息自动重新入队如果消费者由于某些原因失去连接(其通道已关闭,连接已关闭或 TCP 连接丢失),导致消息未发送 ACK 确认,RabbitMQ 将了解到消息未完全处理,并将对其重新排队。如果此时其他消费者可以处理,它将很快将其重新分发给另一个消费者。这样,即使某个消费者偶尔死亡,也可以确保不会丢失任何消息

手动应答

常用API

1
2
3
4
5
6
7
8
9

channel.basicAck(msg.getMessageProperties().getDeliveryTag(),false);
ack表示确认消息。multiple:false只确认该delivery_tag的消息,true确认该delivery_tag的所有消息

channel.basicReject(msg.getMessageProperties().getDeliveryTag(),false);
Reject表示拒绝消息。requeue:false表示被拒绝的消息是丢弃;true表示重回队列

channel.basicNack(msg.getMessageProperties().getDeliveryTag(),false,false);
nack表示拒绝消息。multiple表示拒绝指定了delivery_tag的所有未确认的消息,requeue表示不是重回队列

RabbitMQ 持久化

刚刚我们已经看到了如何处理任务不丢失的情况,但是如何保障当 RabbitMQ 服务停掉以后消息生产者发送过来的消息不丢失。默认情况下 RabbitMQ 退出或由于某种原因崩溃时,它忽视队列 和消息,除非告知它不要这样做。确保消息不会丢失需要做两件事:我们需要将队列和消息都标 记为持久化。

之前我们创建的队列都是非持久化的,rabbitmq 如果重启的化,该队列就会被删除掉,如果 要队列实现持久化 需要在声明队列的时候把 durable 参数设置为持久化

队列持久化

之前我们创建的队列都是非持久化的,rabbitmq 如果重启的化,该队列就会被删除掉,如果 要队列实现持久化 需要在声明队列的时候把 durable 参数设置为持久化

1
2
boolean durable = true;
channel.queueDeclare("task_queue", durable, false, false, null);

消息持久化

1
2
3
channel.basicPublish("", "task_queue",
MessageProperties.PERSISTENT_TEXT_PLAIN, //我们需要将消息标记为持久性 ,通过将 MessageProperties(实现 BasicProperties)设置为值 PERSISTENT_TEXT_PLAIN。
message.getBytes());

公平调度

您可能已经注意到,调度仍然不完全符合我们的要求。例如,在有两个 worker 的情况下,当所有奇怪的消息都很重,甚至消息都很轻时,一个 worker 会一直很忙,而另一个 worker 几乎不会做任何工作。好吧,RabbitMQ对此一无所知,仍然会均匀地调度消息。

img

发生这种情况是因为 RabbitMQ 只是在消息进入队列时调度消息。它不查看使用者的未确认消息的数量。它只是盲目地将第 n 条消息分派给第 n 个使用者。

In order to defeat that we can use the basicQos method with the prefetchCount = 1 setting. This tells RabbitMQ not to give more than one message to a worker at a time. Or, in other words, don’t dispatch a new message to a worker until it has processed and acknowledged the previous one. Instead, it will dispatch it to the next worker that is not still busy.

1
2
int prefetchCount = 1;
channel.basicQos(prefetchCount);

发布确认原理

生产者将信道设置成 confirm 模式,一旦信道进入 confirm 模式,所有在该信道上面发布的消 息都将会被指派一个唯一的 ID(从 1 开始),一旦消息被投递到所有匹配的队列之后,broker 就会 发送一个确认给生产者(包含消息的唯一 ID),这就使得生产者知道消息已经正确到达目的队列了, 如果消息和队列是可持久化的,那么确认消息会在将消息写入磁盘之后发出,broker 回传给生产 者的确认消息中 delivery-tag 域包含了确认消息的序列号,此外 broker 也可以设置basic.ack 的 multiple 域,表示到这个序列号之前的所有消息都已经得到了处理。 confirm 模式最大的好处在于他是异步的,一旦发布一条消息,生产者应用程序就可以在等信道 返回确认的同时继续发送下一条消息,当消息最终得到确认之后,生产者应用便可以通过回调方 法来处理该确认消息,如果 RabbitMQ 因为自身内部错误导致消息丢失,就会发送一条 nack 消息, 生产者应用程序同样可以在回调方法中处理该 nack 消息。

单个确认发布

这是一种简单的确认方式,它是一种同步确认发布的方式,也就是发布一个消息之后只有它 被确认发布,后续的消息才能继续发布,waitForConfirmsOrDie(long)这个方法只有在消息被确认 的时候才返回,如果在指定时间范围内这个消息没有被确认那么它将抛出异常。 这种确认方式有一个最大的缺点就是:发布速度特别的慢,因为如果没有确认发布的消息就会 阻塞所有后续消息的发布,这种方式最多提供每秒不超过数百条发布消息的吞吐量。当然对于某 些应用程序来说这可能已经足够了。

批量确认发布

上面那种方式非常慢,与单个等待确认消息相比,先发布一批消息然后一起确认可以极大地 提高吞吐量,当然这种方式的缺点就是:当发生故障导致发布出现问题时,不知道是哪个消息出现 问题了,我们必须将整个批处理保存在内存中,以记录重要的信息而后重新发布消息。当然这种 方案仍然是同步的,也一样阻塞消息的发布

异步确认发布

异步确认虽然编程逻辑比上两个要复杂,但是性价比最高,无论是可靠性还是效率都没得说, 他是利用回调函数来达到消息可靠性传递的,这个中间件也是通过函数回调来保证是否投递成功, 下面就让我们来详细讲解异步确认是怎么实现的

ExChange 交换机

在上一节中,我们构建了一个简单的日志记录系统。我们能够向许多接收者广播日志消息。在本节我们将向其中添加一些特别的功能-比方说我们只让某个消费者订阅发布的部分消息。例如我们只把 严重错误消息定向存储到日志文件(以节省磁盘空间),同时仍然能够在控制台上打印所有日志消息。 我们再次来回顾一下什么是 bindings,绑定是交换机和队列之间的桥梁关系。也可以这么理解: 队列只对它绑定的交换机的消息感兴趣。绑定用参数:routingKey 来表示也可称该参数为 binding key, 创建绑定我们用代码:channel.queueBind(queueName, EXCHANGE_NAME, “routingKey”);绑定之后的 意义由其交换类型决定。

Fanout

这种交换类型并不能给我们带来很大的灵活性-它只能进行无意识的 广播,在这里我们将使用 direct 这种类型来进行替换,这种类型的工作方式是,消息只去到它绑定的 routingKey 队列中去。它是将接收到的所有消息广播到它知道的 所有队列中

image-20220720171454496

Direct

绑定是交换机和队列之间的桥梁关系。也可以这么理解: 队列只对它绑定的交换机的消息感兴趣。绑定用参数:routingKey 来表示也可称该参数为 binding key, 创建绑定我们用代码:channel.queueBind(queueName, EXCHANGE_NAME, “routingKey”);绑定之后的 意义由其交换类型决定

image-20220720171706687

Topic

尽管使用direct 交换机改进了我们的系统,但是它仍然存在局限性-比方说我们想接收的日志类型有 info.base 和 info.advantage,某个队列只想 info.base 的消息,那这个时候direct 就办不到了。这个时候 就只能使用 topic 类型

发送到类型是 topic 交换机的消息的 routing_key 不能随意写,必须满足一定的要求,它必须是一个单词列表,以点号分隔开。这些单词可以是任意单词,比如说:”stock.usd.nyse”, “nyse.vmw”, “quick.orange.rabbit”.这种类型的。当然这个单词列表最多不能超过 255 个字节。 在这个规则列表中,其中有两个替换符是大家需要注意的 *(星号)可以代替一个单词 #(井号)可以替代零个或多个单词

在这个规则列表中,其中有两个替换符是大家需要注意的

  • * (星号)可以代替一个单词
  • # (井号)可以替代零个或多个单词

下图绑定关系如下

Q1–>绑定的是:中间带 orange 带 3 个单词的字符串*.orange.*

Q2–>绑定的是: 最后一个单词是 rabbit 的 3 个单词*.*.rabbit ,第一个单词是 lazy 的多个单词lazy.#

image-20220720172109655

死信

先从概念解释上搞清楚这个定义,死信,顾名思义就是无法被消费的消息,字面意思可以这样理 解,一般来说,producer 将消息投递到 broker 或者直接到queue 里了,consumer 从 queue 取出消息 进行消费,但某些时候由于特定的原因导致 queue 中的某些消息无法被消费,这样的消息如果没有 后续的处理,就变成了死信,有死信自然就有了死信队列。

应用场景:为了保证订单业务的消息数据不丢失,需要使用到 RabbitMQ 的死信队列机制,当消息 消费发生异常时,将消息投入死信队列中.还有比如说: 用户在商城下单成功并点击去支付后在指定时 间未支付时自动失效

那么什么情况下会成为Dead message

  1. 队列的长度达到阈值。
  2. 消费者拒接消费消息,basicNack/basicReject,并且不把消息重新放入原目标队列,requeue=false
  3. 原队列存在消息过期设置,消息到达超时时间未被消费。

img

延迟队列

延时队列,队列内部是有序的,最重要的特性就体现在它的延时属性上,延时队列中的元素是希望 在指定时间到了以后或之前取出和处理,简单来说,延时队列就是用来存放需要在指定时间被处理的 元素的队列

使用场景

  • 1.订单在十分钟之内未支付则自动取消
  • 2.新创建的店铺,如果在十天内都没有上传过商品,则自动发送消息提醒。
  • 3.用户注册成功后,如果三天内没有登陆则进行短信提醒。
  • 4.用户发起退款,如果三天内没有得到处理则通知相关运营人员。
  • 5.预定会议后,需要在预定的时间点前十分钟通知各个与会人员参加会议

这些场景都有一个特点,需要在某个事件发生之后或者之前的指定时间点完成某一项任务,如: 发生订单生成事件,在十分钟之后检查该订单支付状态,然后将未支付的订单进行关闭;看起来似乎 使用定时任务,一直轮询数据,每秒查一次,取出需要被处理的数据,然后处理不就完事了吗?如果 数据量比较少,确实可以这样做,比如:对于“如果账单一周内未支付则进行自动结算”这样的需求, 如果对于时间不是严格限制,而是宽松意义上的一周,那么每天晚上跑个定时任务检查一下所有未支 付的账单,确实也是一个可行的方案。但对于数据量比较大,并且时效性较强的场景,如:“订单十 分钟内未支付则关闭“,短期内未支付的订单数据可能会有很多,活动期间甚至会达到百万甚至千万 级别,对这么庞大的数据量仍旧使用轮询的方式显然是不可取的,很可能在一秒内无法完成所有订单 的检查,同时会给数据库带来很大压力,无法满足业务要求而且性能低下。

单位是毫秒。换句话说,如果一条消息设置了 TTL 属性或者进入了设置TTL 属性的队列,那么这 条消息如果在TTL 设置的时间内没有被消费,则会成为”死信”。如果同时配置了队列的TTL 和消息的 TTL,那么较小的那个值将会被使用

有两种方式设置 TTL。

  • 第一种是在创建队列的时候设置队列的“x-message-ttl”属性
  • 另一种方式便是针对每条消息设置TTL

两者的区别

如果设置了队列的 TTL 属性,那么一旦消息过期,就会被队列丢弃(如果配置了死信队列被丢到死信队 列中),而第二种方式,消息即使过期,也不一定会被马上丢弃,因为消息是否过期是在即将投递到消费者 之前判定的,如果当前队列有严重的消息积压情况,则已过期的消息也许还能存活较长时间;另外,还需 要注意的一点是,如果不设置 TTL,表示消息永远不会过期,如果将 TTL 设置为 0,则表示除非此时可以 直接投递该消息到消费者,否则该消息将会被丢弃。

RabbitMQ实现延时消息的两种方法

  • 死信队列
  • 延时插件

总结

延时队列在需要延时处理的场景下非常有用,使用 RabbitMQ 来实现延时队列可以很好的利用 RabbitMQ 的特性,如:消息可靠发送、消息可靠投递、死信队列来保障消息至少被消费一次以及未被正 确处理的消息不会被丢弃。另外,通过 RabbitMQ 集群的特性,可以很好的解决单点故障问题,不会因为 单个节点挂掉导致延时队列不可用或者消息丢失。

当然,延时队列还有很多其它选择,比如利用 Java 的 DelayQueue,利用 Redis 的 zset,利用 Quartz 或者利用 kafka 的时间轮,这些方式各有特点,看需要适用的场景

回退消息

备份交换机

通过设置 mandatory 参 数可以在当消息传递过程中不可达目的地时将消息返回给生产者。

有了 mandatory 参数和回退消息,我们获得了对无法投递消息的感知能力,有机会在生产者的消息 无法被投递时发现并处理。但有时候,我们并不知道该如何处理这些无法路由的消息,最多打个日志,然 后触发报警,再来手动处理。而通过日志来处理这些无法路由的消息是很不优雅的做法,特别是当生产者 所在的服务有多台机器的时候,手动复制日志会更加麻烦而且容易出错。而且设置 mandatory 参数会增 加生产者的复杂性,需要添加处理这些被退回的消息的逻辑。如果既不想丢失消息,又不想增加生产者的 复杂性,该怎么做呢?前面在设置死信队列的文章中,我们提到,可以为队列设置死信交换机来存储那些 处理失败的消息,可是这些不可路由消息根本没有机会进入到队列,因此无法使用死信队列来保存消息。 在 RabbitMQ 中,有一种备份交换机的机制存在,可以很好的应对这个问题。什么是备份交换机呢?备份 交换机可以理解为 RabbitMQ 中交换机的“备胎”,当我们为某一个交换机声明一个对应的备份交换机时,就 是为它创建一个备胎,当交换机接收到一条不可路由消息时,将会把这条消息转发到备份交换机中,由备 份交换机来进行转发和处理,通常备份交换机的类型为 Fanout ,这样就能把所有消息都投递到与其绑定 的队列中,然后我们在备份交换机下绑定一个队列,这样所有那些原交换机无法被路由的消息,就会都进 入这个队列了。当然,我们还可以建立一个报警队列,用独立的消费者来进行监测和报警。

image-20220721224939286

RabbitMQ 其他知识点

幂等性

幂等性的实质是:对于一个资源,不管你请求一次还是请求多次,对该资源本身造成的影响应该是相同的,不能因为重复相同的请求而对该资源重复造成影响。注意关注的是请求操作对资源本身造成的影响

MQ 消费者的幂等性的解决一般使用全局 ID 或者写个唯一标识比如时间戳 或者 UUID 或者订单消费 者消费 MQ 中的消息也可利用 MQ 的该 id 来判断,或者可按自己的规则生成一个全局唯一 id,每次消费消 息时用该 id 先判断该消息是否已消费过。

① 一次或多次请求,对资源均不会造成影响,比如select操作;

② 第一次请求对资源产生了影响,后面再发出多个相同的请求,与发出单个请求具有相同的效果,不能重复对资源产生影响。比如支付宝转账,手抖重复提交了2次,第一次扣款成功,余额减少100元,第二次就不能再重复扣款了。

③ 需要说明的是网络超时、服务宕机等问题,不是幂等的范围。

幂等性是系统服务对外的一种承诺。比如我写了一个接口,我承诺我的接口是符合幂等性的,就是说外部的多次调用对我系统造成的影响都是相同的,不会因为多次调用而对系统重复造成影响。这里需要说明的一点是,声明为幂等的服务会认为调用方调用失败是常态,并且允许在调用失败后重试。

我们先来看看在RabbitMQ中,哪些情况可能导致非幂等?

① consumer接收到消息处理完成后,在给Broker返回ack途中网络中断,Broker未收到确认信息,根据RabbitMQ的重试补偿机制,则会把这条消息再重发给其他的消费者或等网络重连后再发送给该消费者,造成了消息的重复消费。

② 或者在开启生产者confirm模式下,生产者已经把消息发送到Broker,但在Broker回传ack确认时网络中断,生产者也会重新发送刚才的消息,造成Broker收到了重复的消息,最终将两条重复的消息发送到消费端,造成了消息的重复消费。

幂等性保障 (避免重复消费)

在海量订单生成的业务高峰期,生产端有可能就会重复发生了消息,这时候消费端就要实现幂等性, 这就意味着我们的消息永远不会被消费多次,即使我们收到了一样的消息。业界主流的幂等性有两种操作:

  • a. 唯一 ID+指纹码机制,利用数据库主键去重,
  • b.利用 redis 的原子性去实现

唯一ID+数据库

指纹码:我们的一些规则或者时间戳加别的服务给到的唯一信息码,它并不一定是我们系统生成的,基本都是由我们的业务规则拼接而来,但是一定要保证唯一性,然后就利用查询语句进行判断这个 id 是否存在数据库中,优势就是实现简单就一个拼接,然后查询判断是否重复;劣势就是在高并发时,如果是单个数据库就会有写入性能瓶颈当然也可以采用分库分表提升性能,但也不是我们最推荐的方式。

唯一ID + Redis

利用 redis 执行 setnx 命令,天然具有幂等性。从而实现不重复消费

优先级队列

要让队列实现优先级需要做的事情有如下事情:队列需要设置为优先级队列,消息需要设置消息的优先级,消费者需要等待消息已经发送到队列中才去消费,因为这样才有机会对消息进行排序

惰性队列

RabbitMQ 从 3.6.0 版本开始引入了惰性队列的概念。惰性队列会尽可能的将消息存入磁盘中,而在消 费者消费到相应的消息时才会被加载到内存中,它的一个重要的设计目标是能够支持更长的队列,即支持 更多的消息存储。当消费者由于各种各样的原因(比如消费者下线、宕机亦或者是由于维护而关闭等)而致 使长时间内不能消费消息造成堆积时,惰性队列就很有必要了。 默认情况下,当生产者将消息发送到 RabbitMQ 的时候,队列中的消息会尽可能的存储在内存之中, 这样可以更加快速的将消息发送给消费者。即使是持久化的消息,在被写入磁盘的同时也会在内存中驻留 一份备份。当 RabbitMQ 需要释放内存的时候,会将内存中的消息换页至磁盘中,这个操作会耗费较长的 时间,也会阻塞队列的操作,进而无法接收新的消息。虽然 RabbitMQ 的开发者们一直在升级相关的算法, 但是效果始终不太理想,尤其是在消息量特别大的时候。

RabbitMQ 集群

  • 单一模式。

  • 普通模式(默认的集群模式)。意思就是在多台机器上启动多个rabbitmq实例每个机器启动一个。但是你创建的queue,只会放在一个rabbtimq实例上,但是**每个实例都同步queue的元数据(存放含queue数据的真正实例位置)**。消费的时候,实际上如果连接到了另外一个实例,那么那个实例会从queue所在实例上拉取数据过来。

  • 镜像模式:RabbitMQ默认集群模式,并不包管队列的高可用性,尽管队列信息,交换机、绑定这些可以复制到集群里的任何一个节点,然则队列内容不会复制,固然该模式解决一项目组节点压力,但队列节点宕机直接导致该队列无法应用,只能守候重启,所以要想在队列节点宕机或故障也能正常应用,就要复制队列内容到集群里的每个节点,须要创建镜像队列。把需要的队列做成镜像队列,存在于多个节点,属于RabbiMQ的HA方案,在对业务可靠性要求较高的场合中比较适用)。 要实现镜像模式,需要先搭建一个普通集群模式,在这个模式的基础上再配置镜像模式以实现高可用。

使用集群的原因:最开始我们介绍了如何安装及运行 RabbitMQ 服务,不过这些是单机版的,无法满足目前真实应用的 要求。如果 RabbitMQ 服务器遇到内存崩溃、机器掉电或者主板故障等情况,该怎么办?单台 RabbitMQ 服务器可以满足每秒 1000 条消息的吞吐量,那么如果应用需要 RabbitMQ 服务满足每秒 10 万条消息的吞 吐量呢?购买昂贵的服务器来增强单机 RabbitMQ 务的性能显得捉襟见肘,搭建一个 RabbitMQ 集群才是 解决实际问题的关键

参考: https://www.jianshu.com/p/dae5bbed39b1

https://www.jianshu.com/p/79ca08116d57

https://mp.weixin.qq.com/s/adse6qpIiK0RE-Ebo-z5_Q

https://liuyueyi.github.io/hexblog/categories/%E5%BC%80%E6%BA%90/RabbitMQ/

保证rabbitMq顺序消费

乱序消费的根源是多个消费者都消费了同一个队列的消息。面对这个问题我们可以:

将一个queue拆分多个queue,每个queue一个consumer。生产者发送消息的时候,同一个类型的消息发送到同一个 queue 中。由于同一个 queue 的消息是一定会保证有序的,那么同一个订单号的消息就只会被一个消费者顺序消费,从而保证了消息的顺序性。

在这里插入图片描述

如图所示,RabbitMQ保证消息的顺序性,就是拆分多个 queue,每个 queue 对应一个 consumer(消费者),就是多一些 queue 而已,确实是麻烦点;或者就一个 queue 但是对应一个 consumer,然后这个 consumer 内部用内存队列做排队,然后分发给底层不同的 worker 来处理。

如何保证消息不丢失?

  • 生产者发送消息至MQ的数据丢失

解决方法:在生产者端开启comfirm 确认模式,你每次写的消息都会分配一个唯一的 id,然后如果写入了 RabbitMQ 中,RabbitMQ 会给你回传一个 ack 消息,告诉你说这个消息 ok 了

  • MQ收到消息,暂存内存中,还没消费,自己挂掉,数据会都丢失

解决方式:MQ设置为持久化。将内存数据持久化到磁盘中

  • 消费者刚拿到消息,还没处理,挂掉了,MQ又以为消费者处理完

解决方式:RabbitMQ 给我们提供了消费者应答(ack)机制,默认情况下这个机制是自动应答,只要消息推送到消费者就会自动 ack ,然后 RabbitMQ 删除队列中的消息。启用手动应答之后我们在消费端调用 API 手动 ack 确认之后,RabbitMQ 才会从队列删除这条消息。

消息(堆积)积压

所谓消息积压一般是由于消费端消费的速度远小于生产者发消息的速度,导致大量消息在 RabbitMQ 的队列中无法消费。

  • 对生产者发消息接口进行适当限流(不太推荐,影响用户体验)
  • 多部署几台消费者实例(推荐)
  • 适当增加 prefetch 的数量,让消费端一次多接受一些消息(推荐,可以和第二种方案一起用)

kafka

(239条消息) Kafka Consumer位移(Offset)提交——解决Consumer重复消费和消息丢失问题_warybee的博客-CSDN博客_kafka手动提交offset 重复消费

Kafka 是由 Linkedin 公司开发的,它是一个分布式的,支持多分区、多副本,基于 Zookeeper 的分布式流处理框架,用于实时构建流处理应用。它有一个核心 的功能广为人知,即作为一款开源的基于发布订阅模式的消息引擎系统

Kafka 的特性(设计原则)

  • 高吞吐、低延迟:kakfa 最大的特点就是收发消息非常快,kafka 每秒可以处理几十万条消息,它的最低延迟只有几毫秒。
  • 高伸缩性:每个主题(topic) 包含多个分区(partition),主题中的分区可以分布在不同的主机(broker)中。
  • 持久性、可靠性:以时间复杂度为 O(1) 的方式提供消息持久化能力,即使对 TB 级以上数据也能保证常数时间复杂度的访问性能。Kafka 能够允许数据的持久化存储,消息被持久化到磁盘,并支持数据备份防止数据丢失,Kafka 底层的数据存储是基于 Zookeeper 存储的,Zookeeper 我们知道它的数据能够持久存储。
  • 容错性:允许集群中的节点失败,某个节点宕机,Kafka 集群能够正常工作
  • 高并发:支持数千个客户端同时读写

Kafka 的使用场景

  • 活动跟踪:Kafka 可以用来跟踪用户行为,比如我们经常回去淘宝购物,你打开淘宝的那一刻,你的登陆信息,登陆次数都会作为消息传输到 Kafka ,当你浏览购物的时候,你的浏览信息,你的搜索指数,你的购物爱好都会作为一个个消息传递给 Kafka ,这样就可以生成报告,可以做智能推荐,购买喜好等。
  • 传递消息:Kafka 另外一个基本用途是传递消息,应用程序向用户发送通知就是通过传递消息来实现的,这些应用组件可以生成消息,而不需要关心消息的格式,也不需要关心消息是如何发送的。
  • 度量指标:Kafka也经常用来记录运营监控数据。包括收集各种分布式应用的数据,生产各种操作的集中反馈,比如报警和报告。
  • 日志记录:Kafka 的基本概念来源于提交日志,比如我们可以把数据库的更新发送到 Kafka 上,用来记录数据库的更新时间,通过kafka以统一接口服务的方式开放给各种consumer,例如hadoop、Hbase、Solr等。
  • 限流削峰:Kafka 多用于互联网领域某一时刻请求特别多的情况下,可以把请求写入Kafka 中,避免直接请求后端程序导致服务崩溃。

Kafka 的消息队列一般分为两种模式:点对点模式和发布订阅模式

img

一个典型的 Kafka 集群中包含若干Producer(可以是web前端产生的Page View,或者是服务器日志,系统CPU、Memory等),若干broker(Kafka支持水平扩展,一般broker数量越多,集群吞吐率越高),若干Consumer Group,以及一个Zookeeper集群。Kafka通过Zookeeper管理集群配置,选举leader,以及在Consumer Group发生变化时进行rebalance。Producer使用push模式将消息发布到broker,Consumer使用pull模式从broker订阅并消费消息。

核心 API

Kafka 有四个核心API,它们分别是

  • Producer API,它允许应用程序向一个或多个 topics 上发送消息记录
  • Consumer API,允许应用程序订阅一个或多个 topics 并处理为其生成的记录流
  • Streams API,它允许应用程序作为流处理器,从一个或多个主题中消费输入流并为其生成输出流,有效的将输入流转换为输出流。
  • Connector API,它允许构建和运行将 Kafka 主题连接到现有应用程序或数据系统的可用生产者和消费者。例如,关系数据库的连接器可能会捕获对表的所有更改

Kafka 为何如此之快

Kafka 实现了零拷贝原理来快速移动数据,避免了内核之间的切换。Kafka 可以将数据记录分批发送,从生产者到文件系统(Kafka 主题日志)到消费者,可以端到端的查看这些批次的数据。批处理能够进行更有效的数据压缩并减少 I/O 延迟,Kafka 采取顺序写入磁盘的方式,避免了随机磁盘寻址的浪费

  • 顺序读写
  • 零拷贝
  • 消息压缩
  • 分批发送

Kafka 存储在文件系统上

是的,您首先应该知道 Kafka 的消息是存在于文件系统之上的。Kafka 高度依赖文件系统来存储和缓存消息,一般的人认为 “磁盘是缓慢的”,所以对这样的设计持有怀疑态度。实际上,磁盘比人们预想的快很多也慢很多,这取决于它们如何被使用;一个好的磁盘结构设计可以使之跟网络速度一样快。

现代的操作系统针对磁盘的读写已经做了一些优化方案来加快磁盘的访问速度。比如,预读会提前将一个比较大的磁盘快读入内存。后写会将很多小的逻辑写操作合并起来组合成一个大的物理写操作。并且,操作系统还会将主内存剩余的所有空闲内存空间都用作磁盘缓存,所有的磁盘读写操作都会经过统一的磁盘缓存(除了直接 I/O 会绕过磁盘缓存)。综合这几点优化特点,如果是针对磁盘的顺序访问,某些情况下它可能比随机的内存访问都要快,甚至可以和网络的速度相差无几。

在 Kafka 中,ZooKeeper 的作用是什么?

目前,Kafka 使用 ZooKeeper 存放集群元数据、成员管理、Controller 选举,以及其他一些管理类任务。之后,等 KIP-500 提案完成后,Kafka 将完全不再依赖 于 ZooKeeper。KIP-500 思想,是使用社区自研的基于 Raft 的共识算法, 替代 ZooKeeper,实现 Controller 自选举

存放元数据”是指主题 分区的所有数据都保存在 ZooKeeper 中,且以它保存的数据为权威,其他“人”都要与它 保持对齐。“成员管理”是指 Broker 节点的注册、注销以及属性变更,等 等。“Controller 选举”是指选举集群 Controller,而其他管理类任务包括但不限于主题 删除、参数配置等。

什么是消费者组?

消费者组是 Kafka 独有的概念,是 Kafka 提供的可扩展且具有容错性的消费者机制。
原理:在 Kafka 中,消费者组是一个由多个消费者实例 构成的组。多个实例共同订阅若干个主题,实现共同消费。同一个组下的每个实例都配置有 相同的组 ID,被分配不同的订阅分区。当某个实例挂掉的时候,其他实例会自动地承担起 它负责消费的分区。

  • 消费者组的位移提交机制;
  • 消费者组与 Broker 之间的交互;

Kafka 中位移(offset)的作用

在 Kafka 中,每个 主题分区下的每条消息都被赋予了一个唯一的 ID 数值,用于标识它在分区中的位置。这个 ID 数值,就被称为位移,或者叫偏移量。一旦消息被写入到分区日志,它的位移值将不能 被修改。

Kafka 是 pull 模型

消费者应该向 Broker 要数据(pull)还是 Broker 向消费者推送数据(push)?作为一个消息系统,Kafka 遵循了传统的方式,选择由 Producer 向 broker push 消息并由 Consumer 从 broker pull 消息。一些 logging-centric system,比如 Facebook 的Scribe和 Cloudera 的Flume,采用 push 模式。事实上,push 模式和 pull 模式各有优劣。

push 模式很难适应消费速率不同的消费者,因为消息发送速率是由 broker 决定的。push 模式的目标是尽可能以最快速度传递消息,但是这样很容易造成 Consumer 来不及处理消息,典型的表现就是拒绝服务以及网络拥塞。而 pull 模式则可以根据 Consumer 的消费能力以适当的速率消费消息。

对于 Kafka 而言,pull 模式更合适。pull 模式可简化 broker 的设计,Consumer 可自主控制消费消息的速率,同时 Consumer 可以自己控制消费方式——即可批量消费也可逐条消费,同时还能选择不同的提交方式从而实现不同的传输语义。

ElasticSearch

说说ElasticSearch put的全过程

put过程主要分为三个阶段:

  1. 协调阶段:

    Client 客户端选择一个 node 发送 put 请求,此时当前节点就是协调节点(coordinating node)。协调节点根据 document 的 id 进行路由,将请求转发给对应的 node。这个 node 上的是 primary shard 。

  2. 主要阶段:

    对应的 primary shard 处理请求,写入数据 ,然后将数据同步到 replica shard。

    • primary shard 会验证传入的数据结构;
    • 本地执行相关操作;
    • 将操作转发给 replica shard。

    当数据写入 primary shard 和 replica shard 成功后,路由节点返回响应给 Client。

  3. 副本阶段:

    每个 replica shard 在转发后,会进行本地操作。

在写操作时,默认情况下,只需要 primary shard 处于活跃状态即可进行操作。在索引设置时可以设置这个属性:index.write.wait_for_active_shards。默认是 1,即 primary shard 写入成功即可返回。 如果设置为 all 则相当于 number_of_replicas+1 就是 primary shard 数量 + replica shard 数量。就是需要等待 primary shard 和 replica shard 都写入成功才算成功。可以通过索引设置动态覆盖此默认设置。

ElasticSearch的倒排索引

参考答案

Elasticsearch 使用一种称为倒排索引的结构,它适用于快速的全文搜索。一个倒排索引由文档中所有不重复词的列表构成,对于其中每个词,有一个包含它的文档列表。

例如,假设我们有两个文档,每个文档的 content 域包含如下内容:

  1. The quick brown fox jumped over the lazy dog
  2. Quick brown foxes leap over lazy dogs in summer

为了创建倒排索引,我们首先将每个文档的 content 域拆分成单独的 词(我们称它为 词条 或 tokens ),创建一个包含所有不重复词条的排序列表,然后列出每个词条出现在哪个文档。结果如下所示:

img

现在,如果我们想搜索 quick brown ,我们只需要查找包含每个词条的文档:

img

两个文档都匹配,但是第一个文档比第二个匹配度更高。如果我们使用仅计算匹配词条数量的简单相似性算法 ,那么,我们可以说,对于我们查询的相关性来讲,第一个文档比第二个文档更佳。

但是,我们目前的倒排索引有一些问题:

  • Quick 和 quick 以独立的词条出现,然而用户可能认为它们是相同的词。
  • fox 和 foxes 非常相似, 就像 dog 和 dogs ;他们有相同的词根。
  • jumped 和 leap, 尽管没有相同的词根,但他们的意思很相近。他们是同义词。

使用前面的索引搜索 +Quick +fox 不会得到任何匹配文档。(记住,+ 前缀表明这个词必须存在。)只有同时出现 Quick 和 fox 的文档才满足这个查询条件,但是第一个文档包含 quick fox ,第二个文档包含 Quick foxes 。

我们的用户可以合理的期望两个文档与查询匹配。我们可以做的更好。如果我们将词条规范为标准模式,那么我们可以找到与用户搜索的词条不完全一致,但具有足够相关性的文档。例如:

  • Quick 可以小写化为 quick 。
  • foxes 可以 词干提取 –变为词根的格式– 为 fox 。类似的, dogs 可以为提取为 dog 。
  • jumped 和 leap 是同义词,可以索引为相同的单词 jump 。

现在索引看上去像这样:

img

这还远远不够。我们搜索 +Quick +fox 仍然 会失败,因为在我们的索引中,已经没有 Quick 了。但是,如果我们对搜索的字符串使用与 content 域相同的标准化规则,会变成查询 +quick +fox ,这样两个文档都会匹配!