app開(kāi)發(fā)和網(wǎng)站建設(shè)區(qū)別怎么注冊(cè)一個(gè)自己的網(wǎng)站
- Kafka簡(jiǎn)介
- 集群部署
- 配置Kafka
- 測(cè)試Kafka
1.Kafka簡(jiǎn)介
數(shù)據(jù)緩沖隊(duì)列。同時(shí)提高了可擴(kuò)展性。具有峰值處理能力,使用消息隊(duì)列能夠使關(guān)鍵組件頂住突發(fā)的訪問(wèn)壓力,而不會(huì)因?yàn)橥话l(fā)的超負(fù)荷的請(qǐng)求而完全崩潰。
Kafka是一個(gè)分布式、支持分區(qū)的(partition)、多副本的(replica),基于zookeeper協(xié)調(diào)的分布式消息系統(tǒng),它的最大的特性就是可以實(shí)時(shí)的處理大量數(shù)據(jù)以滿(mǎn)足各種需求場(chǎng)景:比如基于hadoop的批處理系統(tǒng)、低延遲的實(shí)時(shí)系統(tǒng)、web/nginx日志、訪問(wèn)日志,消息服務(wù)等等,用scala語(yǔ)言編寫(xiě),Linkedin于2010年貢獻(xiàn)給了Apache基金會(huì)并成為頂級(jí)開(kāi)源項(xiàng)目。
特性:
高吞吐量:kafka每秒可以處理幾十萬(wàn)條消息。
可擴(kuò)展性:kafka集群支持熱擴(kuò)展- 持久性、
可靠性:消息被持久化到本地磁盤(pán),并且支持?jǐn)?shù)據(jù)備份防止數(shù)據(jù)丟失
容錯(cuò)性:允許集群中節(jié)點(diǎn)失敗(若副本數(shù)量為n,則允許n-1個(gè)節(jié)點(diǎn)失敗)
高并發(fā):支持?jǐn)?shù)千個(gè)客戶(hù)端同時(shí)讀寫(xiě)
它主要包括以下組件:
話題(Topic):是特定類(lèi)型的消息流。(每條發(fā)布到 kafka 集群的消息屬于的類(lèi)別,即 kafka 是面向 topic 的。)
生產(chǎn)者(Producer):是能夠發(fā)布消息到話題的任何對(duì)象(發(fā)布消息到 kafka 集群的終端或服務(wù)).
消費(fèi)者(Consumer):可以訂閱一個(gè)或多個(gè)話題,從而消費(fèi)這些已發(fā)布的消息。
服務(wù)代理(Broker):已發(fā)布的消息保存在一組服務(wù)器中,它們被稱(chēng)為代理(Broker)或Kafka集群。partition(區(qū)):每個(gè) topic 包含一個(gè)或多個(gè) partition。
replication:partition 的副本,保障 partition 的高可用。
leader:replica 中的一個(gè)角色, producer 和 consumer 只跟 leader 交互。
follower:replica 中的一個(gè)角色,從 leader 中復(fù)制數(shù)據(jù)。
zookeeper:kafka 通過(guò) zookeeper 來(lái)存儲(chǔ)集群的信息。
Zookeeper:
ZooKeeper是一個(gè)分布式協(xié)調(diào)服務(wù),它的主要作用是為分布式系統(tǒng)提供一致性服務(wù),提供的功能包括:配置維護(hù)、分布式同步等。Kafka的運(yùn)行依賴(lài)ZooKeeper。 ?也是java微服務(wù)里面使用的一個(gè)注冊(cè)中心服務(wù)
ZooKeeper主要用來(lái)協(xié)調(diào)Kafka的各個(gè)broker,不僅可以實(shí)現(xiàn)broker的負(fù)載均衡,而且當(dāng)增加了broker或者某個(gè)broker故障了,ZooKeeper將會(huì)通知生產(chǎn)者和消費(fèi)者,這樣可以保證整個(gè)系統(tǒng)正常運(yùn)轉(zhuǎn)。
在Kafka中,一個(gè)topic會(huì)被分成多個(gè)區(qū)并被分到多個(gè)broker上,分區(qū)的信息以及broker的分布情況與消費(fèi)者當(dāng)前消費(fèi)的狀態(tài)信息都會(huì)保存在ZooKeeper中。
2.集群部署
? ? ? ? 2.1環(huán)境
系統(tǒng):Centos-Stream7
節(jié)點(diǎn):
192.168.26.166? ?es01 ?
192.168.26.170? ?es02 ?
192.168.26.171???es03
軟件版本:kafka_2.12-3.0.2.tgz
? ? ? ? 2.2??安裝配置jdk8
#yum install -y java-1.8.0-openjdk
? ? ? ? 2.3??安裝配置zookeeper
在配置中要注意每個(gè)配置項(xiàng)后面不要有空格否則會(huì)導(dǎo)致zookeeper啟動(dòng)不起來(lái)!!!!
Kafka運(yùn)行依賴(lài)ZK,Kafka官網(wǎng)提供的tar包中,已經(jīng)包含了ZK,這里不再額外下載ZK程序。
配置相互解析---三臺(tái)機(jī)器(在es集群上安裝的kafka):
# vim /etc/hosts
192.168.26.166? ?es01 ?
192.168.26.170? ?es02 ?
192.168.26.171???es03
安裝Kafka:
# wget https://mirrors.tuna.tsinghua.edu.cn/apache/kafka/3.0.2/kafka_2.12-3.0.2.tgz
# tar xzvf kafka_2.12-2.8.0.tgz -C /usr/local/
# mv /usr/local/kafka_2.12-2.8.0/ /usr/local/kafka/
配置zookeeper:
在es01節(jié)點(diǎn)中:
# sed -i 's/^[^#]/#&/' /usr/local/kafka/config/zookeeper.properties
# vim /usr/local/kafka/config/zookeeper.properties ?#添加如下配置
dataDir=/opt/data/zookeeper/data ?# 需要?jiǎng)?chuàng)建,所有節(jié)點(diǎn)一致
dataLogDir=/opt/data/zookeeper/logs # 需要?jiǎng)?chuàng)建,所有節(jié)點(diǎn)一致
clientPort=2181?
tickTime=2000?
initLimit=20?
syncLimit=10?# 以下 IP 信息根據(jù)自己服務(wù)器的 IP 進(jìn)行修改
server.1=192.168.19.20:2888:3888 ?//kafka集群IP:Port
server.2=192.168.19.21:2888:3888
server.3=192.168.19.22:2888:3888
#創(chuàng)建data、log目錄# mkdir -p /opt/data/zookeeper/{data,logs}
#創(chuàng)建myid文件
# echo 1 > /opt/data/zookeeper/data/myid ? ? #myid號(hào)按順序排
在es02節(jié)點(diǎn)中:
# sed -i 's/^[^#]/#&/' /usr/local/kafka/config/zookeeper.properties
# vim /usr/local/kafka/config/zookeeper.properties
dataDir=/opt/data/zookeeper/data?
dataLogDir=/opt/data/zookeeper/logs
clientPort=2181?
tickTime=2000?
initLimit=20?
syncLimit=10?
server.1=192.168.19.20:2888:3888
server.2=192.168.19.21:2888:3888
server.3=192.168.19.22:2888:3888
#創(chuàng)建data、log目錄# mkdir -p /opt/data/zookeeper/{data,logs}
#創(chuàng)建myid文件
# echo 2 > /opt/data/zookeeper/data/myid
在es03節(jié)點(diǎn)中:
# sed -i 's/^[^#]/#&/' /usr/local/kafka/config/zookeeper.properties
# vim /usr/local/kafka/config/zookeeper.properties
dataDir=/opt/data/zookeeper/data?
dataLogDir=/opt/data/zookeeper/logs
clientPort=2181?
tickTime=2000?
initLimit=20?
syncLimit=10?
server.1=192.168.19.20:2888:3888
server.2=192.168.19.21:2888:3888
server.3=192.168.19.22:2888:3888
#創(chuàng)建data、log目錄# mkdir -p /opt/data/zookeeper/{data,logs}
#創(chuàng)建myid文件
# echo 3 > /opt/data/zookeeper/data/myid
配置項(xiàng)含義:
dataDir ?? ?ZK數(shù)據(jù)存放目錄。
dataLogDir ?ZK日志存放目錄。
clientPort ?客戶(hù)端連接ZK服務(wù)的端口。
tickTime ? ?ZK服務(wù)器之間或客戶(hù)端與服務(wù)器之間維持心跳的時(shí)間間隔。
initLimit ? 允許follower連接并同步到Leader的初始化連接時(shí)間,當(dāng)初始化連接時(shí)間超過(guò)該值,則表示連接失敗。
syncLimit ? Leader與Follower之間發(fā)送消息時(shí)如果follower在設(shè)置時(shí)間內(nèi)不能與leader通信,那么此follower將會(huì)被丟棄。
server.1=192.168.19.20:2888:3888 ? ?2888是follower與leader交換信息的端口,3888是當(dāng)leader掛了時(shí)用來(lái)執(zhí)行選舉時(shí)服務(wù)器相互通信的端口。
3.配置Kafka
? ? ? ? 3.1??配置
# sed -i 's/^[^#]/#&/' /usr/local/kafka/config/server.properties
# vim /usr/local/kafka/config/server.properties ?#在最后添加
broker.id=1 ?#改?
listeners=PLAINTEXT://192.168.19.20:9092 ? #改
num.network.threads=3
num.io.threads=8
socket.send.buffer.bytes=102400
socket.receive.buffer.bytes=102400
socket.request.max.bytes=104857600
log.dirs=/opt/data/kafka/logs ?
num.partitions=6
num.recovery.threads.per.data.dir=1
offsets.topic.replication.factor=2
transaction.state.log.replication.factor=1
transaction.state.log.min.isr=1
log.retention.hours=168
log.segment.bytes=536870912
log.retention.check.interval.ms=300000
zookeeper.connect=192.168.19.20:2181,192.168.19.21:2181,192.168.19.22:2181
zookeeper.connection.timeout.ms=6000
group.initial.rebalance.delay.ms=0
[root@es01 ~]# mkdir -p /opt/data/kafka/logs
? ? ? ? 3.2??其他節(jié)點(diǎn)配置
只需把配置好的安裝包直接分發(fā)到其他節(jié)點(diǎn),修改 Kafka的broker.id和 listeners就可以了。
? ? ? ? 3.3??配置項(xiàng)含義
broker.id?
?? ?每一個(gè)broker在集群中的唯一標(biāo)識(shí),要求是正數(shù)。在改變IP地址,不改變broker.id的時(shí)不會(huì)影響consumers
listeners=PLAINTEXT://192.168.19.22:9092 ? ? ??
?? ?監(jiān)聽(tīng)地址
num.network.threads ?
?? ?broker 處理消息的最大線程數(shù),一般情況下不需要去修改
num.io.threads
?? ?broker處理磁盤(pán)IO 的線程數(shù) ,數(shù)值應(yīng)該大于你的硬盤(pán)數(shù)
socket.send.buffer.bytes ??? ??? ?
?? ?socket的發(fā)送緩沖區(qū)
socket.receive.buffer.bytes?? ??? ?
?? ?socket的接收緩沖區(qū)
socket.request.max.bytes
?? ?socket請(qǐng)求的最大數(shù)值,防止serverOOM,message.max.bytes必然要小于socket.request.max.bytes,會(huì)被topic創(chuàng)建時(shí)的指定參數(shù)覆蓋
log.dirs ? ? ? ?日志文件目錄
num.partitions
num.recovery.threads.per.data.dir ? 每個(gè)數(shù)據(jù)目錄(數(shù)據(jù)目錄即指的是上述log.dirs配置的目錄路徑)用于日志恢復(fù)啟動(dòng)和關(guān)閉時(shí)的線程數(shù)量。
offsets.topic.replication.factortransaction state log replication factor ?事務(wù)主題的復(fù)制因子(設(shè)置更高以確??捎眯?#xff09;。 內(nèi)部主題創(chuàng)建將失敗,直到群集大小滿(mǎn)足此復(fù)制因素要求
log.cleanup.policy = delete
?? ?日志清理策略 選擇有:delete和compact 主要針對(duì)過(guò)期數(shù)據(jù)的處理,或是日志文件達(dá)到限制的額度,會(huì)被 topic創(chuàng)建時(shí)的指定參數(shù)覆蓋
log.cleanup.interval.mins=1
?? ?指定日志每隔多久檢查看是否可以被刪除,默認(rèn)1分鐘?? ?
log.retention.hours
?? ?數(shù)據(jù)存儲(chǔ)的最大時(shí)間 超過(guò)這個(gè)時(shí)間 會(huì)根據(jù)log.cleanup.policy設(shè)置的策略處理數(shù)據(jù),也就是消費(fèi)端能夠多久去消費(fèi)數(shù)據(jù)。log.retention.bytes和log.retention.minutes或者log.retention.hours任意一個(gè)達(dá)到要求,都會(huì)執(zhí)行刪除,會(huì)被topic創(chuàng)建時(shí)的指定參數(shù)覆蓋log.segment.bytes
?? ?topic的分區(qū)是以一堆segment文件存儲(chǔ)的,這個(gè)控制每個(gè)segment的大小,會(huì)被topic創(chuàng)建時(shí)的指定參數(shù)覆蓋
log.retention.check.interval.ms?
?? ?文件大小檢查的周期時(shí)間,是否觸發(fā) log.cleanup.policy中設(shè)置的策略
zookeeper.connect ??
?? ?ZK主機(jī)地址,如果zookeeper是集群則以逗號(hào)隔開(kāi)。
zookeeper.connection.timeout.ms ? ??
?? ?連接到Zookeeper的超時(shí)時(shí)間。
4.測(cè)試Kafka
? ? ? ? 4.1? 啟動(dòng)zookeeper集群
在三個(gè)節(jié)點(diǎn)依次執(zhí)行:
# cd /usr/local/kafka
# nohup bin/zookeeper-server-start.sh config/zookeeper.properties &
查看端口:
# netstat -lntp | grep 2181
? ? ? ? 4.2? 啟動(dòng)Kafka
在三個(gè)節(jié)點(diǎn)依次執(zhí)行:
# cd /usr/local/kafka
# nohup bin/kafka-server-start.sh config/server.properties &
? ? ? ? 4.3? 測(cè)驗(yàn)
驗(yàn)證??在192.168.26.166上創(chuàng)建topic:
# bin/kafka-topics.sh --create --zookeeper localhost:2181 --replication-factor 1 --partitions 1 --topic testtopic
參數(shù)解釋:
–zookeeper指定zookeeper的地址和端口,
–partitions指定partition的數(shù)量,
–replication-factor指定數(shù)據(jù)副本的數(shù)量在26.170上面查詢(xún)192.168.26.166上的topic:
[root@es03 kafka]# bin/kafka-topics.sh --zookeeper 192.168.26.166:2181 --list
testtopic
? ? ? ? (二)模擬消息生產(chǎn)和消費(fèi)
發(fā)送消息到192.168.26.166:
[root@es01 kafka]# bin/kafka-console-producer.sh --broker-list 192.168.19.20:9092 --topic testtopic
>hello
>你好呀
>
從192.168.26.171接受消息:
[root@es02 kafka]# bin/kafka-console-consumer.sh --bootstrap-server ?192.168.19.21:9092 --topic testtopic --from-beginning