Thanks Alexander! That is different from what I initially had in mind, but interesting anyway so I will take a closer look at it.
What I initially had in mind is a construct for simply: - appending a conversion functions f:A->B to a Source<A> in order to get a Source<B> (corresponds to "map") - prepending a conversion function f:B->A to a Sink<A> in order to get a Sink<B> (corresponds to "contramap") In Scala, this is commonly done/offered by JSON libraries such as Circe: https://circe.github.io/circe/codecs/custom-codecs.html#custom-encodersdecoders Again, this might not apply well to the case at hand (Flink sources & sinks) but looks reasonable to me. I will consider this problem further and share my thoughts... Salva On Mon, Jul 18, 2022 at 9:25 PM Alexander Fedulov <alexan...@ververica.com> wrote: > I had to do something similar recently for FLIP-238 (generator source) > [1]. The PoC [2] reuses the NumberSequenceSource to produce other data > types based on a user-defined mapper. The mapping happens within the > SourceReader. Here are some relevant classes [3], [4]. Not sure if this is > the best approach for your case, but it could hint at one possible > direction. > > [1] > https://cwiki.apache.org/confluence/display/FLINK/FLIP-238%3A+Introduce+FLIP-27-based+Data+Generator+Source > [2] https://github.com/afedulov/flink/tree/FLINK-27919-generator-source > [3] > https://github.com/afedulov/flink/blob/FLINK-27919-generator-source/flink-core/src/main/java/org/apache/flink/api/connector/source/lib/util/MappingIteratorSourceReader.java > [4] > https://github.com/afedulov/flink/blob/FLINK-27919-generator-source/flink-core/src/main/java/org/apache/flink/api/connector/source/lib/DataGeneratorSourceV3.java#L135 > > Best, > Alexander Fedulov > > On Mon, Jul 18, 2022 at 8:01 PM Salva Alcántara <salcantara...@gmail.com> > wrote: > >> Yep, that is mostly it. I have (DataStream) connector (sources & sink) >> which works for a fixed type (`JsonNode` for what it's worth) as you say >> and I want to reuse it for Table/SQL, which requires working with `DataRow` >> as the underlying data type. But even beyond that specific use case, I >> think being able of genering sinks/sources out of existing ones makes >> sense. Essentially, what I'd like is to make sources & sinks functorial by >> attaching a `map` method, but maybe I should not even need this in the >> first place and there are more idiomatic/correct ways of approaching this >> within Flink. >> >> On Mon, Jul 18, 2022 at 5:33 PM Alexander Fedulov < >> alexan...@ververica.com> wrote: >> >>> Hi Salva, >>> >>> what is the goal? Do you have some source that already has a fixed type >>> and you want to reuse its functionality for producing a different data type? >>> >>> Best, >>> Alexander Fedulov >>> >>> >>> On Mon, Jul 18, 2022 at 1:29 PM Salva Alcántara <salcantara...@gmail.com> >>> wrote: >>> >>>> If I have a Source<A> (Sink<A>), what would be the simplest way of >>>> obtaining a Source<B> (Sink<B>) based on a mapping/conversion function from >>>> A to B. AFAIK sources & sinks don't have map so I was just wondering how to >>>> approach this in the context of new sources/sinks apis. >>>> >>>> Regards, >>>> >>>> Salva >>>> >>>