国产亚洲精品福利在线无卡一,国产精久久一区二区三区,亚洲精品无码国模,精品久久久久久无码专区不卡

當前位置: 首頁 > news >正文

網(wǎng)站如何兼容大多瀏覽器怎么注冊電商平臺

網(wǎng)站如何兼容大多瀏覽器,怎么注冊電商平臺,網(wǎng)站建設汽車后市場,網(wǎng)站右側出現(xiàn)百度名片叫什么Kafka入門4.0.0版本(基于Java、SpringBoot操作) 一、kafka概述 Kafka最初是由LinkedIn公司開發(fā)的,是一個高可靠、高吞吐量、低延遲的分布式發(fā)布訂閱消息系統(tǒng),它使用Scala語言編寫,并于2010年被貢獻給了Apache基金會&…

Kafka入門4.0.0版本(基于Java、SpringBoot操作)

一、kafka概述

Kafka最初是由LinkedIn公司開發(fā)的,是一個高可靠、高吞吐量、低延遲的分布式發(fā)布訂閱消息系統(tǒng),它使用Scala語言編寫,并于2010年被貢獻給了Apache基金會,隨后成為Apache的頂級開源項目。主要特點有:

  1. 為發(fā)布和訂閱提供高吞吐量
  2. 消息持久化
  3. 分布式
  4. 消費消息采用Pull模式
  5. 支持在線和離線場景

本次采用最新的kafka版本4.0.0,Kafka 4.0 最引人矚目的變化之一,當屬其默認運行在 KRaft(Kafka Raft)模式下,徹底擺脫了對 Apache ZooKeeper 的依賴。在 Kafka 的發(fā)展歷程中,ZooKeeper 曾是其核心組件,負責協(xié)調分布式系統(tǒng)中的元數(shù)據(jù)管理、Broker 注冊、主題分區(qū)分配等關鍵任務。然而,隨著 Kafka 功能的不斷豐富與用戶規(guī)模的持續(xù)擴大,ZooKeeper 逐漸成為系統(tǒng)部署和運維中的一個復雜性來源,增加了運營成本與管理難度。

KRaft 模式的引入,標志著 Kafka 在架構上的自我進化達到了一個新高度。通過采用基于 Raft 一致性算法的共識機制,Kafka 將元數(shù)據(jù)管理內(nèi)嵌于自身體系,實現(xiàn)了對 ZooKeeper 的無縫替代。這一轉變帶來了多方面的顯著優(yōu)勢:

簡化部署與運維:運維人員無需再為維護 ZooKeeper 集群投入額外精力,降低了整體運營開銷。新架構減少了系統(tǒng)的復雜性,使得 Kafka 的安裝、配置和日常管理變得更加直觀和高效。

增強可擴展性:KRaft 模式下,Kafka 集群的擴展性得到了進一步提升。新增 Broker 節(jié)點的加入流程更加簡便,能夠更好地適應大規(guī)模數(shù)據(jù)處理場景下對系統(tǒng)資源動態(tài)調整的需求。

提升系統(tǒng)性能與穩(wěn)定性:去除 ZooKeeper 這一外部依賴后,Kafka 在元數(shù)據(jù)操作的響應速度和一致性方面表現(xiàn)出色。尤其是在高并發(fā)寫入和讀取場景下,系統(tǒng)的穩(wěn)定性和可靠性得到了增強,減少了因外部組件故障可能導致的單點問題。

  • 之前的架構

在這里插入圖片描述

  • 現(xiàn)在的架構

在這里插入圖片描述

kafka消費模型

不同消費者組可以消費全量的消息,相同消費者組內(nèi)的消費者只能消費一部分。

在這里插入圖片描述

kafka基本概念

Producer(生產(chǎn)者)

消息的生產(chǎn)者,負責將消息發(fā)送到Kafka集群中。

Consumer(消費者)

消息的消費者,負責從Kafka集群中讀取并處理消息

Broker(服務代理節(jié)點)

Kafka集群中的一個或多個服務器,負責存儲和轉發(fā)消息。

Topic(主題)

Kafka中的消息以主題為單位進行歸類,生產(chǎn)者發(fā)送消息到特定主題,消費者訂閱并消費這些主題的消息。

Partition(分區(qū))

每個主題可以細分為多個分區(qū),分區(qū)是Kafka存儲消息的物理單位,每個分區(qū)可以看作是一個有序的、不可變的消息序列。

Replica(副本)

Kafka為每個分區(qū)引入了多副本機制,以提高數(shù)據(jù)的安全性和可靠性。副本分為leader和follower,其中l(wèi)eader負責處理讀寫請求,follower負責從leader同步數(shù)據(jù)。

Consumer Group(消費者組)

由多個消費者組成,消費者組內(nèi)的消費者共同消費同一個主題的消息,但每個消費者只負責消費該主題的一個或多個分區(qū),避免消息重復消費。

kraft

通過采用基于 Raft 一致性算法的共識機制,Kafka 將元數(shù)據(jù)管理內(nèi)嵌于自身體系,實現(xiàn)了對 ZooKeeper 的無縫替代

kafka發(fā)送端采用push模式

kafka消費端采用pull模式訂閱并消費消息

在這里插入圖片描述

Kafka的工作原理

可以概括為以下幾個步驟:

  • 消息發(fā)布: 生產(chǎn)者將消息發(fā)送到Kafka集群的特定主題,并可以選擇發(fā)送到該主題的哪個分區(qū)。如果未指定分區(qū),Kafka會根據(jù)負載均衡策略自動選擇分區(qū)。

  • 消息存儲: Kafka將接收到的消息存儲在磁盤上的分區(qū)中,每個分區(qū)都是一個有序的消息序列。Kafka使用順序寫入和零拷貝技術來提高寫入性能,并通過多副本機制確保數(shù)據(jù)的安全性和可靠性。

在這里插入圖片描述

  • 消息消費: 消費者組中的消費者從Kafka集群中訂閱并消費消息。每個消費者負責消費一個或多個分區(qū)中的消息,并確保消息至少被消費一次。消費者可以使用拉(Pull)模式或推(Push)模式從Kafka中拉取消息。

在這里插入圖片描述

  • 負載均衡: Kafka通過ZooKeeper維護集群的元數(shù)據(jù)信息,包括分區(qū)和消費者的對應關系。當消費者數(shù)量或分區(qū)數(shù)量發(fā)生變化時,Kafka會重新分配分區(qū)給消費者,以實現(xiàn)負載均衡。

  • 容錯機制: Kafka通過多副本機制實現(xiàn)容錯。當leader副本出現(xiàn)故障時,Kafka會從ISR(In-Sync Replicas)集合中選擇一個新的leader副本繼續(xù)對外提供服務。同時,Kafka還提供了多種可靠性級別供用戶選擇,以滿足不同的業(yè)務需求。

kafka特點

一、Kafka的持久化機制

Kafka的持久化機制主要涉及消息的存儲和復制。Kafka以日志的形式存儲消息,每個主題(Topic)被劃分為多個分區(qū)(Partition),每個分區(qū)中的消息按照順序進行存儲。Kafka使用多個副本(Replica)保證消息的持久性和可靠性,每個分區(qū)的消息會被復制到多個副本中,以防止數(shù)據(jù)丟失。此外,Kafka還允許根據(jù)配置的保留策略來保留已消費的消息一段時間,以便在需要時進行檢索和恢復。

Kafka的副本機制是其實現(xiàn)高可用性和數(shù)據(jù)持久性的重要基石。每個主題的每個分區(qū)都配置有多個副本,這些副本分散保存在不同的Broker上,從而能夠對抗部分Broker宕機帶來的數(shù)據(jù)不可用問題。Kafka的副本機制包括領導者副本(Leader Replica)和追隨者副本(Follower Replica):

領導者副本:負責處理所有的讀寫請求,包括生產(chǎn)者的消息寫入和消費者的消息讀取。

追隨者副本:從領導者副本異步拉取消息,并寫入到自己的提交日志中,從而實現(xiàn)與領導者副本的同步。追隨者副本不對外提供服務,只作為數(shù)據(jù)的冗余備份。

Kafka還引入了ISR(In-Sync Replicas)機制,即與領導者副本保持同步的副本集合。只有處于ISR中的副本才能參與到消息的寫入和讀取過程中,以確保數(shù)據(jù)的一致性和可靠性。當某個副本與領導者副本的同步延遲超過一定的閾值時,它會被踢出ISR,直到同步恢復正常。

二、Kafka的數(shù)據(jù)一致性

Kafka通過多個機制來確保數(shù)據(jù)的一致性,包括副本同步、ISR機制、生產(chǎn)者事務和消費者事務等:

副本同步:確保主副本將數(shù)據(jù)同步到所有副本的過程,在副本同步完成之前,生產(chǎn)者才會認為消息已經(jīng)被成功寫入。

ISR機制:通過動態(tài)調整ISR列表中的副本,確保只有可靠的副本參與到數(shù)據(jù)的讀寫操作,從而提高數(shù)據(jù)的一致性和可靠性。

生產(chǎn)者事務:Kafka的生產(chǎn)者事務機制可以確保消息的Exactly-Once語義,即消息不會被重復寫入或丟失。生產(chǎn)者事務將消息的發(fā)送和位移提交等操作放在同一個事務中,一旦事務提交成功,就意味著消息已經(jīng)被成功寫入,并且對應的位移也已經(jīng)提交。

消費者事務:雖然Kafka的消費者通常不直接支持事務但消費者可以通過提交位移(Offset)來確保消息的正確消費。消費者事務將消息的拉取和位移提交等操作放在同一個事務中,以確保消息不會被重復消費或丟失。

二、kafka應用

2.1 win11安裝kafka4.0.0

下載地址:https://kafka.apache.org/downloads 下載最后一個kafka-2.13-4.0.0.tgz

在這里插入圖片描述

下載好之后,把這個壓縮包解壓就行了,然后找到config下面的server.properties

找到log.dirs改成自己電腦上的目錄

log.dirs=E:\\runSoft\\kafka\\data

在這里插入圖片描述

  • 第一步 獲取uuid

先打開命令行,進入到bin下面的windows目錄下

命令

kafka-storage.bat random-uuid

在這里插入圖片描述

先獲取uuid,我的uuid為ANVnC_s-QYGJF1C7wu9Aww

  • 第二步 格式化日志

命令:

kafka-storage.bat format --standalone -t PPEZ2LW8T8yjZNWnfNHorQ -c ../../config/server.properties

在這里插入圖片描述

  • 第三步 啟動

打開命令行,進入到bin下面的windows目錄下 啟動命令

kafka-server-start.bat ../../config/server.properties
創(chuàng)建topic
kafka-topics.bat --create --topic quickstart-events --bootstrap-server localhost:9092
啟動一個消費端
kafka-console-consumer.bat --topic quickstart-events --from-beginning --bootstrap-server localhost:9092
啟動一個生產(chǎn)端
kafka-console-consumer.bat --topic quickstart-events --from-beginning --bootstrap-server localhost:9092
問題
1、如果提示如下
命令行  輸入行太長。
命令語法不正確。

則需要把目錄變短,目錄太長,win11不讓輸入。

2,tgz需要解壓兩次
只解壓一次是不行的,tgz是打包之后壓縮的。

3、如果啟動失敗,需要重新配置

重新配置時。把log.dirs的路徑下面的東西清空
2.2 java開發(fā)kafka

第一步,引入依賴

 <dependency><groupId>org.apache.kafka</groupId><artifactId>kafka-clients</artifactId><version>4.0.0</version>
</dependency><dependency><groupId>org.slf4j</groupId><artifactId>slf4j-api</artifactId><version>2.0.17</version>
</dependency>

第二步,建立生產(chǎn)者

public class Producer {public static void main(String[] args) {Map<String,Object> props = new HashMap<>();//  kafka 集群 節(jié)點props.put("bootstrap.servers", "localhost:9092");props.put("key.serializer", "org.apache.kafka.common.serialization.StringSerializer");props.put("value.serializer", "org.apache.kafka.common.serialization.StringSerializer");props.put("key.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");props.put("value.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");String topic = "test";KafkaProducer<String,  String> producer = new KafkaProducer(props);producer.send(new ProducerRecord<String, String>(topic, "key", "value-1"));producer.send(new ProducerRecord<String, String>(topic, "key", "value-2"));producer.send(new ProducerRecord<String, String>(topic, "key", "value-3"));producer.close();}}

ProducerRecord 是 Kafka 中的一個核心類,它代表了一組 Kafka 需要發(fā)送的 key/value 鍵值對,它由記錄要發(fā)送到的主題名稱(Topic Name),**可選的分區(qū)號(Partition Number)**以及可選的鍵值對構成。

在這里插入圖片描述

第三步、建立消費者類

public class Consumer {public static void main(String[] args){Properties props = new Properties();props.put("bootstrap.servers", "localhost:9092");props.put("group.id", "test");props.put("enable.auto.commit", "true");props.put("auto.commit.interval.ms", "1000");props.put("key.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");props.put("value.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");KafkaConsumer<String , String> consumer = new KafkaConsumer<>(props);consumer.subscribe(Arrays.asList("test"));while (true) {ConsumerRecords<String,String> records =  consumer.poll(Duration.ofDays(100));for (ConsumerRecord<String, String> record : records) {System.out.printf("partition = %d ,offset = %d, key = %s, value = %s%n",record.partition(), record.offset(), record.key(), record.value());}}}
}

運行效果

在這里插入圖片描述

2.3 spring boot整合kafka

第一步,引入依賴

<project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/maven-v4_0_0.xsd"><modelVersion>4.0.0</modelVersion><parent><groupId>org.springframework.boot</groupId><artifactId>spring-boot-starter-parent</artifactId><version>3.4.1</version><relativePath/> <!-- lookup parent in repository --></parent><artifactId>spring_boot_kafka_demo</artifactId><packaging>jar</packaging><name>spring_boot_kafka_demo Maven Webapp</name><url>http://maven.apache.org</url><dependencies><dependency><groupId>org.springframework.boot</groupId><artifactId>spring-boot-starter</artifactId></dependency><dependency><groupId>org.springframework.kafka</groupId><artifactId>spring-kafka</artifactId></dependency><dependency><groupId>org.apache.kafka</groupId><artifactId>kafka-clients</artifactId><version>4.0.0</version></dependency></dependencies><build><plugins><plugin><groupId>org.springframework.boot</groupId><artifactId>spring-boot-maven-plugin</artifactId></plugin></plugins></build>
</project>

第二步,編寫配置文件

編寫resources下的application.yml

spring:kafka:bootstrap-servers: localhost:9092consumer:auto-offset-reset: earliest

第三步,編寫生產(chǎn)者

@Service
public class Producer {@Autowiredprivate KafkaTemplate<String, String> kafkaTemplate;public void sendMessage(String message) {kafkaTemplate.send("topic1", message);}@PostConstructpublic void init() {sendMessage("Hello, Kafka!");}
}

第四步,編寫消費者

@Component
public class Consumer {@KafkaListener(id = "myId", topics = "topic1")public void listen(String in) {System.out.println(in);}
}

第五步,編寫啟動類

@SpringBootApplication
public class Application {public static void main(String[] args) {SpringApplication.run(Application.class, args);}}

運行效果

在這里插入圖片描述

2.4 記錄日志到kafka中

第一步,在2.3的基礎上,添加依賴

<dependency><groupId>org.slf4j</groupId><artifactId>slf4j-api</artifactId><version>2.0.12</version> <!-- Spring Boot 3.x 推薦版本 -->
</dependency>

第二步,添加kafka的日志appender類

public class KafkaLogbackAppender extends UnsynchronizedAppenderBase<ILoggingEvent> {private String topic = "application-logs";private String bootstrapServers = "localhost:9092";private KafkaProducer<String, String> producer;@Overridepublic void start() {Properties props = new Properties();props.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, bootstrapServers);props.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, StringSerializer.class.getName());props.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, StringSerializer.class.getName());this.producer = new KafkaProducer<>(props);super.start();}@Overrideprotected void append(ILoggingEvent eventObject) {String msg = eventObject.getFormattedMessage();producer.send(new ProducerRecord<>(topic, msg));}@Overridepublic void stop() {if (producer != null) {producer.close();}super.stop();}// Getter and Setter for XML configpublic void setTopic(String topic) {this.topic = topic;}public void setBootstrapServers(String bootstrapServers) {this.bootstrapServers = bootstrapServers;}
}

第三步,在resources下添加logback-spring.xml文件

<configuration debug="false" scan="true" scanPeriod="30 seconds"><!-- 定義日志格式 --><property name="PATTERN" value="%d{yyyy-MM-dd HH:mm:ss.SSS} [%thread] %-5level %logger{36} - %msg%n"/><!-- 控制臺輸出 --><appender name="STDOUT" class="ch.qos.logback.core.ConsoleAppender"><encoder><pattern>${PATTERN}</pattern></encoder></appender><!-- Kafka Appender --><appender name="KAFKA" class="com.demo.KafkaLogbackAppender"><bootstrapServers>localhost:9092</bootstrapServers><topic>application-logs</topic></appender><!-- 根日志輸出 --><root level="info"><appender-ref ref="STDOUT"/><appender-ref ref="KAFKA"/></root></configuration>

第四步,修改Producer類

@Service
public class Producer {private static final Logger logger = LoggerFactory.getLogger(Producer.class);@Autowiredprivate KafkaTemplate<String, String> kafkaTemplate;public void sendMessage(String message) {kafkaTemplate.send("topic1", message);}@PostConstructpublic void init() {sendMessage("Hello, Kafka!");logger.info("Message sent");logger.info("Message sent");logger.info("Message sent");logger.info("Message sent");logger.info("Message sent");}
}

第五步,修改Consumer類

@Component
public class Consumer {@KafkaListener(id = "myId", topics = "topic1")public void listen(String in) {System.out.println(in);}@KafkaListener(id = "myId2", topics = "application-logs")public void listen2(String in) {System.out.println("resinfo:"+in);}
}
http://aloenet.com.cn/news/46679.html

相關文章:

  • 溧陽網(wǎng)站設計唐山seo排名
  • 做安全題目是哪個網(wǎng)站長沙百度網(wǎng)站快速排名
  • 蘇州網(wǎng)站建設設計公司免費網(wǎng)站建設制作
  • 做內(nèi)貿(mào)只要有什么網(wǎng)絡推廣網(wǎng)站搜索引擎技術基礎
  • 網(wǎng)站功能測試方法怎么做好公司官網(wǎng)推廣
  • 手機怎么做網(wǎng)站免費的百度學術官網(wǎng)入口
  • 電腦維修 做網(wǎng)站小廣告圖片
  • 網(wǎng)站建設系統(tǒng) 招標西安百度seo推廣電話
  • 濟寧做網(wǎng)站有哪幾家職業(yè)培訓機構需要什么資質
  • 招商平臺公司山西seo排名
  • 做棋牌推廣網(wǎng)站違法不網(wǎng)絡推廣站
  • 天津網(wǎng)站建設定制公司免費seo網(wǎng)站推廣在線觀看
  • 什么視頻網(wǎng)站可以做鏈接深圳網(wǎng)站開發(fā)公司
  • 商河做網(wǎng)站多少錢my77728域名查詢
  • 做網(wǎng)站要學多久電商運營
  • 為男人做購物網(wǎng)站攜程: 2023年旅行搜索上漲超900%
  • 網(wǎng)站建設招標公告首頁排名關鍵詞優(yōu)化
  • 鄭州網(wǎng)站建設方案書網(wǎng)絡的推廣
  • 微網(wǎng)站制作公司佛山疫情最新消息
  • 濰坊做企業(yè)手機版網(wǎng)站濰坊在線制作網(wǎng)站
  • 網(wǎng)站設計美工多少網(wǎng)絡推廣電話
  • 中化建工北京建設投資有限公司網(wǎng)站最新的疫情信息
  • 企業(yè)網(wǎng)站類型市場營銷互聯(lián)網(wǎng)營銷
  • 展示網(wǎng)站報價搜索引擎環(huán)境優(yōu)化
  • 什么網(wǎng)站可以做引文分析開發(fā)一個網(wǎng)站需要哪些技術
  • 酒泉網(wǎng)站建設平臺百度網(wǎng)盤pc網(wǎng)頁版入口
  • 哈爾濱網(wǎng)站建設與管理今日國際新聞頭條15條
  • 濱海住房和城鄉(xiāng)建設局網(wǎng)站谷歌查詢關鍵詞的工具叫什么
  • 做網(wǎng)站的圖片需要多少錢下載百度極速版免費安裝
  • 云南省建設廳招標辦網(wǎng)站網(wǎng)絡營銷活動策劃