JiaqiWang18 commented on code in PR #52121:
URL: https://github.com/apache/spark/pull/52121#discussion_r2309149657
##########
sql/pipelines/src/main/scala/org/apache/spark/sql/pipelines/graph/FlowAnalysis.scala:
##########
@@ -112,23 +112,33 @@ object FlowAnalysis {
         // - SELECT ... FROM STREAM(t1)
         // - SELECT ... FROM STREAM t1
         case u: UnresolvedRelation if u.isStreaming =>
-          readStreamInput(
+          val resolved = readStreamInput(
             context,
             name = IdentifierHelper.toQuotedString(u.multipartIdentifier),
             spark.readStream,
             streamingReadOptions = StreamingReadOptions()
           ).queryExecution.analyzed
-
+          // Spark Connect requires the PLAN_ID_TAG to be propagated to the resolved plan
+          // to allow correct analysis of the parent plan that contains this subquery
+          u.getTagValue(LogicalPlan.PLAN_ID_TAG).foreach(
+            id => resolved.setTagValue(LogicalPlan.PLAN_ID_TAG, id)

Review Comment:
   @cloud-fan We were calling `setTagValue(LogicalPlan.PLAN_ID_TAG ..)` directly instead of using `copyTagsFrom` because the `resolved` plan already contains a `dataset_id` tag, and `copyTagsFrom` expects the destination to have no tags ([src](https://github.com/apache/spark/blob/master/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/trees/TreeNode.scala#L179)). I think it makes sense to copy over the other tags as well, so I introduced a new `mergeTagsFrom` [method](https://github.com/apache/spark/pull/52121/commits/0b3b26e663dcde3a029262920d83196f68d7fc71). I don't expect any tag to be defined on both plans with conflicting values.
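
   To make the difference concrete, here is a minimal, self-contained sketch of the two behaviours discussed above. It is a toy model, not Spark's `TreeNode`: `TaggedNode`, its string keys, and the "existing tags win" policy in `mergeTagsFrom` are assumptions made for illustration, and the method in the linked commit may differ.

```scala
import scala.collection.mutable

// Toy stand-in for a plan node that carries tags. This is NOT Spark's TreeNode.
final class TaggedNode(val label: String) {
  private val tags = mutable.Map.empty[String, Any]

  def setTag(key: String, value: Any): Unit = tags.update(key, value)
  def getTag(key: String): Option[Any] = tags.get(key)

  // Copies tags only when this node has none yet, which models the
  // constraint described in the comment above.
  def copyTagsFrom(other: TaggedNode): Unit =
    if (tags.isEmpty && other.tags.nonEmpty) tags ++= other.tags

  // Hypothetical merge: adds tags from `other` that this node lacks.
  // Assumption: on a key clash, the existing value is kept.
  def mergeTagsFrom(other: TaggedNode): Unit =
    other.tags.foreach { case (key, value) =>
      if (!tags.contains(key)) tags.update(key, value)
    }
}

object TagMergeDemo {
  def main(args: Array[String]): Unit = {
    val unresolved = new TaggedNode("unresolved")
    unresolved.setTag("plan_id", 42L)

    val resolved = new TaggedNode("resolved")
    resolved.setTag("dataset_id", "ds-1")

    // The destination already has a tag, so nothing is copied.
    resolved.copyTagsFrom(unresolved)
    println(resolved.getTag("plan_id")) // None

    // The merge adds the missing tag and keeps the existing one.
    resolved.mergeTagsFrom(unresolved)
    println(resolved.getTag("plan_id")) // Some(42)
  }
}
```

   In this model the merge never overwrites a tag, so a clash would be silently resolved in favour of the destination. If conflicting values ever became possible, failing loudly on a clash would be the safer choice.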