軟件班級網(wǎng)站建設今日騰訊新聞最新消息
大家好,我是易安!今天我們談一談消息隊列中的事務消息這個話題。
一說起事務,你可能自然會聯(lián)想到數(shù)據(jù)庫。我們?nèi)粘J褂檬聞盏膱鼍?#xff0c;絕大部分都是在操作數(shù)據(jù)庫的時候。像MySQL、Oracle這些主流的關系型數(shù)據(jù)庫,也都提供了完整的事務實現(xiàn)。那消息隊列為什么也需要事務呢?
其實很多場景下,我們“發(fā)消息”這個過程,目的往往是通知另外一個系統(tǒng)或者模塊去更新數(shù)據(jù), 消息隊列中的“事務”,主要解決的是消息生產(chǎn)者和消息消費者的數(shù)據(jù)一致性問題。
拿電商產(chǎn)品來舉個例子。一般來說,用戶在電商APP上購物時,先把商品加到購物車里,然后幾件商品一起下單,最后支付,完成購物流程,就可以愉快地等待收貨了。
這個過程中有一個需要用到消息隊列的步驟,訂單系統(tǒng)創(chuàng)建訂單后,發(fā)消息給購物車系統(tǒng),將已下單的商品從購物車中刪除。因為從購物車刪除已下單商品這個步驟,并不是用戶下單支付這個主要流程中必需的步驟,使用消息隊列來異步清理購物車是更加合理的設計。

對于訂單系統(tǒng)來說,它創(chuàng)建訂單的過程中實際上執(zhí)行了2個步驟的操作:
-
在訂單庫中插入一條訂單數(shù)據(jù),創(chuàng)建訂單; -
發(fā)消息給消息隊列,消息的內(nèi)容就是剛剛創(chuàng)建的訂單。
購物車系統(tǒng)訂閱相應的主題,接收訂單創(chuàng)建的消息,然后清理購物車,在購物車中刪除訂單中的商品。
在分布式系統(tǒng)中,上面提到的這些步驟,任何一個步驟都有可能失敗,如果不做任何處理,那就有可能出現(xiàn)訂單數(shù)據(jù)與購物車數(shù)據(jù)不一致的情況,比如說:
-
創(chuàng)建了訂單,沒有清理購物車; -
訂單沒創(chuàng)建成功,購物車里面的商品卻被清掉了。
那我們需要解決的問題可以總結為:在上述任意步驟都有可能失敗的情況下,還要保證訂單庫和購物車庫這兩個庫的數(shù)據(jù)一致性。
對于購物車系統(tǒng)收到訂單創(chuàng)建成功消息清理購物車這個操作來說,失敗的處理比較簡單,只要成功執(zhí)行購物車清理后再提交消費確認即可,如果失敗,由于沒有提交消費確認,消息隊列會自動重試。
問題的關鍵點集中在訂單系統(tǒng),創(chuàng)建訂單和發(fā)送消息這兩個步驟要么都操作成功,要么都操作失敗,不允許一個成功而另一個失敗的情況出現(xiàn)。
這就是事務需要解決的問題。
什么是分布式事務?
那什么是事務呢?如果我們需要對若干數(shù)據(jù)進行更新操作,為了保證這些數(shù)據(jù)的完整性和一致性,我們希望這些更新操作 要么都成功,要么都失敗。 至于更新的數(shù)據(jù),不只局限于數(shù)據(jù)庫中的數(shù)據(jù),可以是磁盤上的一個文件,也可以是遠端的一個服務,或者以其他形式存儲的數(shù)據(jù)。
這就是通常我們理解的事務。其實這段對事務的描述不是太準確也不完整,但是,它更易于理解,大體上也是正確的。所以我還是傾向于這樣來講“事務”這個比較抽象的概念。
一個嚴格意義的事務實現(xiàn),應該具有4個屬性:原子性、一致性、隔離性、持久性。這四個屬性通常稱為ACID特性。
-
原子性,是指一個事務操作不可分割,要么成功,要么失敗,不能有一半成功一半失敗的情況。
-
一致性,是指這些數(shù)據(jù)在事務執(zhí)行完成這個時間點之前,讀到的一定是更新前的數(shù)據(jù),之后讀到的一定是更新后的數(shù)據(jù),不應該存在一個時刻,讓用戶讀到更新過程中的數(shù)據(jù)。
-
隔離性,是指一個事務的執(zhí)行不能被其他事務干擾。即一個事務內(nèi)部的操作及使用的數(shù)據(jù)對正在進行的其他事務是隔離的,并發(fā)執(zhí)行的各個事務之間不能互相干擾,這個有點兒像我們打網(wǎng)游中的副本,我們在副本中打的怪和掉的裝備,與其他副本沒有任何關聯(lián)也不會互相影響。
-
持久性,是指一個事務一旦完成提交,后續(xù)的其他操作和故障都不會對事務的結果產(chǎn)生任何影響。
大部分傳統(tǒng)的單體關系型數(shù)據(jù)庫都完整的實現(xiàn)了ACID,但是,對于分布式系統(tǒng)來說,嚴格的實現(xiàn)ACID這四個特性幾乎是不可能的,或者說實現(xiàn)的代價太大,大到我們無法接受。
分布式事務就是要在分布式系統(tǒng)中的實現(xiàn)事務。在分布式系統(tǒng)中,在保證可用性和不嚴重犧牲性能的前提下,光是要實現(xiàn)數(shù)據(jù)的一致性就已經(jīng)非常困難了,所以出現(xiàn)了很多變種版的一致性,比如順序一致性、最終一致性等等。
顯然實現(xiàn)嚴格的分布式事務是更加不可能完成的任務。所以,目前大家所說的分布式事務,更多情況下,是在分布式系統(tǒng)中事務的不完整實現(xiàn)。在不同的應用場景中,有不同的實現(xiàn),目的都是通過一些妥協(xié)來解決實際問題。
在實際應用中,比較常見的分布式事務實現(xiàn)有2PC(Two-phase Commit,也叫二階段提交)、TCC(Try-Confirm-Cancel)和事務消息。每一種實現(xiàn)都有其特定的使用場景,也有各自的問題,都不是完美的解決方案。
這里我就重點講述下2PC的方式
兩階段提交方式解決分布式事務
兩階段提交協(xié)議為了保證分布在不同節(jié)點上的分布式事務的一致性,我們需要引入一個協(xié)調(diào)者來管理所有的節(jié)點,負責各個本地資源的提交和回滾,并確保這些節(jié)點正確提交操作結果,若提交失敗則放棄事務。
XA 協(xié)議
XA 是一個分布式事務協(xié)議,規(guī)定了事務管理器和資源管理器接口。因此,XA 協(xié)議可以分為兩部分,即事務管理器和本地資源管理器。
-
事務管理器作為 協(xié)調(diào)者,負責各個本地資源的提交和回滾; -
資源管理器就是分布式 事務的參與者.其中資源管理通常是 數(shù)據(jù)庫。
基于 XA 協(xié)議的二階段提交方法中,二階段提交協(xié)議(The two-phase commit protocol,2PC),用于保證分布式系統(tǒng)中事務提交時的數(shù)據(jù)一致性,是 XA 在全局事務中用于協(xié)調(diào)多個資源的機制。
什么是二階段提交
分為投票和提交兩個階段。

投票為第一階段:
-
1 協(xié)調(diào)者(Coordinator,即事務管理器)會向事務的參與者(Cohort,即本地資源管理器)發(fā)起執(zhí)行操作的 CanCommit 請求,并等待參與者的響應. -
2 參與者接收到請求后,會執(zhí)行請求中的事務操作,記錄日志信息(包含事務執(zhí)行前的鏡像),同時鎖定當前記錄。參與者執(zhí)行成功,則向協(xié)調(diào)者發(fā)送“Yes”消息,表示同意操作;若不成功,則發(fā)送“No”消息,表示終止操作。 -
3 當所有的參與者都返回了操作結果(Yes 或 No 消息)后,系統(tǒng)進入了提交階段。
提為第二階段:
協(xié)調(diào)者會根據(jù)所有參與者返回的信息向參與者發(fā)送 DoCommit 或 DoAbort 指令
-
若協(xié)調(diào)者收到的都是“Yes”消息,則向參與者發(fā)送“DoCommit”消息,參與者會完成剩余的操作并釋放資源,然后向協(xié)調(diào)者返回“HaveCommitted”消息; -
如果協(xié)調(diào)者收到的消息中包含“No”消息,則向所有參與者發(fā)送“DoAbort”消息,此時發(fā)送“Yes”的參與者則會根據(jù)之前執(zhí)行操作時的回滾日志對操作進行回滾,然后所有參與者會向協(xié)調(diào)者發(fā)送“HaveCommitted”消息; -
協(xié)調(diào)者接收到“HaveCommitted”消息,就意味著整個事務結束了。
2PC問題
同步阻塞問題:二階段提交算法在執(zhí)行過程中,所有參與節(jié)點都是事務阻塞型的。也就是說,當本地資源管理器占有臨界資源時,其他資源管理器如果要訪問同一臨界資源,會處于阻塞狀態(tài)。
協(xié)調(diào)者單點故障導致參與者長期阻塞問題:基于 XA 的二階段提交算法類似于集中式算法,一旦事務管理器發(fā)生故障,整個系統(tǒng)都處于停滯狀態(tài)。尤其是在提交階段,一旦事務管理器發(fā)生故障,資源管理器會由于等待管理器的消息,而一直鎖定事務資源,導致整個系統(tǒng)被阻塞。
數(shù)據(jù)不一致問題:在提交階段,當協(xié)調(diào)者向參與者發(fā)送 DoCommit 請求之后,如果發(fā)生了局部網(wǎng)絡異常,或者在發(fā)送提交請求的過程中協(xié)調(diào)者發(fā)生了故障,就會導致只有一部分參與者接收到了提交請求并執(zhí)行提交操作,但其他未接到提交請求的那部分參與者則無法執(zhí)行事務提交。于是整個分布式系統(tǒng)便出現(xiàn)了數(shù)據(jù)不一致的問題。
二階段無法解決的問題:協(xié)調(diào)者再發(fā)出DoCommit 消息之后宕機,而唯一接收到這條消息的參與者同時也宕機了。那么即使協(xié)調(diào)者通過選舉協(xié)議產(chǎn)生了新的協(xié)調(diào)者,這條事務的狀態(tài)也是不確定的,沒人知道事務是否被已經(jīng)提交。
3PC
三階段提交協(xié)議(Three-phase commit protocol,3PC),是對二階段提交(2PC)的改進。為了解決兩階段提交的同步阻塞和數(shù)據(jù)不一致問題,三階段提交引入了超時機制和準備階段。
超時機制
同時在協(xié)調(diào)者和參與者中引入超時機制。如果協(xié)調(diào)者或參與者在規(guī)定的時間內(nèi)沒有接收到來自其他節(jié)點的響應,就會根據(jù)當前的狀態(tài)選擇提交或者終止整個事務。
準備階段
在第一階段和第二階段中間引入了一個準備階段,也就是在提交階段之前,加入了一個預提交階段。在預提交階段排除一些不一致的情況,保證在最后提交之前各參與節(jié)點的狀態(tài)是一致的。

CanCommit 階段
協(xié)調(diào)者向參與者發(fā)送請求操作(CanCommit 請求),詢問參與者是否可以執(zhí)行事務提交操作,然后等待參與者的響應;參與者收到 CanCommit 請求之后,回復 Yes,表示可以順利執(zhí)行事務;否則回復 No。(我個人理解類似做TCC中Try操作)

PreCommit 階段
協(xié)調(diào)者根據(jù)參與者的回復情況,來決定是否可以進行 PreCommit 操作 或 中斷事務。
如果所有參與者回復的都是“Yes”,那么協(xié)調(diào)者就會執(zhí)行事務的預執(zhí)行:
-
發(fā)送預提交請求。協(xié)調(diào)者向參與者發(fā)送 PreCommit 請求,進入預提交階段。 -
事務預提交。參與者接收到 PreCommit 請求后執(zhí)行事務操作,并將 Undo 和 Redo 信息記錄到事務日志中,同時鎖定當前記錄。 -
響應反饋。如果參與者成功執(zhí)行了事務操作,則返回 ACK 響應,同時開始等待最終指令
如果任何一個參與者向協(xié)調(diào)者發(fā)送了“No”消息,或者等待超時之后,協(xié)調(diào)者都沒有收到參與者的響應,就執(zhí)行中斷事務的操作:
-
發(fā)送中斷請求。協(xié)調(diào)者向所有參與者發(fā)送“Abort”消息。 -
中斷事務。參與者收到“Abort”消息之后,或超時后仍未收到協(xié)調(diào)者的消息,執(zhí)行事務的中斷操作。

DoCommit 階段
協(xié)調(diào)者根據(jù)參與者的回復情況,來決定是否可以進行 DoCommit 操作 或 中斷事務。
如果所有參與者回復的都是“Yes”,那么協(xié)調(diào)者就會執(zhí)行事務的提交:
-
發(fā)送提交請求。協(xié)調(diào)者接收到所有參與者發(fā)送的 Ack 響應,從預提交狀態(tài)進入到提交狀態(tài),并向所有參與者發(fā)送 DoCommit 消息。 -
事務提交。參與者接收到 DoCommit 消息之后,正式提交事務。完成事務提交之后,釋放所有鎖住的資源。 -
響應反饋。參與者提交完事務之后,向協(xié)調(diào)者發(fā)送 Ack 響應。 -
完成事務。協(xié)調(diào)者接收到所有參與者的 Ack 響應之后,完成事務。
如果任何一個參與者向協(xié)調(diào)者發(fā)送了“No”消息,或者協(xié)調(diào)者等待超時之后,協(xié)調(diào)者都沒有收到參與者的響應,就執(zhí)行中斷事務的操作:
-
發(fā)送中斷請求。協(xié)調(diào)者向所有參與者發(fā)送 Abort 請求。 -
事務回滾。參與者接收到 Abort 消息之后,利用其在 PreCommit 階段記錄的 Undo 信息執(zhí)行事務的回滾操作,并釋放所有鎖住的資源。 -
反饋結果。參與者完成事務回滾之后,向協(xié)調(diào)者發(fā)送 Ack 消息。 -
中斷事務。協(xié)調(diào)者接收到參與者反饋的 Ack 消息之后,執(zhí)行事務的中斷,并結束事務。 。
當參與者PreCommit 階段向協(xié)調(diào)者發(fā)送 Ack 消息后,如果長時間沒有得到協(xié)調(diào)者的響應,在默認情況下,參與者會自動將超時的事務進行提交,不會像兩階段提交那樣被阻塞住

-
如何解決協(xié)調(diào)者單點故障導致參與者長期阻塞。
由于存在超時機制,即使協(xié)調(diào)者發(fā)生故障,參與者無法及時收到來自協(xié)調(diào)者的信息之后,他會默認執(zhí)行commit。避免參與者長期阻塞。
-
同步阻塞問題
3PC會在2階段到3階段間阻塞,2PC會在1階段到2階段整個事務過程中阻塞,因而總體來說3PC并不能不阻塞,只是最大限度減少了阻塞的時間。同時安裝5.2也能夠解決協(xié)調(diào)者單點故障導致參與者長期阻塞的問題
-
數(shù)據(jù)不一致問題
3PC和2PC都無法解決數(shù)據(jù)一致的問題,不過3PC存在超時會通過超時保證協(xié)調(diào)者和參與者在提交階段無法通信過程中最終一致,而不需人工介入。
可以看到不管是2階段提交還是3階段提交都是有些問題的,當然我們還有消息隊列中的事務消息這種思路。事務消息適用的場景主要是那些需要異步更新數(shù)據(jù),并且對數(shù)據(jù)實時性要求不太高的場景。比如我們在開始時提到的那個例子,在創(chuàng)建訂單后,如果出現(xiàn)短暫的幾秒,購物車里的商品沒有被及時清空,也不是完全不可接受的,只要最終購物車的數(shù)據(jù)和訂單數(shù)據(jù)保持一致就可以了。
消息隊列如何實現(xiàn)分布式事務?
事務消息需要消息隊列提供相應的功能才能實現(xiàn),Kafka和RocketMQ都提供了事務相關功能。
回到訂單和購物車這個例子,我們一起來看下如何用消息隊列來實現(xiàn)分布式事務,這里以RocketMQ來舉例。

首先,訂單系統(tǒng)在消息隊列上開啟一個事務。然后訂單系統(tǒng)給消息服務器發(fā)送一個“半消息”,這個半消息不是說消息內(nèi)容不完整,它包含的內(nèi)容就是完整的消息內(nèi)容,半消息和普通消息的唯一區(qū)別是,在事務提交之前,對于消費者來說,這個消息是不可見的。
半消息發(fā)送成功后,訂單系統(tǒng)就可以執(zhí)行本地事務了,在訂單庫中創(chuàng)建一條訂單記錄,并提交訂單庫的數(shù)據(jù)庫事務。然后根據(jù)本地事務的執(zhí)行結果決定提交或者回滾事務消息。如果訂單創(chuàng)建成功,那就提交事務消息,購物車系統(tǒng)就可以消費到這條消息繼續(xù)后續(xù)的流程。如果訂單創(chuàng)建失敗,那就回滾事務消息,購物車系統(tǒng)就不會收到這條消息。這樣就基本實現(xiàn)了“要么都成功,要么都失敗”的一致性要求。
如果你足夠細心,可能已經(jīng)發(fā)現(xiàn)了,這個實現(xiàn)過程中,有一個問題是沒有解決的。如果在第四步提交事務消息時失敗了怎么辦?對于這個問題,Kafka和RocketMQ給出了2種不同的解決方案。
Kafka的解決方案比較簡單粗暴,直接拋出異常,讓用戶自行處理。我們可以在業(yè)務代碼中反復重試提交,直到提交成功,或者刪除之前創(chuàng)建的訂單進行補償。RocketMQ則給出了另外一種解決方案。
RocketMQ中的分布式事務實現(xiàn)
在RocketMQ中的事務實現(xiàn)中,增加了事務反查的機制來解決事務消息提交失敗的問題。如果Producer也就是訂單系統(tǒng),在提交或者回滾事務消息時發(fā)生網(wǎng)絡異常,RocketMQ的Broker沒有收到提交或者回滾的請求,Broker會定期去Producer上反查這個事務對應的本地事務的狀態(tài),然后根據(jù)反查結果決定提交或者回滾這個事務。
為了支撐這個事務反查機制,我們的業(yè)務代碼需要實現(xiàn)一個反查本地事務狀態(tài)的接口,告知RocketMQ本地事務是成功還是失敗。
在我們這個例子中,反查本地事務的邏輯也很簡單,我們只要根據(jù)消息中的訂單ID,在訂單庫中查詢這個訂單是否存在即可,如果訂單存在則返回成功,否則返回失敗。RocketMQ會自動根據(jù)事務反查的結果提交或者回滾事務消息。
這個反查本地事務的實現(xiàn),并不依賴消息的發(fā)送方,也就是訂單服務的某個實例節(jié)點上的任何數(shù)據(jù)。這種情況下,即使是發(fā)送事務消息的那個訂單服務節(jié)點宕機了,RocketMQ依然可以通過其他訂單服務的節(jié)點來執(zhí)行反查,確保事務的完整性。
綜合上面講的通用事務消息的實現(xiàn)和RocketMQ的事務反查機制,使用RocketMQ事務消息功能實現(xiàn)分布式事務的流程如下圖:

總結
本文通過一個訂單購物車的例子,學習了事務的ACID四個特性,以及如何使用消息隊列來實現(xiàn)分布式事務。然后我給出了現(xiàn)有的幾種分布式事務的解決方案,包括事務消息,但是這幾種方案都不能解決分布式系統(tǒng)中的所有問題,每一種方案都有局限性和特定的適用場景。
最后,我們一起學習了RocketMQ的事務反查機制,這種機制通過定期反查事務狀態(tài),來補償提交事務消息可能出現(xiàn)的通信失敗。在Kafka的事務功能中,并沒有類似的反查機制,需要用戶自行去解決這個問題。但是,這不代表RocketMQ的事務功能比Kafka更好,只能說在我們這個例子的場景下,更適合使用RocketMQ。Kafka對于事務的定義、實現(xiàn)和適用場景,和RocketMQ有比較大的差異。
本文由 mdnice 多平臺發(fā)布