Thanks a lot, Matthias! Admittedly, I have never used
MultipleInputStreamOperator. I will check it out!

> Good morning Salva,
> The situation is much better than you seem to be aware of 😊
> For quite some time there has been an implementation for keyed operators
> with as many inputs as you like:
>    - MultipleInputStreamOperator/KeyedMultipleInputTransformation
> I originally used your proposed sum types with unions; that worked fine
> but was far from elegant 😊
> I posted a sketch of how to implement a MultipleInputStreamOperator a
> while ago and found it again (from around August 08, 2024).
> Here are the essentials again:
> Hi Christian,
> As I didn’t find a well-documented reference to MultipleInputStreamOperator,
> let me sketch it (it’s Scala code, easy enough to translate to Java):
>    - It implements 4 inputs; the second one is a broadcast stream
>    - ACHEP, CEIEP, V1EP, AMAP are type aliases for the respective input types;
>      K is the key type and O the output type (oTI, kTI: their TypeInformation)
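> //imports used by the sketch
> import java.util
> import org.apache.flink.api.java.functions.KeySelector
> import org.apache.flink.runtime.state.{VoidNamespace, VoidNamespaceSerializer}
> import org.apache.flink.streaming.api.datastream.MultipleConnectedStreams
> import org.apache.flink.streaming.api.operators._
> import org.apache.flink.streaming.api.scala.DataStream
> import org.apache.flink.streaming.api.transformations.KeyedMultipleInputTransformation
> import org.apache.flink.streaming.runtime.streamrecord.StreamRecord
> //implement the operator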
> class YourEnrichmentOperator(params: StreamOperatorParameters[O])
>   //specify number of inputs (4)
>   extends AbstractStreamOperatorV2[O](params, 4)
>     with MultipleInputStreamOperator[O]
>     with Triggerable[K, VoidNamespace]
> {
>   @transient var collector: TimestampedCollector[O] = null
>   @transient var internalTimerService: InternalTimerService[VoidNamespace] = null
>   override def open(): Unit = {
>     require(configuration != null, "Missing configuration")
>     //setup collector and timer services (modelled after other operators)
>     collector = new TimestampedCollector[O](output)
>     internalTimerService = getInternalTimerService(
>       "user-timers",
>       VoidNamespaceSerializer.INSTANCE,
>       this
>     )
>   }
>  //implement each input
>   val input1achep =
>     new AbstractInput[ACHEP, O](this, 1) {
>       override def processElement(element: StreamRecord[ACHEP]): Unit = {
>         val achep: ACHEP = element.getValue
>         val et = element.getTimestamp
>         val key = getCurrentKey.asInstanceOf[K]
>         collector.setAbsoluteTimestamp(et)
>         ...
>       }
>     }
>   val input2ceiep =
>     new AbstractInput[CEIEP, O](this, 2) {
>       ...
>     }
>   val input3v1ep =
>     new AbstractInput[V1EP, O](this, 3) {
>       ...
>     }
>   val input4amap =
>     new AbstractInput[AMAP, O](this, 4) {
>       ...
>     }
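>   //implement getInputs(...) with all configured inputs
>   override def getInputs: util.List[Input[_]] =
>     util.List.of[Input[_]](input1achep, input2ceiep, input3v1ep, input4amap)
>   //Triggerable callbacks (required by the Triggerable mixin)
>   override def onEventTime(timer: InternalTimer[K, VoidNamespace]): Unit = {
>     ...
>   }
>   override def onProcessingTime(timer: InternalTimer[K, VoidNamespace]): Unit = {
>     ...
>   }
> }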
> //implement the operator factory
> class YourEnrichmentOperatorFactory
>   extends AbstractStreamOperatorFactory[O]{
>   override def createStreamOperator[T <: StreamOperator[O]](
>       parameters: StreamOperatorParameters[O]): T = {
>     val operator = new YourEnrichmentOperator(parameters)
>     operator.asInstanceOf[T]
>   }
>   override def getStreamOperatorClass(
>       classLoader: ClassLoader): Class[_ <: StreamOperator[_]] = {
>     //classOf[...] yields the operator's runtime class; YourEnrichmentOperator.getClass
>     //would refer to a companion object, which does not exist here
>     classOf[YourEnrichmentOperator]
>   }
> }
> //implement job setup
> ...
>   def setupJobFromInputs(achep: DataStream[ACHEP],
>             ceiep: DataStream[CEIEP],
>             v1ep: DataStream[V1EP],
>             amap: DataStream[AMAP]) = {
>     val par = your.parallelism
>     val factory = new YourEnrichmentOperatorFactory
>     val yourEnrichmentTransformation =
>       new KeyedMultipleInputTransformation[O](
>         "yourEnrichment",
>         factory,
>         oTI,
>         par,
>         kTI
>       )
>     val achepKS = new KeySelector[ACHEP, K] {
>       override def getKey(value: ACHEP): K = value.yourId
>     }
>     yourEnrichmentTransformation.addInput(
>       achep.javaStream
>         .getTransformation(),
>       achepKS
>     )
>     yourEnrichmentTransformation.addInput(
>       ceiep.broadcast.javaStream.getTransformation,
>       null
>     )
>     val v1epKS = new KeySelector[V1EP, K] {
>       override def getKey(value: V1EP): K = value.cardHolderId
>     }
>     yourEnrichmentTransformation.addInput(
>       v1ep
>         .javaStream
>         .getTransformation(),
>       v1epKS
>     )
>     val amapKS = new KeySelector[AMAP, K] {
>      override def getKey(value: AMAP): K = value.yourId
>     }
>     yourEnrichmentTransformation.addInput(
>       amap
>         .javaStream
>         .getTransformation(),
>       amapKS
>     )
>     val yourEnrichedStream =
>       new DataStream[O](
>         new MultipleConnectedStreams(env.getJavaEnv)
>           .transform(yourEnrichmentTransformation)
>       ).withScopedMetaData("yourEnriched")
>     yourEnrichedStream
>   }
> ...
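Each keyed input above gets a key selector that returns the same key type K, and the broadcast input gets `null`. The toy model below, in plain Scala with no Flink dependency, shows why that matters. The record types `Achep`/`V1ep`, the `Routing` object and the modulo-hash routing are hypothetical simplifications. Flink itself assigns keys to key groups (via a murmur hash) and key groups to subtasks. The property shown is the same, though: equal keys from different inputs reach the same subtask, and broadcast records reach all subtasks.

```scala
// Toy model of input routing for a keyed multi-input operator (not Flink code).

// Hypothetical record types: the key field name differs per input,
// but both key selectors return the same key type (String here).
final case class Achep(yourId: String, amount: Int)
final case class V1ep(cardHolderId: String, score: Double)

object Routing {
  // Key selectors, analogous to achepKS and v1epKS in the sketch above.
  val achepKey: Achep => String = _.yourId
  val v1epKey: V1ep => String = _.cardHolderId

  // Keyed input: each record goes to exactly one subtask derived from its key.
  // Simplified stand-in for Flink's key-group assignment.
  def keyedTarget[K](key: K, parallelism: Int): Int =
    Math.floorMod(key.hashCode, parallelism)

  // Broadcast input (null key selector in the sketch): every subtask gets a copy.
  def broadcastTargets(parallelism: Int): Seq[Int] = 0 until parallelism
}
```

All keyed inputs share one key type and are routed by the same function. That is what lets a single operator instance hold the per-key state of every input together.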
> I hope this helps.
> Flink-Greetings
> Thias
> I have a job which basically joins different inputs together, all
> partitioned by the same key.
> I originally took the typical approach and created a pipeline consisting
> of N-1 successive joins, each one implemented using a DataStream co-process
> function.
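The chained design can be pictured with a toy model in plain Scala. It uses no Flink, and its inner-join semantics and in-memory maps are simplifying assumptions; `ChainedJoins` is a hypothetical name. N keyed inputs are combined by N-1 two-input join steps, and each intermediate step hands the accumulated partial record to the next one.

```scala
// Toy model of a chain of two-input joins (not Flink code).
object ChainedJoins {
  // Partial join result per key: the fields accumulated so far.
  type Partial = Map[String, Vector[String]]

  // One two-input join on the shared key, with inner-join semantics.
  def joinStep(left: Partial, right: Map[String, String]): Partial =
    left.toList.flatMap { case (k, fields) =>
      right.get(k).map(r => k -> (fields :+ r))
    }.toMap

  // N inputs are combined by N - 1 successive join steps. Returns the final
  // result and the output of every join step (what each downstream stage receives).
  def chain(inputs: List[Map[String, String]]): (Partial, List[Partial]) = {
    require(inputs.nonEmpty, "need at least one input")
    val first: Partial = inputs.head.map { case (k, v) => k -> Vector(v) }
    val stages = inputs.tail.scanLeft(first)(joinStep)
    (stages.last, stages.tail)
  }
}
```

With four inputs this yields three join stages. In a real pipeline, each stage boundary is where records are re-keyed and partial results are kept in state again.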
> To avoid shuffling and also some state duplication across operators, I am
> now considering the following alternative design:
> - Collapse the whole pipeline into a single (fat) operator
> - This operator will process all the inputs, effectively
> Since Flink does not support side inputs yet, they need to be simulated,
> e.g., by unioning all the different inputs into a sum type (a tuple or a
> POJO with one field for each type of input).
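The union-into-a-sum-type workaround can be sketched as another plain-Scala toy model without Flink. It uses an idiomatic sealed trait instead of a tuple or POJO with one field per input, and a `Map` stands in for keyed state. The input types `InputA`/`InputB`/`InputC`, `JoinState` and `FatOperator.run` are hypothetical. Each unioned event updates the per-key state for its branch, and a joined record is emitted once every input has been seen for that key.

```scala
// Toy model of the "union into a sum type" approach (not Flink code).

// Hypothetical input types standing in for the real ones.
final case class InputA(key: String, amount: Int)
final case class InputB(key: String, name: String)
final case class InputC(key: String, rate: Double)

// Sum type: each unioned element is exactly one of the inputs.
sealed trait AnyInput { def key: String }
object AnyInput {
  final case class A(v: InputA) extends AnyInput { def key: String = v.key }
  final case class B(v: InputB) extends AnyInput { def key: String = v.key }
  final case class C(v: InputC) extends AnyInput { def key: String = v.key }
}

// Per-key state of the fat operator: latest value seen from each input.
final case class JoinState(
    a: Option[InputA] = None,
    b: Option[InputB] = None,
    c: Option[InputC] = None)

object FatOperator {
  type Joined = (String, Int, String, Double)

  // Processes the unioned stream in arrival order. A Map plays the role of
  // keyed state; a joined record is emitted whenever all sides are present.
  def run(events: Seq[AnyInput]): (Map[String, JoinState], Vector[Joined]) =
    events.foldLeft((Map.empty[String, JoinState], Vector.empty[Joined])) {
      case ((states, out), ev) =>
        val before = states.getOrElse(ev.key, JoinState())
        // Dispatch on the sum type: one branch per input.
        val after = ev match {
          case AnyInput.A(v) => before.copy(a = Some(v))
          case AnyInput.B(v) => before.copy(b = Some(v))
          case AnyInput.C(v) => before.copy(c = Some(v))
        }
        val emitted = (after.a, after.b, after.c) match {
          case (Some(x), Some(y), Some(z)) => out :+ ((ev.key, x.amount, y.name, z.rate))
          case _                           => out
        }
        (states.updated(ev.key, after), emitted)
    }
}
```

The pattern match on the wrapper is the part that MultipleInputStreamOperator makes unnecessary: there, each input gets its own typed processElement.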
> Has anyone experimented with these two (somewhat dual) approaches? If so,
> could you offer some guidance on deciding which one to use?
> On a related note, are there any plans to move FLIP-17
> forward?
> Regards,
> Salva
