Thanks a lot, Matthias! Admittedly, I have never used MultipleInputStreamOperator. Will check it out!
On Thu, Dec 5, 2024, 08:47 Schwalbe Matthias <matthias.schwa...@viseca.ch> wrote:

> Good morning Salva,
>
> The situation is much better than you apparently are aware of :)
>
> For quite some time there has been an implementation for keyed operators
> with as many inputs as you like:
>
> - MultipleInputStreamOperator/KeyedMultipleInputTransformation
>
> I originally used your proposed sum types with unions; that worked fine
> but was far from elegant :)
>
> … I posted a sketch of how to implement MultipleInputStreamOperator a
> while ago … found it (around August 08 2024):
>
> Here are the essentials again:
>
> Hi Christian,
>
> As I didn't find a well-documented reference to MultipleInputStreamOperator,
> let me sketch it (it's Scala code, easy enough to be transformed to Java):
>
> - Implements 4 inputs, the second is a broadcast stream
> - ACHEP, CEIEP, V1EP, AMAP are typedefs of the respective input types
>
> //implement the operator
> class YourEnrichmentOperator(params: StreamOperatorParameters[O])
>   //specify number of inputs (4)
>   extends AbstractStreamOperatorV2[O](params, 4)
>   with MultipleInputStreamOperator[O]
>   with Triggerable[K, VoidNamespace]
> {
>   @transient var collector: TimestampedCollector[O] = null
>   @transient var internalTimerService: InternalTimerService[VoidNamespace] = null
>
>   override def open(): Unit = {
>     super.open()
>     require(configuration != null, "Missing configuration")
>
>     //setup collector and timer services (modelled after other operators)
>     collector = new TimestampedCollector[O](output)
>     internalTimerService = getInternalTimerService(
>       "user-timers",
>       VoidNamespaceSerializer.INSTANCE,
>       this
>     )
>   }
>
>   //implement each input
>   val input1achep =
>     new AbstractInput[ACHEP, O](this, 1) {
>       override def processElement(element: StreamRecord[ACHEP]): Unit = {
>         val achep: ACHEP = element.getValue
>         val et = element.getTimestamp
>         val key = getCurrentKey.asInstanceOf[K]
>         collector.setAbsoluteTimestamp(et)
>         ...
>       }
>     }
>
>   val input2ceiep =
>     new AbstractInput[CEIEP, O](this, 2) {
>       ...
>     }
>
>   val input3v1ep =
>     new AbstractInput[V1EP, O](this, 3) {
>       ...
>     }
>
>   val input4amap =
>     new AbstractInput[AMAP, O](this, 4) {
>       ...
>     }
>
>   //implement getInputs(...) with all configured inputs
>   override def getInputs: util.List[Input[_]] =
>     util.List.of[Input[_]](input1achep, input2ceiep, input3v1ep, input4amap)
> }
>
> //implement the operator factory
> class YourEnrichmentOperatorFactory
>   extends AbstractStreamOperatorFactory[O] {
>   override def createStreamOperator[T <: StreamOperator[O]](
>       parameters: StreamOperatorParameters[O]): T = {
>     val operator = new YourEnrichmentOperator(parameters)
>     operator.asInstanceOf[T]
>   }
>
>   override def getStreamOperatorClass(
>       classLoader: ClassLoader): Class[_ <: StreamOperator[_]] = {
>     classOf[YourEnrichmentOperator]
>   }
> }
>
> //implement job setup
> ...
> def setupJobFromInputs(achep: DataStream[ACHEP],
>                        ceiep: DataStream[CEIEP],
>                        v1ep: DataStream[V1EP],
>                        amap: DataStream[AMAP]) = {
>
>   val par = your.parallelism
>   val factory = new YourEnrichmentOperatorFactory
>   val yourEnrichmentTransformation =
>     new KeyedMultipleInputTransformation[O](
>       "yourEnrichment",
>       factory,
>       oTI,
>       par,
>       kTI
>     )
>
>   val achepKS = new KeySelector[ACHEP, K] {
>     override def getKey(value: ACHEP): K = value.yourId
>   }
>   yourEnrichmentTransformation.addInput(
>     achep.javaStream
>       .getTransformation(),
>     achepKS
>   )
>
>   //broadcast input: no key selector
>   yourEnrichmentTransformation.addInput(
>     ceiep.broadcast.javaStream.getTransformation,
>     null
>   )
>
>   val v1epKS = new KeySelector[V1EP, K] {
>     override def getKey(value: V1EP): K = value.cardHolderId
>   }
>   yourEnrichmentTransformation.addInput(
>     v1ep
>       .javaStream
>       .getTransformation(),
>     v1epKS
>   )
>
>   val amapKS = new KeySelector[AMAP, K] {
>     override def getKey(value: AMAP): K = value.yourId
>   }
>   yourEnrichmentTransformation.addInput(
>     amap
>       .javaStream
>       .getTransformation(),
>     amapKS
>   )
>
>   val yourEnrichedStream =
>     new DataStream[O](
>       new MultipleConnectedStreams(env.getJavaEnv)
>         .transform(yourEnrichmentTransformation)
>     ).withScopedMetaData("yourEnriched")
>   yourEnrichedStream
> }
> ...
>
> I hope this helps.
>
> Flink-Greetings
> Thias
>
> *From:* Salva Alcántara <salcantara...@gmail.com>
> *Sent:* Wednesday, December 4, 2024 2:03 PM
> *To:* user <user@flink.apache.org>
> *Subject:* [External] Joining Streams: "One operator with N inputs" vs
> "N-1 co-processors"
>
> I have a job which basically joins different inputs together, all
> partitioned by the same key.
>
> I originally took the typical approach and created a pipeline consisting
> of N-1 successive joins, each one implemented using a DataStream co-process
> function.
>
> To avoid shuffling and also some state duplication across operators, I am
> now considering the following alternative design:
>
> - Collapse all the pipeline into a single (fat) operator
> - This operator will process all the inputs, effectively
>
> Since Flink does not support side inputs yet, they need to be simulated,
> e.g., by unioning all the different inputs into a sum type (a tuple or a
> POJO with one field for each type of input).
>
> Has anyone experimented with these two (somehow dual) approaches? If so,
> could you provide some guidance/advice to decide which one to use?
>
> On a related note, are there any plans to move FLIP-17
> <https://cwiki.apache.org/confluence/display/FLINK/FLIP-17%3A+Side+Inputs+for+DataStream+API>
> forward?
>
> Regards,
>
> Salva
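
For comparison, the sum-type workaround mentioned in the original question can be sketched in plain Scala, without any Flink dependency. This is only an illustration under stated assumptions: the types `Order`, `Profile`, `JoinInput`, `JoinState` and `Enriched` are hypothetical and do not appear in the thread, and an ordinary `Map` stands in for Flink's keyed state. It shows the general shape of a single "fat" operator that pattern-matches on a unioned input, not the code either poster actually used.

```scala
// Hypothetical input types; the thread never shows the real ones.
final case class Order(customerId: String, amount: Double)
final case class Profile(customerId: String, name: String)

// Sum type wrapping every input so that one unioned stream can carry all of them.
sealed trait JoinInput { def key: String }
final case class OrderIn(value: Order) extends JoinInput { def key: String = value.customerId }
final case class ProfileIn(value: Profile) extends JoinInput { def key: String = value.customerId }

// Per-key state kept by the single operator (assumption: one profile, buffered orders).
final case class JoinState(profile: Option[Profile] = None, pending: Vector[Order] = Vector.empty)

final case class Enriched(customerId: String, name: String, amount: Double)

object SumTypeJoinSketch {
  // Handles one element of the unioned stream; returns the new state and any output.
  def step(state: JoinState, in: JoinInput): (JoinState, Vector[Enriched]) = in match {
    case ProfileIn(p) =>
      // A profile arrived: flush the orders buffered while waiting for it.
      val out = state.pending.map(o => Enriched(p.customerId, p.name, o.amount))
      (JoinState(Some(p), Vector.empty), out)
    case OrderIn(o) =>
      state.profile match {
        case Some(p) => (state, Vector(Enriched(o.customerId, p.name, o.amount)))
        case None    => (state.copy(pending = state.pending :+ o), Vector.empty)
      }
  }

  // Simulates keyed processing: the Map plays the role of keyed state.
  def run(inputs: Seq[JoinInput]): Vector[Enriched] = {
    val start = (Map.empty[String, JoinState], Vector.empty[Enriched])
    val (_, out) = inputs.foldLeft(start) { case ((states, acc), in) =>
      val (next, emitted) = step(states.getOrElse(in.key, JoinState()), in)
      (states.updated(in.key, next), acc ++ emitted)
    }
    out
  }

  def main(args: Array[String]): Unit = {
    val unioned = Seq(
      OrderIn(Order("c1", 10.0)),
      ProfileIn(Profile("c1", "Ada")),
      OrderIn(Order("c1", 5.0))
    )
    run(unioned).foreach(println)
  }
}
```

The wrapping and unwrapping of every element is the part that MultipleInputStreamOperator makes unnecessary, since each input keeps its own type and its own `processElement`.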