當前位置: 華文世界 > 科技

一篇文搞定訊息佇列選型

2024-09-07科技

01

概述

訊息佇列是分布式系統中重要的中介軟體,在高效能、高可用、低耦合等系統架構中扮演著重要作用。分布式系統可以借助訊息佇列的能力,輕松實作以下功能:

  • 解耦:將一個流程的上下遊拆解開,上遊專註於生產訊息,下遊專註於處理訊息;
  • 廣播:上遊生產的訊息可以輕松被多個下遊服務處理;
  • 緩沖:應對突發流量,訊息佇列扮演緩衝區的作用,保護下遊服務,使其可以根據自身的實際消費能力處理訊息;
  • 異步:上遊發送訊息後可以馬上返回,下遊可以異步處理訊息;
  • 冗余:保留歷史訊息,處理失敗或當出現異常時可以進行重試或者回溯,防止遺失;
  • 02

    架構簡介

    2.1 Kafka

    2.1.1 系統框架

    一個 Kafka 集群由多個 Broker 和一個 ZooKeeper 集群組成,Broker 作為 Kafka 節點的伺服器。同一個訊息主題 Topic 可以由多個分區 Partition 組成,分區物理儲存在 Broker 上。負載均衡考慮,同一個 Topic 的多個分區儲存在多個不同的 Broker 上,為了提高可靠性,每個分區在不同的 Broker 會存在副本。

    ZookKeeper 是一個分布式開源的應用程式協調服務,可以實作統一命名服務、狀態同步服務、集群管理、分布式套用配置項的管理等工作。Kafka 裏的 ZooKeeper 主要有一下幾個作用:

  • Broker 註冊,當有 Broker 故障的時候能及時感知。
  • Topic 註冊,維護 Topic 各分區的個副本所在的 Broker 節點,以及對應 leader/follower 的角色。
  • Consumer 註冊,維護消費者組的 offset 以及消費者與分區的對應關系,實作負載均衡。
  • 2.1.2 基本術語

    Producer: 訊息生產者。一般情況下,一條訊息會被發送到特定的主題上。通常情況下,寫入的訊息會透過輪詢將訊息寫入各分區。生產者也可以透過設定訊息 key 值將訊息寫入指定分區。寫入分區的數據越均勻 Kafka 的效能才能更好發揮。

    Topic: Topic 是個抽象的虛擬概念,一個集群可以有多個 Topic,作為一類訊息的標識。一個生產者將訊息發送到 topic,消費者透過訂閱 Topic 獲取分區訊息。

    Partition: Partition 是個物理概念,一個 Topic 對應一個或多個 Partition。新訊息會以追加的方式寫入分區裏,在同一個 Partition 裏訊息是有序的。Kafka 透過分區,實作訊息的冗余和伸縮性,以及支持物理上的並行讀、寫,大大提高了吞吐量。

    Replicas: 一個 Partition 有多個 Replicas 副本。這些副本保存在 broker,每個 broker 儲存著成百上千個不同主題和分區的副本,儲存的內容分為兩種:master 副本,每個 Partition 都有一個 master 副本,所有內容的寫入和消費都會經過 master 副本;follower 副本不處理任何客戶端的請求,只同步 master 的內容進行復制。如果 master 發生了異常,很快會有一個 follower 成為新的 master。

    Consumer: 訊息讀取者。消費者訂閱主題,並按照一定順序讀取訊息。Kafka 保證每個分區只能被一個消費者使用。

    Offset: 偏移量是一種後設資料,是不斷遞增的整數。在訊息寫入時 Kafka 會把它添加到訊息裏。在分區內偏移量是唯一的。消費過程中,會將最後讀取的偏移量儲存在 Kafka 中,消費者關閉偏移量不會遺失,重新開機會繼續從上次位置開始消費。

    Broker: 獨立的 Kafka 伺服器。一個 Topic 有 N 個 Partition,一個集群有 N 個 Broker,那麽每個 Broker 都會儲存一個這個 Topic 的 Partition。如果某 topic 有 N 個 partition,集群有(N+M)個 broker,那麽其中有 N 個 broker 儲存該 topic 的一個 partition,剩下的 M 個 broker 不儲存該 topic 的 partition 數據。如果某 topic 有 N 個 partition,集群中 broker 數目少於 N 個,那麽一個 broker 儲存該 topic 的一個或多個 partition。在實際生產環境中,盡量避免這種情況的發生,這種情況容易導致 Kafka 集群數據不均衡。

    2.2 Pulsar

    2.2.1 系統框架

    Pulsar 有三個重要的元件,Broker、BookKeeper 和ZooKeeper,Broker 是無狀態服務,客戶端需要連線到 Broker 上進行訊息的傳遞。BookKeeper 與 ZooKeeper 是有狀態服務。BookKeeper 的節點叫 Bookie,負責儲存訊息和遊標,ZooKeeper 儲存 Broker 和 Bookie 的後設資料。Pulsar 以這種架構,實作儲存和計算分離,Broker 負責計算,Bookie 負責有狀態儲存。

    Pulsar 的多層架構影響了儲存數據的方式。Pulsar 將 Topic 分區劃分為分片(Segment),然後將這些分片儲存在 Apache BookKeeper 的儲存節點上,以提高效能、可伸縮性和可用性。Pulsar 的分布式日誌以分片為中心,借助擴充套件日誌儲存(透過 Apache BookKeeper)實作,內建分層儲存支持,因此分片可以均勻地分布在儲存節點上。由於與任一給定 Topic 相關的數據都不會與特定儲存節點進行捆綁,因此很容易替換儲存節點或縮擴容。另外,集群中最小或最慢的節點也不會成為儲存或頻寬的短板。

    2.2.2 基本術語

    Property: 代表租戶,每個 property 都可以代表一個團隊、一個功能、一個產品線。一個 property 可包含多個 namesapce,多租戶是一種資源隔離手段,可以提高資源利用率;

    Namespace: Pulsar 的基本管理單元,在 namaspace 級別可設定許可權、訊息 TTL、Retention 策略等。一個 namaspace 裏的所有 topic 都繼承相同的設定。名稱空間分為兩種:本地名稱空間,只在集群內可見、全域名稱空間對多個集群可見集群名稱空間;

    Producer: 數據生產方,負責建立訊息並將訊息投遞到 Pulsar 中;

    Consumer: 數據消費方,連線到 Pulsar 接收訊息並進行相應的處理;

    Broker: 無狀態 Proxy 服務,負責接收訊息、傳遞訊息、集群負載均衡等操作,它對 client 遮蔽了伺服端讀寫流程的復雜性,是保證數據一致性與數據負載均衡的重要角色。Broker 不會持久化保存後設資料。可以擴容但不能縮容;

    BookKeeper: 有狀態,負責持久化儲存訊息。當集群擴容時,Pulsar 會在新增 BookKeeper 和 Segment(即 Bookeeper 的 Ledger),不需要像 kafka 一樣在擴容時進行 Rebalance。擴容結果是 Fragments 跨多個 Bookies 以帶狀分布,同一個 Ledger 的 Fragments 分布在多個 Bookie 上,導致讀取和寫入會在多個 Bookies 之間跳躍;

    ZooKeeper: 儲存 Pulsar 、 BookKeeper 的後設資料,集群配置等資訊,負責集群間的協調、服務發現等;

    Topic: 用作從 producer 到 consumer 傳輸訊息。Pulsar 在 Topic 級別擁有一個 leader Broker,稱之為擁有 Topic 的所有權,針對該 Topic 所有的 R/W 都經過該 Broker 完成。Topic 的 Ledger 和 Fragment 之間對映關系等後設資料儲存在 Zookeeper 中,Pulsar Broker 需要即時跟蹤這些關系進行讀寫流程;

    Ledger: 即 Segment,Pulsar 底層數據以 Ledger 的形式儲存在 BookKeeper 上。是 Pulsar 刪除的最小單位;

    Fragment : 每個 Ledger 由若幹 Fragment 組成。

    2.3 RocketMQ

    2.3.1 系統框架

    RocketMQ 是開源的訊息中介軟體,它是一個開源的分布式訊息傳遞和流式數據平台。總共有四大部份: NameServer,Broker,Producer,Consumer。

    NameServer 主要用來管理 brokers 以及路由資訊。broker 伺服器啟動時會註冊到 NameServer 上,並且兩者之間保持心跳監測機制,以此來保證 NameServer 知道 broker 的存活狀態。而且,每一台 NameServer 都存有全部的 broker 集群資訊和生產者/消費者客戶端的請求資訊。

    Broker 負責管理訊息儲存分發,主從數據同步,為訊息建立索引,提供訊息查詢等能力。

    2.3.2 基本術語

    Topic: 一個 Topic 可以有 0 個、1 個、多個生產者向其發送訊息,一個生產者也可以同時向不同的 Topic 發送訊息。一個 Topic 也可以被 0 個、1 個、多個消費者訂閱;

    Tag: 訊息二級類別,可以為使用者提供額外的靈活度,一條訊息可以沒有 tag;

    Producer: 訊息生產者;

    Broker: 儲存訊息,以 Topic 為緯度輕量級的佇列;轉發訊息,單個 Broker 節點與所有的 NameServer 節點保持長連線及心跳,會定時將 Topic 資訊註冊到 NameServer;

    Consumer: 訊息消費者,負責接收並消費訊息;

    MessageQueue: 訊息的物理管理單位,一個 Topic 可以有多個 Queue,Queue 的引入實作了水平擴充套件的能力;

    NameServer: 負責對原數據的管理,包括 Topic 和路由資訊,每個 NameServer 之間是沒有通訊的;

    Group: 一個組可以訂閱多個 Topic,ProducerGroup、ConsumerGroup 分別是一類生產者和一類消費者;

    Offset: 透過 Offset 存取儲存單元,RocketMQ 中所有訊息都是持久化的,且儲存單元定長。Offset 為 Java Long 類別,理論上 100 年內不會溢位,所以認為 Message Queue 是無限長的數據,Offset 是下標;

    Consumer: 支持 PUSH 和 PULL 兩種消費模式,支持集群消費和廣播消費。

    2.4 RabbitMQ

    2.4.1 系統框架

    RabbitMQ 基於 AMQP 協定來實作,主要由 Exchange 和 Queue 兩部份組成,然後透過 RoutingKey 關聯起來,訊息投遞到 Exchange 然後透過 Queue 接收。

    2.4.2 基本術語

    Broker: 接收客戶端連結實體,實作 AMQP 訊息佇列和路由功能;

    Virtual Host: 是一個虛擬概念,許可權控制的最小單位。一個 Virtual Host 裏包含多個 Exchange 和 Queue;

    Exchange: 接收訊息生產者的訊息並將訊息轉發到佇列。發送訊息時根據不同 ExchangeType 的決定路由規則,ExchangeType 常用的有:direct、fanout 和 topic 三種;

    Message Queue: 訊息佇列,儲存為被消費的訊息;

    Message: 由 Header 和 Body 組成,Header 是生產者添加的各種內容,包含 Message 是否持久化、哪個 MessageQueue 接收、優先級。Body 是具體的訊息內容;

    Binding: Binding 連線起了 Exchange 和 Message Queue。在伺服器執行時,會生成一張路由表,這張路由表上記錄著 MessageQueue 的條件和 BindingKey 值。當 Exchange 收到訊息後,會解析訊息中的 Header 得到 BindingKey,並根據路由表和 ExchangeType 將訊息發送到對應的 MessageQueue。最終的匹配模式是由 ExchangeType 決定;

    Connection: 在 Broker 和客戶端之間的 TCP 連線;

    Channel: 通道。Broker 和客戶端只有 tcp 連線是不能發送訊息的,必須建立通道。AMQP 協定規定只有透過 Channel 才能執行 AMQP 命令。一個 Connection 可以包含多個 Channel。之所以需要建立 Channel,是因為每個 TCP 連線都是很寶貴的。如果每個客戶端、每個執行緒都需要和 Broker 互動,都需要維護一個 TCP 連線的話是機器耗費資源的,一般建議共享 Connection。RabbitMQ 不建議客戶端執行緒之前共享 Channel,至少保證同一 Channel 發小訊息是穿行的;

    Command: AMQP 命令,客戶端透過 Command 來完成和 AMQP 伺服器的互動。

    2.5 NSQ

    2.5.1 系統框架

    NSQ 主要有 nsqlookup、nsqd 兩部份組成:

  • Nsqlookup 為守護行程,負責管理拓撲資訊並提供發現服務。客戶端透過查詢 nsqlookupd 獲取指定 Topic 所在的 nsqd 節點。nsqd 往 nsqlookup 上註冊和廣播自身 topic 和 channel 的資訊。
  • nsqd 在伺服端執行的守護行程,負責接收,排隊,投遞訊息給客戶端。
  • NSQ 由 3 個守護行程組成:

  • nsqd 是接收、佇列和傳送訊息到客戶端的守護行程。
  • nsqlookupd 是管理的拓撲資訊,並提供了最終一致發現服務的守護行程。客戶端透過查詢 nsqlookupd 獲取指定 Topic 所在的 nsqd 節點。nsqd 往 nsqlookup 上註冊和廣播自身 topic 和 channel 的資訊。
  • nsqadmin 是一個 Web UI 來即時監控集群(和執行各種管理任務)。
  • 03

    選型要點

    3.1 選型參考

  • 訊息順序:發送到佇列的訊息,消費時是否可以保證消費的順序;
  • 伸縮:當訊息佇列效能有問題,比如消費太慢,是否可以快速支持擴容;當消費佇列過多,浪費系統資源,是否可以支持縮容。
  • 訊息留存:訊息消費成功後,是否還會繼續保留在訊息佇列;
  • 容錯性:當一條訊息消費失敗後,是否有一些機制,保證這條訊息一定能成功,比如異步第三方退款訊息,需要保證這條訊息消費掉,才能確定給使用者退款成功,所以必須保證這條訊息消費成功的準確性;
  • 訊息可靠性:是否會存在丟訊息的情況,比如有 A/B 兩個訊息,最後只有 B 訊息能消費,A 訊息遺失;
  • 訊息時序:主要包括「訊息存活時間」和「延遲訊息」;
  • 吞吐量:支持的最高並行數;
  • 訊息路由:根據路由規則,只訂閱匹配路由規則的訊息,比如有 A/B 兩者規則的訊息,消費者可以只訂閱 A 訊息,B 訊息不會消費。
  • 3.2 訊息佇列對比

    註:作為 LShift 和 CohesiveFT 於 2007 年成立的合資企業,RabbitMQ 於 2010 年 4 月被 VMware 旗下的 SpringSource 收購。

    04

    功能剖析

    4.1 消費推拉模式

    客戶端消費者獲取訊息的方式,Kafka 和 RocketMQ 是透過長輪詢 Pull 的方式拉取訊息,RabbitMQ、Pulsar、NSQ 都是透過 Push 的方式。

    pull 類別的訊息佇列更適合高吞吐量的場景,允許消費者自己進行流量控制,根據消費者實際的消費能力去獲取訊息。而 push 類別的訊息佇列,即時性更好,但需要有一套良好的流控策略(backpressure)當消費者消費能力不足時,減少 push 的消費數量,避免壓垮消費端。

    4.2 延遲佇列

    訊息延遲投遞,當訊息產生送達訊息佇列時,有些業務場景並不希望消費者立刻收到訊息,而是等待特定時間後,消費者才能拿到這個訊息進行消費。延遲佇列一般分為兩種,基於訊息的延遲和基於佇列的延遲。基於訊息的延遲指為每條訊息設定不同的延遲時間,當佇列有新訊息進入的時候根據延遲時間排序,當然這樣會對效能造成較大影響。另一種基於佇列的延遲指的是設定不同延遲級別的佇列,佇列中每個訊息的延遲時間都是相同的,這樣免去了基於延遲時間排序對效能帶來的損耗,透過一定的掃描策略即可投遞超時的訊息。

    延遲訊息的使用場景比如異常檢測重試,訂單超時取消等,例如:

  • 服務請求異常,需要將異常請求放到單獨的佇列,隔 5 分鐘後進行重試;
  • 使用者購買商品,但一直處於未支付狀態,需要定期提醒使用者支付,超時則關閉訂單;
  • 面試或者會議預約,在面試或者會議開始前半小時,發送通知再次提醒。
  • Kafka 不支持延遲訊息。Pulsar 支持秒級的延遲訊息,所有延遲投遞的訊息會被 Delayed Message Tracker 記錄對應的 index,consumer 在消費時,會先去 Delayed Message Tracker 檢查,是否有到期需要投遞的訊息,如果有到期的訊息,則從 Tracker 中拿出對應的 index,找到對應的訊息進行消費,如果沒有到期的訊息,則直接消費正常的訊息。對於長時間的延遲訊息,會被儲存在磁盤中,當快到延遲間隔時才被載入到記憶體裏。

    RocketMQ 開源版本延遲訊息臨時儲存在一個內部主題中,不支持任意時間精度,支持特定的 level,例如定時 5s,10s,1m 等。

    RabbitMQ 需要安裝一個 rabbitmq_delayed_message_exchange 外掛程式。

    NSQ 透過記憶體中的優先級佇列來保存延遲訊息,支持秒級精度,最多 2 個小時延遲。

    4.3 死信佇列

    由於某些原因訊息無法被正確的投遞,為了確保訊息不會被無故的丟棄,一般將其置於一個特殊角色的佇列,這個佇列一般稱之為死信佇列。與此對應的還有一個「回退佇列」的概念,試想如果消費者在消費時發生了異常,那麽就不會對這一次消費進行確認(Ack), 進而發生回滾訊息的操作之後訊息始終會放在佇列的頂部,然後不斷被處理和回滾,導致佇列陷入死迴圈。為了解決這個問題,可以為每個佇列設定一個回退佇列,它和死信佇列都是為異常的處理提供的一種機制保障。實際情況下,回退佇列的角色可以由死信佇列和重試佇列來扮演。

  • Kafka 沒有死信佇列,透過 Offset 的方式記錄當前消費的偏移量。
  • Pulsar 有重試機制,當某些訊息第一次被消費者消費後,沒有得到正常的回應,則會進入重試 Topic 中,當重試達到一定次數後,停止重試,投遞到死信 Topic 中。
  • RocketMQ 透過 DLQ 來記錄所有消費失敗的訊息。
  • RabbitMQ 是利用類似於延遲佇列的形式實作死信佇列。
  • NSQ 沒有死信佇列。
  • 4.4 優先級佇列

    有一些業務場景下,我們需要優先處理一些訊息,比如銀行裏面的金卡客戶、銀卡客戶優先級高於普通客戶,他們的業務需要優先處理。如下圖:

    優先級佇列不同於先進先出佇列,優先級高的訊息具備優先被消費的特權,這樣可以為下遊提供不同訊息級別的保證。不過這個優先級也是需要有一個前提的:如果消費者的消費速度大於生產者的速度,並且訊息中介軟體伺服器(一般簡單的稱之為 Broker)中沒有訊息堆積,那麽對於發送的訊息設定優先級也就沒有什麽實質性的意義了,因為生產者剛發送完一條訊息就被消費者消費了,那麽就相當於 Broker 中至多只有一條訊息,對於單條訊息來說優先級是沒有什麽意義的。

    Kafka、RocketMQ、Pulsar、NSQ 不支持優先級佇列,可以透過不同的佇列來實作訊息優先級。

    RabbitMQ 支持優先級訊息。

    4.5 訊息回溯

    一般訊息在消費完成之後就被處理了,之後再也不能消費到該條訊息。訊息回溯正好相反,是指訊息在消費完成之後,還能消費到之前被消費掉的訊息。對於訊息而言,經常面臨的問題是「訊息遺失」,至於是真正由於訊息中介軟體的缺陷遺失還是由於使用方的誤用而遺失一般很難追查,如果訊息中介軟體本身具備訊息回溯功能的話,可以透過回溯消費復現「遺失的」訊息進而查出問題的源頭之所在。訊息回溯的作用遠不止與此,比如還有索引恢復、本地緩存重建,有些業務補償方案也可以采用回溯的方式來實作。

  • Kafka 支持訊息回溯,可以根據時間戳或指定 Offset,重設 Consumer 的 Offset 使其可以重復消費。
  • Pulsar 支持按時間對訊息進行回溯。
  • RocketMQ 支持按時間回溯,實作的原理跟 Kafka 一致。
  • RabbitMQ 不支持回溯,訊息一旦標記確認就會被標記刪除。
  • NSQ 一般訊息是不可回溯的,但可以透過 nsq_to_file 工具,將訊息寫入到檔,然後從檔裏重放訊息。
  • 4.6 訊息持久化

    流量削峰是訊息中介軟體的一個非常重要的功能,而這個功能其實得益於其訊息堆積能力。從某種意義上來講,如果一個訊息中介軟體不具備訊息堆積的能力,那麽就不能把它看做是一個合格的訊息中介軟體。訊息堆積分記憶體式堆積和磁盤式堆積。一般來說,磁盤的容量會比記憶體的容量要大得多,對於磁盤式的堆積其堆積能力就是整個磁盤的大小。從另外一個角度講,訊息堆積也為訊息中介軟體提供了冗余儲存的功能。

    Kafka 和 RocketMQ 直接將訊息刷入磁盤檔中進行持久化,所有的訊息都儲存在磁盤中。只要磁盤容量夠,可以做到無限訊息堆積。

    RabbitMQ 是典型的記憶體式堆積,但這並非絕對,在某些條件觸發後會有換頁動作來將記憶體中的訊息換頁到磁盤(換頁動作會影響吞吐),或者直接使用惰性佇列來將訊息直接持久化至磁盤中。

    Pulsar 訊息是儲存在 BookKeeper 儲存集群上,也是磁盤檔。

    NSQ 透過 nsq_to_file 工具,將訊息寫入到檔。

    4.7 訊息確認機制

    訊息佇列需要管理消費進度,確認消費者是否成功處理訊息,使用 push 的方式的訊息佇列元件往往是對單條訊息進行確認,對於未確認的訊息,進行延遲重新投遞或者進入死信佇列。

    Kafka 透過 Offset 的方式確認訊息。

    1. 發送方確認機制 ack=0,不管訊息是否成功寫入分區 ack=1,訊息成功寫入首領分區後,返回成功 ack=all,訊息成功寫入所有分區後,返回成功。
    2. 接收方確認機制 自動或者手動送出分區偏移量,早期版本的 kafka 偏移量是送出給 Zookeeper 的,這樣使得 zookeeper 的壓力比較大,更新版本的 kafka 的偏移量是送出給 kafka 伺服器的,不再依賴於 zookeeper 群組,集群的效能更加穩定。

    RocketMQ 與 Kafka 類似也會送出 Offset,區別在於消費者對於消費失敗的訊息,可以標記為訊息消費失敗,Broker 會重試投遞,如果累計多次消費失敗,會投遞到死信佇列。

    RabbitMQ 和 NSQ 類似,消費者確認單條訊息,否則會重新放回佇列中等待下次投遞。

    1. 發送方確認機制,訊息被投遞到所有匹配的佇列後,返回成功。如果訊息和佇列是可持久化的,那麽在寫入磁盤後,返回成功。支持批次確認和異步確認。
    2. 接收方確認機制,設定 autoAck 為 false,需要顯式確認,設定 autoAck 為 true,自動確認。當 autoAck 為 false 的時候,RabbitMQ 佇列會分成兩部份,一部份是等待投遞給 consumer 的訊息,一部份是已經投遞但是沒收到確認的訊息。如果一直沒有收到確認訊號,並且 consumer 已經斷開連線,RabbitMQ 會安排這個訊息重新進入佇列,投遞給原來的消費者或者下一個消費者。未確認的訊息不會有過期時間,如果一直沒有確認,並且沒有斷開連線,RabbitMQ 會一直等待,RabbitMQ 允許一條訊息處理的時間可以很久很久。

    Pulsar 使用專門的 Cursor 管理。累積確認和 Kafka 效果一樣;提供單條或選擇性確認。

    4.8 訊息 TTL

    訊息 TTL 表示一條訊息的生存時間,如果訊息發出來後,在 TTL 的時間內沒有消費者進行消費,訊息佇列會將訊息刪除或者放入死信佇列中。

    Kafka 根據設定的保留期來刪除訊息。有可能訊息沒被消費,過期後被刪除。不支持 TTL。

    Pulsar 支持 TTL,如果訊息未在配置的 TTL 時間段內被任何消費者使用,則訊息將自動標記為已確認。訊息保留期與訊息 TTL 之間的區別在於:訊息保留期作用於標記為已確認並設定為已刪除的訊息,而 TTL 作用於未 ack 的訊息。上面的圖例中說明了 Pulsar 中的 TTL。例如,如果訂閱 B 沒有活動消費者,則在配置的 TTL 時間段過後,訊息 M10 將自動標記為已確認,即使沒有消費者實際讀取該訊息。

    RocketMQ 提及到訊息 TTL 的資料比較少,不過看介面似乎是支持的。

    RabbitMQ 有兩種方式,一個是聲明佇列的時候在佇列內容中設定,整個佇列中的訊息都有相同的有效期。還可以發送訊息的時候給訊息設定內容,可以位每條訊息都設定不同的 TTL。

    NSQ 似乎還沒支持,有一個 Feature Request 的 Issue 處於 Open 狀態。

    4.9 多租戶隔離

    多租戶是指透過一個軟件例項為多個租戶提供服務的能力。租戶是指對系統有著相同「檢視」的一組使用者。不支持多租戶的系統裏邊,往往要為不同使用者或者不同集群建立多個訊息佇列例項實作物理隔離,這樣會帶來較高的運維成本。作為一種企業級的訊息系統,Pulsar 的多租戶能力按照設計可滿足下列需求:

  • 確保嚴苛的 SLA 可順利滿足。
  • 保證不同租戶之間的隔離。
  • 針對資源利用率強制實施配額。
  • 提供每租戶和系統級的安全性。
  • 確保低成本運維以及盡可能簡單的管理。
  • Pulsar 透過下列方式滿足了上述需求:

  • 透過為每個租戶進行身份驗證、授權和 ACL(存取控制列表)獲得所需安全性。
  • 為每個租戶強制實施儲存配額。
  • 以策略的方式定義所有隔離機制,策略可在執行過程中更改,借此降低運維成本並簡化管理工作。

    4.10 訊息順序性

    訊息順序性是指保證訊息有序。訊息消費順序跟生產的順序保持一致。

  • Kafka 保證了分區內的訊息有序。
  • Pulsar 支持兩種消費模式,獨占訂閱的流模式只保證了訊息的順序性,共享訂閱佇列模型不保證有序性。
  • RocketMQ 需要用到鎖來保證一個佇列同時只有一個消費者執行緒進行消費,保證訊息的有序性。
  • RabbitMQ 順序性的條件比較苛刻,需要單執行緒發送、單執行緒消費,並且不采用延遲佇列、優先級佇列等高級功能。
  • NSQ 是利用了 golang 自身的 case/select 實作的訊息分發,本身不提供有序性保障,不能夠把特性訊息和消費者對應起來,無法實作訊息的有序性。
  • 4.11 訊息查詢

    在實際開發中,經常要檢視 MQ 中訊息的內容,比如透過某個 MessageKey/ID,查詢到 MQ 的具體訊息。或者是對訊息進行鏈路追蹤,知道訊息從哪裏來,發送到哪裏去,進而快速對問題進行排查定位。

    Kafka 儲存層是以分布式送出日誌的形式實作,每次寫操作都順序追加到日誌的末尾。讀也是順序讀。不支持檢索功能。

    Pulsar 可以透過訊息 ID,查詢到具體某條訊息的訊息內容、訊息參數和訊息軌跡。

    RocketMQ 支持按 Message Key、Unique Key、Message Id 對訊息進行查詢。

    RabbitMQ 使用基於索引的儲存系統。這些將數據保存在樹結構中,以提供確認單個訊息所需的快速存取。由於 RabbitMQ 的訊息在確認後會被刪除,因此只能查詢未確認的訊息。

    NSQ 自身不支持訊息持久化和訊息檢索,不過可以使用 nsq_to_http 等工具將訊息寫入可支持索引的儲存裏。

    4.12 消費模式

    Kafka 有兩種消費模式,最終都會保證一個分區只有 1 個消費者在消費:

  • subscribe 方式:當主題分區數量變化或者 consumer 數量變化時,會進行 rebalance;註冊 rebalance 監聽器,可以手動管理 offset 不註冊監聽器,Kafka 自動管理。
  • assign 方式:手動將 consumer 與 partition 進行對應,Kafka 不會進行 rebanlance。
  • Pulsar 有以下四種消費模式,其中獨占模式和災備模式跟 Kafka 類似,為流模型,每個分區只有 1 個消費者消費,能保證訊息有序性。共享模式和 Key 共享模式為佇列模型,多個消費者能提高消費速度,但不能保證有序性。

  • Exclusive 獨占模式(預設模式):一個 Subscription 只能與一個 Consumer 關聯,只有這個 Consumer 可以接收到 Topic 的全部訊息,如果該 Consumer 出現故障了就會停止消費。
  • 災備模式(Failover):當存在多個 consumer 時,將會按字典順序排序,第一個 consumer 被初始化為唯一接受訊息的消費者。當第一個 consumer 斷開時,所有的訊息(未被確認和後續進入的)將會被分發給佇列中的下一個 consumer。
  • 共享模式(Shared):訊息透過 round robin 輪詢機制(也可以自訂)分發給不同的消費者,並且每個訊息僅會被分發給一個消費者。當消費者斷開連線,所有被發送給他,但沒有被確認的訊息將被重新安排,分發給其它存活的消費者。
  • KEY 共享模式(Key_Shared):當存在多個 consumer 時,將根據訊息的 key 進行分發,key 相同的訊息只會被分發到同一個消費者。
  • RocketMQ 有兩種消費模式,BROADCASTING 廣播模式,CLUSTERING 集群模式。

    廣播消費指的是:一條訊息被多個 consumer 消費,即使這些 consumer 屬於同一個 ConsumerGroup,訊息也會被 ConsumerGroup 中的每個 Consumer 都消費一次,廣播消費中 ConsumerGroup 概念可以認為在訊息劃分方面無意義。

    集群消費模式:一個 ConsumerGroup 中的 Consumer 例項平均分攤消費訊息。例如某個 Topic 有 9 條訊息,其中一個 ConsumerGroup 有 3 個例項(可能是 3 個行程,或者 3 台機器),那麽每個例項只消費其中部份,消費完的訊息不能被其他例項消費。

    RabbitMQ 和 NSQ 的消費比較類似,都是跟 Pulsar 共享模式類似的,佇列的形式,增加一個消費者組裏的消費者數量能提高消費速度。

    4.13 訊息可靠性

    訊息遺失是使用訊息中介軟體時所不得不面對的一個同點,其背後訊息可靠性也是衡量訊息中介軟體好壞的一個關鍵因素。尤其是在金融支付領域,訊息可靠性尤為重要。比如當服務出現故障時,一些對於生產者來說已經生產成功的訊息,是否會在高可用切換時遺失。同步刷盤是增強一個元件可靠性的有效方式,訊息中介軟體也不例外,Kafka 和 RabbitMQ 都可以支持同步刷盤,但絕大多數情景下,一個元件的可靠性不應該由同步刷盤這種極其損耗效能的操作來保障,而是采用多副本的機制來保證。

    Kafka 可以透過配置 request.required.acks 參數設定可靠級別,表示一條訊息有多少個副本確認接收成功後,才被任務發送成功。

  • request.required.acks=-1 (全量同步確認,強可靠性保證)
  • request.required.acks=1(leader 確認收到,預設)
  • request.required.acks=0 (不確認,但是吞吐量大)
  • Pulsar 有跟 Kafka 類似的概念,叫 Ack Quorum Size(Qa),Qa 是每次寫請求發送完畢後需要回復確認的 Bookie 的個數,其數值越大則需要確認寫成功的時間越長,其值上限是副本數 Qw。為了一致性,Qa 應該是:(Qw+1)/2 或者更,即為了確保數據安全性,Qa 下限是 (Qw+1)/2。

    RocketMQ 與 Kafka 類似。

    RabbitMQ 是主從架構,透過映像環形佇列實作多副本及強一致性語意的。多副本可以保證在 master 節點宕機異常之後可以提升 slave 作為新的 master 而繼續提供服務來保障可用性。

    NSQ 會透過 go-diskqueue 元件將訊息落盤到本地檔中,透過 mem-queue-size 參數控制記憶體中佇列大小,如果 mem-queue-size=0 每條訊息都會儲存到磁盤裏,不用擔心節點重新開機引起的訊息遺失。但由於是儲存在本地磁盤中,如果節點離線,堆積在節點磁盤裏的訊息會遺失。

    4.14 負載均衡

    Kafka: 支持負載均衡。一個 broker 通常就是一台伺服器節點。對於同一個 Topic 的不同分區,Kafka 會盡力將這些分區分布到不同的 Broker 伺服器上,zookeeper 保存了 broker、主題和分區的後設資料資訊。分區首領會處理來自客戶端的生產請求,Kafka 分區首領會被分配到不同的 broker 伺服器上,讓不同的 broker 伺服器共同分擔任務。

    每一個 broker 都緩存了後設資料資訊,客戶端可以從任意一個 broker 獲取後設資料資訊並緩存起來,根據後設資料資訊知道要往哪裏發送請求。

    Kafka 的消費者組訂閱同一個 topic,會盡可能地使得每一個消費者分配到相同數量的分區,分攤負載。

    當消費者加入或者結束消費者組的時候,還會觸發再均衡,為每一個消費者重新分配分區,分攤負載。

    Kafka 的負載均衡大部份是自動完成的,分區的建立也是 Kafka 完成的,隱藏了很多細節,避免了繁瑣的配置和人為疏忽造成的負載問題。

    發送端由 topic 和 key 來決定訊息發往哪個分區,如果 key 為 null,那麽會使用輪詢演算法將訊息均衡地發送到同一個 topic 的不同分區中。如果 key 不為 null,那麽會根據 key 的 hashcode 取模計算出要發往的分區。

    RabbitMQ : 對負載均衡的支持不好。訊息被投遞到哪個佇列是由交換器和 key 決定的,交換器、路由鍵、佇列都需要手動建立。

    RabbitMQ 客戶端發送訊息要和 broker 建立連線,需要事先知道 broker 上有哪些交換器,有哪些佇列。通常要聲明要發送的目標佇列,如果沒有目標佇列,會在 broker 上建立一個佇列,如果有,就什麽都不處理,接著往這個佇列發送訊息。假設大部份繁重任務的佇列都建立在同一個 broker 上,那麽這個 broker 的負載就會過大。(可以在上線前預先建立佇列,無需聲明要發送的佇列,但是發送時不會嘗試建立佇列,可能出現找不到佇列的問題,RabbitMQ 的備份交換器會把找不到佇列的訊息保存到一個專門的佇列中,以便以後查詢使用)

    使用映像佇列機制建立 RabbitMQ 集群可以解決這個問題,形成 master-slave 的架構,master 節點會均勻分布在不同的伺服器上,讓每一台伺服器分攤負載。slave 節點只是負責轉發,在 master 失效時會選擇加入時間最長的 slave 成為 master。

    當新節點加入映像佇列的時候,佇列中的訊息不會同步到新的 slave 中,除非呼叫同步命令,但是呼叫命令後,佇列會阻塞,不能在生產環境中呼叫同步命令。

    當 RabbitMQ 佇列擁有多個消費者的時候,佇列收到的訊息將以輪詢的分發方式發送給消費者。每條訊息只會發送給訂閱列表裏的一個消費者,不會重復。

    這種方式非常適合擴充套件,而且是專門為並行程式設計的。

    如果某些消費者的任務比較繁重,那麽可以設定 basicQos 限制通道上消費者能保持的最大未確認訊息的數量,在達到上限時,RabbitMQ 不再向這個消費者發送任何訊息。

    對於 RabbitMQ 而言,客戶端與集群建立的 TCP 連線不是與集群中所有的節點建立連線,而是挑選其中一個節點建立連線。

    但是 RabbitMQ 集群可以借助 HAProxy、LVS 技術,或者在客戶端使用演算法實作負載均衡,引入負載均衡之後,各個客戶端的連線可以分攤到集群的各個節點之中。

    客戶端均衡演算法:

  • 輪詢法。按順序返回下一個伺服器的連線地址。
  • 加權輪詢法。給配置高、負載低的機器配置更高的權重,讓其處理更多的請求;而配置低、負載高的機器,給其分配較低的權重,降低其系統負載。
  • 隨機法。隨機選取一個伺服器的連線地址。
  • 加權隨機法。按照概率隨機選取連線地址。
  • 源地址哈希法。透過哈希函數計算得到的一個數值,用該數值對伺服器列表的大小進行取模運算。
  • 最小連線數法。動態選擇當前連線數最少的一台伺服器的連線地址。
  • zeromq: 去中心化,不支持負載均衡。本身只是一個多執行緒網絡庫。

    RocketMQ: 支持負載均衡。一個 broker 通常是一個伺服器節點,broker 分為 master 和 slave,master 和 slave 儲存的數據一樣,slave 從 master 同步數據。

    nameserver 與每個集群成員保持心跳,保存著 Topic-Broker 路由資訊,同一個 topic 的佇列會分布在不同的伺服器上。

    發送訊息透過輪詢佇列的方式發送,每個佇列接收平均的訊息量。發送訊息指定 topic、tags、keys,無法指定投遞到哪個佇列(沒有意義,集群消費和廣播消費跟訊息存放在哪個佇列沒有關系)。

    tags 選填,類似於 Gmail 為每封郵件設定的標簽,方便伺服器過濾使用。目前只支 持每個訊息設定一個 tag,所以也可以類比為 Notify 的 MessageType 概念。

    keys 選填,代表這條訊息的業務關鍵詞,伺服器會根據 keys 建立哈希索引,設定後, 可以在 Console 系統根據 Topic、Keys 來查詢訊息,由於是哈希索引,請盡可能 保證 key 唯一,例如訂單號,商品 Id 等。

    RocketMQ 的負載均衡策略規定:Consumer 數量應該小於等於 Queue 數量,如果 Consumer 超過 Queue 數量,那麽多余的 Consumer 將不能消費訊息。這一點和 kafka 是一致的,RocketMQ 會盡可能地為每一個 Consumer 分配相同數量的佇列,分攤負載。

    activemq: 支持負載均衡。可以基於 zookeeper 實作負載均衡。

    4.15 集群方式

    Kafka: 天然的‘Leader-Slave’無狀態集群,每台伺服器既是 Master 也是 Slave。分區首領均勻地分布在不同的 Kafka 伺服器上,分區副本也均勻地分布在不同的 Kafka 伺服器上,所以每一台 Kafka 伺服器既含有分區首領,同時又含有分區副本,每一台 Kafka 伺服器是某一台 Kafka 伺服器的 Slave,同時也是某一台 Kafka 伺服器的 leader。

    Kafka 的集群依賴於 zookeeper,zookeeper 支持熱擴充套件,所有的 broker、消費者、分區都可以動態加入移除,而無需關閉服務,與不依靠 zookeeper 集群的 mq 相比,這是最大的優勢。

    RabbitMQ: 支持簡單集群,'復制'模式,對高級集群模式支持不好。

    RabbitMQ 的每一個節點,不管是單一節點系統或者是集群中的一部份,要麽是記憶體節點,要麽是磁盤節點,集群中至少要有一個是磁盤節點。

    在 RabbitMQ 集群中建立佇列,集群只會在單個節點建立佇列行程和完整的佇列資訊(後設資料、狀態、內容),而不是在所有節點上建立。

    引入映像佇列,可以避免單點故障,確保服務的可用性,但是需要人為地為某些重要的佇列配置映像。

    zeromq: 去中心化,不支持集群。

    RocketMQ: 常用 多對'Master-Slave' 模式,開源版本需手動切換 Slave 變成 Master

    Name Server 是一個幾乎無狀態節點,可集群部署,節點之間無任何資訊同步。

    Broker 部署相對復雜,Broker 分為 Master 與 Slave,一個 Master 可以對應多個 Slave,但是一個 Slave 只能對應一個 Master,Master 與 Slave 的對應關系透過指定相同的 BrokerName,不同的 BrokerId 來定義,BrokerId 為 0 表示 Master,非 0 表示 Slave。Master 也可以部署多個。每個 Broker 與 Name Server 集群中的所有節點建立長連線,定時註冊 Topic 資訊到所有 Name Server。

    Producer 與 Name Server 集群中的其中一個節點(隨機選擇)建立長連線,定期從 Name Server 取 Topic 路由資訊,並向提供 Topic 服務的 Master 建立長連線,且定時向 Master 發送心跳。Producer 完全無狀態,可集群部署。

    Consumer 與 Name Server 集群中的其中一個節點(隨機選擇)建立長連線,定期從 Name Server 取 Topic 路由資訊,並向提供 Topic 服務的 Master、Slave 建立長連線,且定時向 Master、Slave 發送心跳。Consumer 既可以從 Master 訂閱訊息,也可以從 Slave 訂閱訊息,訂閱規則由 Broker 配置決定。

    客戶端先找到 NameServer, 然後透過 NameServer 再找到 Broker。

    一個 topic 有多個佇列,這些佇列會均勻地分布在不同的 broker 伺服器上。RocketMQ 佇列的概念和 Kafka 的分區概念是基本一致的,Kafka 同一個 topic 的分區盡可能地分布在不同的 broker 上,分區副本也會分布在不同的 broker 上。

    RocketMQ 集群的 slave 會從 master 拉取數據備份,master 分布在不同的 broker 上。

    activemq:支持簡單集群模式,比如'主-備',對高級集群模式支持不好。

    05

    效能

    Kafka 的公司 Confluent 在 2020 年 8 月發了一篇 Benchmarking Apache Kafka, Apache Pulsar, and RabbitMQ: Which is the Fastest?文章,並且提出了一個開源的 MQ Benchmark 框架 THE OPENMESSAGING BENCHMARK FRAMEWORK,在這個文件裏,對比了 Kafka、Pulsar、RabbitMQ 的吞吐量、端到端延遲等效能數據。最後得出結論 Kafka 相對來說效能最好。

    但接下來 StreamNative 在 2020 年 12 月指出了 Confluence 的基準測試的一些問題,並對 Pulsar 進行了參數調優之後重新執行了一遍結果,測試報告展示 Pulsar 能達到跟 Kafka 同樣的吞吐量,在某些場景下,Pulsar 的延遲顯著低於 Kafka。

    而且在效能測試上,有很多客戶端、伺服端參數設定、機器效能配置等影響,比如訊息可靠性級別,壓縮演算法等,很難做到「完全」控制變量公平的測試。而且 OpenMessaging Benchmark 的開源 Github 的 Readme 上也提到了。

    不過有幾個關註點:

  • RabbitMQ 的延遲是微秒級的,其他元件的延遲都是毫秒級,RabbitMQ 應該是 MQ 元件裏相對來說較低的。
  • Kafka 單例項在主題/分區數比較多的情況下,效能會明顯降低。
  • Kafka 是一個分區一個檔,當 topic 過多,分區的總量也會增加,Kafka 中存在過多的檔,當對訊息刷盤時,就會出現檔競爭磁盤,出現效能的下降。
  • 還有 Kafka 每個消費者加入或結束都會進行重平衡,當分區數比較多時重平衡可能耗時較久,在重平衡的階段消費者是不能消費訊息的。
  • 而 Pulsar 由於儲存與計算分離的架構,使得它可以支持百萬級別的 Topic 數量。
  • Pulsar 和 Kafka 都被廣泛用於各個企業,也各有優勢,都能透過數量基本相同的硬件處理大流量。部份使用者誤以為 Pulsar 使用了很多元件,因此需要很多伺服器來實作與 Kafka 相匹敵的效能。這種想法適用於一些特定硬件配置,但在多數資源配置相同的情況中,Pulsar 的優勢更加明顯,可以用相同的資源實作更好的效能。舉例來說,Splunk 最近分享了他們選擇 Pulsar 放棄 Kafka 的原因,其中提到「由於分層架構,Pulsar 幫助他們將成本降低了 30%-50%,延遲降低了 80%-98%,營運成本降低了 33%-50%」。Splunk 團隊發現 Pulsar 可以更好地利用磁盤 IO,降低 CPU 利用率,同時更好地控制記憶體。

    在分布式系統裏,單機效能指標雖然也很重要,分布式系統整體的效能以及靈活擴縮容、高可用容災等能力也會是評估的一個重要參考。MQ 中介軟體具體的效能指標,也需要我們自己根據實際的情況,根據實際購買的集群配置和客戶端參數,進行壓測調優來評估。

    06

    運維

    在使用過程中難免會出現各種異常情況,比如宕機、網絡抖動、擴容等。訊息佇列具備異地容災,高可用架構等能力,能避免一些計算節點、網絡等基礎設施不可用導致的故障。

    6.1 高可用

    Kafka 透過分區多副本的方式解決高可用問題。

    Pulsar 的計算集群 Broker 是無狀態的,可以靈活擴縮容,儲存節點 Bookie 上透過訊息分區分片副本的方式,每個分片都有一個或多個副本,保證在某一個 Bookie 掛掉後,有其他分片可以提供服務。

    RocketMQ 和 RabbitMQ 都是主從架構,當 master 掛掉後,由原來的從節點繼續提供服務。備機提供消費服務,保證訊息不丟,但不提供寫服務。

    NSQ 是類似分布式架構,不過由於訊息儲存是在節點本地磁盤上,如果一個節點離線,堆積在節點磁盤上的訊息會遺失。

    6.2 跨地域容災

    Pulsar 原生支持跨地域容災功能,在這個圖中,每當 P1、P2 和 P3 的生產者分別向 Cluster-A、Cluster-B 和 Cluster-C 中的 T1 topic 發送訊息時,這些訊息很快在不同的集群中復制。一旦訊息完成復制,消費者 C1 和 C2 會從各自的集群消費到這個訊息。

    在這個跨地域容災的設計支撐下,其一,我們可以比較容易的將服務分散到多個機房;其二,可以應對機房級別的故障,即在一個機房不可用的情況下,服務可以轉接到其它的機房來繼續對外提供服務。

    一句話概括,Pulsar 的跨地域復制,其實就是在一個本地集群中建立一個 Producer,把異地的集群作為這個 Producer 的發送地址,將本地集群的訊息發送過去,並且在本地維護一個 Cusor 來保證訊息可靠性和冪等性。

    6.3 集群擴容

    當訊息量突然上漲,訊息佇列集群到達瓶頸的時候,需要對集群進行擴容,擴容一般分為水平擴容和垂直擴容兩種方式,水平擴容指的是往往集群中增加節點,垂直擴容指的是把集群中部份節點的配置調高,增加處理能力。

    Kafka 集群由於主題分區是物理儲存在 Broker 節點上的,新加入的集群的節點並沒有儲存分區分片,也就無法提供馬上提供服務,因此需要把一些 Topic 的分區分配到新加入的節點裏,這裏會涉及到一個分區數據均衡的過程,將某些分區的數據復制到新節點上。這個過程跟分區當前堆積的數據量、Broker 效能有關,有可能會出現由於源 Broker 負載過高,堆積數據過大,導致數據均衡的時間變長。

    Pulsar 的無限分布式日誌以分片為中心,借助擴充套件日誌儲存(透過 Apache BookKeeper)實作,內建分層儲存支持,因此分片可以均勻地分布在儲存節點上。由於與任一給定 topic 相關的數據都不會與特定儲存節點進行捆綁,因此很容易替換儲存節點或縮擴容。另外,集群中最小或最慢的節點也不會成為儲存或頻寬的短板。

    RocketMQ 新節點直接加入到集群中,在新的 broker 建立新 topic 並且分配佇列,或者在已有 topic 基礎上分配佇列。與 Kafka 的區別是,Kafka 的分區是在不同的物理機器上,而 RocketMQ 是邏輯分區,用的佇列形式,因此不存在出現數據不均衡的情況。

    RabbitMQ 和 NSQ 類似,由於不涉及過多的訊息持久化,直接往集群中增加節點。

    6.4 使用成本

    Kafka/Pulsar/RocketMQ/RabbitMQ 在騰訊雲上都上線了標準產品,可以直接購買建立例項(產品選型),能大大降低部署運維成本。而 NSQ 目前暫時還沒有上線,需要自行部署。

    07

    常見問題 & 使用場景

    7.1 Kafka

  • 日誌收集:大量的日誌訊息先寫入 Kafka,數據服務透過消費 Kafka 訊息將數據落地;
  • 7.2 RocketMQ

  • 為金融互聯網領域而生,對於可靠性要求很高的場景。
  • 7.3 普通訊息

    訊息佇列最基礎的功能就是生產者發送訊息、Broker 保存訊息,消費者來消費訊息,以此實作系統解耦、削峰填谷的作用。

    普通訊息是訊息佇列必備的訊息類別,也是系統使用場景最多的一種訊息。

    7.4 順序訊息

    順序訊息是指生產者發送訊息的順序和消費者消費訊息的順序是一致的。比如在一個電商場景,同一個使用者送出訂單、訂單支付、訂單出庫,這三個訊息消費者需要按照順序來進行消費。如下圖:

    順序訊息的實作並不容易,原因如下:

  • 生產者集群中,有多個生產者發送訊息,網絡延遲不一樣,很難保證發送到 Broker 的訊息落盤順序是一致的;
  • 如果 Broker 有多個分區或佇列,生產者發送的訊息會進入多個分區,也無法保證順序消費;
  • 如果有多個消費者來異步消費同一個分區,很難保證消費順序跟生產者發送順序一致。
  • 要保證訊息有序,需要滿足兩個條件:

  • 同一個生產者必須同步發送訊息到同一個分區;
  • 一個分區只能給同一個消費者消費。
  • 如下圖:

    上面第二個條件是比較容易實作的,一個分區繫結一個消費者就可以,主要是第一個條件。

    在主流訊息佇列的實作中,Kafka 和 Pulsar 的實作方式類似,生產者給訊息賦值一個 key,對 key 做 Hash 運算來指定訊息發送到哪一個分區。比如上面電商的例子,對同一個使用者的一筆訂單,送出訂單、訂單支付、訂單出庫這三個訊息賦值同一個 key,就可以把這三條訊息發送到同一個分區。

    對於 RocketMQ,生產者在發送訊息的時候,可以透過 MessageQueueSelector 指定把訊息投遞到那個 MessageQueue,如下圖:

    7.5 延時訊息

    或者也叫定時訊息,是指訊息發送後不會立即被消費,而是指定一個時間,到時間後再消費。經典的場景比如電商購物時,30 分鐘未支付訂單,讓訂單自動失效。

    7.5.1 RocketMQ 實作

    RocketMQ 定義了 18 個延時級別,每個延時級別對應一個延時時間。下面如果延遲級別是 3,則訊息會延遲 10s 才會拉取。

    RocketMQ 的延時訊息如下圖:

    生產者把消費發送到 Broker 後,Broker 首先把訊息保存到 SCHEDULE_TOPIC_XXXX 這個 Topic,然後排程任務會判斷是否到期,如果到期,會把訊息從 SCHEDULE_TOPIC_XXXX 取出投遞到原始的 queue,這樣消費者就可以消費到了。

    RocketMQ 的延時訊息只支持最大兩個小時的延時,不過 RocketMQ5.0 基於時間輪演算法實作了定時訊息,解決了這個問題。

    7.5.2 Pulsar 實作

    Pulsar 的實作如下圖:

    Pulsar 的延時訊息首先會寫入一個 Delayed Message Tracker 的數據結構中,Delayed Message Tracker 根據延時時間構建 delayed index 優先級佇列。消費者拉取訊息時,首先去 Delayed Message Tracker 檢查是否有到期的訊息。如果有則直接拉取進行消費。

    7.5.3 RabbitMQ 實作

    RabbitMQ 的實作方式有兩種,一種是投遞到普通佇列都不消費,等訊息過期後被投遞到死信佇列,消費者消費死信佇列。如下圖:

    第二種方式是生產者發送訊息時,先發送到本地 Mnesia 數據庫,訊息到期後定時器再將訊息投遞到 broker。

    7.5.4 Kafka 實作

    Kafka 本身並沒有延時佇列,不過可以透過生產者攔截器來實作訊息延時發送,也可以定義延時 Topic,利用類似 RocketMQ 的方案來實作延時訊息。

    7.6 事務訊息

    事務訊息是指生產訊息和消費訊息滿足事務的特性。

    RabbitMQ 和 Kafka 的事務訊息都是只支持生產訊息的事務特性,即一批訊息要不全部發送成功,要不全部發送失敗。

    RabbitMQ 透過 Channel 來開啟事務訊息,程式碼如下:

    ConnectionFactory factory=new ConnectionFactory();connection=factory.newConnection();Channel channel=connection.createChannel();//開啟事務channel.txSelect();channel.basicPublish("directTransactionExchange","transactionRoutingKey",null,message.getBytes("utf-8"));//送出事務 或者 channel.txRollback()回滾事務channel.txCommit();

    Kafka 可以給多個生產者設定同一個事務 ID ,從而把多個 Topic 、多個 Partition 放在一個事務中,實作原子性寫入。

    Pulsar 的事務訊息對於事務語意的定義是:允許事件流套用將消費、處理、生產訊息整個過程定義為一個原子操作。可見,Pulsar 的事務訊息可以覆蓋訊息流整個過程。

    RocketMQ 的事務訊息是透過 half 訊息來實作的。以電商購物場景來看,賬戶服務扣減賬戶金額後,發送訊息給 Broker,庫存服務來消費這條訊息進行扣減庫存。如下圖:

    可見,RocketMQ 只能保證生產者發送訊息和本地事務的原子性,並不能保證消費訊息的原子性。

    7.7 軌跡訊息

    軌跡訊息主要用於跟蹤訊息的生命周期,當訊息遺失時可以很方便地找出原因。

    軌跡訊息也跟普通訊息一樣,也需要儲存和查詢,也會占用訊息佇列的資源,所以選擇軌跡訊息要考慮下面幾點:

  • 訊息生命周期的關鍵節點一定要記錄;
  • 不能影響正常訊息的發送和消費效能;
  • 不能影響 Broker 的訊息儲存效能;
  • 要考慮訊息查詢維度和效能。
  • RabbitMQ Broker 實作了軌跡訊息的功能,開啟 Trace 開關,就可以把軌跡訊息發送到 amq.rabbitmq.trace 這個 exchange,但是要考慮軌跡訊息會不會給 Broker 造成 壓力進而導致訊息積壓。RabbitMQ 的生產者和消費者都沒有實作軌跡訊息,需要開發者自己來實作。

    RocketMQ 生產者、Broker 和消費者都實作了軌跡訊息,不過預設是關閉的,需要手工開啟。

    使用軌跡訊息,需要考慮記錄哪些節點、儲存介質、效能、查詢方式等問題。

    7.8 Kafka 是否會訊息遺失?

  • 只對「已送出」的訊息做有限度的持久化保證
  • 已送出的訊息:訊息寫入日誌檔
  • 有限度的持久化保證:N 個 broker 至少一個存活
  • 生產者遺失數據
  • producer.send(msg) 異步發送訊息,不保證數據到達 Kafka
  • producer.send(msg, callback) 判斷回呼
  • 消費者程式遺失數據
  • 應該「先消費訊息,後更新位移的順序」
  • 新問題:訊息的重復處理
  • 多執行緒異步處理訊息,Consumer 不要開啟自動送出位移,應用程式手動送出位移
  • 7.9 Kafka 如何持久化?

  • 訊息日誌(Log)保存數據,磁盤追加寫(Append-only)
  • 避免緩慢的隨機 I/O 操作
  • 高吞吐
  • 定期刪除訊息(日誌段)
  • 7.10 Kafka 檔儲存機制

  • 每個 partition 相當於一個巨型檔 → 多個大小相等 segment 數據檔中
  • 每個 partition 只需要順序讀寫就行了,segment 檔生命周期由配置決定
  • segment file 組成:
  • index file:索引檔
  • data file:數據檔
  • segment file 檔命名規則:
  • 全域第一個 segment 是 0
  • 後序每個加上全域 partition 的最大 offset
  • 一對 segment file

    message 物理結構

    7.11 Kafka 分區

    為什麽分區?

  • Kafka 的訊息組織方式:主題-分區-訊息;
  • 一條訊息,僅存在某一個分區中;
  • 提高伸縮性,不同分區可以放到不同機器,讀寫操作也是以分區粒度。
  • 分區策略?

  • 輪詢
  • 隨機
  • 按 key 保序,單分區有序
  • 7.12 MQ 訊息堆積問題處理

    訊息堆積可能的原因: 佇列中訊息不能被及時的消費,導致大量堆積在佇列裏面 RocketMQ Kafka RabbitMq 都會有這樣的問題 產生訊息堆積的可以從 mq 的生產消費模型去考慮,從生產者到訊息中介軟體、再到消費者,都會發生堆積 消費者 :消費者處理速度過慢,或者消費者故障、延遲,無法即使的處理訊息,導致訊息堆積 生產者 :生產者產生速度過快,消費者無法即使處理 MQ 訊息佇列 :Mq 伺服器的效能不足,比如它所在的機器,cpu、記憶體、磁盤等超載,無法即使的處理訊息,導致訊息堆積 其他 :其他方面也會有這樣的問題, 比如網絡故障,連線問題,訊息在傳遞過程中過慢,從而導致訊息堆積 業務方面,訊息消費失敗重試,不斷的重試,沒有設定重試次數,導致訊息堆積!

    處理訊息堆積問題:

    一、消費者:

    1. 增加消費者的數量,提高消費的處理速度;(註意這個不通用,只適合 RabbitMq) 需要註意不能一味的水平擴充套件消費者 因為其他關鍵鏈路效能是否抗的住大量的水平擴充套件,比如 mysq、redis,詳細見下方 RabbitMQ 訊息堆積解決方案
    2. 或者提高消費者的處理能力,比如透過並行處理、異步處理提高消費者吞吐量。這個則要註意透過執行緒池、佇列,把 mq 拉到程式的佇列中,要承擔對應的宕機導致訊息遺失風險。

    二、MQ 訊息佇列: 增加 MQ 的伺服器資源,cpu、記憶體、磁盤,提高 mq 處理能力 也可以透過分區佇列將訊息分散到多個佇列中,提高整體的處理能力。 (這個則是 Kafka、Rocket 采用的)

    控制佇列容量,避免堆積過多,設定持久化策略。 RabbitMQ 的 懶載入佇列 ,兼顧了持久化和堆積上限。

    三、監控告警(重要) 設定監控系統,比如普羅米修斯,監控訊息數量,消費者處理速度,佇列狀態等等,在堆積發生前,即使的告警,及時采取措施。

    But 上面的策略是通用的一些解決方案,不同的 MQ,生產消費模型是不一樣的,導致需要針對不同 mq 的訊息堆積解決方案不一樣。

    RabbitMQ、Kafka、RocketMQ 發生訊息堆積,分別該如何去解決?

    這裏先點一下, 增加消費者數量,並不是通用的,只適合 RabbitMQ。

    08

    總結

    Kafka 與 Pulsar 都是騰訊雲主打的訊息佇列中介軟體,都具有高效能,高可靠,支持多種場景。Kafka 推出的時間較早,各種場景比如日誌、大數據處理等都有較成熟的解決方案。而 Pulsar 作為一個新秀,支持的功能比 CKafka 更豐富,而且跨地域容災,多租戶等功能,解決了很多 Kafka 設計缺陷和運維成本問題,整體穩定性更強。很多國內外大公司也有很多 Pulsar 的實踐案例。因此,一些傳統的日誌、大數據處理等場景,對高吞吐量有要求的,對訊息可靠性的要求沒那麽高的,可以選用 Kafka,有很多優秀的文件說明怎麽參數調優提高效能。而一些對訊息可靠性、容災要求更好,或者有高分區、延遲佇列等需求的場景,可以選用 Pulsar。

    我們後台的技術棧是基於 Golang 的,在上文的對比中,還挑了一個基於 Golang 開發的訊息佇列 NSQ,如果有一些客製化需求或者需要二次開發的,可以選用 NSQ。也可以透過閱讀 NSQ 的源碼,學習一些優秀高效能訊息佇列中介軟體的實作方式,比如裏邊 diskqueue 元件,一個基於磁盤的訊息佇列,在某些場景下可能也可以進行二次利用。