做網(wǎng)站收藏的網(wǎng)頁(yè)搜索熱門關(guān)鍵詞
思維導(dǎo)圖
0. 前言
MySQL 與 Elasticsearch 一致性問(wèn)題是老生常談了。網(wǎng)上有太多關(guān)于這方面的文章了,但是千篇一律,看了跟沒(méi)看沒(méi)有太大區(qū)別。
在生產(chǎn)中,我們往往會(huì)通過(guò) DTS 工具將 binlog 導(dǎo)入到 Kafka,再通過(guò) Kafka 消費(fèi) binlog,組裝數(shù)據(jù)寫(xiě)入 ES。
在這個(gè)過(guò)程中,可能會(huì)存在 binlog 到 Kafka 數(shù)據(jù)丟失,或者應(yīng)用程序消費(fèi) Kafka 數(shù)據(jù)成功,但是數(shù)據(jù)未正確寫(xiě)入至 ES。
本文將探討如何解決 MySQL 與 ES 的數(shù)據(jù)一致性問(wèn)題。
1. 消費(fèi)側(cè) Client 到 ES 數(shù)據(jù)丟失
這里出現(xiàn)丟失的問(wèn)題,可能有以下幾種情況
- ES 寫(xiě)入線程滿了,請(qǐng)求被拒絕。
- ES 寫(xiě)入沖突。
1.1 ES 寫(xiě)入請(qǐng)求被拒絕
一般是通過(guò)遞階式重試來(lái)解決問(wèn)題,例如第一次等待 1s 后寫(xiě)入,第二次還出現(xiàn),則等待 3s 后再嘗試寫(xiě)入。如果最后寫(xiě)入還是失敗,應(yīng)該記錄日志,并告警,而后通過(guò)人工介入的方式補(bǔ)償數(shù)據(jù)。
不過(guò)也需要考慮另外一個(gè)問(wèn)題,為什么并發(fā)這么高?這么高的寫(xiě)入并發(fā),對(duì) ES 壓力是否太大了?
一般而言,我們認(rèn)為 ES 是不適合并發(fā)太高的寫(xiě)入。因此在消費(fèi)側(cè)除了要控制 MQ 并發(fā)消費(fèi)的線程數(shù),也要多用用 同步 Bulk API
做批量更新。
1.2 ES 寫(xiě)入沖突
消費(fèi)側(cè)的邏輯一般如下:
- 會(huì)將有關(guān)聯(lián)記錄打到同一個(gè)隊(duì)列,防止并發(fā)問(wèn)題。例如同一個(gè)商品的 Binlog 都打到同一個(gè)隊(duì)列
- 在主表的
Insert Binlog
中,查詢關(guān)聯(lián)表信息,拼裝完整記錄寫(xiě)入 ES - 其它關(guān)聯(lián)表的更新、寫(xiě)入以及主表的
Update Binlog
都用Update API
做部分更新操作 Delete Binlog
用Delete API
做刪除操作
如果是通過(guò)我上面說(shuō)的方式進(jìn)行寫(xiě)入,會(huì)出現(xiàn)沖突問(wèn)題的僅有 Update API
。
ES Client Update Api
提供了 retry_on_conflict
參數(shù)。該參數(shù)的意思是,如果發(fā)生版本沖突則重試,該參數(shù)默認(rèn)為 0,即默認(rèn)不重試。生產(chǎn)環(huán)境中,我們可以通過(guò)配置中心動(dòng)態(tài)配置該參數(shù)值。如果重試之后還是發(fā)生錯(cuò)誤,建議捕獲版本沖突異常,并告警,然后人工手動(dòng)更新。
2. DTS 工具到 Kafka 數(shù)據(jù)丟失
這里的丟失包含 2 個(gè)部分
- DTS 到 Kafka 丟失
- 數(shù)據(jù)在 Kafka Broker 端丟失
一般是采用定時(shí)增量校驗(yàn)。校驗(yàn) MySQL 和 Elasticsearch 數(shù)據(jù)是否一致性。
整體的流程圖如下所示: