使用 panda、spark、salting、autobroadcast 處理 50GB+ 大數據的執行結果比較。

驗證 10G+ 資料集 JOIN 小表的執行結果

執行環境

  • 在 VM 上建置 K8S cluster 環境,有 4 核 16G 兩台,2 核 8G 一台

  • 於 K8S cluster 部署 Kubeflow Spark Operator

  • 於 K8S cluster 部署 minio

生成有 data-skew 的待測資料

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
...

TOTAL_ROWS = 250_000_000 # 10GB 約需要 2 億到 3 億筆資料 (視壓縮率而定)
SKEW_RATIO = 0.8 # 80% 的資料會集中在少數幾個 material 上

df_skewed = df.withColumn(
"material",
F.when(F.col("id") < (TOTAL_ROWS * SKEW_RATIO), "M_0001").otherwise(
F.concat(F.lit("M_"), F.format_string("%04d", (F.rand() * 10000).cast("int")))
),
)

df_final = (
df_skewed.withColumn("line", F.concat(F.lit("Line-"), (F.rand() * 10).cast("int")))
.withColumn("machine", F.concat(F.lit("MC-"), (F.rand() * 50).cast("int")))
.withColumn("module", F.concat(F.lit("MOD-"), (F.rand() * 5).cast("int")))
.withColumn("slot", (F.rand() * 20).cast("int").cast("string"))
.withColumn("product", F.concat(F.lit("PROD-"), (F.rand() * 100).cast("int")))
.withColumn(
"status",
F.expr(
"case when rand() > 0.99 then 'ERROR' when rand() > 0.01 then 'OK' else 'UNKNOWN' end"
),
)
.withColumn("count", (F.rand() * 100).cast("int"))
)

驗證結果

方法 1:pandas

與 spark 設定同樣的 memory 限制條件,出現 OOMKilled
pandas-oomkilled.png

方法 2:spark

spark 預設的 autobroadcast 表現最好
spark-autobroadcast-01.png

方法 3:spark 關掉 autobroadcast

memory 不足,無法完成任務
spark-turnoff-autobroadcast-01.png
spark-turnoff-autobroadcast-02.png

方法 4:手動 salting

加入 salting 可以讓數據在有限 memory 中「排隊」分批處理,完成任務
salting-01.png
任務被平均分配到 executor 執行,作業時間平均,但是出現 Memory & Disk Spill
salting-02.png
如果拆成更多 partition,可以降低 Memory & Disk Spill,比如切成 400
salting-03.png
增加 memory 到 5G,沒有 Memory & Disk Spill
salting-04.png
但實際作業時間變長
salting-05.png

程式碼 repo:yuchun33/data-skew-exp

DONE.