專業(yè)企業(yè)網(wǎng)站搭建服務有創(chuàng)意的網(wǎng)絡廣告案例
1.zookeeper
kafka3.0之前依賴于zookeeper
zookeeper是一個開源,分布式的架構,提供協(xié)調(diào)服務(Apache項目)
基于觀察者模式涉及的分布式服務管理架構
存儲和管理數(shù)據(jù),分布式節(jié)點上的服務接受觀察者的注冊,一旦分布式節(jié)點上的數(shù)據(jù)如果發(fā)生變化,由zookeeper來負責通知分布式節(jié)點上的服務
zookeeper分為領導者,追隨者 leader follower組成的集群
只要有一半以上的集群存活,zookeeper集群就可以正常工作,適用于安裝奇數(shù)臺的服務集群
全局數(shù)據(jù)一致,每個zookeeper每個節(jié)點都保存相同的數(shù)據(jù),維護監(jiān)控服務的數(shù)據(jù)一致。(主要作用)
數(shù)據(jù)更新的原子性,要么都成功,要么都失敗
實時性,只要有變化,立刻同步。
zookeeper的應用場景
1.統(tǒng)一命名服務,在分布式的環(huán)境下,對所有的應用和服務進行統(tǒng)一命名
2.統(tǒng)一配置管理,配置文件同步,kafka的配置文件被修改,可以快速同步到其他節(jié)點
3.統(tǒng)一集群管理,實時掌握所有節(jié)點的狀態(tài)
4.服務器動態(tài)上下限
5.負載均衡,把訪問的服務器的數(shù)據(jù),發(fā)送到訪問最少的服務器處理客戶端的請求
領導者和追隨者:zookeeper的選舉機制
三臺服務器:A B C
A 先啟動 發(fā)起第一選舉,投票投給自己,只有一票,不滿半數(shù),A的狀態(tài)是looking
B 啟動 在發(fā)起一次選舉,A和B分別投自己一票,交換選票信息,myid A發(fā)現(xiàn)B的myid比A的大,A的這票會轉(zhuǎn)而投給B,A 0 B 2 沒有半數(shù)以上結果,A B 會進入looking 、B有可能leader
C 啟動 MYID C的myid最大 A和B都會把票投給C A 0 B 0 C 3
C的狀態(tài)變?yōu)閘eader A B變?yōu)閒ollower
只有l(wèi)eader確定,后續(xù)的服務器都是追隨者
只有兩種情況會開啟選舉機制:
1.初始化的情況會產(chǎn)生選舉
2.服務器之間和eader丟失了連接狀態(tài)
特殊情況下:
leader已經(jīng)存在,建立連接即可
leader不存在,leader不存在
1.服務器ID大的勝出
2.EPOCH大,直接勝出
3.EPOCH相同,事務ID大的勝出
EPOCH每個leader任期的代號,沒有l(wèi)eader,大家的邏輯地址相同。每投完一次之后,數(shù)據(jù)時遞增
事務id表示服務器的每一次更新,每變更一次id變化一次‘
服務器ID
zookeeper 當中所有機器。每臺機器不重復,和mysql保存一直
zookeeper+kafka(2.7.0)
kafka (3.4.1)
實現(xiàn)
zookeeper集群
192.168.233.10 zookeeper+kafka
192.168.233.20 zookeeper+kafka
192.168.233.30 zookeeper+kafka
所有
systemctl stop firewalld
setenforce 0
cd /opt
拖進去 apache-zookeeper-3.5.7-bin.tar.gz kafka_2.13-2.7.0
yum install -y java-1.8.0-openjdk java-1.8.0-openjdk-devel
java -version
cd /opt
tar -xf apache-zookeeper-3.5.7-bin.tar.gz
mv apache-zookeeper-3.5.7-bin zookeeper
cd zookeeper
cd /opt/zookeeper/conf
cp zoo_sample.cfg zoo.cfg
vim zoo.cfg
tickTime=2000
#服務器與客戶端之間心跳時間,2秒檢測一次服務器和客戶端之間的通信
initLimit=10
#領導者和追隨者之間,最對幾次心跳數(shù)超時 10*2S 20S
syncLimit=5
#同步超時時間,領導者和追隨者之間,同步通信超時的時間。5*2s leader會認為follower丟失,移除集群
16行
dataDir=/opt/zookeeper/data
dataLogDir=/opt/zookeeper/logs (需要改)
添加
server.1=192.168.233.10:3188:3288
server.2=192.168.233.20:3188:3288
server.3=192.168.233.30:3188:3288
##
server.1=192.168.233.10:3188:3288
1 定義每個zookeeper集群的初始myid
192.168.233.10 :服務器的ip地址
3188:領導者和追隨者之間交換信息的端口(內(nèi)部通信的端口)
3288:一旦leader丟失響應,開啟選舉,3288就是用來執(zhí)行選舉時的通信端口
##
wq!
mkdir /opt/zookeeper/data
mkdir /opt/zookeeper/logs
三臺分別創(chuàng)建
10:
cd ..
cd data/
echo 1 > myid
20:
cd ..
cd data/
echo 2 > myid
30:
cd ..
cd data/
echo 3 > myid
所有
vim /etc/init.d/zookeeper
#!/bin/bash
#chkconfig:2345 20 90
#description:Zookeeper Service Control Script
ZK_HOME='/opt/zookeeper'
case $1 in
start)
echo "---------- zookeeper 啟動 ------------"
$ZK_HOME/bin/zkServer.sh start
;;
stop)
echo "---------- zookeeper 停止 ------------"
$ZK_HOME/bin/zkServer.sh stop
;;
restart)
echo "---------- zookeeper 重啟 ------------"
$ZK_HOME/bin/zkServer.sh restart
;;
status)
echo "---------- zookeeper 狀態(tài) ------------"
$ZK_HOME/bin/zkServer.sh status
;;
*)
echo "Usage: $0 {start|stop|restart|status}"
esac
wq!
三臺服務器分別啟動(一個一個起)
chmod +x /etc/init.d/zookeeper
chkconfig --add zookeeper
service zookeeper start
起完以后看狀態(tài)
service zookeeper status
2.kafka
為什么要引入消息隊列(MQ),首先他也是一個中間件。在高并發(fā)環(huán)境下,同步請求太多來不及處理,來不及處理的請求會阻塞,比方數(shù)據(jù)庫就會形成行鎖或者表鎖,請求線程滿了,超標了,too many connection。整個系統(tǒng)雪崩
消息隊列的作用:異步處理請求 流量削峰,應用解耦 可恢復性 緩沖機制
解耦:
耦合:在軟件系統(tǒng)當中,修改一個組件需要修改所有其他組件,高度耦合
低度耦合:修改其中一個組件,對其他最賤影響不大,無需修改所有
A B C
只有通信保證,其他的修改不影響整個集群,每個組件可以獨立的擴展,修改,降低組件之間的依賴性
依賴點就是接口約束,通過不同的端口,保證集群通信
可恢復性:系統(tǒng)當中的有一部分組件消失,不影響整個系統(tǒng),也就是在消息隊列當中,即使有一個處理消息的進程失敗,一旦恢復還可以重新加入到隊列當中,繼續(xù)處理消息。
緩沖機制:可以控制和優(yōu)化數(shù)據(jù)經(jīng)過系統(tǒng)的時間和速度,解決生產(chǎn)消息和消費消息處理速度不一致的問題
峰值的處理能力:消息隊列在峰值情況之下,能夠頂住突發(fā)的訪問壓力。避免專門為了突然情況而對系統(tǒng)進行修改
異步通信:允許用戶把一個消息放入隊列,但是不立即處理,等我想處理的時候在處理
消息隊列的模式:
點對點 一對一:消息的生產(chǎn)者發(fā)送消息到隊列中,消費者從隊列中提取消息,消費者提取完之后,隊列中被提取的消息將會被移除,后續(xù)消費者不能再消費隊列當中的消息,消息隊列可以有多個消費者,但是一個消息,只能由一個消費者提取
RABBITMQ
發(fā)布/訂閱模式:一對多,又叫做觀察者模式。消費者提取數(shù)據(jù)之后,隊列當中的消息不會被清除
生產(chǎn)者發(fā)布一個消息到對象(主題)所有消費者都是通過主題獲取消費之后,隊列當中的消息不會消除
主題:topic topic類似于一個數(shù)據(jù)流的管道,生產(chǎn)者把消息發(fā)布到主題,消息從主題當中訂閱數(shù)據(jù),主題可以分區(qū),每個分區(qū)都要自己的偏移量
分區(qū):partition 每個主題都可以分成多個分區(qū),每個分區(qū)是數(shù)據(jù)的有序子集,分區(qū)可以運行kafka進行水平擴展,以處理大量數(shù)據(jù)
消息在分鐘按照偏移量存儲,消費者可以獨立每個分區(qū)的數(shù)據(jù)
偏移量:是每個消息在分區(qū)中唯一的標識,消費者可以通過偏移量來跟蹤獲取已讀或者未讀消息的位置,也可以提交偏移量來記錄已處理的信息
生產(chǎn)者:producer 生產(chǎn)者把數(shù)據(jù)發(fā)送kafka的主題當中 負責寫入消息
消費者:consumer從主題當中讀取數(shù)據(jù),消費者可以是一個也可以是多個,每個消費者有一個唯一的消費者組ID,kafka實現(xiàn)負載均衡和容錯性
經(jīng)紀人:Broker每個kafka節(jié)點都有一個Broker,每個Broker負責一臺kafka服務器,id唯一,存儲主題分區(qū)當中的數(shù)據(jù),處理生產(chǎn)和消費者的請求,維護元數(shù)據(jù)(zookeeper)
zookeeper:zookeeper負責保存元數(shù)據(jù),元數(shù)據(jù)就是topic的相關信息(發(fā)布在哪臺主機上,指定了多少分區(qū),以及副本數(shù),偏移量)
zookeeper自建一個主題:_consumer_offsets,
3.0之后不依賴zookeeper的核心 元數(shù)據(jù)由kafka節(jié)點自己管理
消費的方式:
begining,從頭開始
實時更新
指定位置,用代碼編寫
kafka的工作流程:
生產(chǎn)者向主題里面發(fā)送數(shù)據(jù),主題里的分區(qū)保存數(shù)據(jù),消費者根據(jù)消費方式來消費數(shù)據(jù)。
生產(chǎn)者寫入的topic的數(shù)據(jù)時持久化,默認7個小時
至少一次語義:只要消費者進入,確保消息至少被消費一次。
實驗
所有
cd /opt
tar -xf kafka_2.13-2.7.0.tgz
mv kafka_2.13-2.7.0 kafka
vim /etc/profile
export KAFKA_HOME=/opt/kafka
export PATH=$PATH:$KAFKA_HOME/bin
source /etc/profile
cd /opt/kafka/config
cp server.properties server.properties.bak
10:
vim server.properties
21行:
65行 修改
130行 修改
zookeeper.connect=192.168.233.10:2181,192.168.233.20:2181,192.168.233.30:2181
配置zookeeper的集群
20:
所有:
vim /etc/init.d/kafka
#!/bin/bash
#chkconfig:2345 22 88
#description:Kafka Service Control Script
KAFKA_HOME='/opt/kafka'
case $1 in
start)
echo "---------- Kafka 啟動 ------------"
${KAFKA_HOME}/bin/kafka-server-start.sh -daemon ${KAFKA_HOME}/config/server.properties
;;
stop)
echo "---------- Kafka 停止 ------------"
${KAFKA_HOME}/bin/kafka-server-stop.sh
;;
restart)
$0 stop
$0 start
;;
status)
echo "---------- Kafka 狀態(tài) ------------"
count=$(ps -ef | grep kafka | egrep -cv "grep|$$")
if [ "$count" -eq 0 ];then
echo "kafka is not running"
else
echo "kafka is running"
fi
;;
*)
echo "Usage: $0 {start|stop|restart|status}"
esac
wq!
chmod +x /etc/init.d/kafka
chkconfig --add kafka
service kafka start
netstat -antp | grep 9092
所有:
cd /opt/kafka/bin
20:
kafka-topics.sh --create --zookeeper 192.168.233.10:2181,192.168.233.20:2181,192.168.233.30:2181 --replication-factor 2 --partitions 3 --topic test1
kafka-topics.sh --create --zookeeper 192.168.66.15:2181,192.168.66.16:2181,192.168.66.17:2181 --replication-factor 2 --partitions 3 --topic test1
##
創(chuàng)建主題
1.在kafka的bin目錄下創(chuàng)建,是所有kafka可執(zhí)行命令的文件
2. --zookeeper指定的是zookeeper的地址和端口,保存kafka的元數(shù)據(jù)
3. --replication-factor 2 定義每個分區(qū)的副本數(shù)
4.partition 3 指定主題的分區(qū)數(shù)
5. --topic test1 指定主題的名稱
10:
kafka-topics.sh --describe --zookeeper 192.168.233.10:2181,192.168.233.20:2181,192.168.233.30:2181
kafka-topics.sh --describe --zookeeper 192.168.233.10:2181,192.168.233.20:2181,192.168.233.30:2181 --topic test1
kafka-topics.sh --describe --zookeeper 192.168.66.15:2181,192.168.66.16:2181,192.168.66.17:2181
kafka-topics.sh --describe --zookeeper 192.168.66.15:2181,192.168.66.16:2181,192.168.66.17:2181 --topic test3
所有
vim /etc/hosts
192.168.233.10 test1
192.168.233.20 test2
192.168.233.30 test3
記得改主機名
20:
kafka-console-producer.sh --broker-list 192.168.66.15:9092,192.168.66.16:9092,192.168.66.17:9092 --topic test3
kafka-console-producer.sh --broker-list 192.168.233.10:9092,192.168.233.20:9092,192.168.233.30:9092 --topic test1
30:
從頭開始
kafka-console-consumer.sh --bootstrap-server 192.168.66.15:9092,192.168.66.16:9092,192.168.66.17:9092 --topic test3 --from-beginning
kafka-console-consumer.sh --bootstrap-server 192.168.233.10:9092,192.168.233.20:9092,192.168.233.30:9092 --topic test1 --from-beginning
實時消息
kafka-console-consumer.sh --bootstrap-server 192.168.66.15:9092,192.168.66.16:9092,192.168.66.17:9092 --topic test3
kafka-console-consumer.sh --bootstrap-server 192.168.233.10:9092,192.168.233.20:9092,192.168.233.30:9092 --topic test1
20:創(chuàng)建
kafka-topics.sh --create --zookeeper 192.168.233.20:2181 --partitions 1 --replication-factor 1 --topic guoqi1
kafka-topics.sh --create --zookeeper 192.168.66.16:2181 --partitions 1 --replication-factor 1 --topic guoqi1
30:創(chuàng)建
kafka-topics.sh --create --zookeeper 192.168.233.20:2181 --partitions 1 --replication-factor 1 --topic guoqi2
kafka-topics.sh --create --zookeeper 192.168.66.17:2181 --partitions 1 --replication-factor 1 --topic guoqi2
10:連接
kafka-console-consumer.sh --bootstrap-server 192.168.233.20:9092 --topic guoqi1
kafka-console-consumer.sh --bootstrap-server 192.168.233.20:9092 --topic guoqi2
kafka-console-consumer.sh --bootstrap-server 192.168.66.16:9092 --topic guoqi1
kafka-console-consumer.sh --bootstrap-server 192.168.66.17:9092 --topic guoqi2
20:發(fā)送
kafka-console-producer.sh --broker-list 192.168.233.20:9092 --topic guoqi1
kafka-console-producer.sh --broker-list 192.168.66.16:9092 --topic guoqi1
30:發(fā)送
kafka-console-producer.sh --broker-list 192.168.233.30:9092 --topic guoqi2
kafka-console-producer.sh --broker-list 192.168.66.17:9092 --topic guoqi2
如何修改分區(qū)數(shù)
20:kafka-topics.sh --zookeeper 192.168.233.20:2181 --alter --topic guoqi1 --partitons 3
刪除
kafka-topics.sh --delete --zookeeper 192.168.233.20:2181 --topic guoqi1
查看元數(shù)據(jù)
cd /opt/zookeeper/bin
./zkCli.sh -server 192.168.233.10:2181