Hi Oscar,

As Andrew noted, you could override the CsvReaderFormat (or whatever you’re 
using to extract CSV data from S3) so that it returns a Protobuf in the same 
format as what you’re getting from Kafka, though that obviously introduces 
some inefficiencies. Or you could do the opposite and extend the Kafka 
deserialization schema to return a POJO instead of a Protobuf.
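
A minimal sketch of that second option, in plain Java with no Flink 
dependency so it stands on its own. The Customer POJO, its three field names, 
the CustomerProto stand-in for the generated Protobuf class, and 
CustomerMappers are all hypothetical. In a real job, fromProto would 
presumably be called from your DeserializationSchema's deserialize() after 
parsing the bytes, so that both sources emit Customer.

```java
import java.util.Objects;

// Hypothetical unified record with three fields (names are illustrative only).
final class Customer {
    final String id;
    final String name;
    final long balanceCents;

    Customer(String id, String name, long balanceCents) {
        this.id = Objects.requireNonNull(id);
        this.name = Objects.requireNonNull(name);
        this.balanceCents = balanceCents;
    }

    @Override public boolean equals(Object o) {
        if (!(o instanceof Customer)) return false;
        Customer c = (Customer) o;
        return id.equals(c.id) && name.equals(c.name) && balanceCents == c.balanceCents;
    }

    @Override public int hashCode() {
        return Objects.hash(id, name, balanceCents);
    }
}

// Stand-in for the generated Protobuf class read from Kafka (assumption).
final class CustomerProto {
    private final String id;
    private final String name;
    private final long balanceCents;

    CustomerProto(String id, String name, long balanceCents) {
        this.id = id;
        this.name = name;
        this.balanceCents = balanceCents;
    }

    String getId() { return id; }
    String getName() { return name; }
    long getBalanceCents() { return balanceCents; }
}

final class CustomerMappers {
    // Kafka side: copy the Protobuf fields into the shared POJO.
    static Customer fromProto(CustomerProto p) {
        return new Customer(p.getId(), p.getName(), p.getBalanceCents());
    }

    // File side: parse one "id,name,balanceCents" CSV line into the same POJO.
    static Customer fromCsvLine(String line) {
        String[] parts = line.split(",", -1);
        if (parts.length != 3) {
            throw new IllegalArgumentException("expected 3 columns, got " + parts.length);
        }
        return new Customer(parts[0].trim(), parts[1].trim(), Long.parseLong(parts[2].trim()));
    }
}
```

Note that both mappers throw on bad input, which is exactly the problem when 
this code runs inside a source.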

But an important point is that errors are harder to handle inside a source 
than downstream in the workflow. E.g. if converting to Protobuf, or mapping 
the Protobuf to your POJO, fails, you can’t use a side output (as you 
normally would with a ProcessFunction). That’s one reason why the 
HybridSource doesn’t provide out-of-the-box functionality for extra mapping.
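
To illustrate the downstream alternative: have both sources return an Either 
(as suggested earlier in the thread) and do the conversion in a later 
function, where failures can be routed aside. This is a plain-Java sketch 
only. The Either class here is a small stand-in, not Flink's 
org.apache.flink.types.Either, and Unifier with its rejected list is a 
hypothetical name playing the role of a ProcessFunction and its side output.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.function.Function;

// Minimal tagged union: left = record from the CSV file, right = record from Kafka.
final class Either<L, R> {
    private final L left;
    private final R right;
    private final boolean isLeft;

    private Either(L left, R right, boolean isLeft) {
        this.left = left;
        this.right = right;
        this.isLeft = isLeft;
    }

    static <L, R> Either<L, R> left(L value) { return new Either<>(value, null, true); }
    static <L, R> Either<L, R> right(R value) { return new Either<>(null, value, false); }

    // Apply the function that matches whichever side is present.
    <T> T fold(Function<L, T> onLeft, Function<R, T> onRight) {
        return isLeft ? onLeft.apply(left) : onRight.apply(right);
    }
}

// Converts either input type to the unified type T. Records that fail to
// convert are kept in `rejected` instead of failing the whole pipeline.
final class Unifier<L, R, T> {
    final List<T> mainOutput = new ArrayList<>();
    final List<Either<L, R>> rejected = new ArrayList<>();
    private final Function<L, T> fromCsv;
    private final Function<R, T> fromKafka;

    Unifier(Function<L, T> fromCsv, Function<R, T> fromKafka) {
        this.fromCsv = fromCsv;
        this.fromKafka = fromKafka;
    }

    void process(Either<L, R> record) {
        try {
            mainOutput.add(record.fold(fromCsv, fromKafka));
        } catch (RuntimeException e) {
            rejected.add(record); // in Flink this would go to a side output
        }
    }
}
```

For example, with String records from the file and Integer records from 
Kafka, new Unifier<String, Integer, Integer>(Integer::parseInt, x -> x) would 
send a left("oops") record to rejected rather than throwing.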

— Ken

> On Jul 5, 2023, at 7:19 AM, Oscar Perez <oscarfernando.pe...@n26.com> wrote:
> 
> And this is our case, Alexander: it is the same data schema but a different 
> data format. The Kafka data is Protobuf while the CSV data is a POJO, though 
> both have the same fields. IMHO, the design of HybridSource is very limited, 
> and you have to do nasty workarounds if you want to combine cold storage 
> (CSV, S3) and Kafka when the expectations differ a bit from the most common 
> use case (e.g. using Protobuf).
> 
> Regards,
> Oscar
> 
> On Wed, 5 Jul 2023 at 12:53, Alexander Fedulov <alexander.fedu...@gmail.com> wrote:
> I do not think that trying to "squash" two different data types into one just 
> to use HybridSource is the right thing to do here. HybridSource is primarily 
> intended for use cases that need to read the same data from different 
> sources. A typical example: read events from "cold storage" in S3 up to a 
> specific point and switch over to "live" data in Kafka. 
> Since you are already using the low-level API, you can either pull the data 
> in manually inside the open() function, or stream it into the local state 
> using KeyedCoProcessFunction or KeyedBroadcastProcessFunction (depending on 
> the size of the lookup state). 
> 
> This video should get you covered:
> https://www.youtube.com/watch?v=cJS18iKLUIY
> 
> Best,
> Alex
> 
> 
> On Wed, 5 Jul 2023 at 07:29, Péter Váry <peter.vary.apa...@gmail.com> wrote:
> Was it a conscious decision that HybridSource only accepts Sources, and does 
> not allow mapping functions to be applied to them before combining them?
> 
> On Tue, Jul 4, 2023, 23:53 Ken Krugler <kkrugler_li...@transpac.com> wrote:
> Hi Oscar,
> 
> Couldn’t you have both the Kafka and File sources return an Either<POJO from 
> CSV File, Protobuf from Kafka>, and then (after the HybridSource) use a 
> MapFunction to convert to the unified/correct type?
> 
> — Ken
> 
> 
>> On Jul 4, 2023, at 12:13 PM, Oscar Perez via user <user@flink.apache.org> wrote:
>> 
>> Hi,
>> 1) We populate state based on this CSV data and do business logic based on 
>> this state and events coming from other unrelated streams.
>> 2) We are using a low-level ProcessFunction to process this future 
>> hybrid source.
>> 
>> Regardless of the aforementioned points, please note that the main challenge 
>> is to combine, in a HybridSource, a CSV file and a Kafka topic that return 
>> different data types, so I don't know how my answers relate to the original 
>> problem, to be honest. 
>> Regards,
>> Oscar
>> 
>> On Tue, 4 Jul 2023 at 20:53, Alexander Fedulov <alexander.fedu...@gmail.com> wrote:
>> @Oscar
>> 1. How do you plan to use that CSV data? Is it needed for lookup from the 
>> "main" stream?
>> 2. Which API are you using? DataStream/SQL/Table or low level 
>> ProcessFunction?
>> 
>> Best,
>> Alex
>> 
>> On Tue, 4 Jul 2023 at 11:14, Oscar Perez via user <user@flink.apache.org> wrote:
>> OK, but is it? As I said, the two sources have different data types. In the 
>> example here:
>> 
>> https://nightlies.apache.org/flink/flink-docs-master/docs/connectors/datastream/hybridsource/
>> 
>> both sources return strings, but in our case one source would return a 
>> Protobuf event while the other would return a POJO. How can we make the two 
>> sources share the same data type so that we can successfully use 
>> HybridSource?
>> 
>> Regards,
>> Oscar
>> 
>> On Tue, 4 Jul 2023 at 12:04, Alexey Novakov <ale...@ververica.com> wrote:
>> Hi Oscar,
>> 
>> You could use connected streams and put your file into a special Kafka topic 
>> before starting such a job: 
>> https://nightlies.apache.org/flink/flink-docs-release-1.17/docs/dev/datastream/operators/overview/#connect
>> But this may require more work and the event ordering (which is shuffled) in 
>> the connected streams is probably not what you are looking for.
>> 
>> I think HybridSource is the right solution.
>> 
>> Best regards,
>> Alexey
>> 
>> On Mon, Jul 3, 2023 at 3:44 PM Oscar Perez via user <user@flink.apache.org> wrote:
>> Hi, we want to bootstrap some data from a CSV file before reading from a 
>> Kafka topic that has a retention period of 7 days.
>> 
>> We believe the best tool for that would be the HybridSource, but the problem 
>> we are facing is that the two data sources return different types. The 
>> KafkaSource returns a Protobuf event while the CSV source returns a POJO 
>> with just 3 fields.
>> 
>> We could hack the KafkaSource implementation and do the mapping from 
>> Protobuf to the CSV POJO in the value deserializer, but that seems rather 
>> hackish. Is there a more elegant way to unify the data types of both 
>> sources using HybridSource?
>> 
>> thanks
>> Oscar
> 
> --------------------------
> Ken Krugler
> http://www.scaleunlimited.com
> Custom big data solutions
> Flink, Pinot, Solr, Elasticsearch
> 
> 
> 

--------------------------
Ken Krugler
http://www.scaleunlimited.com
Custom big data solutions
Flink, Pinot, Solr, Elasticsearch


