Re: Using HybridSource

2023-07-06 Thread Ken Krugler
Hi Oscar, As Andrew noted, you could use the approach of overriding the CsvReaderFormat (or whatever you’re using to extract CSV data from S3) to return a Protobuf that’s the same format as what you’re getting from Kafka, though that obviously introduces some inefficiencies. Or you could do the…
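
A minimal sketch of that approach, assuming a protobuf-generated class UserEvent with userId and amount fields (both names are illustrative) and a custom SimpleStreamFormat in place of CsvReaderFormat, so the file source emits the same protobuf type as the Kafka source:

import org.apache.flink.api.common.typeinfo.TypeInformation;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.connector.file.src.reader.SimpleStreamFormat;
import org.apache.flink.connector.file.src.reader.StreamFormat;
import org.apache.flink.core.fs.FSDataInputStream;

import java.io.BufferedReader;
import java.io.IOException;
import java.io.InputStreamReader;
import java.nio.charset.StandardCharsets;

// Parses CSV lines from the file split and builds the (assumed) protobuf type directly,
// so both legs of the HybridSource produce UserEvent. CSV quoting/escaping is ignored
// to keep the sketch short.
public class CsvToProtoFormat extends SimpleStreamFormat<UserEvent> {

    @Override
    public StreamFormat.Reader<UserEvent> createReader(Configuration config, FSDataInputStream stream)
            throws IOException {
        BufferedReader lines =
                new BufferedReader(new InputStreamReader(stream, StandardCharsets.UTF_8));
        return new StreamFormat.Reader<UserEvent>() {
            @Override
            public UserEvent read() throws IOException {
                String line = lines.readLine();
                if (line == null) {
                    return null; // end of this split
                }
                String[] cols = line.split(",");
                // Assumed protobuf builder: same fields as the Kafka records.
                return UserEvent.newBuilder()
                        .setUserId(cols[0])
                        .setAmount(Double.parseDouble(cols[1]))
                        .build();
            }

            @Override
            public void close() throws IOException {
                lines.close();
            }
        };
    }

    @Override
    public TypeInformation<UserEvent> getProducedType() {
        return TypeInformation.of(UserEvent.class);
    }
}

The format would then be handed to FileSource.forRecordStreamFormat(new CsvToProtoFormat(), new Path("s3://...")) and combined with the Kafka source in the HybridSource builder.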

Re: Using HybridSource

2023-07-05 Thread Andrew Otto
Hm. I wonder if you could implement a custom Deserializer that wraps both the CSV and Protobuf deserializers and conditionally chooses which one to use. As long as the final TypeInformation returned by the Source is the same in either case, I think it should work?
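
One possible reading of the wrapping idea, assuming both encodings arrive as raw bytes on the Kafka side and the same illustrative UserEvent protobuf class as above: a single DeserializationSchema tries protobuf first and falls back to CSV, so the produced type (and thus the TypeInformation) is identical in both cases.

import org.apache.flink.api.common.serialization.DeserializationSchema;
import org.apache.flink.api.common.typeinfo.TypeInformation;

import com.google.protobuf.InvalidProtocolBufferException;

import java.io.IOException;
import java.nio.charset.StandardCharsets;

// Wraps the two decodings behind one schema; whichever succeeds, the output type is UserEvent.
public class CsvOrProtoDeserializer implements DeserializationSchema<UserEvent> {

    @Override
    public UserEvent deserialize(byte[] message) throws IOException {
        try {
            // First assume the record is a protobuf-encoded UserEvent.
            return UserEvent.parseFrom(message);
        } catch (InvalidProtocolBufferException e) {
            // Fall back to CSV: same fields, different wire format (assumed column order).
            String[] cols = new String(message, StandardCharsets.UTF_8).split(",");
            return UserEvent.newBuilder()
                    .setUserId(cols[0])
                    .setAmount(Double.parseDouble(cols[1]))
                    .build();
        }
    }

    @Override
    public boolean isEndOfStream(UserEvent nextElement) {
        return false;
    }

    @Override
    public TypeInformation<UserEvent> getProducedType() {
        return TypeInformation.of(UserEvent.class);
    }
}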

Re: Using HybridSource

2023-07-05 Thread Oscar Perez via user
And this is our case, Alexander: it is the same data schema but a different data format. The Kafka data comes as protobuf while the CSV is a POJO, though both have the same fields. IMHO, the design of HybridSource is very limited and you have to do nasty workarounds if you want to combine from cold storage…

Re: Using HybridSource

2023-07-05 Thread Alexander Fedulov
I do not think that trying to "squash" two different data types into one just to use HybridSource is the right thing to do here. HybridSource is primarily intended for use cases that need to read the same data from different sources. A typical example: read events from "cold storage" in S3 up to a…
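
For reference, the pattern Alexander describes follows the shape of the documentation example: both legs produce the same type (String here) and the Kafka source starts where the file data ends. The bucket, broker, topic and switch timestamp below are placeholders.

import org.apache.flink.api.common.serialization.SimpleStringSchema;
import org.apache.flink.connector.base.source.hybrid.HybridSource;
import org.apache.flink.connector.file.src.FileSource;
import org.apache.flink.connector.file.src.reader.TextLineInputFormat;
import org.apache.flink.connector.kafka.source.KafkaSource;
import org.apache.flink.connector.kafka.source.enumerator.initializer.OffsetsInitializer;
import org.apache.flink.core.fs.Path;

public class HistoryThenKafka {

    // Bounded file source over the S3 history, then the Kafka source from the switch point on.
    public static HybridSource<String> build(long switchTimestamp) {
        FileSource<String> fileSource = FileSource
                .forRecordStreamFormat(new TextLineInputFormat(), new Path("s3://my-bucket/history/"))
                .build();

        KafkaSource<String> kafkaSource = KafkaSource.<String>builder()
                .setBootstrapServers("broker:9092")
                .setTopics("events")
                .setValueOnlyDeserializer(new SimpleStringSchema())
                // Start where the file data ends so the two sources cover one continuous stream.
                .setStartingOffsets(OffsetsInitializer.timestamp(switchTimestamp + 1))
                .build();

        return HybridSource.builder(fileSource)
                .addSource(kafkaSource)
                .build();
    }
}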

Re: Using HybridSource

2023-07-04 Thread Péter Váry
Was it a conscious decision that HybridSource only accepts Sources, and does not allow mapping functions to be applied to them before combining them?

Re: Using HybridSource

2023-07-04 Thread Ken Krugler
Hi Oscar, Couldn't you have both the Kafka and File sources return an Either, and then (after the HybridSource) use a MapFunction to convert to the unified/correct type? -- Ken
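
A sketch of the normalization step after the HybridSource, assuming the file source wraps its CSV POJO as Either.Left and the Kafka source wraps its protobuf as Either.Right; CsvEvent, UserEvent and their getters/builders are hypothetical:

import org.apache.flink.api.common.functions.MapFunction;
import org.apache.flink.types.Either;

// Converts whichever side of the Either arrived into the single type used downstream.
public class UnifyEither implements MapFunction<Either<CsvEvent, UserEvent>, UserEvent> {

    @Override
    public UserEvent map(Either<CsvEvent, UserEvent> value) {
        if (value.isRight()) {
            return value.right(); // already the unified (protobuf) type
        }
        CsvEvent csv = value.left();
        return UserEvent.newBuilder()
                .setUserId(csv.getUserId())
                .setAmount(csv.getAmount())
                .build();
    }
}

It would be applied right after the source, e.g. env.fromSource(hybridSource, watermarkStrategy, "hybrid").map(new UnifyEither()).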

Re: Using HybridSource

2023-07-04 Thread Oscar Perez via user
Hei, 1) We populate state based on this CSV data and do business logic based on this state and on events coming from other, unrelated streams. 2) We are using a low-level ProcessFunction in order to process this future hybrid source. Regardless of the aforementioned points, please note that the main…

Re: Using HybridSource

2023-07-04 Thread Alexander Fedulov
@Oscar 1. How do you plan to use that CSV data? Is it needed for lookup from the "main" stream? 2. Which API are you using? DataStream/SQL/Table or a low-level ProcessFunction? Best, Alex

Re: Using HybridSource

2023-07-04 Thread Oscar Perez via user
Ok, but is it? As I said, both sources have different data types. In the example here, https://nightlies.apache.org/flink/flink-docs-master/docs/connectors/datastream/hybridsource/, both sources return String, but in our case one source would return a protobuf event while the other…

Re: Using HybridSource

2023-07-04 Thread Alexey Novakov via user
Hi Oscar, You could use connected streams and put your file into a special Kafka topic before starting such a job: https://nightlies.apache.org/flink/flink-docs-release-1.17/docs/dev/datastream/operators/overview/#connect But this may require more work, and the event ordering (which is shuffled)…
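
A sketch of the connected-streams alternative, assuming two DataStreams already exist (CsvEvent POJOs from the file, UserEvent protobufs from Kafka; both types hypothetical as before). A CoMapFunction maps each input to one output type, though, as noted, the ordered file-then-Kafka switchover that HybridSource provides is lost.

import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.functions.co.CoMapFunction;

public class ConnectCsvAndProto {

    // Connects the two differently typed streams and emits a single unified type.
    public static DataStream<UserEvent> unify(DataStream<CsvEvent> csvStream,
                                              DataStream<UserEvent> protoStream) {
        return csvStream
                .connect(protoStream)
                .map(new CoMapFunction<CsvEvent, UserEvent, UserEvent>() {
                    @Override
                    public UserEvent map1(CsvEvent csv) {
                        // Convert the CSV POJO into the protobuf type used downstream.
                        return UserEvent.newBuilder()
                                .setUserId(csv.getUserId())
                                .setAmount(csv.getAmount())
                                .build();
                    }

                    @Override
                    public UserEvent map2(UserEvent proto) {
                        return proto;
                    }
                });
    }
}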