JiaqiWang18 commented on code in PR #52121:
URL: https://github.com/apache/spark/pull/52121#discussion_r2308844223


##########
sql/pipelines/src/main/scala/org/apache/spark/sql/pipelines/graph/FlowAnalysis.scala:
##########
@@ -112,23 +112,33 @@ object FlowAnalysis {
         // - SELECT ... FROM STREAM(t1)
         // - SELECT ... FROM STREAM t1
         case u: UnresolvedRelation if u.isStreaming =>
-          readStreamInput(
+          val resolved = readStreamInput(
             context,
             name = IdentifierHelper.toQuotedString(u.multipartIdentifier),
             spark.readStream,
             streamingReadOptions = StreamingReadOptions()
           ).queryExecution.analyzed
-
+          // Spark Connect requires the PLAN_ID_TAG to be propagated to the resolved plan
+          // to allow correct analysis of the parent plan that contains this subquery
+          u.getTagValue(LogicalPlan.PLAN_ID_TAG).foreach(
+            id => resolved.setTagValue(LogicalPlan.PLAN_ID_TAG, id)
+          )
+          resolved
         // Batch read on another dataset in the pipeline
         case u: UnresolvedRelation =>
-          readBatchInput(
+          val resolved = readBatchInput(
             context,
             name = IdentifierHelper.toQuotedString(u.multipartIdentifier),
             batchReadOptions = BatchReadOptions()
           ).queryExecution.analyzed
+          // Spark Connect requires the PLAN_ID_TAG to be propagated to the resolved plan
+          // to allow correct analysis of the parent plan that contains this subquery
+          u.getTagValue(LogicalPlan.PLAN_ID_TAG).foreach(
+            id => resolved.setTagValue(LogicalPlan.PLAN_ID_TAG, id)
+          )

Review Comment:
   Also, we call `setTagValue(LogicalPlan.PLAN_ID_TAG, ..)` directly instead of
using `copyTagsFrom` because the `resolved` plan already carries a `dataset_id`
tag, and `copyTagsFrom` only copies tags when the destination has none
([src](https://github.com/apache/spark/blob/master/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/trees/TreeNode.scala#L179)).
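
   To illustrate the difference, here is a minimal, self-contained sketch. It does not use Spark's classes: `Node`, the string tag keys, and `propagatePlanId` are hypothetical stand-ins, and the `copyTagsFrom` behavior (copy only when the destination has no tags) is modeled on my reading of the linked `TreeNode` source, so treat it as an assumption rather than the actual implementation.

```scala
import scala.collection.mutable

// Toy stand-in for a Catalyst TreeNode. NOT Spark's actual class.
final class Node(val name: String) {
  private val tags = mutable.Map.empty[String, Any]

  def setTagValue(tag: String, value: Any): Unit = tags.update(tag, value)

  def getTagValue(tag: String): Option[Any] = tags.get(tag)

  // Assumed behavior: tags are copied only if this node has none yet,
  // so the call is silently a no-op when the destination is already tagged.
  def copyTagsFrom(other: Node): Unit =
    if (tags.isEmpty && other.tags.nonEmpty) tags ++= other.tags
}

object PlanIdTagSketch {
  // Hypothetical keys standing in for LogicalPlan.PLAN_ID_TAG and the dataset_id tag.
  val PlanId = "plan_id"
  val DatasetId = "dataset_id"

  // Mirrors the approach in this PR: copy exactly one tag, if it is present.
  def propagatePlanId(from: Node, to: Node): Unit =
    from.getTagValue(PlanId).foreach(id => to.setTagValue(PlanId, id))

  def main(args: Array[String]): Unit = {
    val unresolved = new Node("unresolved")
    unresolved.setTagValue(PlanId, 42L)

    // The resolved plan already has a tag before we touch it.
    val resolved = new Node("resolved")
    resolved.setTagValue(DatasetId, 7L)

    resolved.copyTagsFrom(unresolved)
    println(s"after copyTagsFrom: ${resolved.getTagValue(PlanId)}")

    propagatePlanId(unresolved, resolved)
    println(s"after setTagValue:  ${resolved.getTagValue(PlanId)}")
  }
}
```

   In this model `copyTagsFrom` leaves the plan id unset because the destination already has `dataset_id`, while the targeted `setTagValue` adds the plan id and keeps `dataset_id` intact.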
 



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org