哈爾濱網(wǎng)站開(kāi)發(fā)制作岳陽(yáng)seo快速排名
目錄
一、Nacos 和 Eureka 的區(qū)別
1.1、以 Nacos 注冊(cè)流程來(lái)解析區(qū)別
一、Nacos 和 Eureka 的區(qū)別
1.1、以 Nacos 注冊(cè)流程來(lái)解析區(qū)別
a)首先,我們的服務(wù)啟動(dòng)時(shí)。都會(huì)把自己的信息提交給注冊(cè)中心,然后注冊(cè)中心就會(huì)把信息保存下來(lái).
注冊(cè)的信息實(shí)際上就是一個(gè)嵌套 Map,結(jié)構(gòu)為?Map<String,?Map<String,?Service>>,第一層 key 就是 namespace_id,起到環(huán)境隔離的作用. value 由是一個(gè)嵌套 Map<String, Service>.
第二層的 key 表示 group 分組,key 就是分組名,value 就是分組下的某一個(gè)服務(wù),實(shí)際上就是一個(gè)類,內(nèi)部又維護(hù)了一個(gè)? Map<String,Cluster> .
第三層的 key 就是集群的名稱,value 就是? Cluster ,也是一個(gè)類,包含了集群的具體信息.??
因?yàn)橐粋€(gè)集群中可能包含多個(gè)實(shí)例,也就是具體的節(jié)點(diǎn)信息(例如實(shí)例的IP、Port、健康狀態(tài)),那么 Cluster 這個(gè)類中又維護(hù)了 兩個(gè) Set<Instance>,分別是臨時(shí)實(shí)例和非臨時(shí)實(shí)例(此處,Eureka 就沒(méi)有做區(qū)分,只有臨時(shí)實(shí)例).
b)那么當(dāng)服務(wù)消費(fèi)者要去消費(fèi)時(shí),就可以從注冊(cè)中心拉取服務(wù)信息.? 這個(gè)過(guò)程也被稱為“服務(wù)發(fā)現(xiàn)”.? 但是他這個(gè)拉去動(dòng)作不是每次都要做的(壓力太大),而是將拉取到的服務(wù)信息緩存到一個(gè)列表中,這樣接下來(lái)的一段時(shí)間里,就不用去拉去了,而是直接從緩存列表中拿.?
當(dāng)然這個(gè)緩存一直不更新也不行,因此會(huì)每隔 30 秒取重新拉取一次(多長(zhǎng)時(shí)間不用記,都是可以配置的),進(jìn)行更新.
c)消費(fèi)者拿到服務(wù)列表后,就可以通過(guò) 負(fù)載均衡(LoadBalancer)從列表中挑選一個(gè)發(fā)起遠(yuǎn)程調(diào)用就可以了.?
d)截至目前為止,Nacos 和 Eureka 還沒(méi)什么太大的差別,那差別在哪呢?差別就在于服務(wù)提供者的健康檢測(cè)機(jī)制.
e)在 nacos 中,將服務(wù)分成了臨時(shí)實(shí)例和非臨時(shí)實(shí)例:
- 臨時(shí)實(shí)例:當(dāng)臨時(shí)實(shí)例進(jìn)行心跳檢測(cè)的時(shí)候,如果心跳不跳了,nacos 就會(huì)把它從服務(wù)中直接剔除.? (這里的心跳檢測(cè)機(jī)制和 Eureka 是一樣的,但非臨時(shí)實(shí)例就不一樣了)
- 非臨時(shí)實(shí)例:nacos 就不會(huì)要求你給我發(fā)心跳了,而是通過(guò) nacos 主動(dòng)發(fā)請(qǐng)求詢問(wèn),定時(shí)向?qū)嵗l(fā)送請(qǐng)求:“你還活著嗎?”,即使真的掛了,nacos 也僅僅只是把它標(biāo)記為 不健康,不會(huì)剔除,而是等待它恢復(fù)健康.
而 Eureka 只提供了心跳模式的健康監(jiān)測(cè),而沒(méi)有主動(dòng)檢測(cè)功能。
主動(dòng)詢問(wèn)進(jìn)行健康檢測(cè),效率豈不是很低?
對(duì)于非臨時(shí)實(shí)例,所有的健康檢測(cè)任務(wù)都不是立即執(zhí)行的,都會(huì)被放入一個(gè)阻塞隊(duì)列中,如下源碼
@Override
public void process(HealthCheckTask task) {// 獲取所有 非臨時(shí)實(shí)例的 集合List<Instance> ips = task.getCluster().allIPs(false);if (CollectionUtils.isEmpty(ips)) {return;}for (Instance ip : ips) {// 封裝健康檢測(cè)信息到 BeatBeat beat = new Beat(ip, task);// 放入一個(gè)阻塞隊(duì)列中taskQueue.add(beat);MetricsMonitor.getTcpHealthCheckMonitor().incrementAndGet();}
}
可以看出,檢測(cè)任務(wù)不是立即執(zhí)行,這里也采用了異步指定的策略,會(huì)把任務(wù)放到線程池中取執(zhí)行,如下:
public void run() {while (true) {try {// 處理任務(wù)processTask();// ...} catch (Throwable e) {SRV_LOG.error("[HEALTH-CHECK] error while processing NIO task", e);}}
}
?通過(guò)?processTask 來(lái)處理健康檢測(cè)的任務(wù):
private void processTask() throws Exception {// 將任務(wù)封裝為一個(gè) TaskProcessor,并放入集合Collection<Callable<Void>> tasks = new LinkedList<>();do {Beat beat = taskQueue.poll(CONNECT_TIMEOUT_MS / 2, TimeUnit.MILLISECONDS);if (beat == null) {return;}tasks.add(new TaskProcessor(beat));} while (taskQueue.size() > 0 && tasks.size() < NIO_THREAD_COUNT * 64);// 批量處理集合中的任務(wù)for (Future<?> f : GlobalExecutor.invokeAllTcpSuperSenseTask(tasks)) {f.get();}
}
?任務(wù)被封裝到了TaskProcessor中去執(zhí)行了,TaskProcessor是一個(gè)Callable,其中的call方法:
@Override
public Void call() {// 獲取檢測(cè)任務(wù)已經(jīng)等待的時(shí)長(zhǎng)long waited = System.currentTimeMillis() - beat.getStartTime();if (waited > MAX_WAIT_TIME_MILLISECONDS) {Loggers.SRV_LOG.warn("beat task waited too long: " + waited + "ms");}SocketChannel channel = null;try {// 獲取實(shí)例信息Instance instance = beat.getIp();// 通過(guò)NIO建立TCP連接channel = SocketChannel.open();channel.configureBlocking(false);// only by setting this can we make the socket close event asynchronouschannel.socket().setSoLinger(false, -1);channel.socket().setReuseAddress(true);channel.socket().setKeepAlive(true);channel.socket().setTcpNoDelay(true);Cluster cluster = beat.getTask().getCluster();int port = cluster.isUseIPPort4Check() ? instance.getPort() : cluster.getDefCkport();channel.connect(new InetSocketAddress(instance.getIp(), port));// 注冊(cè)連接、讀取事件SelectionKey key = channel.register(selector, SelectionKey.OP_CONNECT | SelectionKey.OP_READ);key.attach(beat);keyMap.put(beat.toString(), new BeatKey(key));beat.setStartTime(System.currentTimeMillis());GlobalExecutor.scheduleTcpSuperSenseTask(new TimeOutTask(key), CONNECT_TIMEOUT_MS, TimeUnit.MILLISECONDS);} catch (Exception e) {beat.finishCheck(false, false, switchDomain.getTcpHealthParams().getMax(),"tcp:error:" + e.getMessage());if (channel != null) {try {channel.close();} catch (Exception ignore) {}}}return null;
}
這差別就像是親生兒子和非親生兒子,親生兒子我還會(huì)去主動(dòng)關(guān)懷一下,誒,你還活著么?而非臨時(shí)實(shí)例,就是你不心跳了,就把你扔了~
f)還有一個(gè)差別在于服務(wù)消費(fèi)者,Eureka 采用的是定時(shí)拉取(每 30 秒一次),那如果在 30 秒內(nèi)服務(wù)提供者掛了,消費(fèi)肯定是不知道的,因此 Eureka 這里更新的時(shí)效性也比較差.
我們的 微服務(wù) 定時(shí)拉取的基本邏輯就是先從本地緩存讀:
- 如果本地緩存沒(méi)有,就通過(guò) Nacos 客戶端構(gòu)造請(qǐng)求去 nacos 服務(wù)器中讀取.
- 如果本地緩存有,就開(kāi)啟定時(shí)更新功能(就是創(chuàng)建一個(gè)定時(shí)任務(wù),每隔一段時(shí)間去拉取一次),并返回緩存結(jié)果.
核心源碼如下:
public ServiceInfo getServiceInfo(final String serviceName, final String clusters) {NAMING_LOGGER.debug("failover-mode: " + failoverReactor.isFailoverSwitch());// 由 服務(wù)名@@集群名拼接 keyString key = ServiceInfo.getKey(serviceName, clusters);if (failoverReactor.isFailoverSwitch()) {return failoverReactor.getService(key);}// 讀取本地服務(wù)列表的緩存,緩存是一個(gè)Map,格式:Map<String, ServiceInfo>ServiceInfo serviceObj = getServiceInfo0(serviceName, clusters);// 判斷緩存是否存在if (null == serviceObj) {// 不存在,創(chuàng)建空ServiceInfoserviceObj = new ServiceInfo(serviceName, clusters);// 放入緩存serviceInfoMap.put(serviceObj.getKey(), serviceObj);// 放入待更新的服務(wù)列表(updatingMap)中updatingMap.put(serviceName, new Object());// 立即更新服務(wù)列表updateServiceNow(serviceName, clusters);// 從待更新列表中移除updatingMap.remove(serviceName);} else if (updatingMap.containsKey(serviceName)) {// 緩存中有,但是需要更新if (UPDATE_HOLD_INTERVAL > 0) {// hold a moment waiting for update finish 等待5秒中,待更新完成synchronized (serviceObj) {try {serviceObj.wait(UPDATE_HOLD_INTERVAL);} catch (InterruptedException e) {NAMING_LOGGER.error("[getServiceInfo] serviceName:" + serviceName + ", clusters:" + clusters, e);}}}}// 開(kāi)啟定時(shí)更新服務(wù)列表的功能scheduleUpdateIfAbsent(serviceName, clusters);// 返回緩存中的服務(wù)信息return serviceInfoMap.get(serviceObj.getKey());
}
定時(shí)更新方法如下:
public void updateService(String serviceName, String clusters) throws NacosException {ServiceInfo oldService = getServiceInfo0(serviceName, clusters);try {// 基于ServerProxy發(fā)起遠(yuǎn)程調(diào)用,查詢服務(wù)列表String result = serverProxy.queryList(serviceName, clusters, pushReceiver.getUdpPort(), false);if (StringUtils.isNotEmpty(result)) {// 處理查詢結(jié)果processServiceJson(result);}} finally {if (oldService != null) {synchronized (oldService) {oldService.notifyAll();}}}
}public String queryList(String serviceName, String clusters, int udpPort, boolean healthyOnly)throws NacosException {// 準(zhǔn)備請(qǐng)求參數(shù)final Map<String, String> params = new HashMap<String, String>(8);params.put(CommonParams.NAMESPACE_ID, namespaceId);params.put(CommonParams.SERVICE_NAME, serviceName);params.put("clusters", clusters);params.put("udpPort", String.valueOf(udpPort));params.put("clientIP", NetUtils.localIP());params.put("healthyOnly", String.valueOf(healthyOnly));// 發(fā)起請(qǐng)求,地址與API接口一致return reqApi(UtilAndComs.nacosUrlBase + "/instance/list", params, HttpMethod.GET);
}
?而 nacos 這里的消費(fèi)者不光進(jìn)行服務(wù)的定時(shí)拉取,nacos 還會(huì)主動(dòng)進(jìn)行消息的訂閱推送,一旦發(fā)現(xiàn)有服務(wù)掛了,就立刻推送一條消息給服務(wù)消費(fèi)者,告訴你服務(wù)要更新了.
Nacos 具體是通過(guò)什么實(shí)現(xiàn)消息訂閱推送機(jī)制呢?
a)首先 PushPeceiver 這個(gè)類(我們自己的微服務(wù)配置的 Nacos 客戶端),會(huì)以 UDP 的方式與 Nacos 服務(wù)端建立連接,監(jiān)聽(tīng) Nacos 服務(wù)端推送的服務(wù)變更數(shù)據(jù).
b)一旦 Nacos 服務(wù)列表發(fā)生變更,就會(huì)發(fā)送 UDP 廣播給所有的微服務(wù)訂閱者.
c)當(dāng)訂閱者接收到通知以后,就可以將接收到的服務(wù)信息緩存到本地緩存列表.
d)那么之后再拉取服務(wù)的時(shí)候,會(huì)優(yōu)先從緩存里讀取,緩存里有就直接返回緩存,如果沒(méi)有,再去拉取或者訂閱.
PushPeceiver 構(gòu)造函數(shù)如下:
public PushReceiver(HostReactor hostReactor) {try {this.hostReactor = hostReactor;// 創(chuàng)建 UDP客戶端String udpPort = getPushReceiverUdpPort();if (StringUtils.isEmpty(udpPort)) {this.udpSocket = new DatagramSocket();} else {this.udpSocket = new DatagramSocket(new InetSocketAddress(Integer.parseInt(udpPort)));}// 準(zhǔn)備線程池this.executorService = new ScheduledThreadPoolExecutor(1, new ThreadFactory() {@Overridepublic Thread newThread(Runnable r) {Thread thread = new Thread(r);thread.setDaemon(true);thread.setName("com.alibaba.nacos.naming.push.receiver");return thread;}});// 開(kāi)啟線程任務(wù),準(zhǔn)備接收變更數(shù)據(jù)this.executorService.execute(this);} catch (Exception e) {NAMING_LOGGER.error("[NA] init udp socket failed", e);}
}
PushReceiver 構(gòu)造函數(shù)中基于線程池來(lái)運(yùn)行任務(wù)。這是因?yàn)?PushReceiver 本身也是一個(gè)Runnable,其中的run方法業(yè)務(wù)邏輯就是:
@Override
public void run() {while (!closed) {try {// byte[] is initialized with 0 full filled by defaultbyte[] buffer = new byte[UDP_MSS];DatagramPacket packet = new DatagramPacket(buffer, buffer.length);// 接收推送數(shù)據(jù)udpSocket.receive(packet);// 解析為json字符串String json = new String(IoUtils.tryDecompress(packet.getData()), UTF_8).trim();NAMING_LOGGER.info("received push data: " + json + " from " + packet.getAddress().toString());// 反序列化為對(duì)象PushPacket pushPacket = JacksonUtils.toObj(json, PushPacket.class);String ack;if ("dom".equals(pushPacket.type) || "service".equals(pushPacket.type)) {// 交給 HostReactor去處理hostReactor.processServiceJson(pushPacket.data);// send ack to server 發(fā)送ACK回執(zhí),略。。} catch (Exception e) {if (closed) {return;}NAMING_LOGGER.error("[NA] error while receiving push data", e);}}
}
通知數(shù)據(jù)的處理由交給了?HostReactor?
的?processServiceJson?
方法:
public ServiceInfo processServiceJson(String json) {// 解析出ServiceInfo信息ServiceInfo serviceInfo = JacksonUtils.toObj(json, ServiceInfo.class);String serviceKey = serviceInfo.getKey();if (serviceKey == null) {return null;}// 查詢緩存中的 ServiceInfoServiceInfo oldService = serviceInfoMap.get(serviceKey);// 如果緩存存在,則需要校驗(yàn)?zāi)男?shù)據(jù)要更新boolean changed = false;if (oldService != null) {// 拉取的數(shù)據(jù)是否已經(jīng)過(guò)期if (oldService.getLastRefTime() > serviceInfo.getLastRefTime()) {NAMING_LOGGER.warn("out of date data received, old-t: " + oldService.getLastRefTime() + ", new-t: "+ serviceInfo.getLastRefTime());}// 放入緩存serviceInfoMap.put(serviceInfo.getKey(), serviceInfo);// 中間是緩存與新數(shù)據(jù)的對(duì)比,得到newHosts:新增的實(shí)例;remvHosts:待移除的實(shí)例;// modHosts:需要修改的實(shí)例if (newHosts.size() > 0 || remvHosts.size() > 0 || modHosts.size() > 0) {// 發(fā)布實(shí)例變更的事件NotifyCenter.publishEvent(new InstancesChangeEvent(serviceInfo.getName(), serviceInfo.getGroupName(),serviceInfo.getClusters(), serviceInfo.getHosts()));DiskCache.write(serviceInfo, cacheDir);}} else {// 本地緩存不存在changed = true;// 放入緩存serviceInfoMap.put(serviceInfo.getKey(), serviceInfo);// 直接發(fā)布實(shí)例變更的事件NotifyCenter.publishEvent(new InstancesChangeEvent(serviceInfo.getName(), serviceInfo.getGroupName(),serviceInfo.getClusters(), serviceInfo.getHosts()));serviceInfo.setJsonFromServer(json);DiskCache.write(serviceInfo, cacheDir);}// 。。。return serviceInfo;
}