Thanks Alexander! That is different from what I initially had in mind, but
interesting anyway so I will take a closer look at it.

What I initially had in mind is a construct for simply:

- appending a conversion function f:A->B to a Source<A> in order to get a
Source<B> (corresponds to "map")
- prepending a conversion function f:B->A to a Sink<A> in order to get a
Sink<B> (corresponds to "contramap")
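
To make the idea concrete, here is a rough sketch in plain Java. Note that
`SimpleSource` and `SimpleSink` are hypothetical, minimal stand-ins that I made
up for illustration; they are NOT the Flink Source/Sink interfaces, which
involve readers, splits, writers, and so on. The sketch only shows the
direction of the functions: `map` post-processes what a source emits, while
`contramap` pre-processes what a sink receives.

```java
import java.util.function.Function;

// Hypothetical, simplified source: NOT Flink's Source interface.
interface SimpleSource<A> {
    // Returns the next element, or null when the source is exhausted.
    A next();

    // "map": append f: A -> B after this source to obtain a source of B.
    default <B> SimpleSource<B> map(Function<? super A, ? extends B> f) {
        return () -> {
            A a = next(); // pull from the underlying source
            return a == null ? null : f.apply(a);
        };
    }
}

// Hypothetical, simplified sink: NOT Flink's Sink interface.
interface SimpleSink<A> {
    void write(A value);

    // "contramap": prepend f: B -> A before this sink to obtain a sink of B.
    default <B> SimpleSink<B> contramap(Function<? super B, ? extends A> f) {
        return b -> write(f.apply(b)); // convert first, then delegate
    }
}
```

With these definitions, a `SimpleSource<Integer>` turns into a
`SimpleSource<String>` via `ints.map(i -> "n" + i)`, and a
`SimpleSink<Integer>` turns into a `SimpleSink<String>` via
`intSink.contramap(s -> Integer.parseInt(s))`. Note that the function for the
sink goes in the opposite direction (from the new type to the existing one).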

In Scala, this is commonly done/offered by JSON libraries such as Circe:
https://circe.github.io/circe/codecs/custom-codecs.html#custom-encodersdecoders

Again, this might not apply well to the case at hand (Flink sources &
sinks) but looks reasonable to me.

I will consider this problem further and share my thoughts...

Salva

On Mon, Jul 18, 2022 at 9:25 PM Alexander Fedulov <alexan...@ververica.com>
wrote:

> I had to do something similar recently for FLIP-238 (generator source)
> [1]. The PoC [2] reuses the NumberSequenceSource to produce other data
> types based on a user-defined mapper. The mapping happens within the
> SourceReader. Here are some relevant classes [3], [4]. Not sure if this is
> the best approach for your case, but it could hint at one possible
> direction.
>
> [1]
> https://cwiki.apache.org/confluence/display/FLINK/FLIP-238%3A+Introduce+FLIP-27-based+Data+Generator+Source
> [2] https://github.com/afedulov/flink/tree/FLINK-27919-generator-source
> [3]
> https://github.com/afedulov/flink/blob/FLINK-27919-generator-source/flink-core/src/main/java/org/apache/flink/api/connector/source/lib/util/MappingIteratorSourceReader.java
> [4]
> https://github.com/afedulov/flink/blob/FLINK-27919-generator-source/flink-core/src/main/java/org/apache/flink/api/connector/source/lib/DataGeneratorSourceV3.java#L135
>
> Best,
> Alexander Fedulov
>
> On Mon, Jul 18, 2022 at 8:01 PM Salva Alcántara <salcantara...@gmail.com>
> wrote:
>
>> Yep, that is mostly it. I have a (DataStream) connector (source & sink)
>> which works for a fixed type (`JsonNode` for what it's worth) as you say
>> and I want to reuse it for Table/SQL, which requires working with `RowData`
>> as the underlying data type. But even beyond that specific use case, I
>> think being able to generate sinks/sources out of existing ones makes
>> sense. Essentially, what I'd like is to make sources & sinks functorial by
>> attaching a `map` method, but maybe I should not even need this in the
>> first place and there are more idiomatic/correct ways of approaching this
>> within Flink.
>>
>> On Mon, Jul 18, 2022 at 5:33 PM Alexander Fedulov <
>> alexan...@ververica.com> wrote:
>>
>>> Hi Salva,
>>>
>>> what is the goal? Do you have some source that already has a fixed type
>>> and you want to reuse its functionality for producing a different data type?
>>>
>>> Best,
>>> Alexander Fedulov
>>>
>>>
>>> On Mon, Jul 18, 2022 at 1:29 PM Salva Alcántara <salcantara...@gmail.com>
>>> wrote:
>>>
>>>> If I have a Source<A> (Sink<A>), what would be the simplest way of
>>>> obtaining a Source<B> (Sink<B>) based on a mapping/conversion function from
>>>> A to B? AFAIK sources & sinks don't have map, so I was just wondering how to
>>>> approach this in the context of the new source/sink APIs.
>>>>
>>>> Regards,
>>>>
>>>> Salva
>>>>
>>>
