找人建個(gè)網(wǎng)站多少錢(qián)semantic ui
系列文章目錄
線上問(wèn)診:業(yè)務(wù)數(shù)據(jù)采集
文章目錄
- 系列文章目錄
- 前言
- 一、環(huán)境準(zhǔn)備
- 1.Hadoop
- 2.Zookeeper
- 3.Kafka
- 4.Flume
- 5.Mysql
- 6.Maxwell
- 二、業(yè)務(wù)數(shù)據(jù)采集
- 1.數(shù)據(jù)模擬
- 2.采集通道
- 總結(jié)
前言
暑假躺了兩個(gè)月,也沒(méi)咋寫(xiě)博客,準(zhǔn)備在開(kāi)學(xué)前再做個(gè)項(xiàng)目找找感覺(jué),由于之前做過(guò)廣告數(shù)倉(cāng)的案例,這次的博客會(huì)相對(duì)簡(jiǎn)略一些,數(shù)倉(cāng)包括離線和實(shí)時(shí)兩個(gè)部分,離線用來(lái)加深記憶,實(shí)時(shí)用來(lái)學(xué)習(xí)新技術(shù)。
一、環(huán)境準(zhǔn)備
由于很多內(nèi)容之前博客都完成過(guò)。這里就不過(guò)多贅述了。
1.Hadoop
Hadoop學(xué)習(xí)專欄前四章內(nèi)容為Hadoop集群安裝。
2.Zookeeper
Zookeeper安裝
3.Kafka
Kafka安裝
4.Flume
Flume安裝
安裝好之后為了后邊實(shí)驗(yàn)方便,修改兩個(gè)參數(shù)
Flume安裝后需要分發(fā)到所有節(jié)點(diǎn)
xsync /opt/module/flume/
5.Mysql
廣告數(shù)倉(cāng):采集通道創(chuàng)建
6.Maxwell
這里我們選用1.29.2版本,因?yàn)樵?.30.0開(kāi)始,放棄了對(duì)java8的支持。
1.上傳并解壓
2.創(chuàng)建Maxwell所需數(shù)據(jù)庫(kù)和用戶
CREATE DATABASE maxwell;
CREATE USER 'maxwell'@'%' IDENTIFIED BY 'maxwell';
GRANT ALL ON maxwell.* TO 'maxwell'@'%';
GRANT SELECT, REPLICATION CLIENT, REPLICATION SLAVE ON *.* TO 'maxwell'@'%';
3.修改配置文件
cp config.properties.example config.properties
vim config.properties
producer=kafka
kafka.bootstrap.servers=hadoop102:9092,hadoop103:9092,hadoop104:9092
kafka_topic=topic_db# mysql login info
host=hadoop102
user=maxwell
password=maxwell
jdbc_options=useSSL=false&serverTimezone=Asia/Shanghai&allowPublicKeyRetrieval=true# 指定數(shù)據(jù)按照主鍵分組進(jìn)入Kafka不同分區(qū),避免數(shù)據(jù)傾斜
producer_partition_by=primary_key
4.啟停腳本
vim ~/bin/mxw.sh
#!/bin/bashMAXWELL_HOME=/opt/module/maxwellstatus_maxwell(){result=`ps -ef | grep com.zendesk.maxwell.Maxwell | grep -v grep | wc -l`return $result
}start_maxwell(){status_maxwellif [[ $? -lt 1 ]]; thenecho "啟動(dòng)Maxwell"$MAXWELL_HOME/bin/maxwell --config $MAXWELL_HOME/config.properties --daemonelseecho "Maxwell正在運(yùn)行"fi
}stop_maxwell(){status_maxwellif [[ $? -gt 0 ]]; thenecho "停止Maxwell"ps -ef | grep com.zendesk.maxwell.Maxwell | grep -v grep | awk '{print $2}' | xargs kill -9elseecho "Maxwell未在運(yùn)行"fi
}case $1 instart )start_maxwell;;stop )stop_maxwell;;restart )stop_maxwellstart_maxwell;;
esac
5.添加權(quán)限
chmod +x ~/bin/mxw.sh
二、業(yè)務(wù)數(shù)據(jù)采集
1.數(shù)據(jù)模擬
上傳模擬數(shù)據(jù)
mkdir /opt/module/mock-medical
cd /opt/module/mock-medical
修改配置文件
vim application.yaml
現(xiàn)在我們?yōu)槠鋭?chuàng)建數(shù)據(jù)庫(kù)。
mysql -uroot -p000000 -e"drop database if exists medical;create database medical charset utf8mb4 collate utf8mb4_general_ci;"
執(zhí)行jar包
java -jar mock-medical-1.1.jar
在數(shù)據(jù)庫(kù)檢查是否獲取到數(shù)據(jù)。
封裝成腳本
vim ~/bin/medical_mock.sh
#!/bin/bashfor ((i=0; i < $1; i++))
doecho "正在執(zhí)行第 $[ $i + 1 ] 次數(shù)據(jù)模擬"ssh hadoop102 "cd /opt/module/mock-medical/; java -jar mock-medical-1.1.jar"
done
2.采集通道
現(xiàn)在我們已經(jīng)將初始數(shù)據(jù)輸入到Mysql,接下來(lái)使用Maxwell將數(shù)據(jù)傳輸?shù)終afka。
先啟動(dòng)Zookeeper,Kafka和Maxwell
Maxwell會(huì)實(shí)時(shí)監(jiān)控MYSQL數(shù)據(jù),然后將其傳送到Kafka集群,所以我們現(xiàn)打開(kāi)一個(gè)Kafka消費(fèi)集群。
bin/kafka-console-consumer.sh --bootstrap-server hadoop102:9092 --topic topic_db
現(xiàn)在我們?cè)俅紊a(chǎn)數(shù)據(jù)輸入到MYSQL,Maxwell就會(huì)將產(chǎn)生的數(shù)據(jù)傳送到Kafka集群,我們實(shí)現(xiàn)打開(kāi)的消費(fèi)者就會(huì)收到數(shù)據(jù)。
出現(xiàn)數(shù)據(jù),說(shuō)明通道沒(méi)問(wèn)題。到此數(shù)據(jù)采集通道完成。
總結(jié)
這里抓一次快照,后邊實(shí)時(shí)數(shù)倉(cāng)還要基于這里再次開(kāi)發(fā)。