Good morning Salva,

The situation is much better than you may be aware of 🙂 For quite some time now, there has been an implementation for keyed operators with as many inputs as you like:
* MultipleInputStreamOperator / KeyedMultipleInputTransformation

I originally used your proposed sum types with unions; that worked fine but was far from elegant 🙂 …

I posted a sketch of how to implement MultipleInputStreamOperator a while ago … found it (around August 08 2024). Here are the essentials again:

Hi Christian,

As I didn't find a well-documented reference to MultipleInputStreamOperator, let me sketch it (it's Scala code, easy enough to be transformed to Java):

* Implements 4 inputs; the second is a broadcast stream
* ACHEP, CEIEP, V1EP, AMAP are type aliases of the respective input types

    //implement the operator
    class YourEnrichmentOperator(params: StreamOperatorParameters[O])
      //specify number of inputs (4)
      extends AbstractStreamOperatorV2[O](params, 4)
      with MultipleInputStreamOperator[O]
      with Triggerable[K, VoidNamespace] {

      @transient var collector: TimestampedCollector[O] = null
      @transient var internalTimerService: InternalTimerService[VoidNamespace] = null

      override def open(): Unit = {
        require(configuration != null, "Missing configuration")
        //setup collector and timer services (modelled after other operators)
        collector = new TimestampedCollector[O](output)
        internalTimerService = getInternalTimerService(
          "user-timers",
          VoidNamespaceSerializer.INSTANCE,
          this
        )
      }

      //implement each input (input ids are 1-based)
      val input1achep = new AbstractInput[ACHEP, O](this, 1) {
        override def processElement(element: StreamRecord[ACHEP]): Unit = {
          val achep: ACHEP = element.getValue
          val et = element.getTimestamp
          val key = getCurrentKey.asInstanceOf[K]
          collector.setAbsoluteTimestamp(et)
          ...
        }
      }
      val input2ceiep = new AbstractInput[CEIEP, O](this, 2) {
        ...
      }
      val input3v1ep = new AbstractInput[V1EP, O](this, 3) {
        ...
      }
      val input4amap = new AbstractInput[AMAP, O](this, 4) {
        ...
      }

      //the Triggerable callbacks onEventTime(...) and onProcessingTime(...)
      //also belong here (omitted in this sketch)

      //implement getInputs(...) with all configured inputs
      override def getInputs: util.List[Input[_]] =
        util.List.of[Input[_]](input1achep, input2ceiep, input3v1ep, input4amap)
    }

    //implement the operator factory
    class YourEnrichmentOperatorFactory extends AbstractStreamOperatorFactory[O] {

      override def createStreamOperator[T <: StreamOperator[O]](parameters: StreamOperatorParameters[O]): T = {
        val operator = new YourEnrichmentOperator(parameters)
        operator.asInstanceOf[T]
      }

      override def getStreamOperatorClass(classLoader: ClassLoader): Class[_ <: StreamOperator[_]] = {
        //classOf yields the operator class itself; YourEnrichmentOperator.getClass
        //would refer to a (non-existent) companion object
        classOf[YourEnrichmentOperator]
      }
    }

    //implement job setup
    ...
    def setupJobFromInputs(achep: DataStream[ACHEP],
                           ceiep: DataStream[CEIEP],
                           v1ep: DataStream[V1EP],
                           amap: DataStream[AMAP]) = {
      val par = your.parallelism
      val factory = new YourEnrichmentOperatorFactory
      val yourEnrichmentTransformation =
        new KeyedMultipleInputTransformation[O](
          "yourEnrichment",
          factory,
          oTI,
          par,
          kTI
        )

      //input 1: keyed
      val achepKS = new KeySelector[ACHEP, K] {
        override def getKey(value: ACHEP): K = value.yourId
      }
      yourEnrichmentTransformation.addInput(
        achep.javaStream.getTransformation(),
        achepKS
      )

      //input 2: broadcast, hence no key selector
      yourEnrichmentTransformation.addInput(
        ceiep.broadcast.javaStream.getTransformation,
        null
      )

      //input 3: keyed
      val v1epKS = new KeySelector[V1EP, K] {
        override def getKey(value: V1EP): K = value.cardHolderId
      }
      yourEnrichmentTransformation.addInput(
        v1ep.javaStream.getTransformation(),
        v1epKS
      )

      //input 4: keyed
      val amapKS = new KeySelector[AMAP, K] {
        override def getKey(value: AMAP): K = value.yourId
      }
      yourEnrichmentTransformation.addInput(
        amap.javaStream.getTransformation(),
        amapKS
      )

      val yourEnrichedStream = new DataStream[O](
        new MultipleConnectedStreams(env.getJavaEnv)
          .transform(yourEnrichmentTransformation)
      ).withScopedMetaData("yourEnriched")

      yourEnrichedStream
    }
    ...

I hope this helps.
Flink-Greetings
Thias

From: Salva Alcántara
Sent: Wednesday, December 4, 2024 2:03 PM
To: user
Subject: [External] Joining Streams: "One operator with N inputs" vs "N-1 co-processors"

I have a job which basically joins different inputs together, all partitioned by the same key.

I originally took the typical approach and created a pipeline consisting of N-1 successive joins, each one implemented using a DataStream co-process function.

To avoid shuffling and also some state duplication across operators, I am now considering the following alternative design:

- Collapse the whole pipeline into a single (fat) operator
- This operator will process all the inputs, effectively

Since Flink does not support side inputs yet, they need to be simulated, e.g., by unioning all the different inputs into a sum type (a tuple or a POJO with one field for each type of input).
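
For comparison, the sum-type workaround described above could look roughly like the following. This is only a minimal plain-Scala sketch without any Flink dependency: the names Event, Order, Customer, JoinState, Joined and UnionJoinSketch are hypothetical, and the immutable Map merely stands in for the keyed state that a single operator would hold after the inputs have been unioned into one stream.

```scala
// Plain-Scala sketch of the "union into a sum type" idea (no Flink involved).
// All names below are hypothetical and only serve as an illustration.

// The sum type: every input type becomes one case of a common trait.
sealed trait Event { def key: String }
final case class Order(key: String, amount: Int) extends Event
final case class Customer(key: String, name: String) extends Event

// Per-key state of the single "fat" operator: the latest element of each input.
final case class JoinState(order: Option[Order] = None,
                           customer: Option[Customer] = None)

// Result emitted once both sides are present for a key.
final case class Joined(key: String, name: String, amount: Int)

object UnionJoinSketch {

  // Process one element of the unioned stream: dispatch on the concrete case,
  // update the state for its key, and emit a result if both sides are known.
  def process(state: Map[String, JoinState],
              event: Event): (Map[String, JoinState], Option[Joined]) = {
    val current = state.getOrElse(event.key, JoinState())
    val updated = event match {
      case o: Order    => current.copy(order = Some(o))
      case c: Customer => current.copy(customer = Some(c))
    }
    val joined = for {
      o <- updated.order
      c <- updated.customer
    } yield Joined(event.key, c.name, o.amount)
    (state.updated(event.key, updated), joined)
  }

  // Run a whole unioned stream through the operator and collect the output.
  def run(events: Seq[Event]): Seq[Joined] = {
    val initial = (Map.empty[String, JoinState], Vector.empty[Joined])
    val (_, out) = events.foldLeft(initial) {
      case ((state, acc), event) =>
        val (next, emitted) = process(state, event)
        (next, acc ++ emitted.toList)
    }
    out
  }
}
```

The pattern match in process is the part that MultipleInputStreamOperator makes unnecessary: there, each input has its own processElement, so no wrapper type and no dispatch on the case are needed.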