[
https://issues.apache.org/jira/browse/SPARK-51331?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17931334#comment-17931334
]
Jungtaek Lim edited comment on SPARK-51331 at 2/28/25 12:50 AM:
----------------------------------------------------------------
OK, sorry for the misunderstanding. You know exactly how it works and how to
reason about fault tolerance.
The actual reason is that, for DSv1 sources, we have a trick on restarting the
query: *we call getBatch for the previous batch*. So the offset for the last
committed batch must still be available, even though that batch is not expected
to be processed again.
I understand this is legacy, and I don't even know why this trick exists (it was
there before I joined Spark). Maybe we could find time to handle this better.
Given that, can we reopen this but mark it as an "improvement" rather than a
"bug"?
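As a rough illustration of the restart behavior described above, here is a toy
Python model (not Spark code). The names ToySource, get_batch, purge_before and
restart are hypothetical and exist only in this sketch; the one thing taken from
the comment is that a restart asks the source for the previous, already
committed batch again, so that offset range has to remain readable.

```python
class ToySource:
    """Hypothetical stand-in for a DSv1-style source; not the Spark API."""

    def __init__(self, records):
        # Map each offset (0, 1, 2, ...) to the record stored at that offset.
        self._records = dict(enumerate(records))

    def get_batch(self, start, end):
        """Return the records in the half-open offset range [start, end)."""
        missing = [o for o in range(start, end) if o not in self._records]
        if missing:
            raise LookupError(f"offsets {missing} are no longer available")
        return [self._records[o] for o in range(start, end)]

    def purge_before(self, offset):
        """Drop every record below the given offset."""
        for o in [o for o in self._records if o < offset]:
            del self._records[o]


def restart(source, last_committed_range):
    """Model of the restart trick: re-request the last committed batch."""
    start, end = last_committed_range
    return source.get_batch(start, end)


source = ToySource(["a", "b", "c", "d"])
replayed = restart(source, (0, 2))  # works while offsets 0 and 1 are retained
```

In this model, calling source.purge_before(2) and then restart(source, (0, 2))
raises LookupError, which is the situation the trick has to avoid.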
> Structured streaming batch with fixed interval trigger is committed only when
> next batch is about to start
> ----------------------------------------------------------------------------------------------------------
>
> Key: SPARK-51331
> URL: https://issues.apache.org/jira/browse/SPARK-51331
> Project: Spark
> Issue Type: Bug
> Components: Structured Streaming
> Affects Versions: 3.5.3
> Environment: Spark 3.5.3 + Pyspark
> Reporter: Alex
> Priority: Major
>
>
> h1. Description
> When structured streaming is configured using
>
> {code:java}
> trigger(processingTime='10 minutes') {code}
> and a micro-batch itself takes 2 minutes, its progress is not committed
> until the next batch is about to start, i.e. the commit is delayed by 8
> minutes.
>
> h1. Logs
> The batch below finished at 01:02 but was not committed until 01:10, i.e.
> when it was time for the next batch
>
> {code:java}
> 25/02/27 01:00:00 INFO MicroBatchExecution: Committed offsets for batch 0.
> ....
> 25/02/27 01:02:12 INFO MicroBatchExecution: Streaming query made progress
> 25/02/27 01:10:00 INFO MicroBatchExecution: Committed offsets for batch 1
>
> {code}
> h1. Code references
> When the next batch is being constructed in the `constructNextBatch` function
> [https://github.com/apache/spark/blob/master/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/MicroBatchExecution.scala#L600]
> it calls `cleanUpLastExecutedMicroBatch`
> [https://github.com/apache/spark/blob/master/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/MicroBatchExecution.scala#L674]
> which commits offsets
> [https://github.com/apache/spark/blob/master/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/MicroBatchExecution.scala#L996-L997]
> h1. Why is it a problem?
> For long triggers such as 10 minutes or even 1 hour, there is a chance that
> the job will be cancelled (e.g. for a redeploy), so offsets will not be
> committed *even if the data has been written to the sink*
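To make the timing in the quoted report concrete, here is a minimal Python
sketch. It assumes the behavior exactly as the reporter describes it (a batch is
committed only when the next trigger fires) and that the batch finishes within
one trigger interval. commit_time and commit_delay are hypothetical helper names
for this sketch, not Spark functions; the timestamps are the ones from the log
excerpt.

```python
from datetime import datetime, timedelta


def commit_time(batch_start, trigger_interval):
    """Commit moment in the toy model: the start of the next trigger.

    Assumes the batch completes before the next trigger is due.
    """
    return batch_start + trigger_interval


def commit_delay(batch_start, batch_end, trigger_interval):
    """Time the finished batch waits before its commit is recorded."""
    return commit_time(batch_start, trigger_interval) - batch_end


# Timestamps taken from the log lines in the report.
start = datetime(2025, 2, 27, 1, 0, 0)
end = datetime(2025, 2, 27, 1, 2, 12)
interval = timedelta(minutes=10)

delay = commit_delay(start, end, interval)
print(delay)  # 0:07:48
```

The gap grows with the trigger interval: under the same assumption, a 1 hour
trigger with a 2 minute batch leaves a 58 minute window in which a cancelled job
has written its data to the sink without the commit being recorded.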
--
This message was sent by Atlassian Jira
(v8.20.10#820010)