国产亚洲精品福利在线无卡一,国产精久久一区二区三区,亚洲精品无码国模,精品久久久久久无码专区不卡

當(dāng)前位置: 首頁(yè) > news >正文

網(wǎng)站提交百度收錄怎么簡(jiǎn)單制作一個(gè)網(wǎng)頁(yè)

網(wǎng)站提交百度收錄,怎么簡(jiǎn)單制作一個(gè)網(wǎng)頁(yè),平度網(wǎng)站建設(shè),三星網(wǎng)上商城如何學(xué)生認(rèn)證高性能隊(duì)列 Disruptor 在 IM 系統(tǒng)中的實(shí)戰(zhàn) 前三期我們介紹了Disruptor的典型使用場(chǎng)景和相關(guān)高性能原理,本期我介紹一下Disruptor在IM系統(tǒng)用的應(yīng)用實(shí)戰(zhàn),IM系統(tǒng)即社交聊天系統(tǒng),對(duì)實(shí)時(shí)性的要求非常高,非常符合Disruptor的使用場(chǎng)景。 …

高性能隊(duì)列 Disruptor 在 IM 系統(tǒng)中的實(shí)戰(zhàn)

前三期我們介紹了Disruptor的典型使用場(chǎng)景和相關(guān)高性能原理,本期我介紹一下Disruptor在IM系統(tǒng)用的應(yīng)用實(shí)戰(zhàn),IM系統(tǒng)即社交聊天系統(tǒng),對(duì)實(shí)時(shí)性的要求非常高,非常符合Disruptor的使用場(chǎng)景。

本篇文章將結(jié)合實(shí)際代碼,介紹如何在 IM 系統(tǒng)中使用 Disruptor 進(jìn)行高效的消息轉(zhuǎn)發(fā)。

1. Disruptor 在 IM 系統(tǒng)中的作用

在 IM 系統(tǒng)中,用戶 A 發(fā)送消息給 B、C、D 時(shí),需要根據(jù) B、C、D 所在的服務(wù)器節(jié)點(diǎn)進(jìn)行分組,并將消息轉(zhuǎn)發(fā)到對(duì)應(yīng)的節(jié)點(diǎn)上。為了確保高吞吐量和低延遲,我們使用 Disruptor 作為高性能隊(duì)列。

2. 代碼實(shí)現(xiàn)

2.1 初始化 Disruptor

當(dāng)某個(gè)節(jié)點(diǎn) nodeId 還沒(méi)有對(duì)應(yīng)的 RingBuffer 時(shí),我們需要?jiǎng)?chuàng)建一個(gè)新的 Disruptor,并將其存入 ringBufferMap 中。

???private?final?Map<String,?RingBuffer<ClusterPublishEvent>>?ringBufferMap?=?new?ConcurrentHashMap<>();

????public?ClusterQueueService(Server?server)?{
????????this.mServer?=?server;
????}

????public?void?publishMessage(String?nodeId,?String?fromUser,?String?clientId,?String?topic,?byte[]?payload)?{
????????if?(!ringBufferMap.containsKey(nodeId))?{
????????????long?st?=?System.currentTimeMillis();
????????????synchronized?(ringBufferMap){
????????????????if(!ringBufferMap.containsKey(nodeId))?{
????????????????????BlockingWaitStrategy?strategy?=?new?BlockingWaitStrategy();
????????????????????Disruptor<ClusterPublishEvent>?disruptor?=?new?Disruptor<>(
????????????????????????new?ClusterPublishEventFactory(),?1024?*?1024,?DaemonThreadFactory.INSTANCE,
????????????????????????ProducerType.SINGLE,?strategy);
????????????????????disruptor.handleEventsWith(new?ClusterPublishEventHandler(mServer,?nodeId));
????????????????????disruptor.setDefaultExceptionHandler(new?IgnoreExceptionHandler());
????????????????????disruptor.start();
????????????????????ringBufferMap.put(nodeId,?disruptor.getRingBuffer());
????????????????}
????????????}
????????????log.info("publishMessage?create?RingBuffer?cost:{}ms,?ringBufferMap:{},size:{}",?System.currentTimeMillis()?-?st,?ringBufferMap,?ringBufferMap.size());
????????}
????????RingBuffer<ClusterPublishEvent>?ringBuffer?=?ringBufferMap.get(nodeId);
????????long?sequence?=?ringBuffer.next();
????????//?當(dāng)環(huán)形緩沖區(qū)未用完時(shí),?返回的是空對(duì)象,否則,返回的是緩存的數(shù)據(jù)。
????????ClusterPublishEvent?clusterEvent?=?ringBuffer.get(sequence);
????????clusterEvent.setFromUser(fromUser);
????????clusterEvent.setClientId(clientId);
????????//?此topic,是節(jié)點(diǎn)轉(zhuǎn)發(fā)的topic:?NM2R,?NTF,DESTROYUSER,?只有這三種
????????clusterEvent.setTopic(topic);
????????clusterEvent.setPayload(payload);
????????clusterEvent.setTraceId(MDC.get(ImSvcConstants.TRACE_ID));
????????//?發(fā)布事件,?會(huì)觸發(fā)ClusterPublishEventHandler.onEvent方法
????????ringBuffer.publish(sequence);
????}

關(guān)鍵點(diǎn)解析:

  • 采用 BlockingWaitStrategy 作為等待策略,確保高效的 CPU 資源利用。
  • 采用 DaemonThreadFactory.INSTANCE 創(chuàng)建線程池,避免應(yīng)用程序退出時(shí)線程未正?;厥?。
  • handleEventsWith 設(shè)定事件處理器 ClusterPublishEventHandler,用于消息處理。
  • setDefaultExceptionHandler 避免異常影響消息處理流程。

2.2 按照節(jié)點(diǎn)轉(zhuǎn)發(fā)消息

根據(jù)用戶所在的服務(wù)節(jié)點(diǎn),進(jìn)行消息轉(zhuǎn)發(fā)(發(fā)送消息事件到Disruptor)

????public?void?publish2Receivers(Long?messageId,?Set<String>?receivers,?String?exceptClientId,?int?pullType,?String?topic)?{
????????//未綁定broker的用戶默認(rèn)由本中心處理
????????Map<String,?String>?allReceiverMap?=?new?HashMap<>();
????????for?(String?receiver?:?receivers)?{
????????????allReceiverMap.put(receiver,?localNodeId);
????????}
????????//從分布式緩存獲取獲取用戶路由
????????Map<String,?String>?receiverMap?=?userRouteStore.getAll(receivers);
????????allReceiverMap.putAll(receiverMap);
????????Map<String,?Set<String>>?nodeMap?=?new?HashMap<>();
????????//使用nodeId分組
????????allReceiverMap.forEach((receiver,?nodeId)?->?{
????????????if?(!nodeMap.containsKey(nodeId))?{
????????????????nodeMap.put(nodeId,?new?HashSet<>());
????????????}
????????????nodeMap.get(nodeId).add(receiver);
????????});
????????//獲取可用節(jié)點(diǎn)
????????Cluster?cluster?=?mServer.getHazelcastInstance().getCluster();
????????Set<Member>?members?=?cluster.getMembers();
????????List<String>?collect?=?members.stream().map(member?->?member.getStringAttribute(HZ_Cluster_Node_ID)).collect(Collectors.toList());
????????log.info("hazelcast?node?list:{}",JSON.toJSONString(collect));
????????Map<String,?Member>?memberMap?=?members.stream().collect(Collectors.toMap(
????????????member?->?member.getStringAttribute(HZ_Cluster_Node_ID),?member?->?member,?(k1,?k2?)->k1));
????????//按照節(jié)點(diǎn)分發(fā)
????????nodeMap.forEach((nodeId,?set)?->?{
????????????//?轉(zhuǎn)發(fā)到其他節(jié)點(diǎn)發(fā)送
????????????if?(!nodeId.equals(localNodeId)?&&?memberMap.containsKey(nodeId))?{
????????????????WFCMessage.NotifyMessage2Receivers?notifyMessage2Receivers?=?WFCMessage.NotifyMessage2Receivers.newBuilder()
????????????????????.setMessageId(messageId)
????????????????????.addAllReceivers(set)
????????????????????.setExceptClientId(exceptClientId==null?"":exceptClientId)
????????????????????.setPullType(pullType)
????????????????????.setTopic(topic)
????????????????????.build();
????????????????clusterQueueService.publishMessage(nodeId,nodeId,null,?IMTopic.NotifyMessage2ReceiversTopic,?notifyMessage2Receivers.toByteArray());
????????????}
????????????//?當(dāng)前節(jié)點(diǎn)處理發(fā)送
????????????else?{
????????????????WFCMessage.Message?message?=?mServer.getStore().messagesStore().getMessage(messageId);

????????????????if?(message?!=?null)?{
????????????????????//?Add?By?Youqibin?16:11?2022/3/15?接收通知前置處理
????????????????????preHandle(message,?set);
????????????????????mServer.getImBusinessScheduler().execute(()?->messagesPublisher.publish2Receivers(message,?set,?exceptClientId,?pullType,?localNodeId));
????????????????????//?Add?By?Youqibin?16:11?2022/3/15?接收通知后置處理
????????????????????postHandle(message,?set);
????????????????}
????????????}
????????});
????}

關(guān)鍵點(diǎn)解析:

  • clusterQueueService.publishMessage, 使用Disruptor發(fā)送消息事件,高性能異步處理

2.3 事件處理器 onEvent

當(dāng) Disruptor 事件發(fā)布后,ClusterPublishEventHandler.onEvent 負(fù)責(zé)實(shí)際的消息轉(zhuǎn)發(fā)邏輯。

public?class?ClusterPublishEventHandler?implements?EventHandler<ClusterPublishEvent>?{
????private?final?Server?server;
????private?final?String?nodeId;

????public?ClusterPublishEventHandler(Server?server,?String?nodeId)?{
????????this.server?=?server;
????????this.nodeId?=?nodeId;
????}

????@Override
????public?void?onEvent(ClusterPublishEvent?event,?long?sequence,?boolean?endOfBatch)?{
????????log.info("Processing?event:?{}?on?node:?{}",?event,?nodeId);
????????server.forwardMessage(nodeId,?event.getFromUser(),?event.getClientId(),?event.getTopic(),?event.getPayload());
????}
}

關(guān)鍵點(diǎn)解析:

  • onEvent 方法接收到 ClusterPublishEvent 后,調(diào)用 server.forwardMessage 進(jìn)行消息轉(zhuǎn)發(fā)。
  • endOfBatch 用于標(biāo)識(shí)當(dāng)前事件是否為批處理中的最后一個(gè)事件。
  • log.info 記錄消息處理的關(guān)鍵日志,便于后續(xù)排查。

3. 總結(jié)

本文介紹了 Disruptor 在 IM 系統(tǒng)中的應(yīng)用,核心邏輯包括:

  1. 初始化 Disruptor:為每個(gè) nodeId 創(chuàng)建獨(dú)立的 RingBuffer。
  2. 按照節(jié)點(diǎn)轉(zhuǎn)發(fā)消息:將用戶消息存入對(duì)應(yīng)節(jié)點(diǎn)的 RingBuffer。
  3. 消息處理onEvent 方法從 RingBuffer 讀取消息,并執(zhí)行轉(zhuǎn)發(fā)。

通過(guò) Disruptor,可以大幅降低鎖競(jìng)爭(zhēng),提高 IM 系統(tǒng)的吞吐量,使其能夠在高并發(fā)環(huán)境下穩(wěn)定運(yùn)行。

4. 最后

歡迎關(guān)注加瓦點(diǎn)燈,每天推送干貨知識(shí),一起進(jìn)步!

本文由 mdnice 多平臺(tái)發(fā)布

http://aloenet.com.cn/news/36514.html

相關(guān)文章:

  • 學(xué)校網(wǎng)站的建設(shè)目標(biāo)是什么今天的熱搜榜
  • 網(wǎng)站開(kāi)發(fā)多少錢一個(gè)網(wǎng)站推廣優(yōu)化價(jià)格
  • 源碼網(wǎng)站程序指數(shù)函數(shù)求導(dǎo)公式
  • 濟(jì)南品牌營(yíng)銷型網(wǎng)站建設(shè)品牌策劃運(yùn)營(yíng)公司
  • 怎么編網(wǎng)站中央廣播電視總臺(tái)
  • 林州網(wǎng)站建設(shè)慈溪seo
  • 空調(diào)設(shè)備公司網(wǎng)站建設(shè)上海城市分站seo
  • 文化傳媒公司網(wǎng)站建設(shè)seo排名點(diǎn)擊工具
  • 如何寫(xiě)好網(wǎng)站文案站長(zhǎng)之家官網(wǎng)入口
  • 龍華做棋牌網(wǎng)站建設(shè)多少錢廣告聯(lián)盟接廣告
  • wordpress文章點(diǎn)贊旺道seo優(yōu)化軟件怎么用
  • 獨(dú)立站shopify需要費(fèi)用嗎查關(guān)鍵詞排名工具app
  • 做門戶網(wǎng)站需要具備什么yw77731域名查詢
  • 深廣縱橫設(shè)計(jì)公司官網(wǎng)北京seo顧問(wèn)服務(wù)
  • 公司備案查詢網(wǎng)站備案成都網(wǎng)站快速優(yōu)化排名
  • 濰坊網(wǎng)站建設(shè)最新報(bào)價(jià)steam交易鏈接在哪看
  • 問(wèn)答網(wǎng)站怎么做營(yíng)銷網(wǎng)絡(luò)營(yíng)銷與直播電商專業(yè)介紹
  • 做外貿(mào)電商網(wǎng)站有哪個(gè)b站推廣網(wǎng)站
  • dede裝修網(wǎng)站模板申請(qǐng)網(wǎng)站域名要多少錢
  • 石巖小學(xué)網(wǎng)站建設(shè)品牌推廣策劃方案案例
  • 做免費(fèi)推廣網(wǎng)站seo入門講解
  • 訪問(wèn)國(guó)外網(wǎng)站太慢青島百度網(wǎng)站排名
  • 怎么做網(wǎng)站服務(wù)器嗎營(yíng)銷推廣的特點(diǎn)是
  • 開(kāi)發(fā)公司讓員工頂名買房套取貸款新區(qū)快速seo排名
  • 營(yíng)銷型網(wǎng)站怎么做google引擎入口
  • 做網(wǎng)站 需要買云服務(wù)器嗎營(yíng)銷方法有哪幾種
  • 用vs2012做網(wǎng)站案例樂(lè)天seo培訓(xùn)
  • 水果網(wǎng)站策劃書(shū)優(yōu)化大師免費(fèi)版
  • 教育網(wǎng)站建設(shè)改版百度推廣在線客服
  • 可以做視頻推廣的網(wǎng)站有哪些內(nèi)容搜全網(wǎng)的瀏覽器