邢臺哪兒做wap網(wǎng)站好建站軟件
簡介
針對高耗跑批時間長的作業(yè),在公司近3個月做過一個優(yōu)化專項;優(yōu)化成效:綜合cpu、內(nèi)存、跑批耗時減少均在65%以上;
cpu和內(nèi)存消耗指的是:vcoreseconds和memoryseconds
這里簡單說下優(yōu)化的一些思路,至于優(yōu)化細節(jié)之前寫過兩篇,或可參考::
sparksql優(yōu)化:https://blog.csdn.net/me_to_007/article/details/130916946
hive優(yōu)化:https://blog.csdn.net/me_to_007/article/details/126921955
運行環(huán)境:yarn調(diào)度+hdfs存儲(orc格式表)+hive2+spark2.4.3
大家有什么經(jīng)驗,也歡迎補充、指正
什么樣的任務值得關(guān)注
一個作業(yè)運行時間很長就值得關(guān)注嗎?不一定,如果一個用戶把n段邏輯全都整到一個作業(yè)里,不是太長,跑批時耗在數(shù)倉規(guī)范范圍內(nèi),可能也是可接受合理的;
這里將時間界限監(jiān)控到stage層面,對應hive就是每一個job,在sparksql里就是每一個stage;一個作業(yè)跑批時間很長可能是合理的,也許是邏輯復雜由很多個stage構(gòu)成;但是一個stage跑批時間很長,在分布式計算里,一般不正常;
hive的stage可以只有map邏輯,也可以是一段mr、union、limit語句;sparksql的stage則根據(jù)有無shuffle劃分,遇到shuffle寬依賴則新劃分一個stage;
這個時間界限可以根據(jù)經(jīng)驗及要求靈活設置,比如要盡可能地提升作業(yè)跑批時效和節(jié)省結(jié)算資源,那這個界限可以設置小點,比如一個小時;一般幾十億的數(shù)據(jù)join,沒有傾斜、發(fā)散,一個小時也夠了;
另外一方面是監(jiān)控內(nèi)存配置,hive里邊的map和reduce內(nèi)存配置,sparksql里的executor內(nèi)置配置,這些參數(shù)都不應該設置過大;每個task的內(nèi)存界限可以設置到4個g
hive里邊map和reduce內(nèi)存是單獨設置的,sparksql單個task的內(nèi)存約等于:單個executor內(nèi)存/每個executor cpu核數(shù) * 每個task跑批cpu核數(shù)(這個參數(shù)默認是1,一般不做額外設置)
hive-map內(nèi)存參數(shù):mapreduce.map.memory.mb
hive-reduce內(nèi)存參數(shù):mapreduce.reduce.memory.mb
sparksql-executor內(nèi)存參數(shù):spark.executor.memory
sparksql-executor核數(shù):spark.executor.cores
sparksql-每task-cpu核數(shù):spark.task.cpus
該參數(shù)默認為1,一般無需額外設置;
spark-driver的內(nèi)存,如果有廣播和cahe緩存,需要配置大些,可根據(jù)實際情況設置;
配置自動監(jiān)控
這里可以根據(jù)yarn日志,編寫定時調(diào)度腳本,解析作業(yè)日志,將符合異常規(guī)則的作業(yè)發(fā)郵件出來篩查(也可配置短信接口通知作業(yè)屬主整改);
這里可以手工配置一個白名單,作業(yè)如果已經(jīng)審查無太大優(yōu)化空間,則加入白名單避開篩查;當然這個白名單也可以設置特定的規(guī)則,比如把時間和資源界限的上限配大點,而不是無限放大,避免白名單后界限無限放大;
內(nèi)存監(jiān)控則直接讀取作業(yè)配置參數(shù)即可,如觸發(fā)條件則發(fā)郵件到相關(guān)團隊優(yōu)化(在我們這是數(shù)據(jù)治理組)
耗時長有兩種情況,一種的傾斜導致的,這種就解決傾斜;另外一種每個task處理數(shù)據(jù)量都很大,這類情況一般把task數(shù)量調(diào)大點,再多給點資源;日志解析也比較好處理,傾斜監(jiān)控作業(yè)各個stage中最大task耗時相比一般水平差異,這個界限也可以按需配置;比如我們這里設定為最大task跑批時長-上四分位時長 > 3倍四分差
數(shù)據(jù)傾斜是一個相對的概念,準確地說,每個作業(yè)都有傾斜,但對于一些跑批時間比較短的作業(yè),至于整個數(shù)倉盤面來說,優(yōu)化提升不大;這里可以在監(jiān)控里篩跳過;
當然至于個別業(yè)務團隊,視重要性個別對待,有的作業(yè)對時效要求比較高,比如涉及到重大活動數(shù)據(jù),那就做精細化優(yōu)化;
考慮到y(tǒng)arn日志文件比較大,解析耗時,集群作業(yè)比較多,解析日志應有篩選地進行;那些跑批時間相對短的作業(yè)或可暫時擱置;同時配置白名單,防止日志重復讀取解析;
怎么去定位問題SQL段
主要是結(jié)合執(zhí)行計劃定位,在hive里面執(zhí)行計劃map-tree里顯示的表名,如果sql中有寫別名,則顯示的是別名;為防止篩查混亂,難以定位,sql中的表別名盡量不要使用相同的,沒有關(guān)聯(lián)邏輯情況,盡量不要使用別名;
比如下面一段sql:
select group_name,count(1) as cnt
from tablename -- 這里不寫表別名
group by group_name
在hive中,直接使用explain + sql字符串
打印執(zhí)行計劃即可;看看日志里哪個stage異常,再根據(jù)執(zhí)行計劃去定位異常sql段落,進而進行作業(yè)分析;
explain select dt,count(1) as cnt from tablename group by dt
hive中如果有配置join傾斜參數(shù)(set hive.optimize.skewjoin=true
)或者groupby傾斜參數(shù)(set hive.groupby.skewindata=true
),如果作業(yè)stage傾斜會額外生成一個stage,這里在執(zhí)行計劃里是看不到的,這個stage緊跟運行在原來的stage后面;
這里需要注意的是,調(diào)度環(huán)境和我們查看執(zhí)行計劃的環(huán)境,引擎配置可能會不一樣,我們通過執(zhí)行計劃查看的stage號可能跟實際作業(yè)跑批不一致;這里盡量在同一環(huán)境下去查看執(zhí)行計劃;
一方面,我們也可以通過yarn里map日志的syslog日志去查看map讀取的表名,然后根據(jù)使用的表名大概定位到sql段(如果當前stage的數(shù)據(jù)源都是來自前依賴stage reduce的output則日志里看到的是臨時文件名而不是表名)
sparksql,同樣的,查看sparkui中stage對應的執(zhí)行計劃(stage二級頁面),spark2.4.3中stage的執(zhí)行計劃不是很詳細,可以結(jié)合數(shù)據(jù)量,算子然后在sparkui全局執(zhí)行計劃里去找,定位sql語句段;
如果是作業(yè)跑批失敗,在執(zhí)行計劃中體現(xiàn)為上游依賴stage全跑完了,而這個stage沒有打印執(zhí)行數(shù)據(jù)(sparksql執(zhí)行計劃每個stage跑完都會顯示跑批時間及輸出數(shù)據(jù))
大概的一些優(yōu)化方向
-
優(yōu)化sql
每一個作業(yè)就好比一個成型的積木,而成型的積木可由不同積木塊不同方式拼湊而成,條條大路通羅馬;使用怎樣的積木塊(表)以及怎樣的拼湊方式是我們一個優(yōu)化介入點;比如join順序,使用不同的表,先聚合在join等(視具體場景,具體處理) -
增加stage的task個數(shù)
這類體現(xiàn)為某個stage跑批時間很長但是又沒有傾斜,可以適當增加stage的reduce個數(shù);這類現(xiàn)象一般出現(xiàn)在多維聚合、countdistinct數(shù)據(jù)膨脹,多個表關(guān)聯(lián)包括開窗用的是一個mr(表關(guān)聯(lián)join字段和開窗partitionby字段是同一個字段,計算會優(yōu)化收攏到一個mr里,具體看執(zhí)行計劃),join數(shù)據(jù)發(fā)散多對多;
題外:數(shù)據(jù)量大,一定要reduce個數(shù)設置大些嗎?
不一定,如果存在map預聚合,且預聚合能顯著減少數(shù)據(jù)量的,有時候恰恰要手動把reduce配置少點;在hive里可以指定reduce個數(shù)mapred.reduce.tasks
,亦可結(jié)合每reduce處理數(shù)據(jù)量hive.exec.reducers.bytes.per.reducer
+hive.exec.reducers.max
動態(tài)配置reduce個數(shù)(但這個數(shù)據(jù)量是map邏輯前的數(shù)據(jù)量,如果有filter篩選,需適當增加.bytes.per.reducer參數(shù),減少reduce個數(shù));sparksql則直接設置spark.sql.shuffle.partitions
個數(shù)和動態(tài)分區(qū)參數(shù)spark.sql.adaptive.enabled
即可;
有的跑批時間長可能是自定義函數(shù)優(yōu)化做的不到位,這種情況優(yōu)先使用內(nèi)置函數(shù)或者找相關(guān)開發(fā)優(yōu)化自定義函數(shù);字符串處理能不正則的盡量不要用正則 -
解決傾斜
如果遇到傾斜,則按常規(guī)傾斜處理即可;在具體實踐中,我們發(fā)現(xiàn)hive傾斜參數(shù)機制,優(yōu)化效果不一定比手動處理要好;比如groupby傾斜參數(shù)set hive.groupby.skewindata=true
大多時候并不比手工處理要好,這里可以手動打散試試(就是麻煩點,如果邏輯復雜,改起來會很耗時);hive的join的傾斜參數(shù)不適用于外連接,這里需要注意下;此外mapjoin不適用于外連接主表是小表的情況(比如小表 left join 大表) -
開啟并行
這里主要是針對于hive計算,如果存在stage沒有依賴(查看執(zhí)行計劃),可以開啟并行執(zhí)行參數(shù)set hive.exec.parallel=true
,無依賴的stage可以并行執(zhí)行,比如union all上下邏輯子句,join中嵌套的子句等;
sparksql可以通過堆資源來減少跑批時間,但一般不建議設置的總cpu核數(shù)大于并發(fā)task數(shù)量的1/2;官方推薦是task的數(shù)據(jù)大于cpu總核數(shù)的三倍。比如有300個task,那cpu總核數(shù)盡量設置在150以下。
在數(shù)據(jù)體量很大時,我們發(fā)現(xiàn)sparksql的map stage耗時比較長,這里可以根據(jù)實際情況切換合適的計算引擎;
此外,基于整個作業(yè)鏈路去優(yōu)化目標作業(yè)跑批時效,要基于關(guān)鍵鏈路去優(yōu)化,非關(guān)鍵鏈路作業(yè)優(yōu)化對結(jié)果作業(yè)產(chǎn)出時間沒有影響;
countdistinct,每一個distinct數(shù)據(jù)都會膨脹一倍;如果countdistinct數(shù)量過多,但去重的都是同一個字段,只是條件不同時,改寫下sql,shuffle數(shù)據(jù)量可以顯著減少;
比如(假設flag是一個標簽字段,字段值只有0和1):
selecet group_name,count(distinct case when flag1=1 then id end) as dis_cnt1,count(distinct case when flag2=1 then id end) as dis_cnt2-- ,...,count(distinct case when flagn=1 then id end) as dis_cntn
from tablename
group by group_name
這里的數(shù)據(jù)有n個countdistinct,數(shù)據(jù)膨脹n倍;但讓若我們這么改寫:
selecet group_name,count(case when flag1=1 then id end) as dis_cnt1,count(case when flag2=1 then id end) as dis_cnt2-- ,...,count(case when flagn=1 then id end) as dis_cntn
(select group_name,id,max(flag1) as flag1,max(flag2) as flag2--,..,max(flagn) as flagnfrom tablenamegroup by group_name,id
) t
group by group_name
如此,數(shù)據(jù)沒有膨脹,只是多了一個mr成本而已;需注意的是,hive在開啟優(yōu)化后,該改寫,可能會合并到一個mr里,具體看執(zhí)行計劃&需求改寫;