一、背景
由于要安裝分詞器插件,所以需要重啟ElasticSearch集群以使得新安裝的插件生效
但是在重啟集群的過程中,服務(wù)端代碼卻出現(xiàn)了大量錯誤,如下所示
java.net.ConnectException: Connection refused ?? at org.elasticsearch.client.RestClient.extractAndWrapCause(RestClient.java: 823 ) ?? at org.elasticsearch.client.RestClient.performRequest(RestClient.java: 248 ) ?? at org.elasticsearch.client.RestClient.performRequest(RestClient.java: 235 ) ?? ...... |
基于此,也引出一個潛在的可用性問題,即代碼沒有做到可以平穩(wěn)支持ElasticSearch集群的重啟,不局限于可預(yù)知的升級導(dǎo)致的重啟,包括集群自身不穩(wěn)定導(dǎo)致的集群節(jié)點(diǎn)變化如某節(jié)點(diǎn)的CPU和內(nèi)容過高、所在機(jī)柜或機(jī)房故障等不可預(yù)期情況
二、引入Sniffer
為了提高服務(wù)的可用性和穩(wěn)定性,引入Sniffer(嗅探器)
Sniffer
允許從正在運(yùn)行的 Elasticsearch 集群中自動發(fā)現(xiàn)節(jié)點(diǎn)并將它們設(shè)置為現(xiàn)有 RestClient 實(shí)例的最小庫,默認(rèn)使用節(jié)點(diǎn)信息API檢索所屬集群的交節(jié)點(diǎn),并使用jackson解析得到j(luò)son數(shù)據(jù),與ElasticSearch 2.X及之后版本兼容。
添加Maven依賴以引入Sniffer,如下所示
< dependency > ???? < groupId >org.elasticsearch.client</ groupId > ???? < artifactId >elasticsearch-rest-client-sniffer</ artifactId > ???? < version >7.6.2</ version > </ dependency > |
Sniffer相關(guān)的JavaDoc可參看:elasticsearch-rest-client-sniffer,代碼如下所示
// 失敗嗅探監(jiān)聽器,可保證在RestClient出現(xiàn)失敗時,立即更新集群的節(jié)點(diǎn) SniffOnFailureListener sniffOnFailureListener =? new ?SniffOnFailureListener();? HttpHost httpHost = port !=? null ??? new ?HttpHost(host, port, scheme) : HttpHost.create(host); RestClient client = RestClient.builder(httpHost) ???????? .setFailureListener(sniffOnFailureListener) ???????? .build(); Sniffer sniffer = Sniffer.builder(client) ???????? // 3 minutes, default 5 minutes,定時更新集群的節(jié)點(diǎn) ???????? .setSniffIntervalMillis( 3 ?*? 60 ?*? 1000 )? ???????? // 30 seconds, default 1 minutes ,在失敗立即更新集群的節(jié)點(diǎn)后,額外再次執(zhí)行一次主動的更新行為 ???????? // 因?yàn)檫@個時間段內(nèi)之前出問題的節(jié)點(diǎn)可能已經(jīng)恢復(fù),進(jìn)而可以被再次更新到節(jié)點(diǎn)中被使用) ???????? .setSniffAfterFailureDelayMillis( 30 ?*? 1000 )? ???????? .build(); sniffOnFailureListener.setSniffer(sniffer); |
需要注意的是,需要保證Sniffer和RestClient要具有相同的生命周期,并且應(yīng)該先于RestClient關(guān)閉,如下所示
sniffer.close(); client.close(); |
也可通過以下方式更改是通過HTTP還是HTTPS,并且可以設(shè)置請求超時時間,避免因?yàn)槔」?jié)點(diǎn)時間過長,影響服務(wù)恢復(fù)的速度或恢復(fù)失敗,盡管設(shè)置超時時間后可能獲得的節(jié)點(diǎn)數(shù)少于集群中的節(jié)點(diǎn)數(shù)
NodesSniffer nodesSniffer =? new ?ElasticsearchNodesSniffer( ???????? restClient, ???????? TimeUnit.SECONDS.toMillis( 5 ),???????????? // 默認(rèn)ElasticsearchNodesSniffer.DEFAULT_SNIFF_REQUEST_TIMEOUT ???????? ElasticsearchNodesSniffer.Scheme.HTTPS);? // 若是HTTP協(xié)議,則ElasticsearchNodesSniffer.Scheme.HTTP Sniffer sniffer = Sniffer.builder(client) ???????? .setNodesSniffer(nodesSniffer).build(); |
也可以自定義獲取節(jié)點(diǎn)的方法,進(jìn)而可以獲取其他ElasticSearch數(shù)據(jù)源或做一些其他的擴(kuò)展,如下所示
NodesSniffer nodesSniffer =? new ?NodesSniffer() { ???? @Override ???? public ?List<Node> sniff()? throws ?IOException { ???????? // 自行按需實(shí)現(xiàn)獲取節(jié)點(diǎn)的邏輯 ???????? return ?null ;? ???? } }; Sniffer sniffer = Sniffer.builder(client) ???????? .setNodesSniffer(nodesSniffer).build(); |