Hi Chetas,

> is the only way out to use only the DataStream API (and not the Table API)
> if I want to use a custom splitComparator?

You can use watermark generation, and with that, watermark-based split
ordering, with the Table API.
OTOH, there is currently no way to define a custom comparator through the
Table API.
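
For example, something like this should work (an untested sketch: the
catalog/table names and the 'ts' timestamp column are placeholders, and the
'watermark-column' read option requires the FLIP-27 IcebergSource to be
enabled):

import org.apache.flink.table.api.EnvironmentSettings;
import org.apache.flink.table.api.Table;
import org.apache.flink.table.api.TableEnvironment;

TableEnvironment tableEnv = TableEnvironment.create(EnvironmentSettings.inBatchMode());

// Watermark generation is only implemented by the FLIP-27 IcebergSource.
tableEnv.getConfig().getConfiguration()
    .setString("table.exec.iceberg.use-flip27-source", "true");

// 'watermark-column' derives watermarks from the column's file statistics;
// splits are then assigned to the readers roughly in watermark order.
Table ordered = tableEnv.sqlQuery(
    "SELECT * FROM my_catalog.my_db.my_table "
        + "/*+ OPTIONS('watermark-column'='ts') */");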

> I want to understand why increasing the number of task slots on the flink
cluster (1 TaskManager) reduces the total query execution time for the job
vertex that reads the iceberg table splits in the following scenario.

The table query you have shared contains an 'orderBy' clause. To me, this
suggests that the whole table needs to be read to get the first 2500 rows.
Are you sure that not all of the splits are read in your tests?

Thanks,
Peter

On Wed, Apr 17, 2024, 04:15 Chetas Joshi <chetas.jo...@gmail.com> wrote:

> Hi Péter,
>
> Great! Thanks! The resources are really useful.
>
> I don't have TABLE_EXEC_ICEBERG_USE_FLIP27_SOURCE set, so it is the
> FlinkSource
> <https://github.com/apache/iceberg/blob/fbcd142c5dc1ec99792ef8edc1378e3a027fecf7/flink/v1.19/flink/src/main/java/org/apache/iceberg/flink/source/FlinkSource.java>
> that is being used instead of the IcebergSource. I see that the
> IcebergSource is more customizable, i.e. a custom splitComparator or
> assignerFactory can be provided, which is not possible with the FlinkSource.
>
> I am reading the Iceberg table in the following manner. I am using the
> Table API for the ease of expressing the SQL query, but the table needs to
> be converted to a DataStream as all our downstream operators can only
> work on the DataStream.
>
> table = tableEnv
>     .from(tableId)
>     .select($("*"))
>     .where(filters)
>     .orderBy($(Columns.TS).asc())
>     .limit(2500);
>
> tableEnv.toDataStream(table).executeAndCollect();
>
> Since the toDataStream call is responsible for creating
> <https://github.com/apache/iceberg/blob/fbcd142c5dc1ec99792ef8edc1378e3a027fecf7/flink/v1.19/flink/src/main/java/org/apache/iceberg/flink/source/IcebergTableSource.java#L134>
> an instance of the IcebergSource, I don't think there is a way to provide a
> custom splitComparator. Correct me if I am wrong. Is the only way out to
> use only the DataStream API (and not the Table API) if I want to use a
> custom splitComparator?
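>
> Something like this untested sketch is what I have in mind; the table
> location and the comparator logic (smaller splits first, with the split id
> as a deterministic tie-breaker) are just placeholders:
>
> import org.apache.flink.api.common.eventtime.WatermarkStrategy;
> import org.apache.flink.api.common.typeinfo.TypeInformation;
> import org.apache.flink.streaming.api.datastream.DataStream;
> import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
> import org.apache.flink.table.data.RowData;
> import org.apache.iceberg.flink.TableLoader;
> import org.apache.iceberg.flink.source.IcebergSource;
> import org.apache.iceberg.flink.source.split.IcebergSourceSplit;
> import org.apache.iceberg.flink.source.split.SerializableComparator;
>
> StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
> TableLoader loader = TableLoader.fromHadoopTable("hdfs://path/to/table");
>
> // Illustrative ordering: hand out the splits with fewer total bytes first.
> SerializableComparator<IcebergSourceSplit> smallestFirst =
>     (s1, s2) -> {
>       long bytes1 = s1.task().files().stream().mapToLong(f -> f.length()).sum();
>       long bytes2 = s2.task().files().stream().mapToLong(f -> f.length()).sum();
>       int result = Long.compare(bytes1, bytes2);
>       return result != 0 ? result : s1.splitId().compareTo(s2.splitId());
>     };
>
> IcebergSource<RowData> source =
>     IcebergSource.forRowData()
>         .tableLoader(loader)
>         .splitComparator(smallestFirst)
>         .build();
>
> DataStream<RowData> stream =
>     env.fromSource(
>         source,
>         WatermarkStrategy.noWatermarks(),
>         "iceberg-source",
>         TypeInformation.of(RowData.class));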
>
>
>
> I have another question pertaining to reading an Iceberg table using Flink
> as described in the code above. I want to understand why increasing the
> number of task slots on the Flink cluster (1 TaskManager) reduces the total
> query execution time for the job vertex that reads the Iceberg table splits
> in the following scenario.
>
> For a given query, the splitPlanner creates 8 splits (I am using the
> default values for the Iceberg read properties). Each split contains 2 data
> files (data input split files), and all the data files are almost the same
> size.
>
> 1. With # task slots = 4, 4 splits are submitted to the task slots in
> parallel at a time. All the 4 tasks return 2500 records and the job
> finishes. The other 4 splits do not need to be submitted. The job vertex
> takes 2 mins (max time taken by a task among these 4 parallel tasks).
> 2. With # task slots = 2, 2 splits are submitted to the task slots in
> parallel at a time. These 2 tasks return 2500 records and the job finishes.
> The other 6 splits do not need to be submitted. The job vertex in this
> scenario takes 3 mins (max time taken by a task among these 2 parallel
> tasks).
>
> I expected the job vertex to have taken almost the same amount of time in
> both these scenarios because the split size is the same (same # of data
> files, and each data file is of the same size) and all the tasks return 2500
> records, which is the limit of the query.
>
> Would appreciate it if you have any insights into why this behavior occurs.
>
> Thank you
> Chetas
>
>
> On Tue, Apr 16, 2024 at 12:49 PM Péter Váry <peter.vary.apa...@gmail.com>
> wrote:
>
>> Hi Chetas,
>>
>> See my answers below:
>>
>>
>> On Tue, Apr 16, 2024, 06:39 Chetas Joshi <chetas.jo...@gmail.com> wrote:
>>
>>> Hello,
>>>
>>> I am running a batch Flink job to read an Iceberg table. I want to
>>> understand a few things.
>>>
>>> 1. How does the FlinkSplitPlanner decide which fileScanTasks (I think
>>> one task corresponds to one data file) need to be clubbed together within a
>>> single split and when to create a new split?
>>>
>>
>> You can take a look at the generic read properties for Iceberg tables:
>> https://iceberg.apache.org/docs/nightly/configuration/#read-properties
>>
>> The most interesting ones for you are (a usage sketch follows the list):
>> - read.split.target-size
>> - read.split.metadata-target-size
>> - read.split.planning-lookback
>> - read.split.open-file-cost
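>>
>> For example, assuming the table is registered in the current catalog (the
>> name below is a placeholder), you can override the planning per query with
>> the corresponding Flink read options in a SQL hint; 'split-size' is the
>> per-query counterpart of read.split.target-size, in bytes:
>>
>> import org.apache.flink.table.api.EnvironmentSettings;
>> import org.apache.flink.table.api.Table;
>> import org.apache.flink.table.api.TableEnvironment;
>>
>> TableEnvironment tableEnv = TableEnvironment.create(EnvironmentSettings.inBatchMode());
>>
>> // Sketch: plan fewer, larger splits (512 MB target) for this query only.
>> Table t = tableEnv.sqlQuery(
>>     "SELECT * FROM my_db.my_table "
>>         + "/*+ OPTIONS('split-size'='536870912') */");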
>>
>>> 2. When the number of task slots is limited, what is the sequence in
>>> which the splits are assigned to
>>> the task slots? For example, if there are 4 task slots available but the
>>> number of splits (source parallelism) to be read is 8, which 4 splits will
>>> be sent to the task slots first? Where in the codebase does this logic
>>> exist?
>>>
>>
>> As a general rule, there is no pre-defined order between the splits, and
>> because of the parallelism, the order of the records is not defined.
>>
>> It is a bit of a low-level API, and it might be removed in the future, but
>> you can define your own comparator to order the splits:
>>
>> https://github.com/apache/iceberg/blob/fbcd142c5dc1ec99792ef8edc1378e3a027fecf7/flink/v1.19/flink/src/main/java/org/apache/iceberg/flink/source/IcebergSource.java#L248
>>
>> Or you can use the fileSequenceNumber comparator to order the splits
>> based on the commit order:
>>
>> https://github.com/apache/iceberg/blob/fbcd142c5dc1ec99792ef8edc1378e3a027fecf7/flink/v1.19/flink/src/main/java/org/apache/iceberg/flink/source/split/SplitComparators.java#L34
>>
>> If you have file statistics collected for the table, you can play around
>> with the watermark settings to create a bit of ordering during the reads:
>>
>> https://iceberg.apache.org/docs/1.5.0/flink-queries/#emitting-watermarks
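>>
>> Putting the last two options together, a rough sketch (untested; the table
>> location and the 'ts' column name are placeholders):
>>
>> import org.apache.flink.table.data.RowData;
>> import org.apache.iceberg.flink.TableLoader;
>> import org.apache.iceberg.flink.source.IcebergSource;
>> import org.apache.iceberg.flink.source.split.SplitComparators;
>>
>> TableLoader loader = TableLoader.fromHadoopTable("hdfs://path/to/table");
>>
>> // Order the splits by commit order with the built-in comparator...
>> IcebergSource<RowData> byCommitOrder =
>>     IcebergSource.forRowData()
>>         .tableLoader(loader)
>>         .splitComparator(SplitComparators.fileSequenceNumber())
>>         .build();
>>
>> // ...or derive watermarks from the column statistics of a timestamp
>> // column, which also orders the split assignment by watermark.
>> IcebergSource<RowData> byWatermark =
>>     IcebergSource.forRowData()
>>         .tableLoader(loader)
>>         .watermarkColumn("ts")
>>         .build();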
>>
>>
>>> Would appreciate any docs or pointers to the codebase that could help me
>>> understand the above.
>>>
>>> Thanks
>>> Chetas
>>>
>>
