Hi Péter,

Great, thanks! The resources are really useful.
I don't have TABLE_EXEC_ICEBERG_USE_FLIP27_SOURCE set, so it is the FlinkSource
<https://github.com/apache/iceberg/blob/fbcd142c5dc1ec99792ef8edc1378e3a027fecf7/flink/v1.19/flink/src/main/java/org/apache/iceberg/flink/source/FlinkSource.java>
that is being used instead of the IcebergSource. I see the IcebergSource is more customizable than the FlinkSource, i.e. a custom splitComparator or assignerFactory can be provided.

I am reading the Iceberg table in the following manner. I am using the Table API for ease of expressing the SQL query, but the Table needs to be converted to a DataStream because all our downstream operators work only on DataStreams.

    table = tableEnv
        .from(tableId)
        .select($("*"))
        .where(filters)
        .orderBy($(Columns.TS).asc())
        .limit(2500);
    tableEnv.toDataStream(table).executeAndCollect();

Since the toDataStream call is responsible for creating
<https://github.com/apache/iceberg/blob/fbcd142c5dc1ec99792ef8edc1378e3a027fecf7/flink/v1.19/flink/src/main/java/org/apache/iceberg/flink/source/IcebergTableSource.java#L134>
an instance of the source internally, I don't think there is a way to provide a custom splitComparator. Correct me if I am wrong. Is using the DataStream API directly (and not the Table API) the only way to use a custom splitComparator?

I have another question pertaining to reading an Iceberg table using Flink as described in the code above. I want to understand why increasing the number of task slots on the Flink cluster (1 TaskManager) reduces the total execution time of the job vertex that reads the Iceberg table splits in the following scenarios. For a given query, the split planner creates 8 splits (I am using the default values for the Iceberg read properties). Each split contains 2 data files, and all the data files are almost the same size.

1. With 4 task slots, 4 splits are submitted to the task slots in parallel at a time. All 4 tasks return 2500 records and the job finishes; the other 4 splits never need to be submitted. The job vertex takes 2 mins (the max time taken by a task among these 4 parallel tasks).

2. With 2 task slots, 2 splits are submitted to the task slots in parallel at a time. These 2 tasks return 2500 records and the job finishes; the other 6 splits never need to be submitted. The job vertex in this scenario takes 3 mins (the max time taken by a task among these 2 parallel tasks).

I expected the job vertex to take almost the same amount of time in both scenarios, because the split size is the same (the same number of data files, each of the same size) and every task returns 2500 records, which is the limit of the query. Would appreciate any insights into why this happens.

Thank you,
Chetas
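In case it helps frame the question, this is roughly what I think the DataStream-only version would look like (an untested sketch; catalogLoader is a placeholder for our actual catalog setup, and I've used the fileSequenceNumber comparator from your link just as an example):

    import org.apache.flink.api.common.eventtime.WatermarkStrategy;
    import org.apache.flink.api.common.typeinfo.TypeInformation;
    import org.apache.flink.streaming.api.datastream.DataStream;
    import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
    import org.apache.flink.table.data.RowData;
    import org.apache.iceberg.flink.TableLoader;
    import org.apache.iceberg.flink.source.IcebergSource;
    import org.apache.iceberg.flink.source.split.SplitComparators;

    StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();

    // Load the same table that tableEnv.from(tableId) resolves to
    // (catalogLoader and tableId are placeholders for our setup).
    TableLoader tableLoader = TableLoader.fromCatalog(catalogLoader, tableId);

    // Batch read, with splits handed to readers in commit order.
    IcebergSource<RowData> source = IcebergSource.forRowData()
        .tableLoader(tableLoader)
        .streaming(false)
        .splitComparator(SplitComparators.fileSequenceNumber())
        .build();

    DataStream<RowData> stream = env.fromSource(
        source,
        WatermarkStrategy.noWatermarks(),
        "iceberg-source",
        TypeInformation.of(RowData.class));

The filter/order/limit parts of the SQL would then have to be re-expressed on the DataStream side, which is exactly the ergonomics we were hoping to keep by using the Table API.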
On Tue, Apr 16, 2024 at 12:49 PM Péter Váry <peter.vary.apa...@gmail.com> wrote:

> Hi Chetas,
>
> See my answers below:
>
> On Tue, Apr 16, 2024, 06:39 Chetas Joshi <chetas.jo...@gmail.com> wrote:
>
>> Hello,
>>
>> I am running a batch flink job to read an iceberg table. I want to
>> understand a few things.
>>
>> 1. How does the FlinkSplitPlanner decide which fileScanTasks (I think one
>> task corresponds to one data file) need to be clubbed together within a
>> single split and when to create a new split?
>
> You can take a look at the generic read properties for Iceberg tables:
> https://iceberg.apache.org/docs/nightly/configuration/#read-properties
>
> The most interesting ones for you are:
> - read.split.target-size
> - read.split.metadata-target-size
> - read.split.planning-lookback
> - read.split.open-file-cost
>
>> 2. When the number of task slots is limited, what is the sequence in which
>> the splits are assigned to the task slots?
>> For example, if there are 4 task slots available but the number of
>> splits (source parallelism) to be read is 8, which 4 splits will be sent to
>> the task slots first? Where in the codebase does this logic exist?
>
> As a general rule, there is no pre-defined order between the splits, and
> because of the parallelism, the order of the records is not defined.
>
> It is a bit of a low-level API, and might be removed in the future, but you
> can define your own comparator to order the splits:
> https://github.com/apache/iceberg/blob/fbcd142c5dc1ec99792ef8edc1378e3a027fecf7/flink/v1.19/flink/src/main/java/org/apache/iceberg/flink/source/IcebergSource.java#L248
>
> Or you can use the fileSequenceNumber comparator to order the splits based
> on the commit order:
> https://github.com/apache/iceberg/blob/fbcd142c5dc1ec99792ef8edc1378e3a027fecf7/flink/v1.19/flink/src/main/java/org/apache/iceberg/flink/source/split/SplitComparators.java#L34
>
> If you have file statistics collected for the table you can play around
> with the watermark settings to create a bit of ordering during the reads:
> https://iceberg.apache.org/docs/1.5.0/flink-queries/#emitting-watermarks
>
>> Would appreciate any docs, pointers to the codebase that could help me
>> understand the above.
>>
>> Thanks
>> Chetas
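P.S. For anyone finding this thread later: my understanding of the split-planning properties Péter listed is that they can be tuned per table, along these lines (a rough sketch via the Iceberg Table API; the values shown are just the documented defaults):

    // "table" is an org.apache.iceberg.Table handle for the table being read.
    table.updateProperties()
        .set("read.split.target-size", "134217728")    // 128 MB target split size
        .set("read.split.open-file-cost", "4194304")   // 4 MB estimated cost per open file
        .set("read.split.planning-lookback", "10")     // bin-packing lookback when combining files
        .commit();

A larger target size should mean fewer, bigger splits, while a higher open-file cost makes the planner pack fewer small files into each split.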