gengliangwang commented on code in PR #52516:
URL: https://github.com/apache/spark/pull/52516#discussion_r2422229742
##########
sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/continuous/ContinuousExecution.scala:
##########
@@ -89,15 +89,26 @@ class ContinuousExecution(
logInfo(log"Reading table [${MDC(STREAMING_TABLE, table)}] " +
log"from DataSourceV2 named '${MDC(STREAMING_DATA_SOURCE_NAME, sourceName)}' " +
log"${MDC(STREAMING_DATA_SOURCE_DESCRIPTION, dsStr)}")
- // TODO: operator pushdown.
- val scan = table.newScanBuilder(options).build()
- val stream = scan.toContinuousStream(metadataPath)
val relation = StreamingDataSourceV2Relation(
table, output, catalog, identifier, options, metadataPath)
- StreamingDataSourceV2ScanRelation(relation, scan, output, stream)
+ relation
})
}
+ // Run V2ScanRelationPushDown here (during analysis) instead of relying on the optimizer.
+ // Continuous processing needs an actual V2 Scan early so we can materialize the
+ // ContinuousStream via scan.toContinuousStream, enumerate sources, and wire up checkpoint
+ // metadata paths before planning/execution. If we waited for the optimizer, a Scan might not
+ // yet exist at this point, which would prevent creating the stream and collecting sources
+ // reliably for offset tracking and recovery.
+ val _logicalPlan = org.apache.spark.sql.execution.datasources.v2.V2ScanRelationPushDown
+ .apply(basePlan)
Review Comment:
Because Spark’s micro-batch streaming must materialize the MicroBatchStream
during the analysis phase, and DSv2 constructs it through the sequence
ScanBuilder → Scan → Scan.toMicroBatchStream, the optimizer rule
V2ScanRelationPushDown needs to be applied early, specifically on the analyzed
plan of MicroBatchExecution.
This makes the flow somewhat tricky. Moreover, since V2ScanRelationPushDown
**expects all predicates to be fully combined and pushed down**, applying it
too early may cause the pushdown to fail. We also need to handle streaming
deduplication properly.
Until we find a cleaner solution, I’ll close this PR for now.
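
For reference, the DSv2 sequence described above can be sketched roughly as follows. This is only an illustration of the ordering constraint, not the code from this PR: the object name `StreamFromScanSketch` and the helper `buildStream` are hypothetical, and the sketch assumes the source optionally implements `SupportsPushDownFilters`. It needs Spark on the classpath to compile.

```scala
import org.apache.spark.sql.connector.catalog.SupportsRead
import org.apache.spark.sql.connector.read.{Scan, ScanBuilder, SupportsPushDownFilters}
import org.apache.spark.sql.connector.read.streaming.MicroBatchStream
import org.apache.spark.sql.sources.Filter
import org.apache.spark.sql.util.CaseInsensitiveStringMap

// Hypothetical helper, for illustration only.
object StreamFromScanSketch {

  // Returns the built Scan, the stream derived from it, and the filters
  // that still have to be evaluated by Spark after the scan.
  def buildStream(
      table: SupportsRead,
      options: CaseInsensitiveStringMap,
      filters: Array[Filter],
      checkpointLocation: String): (Scan, MicroBatchStream, Array[Filter]) = {
    // Step 1: ScanBuilder. Pushdown can only happen while the builder exists.
    val builder: ScanBuilder = table.newScanBuilder(options)

    // Offer the filters known at this point to the source, if it supports
    // filter pushdown; otherwise all filters remain post-scan filters.
    val postScanFilters: Array[Filter] = builder match {
      case b: SupportsPushDownFilters => b.pushFilters(filters)
      case _ => filters
    }

    // Step 2: Scan. Predicates that only become visible after this call
    // (for example, once the optimizer has combined filters) cannot be
    // pushed into this Scan anymore.
    val scan: Scan = builder.build()

    // Step 3: the stream is created from the Scan, so the Scan has to exist
    // before the stream can be materialized.
    val stream: MicroBatchStream = scan.toMicroBatchStream(checkpointLocation)

    (scan, stream, postScanFilters)
  }
}
```

The sketch shows why the ordering is awkward: the stream depends on the Scan, while the Scan is only complete once every pushable predicate has been offered to the builder.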