Hi all,




 I have the following query, split into several steps:

 ```




1  spark.sql("select

    distinct cust_id,

    cast (b.device_name as varchar(200)) as devc_name_cast,

    prmry_reside_cntry_code

from (select * from ${model_db}.crs_recent_30d_SF_dim_cust_info where 
dt='${today}') a

join fact_rsk_magnes_txn b on a.cust_id = b.customer_id

where b.device_name <> '#'

and b.device_name is not null

and b.device_name <>'~'").registerTempTable("device_driver_info_0")




2 spark.sql("select 

    *,

    lower(regexp_REPLACE (devc_name_cast, 'â', 'a') ) as devc_name_norm

from device_driver_info_0").registerTempTable("device_driver_info_1")










 3 spark.sql("select 

    cust_id,

    devc_name_norm ||'_'|| prmry_reside_cntry_code as Device_Name_Country

from device_driver_info_1 where 
dt='${today}'").registerTempTable("device_driver_info")




4 spark.sql("select 

    cust_id,

    Device_Name_Country

from device_driver_info

where Device_Name_Country is not null

group by 1,2").registerTempTable("device_name_SF_final_acct_info")

 




5 spark.sql("select

    Device_Name_Country,

    count(distinct cust_id) as cust_cnt

from device_name_SF_final_acct_info 

group by 1").registerTempTable("device_count_1")




spark.sql("select * from device_count_1 where cust_cnt between 5 and 
5000").registerTempTable("device_count")




6 spark.sql("select

    b.cust_id,

    cast('Device_Name_Country' as varchar(100)) network_source,

    cast(a.Device_Name_Country as varchar(100)) as network_source_value

from device_count a

left join device_name_SF_final_acct_info b

on a.Device_Name_Country=b.Device_Name_Country").write

        .mode(SaveMode.Overwrite)

        .insertInto(s"$databaseName.$tableName")

 ```




The problem can be seen in the query plan below. Device_Name_Country is 
built in SQL #3 as

'devc_name_norm ||'_'|| prmry_reside_cntry_code', but no projection that 
produces it appears in the plan below, so binding the attribute fails with 
"Couldn't find Device_Name_Country#...". The same SQL runs successfully 
on Spark 2, while on 

Spark 3.1.2 it throws the error shown below. Please help.













ShuffleQueryStage 6

+- Exchange hashpartitioning(cust_id#4030, Device_Name_Country#4099, 3001), 
ENSURE_REQUIREMENTS, [id=#2669]

+- *(5) HashAggregate(keys=[cust_id#4030, Device_Name_Country#4099], 
functions=[], output=[cust_id#4030, Device_Name_Country#4099])

+- CustomShuffleReader coalesced

+- ShuffleQueryStage 3

+- Exchange hashpartitioning(cust_id#4030, devc_name_cast#4029, 
prmry_reside_cntry_code#4036, 3001), ENSURE_REQUIREMENTS, [id=#2376]

+- *(3) HashAggregate(keys=[cust_id#4030, devc_name_cast#4029, 
prmry_reside_cntry_code#4036], functions=[], output=[cust_id#4030, 
devc_name_cast#4029, prmry_reside_cntry_code#4036])

+- *(3) Project [cust_id#4030, device_name#3453 AS devc_name_cast#4029, 
prmry_reside_cntry_code#4036]

+- *(3) BroadcastHashJoin [cust_id#4030], [customer_id#3431], Inner, BuildLeft, 
isnotnull(concat(concat(lower(regexp_replace(device_name#3453, â, a, 1)), _), 
prmry_reside_cntry_code#4036)), false

:- BroadcastQueryStage 0

: +- BroadcastExchange HashedRelationBroadcastMode(List(input[0, string, 
false]),false), [id=#2132]

: +- *(1) Filter isnotnull(cust_id#4030)

: +- Scan hive unified_group_review_cri_group.crs_recent_30d_sf_dim_cust_info 
[cust_id#4030, prmry_reside_cntry_code#4036], HiveTableRelation 
[`unified_group_review_cri_group`.`crs_recent_30d_sf_dim_cust_info`, 
org.apache.hadoop.hive.serde2.lazy.LazySimpleSerDe, Data Cols: [cust_id#4030, 
acct_cre_dt#4031, is_guest_y_n#4032, prmry_email_domain#4033, 
cust_first_name#4034..., Partition Cols: [dt#4048], Pruned Partitions: 
[(dt=2023-03-23)]], [isnotnull(dt#4048), (dt#4048 = 2023-03-23)]

+- *(3) Project [customer_id#3431, device_name#3453]

+- *(3) Filter (((NOT (device_name#3453 = #) AND isnotnull(device_name#3453)) 
AND NOT (device_name#3453 = ~)) AND isnotnull(customer_id#3431))

+- *(3) ColumnarToRow

+- FileScan parquet 
pp_risk_ops_qh_tables.magnes_fraudnet_login_raw[customer_id#3431,device_name#3453,ts#3603,event_dt#3604]
 Batched: true, DataFilters: [NOT (device_name#3453 = #), 
isnotnull(device_name#3453), NOT (device_name#3453 = ~), isnotnull(c..., 
Format: Parquet, Location: 
InMemoryFileIndex[gs://pypl-bkt-prd-row-std-gds-non-edw-tables/apps/risk/ads/rda/magnes_fraudnet_...,
 PartitionFilters: [isnotnull(event_dt#3604), (cast(event_dt#3604 as date) >= 
19409), (event_dt#3604 <= 2023-03-23)], PushedFilters: 
[Not(EqualTo(device_name,#)), IsNotNull(device_name), 
Not(EqualTo(device_name,~)), IsNotNull(cust..., ReadSchema: 
struct<customer_id:string,device_name:string>




ERROR 
Caused by: org.apache.spark.sql.catalyst.errors.package$TreeNodeException: 
execute, tree:
ShuffleQueryStage 6
+- Exchange hashpartitioning(cust_id#13555, Device_Name_Country#13624, 3001), 
ENSURE_REQUIREMENTS, [id=#23666]
+- *(5) HashAggregate(keys=[cust_id#13555, Device_Name_Country#13624], 
functions=[], output=[cust_id#13555, Device_Name_Country#13624])
+- CustomShuffleReader coalesced
+- ShuffleQueryStage 3
+- Exchange hashpartitioning(cust_id#13555, devc_name_cast#13554, 
prmry_reside_cntry_code#13561, 3001), ENSURE_REQUIREMENTS, [id=#22991]
+- *(3) HashAggregate(keys=[cust_id#13555, devc_name_cast#13554, 
prmry_reside_cntry_code#13561], functions=[], output=[cust_id#13555, 
devc_name_cast#13554, prmry_reside_cntry_code#13561])
+- *(3) Project [cust_id#13555, device_name#11949 AS devc_name_cast#13554, 
prmry_reside_cntry_code#13561]
+- *(3) BroadcastHashJoin [cust_id#13555], [customer_id#11927], Inner, 
BuildLeft, isnotnull(concat(concat(lower(regexp_replace(device_name#11949, â, 
a, 1)), _), prmry_reside_cntry_code#13561)), false
:- BroadcastQueryStage 0
: +- BroadcastExchange HashedRelationBroadcastMode(List(input[0, string, 
false]),false), [id=#22650]
: +- *(1) Filter isnotnull(cust_id#13555)
: +- Scan hive unified_group_review_cri_group.crs_recent_30d_sf_dim_cust_info 
[cust_id#13555, prmry_reside_cntry_code#13561], HiveTableRelation 
[`unified_group_review_cri_group`.`crs_recent_30d_sf_dim_cust_info`, 
org.apache.hadoop.hive.serde2.lazy.LazySimpleSerDe, Data Cols: [cust_id#13555, 
acct_cre_dt#13556, is_guest_y_n#13557, prmry_email_domain#13558, 
cust_first_name#..., Partition Cols: [dt#13573], Pruned Partitions: 
[(dt=2023-03-23)]], [isnotnull(dt#13573), (dt#13573 = 2023-03-23)]
+- *(3) Project [customer_id#11927, device_name#11949]
+- *(3) Filter (((NOT (device_name#11949 = #) AND isnotnull(device_name#11949)) 
AND NOT (device_name#11949 = ~)) AND isnotnull(customer_id#11927))
+- *(3) ColumnarToRow
+- FileScan parquet 
pp_risk_ops_qh_tables.magnes_fraudnet_login_raw[customer_id#11927,device_name#11949,ts#12099,event_dt#12100]
 Batched: true, DataFilters: [NOT (device_name#11949 = #), 
isnotnull(device_name#11949), NOT (device_name#11949 = ~), isnotnul..., Format: 
Parquet, Location: 
InMemoryFileIndex[gs://pypl-bkt-prd-row-std-gds-non-edw-tables/apps/risk/ads/rda/magnes_fraudnet_...,
 PartitionFilters: [isnotnull(event_dt#12100), (cast(event_dt#12100 as date) >= 
19409), (event_dt#12100 <= 2023-03-23)], PushedFilters: 
[Not(EqualTo(device_name,#)), IsNotNull(device_name), 
Not(EqualTo(device_name,~)), IsNotNull(cust..., ReadSchema: 
struct<customer_id:string,device_name:string>

    at 
org.apache.spark.sql.catalyst.errors.package$.attachTree(package.scala:56)
    at 
org.apache.spark.sql.execution.adaptive.ShuffleQueryStageExec.doMaterialize(QueryStageExec.scala:162)
    at 
org.apache.spark.sql.execution.adaptive.QueryStageExec.$anonfun$materialize$1(QueryStageExec.scala:80)
    at 
org.apache.spark.sql.execution.SparkPlan.$anonfun$executeQuery$1(SparkPlan.scala:218)
    at 
org.apache.spark.rdd.RDDOperationScope$.withScope(RDDOperationScope.scala:151)
    at 
org.apache.spark.sql.execution.SparkPlan.executeQuery(SparkPlan.scala:215)
    at 
org.apache.spark.sql.execution.adaptive.QueryStageExec.materialize(QueryStageExec.scala:80)
    at 
org.apache.spark.sql.execution.adaptive.AdaptiveSparkPlanExec.$anonfun$getFinalPhysicalPlan$4(AdaptiveSparkPlanExec.scala:196)
    at 
org.apache.spark.sql.execution.adaptive.AdaptiveSparkPlanExec.$anonfun$getFinalPhysicalPlan$4$adapted(AdaptiveSparkPlanExec.scala:194)
    at scala.collection.immutable.List.foreach(List.scala:431)
    at 
org.apache.spark.sql.execution.adaptive.AdaptiveSparkPlanExec.$anonfun$getFinalPhysicalPlan$1(AdaptiveSparkPlanExec.scala:194)
    at org.apache.spark.sql.SparkSession.withActive(SparkSession.scala:775)
    at 
org.apache.spark.sql.execution.adaptive.AdaptiveSparkPlanExec.getFinalPhysicalPlan(AdaptiveSparkPlanExec.scala:180)
    at 
org.apache.spark.sql.execution.adaptive.AdaptiveSparkPlanExec.doExecute(AdaptiveSparkPlanExec.scala:296)
    at 
org.apache.spark.sql.execution.SparkPlan.$anonfun$execute$1(SparkPlan.scala:180)
    at 
org.apache.spark.sql.execution.SparkPlan.$anonfun$executeQuery$1(SparkPlan.scala:218)
    at 
org.apache.spark.rdd.RDDOperationScope$.withScope(RDDOperationScope.scala:151)
    at 
org.apache.spark.sql.execution.SparkPlan.executeQuery(SparkPlan.scala:215)
    at org.apache.spark.sql.execution.SparkPlan.execute(SparkPlan.scala:176)
    at org.apache.spark.sql.execution.SortExec.doExecute(SortExec.scala:112)
    at 
org.apache.spark.sql.execution.SparkPlan.$anonfun$execute$1(SparkPlan.scala:180)
    at 
org.apache.spark.sql.execution.SparkPlan.$anonfun$executeQuery$1(SparkPlan.scala:218)
    at 
org.apache.spark.rdd.RDDOperationScope$.withScope(RDDOperationScope.scala:151)
    at 
org.apache.spark.sql.execution.SparkPlan.executeQuery(SparkPlan.scala:215)
    at org.apache.spark.sql.execution.SparkPlan.execute(SparkPlan.scala:176)
    at 
org.apache.spark.sql.execution.datasources.FileFormatWriter$.write(FileFormatWriter.scala:184)
    ... 42 more
Caused by: org.apache.spark.sql.catalyst.errors.package$TreeNodeException: 
Binding attribute, tree: Device_Name_Country#13624
    at 
org.apache.spark.sql.catalyst.errors.package$.attachTree(package.scala:56)
    at 
org.apache.spark.sql.catalyst.expressions.BindReferences$$anonfun$bindReference$1.applyOrElse(BoundAttribute.scala:75)
    at 
org.apache.spark.sql.catalyst.expressions.BindReferences$$anonfun$bindReference$1.applyOrElse(BoundAttribute.scala:74)
    at 
org.apache.spark.sql.catalyst.trees.TreeNode.$anonfun$transformDown$1(TreeNode.scala:318)
    at 
org.apache.spark.sql.catalyst.trees.CurrentOrigin$.withOrigin(TreeNode.scala:74)
    at 
org.apache.spark.sql.catalyst.trees.TreeNode.transformDown(TreeNode.scala:318)
    at 
org.apache.spark.sql.catalyst.trees.TreeNode.transform(TreeNode.scala:307)
    at 
org.apache.spark.sql.catalyst.expressions.BindReferences$.bindReference(BoundAttribute.scala:74)
    at 
org.apache.spark.sql.catalyst.expressions.BindReferences$.$anonfun$bindReferences$1(BoundAttribute.scala:96)
    at 
scala.collection.TraversableLike.$anonfun$map$1(TraversableLike.scala:286)
    at scala.collection.mutable.ResizableArray.foreach(ResizableArray.scala:62)
    at scala.collection.mutable.ResizableArray.foreach$(ResizableArray.scala:55)
    at scala.collection.mutable.ArrayBuffer.foreach(ArrayBuffer.scala:49)
    at scala.collection.TraversableLike.map(TraversableLike.scala:286)
    at scala.collection.TraversableLike.map$(TraversableLike.scala:279)
    at scala.collection.AbstractTraversable.map(Traversable.scala:108)
    at 
org.apache.spark.sql.catalyst.expressions.BindReferences$.bindReferences(BoundAttribute.scala:96)
    at 
org.apache.spark.sql.execution.aggregate.HashAggregateExec.doConsumeWithKeys(HashAggregateExec.scala:828)
    at 
org.apache.spark.sql.execution.aggregate.HashAggregateExec.doConsume(HashAggregateExec.scala:156)
    at 
org.apache.spark.sql.execution.CodegenSupport.constructDoConsumeFunction(WholeStageCodegenExec.scala:221)
    at 
org.apache.spark.sql.execution.CodegenSupport.consume(WholeStageCodegenExec.scala:192)
    at 
org.apache.spark.sql.execution.CodegenSupport.consume$(WholeStageCodegenExec.scala:149)
    at 
org.apache.spark.sql.execution.InputAdapter.consume(WholeStageCodegenExec.scala:496)
    at 
org.apache.spark.sql.execution.InputRDDCodegen.doProduce(WholeStageCodegenExec.scala:483)
    at 
org.apache.spark.sql.execution.InputRDDCodegen.doProduce$(WholeStageCodegenExec.scala:456)
    at 
org.apache.spark.sql.execution.InputAdapter.doProduce(WholeStageCodegenExec.scala:496)
    at 
org.apache.spark.sql.execution.CodegenSupport.$anonfun$produce$1(WholeStageCodegenExec.scala:95)
    at 
org.apache.spark.sql.execution.SparkPlan.$anonfun$executeQuery$1(SparkPlan.scala:218)
    at 
org.apache.spark.rdd.RDDOperationScope$.withScope(RDDOperationScope.scala:151)
    at 
org.apache.spark.sql.execution.SparkPlan.executeQuery(SparkPlan.scala:215)
    at 
org.apache.spark.sql.execution.CodegenSupport.produce(WholeStageCodegenExec.scala:90)
    at 
org.apache.spark.sql.execution.CodegenSupport.produce$(WholeStageCodegenExec.scala:90)
    at 
org.apache.spark.sql.execution.InputAdapter.produce(WholeStageCodegenExec.scala:496)
    at 
org.apache.spark.sql.execution.aggregate.HashAggregateExec.doProduceWithKeys(HashAggregateExec.scala:733)
    at 
org.apache.spark.sql.execution.aggregate.HashAggregateExec.doProduce(HashAggregateExec.scala:148)
    at 
org.apache.spark.sql.execution.CodegenSupport.$anonfun$produce$1(WholeStageCodegenExec.scala:95)
    at 
org.apache.spark.sql.execution.SparkPlan.$anonfun$executeQuery$1(SparkPlan.scala:218)
    at 
org.apache.spark.rdd.RDDOperationScope$.withScope(RDDOperationScope.scala:151)
    at 
org.apache.spark.sql.execution.SparkPlan.executeQuery(SparkPlan.scala:215)
    at 
org.apache.spark.sql.execution.CodegenSupport.produce(WholeStageCodegenExec.scala:90)
    at 
org.apache.spark.sql.execution.CodegenSupport.produce$(WholeStageCodegenExec.scala:90)
    at 
org.apache.spark.sql.execution.aggregate.HashAggregateExec.produce(HashAggregateExec.scala:47)
    at 
org.apache.spark.sql.execution.WholeStageCodegenExec.doCodeGen(WholeStageCodegenExec.scala:655)
    at 
org.apache.spark.sql.execution.WholeStageCodegenExec.doExecute(WholeStageCodegenExec.scala:718)
    at 
org.apache.spark.sql.execution.SparkPlan.$anonfun$execute$1(SparkPlan.scala:180)
    at 
org.apache.spark.sql.execution.SparkPlan.$anonfun$executeQuery$1(SparkPlan.scala:218)
    at 
org.apache.spark.rdd.RDDOperationScope$.withScope(RDDOperationScope.scala:151)
    at 
org.apache.spark.sql.execution.SparkPlan.executeQuery(SparkPlan.scala:215)
    at org.apache.spark.sql.execution.SparkPlan.execute(SparkPlan.scala:176)
    at 
org.apache.spark.sql.execution.exchange.ShuffleExchangeExec.inputRDD$lzycompute(ShuffleExchangeExec.scala:118)
    at 
org.apache.spark.sql.execution.exchange.ShuffleExchangeExec.inputRDD(ShuffleExchangeExec.scala:118)
    at 
org.apache.spark.sql.execution.exchange.ShuffleExchangeExec.mapOutputStatisticsFuture$lzycompute(ShuffleExchangeExec.scala:122)
    at 
org.apache.spark.sql.execution.exchange.ShuffleExchangeExec.mapOutputStatisticsFuture(ShuffleExchangeExec.scala:121)
    at 
org.apache.spark.sql.execution.adaptive.ShuffleQueryStageExec.$anonfun$doMaterialize$1(QueryStageExec.scala:162)
    at 
org.apache.spark.sql.catalyst.errors.package$.attachTree(package.scala:52)
    ... 67 more
Caused by: java.lang.RuntimeException: Couldn't find Device_Name_Country#13624 
in [cust_id#13555,devc_name_cast#13554,prmry_reside_cntry_code#13561]
    at scala.sys.package$.error(package.scala:30)
    at 
org.apache.spark.sql.catalyst.expressions.BindReferences$$anonfun$bindReference$1.$anonfun$applyOrElse$1(BoundAttribute.scala:81)
    at 
org.apache.spark.sql.catalyst.errors.package$.attachTree(package.scala:52)
    ... 121 more




Best Regards
Kelly Zhang

Reply via email to