StephanEwen opened a new pull request #13401:
URL: https://github.com/apache/flink/pull/13401


   ## What is the purpose of the change
   
   This PR introduces a **new File Data Source** based on the *FLIP-27 source 
API*, under `flink-connectors/flink-connector-files`. 
   We add a new module, rather than adding this to the existing 
`flink-connectors/flink-connector-filesystem` project, because having this in a 
new module is a cleaner separation, and the new API needs fewer dependencies.
   
   To verify that this Source API is viable, this PR includes some input format 
implementations, such as a `TextInputFormat` (reading String lines), and stubs for 
a CSV reader and a vectorized ORC reader.
   
   ## File Source API
   
   The main review attention should go to the core API 
interfaces and classes, because these will be hard to change later:
     - `FileSource`, the main entry point into the API
     - The reader formats
         - `StreamFormat` (easy to implement, but without full control over object 
reuse, etc.; used for Text, CSV, ...; a minimal sketch follows this list)
         - `BulkFormat` (reads batches directly; most efficient, but more 
involved to implement; used for ORC, Parquet, ...)
     - The interfaces that define file discovery and assignment: 
`FileEnumerator` and `FileSplitAssigner`
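   
   To give a rough sense of what implementing a `StreamFormat` looks like, here is a 
minimal sketch of a format that emits the length of each text line. It follows the same 
shape as the `TextLineFormat` in this PR (a `createReader` factory plus a pull-based 
`Reader` whose `read()` returns `null` at the end of the input), but the exact method 
signatures are assumptions and may not match what finally lands.
   
   ```java
   // Illustrative sketch only: method signatures are assumptions modeled on the
   // TextLineFormat in this PR and may not match the final API.
   import java.io.BufferedReader;
   import java.io.IOException;
   import java.io.InputStreamReader;
   import java.nio.charset.StandardCharsets;
   
   import org.apache.flink.api.common.typeinfo.TypeInformation;
   import org.apache.flink.api.common.typeinfo.Types;
   import org.apache.flink.configuration.Configuration;
   import org.apache.flink.connector.file.src.reader.StreamFormat;
   import org.apache.flink.core.fs.FSDataInputStream;
   
   public final class LineLengthFormat implements StreamFormat<Integer> {
   
       @Override
       public Reader<Integer> createReader(Configuration config, FSDataInputStream stream) throws IOException {
           final BufferedReader in =
                   new BufferedReader(new InputStreamReader(stream, StandardCharsets.UTF_8));
   
           return new Reader<Integer>() {
               @Override
               public Integer read() throws IOException {
                   final String line = in.readLine();
                   return line == null ? null : line.length(); // null signals the end of the split
               }
   
               @Override
               public void close() throws IOException {
                   in.close();
               }
           };
       }
   
       @Override
       public TypeInformation<Integer> getProducedType() {
           return Types.INT;
       }
   }
   ```
   
   Such a format would then be plugged in via `FileSource.forRecordStreamFormat(new LineLengthFormat(), path)`, 
analogous to the examples in the *Testing this change* section below.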
   
   A good way to get a feeling for the API is to look at the 
[FileSourceTextLinesITCase](https://github.com/StephanEwen/flink/blob/file_source/flink-connectors/flink-connector-files/src/test/java/org/apache/flink/connector/file/src/FileSourceTextLinesITCase.java)
   
   To check whether the abstraction for *reader formats* (the file readers / 
decoders) is intuitive and efficient, I'd recommend looking at these examples:
     - [TextLineFormat](flink-connectors/flink-connector-files/src/main/java/org/apache/flink/connector/file/src/reader/TextLineFormat.java), 
which uses a `StreamFormat`
     - *CSV Reader* 
(flink-formats/flink-csv/src/main/java/org/apache/flink/formats/csv/CsvRowFormat.java), 
which uses a `StreamFormat`
     - *ORC Vectorized Readers* (cf35e054ad6c31b004bf07af2c3f61e714d554f2), both 
for the case of returning a `VectorizedColumnBatch` as one record, and for the case of 
returning `Rows`
   
   It is worth noting that the `BulkFormat.Reader` is fairly close to the 
`SplitReader` itself (it fetches a batch of records). This is intentional, in order to 
support highly efficient readers. The main difference is that the `SplitReader` 
is aware of splits while the `BulkFormat.Reader` is not.
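   
   As a rough illustration of that batch contract, here is a sketch of the loop that 
drains a batch-oriented reader. The names used here (`readBatch`, `next`, `releaseBatch`, 
and the surrounding `bulkFormat`, `config`, `split`, `emit`) are assumptions for 
illustration, not necessarily the exact API of this PR.
   
   ```java
   // Illustrative sketch only: type and method names are assumptions.
   try (BulkFormat.Reader<RowData> reader = bulkFormat.createReader(config, split)) {
       BulkFormat.RecordIterator<RowData> batch;
       while ((batch = reader.readBatch()) != null) {    // one fetch returns a whole batch
           RowData record;
           while ((record = batch.next()) != null) {     // iterate the records within the batch
               emit(record);                             // hand the record to the downstream consumer
           }
           batch.releaseBatch();                         // return the batch buffers for reuse
       }
   }
   ```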
   
   ## Testing this change
   
   This change can be tested by creating a `DataStream` API program that reads 
files, following this pattern:
   
   *Batch*
   ```java
   FileSource<String> source = FileSource
       .forRecordStreamFormat(new TextLineFormat(), new Path("file:///some/sample/file"))
       .build();
   
   DataStream<String> stream = env.fromSource(source, WatermarkStrategy.noWatermarks(), "file-source");
   ```
   
   *Streaming*
   ```java
   FileSource<String> source = FileSource
       .forRecordStreamFormat(new TextLineFormat(), new Path("file:///some/sample/file"))
       .monitorContinuously(Duration.ofSeconds(5))
       .build();
   
   DataStream<String> stream = env.fromSource(source, WatermarkStrategy.noWatermarks(), "file-source");
   ```
   
   See 
[FileSourceTextLinesITCase](https://github.com/StephanEwen/flink/blob/file_source/flink-connectors/flink-connector-files/src/test/java/org/apache/flink/connector/file/src/FileSourceTextLinesITCase.java)
 for an end-to-end example.
   
   ## Does this pull request potentially affect one of the following parts:
   
     - Dependencies (does it add or upgrade a dependency): **no**
     - The public API, i.e., is any changed class annotated with 
`@Public(Evolving)`: **yes**
     - The serializers: **no**
     - The runtime per-record code paths (performance sensitive): **yes**
     - Anything that affects deployment or recovery: JobManager (and its 
components), Checkpointing, Kubernetes/Yarn/Mesos, ZooKeeper: **no**
     - The S3 file system connector: **no**
   
   ## Documentation
   
     - Does this pull request introduce a new feature? **yes**
     - If yes, how is the feature documented? **not documented**
   
   The API is not fully cross-reviewed yet and might change. Docs will be added 
as soon as the File Source API is approved.
   

