国产亚洲精品福利在线无卡一,国产精久久一区二区三区,亚洲精品无码国模,精品久久久久久无码专区不卡

當(dāng)前位置: 首頁 > news >正文

典型網(wǎng)站開發(fā)的一般流程推廣app是什么工作

典型網(wǎng)站開發(fā)的一般流程,推廣app是什么工作,易語言wordpress發(fā)帖,做鋁板的網(wǎng)站基于KafkaListener注解的kafka監(jiān)聽代碼可以手動(dòng)指定要消費(fèi)的kafka集群,這對于需要訪問多套kafka集群的程序來說,是有效的解決方案。這里需要注意的是,此時(shí)的消費(fèi)者配置信息需使用原生kafka的配置信息格式(如:ConsumerC…

基于@KafkaListener注解的kafka監(jiān)聽代碼可以手動(dòng)指定要消費(fèi)的kafka集群,這對于需要訪問多套kafka集群的程序來說,是有效的解決方案。這里需要注意的是,此時(shí)的消費(fèi)者配置信息需使用原生kafka的配置信息格式(如:ConsumerConfig.MAX_POLL_RECORDS_CONFIG = “max.poll.records”),與自動(dòng)裝載KafkaConsumer時(shí)的配置信息格式不同。詳情如下:

依賴項(xiàng)(其實(shí)spring-kafka包含了kafka-clients)

<!-- spring-kafka --> 
<dependency><groupId>org.springframework.kafka</groupId><artifactId>spring-kafka</artifactId><version>2.6.0</version>
</dependency>
<!-- kafka-clients --> 
<dependency><groupId>org.apache.kafka</groupId><artifactId>kafka-clients</artifactId><version>2.6.0</version>
</dependency>

配置文件
配置參數(shù)的格式和含義,參見《spring-kafka的配置使用》

生產(chǎn)代碼

@Component
@Slf4j
public class KafKaProducer {@Autowiredprivate KafkaTemplate kafkaTemplate;public void sendMessage(String topic, Object object) {/** 這里的 ListenableFuture 類是 spring 對 java 原生 Future 的擴(kuò)展增強(qiáng),是一個(gè)泛型接口,用于監(jiān)聽異步方法的回調(diào) 而對于* kafka send 方法返回值而言,這里的泛型所代表的實(shí)際類型就是 SendResult<K, V>,而這里 K,V 的泛型實(shí)際上 被用于* ProducerRecord<K, V> producerRecord,即生產(chǎn)者發(fā)送消息的 key,value 類型*/ListenableFuture<SendResult<String, Object>> future = kafkaTemplate.send(topic, object);future.addCallback(new ListenableFutureCallback<SendResult<String, Object>>() {@Overridepublic void onFailure(Throwable throwable) {log.error("發(fā)送消息失敗:" + throwable.getMessage());}@Overridepublic void onSuccess(SendResult<String, Object> sendResult){// log.info("發(fā)送消息成功:" + sendResult.toString());}});}
}

消費(fèi)者配置類,其中可配置多個(gè)kafka集群,每個(gè)kafka集群生成一個(gè)KafkaListenerContainerFactory實(shí)例

@Data
@Slf4j
@Configuration
public class KafkaConfig {@ResourceEnvironment environment;@Beanpublic KafkaListenerContainerFactory<?> containerFactory() {Integer concurrency = environment.getProperty("kafka.concurrency", Integer.class, 1);Integer pollTimeout = environment.getProperty("kafka.poll.timeout", Integer.class, 3000);ConcurrentKafkaListenerContainerFactory<String, String> containerFactory = new ConcurrentKafkaListenerContainerFactory<>();containerFactory.setConsumerFactory(new DefaultKafkaConsumerFactory<>(this.consumerConfigs()));containerFactory.setConcurrency(concurrency); // 消費(fèi)并發(fā)數(shù)量containerFactory.setBatchListener(true);      // 批量監(jiān)聽消息containerFactory.getContainerProperties().setAckMode(ContainerProperties.AckMode.BATCH); // 批量提交偏移containerFactory.getContainerProperties().setPollTimeout(pollTimeout); // 消息拉取時(shí)限return containerFactory;}@Beanpublic Map<String, Object> consumerConfigs() {String servers          = environment.getProperty("kafka.servers", "127.0.0.1:9092");String groupId          = environment.getProperty("kafka.groupId", "consumer-group");String sessionTimeout   = environment.getProperty("kafka.session.timeout.ms", "60000");String maxPollRecords   = environment.getProperty("kafka.max.poll.records", "100");String maxPollInterval  = environment.getProperty("kafka.max.poll.interval", "600000");String jaasConfig       = environment.getProperty("kafka.sasl.jaas.config");Map<String, Object> props = new HashMap<>();props.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, servers);props.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class);props.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class);props.put(ConsumerConfig.GROUP_ID_CONFIG, groupId);props.put(ConsumerConfig.MAX_POLL_RECORDS_CONFIG, maxPollRecords);props.put(ConsumerConfig.MAX_POLL_INTERVAL_MS_CONFIG, maxPollInterval);props.put(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG, false);/// props.put(ConsumerConfig.AUTO_COMMIT_INTERVAL_MS_CONFIG, "1000");props.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "latest");props.put(ConsumerConfig.SESSION_TIMEOUT_MS_CONFIG, sessionTimeout);props.put(ConsumerConfig.REQUEST_TIMEOUT_MS_CONFIG, 180000);props.put("security.protocol", "SASL_PLAINTEXT");props.put("sasl.mechanism", "SCRAM-SHA-256");props.put("sasl.jaas.config", jaasConfig);return props;}
}

消費(fèi)代碼 @KafkaListener注解的containerFactory參數(shù)指定了KafkaListenerContainerFactory實(shí)例,也就指定了kafka集群

@Slf4j
@Component
public class KafkaConsumerListen implements BatchMessageListener<String, String> {@Autowiredprivate Environment environment;@Autowiredprivate KafkaMsgHandleService msgHandleService;@Autowiredprivate ThreadPoolTaskExecutor taskExecutor;/*************************      接收消息************************/@Override@KafkaListener( containerFactory = "containerFactory", groupId = "${kafka.groupId}", topics = "#{'${kafka.topics}'.split(',')}", concurrency = "${kafka.concurrency}")public void onMessage(List<ConsumerRecord<String, String>> records) {try {final List<String> msgs = records.stream().map(ConsumerRecord::value).collect(Collectors.toList());log.info("收到消息體:size={} content:{}", msgs.size(), JSON.toJSONString(msgs));/// 處理消息msgs.forEach(this::processRecord);} catch (Exception e) {log.error("KafkaListener_kafka_consume_error.", e);}}/*************************      處理消息************************/private void processRecord(String msg) {taskExecutor.submit(() -> {if (!environment.getProperty("kafka1.switch", Boolean.class,true)) {log.warn("KafkaListener_turn_off_drop_message.");return;}msgHandleService.handle(msg);});}
}
http://aloenet.com.cn/news/33333.html

相關(guān)文章:

  • 好看網(wǎng)站手機(jī)版批量查詢權(quán)重
  • 網(wǎng)站建設(shè)php帶數(shù)據(jù)庫模板網(wǎng)絡(luò)安全
  • 如何做網(wǎng)站鏡像百度鏈接提交入口
  • 手機(jī)網(wǎng)站有哪些類型成都網(wǎng)絡(luò)推廣
  • 網(wǎng)站建設(shè) 開發(fā)網(wǎng)站代碼百度網(wǎng)盤官網(wǎng)
  • 怎么用家里的電腦做網(wǎng)站服務(wù)器上海seo公司排名
  • 學(xué)校網(wǎng)站建設(shè)的優(yōu)勢和不足成人用品推廣網(wǎng)頁
  • 陽春網(wǎng)站制作寧波網(wǎng)站建設(shè)推廣平臺(tái)
  • 個(gè)人網(wǎng)站 備案 廣告seo搜索引擎優(yōu)化價(jià)格
  • 72搭建網(wǎng)站網(wǎng)頁代引流推廣公司
  • 想開個(gè)網(wǎng)站賣衣服的怎么做常州seo收費(fèi)
  • 有交做拼多多網(wǎng)站的嗎產(chǎn)品推廣平臺(tái)有哪些
  • 自己做網(wǎng)站賣什么海南樂秀同城群軟件下載
  • 做網(wǎng)站公司南京關(guān)鍵詞免費(fèi)網(wǎng)站
  • c 做網(wǎng)站設(shè)計(jì)關(guān)鍵詞排名提高方法
  • pageadmin如何做網(wǎng)站數(shù)據(jù)分析培訓(xùn)班
  • 代還信用卡網(wǎng)站建設(shè)國內(nèi)最新十大新聞
  • .net mvc做網(wǎng)站一鍵優(yōu)化清理
  • 單縣網(wǎng)站定制網(wǎng)絡(luò)營銷策劃的主要特點(diǎn)
  • 國外網(wǎng)站 服務(wù)器青島seo排名公司
  • 網(wǎng)頁即時(shí)聊天seo推廣怎么樣
  • 用動(dòng)物做網(wǎng)站名稱可以投放廣告的網(wǎng)站
  • 沒有備案的網(wǎng)站可以做淘寶客seo網(wǎng)絡(luò)推廣培訓(xùn)班
  • 校園論壇網(wǎng)站怎么做谷歌收錄提交入口
  • 長沙岳麓區(qū)做網(wǎng)站seo內(nèi)容優(yōu)化是什么
  • 怎么做網(wǎng)站首頁全媒體廣告代理
  • 去國外做移動(dòng)支付網(wǎng)站嗎廣告投放網(wǎng)站
  • 四川網(wǎng)站建設(shè)一站式服務(wù)商互聯(lián)網(wǎng)公司網(wǎng)站模板
  • 企業(yè)oa系統(tǒng)免費(fèi)優(yōu)化網(wǎng)站建設(shè)seo
  • 淘寶做個(gè)網(wǎng)站多少錢seo概念