qq電腦版官方網站策劃方案
1. Maxwell框架
開發(fā)公司為Zendesk公司開源,用java編寫的MySQL變更數(shù)據抓取軟件。內部是通過監(jiān)控MySQL的Binlog日志,并將變更數(shù)據以JSON格式發(fā)送到Kafka等流處理平臺。
1.1 MySQL主從復制
主機每次變更數(shù)據都會生成對應的Binlog日志,從機可以通過IO流的方式將Binlog日志下載到本地,可以通過它創(chuàng)造和主機一樣的環(huán)境或者作為熱備。
1.2 安裝Maxwell
- 解壓改名
- 啟動MySQL Binlog, vim /etc/my.cnf. 增加如下配置:
- binlog_format 日志類型的三種類型:
- 基于語句:主機執(zhí)行了什么語句,在從機里同樣執(zhí)行一遍。如果使用了random語句,會導致主從不一致。但是量級比較低
- 基于行:主機被改動后,從機同步一份。不會有主從不一致的問題,但是量價比較大,需要將每行修改的數(shù)據都拿一份。
- 混合模式:一般基于語句,但是如果基于語句會導致前后結果產生差異,自動轉成基于行。
- binlog_format 日志類型的三種類型:
#數(shù)據庫id
server-id = 1
#啟動binlog,該參數(shù)的值會作為binlog的文件名
log-bin=mysql-bin
#binlog類型,maxwell要求為row類型
binlog_format=row
#啟用binlog的數(shù)據庫,需根據實際情況作出修改
binlog-do-db=gmall
- 重啟MySQL服務
- 創(chuàng)建Maxwell所需所需的數(shù)據庫和用戶,用來存儲斷點續(xù)傳所需的數(shù)據。
CREATE DATABASE maxwell;
CREATE USER 'maxwell'@'%' IDENTIFIED BY 'maxwell';
GRANT ALL ON maxwell.* TO 'maxwell'@'%';//maxwell庫的所有權限給maxwell
GRANT SELECT, REPLICATION CLIENT, REPLICATION SLAVE ON *.* TO 'maxwell'@'%';//其他庫的查詢、復制權限給maxwell
- 修改maxwell配置文件
cp 配置文件,將會復制某個文件并且可以改名。
producer=kafka
# 目標Kafka集群地址
kafka.bootstrap.servers=hadoop102:9092,hadoop103:9092,hadoop104:9092
#目標Kafka topic,可靜態(tài)配置,例如:maxwell,也可動態(tài)配置,例如:%{database}_%{table}
kafka_topic=topic_db
# MySQL相關配置
host=hadoop102
user=maxwell
password=maxwell
jdbc_options=useSSL=false&serverTimezone=Asia/Shanghai&allowPublicKeyRetrieval=true# 過濾gmall中的z_log表數(shù)據,該表是日志數(shù)據的備份,無須采集
filter=exclude:gmall.z_log
# 指定數(shù)據按照主鍵分組進入Kafka不同分區(qū),避免數(shù)據傾斜
producer_partition_by=primary_key
1.3 Maxwell的使用
- 啟動zookeeper,kafka
- 啟動maxwell,
bin/maxwell --config config.properties --daemon
- 啟動kafka消費者進程,用于消費maxwell添加到kafka的變更數(shù)據
- 啟動數(shù)據生成jar包,查看消費者進程是否有新數(shù)據。
- 編寫Maxwell啟停腳本
#!/bin/bashMAXWELL_HOME=/opt/module/maxwellstatus_maxwell(){result=`ps -ef | grep com.zendesk.maxwell.Maxwell | grep -v grep | wc -l`return $result
}start_maxwell(){status_maxwellif [[ $? -lt 1 ]]; thenecho "啟動Maxwell"$MAXWELL_HOME/bin/maxwell --config $MAXWELL_HOME/config.properties --daemonelseecho "Maxwell正在運行"fi
}stop_maxwell(){status_maxwellif [[ $? -gt 0 ]]; thenecho "停止Maxwell"ps -ef | grep com.zendesk.maxwell.Maxwell | grep -v grep | awk '{print $2}' | xargs kill -9elseecho "Maxwell未在運行"fi
}case $1 instart )start_maxwell;;stop )stop_maxwell;;restart )stop_maxwellstart_maxwell;;
esac
1.4 Bootstrap全量同步
Maxwell獲取的數(shù)據都是后期變更的數(shù)據,但沒有獲取到數(shù)據庫在開啟Binlog日志之前的原始數(shù)據。
全量同步命令:/opt/module/maxwell/bin/maxwell-bootstrap --database gmall --table user_info --config /opt/module/maxwell/config.properties
2. 數(shù)倉數(shù)據同步策略
2.1 用戶行為數(shù)據
數(shù)據源:Kafka
目的地:HDFS
傳輸方式采用Flume, 其中source為Kafka source, channel為Memmory channel, sink為HDFS sink。
根據官網查找相應參數(shù):
- Kafka Source
- type = Kafka Source全類名
- kafka.bootstrap.servers 連接地址
- kafka.topics = topic_log
- batchSize: 批次大小
- batchDurationMillis: 批次間隔2s
- File Channel
- type: file
- dataDirs: 存儲路徑
- checkpointDir: 偏移量存儲地址
- keep-alive: 管道滿了后,生產者間隔多少秒再放數(shù)據
- HDFS Sink
- hdfs.rollInterval : 文件滾動,解決小文件問題,每隔多久滾動一次
- rollSize: 文件大小
- hdfs.path = /origin_data/gmall/log/topic_log/%Y-%m-%d, 文件存放路徑
- hdfs.round = false, 不采用系統(tǒng)本地時間
#定義組件
a1.sources=r1
a1.channels=c1
a1.sinks=k1#配置source1
a1.sources.r1.type = org.apache.flume.source.kafka.KafkaSource
a1.sources.r1.batchSize = 5000
a1.sources.r1.batchDurationMillis = 2000
a1.sources.r1.kafka.bootstrap.servers = hadoop102:9092,hadoop103:9092,hadoop104:9092
a1.sources.r1.kafka.topics=topic_log#配置channel
a1.channels.c1.type = file
a1.channels.c1.checkpointDir = /opt/module/flume/checkpoint/behavior1
a1.channels.c1.dataDirs = /opt/module/flume/data/behavior1
a1.channels.c1.maxFileSize = 2146435071
a1.channels.c1.capacity = 1000000
a1.channels.c1.keep-alive = 6#配置sink
a1.sinks.k1.type = hdfs
a1.sinks.k1.hdfs.path = /origin_data/gmall/log/topic_log/%Y-%m-%d
a1.sinks.k1.hdfs.filePrefix = log
a1.sinks.k1.hdfs.round = false # 是否獲取本地時間a1.sinks.k1.hdfs.rollInterval = 10
a1.sinks.k1.hdfs.rollSize = 134217728
a1.sinks.k1.hdfs.rollCount = 0#控制輸出文件類型
a1.sinks.k1.hdfs.fileType = CompressedStream
a1.sinks.k1.hdfs.codeC = gzip#組裝
a1.sources.r1.channels = c1
a1.sinks.k1.channel = c1
2.2 零點漂移問題
在HDFS系統(tǒng)存放文件時是按照時間進行分區(qū)存放的,存放時查看的是header中的timestamp,但是由于數(shù)據傳輸過程中也需要一段時間,header中的時間并不是數(shù)據的實際產生時間,這個就是零點漂移問題。
解決辦法:借助攔截器,修改header中的timestamp的值。編寫攔截器代碼,需要在IDEA中創(chuàng)建對應的項目并打包。
- 導入依賴,flume-ng-core和JSON解析依賴fastjson (1.2.62)
- 創(chuàng)建包gmall.interceptor
- 創(chuàng)建類TimeStampInterceptor, 繼承Interceptor接口
- 實現(xiàn)intercept(Event event)和intercept(Event events)
- 使用fastjson來解析json文件,得到jsonObject對象,用來獲取時間戳ts。將獲取到的時間戳覆蓋header中的timestamp, 如果數(shù)據格式錯誤會拋異常,使用try-catch來捕獲它,并過濾掉該條數(shù)據。注意此處不能使用for循環(huán)來一邊遍歷,一邊刪除集合數(shù)據。
@Overridepublic Event intercept(Event event) {//1、獲取header和body的數(shù)據Map<String, String> headers = event.getHeaders();String log = new String(event.getBody(), StandardCharsets.UTF_8);try {//2、將body的數(shù)據類型轉成jsonObject類型(方便獲取數(shù)據)JSONObject jsonObject = JSONObject.parseObject(log);//3、header中timestamp時間字段替換成日志生成的時間戳(解決數(shù)據漂移問題)String ts = jsonObject.getString("ts");headers.put("timestamp", ts);return event;} catch (Exception e) {e.printStackTrace();return null;}
@Override
public List<Event> intercept(List<Event> list) {Iterator<Event> iterator = list.iterator();while (iterator.hasNext()) {Event event = iterator.next();if (intercept(event) == null) {iterator.remove();//必須使用迭代器刪除}}return list;
}
-
打包時注意要帶上fastjson依賴,需要在maven中添加配置打包插件。依賴中有flume和fastjson,但在虛擬機上有flume,沒有fastjson,所以需要排除flume??梢允褂胮rovided標簽來排除讓打包時排除依賴。
- compile:在單元測試、編譯、運行三種方式都會使用compile表明的依賴;
- test:在單元測試才會使用test表明的依賴;
- provided:在編譯才會使用test表明的依賴;
-
Flume配置文件中添加攔截器
a1.sources.r1.interceptors = i1
a1.sources.r1.interceptors.i1.type = com.atguigu.gmall.flume.interceptor.TimestampInterceptor$Builder # 全類名建議在IDEA中復制,Builder也需要根據自己的代碼函數(shù)名修改
- 重新生成數(shù)據,查看是否根據數(shù)據本身的時間戳存放到對應的HDFS分區(qū)文件中。
3. 業(yè)務數(shù)據同步
3.1 同步策略
- 全量同步:每天將所有數(shù)據同步一份,業(yè)務數(shù)據量小,優(yōu)先考慮全量同步。
- 增量同步:每天只將新增和變化進行同步,業(yè)務數(shù)據量大,優(yōu)先考慮增量同步。
3.2 數(shù)據同步工具
全量:DataX
、Sqoop
增量:Maxwell
、Canal
3.3 DataX
是一個數(shù)據同步工具,致力于實現(xiàn)包括關系型數(shù)據庫HDFS、Hive、ODPS、HBase、MySQL等等數(shù)據源之間的互傳。
- 架構= reader + framework + writer
- 運行流程
- job: 單個數(shù)據同步的作業(yè),會啟動一個進程。
- Task: 根據不同數(shù)據源的切分策略,一個Job會切分為多個Task,Task是DataX作業(yè)的最小單元,每個Task負責一部分,由一個線程執(zhí)行。
- 調度策略:會根據系統(tǒng)資源設置并發(fā)度,并發(fā)度為線程同時執(zhí)行的個數(shù),任務會按照并發(fā)度一組一組執(zhí)行。
3.4 DataX安裝
- 下載解壓DataX安裝包
bin/datax.py job/job.json
測試安裝包是否完整- MySQL Reader配置文件的書寫
- HDFS Writer配置文件的書寫
- 執(zhí)行datax命令
python /opt/module/datax/bin/datax.py -p"-Dtargetdir=/origin_data/gmall/db/activity_info_full/2022-06-08" /opt/module/datax/job/import/gmall.activity_info.json
- 執(zhí)行完后可以使用hadoop fs cat 路徑名 | zcat,來查看壓縮文件是否正確