Hi,

> Can you describe a bit more on your ingestion rate ?
> what exactly were the read limits?
The streaming job ingests at most 1M records per micro-batch. The trigger
interval is 1 minute, which seems fine for regular stream processing; our
average per-minute record count is well below that limit.
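
For context, here is roughly how that cap is wired up (a minimal sketch, not our
exact job: the catalog/table names, checkpoint path and sink are placeholders, and
I'm assuming the 1M cap is applied via Iceberg's streaming-max-rows-per-micro-batch
read option):

import org.apache.spark.sql.Dataset;
import org.apache.spark.sql.Row;
import org.apache.spark.sql.SparkSession;
import org.apache.spark.sql.streaming.StreamingQuery;
import org.apache.spark.sql.streaming.Trigger;

public class IcebergStreamSketch {
  public static void main(String[] args) throws Exception {
    SparkSession spark = SparkSession.builder().appName("iceberg-stream-sketch").getOrCreate();

    // Read limit: at most ~1M records per micro-batch.
    Dataset<Row> source = spark.readStream()
        .format("iceberg")
        .option("streaming-max-rows-per-micro-batch", "1000000")
        .load("my_catalog.db.my_table");

    // Trigger every minute; the offsets/ dir under this checkpoint location is
    // where the offset file quoted further down lives.
    StreamingQuery query = source.writeStream()
        .format("iceberg")
        .outputMode("append")
        .option("checkpointLocation", "gs://my-bucket/checkpoints/my_query")
        .trigger(Trigger.ProcessingTime("1 minute"))
        .toTable("my_catalog.db.my_sink_table");

    query.awaitTermination();
  }
}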

> How many files before and after compaction ?
Compaction covered the last 6+ months of data. Before compaction the active
snapshot had over 1.4M (small) files; after compaction it had only ~17k.
Compaction rewrote all the files and records: a total of ~19 billion records
were rewritten into ~17k files.
So by that math, would it take 19B / 1M per minute, roughly 19,000 minutes or
~13 days, to skip over all those replaced records? (A quick back-of-the-envelope
check follows below.)
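
Here is the tiny helper I used for that estimate (purely illustrative, not Iceberg
code); it just models "one rate-limited chunk of the replace snapshot consumed per
trigger":

import java.time.Duration;

public class ReplaceSkipEstimate {

  // How long it takes to crawl past a replace snapshot if each trigger only
  // advances by the configured per-micro-batch record limit.
  static Duration timeToSkip(long replaceRecords, long maxRecordsPerBatch, Duration triggerInterval) {
    long triggers = (replaceRecords + maxRecordsPerBatch - 1) / maxRecordsPerBatch; // ceiling division
    return triggerInterval.multipliedBy(triggers);
  }

  public static void main(String[] args) {
    // Record count taken from the compaction snapshot summary shown further down.
    Duration d = timeToSkip(19_050_637_961L, 1_000_000L, Duration.ofMinutes(1));
    System.out.println(d.toMinutes() + " minutes, i.e. about " + d.toDays() + " days");
  }
}

That prints 19051 minutes, i.e. about 13 days, which matches the estimate above.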

The Spark offset file contains the following for the compaction snapshot (this is
the last offset that ever gets written to the Spark offsets dir):
{"version":1,"snapshot_id":6784284089888508514,"position":0,"scan_all_files":false}

Here are the details for this snapshot_id from one of the metadata.json files:


{
    "sequence-number" : 30742,
    "snapshot-id" : 6784284089888508514,
    "parent-snapshot-id" : 5804171981043532874,
    "timestamp-ms" : 1712399784222,
    "summary" : {
      "operation" : "replace",
      "added-data-files" : "9854",
      "deleted-data-files" : "1395902",
      "added-records" : "19050637961",
      "deleted-records" : "19050637961",
      "added-files-size" : "771245970960",
      "removed-files-size" : "995193063843",
      "changed-partition-count" : "4258",
      "total-records" : "19313094770",
      "total-files-size" : "784783197866",
      "total-data-files" : "17523",
      "total-delete-files" : "0",
      "total-position-deletes" : "0",
      "total-equality-deletes" : "0"
    },
    "manifest-list" : ".gs://..gc5a2ac.avro",
    "schema-id" : 0
  }


So no new offset was generated once the streaming read job encountered the
offset pointing at the compaction job's snapshot id. Over 3 days, all the driver
node reported was the following; none of the streaming query stats that normally
get printed after every trigger execution showed up.

MicroBatchExecution:60 - Streaming query has been idle and waiting for new
data more than 10000 ms.
MicroBatchExecution:60 - Streaming query has been idle and waiting for new
data more than 10000 ms.
BaseMetastoreTableOperations:199 - Refreshing table metadata from new
version: gs://...my_table_9a21adbf.metadata.json
MicroBatchExecution:60 - Streaming query has been idle and waiting for new
data more than 10000 ms.
MicroBatchExecution:60 - Streaming query has been idle and waiting for new
data more than 10000 ms.
MicroBatchExecution:60 - Streaming query has been idle and waiting for new
data more than 10000 ms.
MicroBatchExecution:60 - Streaming query has been idle and waiting for new
data more than 10000 ms.
...
..
BaseMetastoreTableOperations:199 - Refreshing table metadata from new
version: gs://...my_table_23423412a2323bf.metadata.json
MicroBatchExecution:60 - Streaming query has been idle and waiting for new
data more than 10000 ms.
MicroBatchExecution:60 - Streaming query has been idle and waiting for new
data more than 10000 ms.
...
.
The `Refreshing table metadata..` line picks up the new metadata.json, which is
correct, but nothing happens after that: no offset creation/commit, no query
planning, no job processing, no data-file commit or commit log, no query stats,
none of it. Usually after that line I see a "MicroBatchExecution:64 -
Starting Trigger Calculation" line, but I don't see it in this particular
case. I tried to scan the `MicroBatchExecution` code to understand the code path,
but I can't find where it prints `Starting Trigger Calculation`. Do you
know where that logging happens?

I don't understand the behavior with respect to `latestOffset`. Is it that in each
trigger the micro-batch is still processing the previously received 19B records?
The `Refreshing table metadata..` line in the logs indicates that the job is
triggering and picking up a new metadata file, but nothing else is happening.

From the code I also don't understand the logic behind computing curPos. It seems
like `latestOffset` may always be concluding curPos = 0 and returning
`null` as the latest offset. That could explain why no new offset file is being
generated and why the job always starts over, but I might be completely wrong on
that, since the `Refreshing table metadata..` line indicates that something that
precedes the trigger is still happening. (A toy sketch of that hypothesis is below.)
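
To make that concrete, here is a toy sketch of my mental model of the trigger loop
(definitely not the actual Spark or Iceberg code, just the shape of the behavior I
suspect): if the source's latestOffset never moves past the current offset, or keeps
resolving to null, then no offset file is written and all we ever see is the idle
message.

public class IdleLoopSketch {

  // Toy stand-in for a streaming source: returns the latest available offset,
  // or null to mean "nothing new to read".
  interface ToySource {
    Long latestOffset(long currentOffset);
  }

  // One simulated trigger; this mirrors the observable behavior, not real Spark internals.
  static void oneTrigger(ToySource source, long currentOffset) {
    Long latest = source.latestOffset(currentOffset);
    if (latest == null || latest == currentOffset) {
      // No new offset file, no batch planning, only the "idle" log line.
      System.out.println("Streaming query has been idle and waiting for new data ...");
    } else {
      System.out.println("Plan batch (" + currentOffset + ", " + latest + "] and write a new offset file");
    }
  }

  public static void main(String[] args) {
    // If the source always concludes there is nothing past the replace snapshot
    // (e.g. curPos staying 0 / latestOffset returning null), every trigger ends here:
    oneTrigger(current -> null, 0L);
  }
}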

Anyway, I hope the above helps.

Thanks
Nirav






On Sun, Apr 14, 2024 at 11:56 AM Prashant Singh <prashant010...@gmail.com>
wrote:

> Hi Nirav,
>
> > in our case streaming job was stuck for over 3 days
>
> 3 days seems too much, what exactly were the read limits and how many
> files before and after compaction ? Can you describe a bit more on your
> ingestion rate ?
>
> > Also, why not add a `nextValidSnapshot(Snapshot curSnapshot)` check at the
> beginning, after the line `while (shouldContinueReading) {`, in `latestOffset`?
>
> Seems like a valid case, do you wanna open a pr for the same ? Happy to
> take a deeper look and help here.
>
> Thanks,
> Prashant Singh
>
>
>
> On Wed, Apr 10, 2024 at 2:48 PM Nirav Patel <nira...@gmail.com> wrote:
>
>> Hi Prashant,
>>
>> Thanks for responding and sharing the related issue. The issue that's been
>> fixed seems very much related. However, in our case the streaming job was stuck
>> for over 3 days. You are saying that, because it scans all the manifest
>> lists, `latestOffset` may not be returning for that long!
>>
>> Also, why not add a `nextValidSnapshot(Snapshot curSnapshot)` check at the
>> beginning, after the line `while (shouldContinueReading) {`, in `latestOffset`?
>>
>>
>> https://github.com/apache/iceberg/blob/b2a4fda8a80571a6a63f301ff67c63ecc0380cf7/spark/v3.5/spark/src/main/java/org/apache/iceberg/spark/source/SparkMicroBatchStream.java#L350C3-L350C36
>>
>> That way, if the first snapshot (startingOffset) is a `replace` snapshot it will
>> get skipped. Since the compaction job rewrote over 1M files, listing its
>> manifests could be unnecessarily expensive.
>>
>>
>>
>>
>>
>> On Wed, Apr 10, 2024 at 4:31 PM Prashant Singh <prashant010...@gmail.com>
>> wrote:
>>
>>> Hi Nirav,
>>>
>>> Thanks for reporting the issue, let me try answering your question below
>>> :)
>>>
>>> > We are encountering the following issue, where a Spark streaming read job
>>> from an Iceberg table stays stuck after some maintenance jobs
>>> (rewrite_data_files and rewrite_manifests) have been run in parallel on the same
>>> table.
>>> https://github.com/apache/iceberg/issues/10117
>>>
>>> Can you please try the fix in Iceberg 1.5 related to skipping of rewritten
>>> files? (Please ref: https://github.com/apache/iceberg/pull/8980)
>>>
>>>
>>> > My theory is that `SparkMicroBatchStream.latestOffset()` doesn't seem to be
>>> returning the correct latest offset for the table from the latest metadata.json, or
>>> `SparkMicroBatchStream.planFiles()` is not returning any Set when it
>>> encounters a `replace` snapshot.
>>>
>>> You are mostly right; yes, those two are the ones you should check. The
>>> issue the PR above fixes is the skipping of replaced files one by one
>>> (the step can differ based on the configured rate limits), which gives the
>>> impression of the stream being stuck while in the background it is still being
>>> read. Please refer to the explanation here:
>>>
>>> "Suppose you trigger every minute and that you encounter a rewrite
>>> snapshot with 300 files. This means it will take 300 x 1 minute (300
>>> minutes) to finally skip over the snapshot and start progressing again.
>>>
>>> I think that once a rewrite snapshot is detected we should exhaust all
>>> the positions (all the files) in that commit to position ourselves for
>>> the next commit."
>>>
>>>  for more details please ref :
>>> https://github.com/apache/iceberg/issues/8902
>>>
>>> Regards,
>>>
>>> Prashant Singh
>>>
>>> On Wed, Apr 10, 2024 at 8:46 AM Nirav Patel <nira...@gmail.com> wrote:
>>>
>>>>
>>>>
>>>> We are encountering the following issue, where a Spark streaming read job
>>>> from an Iceberg table stays stuck after some maintenance jobs
>>>> (rewrite_data_files and rewrite_manifests) have been run in parallel on the same
>>>> table.
>>>> https://github.com/apache/iceberg/issues/10117
>>>>
>>>>
>>>> I'm trying to understand what creates the end `StreamingOffset` from
>>>> the latest metadata.json file.
>>>> My theory is that `SparkMicroBatchStream.latestOffset()` doesn't seem to be
>>>> returning the correct latest offset for the table from the latest metadata.json, or
>>>> `SparkMicroBatchStream.planFiles()` is not returning any Set when it
>>>> encounters a `replace` snapshot.
>>>>
>>>> Are these 2 methods the right places that determine what Iceberg data
>>>> files/partitions MicroBatchExecution will process upon trigger?
>>>>
>>>>
>>>>
