做電影網(wǎng)站涉及的侵權(quán)問(wèn)題網(wǎng)盤搜索神器
📖 前言:MapReduce是一種分布式并行編程模型,是Hadoop核心子項(xiàng)目之一。實(shí)驗(yàn)前需確保搭建好Hadoop 3.3.5環(huán)境、安裝好Eclipse IDE
🔎 【Hadoop大數(shù)據(jù)技術(shù)】——Hadoop概述與搭建環(huán)境(學(xué)習(xí)筆記)
目錄
- 🕒 1. 在Eclipse中搭建MapReduce環(huán)境
- 🕒 2. 倒排索引
- 🕘 2.1 案例分析
- 🕤 2.1.1 Map階段
- 🕤 2.1.2 Combine階段
- 🕤 2.1.3 Reduce階段
- 🕘 2.2 案例實(shí)現(xiàn)
- 🕤 2.2.1 Map階段實(shí)現(xiàn)
- 🕤 2.2.2 Combine階段實(shí)現(xiàn)
- 🕤 2.2.3 Reduce階段實(shí)現(xiàn)
- 🕤 2.2.4 Runner程序主類實(shí)現(xiàn)
- 🕒 3. 數(shù)據(jù)去重
- 🕘 3.1 案例分析
- 🕤 3.1.1 Map階段
- 🕤 3.1.2 Reduce階段
- 🕘 3.2 案例實(shí)現(xiàn)
- 🕤 3.2.1 Map階段實(shí)現(xiàn)
- 🕤 3.2.2 Reduce階段實(shí)現(xiàn)
- 🕤 3.2.3 Runner程序主類實(shí)現(xiàn)
- 🕒 4. TopN
- 🕘 4.1 案例分析
- 🕘 4.2 案例實(shí)現(xiàn)
- 🕤 4.2.1 Map階段實(shí)現(xiàn)
- 🕤 4.2.2 Reduce階段實(shí)現(xiàn)
- 🕤 4.2.3 Runner程序主類實(shí)現(xiàn)
🕒 1. 在Eclipse中搭建MapReduce環(huán)境
要在 Eclipse 上編譯和運(yùn)行 MapReduce 程序,需要安裝 hadoop-eclipse-plugin
下載后,將插件復(fù)制到 Eclipse 安裝目錄的 plugins 文件夾中
🔎 點(diǎn)擊獲取軟件 提取碼: 09oy
sudo mv hadoop-eclipse-plugin-2.7.3.jar /opt/eclipse/plugins/
之后重啟eclipse完成插件導(dǎo)入。
在繼續(xù)配置前請(qǐng)確保已經(jīng)開(kāi)啟了 Hadoop
hadoop@Hins-vm:/usr/local/hadoop$ ./sbin/start-dfs.sh
插件需要進(jìn)一步的配置。
第一步:選擇 Window 菜單下的 Preference。
此時(shí)會(huì)彈出一個(gè)窗口,窗口的左側(cè)會(huì)多出 Hadoop Map/Reduce 選項(xiàng),點(diǎn)擊此選項(xiàng),選擇 Hadoop 的安裝目錄(如//usr/local/hadoop)。
第二步:切換 Map/Reduce 開(kāi)發(fā)視圖,選擇 Window 菜單下選擇 Window -> Perspective -> Open Perspective -> Other,彈出一個(gè)窗口,從中選擇 Map/Reduce 選項(xiàng)即可進(jìn)行切換。
第三步:建立與 Hadoop 集群的連接,點(diǎn)擊 Eclipse軟件右下角的 Map/Reduce Locations 面板,在面板中單擊右鍵,選擇 New Hadoop Location。
在彈出來(lái)的 General 選項(xiàng)面板中,General 的設(shè)置要與 Hadoop 的配置一致。一般兩個(gè) Host 值是一樣的,如果是偽分布式,填寫 localhost 即可,本文使用Hadoop偽分布式配置,設(shè)置 fs.defaultFS 為 hdfs://localhost:9000,則 DFS Master 的 Port 要改為 9000。Map/Reduce(V2) Master 的 Port 用默認(rèn)的即可,Location Name 隨意填寫。
點(diǎn)擊 finish,Map/Reduce Location 就創(chuàng)建好了。
在 Eclipse 中操作 HDFS 中的文件:
配置好后,點(diǎn)擊左側(cè) Project Explorer 中的 MapReduce Location 就能直接查看 HDFS 中的文件列表了,雙擊可以查看內(nèi)容,右鍵點(diǎn)擊可以上傳、下載、刪除 HDFS 中的文件,無(wú)需再通過(guò)繁瑣的 hdfs dfs -ls 等命令進(jìn)行操作了。
注:HDFS 中的內(nèi)容變動(dòng)后,Eclipse 不會(huì)同步刷新,需要右鍵點(diǎn)擊 Project Explorer中的 MapReduce Location,選擇 Refresh,才能看到變動(dòng)后的文件。
🕒 2. 倒排索引
倒排索引是文檔檢索系統(tǒng)中最常用的數(shù)據(jù)結(jié)構(gòu),被廣泛應(yīng)用于全文搜索引擎。倒排索引主要用來(lái)存儲(chǔ)某個(gè)單詞或詞組在一組文檔中的存儲(chǔ)位置的映射,提供了可以根據(jù)內(nèi)容來(lái)查找文檔的方式,而不是根據(jù)文檔來(lái)確定內(nèi)容,因此稱為倒排索引(Inverted Index)。帶有倒排索引的文件我們稱為倒排索引文件,簡(jiǎn)稱倒排文件(Inverted File)。
倒排文件由一個(gè)單詞或詞組和相關(guān)聯(lián)的文檔列表組成。
在實(shí)際應(yīng)用中,還需要給每個(gè)文檔添加一個(gè)權(quán)值,用來(lái)指出每個(gè)文檔與搜索內(nèi)容的相關(guān)度。最常用的是使用詞頻作為權(quán)重,即記錄單詞或詞組在文檔中出現(xiàn)的次數(shù),用戶在搜索相關(guān)文檔時(shí),就會(huì)把權(quán)重高的推薦給客戶。
🕘 2.1 案例分析
現(xiàn)有三個(gè)源文件file1.txt、file2.txt和file3.txt,需要使用倒排索引的方式對(duì)這三個(gè)源文件內(nèi)容實(shí)現(xiàn)倒排索引,并將最后的倒排索引文件輸出。
file1.txt
MapReduce is simple
file2.txt
MapReduce is powerful is simple
file3.txt
Hello MapReduce bye MapReduce
使用實(shí)現(xiàn)倒排索引的MapReduce程序統(tǒng)計(jì)文件file1.txt、file2.txt和file3.txt中每個(gè)單詞所在文本的位置以及各文本中出現(xiàn)的次數(shù)。
🕤 2.1.1 Map階段
MapTask使用默認(rèn)的lnputFormat組件對(duì)每個(gè)文本文件進(jìn)行處理,得到文本中的每行數(shù)據(jù)的起始偏移量及其內(nèi)容,作為Map階段輸入的鍵值對(duì),進(jìn)一步得到倒排索引中需要的3個(gè)信息:單詞、文檔名稱和詞頻。
🕤 2.1.2 Combine階段
經(jīng)過(guò)Map階段數(shù)據(jù)轉(zhuǎn)換后,同一個(gè)文檔中相同的單詞會(huì)出現(xiàn)多個(gè)的情況,單純依靠后續(xù)ReduceTask同時(shí)完成詞頻統(tǒng)計(jì)和生成文檔列表會(huì)耗費(fèi)大量時(shí)間,因此可以通過(guò)Combiner組件先完成每一個(gè)文檔中的詞頻統(tǒng)計(jì)。
🕤 2.1.3 Reduce階段
經(jīng)過(guò)上述兩個(gè)階段的處理后,Reduce階段只需將所有文件中相同key值的value值進(jìn)行統(tǒng)計(jì),并組合成倒排索引文件所需的格式即可。
🕘 2.2 案例實(shí)現(xiàn)
首先,我們創(chuàng)建好這些源文件,設(shè)置好路徑并上傳至HDFS的input中。
在 Eclipse 中創(chuàng)建項(xiàng)目,點(diǎn)擊 File 菜單,選擇 New -> Project,選擇 Map/Reduce Project,點(diǎn)擊 Next。
取名MapReduceDemo
,點(diǎn)擊 Finish。
此時(shí)在左側(cè)的 Project Explorer 就能看到剛才建立的項(xiàng)目了。接著右鍵點(diǎn)擊剛創(chuàng)建的 MapReduce 項(xiàng)目 src,選擇 New -> Package,在 Package 處填寫 com.mapreduce.invertedindex
;
🕤 2.2.1 Map階段實(shí)現(xiàn)
在 com.mapreduce.invertedindex
包下新建自定義類Mapper類InvertedIndexMapper
,該類繼承Mapper類
該類的作用:將文本中的單詞按照空格進(jìn)行切割,并以冒號(hào)拼接,“單詞:文檔名稱”作為key,單詞次數(shù)作為value,都以文本方式傳輸至Combine階段。
package com.mapreduce.invertedindex;import java.io.IOException;import org.apache.commons.lang3.StringUtils;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Mapper;
import org.apache.hadoop.mapreduce.lib.input.FileSplit;public class InvertedIndexMapper extends Mapper<LongWritable, Text, Text, Text> {private static Text keyInfo = new Text();// 存儲(chǔ)單詞和URL組合private static final Text valueInfo = new Text("1");// 存儲(chǔ)詞頻,初始化為1// 重寫map()方法,將文本中的單詞進(jìn)行切割,并通過(guò)write()將map()生成的鍵值對(duì)輸出給Combine階段。@Overrideprotected void map(LongWritable key, Text value, Context context) throws IOException, InterruptedException {String line = value.toString();String[] fields = StringUtils.split(line, " ");// 得到字段數(shù)組FileSplit fileSplit = (FileSplit) context.getInputSplit();// 得到這行數(shù)據(jù)所在的文件切片String fileName = fileSplit.getPath().getName();// 根據(jù)文件切片得到文件名for (String field : fields) {// key值由單詞和URL組成,如"MapReduce:file1"keyInfo.set(field + ":" + fileName);context.write(keyInfo, valueInfo);}}
}
🕤 2.2.2 Combine階段實(shí)現(xiàn)
根據(jù)Map階段的輸出結(jié)果形式,在 com.mapreduce.invertedindex
包下,自定義實(shí)現(xiàn)Combine階段的類InvertedIndexCombiner
,該類繼承Reducer類,對(duì)每個(gè)文檔的單詞進(jìn)行詞頻統(tǒng)計(jì),如下圖所示。
該類作用:對(duì)Map階段的單詞次數(shù)聚合處理,并重新設(shè)置key值為單詞,value值由文檔名稱和詞頻組成。
package com.mapreduce.invertedindex;import java.io.IOException;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Reducer;public class InvertedIndexCombiner extends Reducer<Text, Text, Text, Text> {private static Text info = new Text();// 輸入: <MapReduce:file3 {1,1..>// 輸出: <MapReduce file3:2>// 重寫reduce()方法對(duì)Map階段的單詞次數(shù)聚合處理。@Overrideprotected void reduce(Text key, Iterable<Text> values, Context context) throws IOException, InterruptedException {int sum = 0;// 統(tǒng)計(jì)詞頻for (Text value : values) {sum += Integer.parseInt(value.toString());}int splitIndex = key.toString().indexOf(":");// 重新設(shè)置value值由URL和詞頻組成info.set(key.toString().substring(splitIndex + 1) + ":" + sum);// 重新設(shè)置key值為單詞key.set(key.toString().substring(0, splitIndex));context.write(key, info);}
}
🕤 2.2.3 Reduce階段實(shí)現(xiàn)
根據(jù)Combine階段的輸出結(jié)果形式,在同一包下,自定義實(shí)現(xiàn)Reducer類InvertedIndexReducer
,該類繼承Reducer。
該類作用:接收Combine階段輸出的數(shù)據(jù),按照最終案例倒排索引文件需求的樣式,將單詞作為key,多個(gè)文檔名稱和詞頻連接作為value,輸出到目標(biāo)目錄。
package com.mapreduce.invertedindex;import java.io.IOException;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Reducer;public class InvertedIndexReducer extends Reducer<Text, Text, Text, Text> {private static Text result = new Text();// 輸入: <MapReduce file3:2>// 輸出: <MapReduce file1:1;file2:1;file3:2;>@Overrideprotected void reduce(Text key, Iterable<Text> values, Context context) throws IOException, InterruptedException {// 生成文檔列表String fileList = new String();for (Text value : values) {fileList += value.toString() + ";";}result.set(fileList);context.write(key, result);}
}
🕤 2.2.4 Runner程序主類實(shí)現(xiàn)
在同一個(gè)包下編寫MapReduce程序運(yùn)行主類InvertedIndexDriver
。
該類作用:設(shè)置MapReduce工作任務(wù)的相關(guān)參數(shù),設(shè)置完畢,運(yùn)行主程序即可。
package com.mapreduce.invertedindex;import java.io.IOException;import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;
import org.apache.hadoop.yarn.service.api.records.Configuration;public class InvertedIndexDriver {public static void main(String[] args) throws ClassNotFoundException, IOException, InterruptedException {Configuration conf = new Configuration();Job job = Job.getInstance();job.setJarByClass(InvertedIndexDriver.class);job.setMapperClass(InvertedIndexMapper.class);job.setCombinerClass(InvertedIndexCombiner.class);job.setReducerClass(InvertedIndexReducer.class);job.setOutputKeyClass(Text.class);job.setOutputValueClass(Text.class);FileInputFormat.setInputPaths(job, new Path("hdfs://localhost:9000/user/hadoop/MapReduce/InvertedIndex/input"));// 指定處理完成之后的結(jié)果所保存的位置FileOutputFormat.setOutputPath(job, new Path("hdfs://localhost:9000/user/hadoop/MapReduce/InvertedIndex/output"));// 向yarn集群提交這個(gè)jobboolean res = job.waitForCompletion(true);System.exit(res ? 0 : 1);}
}
注:運(yùn)行結(jié)果處的報(bào)錯(cuò)可以無(wú)視。
Web UI查看:
終端查看:
Eclipse IDE查看:
MapReduce的程序可以用Eclipse編譯運(yùn)行或使用命令行編譯打包運(yùn)行,下面是用命令行編譯打包運(yùn)行的方法:
將驅(qū)動(dòng)類代碼修改一下:
public class InvertedIndexDriver {public static void main(String[] args) throws ClassNotFoundException, IOException, InterruptedException {......FileInputFormat.setInputPaths(job, new Path(args[0]));// 指定處理完成之后的結(jié)果所保存的位置FileOutputFormat.setOutputPath(job, new Path(args[1]));// 向yarn集群提交這個(gè)jobboolean res = job.waitForCompletion(true);System.exit(res ? 0 : 1);}
}
運(yùn)行前如有output文件夾,需要先刪了:
hadoop@Hins-vm:/usr/local/hadoop$ ./bin/hdfs dfs -rm -r /user/hadoop/MapReduce/InvertedIndex/output
打包為jar文件的操作詳見(jiàn)HDFS分布式文件系統(tǒng)的 4.6節(jié)
🔎 傳送門:HDFS分布式文件系統(tǒng)
現(xiàn)在,就可以在Linux系統(tǒng)中,使用hadoop jar命令運(yùn)行程序,并到HDFS中查看生成的文件:
hadoop@Hins-vm:/usr/local/hadoop$ ./bin/hadoop jar ./myapp/InvertedIndex.jar ./MapReduce/InvertedIndex/input ./MapReduce/InvertedIndex/output
🕒 3. 數(shù)據(jù)去重
數(shù)據(jù)去重主要是為了掌握利用并行化思想來(lái)對(duì)數(shù)據(jù)進(jìn)行有意義的篩選,數(shù)據(jù)去重指去除重復(fù)數(shù)據(jù)的操作。在大數(shù)據(jù)開(kāi)發(fā)中,統(tǒng)計(jì)大數(shù)據(jù)集上的多種數(shù)據(jù)指標(biāo),這些復(fù)雜的任務(wù)數(shù)據(jù)都會(huì)涉及數(shù)據(jù)去重。
🕘 3.1 案例分析
現(xiàn)有兩個(gè)源文件file4.txt和file5.txt,內(nèi)容分別如下,編程實(shí)現(xiàn)對(duì)兩個(gè)文件合并后的數(shù)據(jù)內(nèi)容去重:
file4.txt
2022-3-21 a
2022-3-22 b
2022-3-23 c
2022-3-24 d
2022-3-25 a
2022-3-26 b
2022-3-27 c
2022-3-23 c
file5.txt
2022-3-21 b
2022-3-22 a
2022-3-23 b
2022-3-24 d
2022-3-25 a
2022-3-26 c
2022-3-27 d
2022-3-23 c
🕤 3.1.1 Map階段
在Map階段將讀取的每一行數(shù)據(jù)作為鍵,如2022-3-21 a,由于MapReduce程序?qū)?shù)據(jù)去重是以鍵值對(duì)的形式解析數(shù)據(jù),需要將每一行數(shù)據(jù)當(dāng)作整體進(jìn)行去重,所以將每一行數(shù)據(jù)作為鍵,而值在數(shù)據(jù)去重中作用不大,這里將值設(shè)置為null滿足<Key,Value>的格式。
🕤 3.1.2 Reduce階段
在Reduce階段,將MapTask輸出的鍵值對(duì)作為Reduce階段輸入的鍵值對(duì),通過(guò)ReduceTask中的Shuffle對(duì)同一分區(qū)中鍵相同鍵值對(duì)合并,達(dá)到數(shù)據(jù)去重的效果。
🕘 3.2 案例實(shí)現(xiàn)
首先,我們創(chuàng)建好這些源文件,設(shè)置好路徑并上傳至HDFS的input中。
在MapReduceDemo
項(xiàng)目下新建包 com.mapreduce.dedup
🕤 3.2.1 Map階段實(shí)現(xiàn)
在 com.mapreduce.dedup
包下新建自定義類Mapper類DedupMapper
,該類繼承Mapper類
該類作用:讀取數(shù)據(jù)集文件將TextInputFormat默認(rèn)組件解析的類似<0,2022-3-21 a>鍵值對(duì)修改為<2022-3-21 a,null>
package com.mapreduce.dedup;import java.io.IOException;import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.NullWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Mapper;public class DedupMapper extends Mapper<LongWritable, Text, Text, NullWritable> {private static Text field = new Text();@Overrideprotected void map(LongWritable key, Text value, Context context) throws IOException, InterruptedException {field = value;context.write(field, NullWritable.get());}
}
🕤 3.2.2 Reduce階段實(shí)現(xiàn)
在同一包下新建自定義類Reducer類DedupReducer
,該類繼承Reducer類
該類作用:僅接受Map階段傳遞過(guò)來(lái)的數(shù)據(jù),根據(jù)Shuffle工作原理,鍵值key相同的數(shù)據(jù)就會(huì)被合并,因此輸出的數(shù)據(jù)就不會(huì)出現(xiàn)重復(fù)數(shù)據(jù)了。
package com.mapreduce.dedup;import java.io.IOException;
import org.apache.hadoop.io.NullWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Reducer;public class DedupReducer extends Reducer<Text, NullWritable, Text, NullWritable> {@Overrideprotected void reduce(Text key, Iterable<NullWritable> values, Context context)throws IOException, InterruptedException {context.write(key, NullWritable.get());}
}
🕤 3.2.3 Runner程序主類實(shí)現(xiàn)
在同一個(gè)包下編寫MapReduce程序運(yùn)行主類DedupRunner
。
package com.mapreduce.dedup;import java.io.IOException;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.NullWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;
import org.apache.hadoop.yarn.service.api.records.Configuration;public class DedupRunner {public static void main(String[] args) throws IOException, ClassNotFoundException, InterruptedException {Configuration conf = new Configuration();Job job = Job.getInstance();job.setJarByClass(DedupRunner.class);job.setMapperClass(DedupMapper.class);job.setReducerClass(DedupReducer.class);job.setOutputKeyClass(Text.class);job.setOutputValueClass(NullWritable.class);FileInputFormat.setInputPaths(job, new Path("hdfs://localhost:9000/user/hadoop/MapReduce/Dedup/input"));// 指定處理完成之后的結(jié)果所保存的位置FileOutputFormat.setOutputPath(job, new Path("hdfs://localhost:9000/user/hadoop/MapReduce/Dedup/output"));job.waitForCompletion(true);}
}
🕒 4. TopN
TopN分析法是指從研究對(duì)象中按照某一個(gè)指標(biāo)進(jìn)行倒序或正序排列,取其中最大的N個(gè)數(shù)據(jù),并對(duì)這N個(gè)數(shù)據(jù)以倒序或正序的方式進(jìn)行輸出分析的方法。
🕘 4.1 案例分析
假設(shè)有數(shù)據(jù)文件num.txt,要求以降序的方式獲取文件內(nèi)容中最大的5個(gè)數(shù)據(jù),并將這5個(gè)數(shù)據(jù)保存到一個(gè)文件中。
10 3 8 7 6 5 1 2 9 4
11 12 17 14 15 20
19 18 13 16
(1)在Map階段,可以使用TreeMap數(shù)據(jù)結(jié)構(gòu)保存TopN的數(shù)據(jù),TreeMap是一個(gè)有序的鍵值對(duì)集合,默認(rèn)會(huì)根據(jù)鍵進(jìn)行排序,也可以自行設(shè)定排序規(guī)則,TreeMap中的firstKey()可以用于返回當(dāng)前集合最小值的鍵。
(2)在Reduce階段,將MapTask輸出的數(shù)據(jù)進(jìn)行匯總,選出其中的最大的5個(gè)數(shù)據(jù)即可滿足需求。
(3)要想提取文本中5個(gè)最大的數(shù)據(jù)并保存到一個(gè)文件中,需要將ReduceTask的數(shù)量設(shè)置為1,這樣才不會(huì)把文件中的數(shù)據(jù)分發(fā)給不同的ReduceTask處理。
🕘 4.2 案例實(shí)現(xiàn)
首先,我們創(chuàng)建好這些源文件,設(shè)置好路徑并上傳至HDFS的input中。
在MapReduceDemo
項(xiàng)目下新建包 com.mapreduce.topn
🕤 4.2.1 Map階段實(shí)現(xiàn)
在 com.mapreduce.topn
包下新建自定義類Mapper類TopNMapper
,該類繼承Mapper類
該類作用:先將文件中的每行數(shù)據(jù)進(jìn)行切割提取,并把數(shù)據(jù)保存到TreeMap中,判斷TreeMap是否大于5,如果大于5就需要移除最小的數(shù)據(jù)。由于數(shù)據(jù)是逐行讀取,如果這時(shí)就向外寫數(shù)據(jù),那么TreeMap就保存了每一行的最大5個(gè)數(shù),因此需要在cleanup()方法中編寫context.write()方法,這樣就保證了當(dāng)前MapTask中TreeMap保存了當(dāng)前文件最大的5條數(shù)據(jù)后,再輸出到Reduce階段。
package com.mapreduce.topn;import java.util.TreeMap;import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.NullWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Mapper;public class TopNMapper extends Mapper<LongWritable, Text, NullWritable, IntWritable> {private TreeMap<Integer, String> repToRecordMap = new TreeMap<Integer, String>();@Overridepublic void map(LongWritable key, Text value, Context context) {String line = value.toString();String[] nums = line.split(" ");for (String num : nums) {repToRecordMap.put(Integer.parseInt(num), " ");if (repToRecordMap.size() > 5) {repToRecordMap.remove(repToRecordMap.firstKey());}}}@Overrideprotected void cleanup(Context context) {for (Integer i : repToRecordMap.keySet()) {try {context.write(NullWritable.get(), new IntWritable(i));} catch (Exception e) {e.printStackTrace();}}}
}
🕤 4.2.2 Reduce階段實(shí)現(xiàn)
在同一包下新建自定義類Reducer類TopNReducer
,該類繼承Reducer類
該類作用:首先TreeMap自定義排序規(guī)則,當(dāng)需求取最大值時(shí),只需要在compare()方法中返回正數(shù)即可滿足倒序排序,reduce()方法依然要滿足時(shí)刻判斷TreeMap中存放數(shù)據(jù)是前5個(gè)數(shù),并最終遍歷輸出最大的5個(gè)數(shù)。
package com.mapreduce.topn;import java.io.IOException;
import java.util.Comparator;
import java.util.TreeMap;
import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.NullWritable;
import org.apache.hadoop.mapreduce.Reducer;public class TopNReducer extends Reducer<NullWritable, IntWritable, NullWritable, IntWritable> {private TreeMap<Integer, String> repToRecordMap = new TreeMap<Integer, String>(new Comparator<Integer>() {// 返回一個(gè)基本類型的整型,誰(shuí)大誰(shuí)排后面.// 返回負(fù)數(shù)表示:01小于02// 返回0表示:表示: 01和02相等// 返回正數(shù)表示: 01大于02。public int compare(Integer a, Integer b) {return b - a;}});public void reduce(NullWritable key, Iterable<IntWritable> values, Context context)throws IOException, InterruptedException {for (IntWritable value : values) {repToRecordMap.put(value.get(), " ");if (repToRecordMap.size() > 5) {repToRecordMap.remove(repToRecordMap.firstKey());}}for (Integer i : repToRecordMap.keySet()) {context.write(NullWritable.get(), new IntWritable(i));}}
}
🕤 4.2.3 Runner程序主類實(shí)現(xiàn)
在同一個(gè)包下編寫MapReduce程序運(yùn)行主類TopNRunner
。
package com.mapreduce.topn;import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.NullWritable;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;
import org.apache.hadoop.yarn.service.api.records.Configuration;public class TopNRunner {public static void main(String[] args) throws Exception {Configuration conf = new Configuration();Job job = Job.getInstance();job.setJarByClass(TopNRunner.class);job.setMapperClass(TopNMapper.class);job.setReducerClass(TopNReducer.class);job.setNumReduceTasks(1);job.setMapOutputKeyClass(NullWritable.class);job.setMapOutputValueClass(IntWritable.class);job.setOutputKeyClass(NullWritable.class);job.setOutputValueClass(IntWritable.class);FileInputFormat.setInputPaths(job, new Path("hdfs://localhost:9000/user/hadoop/MapReduce/TopN/input"));FileOutputFormat.setOutputPath(job, new Path("hdfs://localhost:9000/user/hadoop/MapReduce/TopN/output"));boolean res = job.waitForCompletion(true);System.exit(res ? 0 : 1);}
}
? 轉(zhuǎn)載請(qǐng)注明出處
作者:HinsCoder
博客鏈接:🔎 作者博客主頁(yè)