国产亚洲精品福利在线无卡一,国产精久久一区二区三区,亚洲精品无码国模,精品久久久久久无码专区不卡

當前位置: 首頁 > news >正文

和女的做那個視頻網(wǎng)站應(yīng)用商店優(yōu)化

和女的做那個視頻網(wǎng)站,應(yīng)用商店優(yōu)化,做網(wǎng)站的收費,網(wǎng)站設(shè)計實用實例前言 學數(shù)倉的時候發(fā)現(xiàn) flume 落了一點,趕緊補齊。 1、Flume 事務(wù) Source 在往 Channel 發(fā)送數(shù)據(jù)之前會開啟一個 Put 事務(wù): doPut:將批量數(shù)據(jù)寫入臨時緩沖區(qū) putList(當 source 中的數(shù)據(jù)達到 batchsize 或者 超過特定的時間就會…

前言

? ? ? ? 學數(shù)倉的時候發(fā)現(xiàn) flume 落了一點,趕緊補齊。

1、Flume 事務(wù)

Source 在往 Channel 發(fā)送數(shù)據(jù)之前會開啟一個 Put 事務(wù):

  1. doPut:將批量數(shù)據(jù)寫入臨時緩沖區(qū) putList(當 source 中的數(shù)據(jù)達到 batchsize 或者 超過特定的時間就會發(fā)送數(shù)據(jù))
  2. doCommit:檢查 channel 內(nèi)存隊列是否足夠合并
  3. doRollback:如果 channel 內(nèi)存隊列空間不足沒救回滾數(shù)據(jù)

同樣 Sink 在從 Channel 主動拉取數(shù)據(jù)的時候也會開啟一個 Take 事務(wù):

  1. doTake:將數(shù)據(jù)讀取到臨時緩沖區(qū) takeList,并將數(shù)據(jù)發(fā)送到 HDFS
  2. doCommit:如果數(shù)據(jù)全部發(fā)送成功,就會清除臨時緩沖區(qū) taskList
  3. dooRollback:數(shù)據(jù)發(fā)送過程如果出現(xiàn)異常,rollback 將臨時緩沖區(qū)的數(shù)據(jù)歸還給 channel 內(nèi)存隊列

2、Flume Agent 內(nèi)部原理

注意:只有 source 和 channel 之間可以存在攔截器,channel 和 sink 之間不可以!??

  1. source 接收數(shù)據(jù),把數(shù)據(jù)封裝成 Event?
  2. 傳給 channel processor 也就是 channel 處理器
  3. 把事件傳給攔截器(interceptor),在攔截器這里可以對數(shù)據(jù)進行一些處理(我們在上一節(jié)中說過,當我們的路徑信息中包含時間的時候,需要從 Event Header 中讀取時間信息,如果沒有就需要我們指定從本地讀取 timestamp,所以這里我們就可以在攔截器這里給我們的 event 添加頭部信息);而且,攔截器可以設(shè)置多個
  4. 經(jīng)過攔截器處理的事件又返回給了 channel processor ,然后 channel processor 把事件傳給 channel 選擇器(channel selector 有兩種類型:Replicating 和 Multiplexing ,Replicating 會把source 發(fā)送來的 events 發(fā)往所有 channel,而 multiplexing 可以配置指定發(fā)往哪些 channel)
  5. 經(jīng)過 channel 選擇器處理后的事件仍然返回給 channel processor
  6. channel processor 會根據(jù) channel 選擇器的結(jié)果,發(fā)送給相應(yīng)的 channel(也就是這個時候才會真正的開啟 put 事務(wù),之前都是對 event 進行簡單的處理)
  7. SinkProcessor 負責協(xié)調(diào)拉取 channel 中的數(shù)據(jù),它有三種類型:DefaultSinkProcessor、LoadBalancingSinkpProcessor(負載均衡,也就是多個 Sink 輪詢的方式去讀取 channel 中的數(shù)據(jù))、FailoverSinkProcessor(故障轉(zhuǎn)移,每個 sink 有自己的優(yōu)先級,優(yōu)先級高的去讀取 channel 中的事件,只有當它掛掉的時候,才會輪到下一個優(yōu)先級的 sink 去讀)。其中 DefaultSinkProcessor 一個 channel 只能綁定一個 Sink,所以它也就沒有 sink 組的概念。

注意:一個 sink 只可以綁定一個 channel ,但是一個 channel 可以綁定多個 sink!

3、Flume 拓撲結(jié)構(gòu)

3.1、簡單串聯(lián)

官網(wǎng)這段話翻譯過來就是:為了將數(shù)據(jù)跨越多個代理或躍點進行傳輸,前一個代理的接收器(sink)和當前躍點的源(source)需要是avro類型,接收器指向源的主機名(或IP地址)和端口。

這種模式的缺點很好理解,就像串聯(lián)電路,一個節(jié)點壞了會影響整個系統(tǒng)。

3.2、復(fù)制和多路復(fù)用

從官網(wǎng)翻譯過來就是:上述示例顯示了一個名為“foo”的代理源將流程分散到三個不同的通道。這種分散可以是復(fù)制或多路復(fù)用。在復(fù)制流程的情況下,每個事件都會發(fā)送到這三個通道。對于多路復(fù)用的情況,當事件的屬性與預(yù)配置的值匹配時,事件將被發(fā)送到可用通道的子集。例如,如果事件屬性名為“txnType”設(shè)置為“customer”,則應(yīng)發(fā)送到channel1和channel3,如果為“vendor”,則應(yīng)發(fā)送到channel2,否則發(fā)送到channel3。映射可以在代理的配置文件中設(shè)置。

這種模式相比上面的串聯(lián)模式的優(yōu)點無非就是可以發(fā)送過多個目的地。

3.3、負載均衡和故障轉(zhuǎn)移

Flume 支持多個 Sink 邏輯上分到一個 Sink 組,sink 組配合不同的 SinkProcessor ,可以實現(xiàn)負載均衡和錯誤恢復(fù)的功能。

3.4、聚合

這種模式在實際開發(fā)中是經(jīng)常會用到的,日常web應(yīng)用通常分布在上百個服務(wù)器,大者甚至上千個、上萬個服務(wù)器。產(chǎn)生的日志,處理起來也非常麻煩。用flume的這種組合方式能很好的解決這一問題,每臺服務(wù)器部署一個flume采集日志,傳送到一個集中收集日志的 flume,再由此flume上傳到hdfshive、hbase等,進行日志分析。

4、Flume 企業(yè)開發(fā)實例

4.1、復(fù)制和多路復(fù)用

注意:多路復(fù)用必須配合攔截器使用,因為需要在 Event Header 中添加一些信息

1)案例需求

使用 Flume-1 監(jiān)控文件變動,Flume-1 將變動內(nèi)容傳遞給 Flume-2,Flume-2 負責存儲到 HDFS。同時 Flume-1 將變動內(nèi)容傳遞給 Flume-3,Flume-3 負責輸出到 Local FileSystem。

2)需求分析

  • 監(jiān)控文件變動我們可以考慮使用 taildir 或者 exec 這兩種 source
  • flume-1 sink 需要使用 avro sink 才能傳輸?shù)较乱粋€ flume-2 和 flume-3 的 source
  • flume-2 需要上傳數(shù)據(jù)到 HDFS 所以?sink 為 hdfs
  • flume-3 需要把數(shù)據(jù)輸出到本地,所以 sink 為 file_roll sink(要保存到本地目錄,這個目錄就必須提前創(chuàng)建好,它不像 HDFS Sink 會自動幫我們創(chuàng)建)

我們需要實現(xiàn)三個 flume 作業(yè):

  1. flume-1 把監(jiān)聽到的新日志讀取到 flume-2 和 flume-3 的 source
  2. flume-2 把日志上傳到 hdfs
  3. flume-3 把日志寫到本地

3)需求實現(xiàn)

flume-file-flume.conf
# Name the components on this agent
a1.sources = r1
a1.sinks = k1 k2
a1.channels = c1 c2# 將數(shù)據(jù)流復(fù)制給所有 channel 默認就是 replicating 所以也可以不用配置
a1.sources.r1.selector.type = replicating 
# Describe/configure the source
a1.sources.r1.type = exec
a1.sources.r1.command = tail -F /opt/module/hive-3.1.2/logs/hive.log
a1.sources.r1.shell = /bin/bash -c# Describe the sink
# sink 端的 avro 是一個數(shù)據(jù)發(fā)送者
a1.sinks.k1.type = avro
a1.sinks.k1.hostname = hadoop102
a1.sinks.k1.port = 4141
a1.sinks.k2.type = avro
a1.sinks.k2.hostname = hadoop102
a1.sinks.k2.port = 4142# Describe the channel
a1.channels.c1.type = memory
a1.channels.c1.capacity = 1000
a1.channels.c1.transactionCapacity = 100
a1.channels.c2.type = memory
a1.channels.c2.capacity = 1000
a1.channels.c2.transactionCapacity = 100# Bind the source and sink to the channel
# 一個 sink 只可以指定一個 channel,但是一個 channel 可以指定多個 sink
a1.sources.r1.channels = c1 c2
a1.sinks.k1.channel = c1
a1.sinks.k2.channel = c2
flume-hdfs.conf
# Name the components on this agent
a2.sources = r1
a2.sinks = k1
a2.channels = c1# Describe/configure the source
# source 端的 avro 是一個數(shù)據(jù)接收服務(wù)
a2.sources.r1.type = avro
a2.sources.r1.bind = hadoop102
a2.sources.r1.port = 4141# Describe the sink
a2.sinks.k1.type = hdfs
a2.sinks.k1.hdfs.path = hdfs://hadoop102:9820/flume2/%Y%m%d/%H
#上傳文件的前綴
a2.sinks.k1.hdfs.filePrefix = flume2-
#是否按照時間滾動文件夾
a2.sinks.k1.hdfs.round = true
#多少時間單位創(chuàng)建一個新的文件夾
a2.sinks.k1.hdfs.roundValue = 1
#重新定義時間單位
a2.sinks.k1.hdfs.roundUnit = hour
#是否使用本地時間戳
a2.sinks.k1.hdfs.useLocalTimeStamp = true
#積攢多少個 Event 才 flush 到 HDFS 一次
a2.sinks.k1.hdfs.batchSize = 100
#設(shè)置文件類型,可支持壓縮
a2.sinks.k1.hdfs.fileType = DataStream
#多久生成一個新的文件
a2.sinks.k1.hdfs.rollInterval = 30
#設(shè)置每個文件的滾動大小大概是 128M
a2.sinks.k1.hdfs.rollSize = 134217700
#文件的滾動與 Event 數(shù)量無關(guān)
a2.sinks.k1.hdfs.rollCount = 0# Describe the channel
a2.channels.c1.type = memory
a2.channels.c1.capacity = 1000
a2.channels.c1.transactionCapacity = 100# Bind the source and sink to the channel
a2.sources.r1.channels = c1
a2.sinks.k1.channel = c1
flume-dir.conf
# Name the components on this agent
a3.sources = r1
a3.sinks = k1
a3.channels = c2# Describe/configure the source
a3.sources.r1.type = avro
a3.sources.r1.bind = hadoop102
a3.sources.r1.port = 4142# Describe the sink
a3.sinks.k1.type = file_roll
a3.sinks.k1.sink.directory = /opt/module/data/flume3# Describe the channel
a3.channels.c2.type = memory
a3.channels.c2.capacity = 1000
a3.channels.c2.transactionCapacity = 100# Bind the source and sink to the channel
a3.sources.r1.channels = c2
a3.sinks.k1.channel = c2

4)測試

bin/flume-ng agent -c conf/ -n a3 -f job/group1/flume-dir.conf
bin/flume-ng agent -n a1 -c conf/ -f job/group1/flume-file-flumc.conf
bin/flume-ng agent -n a2 -c conf/ -f job/group1/flume-hdfs.conf

查看結(jié)果:

注意:寫入本地文件時,當一段時間沒有新的日志時,它仍然會創(chuàng)建一個新的文件,而不像 hdfs sink 即使達到了設(shè)置的間隔時間但是沒有新日志產(chǎn)生,那么它也不會創(chuàng)建一個新的文件。

這個需要注意的就是 hdfs 的端口不要寫錯,比如我的就不是 9870 而是 8020.

4.2、負載均衡和故障轉(zhuǎn)移

1)案例需求

使用 Flume1 監(jiān)控一個端口,其 sink 組中的 sink 分別對接 Flume2 和 Flume3,采用 FailoverSinkProcessor,實現(xiàn)故障轉(zhuǎn)移的功能。

2)需求分析

  • 開啟一個端口 88888 來發(fā)送數(shù)據(jù)
  • 使用 flume-1 監(jiān)聽該端口,并發(fā)送到 flume-2 和 flume-3 (需要 flume-1 的 sink 為 avro sink,flume-2 和 flume-3 的 source 為 avro source),flume-2 和 flume-3 發(fā)送日志到控制臺(flume-2 和 flume-3 的 sink 為 logger sink)

3)需求實現(xiàn)

flume-nc-flume.conf
# Name the components on this agent
a1.sources = r1
a1.channels = c1
a1.sinkgroups = g1
a1.sinks = k1 k2# Describe/configure the source
a1.sources.r1.type = netcat
a1.sources.r1.bind = localhost
a1.sources.r1.port = 44444a1.sinkgroups.g1.processor.type = failover
a1.sinkgroups.g1.processor.priority.k1 = 5
a1.sinkgroups.g1.processor.priority.k2 = 10
a1.sinkgroups.g1.processor.maxpenalty = 10000# Describe the sink
a1.sinks.k1.type = avro
a1.sinks.k1.hostname = hadoop102
a1.sinks.k1.port = 4141
a1.sinks.k2.type = avro
a1.sinks.k2.hostname = hadoop102
a1.sinks.k2.port = 4142# Describe the channel
a1.channels.c1.type = memory
a1.channels.c1.capacity = 1000
a1.channels.c1.transactionCapacity = 100# Bind the source and sink to the channel
a1.sources.r1.channels = c1
a1.sinkgroups.g1.sinks = k1 k2
a1.sinks.k1.channel = c1
a1.sinks.k2.channel = c1
flume-flume-console1.conf
# Name the components on this agent
a2.sources = r1
a2.sinks = k1
a2.channels = c1# Describe/configure the source
a2.sources.r1.type = avro
a2.sources.r1.bind = hadoop102
a2.sources.r1.port = 4141# Describe the sink
a2.sinks.k1.type = logger# Describe the channel
a2.channels.c1.type = memory
a2.channels.c1.capacity = 1000
a2.channels.c1.transactionCapacity = 100# Bind the source and sink to the channel
a2.sources.r1.channels = c1
a2.sinks.k1.channel = c1
flume-flume-console2.conf?
# Name the components on this agent
a3.sources = r1
a3.sinks = k1
a3.channels = c2# Describe/configure the source
a3.sources.r1.type = avro
a3.sources.r1.bind = hadoop102
a3.sources.r1.port = 4142# Describe the sink
a3.sinks.k1.type = logger# Describe the channel
a3.channels.c2.type = memory
a3.channels.c2.capacity = 1000
a3.channels.c2.transactionCapacity = 100# Bind the source and sink to the channel
a3.sources.r1.channels = c2
a3.sinks.k1.channel = c2

?4)案例測試

bin/flume-ng agent -c conf/ -n a3 -f job/group2/flume-flume-console2.conf -Dflume.root.logger=INFO,console
bin/flume-ng agent -c conf/ -n a2 -f job/group2/flume-flume-console1.conf -Dflume.root.logger=INFO,console
bin/flume-ng agent -c conf/ -n a1 -f job/group2/flume-nc-flume.conf

關(guān)閉 flume-flume-console1.conf 作業(yè)?

?我們發(fā)現(xiàn),一開始我們開啟三個 flume 作業(yè),當向 netcat 輸入數(shù)據(jù)時,只有 flume-flume-console1.conf 作業(yè)的控制臺有日志輸出,這是因為它的優(yōu)先級更高,當把作業(yè)?flume-flume-console1.conf 關(guān)閉時,再次向端口 44444 發(fā)送數(shù)據(jù),發(fā)現(xiàn)?flume-flume-console2.conf 作業(yè)開始輸出。

如果要使用負載均衡,只需要替換上面 flume-nc-flume.conf 中:

a1.sinkgroups.g1.processor.type = failover
a1.sinkgroups.g1.processor.priority.k1 = 5
a1.sinkgroups.g1.processor.priority.k2 = 10
a1.sinkgroups.g1.processor.maxpenalty = 10000

替換為:

a1.sinkgroups.g1.processor.type = load_balance
a1.sinkgroups.g1.processor.backoff = true
a1.sinkgroups.g1.processor.maxTimeOut = 30000

其中,backoff 代表退避,默認為 false,?如果當前 sink 沒有拉到數(shù)據(jù),那么接下來一段時間就不用這個 sink 。maxTimeOut 代表最大的退避時間,因為退避默認是指數(shù)增長的(比如一個 sink 第一次沒有拉到數(shù)據(jù),需要等 1 s,第二次還沒拉到,等 2s,第三次等 4s ...),默認最大值為 30 s。

4.3、聚合

1)案例需求

  • hadoop102 上的 Flume-1 監(jiān)控文件/opt/module/group.log,
  • hadoop103 上的 Flume-2 監(jiān)控某一個端口的數(shù)據(jù)流,
  • Flume-1 與 Flume-2 將數(shù)據(jù)發(fā)?hadoop104 上的 Flume-3,Flume-3 將最終數(shù)據(jù)打印到控制臺。

注意:主機只能在 hadoop104 上配,因為 avro source 在 hadoop104 上,客戶端(hadoop02 和 hadoop103 的 sink)可以遠程連接,但是服務(wù)端(hadoop104 的 source)只能綁定自己的端口號。

2)需求實現(xiàn)

flume-log-flume.conf
# Name the components on this agent
a1.sources = r1
a1.sinks = k1
a1.channels = c1# Describe/configure the source
a1.sources.r1.type = exec
a1.sources.r1.command = tail -F /opt/module/group.log
a1.sources.r1.shell = /bin/bash -c# Describe the sink
a1.sinks.k1.type = avro
a1.sinks.k1.hostname = hadoop104
a1.sinks.k1.port = 4141# Describe the channel
a1.channels.c1.type = memory
a1.channels.c1.capacity = 1000
a1.channels.c1.transactionCapacity = 100# Bind the source and sink to the channel
a1.sources.r1.channels = c1
a1.sinks.k1.channel = c1
?flume-nc-flume.conf
# Name the components on this agent
a2.sources = r1
a2.sinks = k1
a2.channels = c1# Describe/configure the source
a2.sources.r1.type = netcat
a2.sources.r1.bind = hadoop103
a2.sources.r1.port = 44444# Describe the sink
a2.sinks.k1.type = avro
a2.sinks.k1.hostname = hadoop104
a2.sinks.k1.port = 4141# Use a channel which buffers events in memory
a2.channels.c1.type = memory
a2.channels.c1.capacity = 1000
a2.channels.c1.transactionCapacity = 100# Bind the source and sink to the channel
a2.sources.r1.channels = c1
a2.sinks.k1.channel = c1
flume-flume-log.conf
# Name the components on this agent
a3.sources = r1
a3.sinks = k1
a3.channels = c1# Describe/configure the source
a3.sources.r1.type = avro
a3.sources.r1.bind = hadoop104
a3.sources.r1.port = 4141# Describe the sink
a3.sinks.k1.type = logger# Describe the channel
a3.channels.c1.type = memory
a3.channels.c1.capacity = 1000
a3.channels.c1.transactionCapacity = 100# Bind the source and sink to the channel
a3.sources.r1.channels = c1
a3.sinks.k1.channel = c1

3)測試

向 group.log 文件中追加文本:

注意:hadoop103 這里不能寫 nc localhost 44444 而要寫 nc hadoop103 44444! 否則報錯:Ncat: Connection refused.

5、自定義 Interceptor

前面我們的多路復(fù)用還沒有實現(xiàn),因為我們說多路復(fù)用必須配合攔截器來使用,因為我們必須知道每個 Channel 發(fā)往哪些 Sink,這需要攔截器往 Event Header 中寫一些內(nèi)容。

1)案例需求

使用 Flume 采集服務(wù)器本地日志,需要按照日志類型的不同,將不同種類的日志發(fā)往不同的分析系統(tǒng)。

2)需求分析

在實際的開發(fā)中,一臺服務(wù)器產(chǎn)生的日志類型可能有很多種,不同類型的日志可能需要發(fā)送到不同的分析系統(tǒng)。此時會用到 Flume 拓撲結(jié)構(gòu)中的 Multiplexing 結(jié)構(gòu),Multiplexing 的原理是,根據(jù) event 中 Header 的某個 key 的值,將不同的 event 發(fā)送到不同的 Channel中,所以我們需要自定義一個 Interceptor,為不同類型的 event 的 Header 中的 key 賦予不同的值。

在該案例中,我們以端口數(shù)據(jù)模擬日志,以是否包含”lyh”模擬不同類型的日志,我們需要自定義 interceptor 區(qū)分數(shù)據(jù)中是否包含”lyh”,將其分別發(fā)往不同的分析系統(tǒng)(Channel)。

?3)需求實現(xiàn)

自定義攔截器

引入 flume 依賴

<dependency><groupId>org.apache.flume</groupId><artifactId>flume-ng-core</artifactId><version>1.9.0</version>
</dependency>
package com.lyh.gmall.interceptor;import org.apache.flume.Context;
import org.apache.flume.Event;
import org.apache.flume.interceptor.Interceptor;import java.util.ArrayList;
import java.util.List;
import java.util.Map;public class TypeInterceptor implements Interceptor {// 存放事件集合private List<Event> addHeaderEvents;@Overridepublic void initialize() {// 初始化存放事件的集合addHeaderEvents = new ArrayList<>();}// 單個事件攔截@Overridepublic Event intercept(Event event) {// 1. 獲取事件中的 header 信息Map<String, String> headers = event.getHeaders();// 2. 獲取事件中的 body 信息String body = new String(event.getBody());// 3. 根據(jù) body 中是否包含 'lyh' 來決定發(fā)往哪個 sinkif (body.contains("lyh"))headers.put("type","first");elseheaders.put("type","second");return event;}// 批量事件攔截@Overridepublic List<Event> intercept(List<Event> events) {// 1. 清空集合addHeaderEvents.clear();// 2. 遍歷 eventsfor (Event event : events) {// 3. 給每個事件添加頭信息addHeaderEvents.add(intercept(event));}return addHeaderEvents;}@Overridepublic void close() {}public static class Builder implements Interceptor.Builder{@Overridepublic Interceptor build() {return new TypeInterceptor();}@Overridepublic void configure(Context context) {}}
}

打包放到 flume 安裝目錄的 lib 目錄下:
?

flume 作業(yè)配置

hadoop102:

# Name the components on this agent
a1.sources = r1
a1.sinks = k1 k2
a1.channels = c1 c2# Describe/configure the source
a1.sources.r1.type = netcat
a1.sources.r1.bind = localhost
a1.sources.r1.port = 44444
a1.sources.r1.interceptors = i1
a1.sources.r1.interceptors.i1.type = com.lyh.interceptor.TypeInterceptor$Builder
a1.sources.r1.selector.type = multiplexing
a1.sources.r1.selector.header = type
a1.sources.r1.selector.mapping.first = c1 # 包含 'lyh'
a1.sources.r1.selector.mapping.second = c2 # 不包含 'lyh'# Describe the sink
a1.sinks.k1.type = avro
a1.sinks.k1.hostname = hadoop103
a1.sinks.k1.port = 4141
a1.sinks.k2.type=avro
a1.sinks.k2.hostname = hadoop104
a1.sinks.k2.port = 4242# Use a channel which buffers events in memory
a1.channels.c1.type = memory
a1.channels.c1.capacity = 1000
a1.channels.c1.transactionCapacity = 100# Use a channel which buffers events in memory
a1.channels.c2.type = memory
a1.channels.c2.capacity = 1000
a1.channels.c2.transactionCapacity = 100# Bind the source and sink to the channel
a1.sources.r1.channels = c1 c2
a1.sinks.k1.channel = c1
a1.sinks.k2.channel = c2
hadoop103:
a1.sources = r1
a1.sinks = k1
a1.channels = c1a1.sources.r1.type = avro
a1.sources.r1.bind = hadoop103
a1.sources.r1.port = 4141a1.sinks.k1.type = loggera1.channels.c1.type = memory
a1.channels.c1.capacity = 1000
a1.channels.c1.transactionCapacity = 100a1.sinks.k1.channel = c1
a1.sources.r1.channels = c1
hadoop104:
a1.sources = r1
a1.sinks = k1
a1.channels = c1a1.sources.r1.type = avro
a1.sources.r1.bind = hadoop104
a1.sources.r1.port = 4242a1.sinks.k1.type = loggera1.channels.c1.type = memory
a1.channels.c1.capacity = 1000
a1.channels.c1.transactionCapacity = 100a1.sinks.k1.channel = c1
a1.sources.r1.channels = c1

4)需求實現(xiàn)

#hadoop103
bin/flume-ng agent -n a1 -c conf/ -f job/group4/flume2.conf -Dflume.root.logger=INFO,console#hadoop104
bin/flume-ng agent -n a1 -c conf/ -f job/group4/flume3.conf -Dflume.root.logger=INFO,console#hadoop102
bin/flume-ng agent -n a1 -c conf/ -f job/group4/flume1.conf
nc localhost 44444

hadoop102:

hadoop103:

hadoop104:?

可以看到,從 hadoop102 發(fā)送的日志中,包含 "lyh" 的都被發(fā)往 hadoop103 的 4141 端口,其它日志則被發(fā)往 hadoop104 的 4242端口。

6、自定義 Source

自定義 source 用的還是比較少的,畢竟 flume 已經(jīng)提供了很多常用的了。

1)介紹

????????Source 是負責接收數(shù)據(jù)到 Flume Agent 的組件。Source 組件可以處理各種類型、各種格式的日志數(shù)據(jù),包括 avro、thrift、exec、jms、spooling directory、netcat、sequence、generator、syslog、http、legacy。官方提供的 source 類型已經(jīng)很多,但是有時候并不能滿足實際開發(fā)當中的需求,此時我們就需要根據(jù)實際需求自定義某些 source。
官方也提供了自定義 source 的接口: https://flume.apache.org/FlumeDeveloperGuide.html#source 根據(jù)官方說明自定義 MySource 需要繼承 AbstractSource 類并實現(xiàn) Configurable 和 PollableSource 接口。
實現(xiàn)相應(yīng)方法:
  • getBackOffSleepIncrement() //backoff 步長,當從數(shù)據(jù)源拉取數(shù)據(jù)時,拉取不到數(shù)據(jù)的話它不會一直再去拉取,而是等待,之后每一次再=如果還拉取不到,就會比上一次多等待步長單位個時間。
  • getMaxBackOffSleepInterval()? //backoff 最長時間,如果不設(shè)置最長等待時間,它最終會無限等待,所以需要指定。
  • configure(Context context)? //初始化 context(讀取配置文件內(nèi)容)
  • process()? //獲取數(shù)據(jù)封裝成 event 并寫入 channel,這個方法將被循環(huán)調(diào)用。
使用場景:讀取 MySQL 數(shù)據(jù)或者其他文件系統(tǒng)。

2)需求

使用 flume 接收數(shù)據(jù),并給每條數(shù)據(jù)添加前綴,輸出到控制臺。前綴可從 flume 配置文
件中配置。

3)分析

4)需求實現(xiàn)

代碼

package com.lyh.source;import org.apache.flume.Context;
import org.apache.flume.EventDeliveryException;
import org.apache.flume.PollableSource;
import org.apache.flume.conf.Configurable;
import org.apache.flume.event.SimpleEvent;
import org.apache.flume.source.AbstractSource;import java.util.HashMap;
import java.util.Map;public class MySource extends AbstractSource implements Configurable, PollableSource {// 定義配置文件將來要讀取的字段private Long delay;private String field;@Overridepublic Status process() throws EventDeliveryException {try {// 創(chuàng)建事件頭信息Map<String,String> headerMap = new HashMap<>();// 創(chuàng)建事件SimpleEvent event = new SimpleEvent();// 循環(huán)封裝事件for (int i = 0; i < 5; i++) {// 給事件設(shè)置頭信息event.setHeaders(headerMap);// 給事件設(shè)置內(nèi)容event.setBody((field + i).getBytes());// 將事件寫入 channelgetChannelProcessor().processEvent(event);Thread.sleep(delay);}} catch (InterruptedException e) {e.printStackTrace();}return Status.READY;}// 步長@Overridepublic long getBackOffSleepIncrement() {return 0;}// 最大間隔時間@Overridepublic long getMaxBackOffSleepInterval() {return 0;}// 初始化配置信息@Overridepublic void configure(Context context) {delay = context.getLong("delay");field = context.getString("field","Hello");}
}

配置文件

# Name the components on this agent
a1.sources = r1
a1.sinks = k1
a1.channels = c1# Describe/configure the source
a1.sources.r1.type = com.lyh.source.MySource
a1.sources.r1.delay = 1000
a1.sources.r1.field = lyh# Describe the sink
a1.sinks.k1.type = logger# Use a channel which buffers events in memory
a1.channels.c1.type = memory
a1.channels.c1.capacity = 1000
a1.channels.c1.transactionCapacity = 100# Bind the source and sink to the channel
a1.sources.r1.channels = c1
a1.sinks.k1.channel = c1
bin/flume-ng agent -n a1 -c conf/ -f job/custom-source.conf -Dflume.root.logger=INFO,console

運行結(jié)果:?

7、自定義 Sink

1)介紹

????????Sink 不斷地輪詢 Channel 中的事件且批量地移除它們,并將這些事件批量寫入到存儲或索引系統(tǒng)、或者被發(fā)送到另一個 Flume Agent。
????????Sink 是完全事務(wù)性的。在從 Channel 批量刪除數(shù)據(jù)之前,每個 Sink 用 Channel 啟動一個事務(wù)。批量事件一旦成功寫出到存儲系統(tǒng)或下一個 Flume Agent,Sink 就利用 Channel 提交事務(wù)。事務(wù)一旦被提交,該 Channel 從自己的內(nèi)部緩沖區(qū)刪除事件。
????????Sink 組件目的地包括 hdfs、logger、avro、thrift、ipc、file、null、HBase、solr、 自定義。官方提供的 Sink 類型已經(jīng)很多,但是有時候并不能滿足實際開發(fā)當中的需求,此時我們就需要根據(jù)實際需求自定義某些 Sink。
????????官方也提供了自定義 sink 的接口:
https://flume.apache.org/FlumeDeveloperGuide.html#sink 根據(jù)官方說明自定義 MySink 需要繼承 AbstractSink 類并實現(xiàn) Configurable 接口。實現(xiàn)相應(yīng)方法:
  • configure(Context context)//初始化 context(讀取配置文件內(nèi)容)
  • process()//從 Channel 讀取獲取數(shù)據(jù)(event),這個方法將被循環(huán)調(diào)用。
使用場景:讀取 Channel 數(shù)據(jù)寫入 MySQL 或者其他文件系統(tǒng)。

2)需求分析

使用 flume 接收數(shù)據(jù),并在 Sink 端給每條數(shù)據(jù)添加前綴和后綴,輸出到控制臺。前后綴可在 flume 任務(wù)配置文件中配置。
流程分析:

?3)需求實現(xiàn)

package com.lyh.sink;import org.apache.flume.*;
import org.apache.flume.conf.Configurable;
import org.apache.flume.sink.AbstractSink;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;public class MySink extends AbstractSink implements Configurable{private final static Logger LOG = LoggerFactory.getLogger(AbstractSink.class);private String prefix;private String suffix;@Overridepublic Status process() throws EventDeliveryException {// 聲明返回值狀態(tài)信息Status status;// 獲取當前 sink 綁定的 channelChannel channel = getChannel();// 獲取事務(wù)Transaction txn = channel.getTransaction();// 聲明事件Event event;// 開啟事務(wù)txn.begin();// 讀取 channel 中的事件、直到讀取事件結(jié)束循環(huán)while (true){event = channel.take();if (event!=null) break;}try {// 打印事件LOG.info(prefix + new String(event.getBody()) + suffix);// 事務(wù)提交txn.commit();status = Status.READY;}catch (Exception e){// 遇到異?;貪L事務(wù)txn.rollback();status = Status.BACKOFF;}finally {// 關(guān)閉事務(wù)txn.close();}return null;}// 初始化配置信息@Overridepublic void configure(Context context) {// 帶默認值prefix = context.getString("prefix","hello");// 不帶默認值suffix = context.getString("suffix");}
}

配置文件

# Name the components on this agent
a1.sources = r1
a1.sinks = k1
a1.channels = c1# Describe/configure the source
a1.sources.r1.type = netcat
a1.sources.r1.bind = localhost
a1.sources.r1.port = 44444# Describe the sink
a1.sinks.k1.type = com.atguigu.MySink
a1.sinks.k1.prefix = lyh:
a1.sinks.k1.suffix = :lyh# Use a channel which buffers events in memory
a1.channels.c1.type = memory
a1.channels.c1.capacity = 1000
a1.channels.c1.transactionCapacity = 100# Bind the source and sink to the channel
a1.sources.r1.channels = c1
a1.sinks.k1.channel = c1

4)測試

bin/flume-ng agent -n a1 -c conf/ -f job/custom-sink.conf -Dflume.root.logger=INFO,console

運行結(jié)果:

總結(jié)

? ? ? ? 自此,flume 的學習基本也完了,這一篇雖然不多但也用了大概3天時間。相比較 kafka、flink,flume 這個框架還是非常簡單的,比如我們自己實現(xiàn)一些 source、sink,都是很簡單的,沒有太多復(fù)雜的理解的東西。

? ? ? ? 總之 flume 這個工具還是多看官網(wǎng)。

http://aloenet.com.cn/news/34030.html

相關(guān)文章:

  • 成都網(wǎng)站建設(shè)網(wǎng)絡(luò)營銷的特點和優(yōu)勢
  • 建設(shè)部投訴網(wǎng)站提高搜索引擎排名
  • jsp做的網(wǎng)站如何查看站長資源平臺
  • 湖北宜昌網(wǎng)絡(luò)科技有限公司優(yōu)化課程設(shè)置
  • 現(xiàn)在最流行的網(wǎng)站推廣方式有哪些谷歌seo是什么
  • 怎樣設(shè)計自己網(wǎng)站域名不要手賤搜這15個關(guān)鍵詞
  • seo網(wǎng)站分析報告百度置頂廣告多少錢
  • 企業(yè)建設(shè)網(wǎng)站項目背景線下宣傳渠道和宣傳方式
  • 用服務(wù)器ip可以做網(wǎng)站嗎百度seo在線優(yōu)化
  • 專業(yè)的設(shè)計網(wǎng)站有哪些中國站免費推廣入口
  • 做網(wǎng)站國家大學科技園鄭州網(wǎng)絡(luò)建設(shè)推廣
  • 哪些網(wǎng)站是java開發(fā)的優(yōu)化關(guān)鍵詞的方法有哪些
  • 沈陽市建設(shè)工程項目管理中心網(wǎng)站優(yōu)化大師官網(wǎng)
  • 做的比較好的手機網(wǎng)站東營網(wǎng)站推廣公司
  • 做技術(shù)網(wǎng)站在背景圖產(chǎn)品推廣步驟
  • 有人說做網(wǎng)站賭上海培訓(xùn)機構(gòu)整頓
  • 星沙網(wǎng)站制作網(wǎng)上宣傳廣告怎么做
  • 揭陽手機網(wǎng)站建設(shè) 今日頭條
  • 做cpa搭建哪個網(wǎng)站比較好永久免費制作網(wǎng)頁
  • 中國有色金屬建設(shè)股份有限公司網(wǎng)站seoheuni
  • 做網(wǎng)站的公司哪家最好廈門最快seo
  • 中拓網(wǎng)絡(luò)科技有限公司北京seo不到首頁不扣費
  • 廣州最新發(fā)布最新百度seo新站優(yōu)化
  • 一臺云服務(wù)器做多個網(wǎng)站惠州seo外包服務(wù)
  • 成都網(wǎng)站建設(shè)名錄海南seo排名優(yōu)化公司
  • crm與scrmseo短視頻網(wǎng)頁入口引流網(wǎng)站
  • linux系統(tǒng)網(wǎng)站空間正規(guī)seo關(guān)鍵詞排名哪家專業(yè)
  • 展示網(wǎng)站動畫怎么做的站長工具seo綜合查詢官網(wǎng)
  • 怎么自己做網(wǎng)站的步驟百度推廣是做什么的
  • 網(wǎng)站刷新新前臺是什么意思2345網(wǎng)址中國最好