Hi Prashant,

Thanks for responding and sharing the related issue. The issue that was fixed does look very much related. However, in our case the streaming job was stuck for over 3 days. Are you saying that because it scans the manifest list of every snapshot, `latestOffset` may not return for that long?
Also, why not add a `nextValidSnapshot(Snapshot curSnapshot)` check at the beginning of the loop, right after the line `while (shouldContinueReading) {` in `latestOffset`?
https://github.com/apache/iceberg/blob/b2a4fda8a80571a6a63f301ff67c63ecc0380cf7/spark/v3.5/spark/src/main/java/org/apache/iceberg/spark/source/SparkMicroBatchStream.java#L350C3-L350C36
That way, if the first snapshot (the startingOffset) is a `replace` snapshot, it gets skipped right away. When a compaction job has rewritten over 1M files, listing its manifests is unnecessarily expensive. (A rough sketch of what I have in mind is at the bottom of this mail, below the quoted thread.)

On Wed, Apr 10, 2024 at 4:31 PM Prashant Singh <prashant010...@gmail.com> wrote:

> Hi Nirav,
>
> Thanks for reporting the issue, let me try answering your question below :)
>
> > We are encountering the following issue where a spark streaming read job
> > from an iceberg table stays stuck after some maintenance jobs
> > (rewrite_data_files and rewrite_manifests) have been run in parallel on
> > the same table.
> > https://github.com/apache/iceberg/issues/10117
>
> Can you please try the fix in Iceberg 1.5 related to skipping of rewrite
> files (please ref: https://github.com/apache/iceberg/pull/8980)?
>
> > My theory is `SparkMicroBatchStream.latestOffset()` doesn't seem to be
> > returning the correct latest offset for the table from the latest
> > metadata json, or `SparkMicroBatchStream.planFiles()` is not returning
> > any Set when it encounters a `replace` snapshot.
>
> You are mostly close, yes those two are the ones you should check. The
> issue which the PR above fixes is the skipping of replace files 1 by 1
> (can differ based on the configured rate limits), which gives the
> impression of the stream being stuck while in the background it is being
> read. Please refer to the explanation here:
>
> "Suppose you trigger every minute and that you encounter a rewrite
> snapshot with 300 files. This means it will take 300 x 1 minute (300
> minutes) to finally skip over the snapshot and start progressing again.
>
> I think that once a rewrite snapshot is detected we should exhaust all the
> positions (all the files) in that commit to position ourselves for the
> next commit."
>
> For more details please ref: https://github.com/apache/iceberg/issues/8902
>
> Regards,
>
> Prashant Singh
>
> On Wed, Apr 10, 2024 at 8:46 AM Nirav Patel <nira...@gmail.com> wrote:
>
>> We are encountering the following issue where a spark streaming read job
>> from an iceberg table stays stuck after some maintenance jobs
>> (rewrite_data_files and rewrite_manifests) have been run in parallel on
>> the same table.
>> https://github.com/apache/iceberg/issues/10117
>>
>> I'm trying to understand what creates the end `StreamingOffset` from the
>> latest metadata.json file.
>> My theory is `SparkMicroBatchStream.latestOffset()` doesn't seem to be
>> returning the correct latest offset for the table from the latest
>> metadata json, or `SparkMicroBatchStream.planFiles()` is not returning
>> any Set when it encounters a `replace` snapshot.
>>
>> Are these two methods the right places that determine what iceberg data
>> files/partitions MicroBatchExecution will process upon trigger?
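
For reference, here is a rough, untested sketch of the kind of early skip I'm suggesting above. It is not the actual `SparkMicroBatchStream` code; it only assumes the public Iceberg core API (`Snapshot#operation()`, `DataOperations.REPLACE`, `SnapshotUtil.snapshotAfter()`), and the class and helper names are just illustrative, loosely matching the `nextValidSnapshot` method I mentioned.

```java
// Rough sketch only, not the actual SparkMicroBatchStream implementation.
// Assumes the public Iceberg core API; names here are illustrative.
import org.apache.iceberg.DataOperations;
import org.apache.iceberg.Snapshot;
import org.apache.iceberg.Table;
import org.apache.iceberg.util.SnapshotUtil;

class ReplaceSnapshotSkipSketch {

  /**
   * Walks forward from curSnapshot and returns the first snapshot that is not
   * a "replace" (rewrite_data_files / rewrite_manifests) snapshot, or null if
   * the branch currently ends on one. Calling this before any planning would
   * let the stream hop over a compaction commit in one step instead of
   * listing its manifests.
   */
  static Snapshot nextValidSnapshot(Table table, Snapshot curSnapshot) {
    Snapshot snapshot = curSnapshot;
    while (snapshot != null && DataOperations.REPLACE.equals(snapshot.operation())) {
      if (snapshot.snapshotId() == table.currentSnapshot().snapshotId()) {
        // Nothing newer has been committed yet; the stream should simply wait.
        return null;
      }
      snapshot = SnapshotUtil.snapshotAfter(table, snapshot.snapshotId());
    }
    return snapshot;
  }
}
```

With something like this at the top of the `while (shouldContinueReading)` loop, a startingOffset that points at a compaction commit would be hopped over in a single call rather than having its (potentially huge) manifest list scanned file by file.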