Hi Ryan,

I think the ticket you are looking for is [1]. AFAIK the work on it hasn't started yet, so initial designs or ideas are still very welcome.
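To give a rough idea of the shape this could take: with the restructured CSV format, a typed FileSource can already be built roughly like below (the POJO "Trade" and the local directory "inputDir" are only placeholders), and a Parquet counterpart would ideally expose a similarly typed entry point:

    // Restructured CSV format from 1.15: the format carries the element type,
    // so the resulting source is a FileSource<Trade> rather than RowData.
    CsvReaderFormat<Trade> csvFormat = CsvReaderFormat.forPojo(Trade.class);
    FileSource<Trade> source = FileSource
        .forRecordStreamFormat(csvFormat, Path.fromLocalFile(inputDir))
        .build();

    // A Parquet analog does not exist yet; because Parquet is more
    // performance-sensitive, it would likely be a BulkFormat rather than a
    // simple StreamFormat.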
Best,
Fabian

[1] https://issues.apache.org/jira/browse/FLINK-25416

On Tue, Feb 22, 2022 at 11:54 PM Ryan van Huuksloot
<ryan.vanhuuksl...@shopify.com> wrote:
> Hi Fabian,
>
> Thanks for the response! I'll take a look at the CsvReaderFormat.
>
> Our team is interested in contributing to Parquet. However, our capacity
> for the current sprint is fully committed to other workstreams. I'll put
> this issue onto the backlog and see how it stacks up against our internal
> priorities over the next few cycles.
>
> I scanned for a JIRA issue covering this file format restructure but
> didn't find anything. Do you know of an issue I can subscribe to?
> Otherwise, I can create one for the Parquet change.
>
> Regarding the "envisioned setup":
>
>> My understanding so far is you have Parquet files with backfill
>> data and want to read all files and then continue the reading from
>> Kafka. Is that correct?
>
> This is correct; the only addition is that we want the final stream type
> to be DataStream[T], where T is a Scala case class. The user would provide
> T to the HybridSource at instantiation time. Pseudocode would look roughly
> like:
>
>> long switchTimestamp = ...; // derive from file input paths
>> FileSource<T> fileSource =
>>     FileSource.forBulkFileFormat(new ParquetColumnarRowInputFormat<T>(),
>>         Path.fromLocalFile(testDir)).build();
>>     // swap ParquetColumnarRowInputFormat for a generic ParquetInputFormat
>> KafkaSource<T> kafkaSource = KafkaSource.<T>builder()
>>     .setStartingOffsets(OffsetsInitializer.timestamp(switchTimestamp + 1))
>>     .build();
>> HybridSource<T> hybridSource = HybridSource.builder(fileSource)
>>     .addSource(kafkaSource)
>>     .build();
>> DataStream<T> dataStream = env.fromSource(hybridSource, watermarkStrategy,
>>     name);
>
> Let me know if you have any questions!
>
> Thanks,
> Ryan van Huuksloot
> Data Developer | Data Platform Engineering | Streaming Capabilities
>
> On Mon, Feb 21, 2022 at 3:16 AM Fabian Paul <fp...@apache.org> wrote:
>
>> Hi Ryan,
>>
>> Thanks for bringing up this topic. Currently, your analysis is
>> correct, and reading Parquet files outside the Table API is rather
>> difficult. The community started an effort in Flink 1.15 to
>> restructure some of the formats to make them usable from both the
>> DataStream and Table API. You can have a look at the CSV format
>> implementation[1]. Obviously, implementing the Parquet format is more
>> complicated since it is more performance-sensitive.
>>
>> If you are willing to work on it, that would be great. We can also
>> assist with the design and offer guidance during the implementation.
>>
>> One question I'd still like to ask is about your exact envisioned
>> setup. My understanding so far is that you have Parquet files with
>> backfill data and want to read all files first and then continue
>> reading from Kafka. Is that correct?
>>
>> Best,
>> Fabian
>>
>> [1]
>> https://github.com/apache/flink/blob/0f19c2472c54aac97e4067f5398731ab90036d1a/flink-formats/flink-csv/src/main/java/org/apache/flink/formats/csv/CsvReaderFormat.java#L71
>>
>> On Fri, Feb 18, 2022 at 11:22 PM Ryan van Huuksloot
>> <ryan.vanhuuksl...@shopify.com> wrote:
>> >
>> > Hello,
>> >
>> > Context:
>> >
>> > We are working on integrating Hybrid Sources with different sources
>> > and sinks.
>> > I have been working on a Parquet source that produces a FileSource[T],
>> > so that it can be used within a HybridSource of type T.
>> >
>> > The environment is Scala 2.12 and we are using the DataStream API. The
>> > generic type "T" used in this email would be a Scala case class.
>> >
>> > Problem:
>> >
>> > Based on the documentation, the recommended entry point for setting up
>> > the source is ParquetColumnarRowInputFormat. Since
>> > ParquetColumnarRowInputFormat hard-codes RowData, every other source in
>> > the HybridSource would then need to be of type RowData - at least in my
>> > experience - and you can't convert a FileSource[RowData] into a
>> > FileSource[T].
>> >
>> > An alternative I looked into was extending ParquetVectorizedInputFormat,
>> > but I found the type restrictions difficult to reconcile.
>> >
>> > Potential Solution:
>> >
>> > Create an AbstractParquetBulkFormat, similar to the AbstractAvroBulkFormat
>> > added in 1.15 (a very rough sketch is pasted below my signature). We would
>> > be happy to contribute, but want to understand whether this is a direction
>> > Flink is willing to go before putting in the work!
>> >
>> > Questions:
>> >
>> > Based on the current Parquet implementation in Flink, is it correct to
>> > say that the only entry point for Parquet is ParquetColumnarRowInputFormat?
>> >
>> > Is there any way to convert a FileSource[RowData] into a FileSource[T]?
>> >
>> > Would the potential solution listed above be an implementation that Flink
>> > would be interested in integrating?
>> >
>> > If not, do you have an example of Parquet being used in a HybridSource
>> > along with a KafkaSource?
>> >
>> > Thanks!
>> > Ryan van Huuksloot
>> > Data Developer | Data Platform Engineering | Streaming Capabilities
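>> >
>> > P.S. To make the suggestion more concrete, here is a very rough sketch
>> > of the abstraction we have in mind. Everything below is hypothetical
>> > (none of these Parquet classes exist in Flink today); it only mirrors
>> > the shape of AbstractAvroBulkFormat:
>> >
>> > import org.apache.flink.connector.file.src.FileSourceSplit;
>> > import org.apache.flink.connector.file.src.reader.BulkFormat;
>> > import org.apache.flink.table.data.RowData;
>> >
>> > // Hypothetical sketch: the abstract format would own the vectorized
>> > // Parquet reading (createReader(), isSplittable(), getProducedType(),
>> > // etc.) and only ask the user for a conversion from the decoded row to
>> > // T, so the resulting FileSource is typed as T instead of RowData.
>> > public abstract class AbstractParquetBulkFormat<T>
>> >         implements BulkFormat<T, FileSourceSplit> {
>> >
>> >     // user hook: map the internal row representation to a POJO / case
>> >     // class, analogous to how AbstractAvroBulkFormat converts records
>> >     protected abstract T convert(RowData row);
>> > }
>> >
>> > // A subclass for a concrete type could then be passed to the existing
>> > // FileSource.forBulkFileFormat(format, path).build() entry point, giving
>> > // a FileSource<T> that lines up with a KafkaSource<T> in a HybridSource.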