做網(wǎng)站是怎樣賺錢的網(wǎng)站快速排名互點軟件
目錄
1、消息生產(chǎn)流程
2、生產(chǎn)者常見參數(shù)配置
3、序列化器
基本概念
自定義序列化器
4、分區(qū)器
默認分區(qū)規(guī)則
自定義分區(qū)器
5、生產(chǎn)者攔截器
作用
自定義攔截器
6、生產(chǎn)者原理解析
1、消息生產(chǎn)流程
2、生產(chǎn)者常見參數(shù)配置
3、序列化器
基本概念
- 在Kafka中保存的數(shù)據(jù)都是字節(jié)數(shù)組。
- 消息發(fā)送前,需要將消息序列化為字節(jié)數(shù)組進行發(fā)送。
- 生產(chǎn)者通過key.serializer和value.serializer指定key和value的序列化器。
- Kafka使用org.apache.kafka.common.serialization.Serializer接口定義序列化器。
- Kafka已實現(xiàn)的序列化器有:ByteArraySerializer、ByteBufferSerializer、BytesSerializer、DoubleSerializer、FloatSerializer、IntegerSerializer、StringSerializer、LongSerializer、ShortSerializer。
自定義序列化器
實現(xiàn)org.apache.kafka.common.serialization.Serializer<T>接口,并實現(xiàn)其中的serializer方法。
@Data
public class User {private Integer userId;private String username;
}public class UserSerializer implements Serializer<User> {@Overridepublic void configure(Map<String, ?> configs, boolean isKey) {// do nothing}@Overridepublic byte[] serialize(String topic, User data) {try {// 如果數(shù)據(jù)是null,則返回nullif (data == null) return null;Integer userId = data.getUserId();String username = data.getUsername();int length = 0;byte[] bytes = null;if (null != username) {bytes = username.getBytes("utf-8");length = bytes.length;}// 第一個4字節(jié)存儲userId的值// 第二個4字節(jié)存儲username字節(jié)數(shù)組的長度int值// 第三個length長度,存儲username序列化之后的字節(jié)數(shù)組ByteBuffer buffer = ByteBuffer.allocate(4 + 4 + length);buffer.putInt(userId);buffer.putInt(length);buffer.put(bytes);return buffer.array();} catch (UnsupportedEncodingException e) {throw new SerializationException("序列化數(shù)據(jù)異常");}}@Overridepublic void close() {// do nothing}
}
4、分區(qū)器
默認分區(qū)規(guī)則
KafkaProducer.partition();DefaultPartitioner.partition();
- 如果record提供了分區(qū)號,則使?record提供的分區(qū)號
- 如果record沒有提供分區(qū)號,則使?key的序列化后的值的hash值對分區(qū)數(shù)量取模
- 如果record沒有提供分區(qū)號,也沒有提供key,則使?輪詢的?式分配分區(qū)號。
自定義分區(qū)器
實現(xiàn)org.apache.kafka.clients.producer.Partitioner接口,并實現(xiàn)其中的partition方法。
在生產(chǎn)者參數(shù)中通過配置partitioner.class指定自定義分區(qū)器。
/*** 自定義分區(qū)器*/
public class MyPartitioner implements Partitioner {@Overridepublic int partition(String topic, Object key, byte[] keyBytes, Object value, byte[] valueBytes, Cluster cluster) {// 此處可以計算分區(qū)的數(shù)字。// 我們直接指定為2return 2;}@Overridepublic void close() {}@Overridepublic void configure(Map<String, ?> configs) {}
}
5、生產(chǎn)者攔截器
作用
????????在發(fā)送消息前,或者在執(zhí)行回調(diào)邏輯前,對消息做一些定制化的處理,比如修改消息,打印消息日志等。此外,Producer允許設(shè)置多個攔截器從而形成一條攔截器鏈,Producer將按照指定順序調(diào)用它們。
自定義攔截器
????????自定義攔截器實現(xiàn)org.apache.kafka.clients.producer.ProducerInterceptor接口,并實現(xiàn)其中的onSend()、onAcknowledgement()、close()接口。其中:
- onSend(ProducerRecord):Producer 確保在消息被序列化前調(diào)?該?法。?戶可以在該?法中對消息做任何操作,但最好保證不要修 改消息所屬的topic和分區(qū),否則會影響?標分區(qū)的計算。
- onAcknowledgement(RecordMetadata, Exception):該?法會在消息被應(yīng)答之前或消息發(fā)送失敗時調(diào)?, 并且通常都是在Producer回調(diào)邏輯觸發(fā)之前。
- close:關(guān)閉Interceptor,主要?于執(zhí)??些資源清理?作。
? ? ? ? 在生產(chǎn)者參數(shù)中通過配置ProducerConfig.INTERCEPTOR_CLASSES_CONFIG指定自定義攔截器。
public class Interceptor<KEY, VALUE> implements ProducerInterceptor<KEY, VALUE> {private static final Logger LOGGER = LoggerFactory.getLogger(InterceptorTwo.class);@Overridepublic ProducerRecord<KEY, VALUE> onSend(ProducerRecord<KEY, VALUE> record) {System.out.println("攔截器---go");// 此處根據(jù)業(yè)務(wù)需要對相關(guān)的數(shù)據(jù)作修改String topic = record.topic();Integer partition = record.partition();Long timestamp = record.timestamp();KEY key = record.key();VALUE value = record.value();Headers headers = record.headers();// 添加消息頭headers.add("interceptor", "interceptor".getBytes());ProducerRecord<KEY, VALUE> newRecord = new ProducerRecord<KEY, VALUE>(topic, partition, timestamp, key, value, headers);return newRecord;}@Overridepublic void onAcknowledgement(RecordMetadata metadata, Exception exception) {System.out.println("攔截器---back");if (exception != null) {// 如果發(fā)生異常,記錄在日志中LOGGER.error(exception.getMessage());}}@Overridepublic void close() {}@Overridepublic void configure(Map<String, ?> configs) {}
}
6、生產(chǎn)者原理解析
以上內(nèi)容為個人學習理解,如有問題,歡迎在評論區(qū)指出。
部分內(nèi)容截取自網(wǎng)絡(luò),如有侵權(quán),聯(lián)系作者刪除。