Good morning Salva,

The situation is much better than you seem to be aware of :-) For quite some time there has been an implementation for keyed operators with as many inputs as you like:
* MultipleInputStreamOperator / KeyedMultipleInputTransformation

I originally used your proposed sum types with unions; that worked fine but was far from elegant :-) ... I posted a sketch of how to implement MultipleInputStreamOperator a while ago ... found it (around August 08 2024). Here the essentials again:

Hi Christian,

As I didn't find a well-documented reference to MultipleInputStreamOperator, let me sketch it (it's Scala code, easy enough to transform to Java):

* Implements 4 inputs; the second is a broadcast stream
* ACHEP, CEIEP, V1EP, AMAP are type aliases for the respective input types

//implement the operator
class YourEnrichmentOperator(params: StreamOperatorParameters[O])
  //specify number of inputs (4)
  extends AbstractStreamOperatorV2[O](params, 4)
    with MultipleInputStreamOperator[O]
    with Triggerable[K, VoidNamespace] {

  @transient var collector: TimestampedCollector[O] = null
  @transient var internalTimerService: InternalTimerService[VoidNamespace] = null

  override def open(): Unit = {
    super.open()
    require(configuration != null, "Missing configuration")
    //setup collector and timer services (modelled after other operators)
    collector = new TimestampedCollector[O](output)
    internalTimerService = getInternalTimerService(
      "user-timers",
      VoidNamespaceSerializer.INSTANCE,
      this
    )
  }

  //implement each input
  val input1achep = new AbstractInput[ACHEP, O](this, 1) {
    override def processElement(element: StreamRecord[ACHEP]): Unit = {
      val achep: ACHEP = element.getValue
      val et = element.getTimestamp
      val key = getCurrentKey.asInstanceOf[K]
      collector.setAbsoluteTimestamp(et)
      ...
    }
  }
  val input2ceiep = new AbstractInput[CEIEP, O](this, 2) { ... }
  val input3v1ep = new AbstractInput[V1EP, O](this, 3) { ... }
  val input4amap = new AbstractInput[AMAP, O](this, 4) { ... }

  //implement the timer callbacks required by Triggerable
  override def onEventTime(timer: InternalTimer[K, VoidNamespace]): Unit = { ... }
  override def onProcessingTime(timer: InternalTimer[K, VoidNamespace]): Unit = { ... }

  //implement getInputs(...) with all configured inputs
  override def getInputs: util.List[Input[_]] =
    util.List.of[Input[_]](input1achep, input2ceiep, input3v1ep, input4amap)
}

//implement the operator factory
class YourEnrichmentOperatorFactory extends AbstractStreamOperatorFactory[O] {

  override def createStreamOperator[T <: StreamOperator[O]](parameters: StreamOperatorParameters[O]): T = {
    val operator = new YourEnrichmentOperator(parameters)
    operator.asInstanceOf[T]
  }

  override def getStreamOperatorClass(classLoader: ClassLoader): Class[_ <: StreamOperator[_]] =
    classOf[YourEnrichmentOperator]
}

//implement job setup
...
def setupJobFromInputs(
    achep: DataStream[ACHEP],
    ceiep: DataStream[CEIEP],
    v1ep: DataStream[V1EP],
    amap: DataStream[AMAP]) = {

  val par = your.parallelism
  val factory = new YourEnrichmentOperatorFactory

  val yourEnrichmentTransformation = new KeyedMultipleInputTransformation[O](
    "yourEnrichment",
    factory,
    oTI,
    par,
    kTI
  )

  val achepKS = new KeySelector[ACHEP, K] {
    override def getKey(value: ACHEP): K = value.yourId
  }
  yourEnrichmentTransformation.addInput(achep.javaStream.getTransformation, achepKS)

  //broadcast input: no key selector
  yourEnrichmentTransformation.addInput(ceiep.broadcast.javaStream.getTransformation, null)

  val v1epKS = new KeySelector[V1EP, K] {
    override def getKey(value: V1EP): K = value.cardHolderId
  }
  yourEnrichmentTransformation.addInput(v1ep.javaStream.getTransformation, v1epKS)

  val amapKS = new KeySelector[AMAP, K] {
    override def getKey(value: AMAP): K = value.yourId
  }
  yourEnrichmentTransformation.addInput(amap.javaStream.getTransformation, amapKS)

  val yourEnrichedStream = new DataStream[O](
    new MultipleConnectedStreams(env.getJavaEnv)
      .transform(yourEnrichmentTransformation)
  ).withScopedMetaData("yourEnriched")

  yourEnrichedStream
}
...

I hope this helps.
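For completeness, the sum-type-with-unions workaround mentioned above works roughly like this: wrap each input in a common wrapper type, union the wrapped streams into one stream keyed on the shared key, and pattern-match per element inside a single process function. A minimal Flink-free sketch of the wrapper and the dispatch (OrderEvent/PaymentEvent are made-up stand-ins for the real input types, not names from this thread):

```scala
// Made-up stand-ins for two of the real input types (e.g. ACHEP, V1EP)
case class OrderEvent(orderId: String, amount: Double)
case class PaymentEvent(orderId: String, method: String)

// The sum type: one case per input stream. Upstream, each stream is mapped
// into this wrapper, and the wrapped streams are unioned into one stream
// that can be keyed on the common key.
sealed trait InputUnion { def key: String }
case class OrderInput(e: OrderEvent) extends InputUnion { def key: String = e.orderId }
case class PaymentInput(e: PaymentEvent) extends InputUnion { def key: String = e.orderId }

// Inside a single keyed process function, processElement dispatches on the case:
def describe(in: InputUnion): String = in match {
  case OrderInput(e)   => s"order ${e.orderId}: ${e.amount}"
  case PaymentInput(e) => s"payment ${e.orderId}: ${e.method}"
}
```

The inelegance shows quickly: every new input grows the wrapper and every match, and all inputs must share one serialized union type, which is exactly what MultipleInputStreamOperator avoids.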
Flink-Greetings
Thias

From: Salva Alcántara <salcantara...@gmail.com>
Sent: Wednesday, December 4, 2024 2:03 PM
To: user <user@flink.apache.org>
Subject: [External] Joining Streams: "One operator with N inputs" vs "N-1 co-processors"

I have a job which basically joins different inputs together, all partitioned by the same key. I originally took the typical approach and created a pipeline consisting of N-1 successive joins, each implemented using a DataStream co-process function. To avoid shuffling and also some state duplication across operators, I am now considering the following alternative design:

- Collapse the whole pipeline into a single (fat) operator
- This operator will effectively process all the inputs

Since Flink does not support side inputs yet, they need to be simulated, e.g., by unioning all the different inputs into a sum type (a tuple or a POJO with one field for each type of input).

Has anyone experimented with these two (somewhat dual) approaches? If so, could you provide some guidance/advice to decide which one to use?

On a related note, are there any plans to move FLIP-17<https://cwiki.apache.org/confluence/display/FLINK/FLIP-17%3A+Side+Inputs+for+DataStream+API> forward?

Regards,
Salva

This message is intended only for the named recipient and may contain confidential or privileged information.
As the confidentiality of email communication cannot be guaranteed, we do not accept any responsibility for the confidentiality and the intactness of this message. If you have received it in error, please advise the sender by return e-mail and delete this message and any attachments. Any unauthorised use or dissemination of this information is strictly prohibited.