Dear all,

I'm working on a case where a query eventually fails with a corrupt remote block error whenever a certain table is chosen for a broadcast join.
First, we set spark.sql.autoBroadcastJoinThreshold to 10MB, i.e. 10485760 bytes. Then we ran the query. In the SQL plan, we found that one table of about 25MB was broadcast as well, even though it is above the threshold. In the output of desc extended, the table size is 24452111 bytes. It is a Hive table.

We always run into the error when this table is broadcast. Below is a sample error:

Caused by: java.io.IOException: org.apache.spark.SparkException: corrupt remote block broadcast_477_piece0 of broadcast_477: 298778625 != -992055931
    at org.apache.spark.util.Utils$.tryOrIOException(Utils.scala:1350)
    at org.apache.spark.broadcast.TorrentBroadcast.readBroadcastBlock(TorrentBroadcast.scala:207)
    at org.apache.spark.broadcast.TorrentBroadcast._value$lzycompute(TorrentBroadcast.scala:66)
    at org.apache.spark.broadcast.TorrentBroadcast._value(TorrentBroadcast.scala:66)
    at org.apache.spark.broadcast.TorrentBroadcast.getValue(TorrentBroadcast.scala:96)

I've also attached the physical plan in case you're interested.

One thing to note: if I lower autoBroadcastJoinThreshold to 5MB, this query executes successfully and default.product is NOT broadcast. However, when I switch to another query that selects even fewer columns than the previous one, this table still gets broadcast with the 5MB threshold and the query fails with the same error. I even tried 1MB and got the same result.

I'd appreciate any input you can share. Thank you very much.

Best regards,
Mike
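One possible explanation for a 25MB table passing a 10MB threshold, sketched under stated assumptions: in Spark 2.x with CBO off and no column statistics, a Project above a table scan is assumed to keep the child's row count, so its estimated size is the table-level size scaled by the ratio of estimated row widths (8 bytes of row overhead plus each column type's default size, e.g. 20 for string and 16 for decimal(38,0)). A join side qualifies for broadcast when that estimate is at or below spark.sql.autoBroadcastJoinThreshold. The schema below (55 columns, all strings apart from a decimal(38,0) key, 17 of them selected) is hypothetical and only shows the arithmetic; it is not the real default.product schema, and the helper names are made up. This concerns only which table gets broadcast, not the corrupt-block error itself.

```python
# Hypothetical sketch of Spark 2.x size-only estimation (assumption: CBO off,
# no column-level stats). A Project is assumed to keep the child's row count,
# so its size is the child's size scaled by the ratio of estimated row widths.

ROW_OVERHEAD = 8       # generic per-row overhead used in the estimate
STRING_SIZE = 20       # StringType.defaultSize
DECIMAL_38_SIZE = 16   # DecimalType.defaultSize when precision > 18

def row_width(column_sizes):
    """Estimated bytes per row: overhead plus each column's default size."""
    return ROW_OVERHEAD + sum(column_sizes)

def projected_size(table_bytes, all_columns, selected_columns):
    """Scale the table-level size by selected/full row width (integer division)."""
    return table_bytes * row_width(selected_columns) // row_width(all_columns)

def is_broadcast(estimated_bytes, threshold_bytes):
    """A join side qualifies for broadcast when 0 <= estimate <= threshold."""
    return 0 <= estimated_bytes <= threshold_bytes

MB = 1024 * 1024
table_bytes = 24452111  # size reported by desc extended in this thread

# Hypothetical schema: one decimal(38,0) key plus 54 string columns;
# the query selects the key plus 16 string columns.
all_cols = [DECIMAL_38_SIZE] + [STRING_SIZE] * 54
selected = [DECIMAL_38_SIZE] + [STRING_SIZE] * 16

estimate = projected_size(table_bytes, all_cols, selected)
print(estimate)                             # 7619136
print(is_broadcast(table_bytes, 10 * MB))   # False: table size is over 10MB
print(is_broadcast(estimate, 10 * MB))      # True: projected estimate is under 10MB
print(is_broadcast(estimate, 5 * MB))       # False
```

Selecting fewer columns shrinks `selected` and therefore the estimate, which would be consistent with a narrower query still being broadcast at lower thresholds; whether this is what actually happens here would need checking against the real schema and table statistics.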
== Physical Plan ==
*(10) Project [product_key#445, store_key#446, fiscal_year#447, fiscal_month#448, fiscal_week_of_year#449, fiscal_year_week#450, fiscal_week_start_date#220 AS period_start_date#472, fiscal_week_end_date#221 AS period_end_date#473, CASE WHEN isnull(fiscal_week_of_year#222) THEN 0 ELSE 1 END AS valid_date_flag#474, max_fiscal_date#451, vs_max_sales_year#270, vs_max_sales_month#267, vs_max_sales_week#266, bu_id#272 AS bu_code#475, bu_name#273, principle_supplier_code#154 AS supplier_code#476, mother_company_name#150 AS supplier_name#477, brand_type_name#117, brand_name#115, coalesce(h1_l1_hierarchy_code#125, -) AS Category_Code#478, coalesce(substring(h1_l1_hierarchy_code#125, 3, 2), -) AS Cate_No#479, h1_l1_hierarchy_name#126 AS Category_Name#480, coalesce(h1_l2_hierarchy_code#127, -) AS Department_Code#481, coalesce(substring(h1_l2_hierarchy_code#127, 7, 2), -) AS Dept_No#482, ... 48 more fields]
+- *(10) BroadcastHashJoin [bu_key#156], [bu_key#271], LeftOuter, BuildRight
   :- *(10) Project [product_key#445, store_key#446, fiscal_year#447, fiscal_month#448, fiscal_week_of_year#449, fiscal_year_week#450, max_fiscal_date#451, sales_amt_local#452, cogs_amt_local#453, gross_profit_amt_local#454, gross_margin_amt_local#455, adj_amt_local#456, compensation#457, qty#458, sales_amt_local_shrink#459, cogs_amt_local_shrink#460, qty_shrink#461, sales_amt_local_ly#462, cogs_amt_local_ly#463, gross_profit_amt_local_ly#464, gross_margin_amt_local_ly#465, adj_amt_local_ly#466, compensation_ly#467, qty_ly#468, ... 41 more fields]
   :  +- *(10) BroadcastHashJoin [fiscal_week_of_year#449, fiscal_year#447], [fiscal_week_of_year#222, fiscal_year#234], LeftOuter, BuildRight
   :     :- *(10) Project [product_key#445, store_key#446, fiscal_year#447, fiscal_month#448, fiscal_week_of_year#449, fiscal_year_week#450, max_fiscal_date#451, sales_amt_local#452, cogs_amt_local#453, gross_profit_amt_local#454, gross_margin_amt_local#455, adj_amt_local#456, compensation#457, qty#458, sales_amt_local_shrink#459, cogs_amt_local_shrink#460, qty_shrink#461, sales_amt_local_ly#462, cogs_amt_local_ly#463, gross_profit_amt_local_ly#464, gross_margin_amt_local_ly#465, adj_amt_local_ly#466, compensation_ly#467, qty_ly#468, ... 35 more fields]
   :     :  +- *(10) BroadcastHashJoin [store_id#157], [loc_idnt#521], LeftOuter, BuildRight
   :     :     :- *(10) Project [product_key#445, store_key#446, fiscal_year#447, fiscal_month#448, fiscal_week_of_year#449, fiscal_year_week#450, max_fiscal_date#451, sales_amt_local#452, cogs_amt_local#453, gross_profit_amt_local#454, gross_margin_amt_local#455, adj_amt_local#456, compensation#457, qty#458, sales_amt_local_shrink#459, cogs_amt_local_shrink#460, qty_shrink#461, sales_amt_local_ly#462, cogs_amt_local_ly#463, gross_profit_amt_local_ly#464, gross_margin_amt_local_ly#465, adj_amt_local_ly#466, compensation_ly#467, qty_ly#468, ... 33 more fields]
   :     :     :  +- *(10) BroadcastHashJoin [cast(store_key#446 as double)], [cast(store_key#155 as double)], LeftOuter, BuildRight
   :     :     :     :- *(10) Project [product_key#445, store_key#446, fiscal_year#447, fiscal_month#448, fiscal_week_of_year#449, fiscal_year_week#450, max_fiscal_date#451, sales_amt_local#452, cogs_amt_local#453, gross_profit_amt_local#454, gross_margin_amt_local#455, adj_amt_local#456, compensation#457, qty#458, sales_amt_local_shrink#459, cogs_amt_local_shrink#460, qty_shrink#461, sales_amt_local_ly#462, cogs_amt_local_ly#463, gross_profit_amt_local_ly#464, gross_margin_amt_local_ly#465, adj_amt_local_ly#466, compensation_ly#467, qty_ly#468, ... 19 more fields]
   :     :     :     :  +- *(10) BroadcastHashJoin [cast(product_key#445 as double)], [cast(product_key#100 as double)], LeftOuter, BuildRight
   :     :     :     :     :- *(10) Project [coalesce(product_key#80, product_key#524) AS product_key#445, coalesce(store_key#81, store_key#525) AS store_key#446, coalesce(fiscal_year#83, cast(cast((cast(fiscal_year#527 as double) + 1.0) as int) as string)) AS fiscal_year#447, coalesce(fiscal_month#84, fiscal_month#528) AS fiscal_month#448, coalesce(fiscal_week_of_year#85, fiscal_week_of_year#529) AS fiscal_week_of_year#449, coalesce(fiscal_year_week#86, cast(cast((cast(fiscal_year_week#530 as double) + 100.0) as int) as string)) AS fiscal_year_week#450, coalesce(max_fiscal_date#87, max_fiscal_date#531) AS max_fiscal_date#451, coalesce(sales_amt_local#91, 0.0) AS sales_amt_local#452, coalesce(cogs_amt_local#92, 0.0) AS cogs_amt_local#453, coalesce(gross_profit_amt_local#93, 0.0) AS gross_profit_amt_local#454, (coalesce(gross_profit_amt_local#93, 0.0) + coalesce(compensation#95, 0.0)) AS gross_margin_amt_local#455, coalesce(adj_amt_local#94, 0.0) AS adj_amt_local#456, coalesce(compensation#95, 0.0) AS compensation#457, coalesce(qty#96, 0.0) AS qty#458, coalesce(sales_amt_local_shrink#97, 0.0) AS sales_amt_local_shrink#459, coalesce(cogs_amt_local_shrink#98, 0.0) AS cogs_amt_local_shrink#460, coalesce(qty_shrink#99, 0.0) AS qty_shrink#461, coalesce(sales_amt_local#535, 0.0) AS sales_amt_local_ly#462, coalesce(cogs_amt_local#536, 0.0) AS cogs_amt_local_ly#463, coalesce(gross_profit_amt_local#537, 0.0) AS gross_profit_amt_local_ly#464, (coalesce(gross_profit_amt_local#537, 0.0) + coalesce(compensation#539, 0.0)) AS gross_margin_amt_local_ly#465, coalesce(adj_amt_local#538, 0.0) AS adj_amt_local_ly#466, coalesce(compensation#539, 0.0) AS compensation_ly#467, coalesce(qty#540, 0.0) AS qty_ly#468, ... 3 more fields]
   :     :     :     :     :  +- *(10) Filter ((vs_max_sales_year#88 >= -2) || vs_max_sales_year#532 IN (-1,-2,-3))
   :     :     :     :     :     +- SortMergeJoin [product_key#80, store_key#81, (cast(fiscal_year#83 as double) - 1.0), fiscal_week_of_year#85], [product_key#524, store_key#525, cast(fiscal_year#527 as double), fiscal_week_of_year#529], FullOuter
   :     :     :     :     :        :- *(2) Sort [product_key#80 ASC NULLS FIRST, store_key#81 ASC NULLS FIRST, (cast(fiscal_year#83 as double) - 1.0) ASC NULLS FIRST, fiscal_week_of_year#85 ASC NULLS FIRST], false, 0
   :     :     :     :     :        :  +- Exchange hashpartitioning(product_key#80, store_key#81, (cast(fiscal_year#83 as double) - 1.0), fiscal_week_of_year#85, 200)
   :     :     :     :     :        :     +- *(1) FileScan parquet default.temp_sales_aggregate_wk[product_key#80,store_key#81,fiscal_year#83,fiscal_month#84,fiscal_week_of_year#85,fiscal_year_week#86,max_fiscal_date#87,vs_max_sales_year#88,sales_amt_local#91,cogs_amt_local#92,gross_profit_amt_local#93,adj_amt_local#94,compensation#95,qty#96,sales_amt_local_shrink#97,cogs_amt_local_shrink#98,qty_shrink#99] Batched: true, Format: Parquet, Location: InMemoryFileIndex[wasb://utgsj3-asw-prod-ea-hdi-2018-11-23t12-04-25-2...@aswprodeasparkstorage.bl..., PartitionFilters: [], PushedFilters: [], ReadSchema: struct<product_key:string,store_key:string,fiscal_year:string,fiscal_month:string,fiscal_week_of_...
   :     :     :     :     :        +- *(4) Sort [product_key#524 ASC NULLS FIRST, store_key#525 ASC NULLS FIRST, cast(fiscal_year#527 as double) ASC NULLS FIRST, fiscal_week_of_year#529 ASC NULLS FIRST], false, 0
   :     :     :     :     :           +- Exchange hashpartitioning(product_key#524, store_key#525, cast(fiscal_year#527 as double), fiscal_week_of_year#529, 200)
   :     :     :     :     :              +- *(3) FileScan parquet default.temp_sales_aggregate_wk[product_key#524,store_key#525,fiscal_year#527,fiscal_month#528,fiscal_week_of_year#529,fiscal_year_week#530,max_fiscal_date#531,vs_max_sales_year#532,sales_amt_local#535,cogs_amt_local#536,gross_profit_amt_local#537,adj_amt_local#538,compensation#539,qty#540,sales_amt_local_shrink#541,cogs_amt_local_shrink#542,qty_shrink#543] Batched: true, Format: Parquet, Location: InMemoryFileIndex[wasb://utgsj3-asw-prod-ea-hdi-2018-11-23t12-04-25-2...@aswprodeasparkstorage.bl..., PartitionFilters: [], PushedFilters: [], ReadSchema: struct<product_key:string,store_key:string,fiscal_year:string,fiscal_month:string,fiscal_week_of_...
   :     :     :     :     +- BroadcastExchange HashedRelationBroadcastMode(List(cast(input[0, decimal(38,0), true] as double)))
   :     :     :     :        +- *(5) FileScan parquet default.product[product_key#100,product_id#102,product_other_name#106,product_name#107,product_other_description#108,brand_name#115,brand_type_name#117,h1_l1_hierarchy_code#125,h1_l1_hierarchy_name#126,h1_l2_hierarchy_code#127,h1_l2_hierarchy_name#128,h1_l3_hierarchy_code#129,h1_l3_hierarchy_name#130,h1_l4_hierarchy_code#131,h1_l4_hierarchy_name#132,Mother_Company_Name#150,Principle_Supplier_Code#154] Batched: true, Format: Parquet, Location: InMemoryFileIndex[wasb://utgsj3-asw-prod-ea-hdi-2018-11-23t12-04-25-2...@aswprodeasparkstorage.bl..., PartitionFilters: [], PushedFilters: [], ReadSchema: struct<product_key:decimal(38,0),product_id:string,product_other_name:string,product_name:string,...
   :     :     :     +- BroadcastExchange HashedRelationBroadcastMode(List(cast(input[0, decimal(38,0), true] as double)))
   :     :     :        +- *(6) FileScan parquet default.store[store_key#155,bu_key#156,store_id#157,store_code#159,store_name#160,store_other_name#161,store_format_code#162,store_format_name#163,address_1#166,address_2#167,address_5#170,web_store_flag#184,comp_store_flag#188,h1_l2_hierarchy_name#193,h1_l5_hierarchy_name#199] Batched: true, Format: Parquet, Location: InMemoryFileIndex[wasb://utgsj3-asw-prod-ea-hdi-2018-11-23t12-04-25-2...@aswprodeasparkstorage.bl..., PartitionFilters: [], PushedFilters: [], ReadSchema: struct<store_key:decimal(38,0),bu_key:decimal(38,0),store_id:string,store_code:string,store_name:...
   :     :     +- BroadcastExchange HashedRelationBroadcastMode(List(input[0, string, true]))
   :     :        +- HiveTableScan [loc_idnt#521, latitude#522, longitude#523], HiveTableRelation `default`.`geo_tableau`, org.apache.hadoop.hive.serde2.lazy.LazySimpleSerDe, [loc_idnt#521, latitude#522, longitude#523]
   :     +- BroadcastExchange HashedRelationBroadcastMode(List(input[1, string, true], input[0, string, true]))
   :        +- *(8) HashAggregate(keys=[fiscal_year#234, fiscal_week_of_year#222, fiscal_week_start_date#220, fiscal_week_end_date#221, vs_max_sales_year#270, vs_max_sales_month#267, vs_max_sales_week#266], functions=[])
   :           +- Exchange hashpartitioning(fiscal_year#234, fiscal_week_of_year#222, fiscal_week_start_date#220, fiscal_week_end_date#221, vs_max_sales_year#270, vs_max_sales_month#267, vs_max_sales_week#266, 200)
   :              +- *(7) HashAggregate(keys=[fiscal_year#234, fiscal_week_of_year#222, fiscal_week_start_date#220, fiscal_week_end_date#221, vs_max_sales_year#270, vs_max_sales_month#267, vs_max_sales_week#266], functions=[])
   :                 +- *(7) FileScan parquet default.bl_business_date[FISCAL_WEEK_START_DATE#220,FISCAL_WEEK_END_DATE#221,FISCAL_WEEK_OF_YEAR#222,FISCAL_YEAR#234,vs_max_sales_week#266,vs_max_sales_month#267,vs_max_sales_year#270] Batched: true, Format: Parquet, Location: InMemoryFileIndex[wasb://utgsj3-asw-prod-ea-hdi-2018-11-23t12-04-25-2...@aswprodeasparkstorage.bl..., PartitionFilters: [], PushedFilters: [], ReadSchema: struct<FISCAL_WEEK_START_DATE:string,FISCAL_WEEK_END_DATE:string,FISCAL_WEEK_OF_YEAR:string,FISCA...
   +- BroadcastExchange HashedRelationBroadcastMode(List(input[0, decimal(38,0), true]))
      +- *(9) FileScan parquet rkrdmx.d_rk_business_unit[BU_KEY#271,BU_ID#272,BU_NAME#273] Batched: true, Format: Parquet, Location: InMemoryFileIndex[wasbs://hiv...@aswprodeaskrkrdmx.blob.core.windows.net/hive/warehouse/rkrdmx.db..., PartitionFilters: [], PushedFilters: [], ReadSchema: struct<BU_KEY:decimal(38,0),BU_ID:string,BU_NAME:string>
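To see which tables end up under a BroadcastExchange without reading a large plan by eye, a small text-based helper could be run over the plan printed by explain(), which has one node per line. This is a hypothetical sketch, not a Spark API: `broadcast_relations` and `node_column` are made-up names. The helper relies only on the indentation of Spark's tree string and only recognizes FileScan and HiveTableRelation leaves.

```python
import re

# Leaf patterns as they appear in Spark's explain() text.
FILE_SCAN = re.compile(r"FileScan \w+ ([\w.]+)\[")
HIVE_SCAN = re.compile(r"HiveTableRelation `([^`]+)`\.`([^`]+)`")

def node_column(line):
    """Column where the node name starts, after the ' :+-' tree prefix."""
    return len(line) - len(line.lstrip(" :+-"))

def broadcast_relations(plan_text):
    """Return table names scanned below each BroadcastExchange, in plan order."""
    lines = plan_text.splitlines()
    found = []
    for i, line in enumerate(lines):
        if not line.lstrip(" :+-").startswith("BroadcastExchange"):
            continue
        depth = node_column(line)
        for child in lines[i + 1:]:
            if node_column(child) <= depth:
                break  # left this BroadcastExchange's subtree
            m = FILE_SCAN.search(child)
            if m:
                found.append(m.group(1))
                continue
            m = HIVE_SCAN.search(child)
            if m:
                found.append(f"{m.group(1)}.{m.group(2)}")
    return found

# Trimmed-down, made-up plan in the same layout as Spark's explain() output.
sample = """*(3) BroadcastHashJoin [k#1], [loc#9], LeftOuter, BuildRight
:- *(3) BroadcastHashJoin [k#1], [k#2], LeftOuter, BuildRight
:  :- *(3) FileScan parquet default.fact[k#1] Batched: true
:  +- BroadcastExchange HashedRelationBroadcastMode(List(input[0, string, true]))
:     +- *(1) FileScan parquet default.product[k#2] Batched: true
+- BroadcastExchange HashedRelationBroadcastMode(List(input[0, string, true]))
   +- HiveTableScan [loc#9], HiveTableRelation `default`.`geo_tableau`, org.apache.hadoop.hive.serde2.lazy.LazySimpleSerDe, [loc#9]"""

print(broadcast_relations(sample))  # ['default.product', 'default.geo_tableau']
```

Walking indentation instead of parsing the whole tree keeps the helper short, at the cost of silently skipping leaf types it does not recognize (for example in-memory or local table scans).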