Hi Prashant,

Thanks for responding and sharing the related issue. The issue that was
fixed seems very much related. However, in our case the streaming job was
stuck for over 3 days. Are you saying that, because it is scanning the
entire manifest list, `latestOffset` may not return for that long?

Also, why not add a `nextValidSnapshot(Snapshot curSnapshot)` check at the
beginning, right after the line `while (shouldContinueReading) {` in
`latestOffset`?

https://github.com/apache/iceberg/blob/b2a4fda8a80571a6a63f301ff67c63ecc0380cf7/spark/v3.5/spark/src/main/java/org/apache/iceberg/spark/source/SparkMicroBatchStream.java#L350C3-L350C36

That way, if the first snapshot (startingOffset) is a `replace` snapshot, it
will get skipped. Because the compaction job rewrote over 1 million files,
listing its manifests could be unnecessarily expensive.
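
To make the suggestion concrete, here is a rough standalone sketch of the
idea. It is not the actual `SparkMicroBatchStream` code: `Snap` and
`firstReadableIndex` are hypothetical names I made up for illustration, and
the snapshot history is just an in-memory list. It only shows the intent,
which is to step past `replace` snapshots before any manifests are listed.

```java
import java.util.Arrays;
import java.util.List;

public class SkipReplaceSketch {
  // Hypothetical stand-in for a table snapshot: only an id and an operation.
  static final class Snap {
    final long id;
    final String operation;

    Snap(long id, String operation) {
      this.id = id;
      this.operation = operation;
    }
  }

  // Returns the index of the first snapshot at or after 'start' whose
  // operation is not "replace", or -1 if every remaining one is a rewrite.
  static int firstReadableIndex(List<Snap> snapshots, int start) {
    for (int i = start; i < snapshots.size(); i++) {
      if (!"replace".equals(snapshots.get(i).operation)) {
        return i;
      }
    }
    return -1;
  }

  public static void main(String[] args) {
    List<Snap> history = Arrays.asList(
        new Snap(1L, "replace"),  // compaction commit, nothing new to stream
        new Snap(2L, "replace"),
        new Snap(3L, "append"));
    System.out.println(firstReadableIndex(history, 0)); // prints 2
  }
}
```

In the real method the check would presumably be made against the snapshot's
operation at the top of the loop, so the manifests of a `replace` snapshot
are never opened at all.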





On Wed, Apr 10, 2024 at 4:31 PM Prashant Singh <prashant010...@gmail.com>
wrote:

> Hi Nirav,
>
> Thanks for reporting the issue, let me try answering your question below
> :)
>
> > We are encountering the following issue where a Spark streaming read job
> from an Iceberg table stays stuck after some maintenance jobs
> (rewrite_data_files and rewrite_manifests) have been run in parallel on the
> same table.
> https://github.com/apache/iceberg/issues/10117
>
> Can you please try the fix in Iceberg 1.5 related to skipping of
> rewrite files (please refer to https://github.com/apache/iceberg/pull/8980)?
>
> > My theory is that `SparkMicroBatchStream.latestOffset()` doesn't seem to
> be returning the correct latest offset for the table from the latest
> metadata json, or `SparkMicroBatchStream.planFiles()` is not returning any
> Set when it encounters a `replace` snapshot.
>
> You are close: yes, those two are the ones you should check. The issue
> that the PR above fixes is that replace files were skipped one by one (this
> can differ based on the configured rate limits), which gives the impression
> that the stream is stuck while in the background it is still being read.
> Please refer to the explanation here:
>
> "Suppose you trigger every minute and that you encounter a rewrite
> snapshot with 300 files. This means it will take 300 x 1 minute (300
> minutes) to finally skip over the snapshot and start progressing again.
>
> I think that once a rewrite snapshot is detected we should exhaust all the
> positions (all the files) in that commit to position ourselves for the
> next commit."
>
> For more details, please refer to:
> https://github.com/apache/iceberg/issues/8902
>
> Regards,
>
> Prashant Singh
>
> On Wed, Apr 10, 2024 at 8:46 AM Nirav Patel <nira...@gmail.com> wrote:
>
>>
>>
>> We are encountering the following issue where a Spark streaming read job
>> from an Iceberg table stays stuck after some maintenance jobs
>> (rewrite_data_files and rewrite_manifests) have been run in parallel on the
>> same table.
>> https://github.com/apache/iceberg/issues/10117
>>
>>
>> I'm trying to understand what creates the end `StreamingOffset` from the
>> latest metadata.json file.
>> My theory is that `SparkMicroBatchStream.latestOffset()` doesn't seem to
>> be returning the correct latest offset for the table from the latest
>> metadata json, or `SparkMicroBatchStream.planFiles()` is not returning any
>> Set when it encounters a `replace` snapshot.
>>
>> Are these two methods the right places that determine which Iceberg data
>> files/partitions MicroBatchExecution will process upon trigger?
>>
>>
>>
