Hi, > Can you describe a bit more on your ingestion rate ? > what exactly were the read limits? Streaming job ingestion is maximum 1M records per batch. Trigger interval is every 1 minute which seem to be fine for regular stream processing. Our avg per minute record count is way less than that.
> How many files before and after compaction ? Compaction went over the last 6+ months of data. Before compaction we had over 1.4 M files (small files) in active snapshot. After compaction it was only 17k. Compaction rewrote all the files and records. Total of 19 Billion records were rewritten into 17k files. so per math it would take 19B/1M minute = ~13 days to skip over all those replace records? Spark offset file has following for compaction offset: (this is the last offset that ever gets generated on spark offset dir) {"version":1,"snapshot_id":6784284089888508514,"position":0,"scan_all_files":false} Here's details of this snapshot_id from one of the metadata.json { "sequence-number" : 30742, "snapshot-id" : 6784284089888508514, "parent-snapshot-id" : 5804171981043532874, "timestamp-ms" : 1712399784222, "summary" : { "operation" : "replace", "added-data-files" : "9854", "deleted-data-files" : "1395902", "added-records" : "19050637961", "deleted-records" : "19050637961", "added-files-size" : "771245970960", "removed-files-size" : "995193063843", "changed-partition-count" : "4258", "total-records" : "19313094770", "total-files-size" : "784783197866", "total-data-files" : "17523", "total-delete-files" : "0", "total-position-deletes" : "0", "total-equality-deletes" : "0" }, "manifest-list" : ".gs://..gc5a2ac.avro", "schema-id" : 0 } So No new offset was being generated after streaming read job encounter offset with snapshot id of compaction job. Over 3 days all that driver node reported is following. No Streaming Query stats that gets generated after every Trigger execution. MicroBatchExecution:60 - Streaming query has been idle and waiting for new data more than 10000 ms. MicroBatchExecution:60 - Streaming query has been idle and waiting for new data more than 10000 ms. BaseMetastoreTableOperations:199 - Refreshing table metadata from new version: gs://...my_table_9a21adbf.metadata.json MicroBatchExecution:60 - Streaming query has been idle and waiting for new data more than 10000 ms. MicroBatchExecution:60 - Streaming query has been idle and waiting for new data more than 10000 ms. MicroBatchExecution:60 - Streaming query has been idle and waiting for new data more than 10000 ms. MicroBatchExecution:60 - Streaming query has been idle and waiting for new data more than 10000 ms. ... .. BaseMetastoreTableOperations:199 - Refreshing table metadata from new version: gs://...my_table_23423412a2323bf.metadata.json MicroBatchExecution:60 - Streaming query has been idle and waiting for new data more than 10000 ms. MicroBatchExecution:60 - Streaming query has been idle and waiting for new data more than 10000 ms. ... . Line `Refreshing table metadata..` picks up new metadata.json which is correct but nothing happens after that. no offset creation/commit, query planning, job processing, commit datafiles and commit log, query stat. etc none of it. Usually after that line i see "MicroBatchExecution:64 - Starting Trigger Calculation" line but I don't see it in this particular case. I tried to scan `MicroBatchExecution` code to understand code path but I can't find where it prints `Starting Trigger Calculation`. Do you know where that logging happens? I don't behavior with respect to `LatestOffset`. Is it that in each trigger MicroBatch is still processing previously received 19B records? Line `Refreshing table metadata..` in logs indicate that job is triggering and is getting new metadata file but nothing is happening. >From the code I also don't understand logic behind getting curPos. It seems like `LatestOffset` may always be concluding curPos = 0 and returning `null` as latestoffset. That could explain why no new offset file is being generated and the job always starts over but I might be completely wrong on that as Line `Refreshing table metadata..` indicates something precursor to trigger is happening. anyways, hope above helps. Thanks Nirav On Sun, Apr 14, 2024 at 11:56 AM Prashant Singh <prashant010...@gmail.com> wrote: > Hi Nirav, > > > in our case streaming job was stuck for over 3 days > > 3 days seems too much, what exactly were the read limits and how many > files before and after compaction ? Can you describe a bit more on your > ingestion rate ? > > > Also why not add `nextValidSnapshot(Snapshot curSnapshot)` check at the > beginning after line `while (shouldContinueReading) {` in `latestOffSet > > Seems like a valid case, do you wanna open a pr for the same ? Happy to > take a deeper look and help here. > > Thanks, > Prashant Singh > > > > On Wed, Apr 10, 2024 at 2:48 PM Nirav Patel <nira...@gmail.com> wrote: > >> Hi Prashant, >> >> Thanks for responding and sharing the related issue. Issue that's been >> fix seem very much related. However, in our case streaming job was stuck >> for over 3 days. You are sayiing because of it scanning all the Manifests >> list `latestOffSet` may not be returning for that long! >> >> Also why not add `nextValidSnapshot(Snapshot curSnapshot)` check at the >> beginning after line `while (shouldContinueReading) {` in `latestOffSet` >> >> >> https://github.com/apache/iceberg/blob/b2a4fda8a80571a6a63f301ff67c63ecc0380cf7/spark/v3.5/spark/src/main/java/org/apache/iceberg/spark/source/SparkMicroBatchStream.java#L350C3-L350C36 >> >> That way first snapshot ( startingOffset) is a `replace` snapshot it will >> get skipped. DUe to compaction job rewriting over 1m files listing its >> manifests could be too high unnecessarily. >> >> >> >> >> >> On Wed, Apr 10, 2024 at 4:31 PM Prashant Singh <prashant010...@gmail.com> >> wrote: >> >>> Hi Nirav, >>> >>> Thanks for reporting the issue, let me try answering your question below >>> :) >>> >>> > We are encountering the following issue where spark streaming read job >>> from iceberg table stays stuck after some maintenance jobs >>> (rewrite_data_files and rewrite_manifests) has been ran on parallel on same >>> table. >>> https://github.com/apache/iceberg/issues/10117 >>> >>> can you please try this the fix in iceberg 1.5 related to skipping of >>> rewrite files (please ref : https://github.com/apache/iceberg/pull/8980) >>> >>> >>> > My theory is `SparkMicroBatchStream.latestOffset()` doesn't seem >>> returning correct latest offset for table from latest metadata json or >>> `SparkMicroBatchStream.planFiles()` is not returning any Set when it >>> encounters `replace` snapshot. >>> >>> You are mostly close, yes those two are the ones you should check for, >>> the issue which the pr above fixes is the skipping of replace files 1 by 1 >>> (can be diff based on the configured rate limits) and this giving an >>> impression of stream being struck in the background it's being read, please >>> ref to the explanation here : >>> >>> "Suppose you trigger every minute and that you encounter a rewrite >>> snapshot with 300 files. This means it will take 300 x 1 minute (300 >>> minutes) to finally skip over the snapshot and start progressing again. >>> >>> I think that once a rewrite snapshot is detected we should exhaust all >>> the positions (all the files) in that commit to position ourselves for >>> the next commit." >>> >>> for more details please ref : >>> https://github.com/apache/iceberg/issues/8902 >>> >>> Regards, >>> >>> Prashant Singh >>> >>> On Wed, Apr 10, 2024 at 8:46 AM Nirav Patel <nira...@gmail.com> wrote: >>> >>>> >>>> >>>> We are encountering the following issue where spark streaming read job >>>> from iceberg table stays stuck after some maintenance jobs >>>> (rewrite_data_files and rewrite_manifests) has been ran on parallel on same >>>> table. >>>> https://github.com/apache/iceberg/issues/10117 >>>> >>>> >>>> I'm trying to understand what creates the end `StreamingOffset` from >>>> the latest metadata.json file. >>>> My theory is `SparkMicroBatchStream.latestOffset()` doesn't seem >>>> returning correct latest offset for table from latest metadata json or >>>> `SparkMicroBatchStream.planFiles()` is not returning any Set when it >>>> encounters `replace` snapshot. >>>> >>>> Are these 2 methods right places that determines what iceberg data >>>> files/partitions MicroBatchExcecution will process upon trigger? >>>> >>>> >>>>