Shuffle核心概念、Shuffle調優及故障排除
Spark調優之Shuffle調優
本節開始先講解Shuffle核心概念;然后針對HashShuffle、SortShuffle進行調優;接下來對map端、reduce端調優;再針對Spark中的數據傾斜問題進行剖析及調優;最后是Spark運行過程中的故障排除。
一、Shuffle的核心概念
1. ShuffleMapStage與ResultStage

ShuffleMapStage與ResultStage
在劃分stage時,最后一個stage稱為FinalStage,它本質上是一個ResultStage對象,前面的所有stage被稱為ShuffleMapStage。
ShuffleMapStage的結束伴隨著shuffle文件的寫磁盤。
ResultStage基本上對應代碼中的action算子,即將一個函數應用在RDD的各個partition的數據集上,意味著一個job的運行結束。
2. Shuffle中的任務個數
我們知道,Spark Shuffle分為map階段和reduce階段,或者稱之為ShuffleRead階段和ShuffleWrite階段,那么對于一次Shuffle,map過程和reduce過程都會由若干個task來執行,那么map task和reduce task的數量是如何確定的呢?
假設Spark任務從HDFS中讀取數據,那么初始RDD分區個數由該文件的split個數決定,也就是一個split對應生成的RDD的一個partition,我們假設初始partition個數為N。
初始RDD經過一系列算子計算后(假設沒有執行repartition和coalesce算子進行重分區,則分區個數不變,仍為N,如果經過重分區算子,那么分區個數變為M),我們假設分區個數不變,當執行到Shuffle操作時,map端的task個數和partition個數一致,即map task為N個。
reduce端的stage默認取spark.default.parallelism這個配置項的值作為分區數,如果沒有配置,則以map端的最后一個RDD的分區數作為其分區數(也就是N),那么分區數就決定了reduce端的task的個數。
3. reduce端數據的讀取
根據stage的劃分我們知道,map端task和reduce端task不在相同的stage中,map task位于ShuffleMapStage,reduce task位于ResultStage,map task會先執行,那么后執行的reduce task如何知道從哪里去拉取map task落盤后的數據呢?
reduce端的數據拉取過程如下:
map task 執行完畢后會將計算狀態以及磁盤小文件位置等信息封裝到MapStatus對象中,然后由本進程中的MapOutPutTrackerWorker對象將mapStatus對象發送給Driver進程的MapOutPutTrackerMaster對象;在reduce task開始執行之前會先讓本進程中的MapOutputTrackerWorker向Driver進程中的MapoutPutTrakcerMaster發動請求,請求磁盤小文件位置信息;當所有的Map task執行完畢后,Driver進程中的MapOutPutTrackerMaster就掌握了所有的磁盤小文件的位置信息。此時MapOutPutTrackerMaster會告訴MapOutPutTrackerWorker磁盤小文件的位置信息;完成之前的操作之后,由BlockTransforService去Executor0所在的節點拉數據,默認會啟動五個子線程。每次拉取的數據量不能超過48M(reduce task每次最多拉取48M數據,將拉來的數據存儲到Executor內存的20%內存中)。
二、HashShuffle解析
以下的討論都假設每個Executor有1個cpu core。
1. 未經優化的HashShuffleManager
shuffle write階段,主要就是在一個stage結束計算之后,為了下一個stage可以執行shuffle類的算子(比如reduceByKey),而將每個task處理的數據按key進行“劃分”。所謂“劃分”,就是對相同的key執行hash算法,從而將相同key都寫入同一個磁盤文件中,而每一個磁盤文件都只屬于下游stage的一個task。在將數據寫入磁盤之前,會先將數據寫入內存緩沖中,當內存緩沖填滿之后,才會溢寫到磁盤文件中去。
下一個stage的task有多少個,當前stage的每個task就要創建多少份磁盤文件。比如下一個stage總共有100個task,那么當前stage的每個task都要創建100份磁盤文件。如果當前stage有50個task,總共有10個Executor,每個Executor執行5個task,那么每個Executor上總共就要創建500個磁盤文件,所有Executor上會創建5000個磁盤文件。由此可見,未經優化的shuffle write操作所產生的磁盤文件的數量是極其驚人的。
shuffle read階段,通常就是一個stage剛開始時要做的事情。此時該stage的每一個task就需要將上一個stage的計算結果中的所有相同key,從各個節點上通過網絡都拉取到自己所在的節點上,然后進行key的聚合或連接等操作。由于shuffle write的過程中,map task給下游stage的每個reduce task都創建了一個磁盤文件,因此shuffle read的過程中,每個reduce task只要從上游stage的所有map task所在節點上,拉取屬于自己的那一個磁盤文件即可。
shuffle read的拉取過程是一邊拉取一邊進行聚合的。每個shuffle read task都會有一個自己的buffer緩沖,每次都只能拉取與buffer緩沖相同大小的數據,然后通過內存中的一個Map進行聚合等操作。聚合完一批數據后,再拉取下一批數據,并放到buffer緩沖中進行聚合操作。以此類推,直到最后將所有數據到拉取完,并得到最終的結果。
未優化的HashShuffleManager工作原理如下圖所示:

未優化的HashShuffleManager工作原理2. 優化后的HashShuffleManager
為了優化HashShuffleManager我們可以設置一個參數:spark.shuffle.consolidateFiles,該參數默認值為false,將其設置為true即可開啟優化機制,通常來說,如果我們使用HashShuffleManager,那么都建議開啟這個選項。
開啟consolidate機制之后,在shuffle write過程中,task就不是為下游stage的每個task創建一個磁盤文件了,此時會出現shuffleFileGroup的概念,每個shuffleFileGroup會對應一批磁盤文件,磁盤文件的數量與下游stage的task數量是相同的。一個Executor上有多少個cpu core,就可以并行執行多少個task。而第一批并行執行的每個task都會創建一個shuffleFileGroup,并將數據寫入對應的磁盤文件內。
當Executor的cpu core執行完一批task,接著執行下一批task時,下一批task就會復用之前已有的shuffleFileGroup,包括其中的磁盤文件,也就是說,此時task會將數據寫入已有的磁盤文件中,而不會寫入新的磁盤文件中。因此,consolidate機制允許不同的task復用同一批磁盤文件,這樣就可以有效將多個task的磁盤文件進行一定程度上的合并,從而大幅度減少磁盤文件的數量,進而提升shuffle write的性能。
假設第二個stage有100個task,第一個stage有50個task,總共還是有10個Executor(Executor CPU個數為1),每個Executor執行5個task。那么原本使用未經優化的HashShuffleManager時,每個Executor會產生500個磁盤文件,所有Executor會產生5000個磁盤文件的。但是此時經過優化之后,每個Executor創建的磁盤文件的數量的計算公式為:cpu core的數量 * 下一個stage的task數量,也就是說,每個Executor此時只會創建100個磁盤文件,所有Executor只會創建1000個磁盤文件。
優化后的HashShuffleManager工作原理如下圖所示:

優化后的HashShuffleManager工作原理
請輸入評論內容...
請輸入評論/評論長度6~500個字
最新活動更多
- 1 AI狂歡遇上油價破百,全球股市還能漲多久? | 產聯看全球
- 2 OpenAI深夜王炸!ChatGPT Images 2.0實測:中文穩、細節炸,設計師慌了
- 3 6000億美元估值錨定:字節跳動的“去單一化”突圍與估值重構
- 4 Tesla AI5芯片最新進展總結
- 5 連夜測了一波DeepSeek-V4,我發現它可能只剩“審美”這個短板了
- 6 熱點丨AI“瑜亮之爭”:既生OpenClaw,何生Hermes?
- 7 AI界的殺豬盤:9秒刪庫跑路,全員被封號,還繼續扣錢!
- 8 2026,人形機器人只贏了面子
- 9 DeepSeek降價90%:價格屠夫不是身份,是戰略
- 10 AI Infra產業鏈卡在哪里了?


分享













