Hello Flink Community,

Flink version: 1.16.1, with ZooKeeper for HA.

My Flink application reads raw Parquet files hosted in S3, applies transformations, and writes the results back to S3 under a different location. Below is my code to read the Parquet files from S3:

```
final Configuration configuration = new Configuration();
configuration.set("fs.s3.impl", "org.apache.hadoop.fs.s3a.S3AFileSystem");

final ParquetColumnarRowInputFormat<FileSourceSplit> format =
    new ParquetColumnarRowInputFormat<>(
        configuration,
        <my_row_type>,
        InternalTypeInfo.of(<my_row_type>),
        100,   // batch size
        true,  // isUtcTimestamp
        true   // isCaseSensitive
    );

final FileSource<RowData> source = FileSource
    .forBulkFileFormat(format, new Path("s3/<my_location>"))
    .build();

stream = env.fromSource(source, WatermarkStrategy.noWatermarks(), "parquet-source");
```

I noticed the following:

1. My S3 directory, "s3/<my_location>/", can contain more than 1M files. The Parquet files in this directory are partitioned by date and time, which makes the folder structure deterministic, e.g. "s3/<my_location>/partition_column_a/partition_column_b/2023-09-25--13/{1,2...N}.parquet". I believe the default Flink FileSource lists this entire directory and gets stuck waiting for the operation to complete. The Akka connect timeout error messages in the Task Manager logs support this. Additionally, the job runs successfully when I restrict the input to a subfolder containing only one hour of data, based on the partitioning scheme above. On my local machine, I also tried using the S3 CLI to recursively list this directory, and the operation did not complete within 1 hour.
*Is this behavior expected based on Flink's S3 source implementation?* Looking at the docs <https://nightlies.apache.org/flink/flink-docs-master/docs/dev/datastream/sources/>, one way to solve this is to implement a Split Enumerator that incrementally processes the subfolders in "s3/<my_location>/", based on the partitioning scheme above. *Are there any other approaches available?*

2. With the code above, when I deserialize records from S3 I get records of type BinaryRowData <https://nightlies.apache.org/flink/flink-docs-master/api/java/org/apache/flink/table/data/binary/BinaryRowDataUtil.html>. However, when I use the same code in unit tests with MiniClusterWithClientResource <https://nightlies.apache.org/flink/flink-docs-master/docs/dev/datastream/testing/> to read from a local Parquet file (not S3), I get records of type GenericRowData <https://nightlies.apache.org/flink/flink-docs-master/api/java/org/apache/flink/table/data/GenericRowData.html>. *What is the reason for this discrepancy, and is it possible to force deserialization to output GenericRowData?* Currently, I have written code to convert BinaryRowData to GenericRowData, as our downstream ProcessFunctions expect this type. *Is there a better solution to transform BinaryRowData to GenericRowData?*

Thank you.

Varun C, DoorDash
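
For point 1, one possible sketch of a simpler alternative to a custom Split Enumerator: compute the hourly subfolders up front from the deterministic layout, so that no listing of the whole directory is needed. This assumes the partition column values are already known and that the hour folders follow the "2023-09-25--13" naming shown above. The class name `HourlyPartitionPaths`, the bucket, and the partition values are hypothetical and used only for illustration.

```java
import java.time.LocalDateTime;
import java.time.format.DateTimeFormatter;
import java.time.temporal.ChronoUnit;
import java.util.ArrayList;
import java.util.List;

// Hypothetical helper: builds one folder path per hour instead of listing S3.
final class HourlyPartitionPaths {
    // Matches folder names such as "2023-09-25--13" from the layout described above.
    private static final DateTimeFormatter HOUR_FOLDER =
            DateTimeFormatter.ofPattern("yyyy-MM-dd--HH");

    /** Returns one folder path per hour in [start, end), each under basePrefix. */
    static List<String> between(String basePrefix, LocalDateTime start, LocalDateTime end) {
        String prefix = basePrefix.endsWith("/") ? basePrefix : basePrefix + "/";
        List<String> paths = new ArrayList<>();
        LocalDateTime hour = start.truncatedTo(ChronoUnit.HOURS);
        while (hour.isBefore(end)) {
            paths.add(prefix + hour.format(HOUR_FOLDER) + "/");
            hour = hour.plusHours(1);
        }
        return paths;
    }

    public static void main(String[] args) {
        // Assumed bucket and partition values, for illustration only.
        List<String> paths = between(
                "s3://example-bucket/raw/a=1/b=2",
                LocalDateTime.of(2023, 9, 25, 13, 0),
                LocalDateTime.of(2023, 9, 25, 16, 0));
        paths.forEach(System.out::println);
    }
}
```

Since `FileSource.forBulkFileFormat` accepts multiple paths, each generated string could be wrapped in a `Path` and passed to a bounded source, one time window per job run. This covers only the bounded case; continuously picking up new hours would still need a custom enumerator or an external scheduler.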