StephanEwen opened a new pull request #13401: URL: https://github.com/apache/flink/pull/13401
## What is the purpose of the change

This PR introduces a **new File Data Source** based on the *FLIP-27 source API*, under `flink-connectors/flink-connector-files`.

We add a new module, rather than adding this to the existing `flink-connectors/flink-connector-filesystem` project, because a separate module gives a cleaner separation and the new API needs fewer dependencies.

To verify that this Source API is viable, this PR includes some input format implementations, such as a `TextLineFormat` (reading String lines), and stubs for a CSV reader and a vectorized ORC reader.

## File Source API

The main attention in the review should probably go to the core API interfaces and classes, because these will be harder to change in the future:

  - `FileSource` as the main entry point of the API
  - The reader formats:
    - `StreamFormat` (easy to implement, but without full control over object reuse, etc.; used for Text, CSV, ...)
    - `BulkFormat` (directly reads batches; most efficient, but more involved to implement; used for ORC, Parquet, ...)
  - The interfaces that define file discovery and assignment: `FileEnumerator` and `FileSplitAssigner`

A good way to get a feeling for the API is to look at the [FileSourceTextLinesITCase](https://github.com/StephanEwen/flink/blob/file_source/flink-connectors/flink-connector-files/src/test/java/org/apache/flink/connector/file/src/FileSourceTextLinesITCase.java).

To check whether the abstraction for *reader formats* (the file readers / decoders) is intuitive and efficient, I'd recommend looking at these examples:

  - [TextLineFormat](flink-connectors/flink-connector-files/src/main/java/org/apache/flink/connector/file/src/reader/TextLineFormat.java), making use of a `StreamFormat`
  - *CSV Reader* (flink-formats/flink-csv/src/main/java/org/apache/flink/formats/csv/CsvRowFormat.java), making use of a `StreamFormat`
  - *ORC Vectorized Readers* (cf35e054ad6c31b004bf07af2c3f61e714d554f2), both for the case of returning a `VectorizedColumnBatch` as one record, and for the case of returning `Rows`

It is worth noting that the `BulkFormat.Reader` is fairly close to the `SplitReader` itself (fetching a batch of records). This is on purpose, to support high-efficiency readers. The main difference is that the `SplitReader` is aware of splits while the `BulkFormat.Reader` is not.

## Testing this change

This change can be tested by creating a `DataStream` API program that reads files, following this pattern:

*Batch*
```java
FileSource<String> source = FileSource
        .forRecordStreamFormat(new TextLineFormat(), new Path("file:///some/sample/file"))
        .build();

DataStream<String> stream = env.fromSource(source, WatermarkStrategy.noWatermarks(), "file-source");
```

*Streaming*
```java
FileSource<String> source = FileSource
        .forRecordStreamFormat(new TextLineFormat(), new Path("file:///some/sample/file"))
        .monitorContinuously(Duration.ofSeconds(5))
        .build();

DataStream<String> stream = env.fromSource(source, WatermarkStrategy.noWatermarks(), "file-source");
```

See the [FileSourceTextLinesITCase](https://github.com/StephanEwen/flink/blob/file_source/flink-connectors/flink-connector-files/src/test/java/org/apache/flink/connector/file/src/FileSourceTextLinesITCase.java) for an end-to-end example.
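For convenience, below is a self-contained sketch of a small job that follows the streaming pattern above. Only the `FileSource` builder calls and `env.fromSource(...)` come from this PR's description; the class name, the sample path, the 5-second poll interval, and the `print()`/`execute()` boilerplate are illustrative additions.

```java
import org.apache.flink.api.common.eventtime.WatermarkStrategy;
import org.apache.flink.connector.file.src.FileSource;
import org.apache.flink.connector.file.src.reader.TextLineFormat;
import org.apache.flink.core.fs.Path;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;

import java.time.Duration;

/**
 * Sketch of a manual smoke test for the new File Source.
 * Class name and sample path are placeholders, not part of this PR.
 */
public class FileSourceSmokeTest {

    public static void main(String[] args) throws Exception {
        final StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();

        // Continuously check the path for new files every 5 seconds.
        // Dropping the monitorContinuously(...) call yields a bounded (batch) read instead.
        final FileSource<String> source = FileSource
                .forRecordStreamFormat(new TextLineFormat(), new Path("file:///some/sample/file"))
                .monitorContinuously(Duration.ofSeconds(5))
                .build();

        final DataStream<String> lines =
                env.fromSource(source, WatermarkStrategy.noWatermarks(), "file-source");

        lines.print();
        env.execute("FileSource smoke test");
    }
}
```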
## Does this pull request potentially affect one of the following parts:

  - Dependencies (does it add or upgrade a dependency): **no**
  - The public API, i.e., is any changed class annotated with `@Public(Evolving)`: **yes**
  - The serializers: **no**
  - The runtime per-record code paths (performance sensitive): **yes**
  - Anything that affects deployment or recovery: JobManager (and its components), Checkpointing, Kubernetes/Yarn/Mesos, ZooKeeper: **no**
  - The S3 file system connector: **no**

## Documentation

  - Does this pull request introduce a new feature? **yes**
  - If yes, how is the feature documented? **not documented**

The API is not fully cross-reviewed yet and might change. Docs will be added as soon as the File Source API is approved.

----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org