Hm. I wonder if you could implement a custom Deserializer that wraps both
the CSV and Protobuf deserializer, and conditionally chooses which one to
use. As long as the final TypeInformation returned by the Source is the
same in either case, I think it should work?

> Kafka comes from protobuf while the CSV is a POJO though both have the
same fields
This could be the hard part, I think no matter what you do you'll have to
make the TypeInformation the HybridSource uses in either case be the exact
same.  Maybe you could convert your Protobuf to the POJO, or vice versa?




On Wed, Jul 5, 2023 at 10:19 AM Oscar Perez via user <user@flink.apache.org>
wrote:

> and this is our case Alexander, it is the same data schema but different
> data format. Kafka comes from protobuf while the CSV is a POJO though both
> have the same fields. IMHO, the design of HybridSource is very limited and
> you have to do nasty workarounds if you want to combine from cold storage
> (CSV, S3) and kafka if the expectations differ a bit from the most common
> use case (e.g. using protobuf)
>
> Regards,
> Oscar
>
> On Wed, 5 Jul 2023 at 12:53, Alexander Fedulov <
> alexander.fedu...@gmail.com> wrote:
>
>> I do not think that trying to "squash" two different data types into one
>> just to use HybridSource is the right thing to do here. HybridSource is
>> primarily intended for use cases that need to read the same data from
>> different sources. A typical example: read events from "cold storage" in S3
>> up to a specific point and switch over to "live" data in Kafka.
>> Since you are already using the low-level API, you can either
>> manually pull the data in inside of the open() function, or stream it into
>> the local state using KeyedCoProcessFunction or
>> KeyedBroadcastProcessFunction (depending on the size of the lookup state).
>>
>> This video should get you covered:
>> https://www.youtube.com/watch?v=cJS18iKLUIY
>>
>> Best,
>> Alex
>>
>>
>> On Wed, 5 Jul 2023 at 07:29, Péter Váry <peter.vary.apa...@gmail.com>
>> wrote:
>>
>>> Was it a conscious decision that HybridSource only accept Sources, and
>>> does not allow mapping functions applied to them before combining them?
>>>
>>> On Tue, Jul 4, 2023, 23:53 Ken Krugler <kkrugler_li...@transpac.com>
>>> wrote:
>>>
>>>> Hi Oscar,
>>>>
>>>> Couldn’t you have both the Kafka and File sources return an Either<POJO
>>>> from CSV File, Protobuf from Kafka>, and then (after the HybridSource) use
>>>> a MapFunction to convert to the unified/correct type?
>>>>
>>>> — Ken
>>>>
>>>>
>>>> On Jul 4, 2023, at 12:13 PM, Oscar Perez via user <
>>>> user@flink.apache.org> wrote:
>>>>
>>>> Hei,
>>>> 1) We populate state based on this CSV data and do business logic based
>>>> on this state and events coming from other unrelated streams.
>>>> 2) We are using low level process function in order to process this
>>>> future hybrid source
>>>>
>>>> Regardless of the aforementioned points, please note that the main
>>>> challenge is to combine in a hybridsource CSV and kafka topic that return
>>>> different datatypes so I dont know how my answers relate to the original
>>>> problem tbh. Regards,
>>>> Oscar
>>>>
>>>> On Tue, 4 Jul 2023 at 20:53, Alexander Fedulov <
>>>> alexander.fedu...@gmail.com> wrote:
>>>>
>>>>> @Oscar
>>>>> 1. How do you plan to use that CSV data? Is it needed for lookup from
>>>>> the "main" stream?
>>>>> 2. Which API are you using? DataStream/SQL/Table or low level
>>>>> ProcessFunction?
>>>>>
>>>>> Best,
>>>>> Alex
>>>>>
>>>>>
>>>>> On Tue, 4 Jul 2023 at 11:14, Oscar Perez via user <
>>>>> user@flink.apache.org> wrote:
>>>>>
>>>>>> ok, but is it? As I said, both sources have different data types. In
>>>>>> the example here:
>>>>>>
>>>>>>
>>>>>> https://nightlies.apache.org/flink/flink-docs-master/docs/connectors/datastream/hybridsource/
>>>>>>
>>>>>> We are using both sources as returning string but in our case, one
>>>>>> source would return a protobuf event while the other would return a pojo.
>>>>>> How can we make the 2 sources share the same datatype so that we can
>>>>>> successfully use hybrid source?
>>>>>>
>>>>>> Regards,
>>>>>> Oscar
>>>>>>
>>>>>> On Tue, 4 Jul 2023 at 12:04, Alexey Novakov <ale...@ververica.com>
>>>>>> wrote:
>>>>>>
>>>>>>> Hi Oscar,
>>>>>>>
>>>>>>> You could use connected streams and put your file into a special
>>>>>>> Kafka topic before starting such a job:
>>>>>>> https://nightlies.apache.org/flink/flink-docs-release-1.17/docs/dev/datastream/operators/overview/#connect
>>>>>>> But this may require more work and the event ordering (which is
>>>>>>> shuffled) in the connected streams is probably not what you are looking 
>>>>>>> for.
>>>>>>>
>>>>>>> I think HybridSource is the right solution.
>>>>>>>
>>>>>>> Best regards,
>>>>>>> Alexey
>>>>>>>
>>>>>>> On Mon, Jul 3, 2023 at 3:44 PM Oscar Perez via user <
>>>>>>> user@flink.apache.org> wrote:
>>>>>>>
>>>>>>>> Hei, We want to bootstrap some data from a CSV file before reading
>>>>>>>> from a kafka topic that has a retention period of 7 days.
>>>>>>>>
>>>>>>>> We believe the best tool for that would be the HybridSource but the
>>>>>>>> problem we are facing is that both datasources are of different 
>>>>>>>> nature. The
>>>>>>>> KafkaSource returns a protobuf event while the CSV is a POJO with just 
>>>>>>>> 3
>>>>>>>> fields.
>>>>>>>>
>>>>>>>> We could hack the kafkasource implementation and then in the
>>>>>>>> valuedeserializer do the mapping from protobuf to the CSV POJO but that
>>>>>>>> seems rather hackish. Is there a way more elegant to unify both 
>>>>>>>> datatypes
>>>>>>>> from both sources using Hybrid Source?
>>>>>>>>
>>>>>>>> thanks
>>>>>>>> Oscar
>>>>>>>>
>>>>>>>
>>>> --------------------------
>>>> Ken Krugler
>>>> http://www.scaleunlimited.com
>>>> Custom big data solutions
>>>> Flink, Pinot, Solr, Elasticsearch
>>>>
>>>>
>>>>
>>>>

Reply via email to