Hi Ryan,

I think the ticket you are looking for is [1]. AFAIK the work on it hasn't
started yet, so initial designs or ideas are still very welcome.

Best,
Fabian

[1] https://issues.apache.org/jira/browse/FLINK-25416

On Tue, Feb 22, 2022 at 11:54 PM Ryan van Huuksloot <
ryan.vanhuuksl...@shopify.com> wrote:

> Hi Fabian,
>
> Thanks for the response! I'll take a look at the CsvReaderFormat.
>
> Our team is interested in contributing to Parquet. However, our capacity
> for the current sprint is fully committed to other workstreams, so I'll put
> this issue on the backlog and see how it stacks up against our internal
> priorities over the next few cycles.
> I scanned JIRA for an issue covering this file format restructure but
> didn't find anything. Do you know of an existing JIRA issue I can subscribe
> to? Otherwise, I can create one for the Parquet change.
>
> Regarding the "envisioned setup":
>
>> My understanding so far is you have Parquet files with backfill
>> data and want to read all files and then continue the reading from
>> Kafka. Is that correct?
>>
> This is correct; the only addition is that we want the final DataStream
> type to be DataStream[T], where T is a Scala case class. The user would
> provide T to the HybridSource at instantiation time. Pseudocode would look
> roughly like:
>
>> long switchTimestamp = ...; // derive from file input paths
>>
>> // Swap ParquetColumnarRowInputFormat for a generic ParquetInputFormat
>> FileSource<T> fileSource = FileSource
>>     .forBulkFileFormat(new ParquetColumnarRowInputFormat<T>(),
>>         Path.fromLocalFile(testDir))
>>     .build();
>>
>> KafkaSource<T> kafkaSource = KafkaSource.<T>builder()
>>     .setStartingOffsets(OffsetsInitializer.timestamp(switchTimestamp + 1))
>>     .build();
>>
>> HybridSource<T> hybridSource = HybridSource.builder(fileSource)
>>     .addSource(kafkaSource)
>>     .build();
>>
>> DataStream<T> dataStream =
>>     env.fromSource(hybridSource, watermarkStrategy, name);
>
> Let me know if you have any questions!
>
> Thanks,
> Ryan van Huuksloot
> Data Developer | Data Platform Engineering | Streaming Capabilities
>
>
> On Mon, Feb 21, 2022 at 3:16 AM Fabian Paul <fp...@apache.org> wrote:
>
>> Hi Ryan,
>>
>> Thanks for bringing up this topic. Your analysis is correct: reading
>> Parquet files outside the Table API is currently rather difficult. The
>> community started an effort in Flink 1.15 to restructure some of the
>> formats to make them better applicable to both the DataStream and Table
>> API. You can have a look at the CSV format implementation [1]. Obviously,
>> implementing the Parquet format is more complicated since it is more
>> performance-sensitive.
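>>
>> To give a rough idea (just a sketch from memory; SomePojo and inputDir
>> are placeholders and the exact builder names may differ in the final
>> release), the new CSV format plugs directly into a DataStream FileSource:
>>
>> CsvReaderFormat<SomePojo> csvFormat = CsvReaderFormat.forPojo(SomePojo.class);
>> FileSource<SomePojo> source = FileSource
>>     .forRecordStreamFormat(csvFormat, Path.fromLocalFile(inputDir))
>>     .build();
>>
>> Something along these lines would be the goal for Parquet as well.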
>>
>> If you are willing to work on it, that would be great. We can also
>> assist with the design and offer guidance during the implementation.
>>
>> One question I'd still like to ask is about your exact envisioned
>> setup. My understanding so far is you have Parquet files with backfill
>> data and want to read all files and then continue the reading from
>> Kafka. Is that correct?
>>
>> Best
>> Fabian
>>
>> [1]
>> https://github.com/apache/flink/blob/0f19c2472c54aac97e4067f5398731ab90036d1a/flink-formats/flink-csv/src/main/java/org/apache/flink/formats/csv/CsvReaderFormat.java#L71
>>
>>
>> On Fri, Feb 18, 2022 at 11:22 PM Ryan van Huuksloot
>> <ryan.vanhuuksl...@shopify.com> wrote:
>> >
>> > Hello,
>> >
>> >
>> > Context:
>> >
>> > We are working on integrating HybridSources with different sources and
>> sinks. I have been working on a Parquet source that lets users build a
>> FileSource[T], so that the source can be used within a HybridSource of
>> type T.
>> >
>> > The environment is Scala 2.12 and we are using the DataStream API. The
>> generic type “T” used in the email would be a Scala case class.
>> >
>> >
>> > Problem:
>> >
>> > Based on the documentation, the recommended entry point for setting up
>> the source is ParquetColumnarRowInputFormat. Since
>> ParquetColumnarRowInputFormat hard-codes RowData, the other sources in a
>> HybridSource would then also need to be of type RowData (from my
>> experience), and you can't convert a FileSource[RowData] into a
>> FileSource[T].
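>> >
>> > Concretely, the current setup is pinned to RowData, roughly like this
>> > (constructor arguments of the format omitted):
>> >
>> > ParquetColumnarRowInputFormat<FileSourceSplit> format = ...; // produces RowData only
>> > FileSource<RowData> fileSource = FileSource
>> >     .forBulkFileFormat(format, Path.fromLocalFile(testDir))
>> >     .build();
>> > // HybridSource.builder(fileSource) now forces every later source, e.g. the
>> > // KafkaSource, to also produce RowData instead of our case class T.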
>> >
>> > An alternative I looked into was extending ParquetVectorizedInputFormat
>> but found that the type restrictions were difficult to reconcile.
>> >
>> >
>> > Potential Solution:
>> >
>> > Create a generic AbstractParquetBulkFormat, similar to the
>> AbstractAvroBulkFormat added in 1.15. We would be available to contribute
>> this, but we want to understand whether this is a direction Flink is
>> willing to go before putting in the work!
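>> >
>> > As a very rough sketch of what we have in mind (the class and method
>> > names here are hypothetical, just mirroring the Avro counterpart):
>> >
>> > public abstract class AbstractParquetBulkFormat<T, SplitT extends FileSourceSplit>
>> >         implements BulkFormat<T, SplitT> {
>> >     // Subclasses (or a user-supplied converter) turn the RowData read by the
>> >     // vectorized Parquet reader into the user type T.
>> >     protected abstract T convert(RowData row);
>> > }
>> >
>> > A concrete subclass would then let us build a FileSource[T] directly and
>> > drop it into the HybridSource.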
>> >
>> >
>> > Questions:
>> >
>> >
>> > Based on the current implementation of Parquet within Flink, is it
>> correct to say that the only entry point for Parquet is
>> ParquetColumnarRowInputFormat?
>> >
>> > Is there any way to convert a FileSource[RowData] -> FileSource[T]?
>> >
>> > Would the potential solution listed above be an implementation that
>> Flink would be interested in integrating?
>> >
>> > If not, do you have an example of Parquet being used in a HybridSource
>> along with a Kafka Source?
>> >
>> >
>> > Thanks!
>> > Ryan van Huuksloot
>> > Data Developer | Data Platform Engineering | Streaming Capabilities
>>
>
