學校網(wǎng)站建設主體怎么學做電商然后自己創(chuàng)業(yè)
(一)需求分析
計算每個大區(qū)當天金幣收入排名前N的主播
背景:
我們有一款直播APP,已經(jīng)在很多國家上線并運營了一段時間,產(chǎn)品經(jīng)理希望開發(fā)一個功能,計算前N主播排行榜,按天更新排名信息,統(tǒng)計的維度有多種,其中有一個維度是針對主播當天直播的金幣收入進行排名。
一個大區(qū)下面包含多個國家,不同大區(qū)的運營策略是不一樣的,所以就把不同國家劃分到不同大區(qū)里面,方便運營。
那這個TopN主播排行榜在統(tǒng)計的時候就需要分大區(qū)統(tǒng)計了。
針對主播每天的開播數(shù)據(jù)我們已經(jīng)有了,以及直播間內(nèi)用戶的送禮記錄也都是有的。那這樣其實就可以統(tǒng)計主播當天的金幣收入了主播一天可能會開播多次,所以后期在統(tǒng)計主播當天收入的時候是需要把他當天所有直播中的金幣收入都計算在內(nèi)的。
分析 :我們有兩份數(shù)據(jù),數(shù)據(jù)都是json格式的
- video_info.log 主播的開播記錄,其中包含主播的id:uid、直播間id:vid 、大區(qū):area、視頻開播時長:length、增加粉絲數(shù)量:follow等信息
- gift_record.log 用戶送禮記錄,其中包含送禮人id:uid,直播間id:vid,禮物id:good_id,金幣數(shù)
量:gold 等信息.
其實就是按照當天主播所有開播的直播間內(nèi)的收入?yún)R總,按大區(qū)分組,統(tǒng)計每個大區(qū)內(nèi)收入排名前N的主播。
(二)開發(fā)步驟
1:首先獲取兩份數(shù)據(jù)中的核心字段,使用fastjson包解析數(shù)據(jù)
主播開播記錄:主播ID:uid,直播間ID:vid,大區(qū):area
(vid,(uid,area))用戶送禮記錄:直播間ID:vid,金幣數(shù)量:gold(vid,gold) 這樣的可以把這兩份數(shù)據(jù)關聯(lián)到一塊就能獲取到大區(qū)、主播、金幣這些信息了,使用直播間vid進行關聯(lián)。
2:對用戶送禮記錄數(shù)據(jù)進行聚合,對相同vid的數(shù)據(jù)求和
因為用戶可能在一次直播中給主播送多次禮物
(vid,gold_sum)
3:把這兩份數(shù)據(jù)join到一塊,vid作為join的key
(vid,((uid,area),gold_sum))
4:使用map迭代join之后的數(shù)據(jù),最后獲取到uid,area,gold_sum字段,由于一個主播一天可能會開播多
次,后面需要基于uid和area再做一次聚合,所以把數(shù)據(jù)轉(zhuǎn)換成這種格式uid和area是一一對應的,一個人只能屬于一個大區(qū)
((uid,area),gold_sum)
5:使用reduceByKey算子對數(shù)據(jù)進行聚合
((uid,area),gold_sum_all)
6:接下來對需要使用groupByKey對數(shù)據(jù)進行分組,所以先使用map進行轉(zhuǎn)換
map:(area,(uid,gold_sum_all))
groupByKey: area,<(uid,gold_sum_all),(uid,gold_sum_all),(uid,gold_sum_all)>
7:使用map迭代每個分組內(nèi)的數(shù)據(jù),按金幣數(shù)量倒序排序,取前N個,最終輸出area、topN
這個TopN其實就是把前幾名主播的id還有金幣數(shù)量拼接成一個字符串(area,topN)
8:使用foreach將結(jié)果打印到控制臺,多個字段使用制表符分割area topN
(三)環(huán)境依賴
<dependency>
<groupId>org.apache.spark</groupId>
<artifactId>spark-core_2.11</artifactId>
<version>2.4.3</version>
<!--<scope>provided</scope>-->
</dependency>
<dependency>
<groupId>com.alibaba</groupId>
<artifactId>fastjson</artifactId>
<version>1.2.68</version>
</dependency>
(四)代碼開發(fā)
object TopNScala {def main(args: Array[String]): Unit = {val conf = new SparkConf()conf.setAppName("TopNScala").setMaster("local")val sc = new SparkContext(conf)//1:首先獲取兩份數(shù)據(jù)中的核心字段,使用fastjson包解析數(shù)據(jù)val videoInfoRDD = sc.textFile("D:\\video_info.log")val giftRecordRDD = sc.textFile("D:\\gift_record.log")//(vid,(uid,area))val videoInfoFieldRDD = videoInfoRDD.map(line=>{val jsonObj = JSON.parseObject(line)val vid = jsonObj.getString("vid")val uid = jsonObj.getString("uid")val area = jsonObj.getString("area")(vid,(uid,area))})//(vid,gold)val giftRecordFieldRDD = giftRecordRDD.map(line=>{val jsonObj = JSON.parseObject(line)val vid = jsonObj.getString("vid")val gold = Integer.parseInt(jsonObj.getString("gold"))(vid,gold)})//2:對用戶送禮記錄數(shù)據(jù)進行聚合,對相同vid的數(shù)據(jù)求和//(vid,gold_sum)val giftRecordFieldAggRDD = giftRecordFieldRDD.reduceByKey(_ + _)//3:把這兩份數(shù)據(jù)join到一塊,vid作為join的key//(vid,((uid,area),gold_sum))val joinRDD = videoInfoFieldRDD.join(giftRecordFieldAggRDD)//4:使用map迭代join之后的數(shù)據(jù),最后獲取到uid、area、gold_sum字段//joinRDD: (vid,((uid,area),gold_sum))val joinMapRDD =joinRDD.map(tup=>{//joinRDD: (vid,((uid,area),gold_sum))//獲取uidval uid = tup._2._1._1//獲取areaval area = tup._2._1._2//獲取gold_sumval gold_sum = tup._2._2((uid,area),gold_sum)})//5:使用reduceByKey算子對數(shù)據(jù)進行聚合//((uid,area),gold_sum_all)val reduceRDD = joinMapRDD.reduceByKey(_ + _)//6:接下來對需要使用groupByKey對數(shù)據(jù)進行分組,所以先使用map進行轉(zhuǎn)換//map:(area,(uid,gold_sum_all))//groupByKey: area,<(uid,gold_sum_all),(uid,gold_sum_all),(uid,gold_sum_all)val groupRDD = reduceRDD.map(tup=>(tup._1._2,(tup._1._1,tup._2))).groupByKey()//7:使用map迭代每個分組內(nèi)的數(shù)據(jù),按照金幣數(shù)量倒序排序,取前N個,最終輸出area,t//(area,topN)val top3RDD = groupRDD.map(tup=>{val area = tup._1//toList:把iterable轉(zhuǎn)成list//sortBy:排序,默認是正序//reverse:反轉(zhuǎn),實現(xiàn)倒序效果//take(3):取前3個元素//mkString:使用指定字符把集合轉(zhuǎn)成字符串//uid:gold_sum_all,uid:gold_sum_all,uid:gold_sum_allval top3 = tup._2.toList.sortBy(_._2).reverse.take(3).map(tup=>tup._1+":"+tup._2).mkString(",")(area,top3)})//8:使用foreach將結(jié)果打印到控制臺,多個字段使用制表符分割top3RDD.foreach(tup=>println(tup._1+"\t"+tup._2))sc.stop()}
}