Hi Péter,

Great! Thanks! The resources are really useful.

Since I don't have TABLE_EXEC_ICEBERG_USE_FLIP27_SOURCE set, it is the
FlinkSource
<https://github.com/apache/iceberg/blob/fbcd142c5dc1ec99792ef8edc1378e3a027fecf7/flink/v1.19/flink/src/main/java/org/apache/iceberg/flink/source/FlinkSource.java>
that is being used instead of the IcebergSource. I see that the
IcebergSource is more customizable than the FlinkSource, e.g. a custom
splitComparator or assignerFactory can be provided.
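
For reference, my understanding is that the FLIP-27 IcebergSource can be
switched on through the table config. A minimal sketch, assuming the key
behind FlinkConfigOptions.TABLE_EXEC_ICEBERG_USE_FLIP27_SOURCE is
"table.exec.iceberg.use-flip27-source" (worth double-checking against the
Iceberg version in use):

import org.apache.flink.table.api.EnvironmentSettings;
import org.apache.flink.table.api.TableEnvironment;

// Assumed config key; see FlinkConfigOptions.TABLE_EXEC_ICEBERG_USE_FLIP27_SOURCE.
TableEnvironment tableEnv =
    TableEnvironment.create(EnvironmentSettings.inBatchMode());
tableEnv.getConfig()
    .getConfiguration()
    .setBoolean("table.exec.iceberg.use-flip27-source", true);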

I am reading the Iceberg table in the following manner. I am using the
Table API for the ease of expressing the SQL query, but the Table needs to
be converted to a DataStream because all our downstream operators can only
work on the DataStream API.

Table table = tableEnv
    .from(tableId)
    .select($("*"))
    .where(filters)
    .orderBy($(Columns.TS).asc())
    .limit(2500);

tableEnv.toDataStream(table).executeAndCollect();
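
Since executeAndCollect() returns a CloseableIterator, we wrap the call in
try-with-resources in practice; a minimal sketch (process() is a
placeholder for our downstream handling):

import org.apache.flink.types.Row;
import org.apache.flink.util.CloseableIterator;

try (CloseableIterator<Row> rows =
        tableEnv.toDataStream(table).executeAndCollect()) {
  rows.forEachRemaining(row -> process(row));  // process() is a placeholder
}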

Since the toDataStream call is responsible for creating
<https://github.com/apache/iceberg/blob/fbcd142c5dc1ec99792ef8edc1378e3a027fecf7/flink/v1.19/flink/src/main/java/org/apache/iceberg/flink/source/IcebergTableSource.java#L134>
an instance of the IcebergSource internally, I don't think there is a way
to provide a custom splitComparator through the Table API. Correct me if I
am wrong. If I want to use a custom splitComparator, is the only option to
build the source directly with the DataStream API (and skip the Table
API)? Roughly what I have in mind is sketched below.
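
To make the question concrete, this is roughly what I imagine the
DataStream-API route would look like, based on the IcebergSource builder
linked above (TABLE_LOADER is a placeholder for our actual TableLoader,
and the filter/order/limit from the SQL would have to be re-applied as
DataStream operations):

import org.apache.flink.api.common.eventtime.WatermarkStrategy;
import org.apache.flink.api.common.typeinfo.TypeInformation;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.table.data.RowData;
import org.apache.iceberg.flink.source.IcebergSource;
import org.apache.iceberg.flink.source.split.SplitComparators;

StreamExecutionEnvironment env =
    StreamExecutionEnvironment.getExecutionEnvironment();

IcebergSource<RowData> source =
    IcebergSource.forRowData()
        .tableLoader(TABLE_LOADER)  // placeholder for our TableLoader
        // order splits by commit order instead of the default (undefined) order
        .splitComparator(SplitComparators.fileSequenceNumber())
        .build();

DataStream<RowData> stream =
    env.fromSource(
        source,
        WatermarkStrategy.noWatermarks(),
        "iceberg-source",
        TypeInformation.of(RowData.class));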



I have another question pertaining to reading an Iceberg table using Flink
as described in the code above. I want to understand why increasing the
number of task slots on the Flink cluster (1 TaskManager) reduces the total
query execution time of the job vertex that reads the Iceberg table splits
in the following scenario.

For a given query, the splitPlanner creates 8 splits (I am using the
default values for the Iceberg read properties). Each split contains 2 data
files (data input split files), and all the data files are almost the same
size.
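
(Just for completeness: I am on the defaults, but my understanding is that
the same knobs can also be overridden on the source builder. A hedged
sketch with made-up values; the method names mirror the read.split.*
properties and are worth verifying against the builder in the linked
source:)

IcebergSource<RowData> source =
    IcebergSource.forRowData()
        .tableLoader(TABLE_LOADER)            // placeholder
        .splitSize(256L * 1024 * 1024)        // read.split.target-size
        .splitLookback(10)                    // read.split.planning-lookback
        .splitOpenFileCost(4L * 1024 * 1024)  // read.split.open-file-cost
        .build();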

1. With 4 task slots, 4 splits are submitted to the task slots in
parallel. All 4 tasks return 2500 records and the job finishes; the other
4 splits never need to be submitted. The job vertex takes 2 mins (the max
time taken by a task among these 4 parallel tasks).
2. With 2 task slots, 2 splits are submitted to the task slots in
parallel. These 2 tasks return 2500 records and the job finishes; the
other 6 splits never need to be submitted. The job vertex in this scenario
takes 3 mins (the max time taken by a task among these 2 parallel tasks).

I expected the job vertex to take almost the same amount of time in both
scenarios because the split size is the same (same number of data files
per split, and each data file is the same size) and all the tasks return
2500 records, which is the limit of the query.

Would appreciate any insights into why this happens.

Thank you
Chetas


On Tue, Apr 16, 2024 at 12:49 PM Péter Váry <peter.vary.apa...@gmail.com>
wrote:

> Hi Chetas,
>
> See my answers below:
>
>
> On Tue, Apr 16, 2024, 06:39 Chetas Joshi <chetas.jo...@gmail.com> wrote:
>
>> Hello,
>>
>> I am running a batch flink job to read an iceberg table. I want to
>> understand a few things.
>>
>> 1. How does the FlinkSplitPlanner decide which fileScanTasks (I think one
>> task corresponds to one data file) need to be clubbed together within a
>> single split and when to create a new split?
>>
>
> You can take a look at the generic read properties for Iceberg tables:
> https://iceberg.apache.org/docs/nightly/configuration/#read-properties
>
> The most interesting ones for you are:
> - read.split.target-size
> - read.split.metadata-target-size
> - read.split.planning-lookback
> - read.split.open-file-cost
>
>> 2. When the number of task slots is limited, what is the sequence in which
>> the splits are assigned to the task slots?
>> For example,  if there are 4 task slots available but the number of
>> splits (source parallelism) to be read is 8, which 4 splits will be sent to
>> the task slots first? Where in the codebase does this logic exist?
>>
>
> As a general rule, there is no pre-defined order between the splits, and
> because of the parallelism, the order of the records is not defined.
>
> It is a bit of a low-level API, and might be removed in the future, but
> you can define your own comparator to order the splits:
>
> https://github.com/apache/iceberg/blob/fbcd142c5dc1ec99792ef8edc1378e3a027fecf7/flink/v1.19/flink/src/main/java/org/apache/iceberg/flink/source/IcebergSource.java#L248
>
> Or you can use the fileSequenceNumber comparator to order the splits based
> on the commit order:
>
> https://github.com/apache/iceberg/blob/fbcd142c5dc1ec99792ef8edc1378e3a027fecf7/flink/v1.19/flink/src/main/java/org/apache/iceberg/flink/source/split/SplitComparators.java#L34
>
> If you have file statistics collected for the table you can play around
> with the watermark settings to create a bit of ordering during the reads:
>
> https://iceberg.apache.org/docs/1.5.0/flink-queries/#emitting-watermarks
>
>
>> Would appreciate any docs, pointers to the codebase that could help me
>> understand the above.
>>
>> Thanks
>> Chetas
>>
>
