A consumer based on the @KafkaListener annotation can be pointed at a specific Kafka cluster explicitly, which is an effective solution for applications that need to consume from several Kafka clusters. Note that in this setup the consumer configuration must use the native Kafka property format (e.g. ConsumerConfig.MAX_POLL_RECORDS_CONFIG = "max.poll.records"), which differs from the property format used when Spring Boot auto-configures the KafkaConsumer. Details follow.
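To make the distinction concrete, here is a minimal sketch (class and method names are mine, for illustration only) of a native-format property entry; the equivalent Spring Boot relaxed key is shown in a comment:

```java
import java.util.HashMap;
import java.util.Map;

public class NativeKeyDemo {
    public static Map<String, Object> nativeProps() {
        Map<String, Object> props = new HashMap<>();
        // Native Kafka key, i.e. the string constant behind
        // ConsumerConfig.MAX_POLL_RECORDS_CONFIG:
        props.put("max.poll.records", "100");
        // With Spring Boot auto-configuration the same setting would instead
        // live in application.yml as: spring.kafka.consumer.max-poll-records: 100
        return props;
    }

    public static void main(String[] args) {
        System.out.println(NativeKeyDemo.nativeProps().get("max.poll.records")); // prints "100"
    }
}
```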
Dependencies (note that spring-kafka already pulls in kafka-clients transitively)
```xml
<!-- spring-kafka -->
<dependency>
    <groupId>org.springframework.kafka</groupId>
    <artifactId>spring-kafka</artifactId>
    <version>2.6.0</version>
</dependency>
<!-- kafka-clients -->
<dependency>
    <groupId>org.apache.kafka</groupId>
    <artifactId>kafka-clients</artifactId>
    <version>2.6.0</version>
</dependency>
```
Configuration file
For the format and meaning of the configuration parameters, see the companion article 《spring-kafka的配置使用》 (Configuration and Usage of spring-kafka).
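As a hedged illustration (the key names below follow this article's own scheme, read via Spring's Environment, not Spring Boot's standard `spring.kafka.*` prefix), the configuration file consumed by the classes in this article might look like:

```properties
kafka.servers=127.0.0.1:9092
kafka.groupId=consumer-group
kafka.topics=topic-a,topic-b
kafka.concurrency=2
kafka.poll.timeout=3000
kafka.max.poll.records=100
kafka.max.poll.interval=600000
kafka.session.timeout.ms=60000
kafka.sasl.jaas.config=org.apache.kafka.common.security.scram.ScramLoginModule required username="user" password="pass";
kafka1.switch=true
```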
Producer code
```java
@Component
@Slf4j
public class KafKaProducer {

    @Autowired
    private KafkaTemplate<String, Object> kafkaTemplate;

    public void sendMessage(String topic, Object object) {
        /*
         * ListenableFuture is Spring's generic extension of the native Java
         * Future, used to register callbacks on an asynchronous result. For
         * the return value of KafkaTemplate.send(), the generic parameter is
         * SendResult<K, V>, where K and V are the key and value types of the
         * ProducerRecord<K, V> being sent.
         */
        ListenableFuture<SendResult<String, Object>> future = kafkaTemplate.send(topic, object);
        future.addCallback(new ListenableFutureCallback<SendResult<String, Object>>() {
            @Override
            public void onFailure(Throwable throwable) {
                log.error("Failed to send message: " + throwable.getMessage());
            }

            @Override
            public void onSuccess(SendResult<String, Object> sendResult) {
                // log.info("Message sent successfully: " + sendResult.toString());
            }
        });
    }
}
```
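The producer above uses the auto-configured KafkaTemplate and therefore talks to the default cluster. In the multi-cluster case, the producer side would likewise need one ProducerFactory and KafkaTemplate per cluster, each built from a native-format property map. A minimal sketch of such a map (the class and method names are hypothetical; the string keys are the standard native Kafka producer keys behind ProducerConfig constants):

```java
import java.util.HashMap;
import java.util.Map;

public class ProducerProps {
    // Native-format producer properties for one cluster; these string keys
    // are the values behind ProducerConfig.BOOTSTRAP_SERVERS_CONFIG etc.
    public static Map<String, Object> producerConfigs(String servers) {
        Map<String, Object> props = new HashMap<>();
        props.put("bootstrap.servers", servers);
        props.put("key.serializer", "org.apache.kafka.common.serialization.StringSerializer");
        props.put("value.serializer", "org.apache.kafka.common.serialization.StringSerializer");
        props.put("acks", "all"); // wait for acknowledgement from all in-sync replicas
        return props;
    }

    public static void main(String[] args) {
        Map<String, Object> props = ProducerProps.producerConfigs("10.0.0.1:9092");
        System.out.println(props.get("bootstrap.servers")); // prints "10.0.0.1:9092"
    }
}
```

Such a map would then be passed to `new DefaultKafkaProducerFactory<>(props)` and wrapped in a per-cluster KafkaTemplate bean.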
Consumer configuration class. Multiple Kafka clusters can be configured here; each cluster gets its own KafkaListenerContainerFactory instance.
```java
@Data
@Slf4j
@Configuration
public class KafkaConfig {

    @Resource
    Environment environment;

    @Bean
    public KafkaListenerContainerFactory<?> containerFactory() {
        Integer concurrency = environment.getProperty("kafka.concurrency", Integer.class, 1);
        Integer pollTimeout = environment.getProperty("kafka.poll.timeout", Integer.class, 3000);
        ConcurrentKafkaListenerContainerFactory<String, String> containerFactory =
                new ConcurrentKafkaListenerContainerFactory<>();
        containerFactory.setConsumerFactory(new DefaultKafkaConsumerFactory<>(this.consumerConfigs()));
        containerFactory.setConcurrency(concurrency);   // number of concurrent consumers
        containerFactory.setBatchListener(true);        // listen for messages in batches
        containerFactory.getContainerProperties().setAckMode(ContainerProperties.AckMode.BATCH); // commit offsets per batch
        containerFactory.getContainerProperties().setPollTimeout(pollTimeout); // poll timeout
        return containerFactory;
    }

    @Bean
    public Map<String, Object> consumerConfigs() {
        String servers = environment.getProperty("kafka.servers", "127.0.0.1:9092");
        String groupId = environment.getProperty("kafka.groupId", "consumer-group");
        String sessionTimeout = environment.getProperty("kafka.session.timeout.ms", "60000");
        String maxPollRecords = environment.getProperty("kafka.max.poll.records", "100");
        String maxPollInterval = environment.getProperty("kafka.max.poll.interval", "600000");
        String jaasConfig = environment.getProperty("kafka.sasl.jaas.config");

        Map<String, Object> props = new HashMap<>();
        props.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, servers);
        props.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class);
        props.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class);
        props.put(ConsumerConfig.GROUP_ID_CONFIG, groupId);
        props.put(ConsumerConfig.MAX_POLL_RECORDS_CONFIG, maxPollRecords);
        props.put(ConsumerConfig.MAX_POLL_INTERVAL_MS_CONFIG, maxPollInterval);
        props.put(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG, false);
        // props.put(ConsumerConfig.AUTO_COMMIT_INTERVAL_MS_CONFIG, "1000");
        props.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "latest");
        props.put(ConsumerConfig.SESSION_TIMEOUT_MS_CONFIG, sessionTimeout);
        props.put(ConsumerConfig.REQUEST_TIMEOUT_MS_CONFIG, 180000);
        props.put("security.protocol", "SASL_PLAINTEXT");
        props.put("sasl.mechanism", "SCRAM-SHA-256");
        props.put("sasl.jaas.config", jaasConfig);
        return props;
    }
}
```
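To actually register a second cluster, consumerConfigs() can be parameterized by a property prefix (e.g. `kafka2.*`, a naming scheme I am assuming here) and fed into a second @Bean factory method named, say, containerFactory2. The prefix-driven lookup itself can be sketched with plain maps standing in for Spring's Environment (class and method names are hypothetical):

```java
import java.util.HashMap;
import java.util.Map;

public class MultiClusterProps {
    // Build native-format consumer properties for the cluster identified by
    // `prefix` ("kafka", "kafka2", ...). `env` stands in for Spring's Environment.
    public static Map<String, Object> consumerProps(Map<String, String> env, String prefix) {
        Map<String, Object> props = new HashMap<>();
        props.put("bootstrap.servers", env.getOrDefault(prefix + ".servers", "127.0.0.1:9092"));
        props.put("group.id", env.getOrDefault(prefix + ".groupId", "consumer-group"));
        props.put("max.poll.records", env.getOrDefault(prefix + ".max.poll.records", "100"));
        props.put("enable.auto.commit", "false"); // offsets committed by the listener container
        return props;
    }

    public static void main(String[] args) {
        Map<String, String> env = new HashMap<>();
        env.put("kafka2.servers", "10.0.0.2:9092");
        // Second cluster picks up its own bootstrap servers:
        System.out.println(MultiClusterProps.consumerProps(env, "kafka2").get("bootstrap.servers")); // prints "10.0.0.2:9092"
        // First cluster falls back to the default:
        System.out.println(MultiClusterProps.consumerProps(env, "kafka").get("bootstrap.servers"));  // prints "127.0.0.1:9092"
    }
}
```

Each resulting map would back its own DefaultKafkaConsumerFactory and ConcurrentKafkaListenerContainerFactory bean, exactly as containerFactory() does above.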
Consumer code. The containerFactory attribute of @KafkaListener selects a KafkaListenerContainerFactory instance, and with it the Kafka cluster to consume from.
```java
@Slf4j
@Component
public class KafkaConsumerListen implements BatchMessageListener<String, String> {

    @Autowired
    private Environment environment;
    @Autowired
    private KafkaMsgHandleService msgHandleService;
    @Autowired
    private ThreadPoolTaskExecutor taskExecutor;

    /************************
     * Receive messages
     ************************/
    @Override
    @KafkaListener(
            containerFactory = "containerFactory",
            groupId = "${kafka.groupId}",
            topics = "#{'${kafka.topics}'.split(',')}",
            concurrency = "${kafka.concurrency}")
    public void onMessage(List<ConsumerRecord<String, String>> records) {
        try {
            final List<String> msgs = records.stream()
                    .map(ConsumerRecord::value)
                    .collect(Collectors.toList());
            log.info("Received batch: size={} content:{}", msgs.size(), JSON.toJSONString(msgs));
            // handle the messages
            msgs.forEach(this::processRecord);
        } catch (Exception e) {
            log.error("KafkaListener_kafka_consume_error.", e);
        }
    }

    /************************
     * Handle a message
     ************************/
    private void processRecord(String msg) {
        taskExecutor.submit(() -> {
            if (!environment.getProperty("kafka1.switch", Boolean.class, true)) {
                log.warn("KafkaListener_turn_off_drop_message.");
                return;
            }
            msgHandleService.handle(msg);
        });
    }
}
```
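The topics attribute above is a SpEL expression, `#{'${kafka.topics}'.split(',')}`: the `${kafka.topics}` placeholder is resolved to the raw property string first, and that string is then split on commas into the topic array. The split step is plain `String.split`, as this small sketch (with a hypothetical property value) shows:

```java
public class TopicSplitDemo {
    public static void main(String[] args) {
        // A value that ${kafka.topics} might resolve to (hypothetical example):
        String raw = "topic-a,topic-b,topic-c";
        String[] topics = raw.split(",");
        System.out.println(topics.length); // prints "3"
        System.out.println(topics[0]);     // prints "topic-a"
    }
}
```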