[ 
https://issues.apache.org/jira/browse/FLINK-19161?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

ASF GitHub Bot updated FLINK-19161:
-----------------------------------
    Labels: pull-request-available  (was: )

> Port File Sources to FLIP-27 API
> --------------------------------
>
>                 Key: FLINK-19161
>                 URL: https://issues.apache.org/jira/browse/FLINK-19161
>             Project: Flink
>          Issue Type: Sub-task
>          Components: Connectors / FileSystem
>            Reporter: Stephan Ewen
>            Assignee: Stephan Ewen
>            Priority: Major
>              Labels: pull-request-available
>             Fix For: 1.12.0
>
>
> Porting the File sources to the FLIP-27 API means combining the
>   - FileInputFormat from the DataSet Batch API
>   - The Monitoring File Source from the DataStream API.
> The two already share the same reader code and part of the enumeration code.
> *Structure*
> The new File Source will have three components:
>   - File Enumerators, which discover the files.
>   - File Split Assigners, which decide which reader gets which split.
>   - File Reader Formats, which deal with the decoding.
> The main difference between the bounded (batch) version and the unbounded 
> (streaming) version is that the streaming version repeatedly invokes the file 
> enumerator to search for new files.
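The three components could be sketched as minimal Java interfaces like the ones below. All names here are hypothetical illustrations, not the final API; in the streaming version the enumerator would simply be invoked again periodically.

```java
import java.nio.file.Path;
import java.util.Collection;
import java.util.List;

/** Hypothetical split: a region of one file handed to a single reader. */
record FileSplit(Path path, long offset, long length) {}

/** Discovers the files (and their splits) under the input paths. */
interface FileEnumerator {
    Collection<FileSplit> enumerateSplits(List<Path> inputPaths);
}

/** Decides which reader gets which split. */
interface FileSplitAssigner {
    void addSplits(Collection<FileSplit> splits);

    /** Next split for the given reader subtask, or null if none remain. */
    FileSplit getNext(int readerSubtaskId);
}

/** Deals with decoding a split's bytes into records. */
interface FileReaderFormat<T> {
    Iterable<T> readSplit(FileSplit split) throws java.io.IOException;
}
```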
> *Checkpointing Enumerators*
> The enumerators need to checkpoint the not-yet-assigned splits, plus, if they 
> are in continuous discovery mode (streaming), the paths / timestamps already 
> processed.
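As a sketch, the enumerator's checkpoint state could look like the class below (hypothetical names; the discovery-related fields would stay empty in the bounded case):

```java
import java.nio.file.Path;
import java.util.Collection;
import java.util.Set;

/** Minimal split descriptor, for the sketch only. */
record SplitDescriptor(Path path, long offset, long length) {}

/** Hypothetical checkpoint state of the file enumerator. */
class PendingSplitsCheckpoint {
    final Collection<SplitDescriptor> notYetAssignedSplits; // discovered, not yet handed out
    final Set<Path> alreadyProcessedPaths;                  // continuous discovery mode only
    final long lastDiscoveryTimestamp;                      // continuous discovery mode only

    PendingSplitsCheckpoint(
            Collection<SplitDescriptor> notYetAssignedSplits,
            Set<Path> alreadyProcessedPaths,
            long lastDiscoveryTimestamp) {
        this.notYetAssignedSplits = notYetAssignedSplits;
        this.alreadyProcessedPaths = alreadyProcessedPaths;
        this.lastDiscoveryTimestamp = lastDiscoveryTimestamp;
    }
}
```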
> *Checkpointing Readers*
> The new File Source needs to ensure that every reader can be checkpointed.
> Some readers may be able to expose the position in the input file that 
> corresponds to the latest emitted record, but many will not be able to do 
> that, due to
>   - storing compressed record batches
>   - using buffered decoders where exact position information is not accessible
> We therefore suggest exposing a mechanism that combines a seekable file 
> offset with a number of records to read and skip after that offset. In the 
> extreme cases, files can work with only seekable positions or only 
> records-to-skip. Some formats, like Avro, can have periodic seek points 
> (sync markers) and count records-to-skip after these markers.
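One way to express such a combined position is sketched below (hypothetical class; a sentinel offset covers formats that cannot seek at all and must replay from the start of the file on recovery):

```java
/**
 * A reader position that combines a seekable file offset with a number of
 * records to read and skip after that offset (hypothetical sketch).
 */
class CheckpointedPosition {
    /** Sentinel for formats that cannot seek; recovery replays from the file start. */
    static final long NO_OFFSET = -1L;

    final long offset;        // seekable byte offset, e.g. at an Avro sync marker
    final long recordsToSkip; // records to decode and drop after seeking

    CheckpointedPosition(long offset, long recordsToSkip) {
        this.offset = offset;
        this.recordsToSkip = recordsToSkip;
    }
}
```

For example, an Avro reader would store the offset of the last sync marker and count the records emitted since then, while a reader over a compressed stream could only use NO_OFFSET plus the total record count.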
> *Efficient and Convenient Readers*
> To balance efficiency (batch-vectorized reading of ORC / Parquet for 
> vectorized query processing) and convenience (plugging a third-party CSV 
> decoder in over a stream), we offer three abstractions for record readers:
>   - Bulk Formats, which run over a file path and return an iterable batch at a 
> time _(most efficient)_
>   - File Record Formats, which read files record by record. The source 
> framework hands a pre-defined-size batch from the Split Reader to the Record 
> Emitter.
>   - Stream Formats, which decode an input stream and rely on the source 
> framework to decide how to batch the record handover _(most convenient)_
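The trade-off between the three reader abstractions could be sketched as follows (hypothetical interfaces, simplified here to plain Java I/O types rather than the eventual Flink ones):

```java
import java.io.IOException;
import java.io.InputStream;
import java.nio.file.Path;
import java.util.Iterator;

/** Bulk format: reads an iterable batch at a time from a file path (most efficient). */
interface BulkFormat<T> {
    Iterator<Iterable<T>> createBatchReader(Path file) throws IOException;
}

/** File record format: reads a file record by record; the framework batches the handover. */
interface FileRecordFormat<T> {
    Iterator<T> createReader(Path file) throws IOException;
}

/** Stream format: decodes a raw input stream; the framework decides batching (most convenient). */
interface StreamFormat<T> {
    /** Returns the next record, or null at the end of the stream. */
    T readNext(InputStream in) throws IOException;
}
```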



--
This message was sent by Atlassian Jira
(v8.3.4#803005)
