Hi Nirav,

> in our case streaming job was stuck for over 3 days
3 days seems too much. What exactly were the read limits, and how many files were there before and after compaction? Can you describe your ingestion rate a bit more?

> Also why not add `nextValidSnapshot(Snapshot curSnapshot)` check at the beginning after line `while (shouldContinueReading) {` in `latestOffset`

Seems like a valid case, do you wanna open a PR for the same? Happy to take a deeper look and help here. (I've appended a rough sketch of the skip-ahead idea at the bottom of this mail.)

Thanks,
Prashant Singh

On Wed, Apr 10, 2024 at 2:48 PM Nirav Patel <nira...@gmail.com> wrote:

> Hi Prashant,
>
> Thanks for responding and sharing the related issue. The issue that's been fixed seems very much related. However, in our case the streaming job was stuck for over 3 days. You are saying that because it is scanning all the manifest lists, `latestOffset` may not be returning for that long!
>
> Also, why not add a `nextValidSnapshot(Snapshot curSnapshot)` check at the beginning, right after the line `while (shouldContinueReading) {` in `latestOffset`?
>
> https://github.com/apache/iceberg/blob/b2a4fda8a80571a6a63f301ff67c63ecc0380cf7/spark/v3.5/spark/src/main/java/org/apache/iceberg/spark/source/SparkMicroBatchStream.java#L350C3-L350C36
>
> That way, if the first snapshot (startingOffset) is a `replace` snapshot, it will get skipped. Due to the compaction job rewriting over 1M files, the cost of listing its manifests could be unnecessarily high.
>
> On Wed, Apr 10, 2024 at 4:31 PM Prashant Singh <prashant010...@gmail.com> wrote:
>
>> Hi Nirav,
>>
>> Thanks for reporting the issue, let me try answering your questions below :)
>>
>> > We are encountering the following issue where a Spark streaming read job from an Iceberg table stays stuck after some maintenance jobs (rewrite_data_files and rewrite_manifests) have been run in parallel on the same table.
>> > https://github.com/apache/iceberg/issues/10117
>>
>> Can you please try the fix in Iceberg 1.5 related to skipping of rewrite files (please ref: https://github.com/apache/iceberg/pull/8980)?
>>
>> > My theory is `SparkMicroBatchStream.latestOffset()` doesn't seem to be returning the correct latest offset for the table from the latest metadata JSON, or `SparkMicroBatchStream.planFiles()` is not returning any set when it encounters a `replace` snapshot.
>>
>> You are mostly close; yes, those two are the ones you should check. The issue which the PR above fixes is the skipping of replace files one by one (the number per trigger can differ based on the configured rate limits), which gives the impression of the stream being stuck while in the background it is still being read. Please refer to the explanation here:
>>
>> "Suppose you trigger every minute and that you encounter a rewrite snapshot with 300 files. This means it will take 300 x 1 minute (300 minutes) to finally skip over the snapshot and start progressing again.
>>
>> I think that once a rewrite snapshot is detected we should exhaust all the positions (all the files) in that commit to position ourselves for the next commit."
>>
>> For more details please ref: https://github.com/apache/iceberg/issues/8902
>>
>> Regards,
>>
>> Prashant Singh
>>
>> On Wed, Apr 10, 2024 at 8:46 AM Nirav Patel <nira...@gmail.com> wrote:
>>
>>> We are encountering the following issue where a Spark streaming read job from an Iceberg table stays stuck after some maintenance jobs (rewrite_data_files and rewrite_manifests) have been run in parallel on the same table.
>>> https://github.com/apache/iceberg/issues/10117
>>>
>>> I'm trying to understand what creates the end `StreamingOffset` from the latest metadata.json file.
>>> My theory is `SparkMicroBatchStream.latestOffset()` doesn't seem to be returning the correct latest offset for the table from the latest metadata JSON, or `SparkMicroBatchStream.planFiles()` is not returning any set when it encounters a `replace` snapshot.
>>>
>>> Are these two methods the right places that determine which Iceberg data files/partitions MicroBatchExecution will process upon trigger?
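
For reference, here is the rough sketch mentioned above: a toy illustration of the skip-ahead idea in plain Java. It is not the actual `SparkMicroBatchStream` code; `Snapshot` below is a minimal stand-in for `org.apache.iceberg.Snapshot`, and the real implementation would go through helpers like `nextValidSnapshot` and the configured read limits. It only shows the intent: when the current offset lands on a `replace` (compaction) snapshot, jump past the whole commit in one step instead of paging through its rewritten files one rate-limited micro-batch at a time.

import java.util.List;

// Toy model only; not the actual Iceberg SparkMicroBatchStream code.
// "Snapshot" is a minimal stand-in for org.apache.iceberg.Snapshot.
public class SkipReplaceSnapshotSketch {

  record Snapshot(long id, String operation, long dataFileCount) {}

  /**
   * Starting at startIndex, step over consecutive "replace" (compaction)
   * snapshots in one go instead of emitting their rewritten files across
   * many rate-limited micro-batches.
   */
  static int firstNonReplaceIndex(List<Snapshot> history, int startIndex) {
    int i = startIndex;
    while (i < history.size() && "replace".equals(history.get(i).operation())) {
      i++; // the whole rewrite commit is skipped without listing its manifests
    }
    return i;
  }

  public static void main(String[] args) {
    List<Snapshot> history = List.of(
        new Snapshot(1L, "append", 10L),
        new Snapshot(2L, "replace", 1_000_000L), // compaction rewrote ~1M files
        new Snapshot(3L, "append", 5L));

    // Suppose the starting offset points at the replace snapshot (index 1):
    int next = firstNonReplaceIndex(history, 1);
    System.out.println("Next snapshot to plan: " + history.get(next).id()); // prints 3
  }
}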