做網(wǎng)站那個(gè)平臺(tái)個(gè)人怎么做免費(fèi)百度推廣
目錄
RDD持久化
RDD 的數(shù)據(jù)是過程數(shù)據(jù)
?RDD 緩存
RDD CheckPoint
共享變量
廣播變量
累加器
Spark 內(nèi)核調(diào)度
DAG
DAG 的寬窄依賴和階段劃分?
內(nèi)存迭代計(jì)算?
Spark是怎么做內(nèi)存計(jì)算的? DAG的作用?Stage階段劃分的作用?
Spark為什么比MapReduce快?
Spark并行度?
Spark Shuffle
Hash Shuffle
Sort Shuffle
Spark執(zhí)行流程
RDD持久化
RDD 的數(shù)據(jù)是過程數(shù)據(jù)
RDD之間進(jìn)行相互迭代計(jì)算(Transformation的轉(zhuǎn)換),當(dāng)執(zhí)行開啟后,新RDD的生成,代表老RDD的消失
RDD的數(shù)據(jù)是過程數(shù)據(jù),只在處理的過程中存在,一旦處理完成,就不見了
這個(gè)特性可以最大化的利用資源,老舊RDD沒用了 就從內(nèi)存中清理,給后續(xù)的計(jì)算騰出內(nèi)存空間.
例如下面這個(gè)例子,生成rdd4的時(shí)候,?rdd3已經(jīng)被銷毀了,然后下面rdd5需要調(diào)用rdd3的時(shí)候,只能從rdd->rdd2->rdd3再重新生成一次rdd3
rdd = sc.parallelize([('a',1),('a',3),('a',6),('b',1),('b',2),('c',1)],3)rdd2 = rdd.map(lambda x:(x[0],x[1]+2))rdd3 = rdd2.distinct()rdd4 = rdd3.filter(lambda x:x[1]>5)print(rdd4.collect())# [('a', 8)]rdd5 = rdd3.glom()print(rdd5.collect())# [[('a', 5), ('a', 8)], [('a', 3), ('b', 4)], [('b', 3), ('c', 3)]]
?RDD 緩存
RDD的緩存技術(shù):Spark提供了緩存AP1,可以讓我們通過調(diào)用AP1,將指定的RDD數(shù)據(jù)保留在內(nèi)存或者硬盤上緩存的API
最開始要引入:from pyspark.storagelevel import StorageLevel
緩存技術(shù)可以將過程RDD數(shù)據(jù),持久化保存到內(nèi)存或者硬盤上
但是,這個(gè)保存在設(shè)定上是認(rèn)為不安全的,緩存的數(shù)據(jù)在設(shè)計(jì)上是 認(rèn)為 有丟失風(fēng)險(xiǎn)的
所以,緩存有一個(gè)特點(diǎn)就是: 其保留RDD之間的血緣(依賴)關(guān)系
一旦緩存丟失可以基于血緣關(guān)系的記錄重新計(jì)算這個(gè)RDD的數(shù)據(jù)
緩存如何丟失:
在內(nèi)存中的緩存是不安全的,比如斷電計(jì)算任務(wù)內(nèi)存不足把緩存清理給計(jì)算讓路硬盤中因?yàn)橛脖P損壞也是可能丟失的?
RDD緩存采用的是分散存儲(chǔ),也就是每一個(gè)executor都會(huì)將其處理的部分RDD存放在自己的內(nèi)存或硬盤中?
RDD CheckPoint
CheckPoint技術(shù)也是將RDD的數(shù)據(jù)保存起來,但是它僅支持硬盤存儲(chǔ)
并且:它被設(shè)計(jì)認(rèn)為是安全的,不保留 血緣關(guān)系
checkPoint存儲(chǔ)RDD數(shù)據(jù),是集中收集各個(gè)分區(qū)數(shù)據(jù)進(jìn)行存儲(chǔ)而緩存是分散存儲(chǔ),也就是說先將executor中的數(shù)據(jù)收集起來(比如收集到hdfs),然后再進(jìn)行存儲(chǔ)
緩存和CheckPoint的對(duì)比
- CheckPoint不管分區(qū)數(shù)量多少,風(fēng)險(xiǎn)是一樣的,緩存分區(qū)越多,風(fēng)險(xiǎn)越高
- CheckPoint支持寫入HDFS,緩存不行HDFS是高可靠存儲(chǔ),checkPoint被認(rèn)為是安全的.
- checkPoint不支持內(nèi)存緩存可以緩存如果寫內(nèi)存性能比checkPoint要好一些
- CheckPoint因?yàn)樵O(shè)計(jì)認(rèn)為是安全的,所以不保留血緣關(guān)系,而緩存因?yàn)樵O(shè)計(jì)上認(rèn)為不安全,所以保留
sc.setCheckpointDir("hdfs://node1:8020/checkpoint")#設(shè)置存儲(chǔ)位置rdd = sc.parallelize([('a',1),('a',3),('a',6),('b',1),('b',2),('c',1)],3)rdd2 = rdd.map(lambda x:(x[0],x[1]+2))rdd3 = rdd2.distinct()rdd3.checkpoint()
共享變量
廣播變量
假如有下面一個(gè)場(chǎng)景,需要根據(jù)stu_ifo將score_ifo中的數(shù)字替換為名字,注意這里是本地?cái)?shù)據(jù)stu_ifo與RDD數(shù)據(jù)聯(lián)合處理
stu_ifo = [(1,'lmx'),(2,'lby'),(3,'lxl')]score_ifo = sc.parallelize([(1,'math',100),(2,'english',87),(1,'english',80),(3,'chinese',98),(2, 'chinese', 68),(1, 'chinese', 88)])def func(data):for i in stu_ifo:if data[0] == i[0]:return (i[1],data[1],data[2])get = score_ifo.map(func)print(get.collect())
一般情況下, 如果一個(gè)executor里面有多個(gè)分區(qū)的情況,那么每個(gè)分區(qū)都要向driver申請(qǐng)一份本地?cái)?shù)據(jù),然而由于executor內(nèi)部的數(shù)據(jù)是共享的,這樣就會(huì)多申請(qǐng)了一份stu_ifo
只需要將stu_ifo標(biāo)記為廣播變量,就可以解決這個(gè)問題。只會(huì)向一個(gè)executor發(fā)送一次數(shù)據(jù)
只需要在之前聲明:
stu_if_breadcast = sc.broadcast(stu_ifo)
然后使用的時(shí)候:
stu_if_breadcast.value
使用這個(gè)方法,可以節(jié)省IO次數(shù)以及executor內(nèi)存
?上述代碼變?yōu)?#xff1a;
stu_ifo = [(1,'lmx'),(2,'lby'),(3,'lxl')]stu_if_breadcast = sc.broadcast(stu_ifo)score_ifo = sc.parallelize([(1,'math',100),(2,'english',87),(1,'english',80),(3,'chinese',98),(2, 'chinese', 68),(1, 'chinese', 88)])def func(data):for i in stu_if_breadcast.value:if data[0] == i[0]:return (i[1],data[1],data[2])get = score_ifo.map(func)print(get.collect())
累加器
針對(duì)下面這種場(chǎng)景,我希望每map一次,我的num加1:
rdd = sc.parallelize([1,2,3,4,5,6,7,8,9,10],2)def countt(data):global numnum+=1print(num)return dataprint(rdd.map(countt).collect())print(num)# 1# 2# 3# 4# 5# 1# 2# 3# 4# 5# [1, 2, 3, 4, 5, 6, 7, 8, 9, 10]# 0
觀察結(jié)果,由于分區(qū)的情況,在每個(gè)executor內(nèi)num都加到了5,但是最后的num卻還是0
因?yàn)榧?操作只會(huì)發(fā)生在?executor 中,而最后打印的是driver中的num,所以還是0
這里可以使用spark的累加器變量:
num = sc.accumulator(0) #累加器rdd = sc.parallelize([1,2,3,4,5,6,7,8,9,10],2)def countt(data):global numnum+=1return datardd.map(countt).collect()print(num)# 10
Spark 內(nèi)核調(diào)度
DAG
DAG:有向無環(huán)圖
在spark中,每一個(gè) action算子都會(huì)將前面的一串rdd依賴鏈條執(zhí)行起來,這些執(zhí)行鏈條其實(shí)就是DAG
有多少個(gè)action算子,就有多少個(gè)執(zhí)行鏈條(JOB),就有多少個(gè)DAG
????????如果一個(gè)代碼中,寫了n個(gè)Action,那么這個(gè)代碼運(yùn)行起來產(chǎn)生n個(gè)JOB,每個(gè)JOB有自己的DAG個(gè)代碼運(yùn)行起來,在Spark中稱之為: Application
spark中數(shù)據(jù)都是分區(qū)的,所以實(shí)際上每一個(gè)job都是帶有分區(qū)關(guān)系的DAG
DAG 的寬窄依賴和階段劃分?
RDD的前后關(guān)系分為 寬依賴和窄依賴
窄依賴:父RDD的一個(gè)分區(qū),全部 將數(shù)據(jù)發(fā)給子RDD的一個(gè)分區(qū)
寬依賴:父RDD的一個(gè)分區(qū)將數(shù)據(jù)發(fā)給子RDD的多個(gè)分區(qū)
寬依賴還有一個(gè)別名: shuffle
對(duì)于Spark來說會(huì)根據(jù)DAG,按照寬依賴劃分不同的DAG階段
劃分依據(jù):從后向前,遇到寬依賴 就劃分出一個(gè)階段稱之為stage?
?可以看到,在階段內(nèi)都是 窄依賴,這有助于構(gòu)建內(nèi)存迭代管道
內(nèi)存迭代計(jì)算?
????????在執(zhí)行上圖的程序時(shí),最優(yōu)的方式肯定是task1,2,3,4,5,6都在獨(dú)立且單獨(dú)的線程中完成(還存在另外一種情況,比如b1->p1,p1->下一個(gè)p1是不同的線程,那可能會(huì)在線程中存在網(wǎng)絡(luò)IO調(diào)用,影響性能)
????????task1 中rdd1rdd2 rdd3 的選代計(jì)算,都是由一個(gè)task(線程完成),這一階段的這一條線,是純內(nèi)存計(jì)算.如上圖,task1 task2 task3,就形成了三個(gè)并行的內(nèi)存計(jì)算管道?
Spark默認(rèn)受到全局并行度的限制,除了個(gè)別算子有特殊分區(qū)情況,大部分的算子,都會(huì)遵循全局并行度的要求,來規(guī)劃自己的分區(qū)數(shù)如果全局并行度是3,其實(shí)大部分算子分區(qū)都是3
注意:Spark一般推薦只設(shè)置全局并行度,不要再算子上設(shè)置并行度除了一些排序算子外,計(jì)算算子就讓他默認(rèn)開分區(qū)就可以了.
Spark是怎么做內(nèi)存計(jì)算的? DAG的作用?Stage階段劃分的作用?
- Spark會(huì)產(chǎn)生DAG圖
- DAG圖會(huì)基于分區(qū)和寬窄依賴關(guān)系劃分階段
- 一個(gè)階段的內(nèi)部都是 窄依賴,窄依賴內(nèi),如果形成前后1:1的分區(qū)對(duì)應(yīng)關(guān)系就可以產(chǎn)生許多內(nèi)存選代計(jì)算的管道
- 這些內(nèi)存迭代計(jì)算的管道,就是一個(gè)個(gè)具體的執(zhí)行Task
- 一個(gè)Task是一個(gè)具體的線程,任務(wù)跑在一個(gè)線程內(nèi),就是走內(nèi)存計(jì)算了?
Spark為什么比MapReduce快?
- Spark的算子豐富,MapReduce算子匱乏(Map和Reduce),MapReduce這個(gè)編程模型,很難在一套MR中處理復(fù)雜的任務(wù).很多的復(fù)雜任務(wù),是需要寫多個(gè)MapReduce進(jìn)行串聯(lián)多個(gè)MR串聯(lián)通過磁盤交互數(shù)據(jù)
- Spark可以執(zhí)行內(nèi)存迭代,算子之間形成DAG 基于依賴劃分階段后,在階段內(nèi)形成內(nèi)存選代管道.但是MapReduce的Map和Reduce之間的交互依舊是通過硬盤來交互的
Spark并行度?
Spark的并行: 在同一時(shí)間內(nèi), 有多少個(gè)task在同時(shí)運(yùn)行
并行度:并行能力的設(shè)置
比如設(shè)置并行度6,其實(shí)就是要6個(gè)task并行在跑在有了6個(gè)task并行的前提下,rdd的分區(qū)就被規(guī)劃成6個(gè)分區(qū)了
優(yōu)先級(jí)從高到低:
代碼中
客戶端提交參數(shù)中
配置文件中
默認(rèn)(1,但是不會(huì)全部以1來跑,多數(shù)時(shí)候基于讀取文件的分片數(shù)量來作為默認(rèn)并行度)
?集群如何規(guī)劃并行度?
結(jié)論:設(shè)置為CPU總核心的2~10倍
????????為什么要設(shè)置最少2倍?
????????CPU的一個(gè)核心同一時(shí)間只能干一件事情所以,在100個(gè)核心的情況下,設(shè)置100個(gè)并行,就能讓CPU 100%出力????????這種設(shè)置下如果task的壓力不均衡,某個(gè)task先執(zhí)行完了就導(dǎo)致某個(gè)CPU核心空閑
????????所以,我們將Task(并行)分配的數(shù)量變多,比如800個(gè)并行,同一時(shí)間只有100個(gè)在運(yùn)行,700個(gè)在等待.但是可以確保某個(gè)task運(yùn)行完了.后續(xù)有task補(bǔ)上,不讓cpu閑下來,最大程度利用集群的資源?
Spark Shuffle
Spark在DAG調(diào)度階段會(huì)將一個(gè)Job劃分為多個(gè)Stage,上游Stage做map工作,下游Stage做reduce工作,其本質(zhì)上 還是MapReduce計(jì)算框架。Shuffle是連接map和reduce之間的橋梁,它將map的輸出對(duì)應(yīng)到reduce輸入中,涉及 到序列化反序列化、跨節(jié)點(diǎn)網(wǎng)絡(luò)IO以及磁盤讀寫IO等。
?Spark的Shuffle分為Write和Read兩個(gè)階段,分屬于兩個(gè)不同的Stage,前者是Parent Stage的最后一步,后者是 Child Stage的第一步。
在Spark的中,負(fù)責(zé)shuffle過程的執(zhí)行、計(jì)算和處理的組件主要就是ShuffleManager,也即shuffle管理器。ShuffleManager隨著 Spark的發(fā)展有兩種實(shí)現(xiàn)的方式,分別為HashShuffleManager和SortShuffleManager,因此spark的Shuffle有Hash Shuffle和Sort Shuffle兩種。?
Hash Shuffle
未經(jīng)優(yōu)化的hashShuffleManager:?
????????HashShuffle是根據(jù)task的計(jì)算結(jié)果的key值的hashcode%ReduceTask來決定放入哪一個(gè)區(qū)分,這樣保證相同的數(shù)據(jù) 一定放入一個(gè)分區(qū),根據(jù)下游的task決定生成幾個(gè)文件,先生成緩沖區(qū)文件在寫入磁盤文件,再將block文件進(jìn)行合并。
????????一個(gè)task內(nèi)部會(huì)根據(jù)hash分類,然后將不同類的數(shù)據(jù)放入不同的磁盤文件,未經(jīng)優(yōu)化的shuffle write操作所產(chǎn)生的磁盤文件的數(shù)量是極其驚人的,也就是下圖所產(chǎn)生的的block file
?優(yōu)化的hashShuffleManager:
????????在shuffle write過程中,task就不是為下游stage的每個(gè)task創(chuàng)建一個(gè)磁盤文件了。此時(shí)會(huì)出現(xiàn)shuffleFileGroup的概 念,每個(gè)shuffleFileGroup會(huì)對(duì)應(yīng)一批磁盤文件,每一個(gè)Group磁盤文件的數(shù)量與下游stage的task數(shù)量是相同的。
????????其實(shí)說到底就是其在executor內(nèi)部就會(huì)進(jìn)行數(shù)據(jù)的匯合操作,大大減少了磁盤文件的生成
Sort Shuffle
(1)該模式下,數(shù)據(jù)會(huì)先寫入一個(gè)內(nèi)存數(shù)據(jù)結(jié)構(gòu)中(默認(rèn)5M),此時(shí)根據(jù)不同的shuffle算子,可能選用不同的數(shù)據(jù)結(jié)構(gòu)。如果是reduceByKey這種聚合類的shuffle算子,那么會(huì)選用Map數(shù)據(jù)結(jié)構(gòu),一邊通過Map進(jìn)行聚合,一邊寫入內(nèi) 存;如果是join這種普通的shuffle算子,那么會(huì)選用Array數(shù)據(jù)結(jié)構(gòu),直接寫入內(nèi)存。
(2)接著,每寫一條數(shù)據(jù)進(jìn)入內(nèi)存數(shù)據(jù)結(jié)構(gòu)之后,就會(huì)判斷一下,是否達(dá)到了某個(gè)臨界閾值。如果達(dá)到臨界閾值的話 ,那么就會(huì)嘗試將內(nèi)存數(shù)據(jù)結(jié)構(gòu)中的數(shù)據(jù)溢寫到磁盤,然后清空內(nèi)存數(shù)據(jù)結(jié)構(gòu)。
(3)排序 在溢寫到磁盤文件之前,會(huì)先根據(jù)key對(duì)內(nèi)存數(shù)據(jù)結(jié)構(gòu)中已有的數(shù)據(jù)進(jìn)行排序。 (4)溢寫 排序過后,會(huì)分批將數(shù)據(jù)寫入磁盤文件。默認(rèn)的batch數(shù)量是10000條,也就是說,排序好的數(shù)據(jù),會(huì)以每批1萬條數(shù) 據(jù)的形式分批寫入磁盤文件。
(5)merge 一個(gè)task將所有數(shù)據(jù)寫入內(nèi)存數(shù)據(jù)結(jié)構(gòu)的過程中,會(huì)發(fā)生多次磁盤溢寫操作,也就會(huì)產(chǎn)生多個(gè)臨時(shí)文件。最后會(huì)將之 前所有的臨時(shí)磁盤文件都進(jìn)行合并成1個(gè)磁盤文件,這就是merge過程。 由于一個(gè)task就只對(duì)應(yīng)一個(gè)磁盤文件,也就意味著該task為Reduce端的stage的task準(zhǔn)備的數(shù)據(jù)都在這一個(gè)文件中, 因此還會(huì)單獨(dú)寫一份索引文件,其中標(biāo)識(shí)了下游各個(gè)task的數(shù)據(jù)在文件中的start offset與end offset。
說到底最大的區(qū)別就是,一個(gè)task只會(huì)生成一個(gè)磁盤文件和一個(gè)索引文件,大大降低了磁盤占有和網(wǎng)絡(luò)IO數(shù)量
Sort Shuffle bypass機(jī)制
bypass運(yùn)行機(jī)制的觸發(fā)條件如下:
1)shuffle map task數(shù)量小于spark.shuffle.sort.bypassMergeThreshold=200參數(shù)的值。
2)不是map combine聚合的shuffle算子(比如reduceByKey有map combie)(這種算子不需要排序)
????????此時(shí)task會(huì)為每個(gè)reduce端的task都創(chuàng)建一個(gè)臨時(shí)磁盤文件,并將數(shù)據(jù)按key進(jìn)行hash,然后根據(jù)key的hash值, 將key寫入對(duì)應(yīng)的磁盤文件之中。當(dāng)然,寫入磁盤文件時(shí)也是先寫入內(nèi)存緩沖,緩沖寫滿之后再溢寫到磁盤文件的 。最后,同樣會(huì)將所有臨時(shí)磁盤文件都合并成一個(gè)磁盤文件,并創(chuàng)建一個(gè)單獨(dú)的索引文件。
????????該過程的磁盤寫機(jī)制其實(shí)跟未經(jīng)優(yōu)化的HashShuffleManager是一模一樣的,因?yàn)槎家獎(jiǎng)?chuàng)建數(shù)量驚人的磁盤文件, 只是在最后會(huì)做一個(gè)磁盤文件的合并而已。因此少量的最終磁盤文件,也讓該機(jī)制相對(duì)未經(jīng)優(yōu)化的 HashShuffleManager來說,shuffle read的性能會(huì)更好。
而該機(jī)制與普通SortShuffleManager運(yùn)行機(jī)制的不同在于:
第一,磁盤寫機(jī)制不同;
第二,不會(huì)進(jìn)行排序。也就是說,啟用該機(jī)制的最大好處在于,shuffle write過程中,不需要進(jìn)行數(shù)據(jù)的排序操作, 也就節(jié)省掉了這部分的性能開銷。
Spark執(zhí)行流程
在大方向上:
- 提交代碼
- 生成Driver ,DAG Scheduler規(guī)劃邏輯任務(wù)
- 生成Executor(被Driver生成)
- Driver內(nèi)TaskScheduler去監(jiān)控整個(gè)Spark程序的執(zhí)行
在細(xì)節(jié)上,以YARN為例:
啟動(dòng)ApplicationMaster
AM啟動(dòng)Driver
- Driver構(gòu)建DAG調(diào)度器規(guī)劃任務(wù)
- Driver和AM通訊.AM得知要多少容器去申請(qǐng)
- Driver在申請(qǐng)的容器內(nèi)部啟動(dòng)Executor
- Driver內(nèi)的Task調(diào)度器,調(diào)度任務(wù)執(zhí)行?