Hi Nirav,

Thanks for reporting the issue. Let me try answering your question below :)

> We are encountering the following issue where spark streaming read job
> from iceberg table stays stuck after some maintenance jobs
> (rewrite_data_files and rewrite_manifests) has been ran on parallel on same
> table.
> https://github.com/apache/iceberg/issues/10117

Can you please try the fix in Iceberg 1.5 related to the skipping of
rewrite files (please see https://github.com/apache/iceberg/pull/8980)?

> My theory is  `SparkMicroBatchStream.latestOffset()` doesn't seem
> returning correct latest offset for table from latest metadata json or
> `SparkMicroBatchStream.planFiles()` is not returning any Set when it
> encounters `replace` snapshot.

You are close: yes, those two methods are the ones you should check. The
issue that the PR above fixes is that the files of a replace snapshot are
skipped one by one (the exact pace can differ based on the configured rate
limits). This gives the impression that the stream is stuck, while in the
background it is still being read. Please refer to the explanation here:

"Suppose you trigger every minute and that you encounter a rewrite snapshot
with 300 files. This means it will take 300 x 1 minute (300 minutes) to
finally skip over the snapshot and start progressing again.

I think that once a rewrite snapshot is detected we should exhaust all the
positions (all the files) in that commit to position ourselves for the next
commit."

For more details, please see https://github.com/apache/iceberg/issues/8902
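
To make the arithmetic in the quoted explanation concrete, here is a rough
sketch in Python. It is a toy model, not Iceberg code: the function name and
the `files_skipped_per_trigger` parameter are made up for illustration, and
it assumes each trigger advances the offset by a fixed number of file
positions inside the rewrite snapshot.

```python
def triggers_to_pass_rewrite(num_files: int, files_skipped_per_trigger: int) -> int:
    """Toy model (hypothetical helper, not an Iceberg API): number of
    micro-batch triggers needed before the stream offset moves past a
    rewrite snapshot containing `num_files` files."""
    if num_files < 0:
        raise ValueError("num_files must be non-negative")
    if files_skipped_per_trigger < 1:
        raise ValueError("files_skipped_per_trigger must be at least 1")
    # Integer ceiling division: a partially filled last trigger still counts.
    return (num_files + files_skipped_per_trigger - 1) // files_skipped_per_trigger


# Behaviour described in the quote: one file position skipped per trigger.
one_by_one = triggers_to_pass_rewrite(300, 1)

# Proposed behaviour: exhaust every position of the rewrite snapshot at once.
all_at_once = triggers_to_pass_rewrite(300, 300)

print(one_by_one, all_at_once)
```

With a one-minute trigger, the first case corresponds to the 300 minutes
mentioned in the quote, while the second needs a single trigger.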

Regards,

Prashant Singh

On Wed, Apr 10, 2024 at 8:46 AM Nirav Patel <nira...@gmail.com> wrote:

>
>
> We are encountering the following issue where spark streaming read job
> from iceberg table stays stuck after some maintenance jobs
> (rewrite_data_files and rewrite_manifests) has been ran on parallel on same
> table.
> https://github.com/apache/iceberg/issues/10117
>
>
> I'm trying to understand what creates the end `StreamingOffset` from the
> latest metadata.json file.
> My theory is  `SparkMicroBatchStream.latestOffset()` doesn't seem
> returning correct latest offset for table from latest metadata json or
> `SparkMicroBatchStream.planFiles()` is not returning any Set when it
> encounters `replace` snapshot.
>
> Are these 2 methods right places that determines what iceberg data
> files/partitions MicroBatchExcecution will process upon trigger?
>
>
>
