Hi Nirav,

> in our case streaming job was stuck for over 3 days

3 days seems too much, what exactly were the read limits and how many files
before and after compaction ? Can you describe a bit more on your ingestion
rate ?

> Also why not add `nextValidSnapshot(Snapshot curSnapshot)` check at the
beginning after line `while (shouldContinueReading) {`  in `latestOffSet

Seems like a valid case, do you wanna open a pr for the same ? Happy to
take a deeper look and help here.

Thanks,
Prashant Singh



On Wed, Apr 10, 2024 at 2:48 PM Nirav Patel <nira...@gmail.com> wrote:

> Hi Prashant,
>
> Thanks for responding and sharing the related issue. Issue that's been fix
> seem very much related. However, in our case streaming job was stuck for
> over 3 days. You are sayiing because of it scanning all the Manifests list
> `latestOffSet` may  not be returning for that long!
>
> Also why not add `nextValidSnapshot(Snapshot curSnapshot)` check at the
> beginning after line `while (shouldContinueReading) {`  in `latestOffSet`
>
>
> https://github.com/apache/iceberg/blob/b2a4fda8a80571a6a63f301ff67c63ecc0380cf7/spark/v3.5/spark/src/main/java/org/apache/iceberg/spark/source/SparkMicroBatchStream.java#L350C3-L350C36
>
> That way first snapshot ( startingOffset) is a `replace` snapshot it will
> get skipped. DUe to compaction job rewriting over 1m files listing its
> manifests could be too high unnecessarily.
>
>
>
>
>
> On Wed, Apr 10, 2024 at 4:31 PM Prashant Singh <prashant010...@gmail.com>
> wrote:
>
>> Hi Nirav,
>>
>> Thanks for reporting the issue, let me try answering your question below
>> :)
>>
>> > We are encountering the following issue where spark streaming read job
>> from iceberg table stays stuck after some maintenance jobs
>> (rewrite_data_files and rewrite_manifests) has been ran on parallel on same
>> table.
>> https://github.com/apache/iceberg/issues/10117
>>
>> can you please try this the fix in iceberg 1.5 related to skipping of
>> rewrite files (please ref : https://github.com/apache/iceberg/pull/8980)
>>
>> > My theory is  `SparkMicroBatchStream.latestOffset()` doesn't seem
>> returning correct latest offset for table from latest metadata json or
>> `SparkMicroBatchStream.planFiles()` is not returning any Set when it
>> encounters `replace` snapshot.
>>
>> You are mostly close, yes those two are the ones you should check for,
>> the issue which the pr above fixes is the skipping of replace files 1 by 1
>> (can be diff based on the configured rate limits) and this giving an
>> impression of stream being struck in the background it's being read, please
>> ref to the explanation here :
>>
>> "Suppose you trigger every minute and that you encounter a rewrite
>> snapshot with 300 files. This means it will take 300 x 1 minute (300
>> minutes) to finally skip over the snapshot and start progressing again.
>>
>> I think that once a rewrite snapshot is detected we should exhaust all
>> the positions (all the files) in that commit to position ourselves for
>> the next commit."
>>
>>  for more details please ref :
>> https://github.com/apache/iceberg/issues/8902
>>
>> Regards,
>>
>> Prashant Singh
>>
>> On Wed, Apr 10, 2024 at 8:46 AM Nirav Patel <nira...@gmail.com> wrote:
>>
>>>
>>>
>>> We are encountering the following issue where spark streaming read job
>>> from iceberg table stays stuck after some maintenance jobs
>>> (rewrite_data_files and rewrite_manifests) has been ran on parallel on same
>>> table.
>>> https://github.com/apache/iceberg/issues/10117
>>>
>>>
>>> I'm trying to understand what creates the end `StreamingOffset` from the
>>> latest metadata.json file.
>>> My theory is  `SparkMicroBatchStream.latestOffset()` doesn't seem
>>> returning correct latest offset for table from latest metadata json or
>>> `SparkMicroBatchStream.planFiles()` is not returning any Set when it
>>> encounters `replace` snapshot.
>>>
>>> Are these 2 methods right places that determines what iceberg data
>>> files/partitions MicroBatchExcecution will process upon trigger?
>>>
>>>
>>>

Reply via email to