Thanks a lot Matthias! Admittedly I have never used
MultipleInputStreamOperator. Will check it out!

On Thu, Dec 5, 2024, 08:47 Schwalbe Matthias <matthias.schwa...@viseca.ch>
wrote:

> Good morning Salva,
>
>
>
> The situation is much better than you apparently are aware of 😊
>
> For quite some time there has been an implementation for keyed operators with as
> many inputs as you like:
>
>    - MultipleInputStreamOperator/KeyedMultipleInputTransformation
>
>
>
> I originally used your proposed sum types with unions; that worked fine
> but was far from elegant 😊
>
>
>
> 
> I posted a sketch of how to implement MultipleInputStreamOperator a
> while ago 
 found it (around August 8, 2024):
>
>
>
> Here the essentials again:
>
>
>
> Hi Christian,
>
> As I didn’t find a well-documented reference to MultipleInputStreamOperator,
> let me sketch it (it’s Scala code, easy enough to translate to Java):
>
>    - Implements 4 inputs; the second is a broadcast stream
>    - ACHEP, CEIEP, V1EP, AMAP are typedefs of the respective input types
>
>
>
>
>
>
>
> //implement the operator
>
> class YourEnrichmentOperator(params: StreamOperatorParameters[O])
>
>   //specify number of inputs (4)
>
>   extends AbstractStreamOperatorV2[O](params, 4)
>
>     with MultipleInputStreamOperator[O]
>
>     with Triggerable[K, VoidNamespace]
>
> {
>
>   @transient var collector: TimestampedCollector[O] = _
>
>   @transient var internalTimerService: InternalTimerService[VoidNamespace] = _
>
>
>
>   override def open(): Unit = {
>
>     super.open()
>
>     require(configuration != null, "Missing configuration")
>
>
>
>     //setup collector and timer services (modelled after other operators)
>
>     collector = new TimestampedCollector[O](output)
>
>     internalTimerService = getInternalTimerService(
>
>       "user-timers",
>
>       VoidNamespaceSerializer.INSTANCE,
>
>       this
>
>     )
>
>   }
>
>
>
>  //implement each input
>
>   val input1achep =
>
>     new AbstractInput[ACHEP, O](this, 1) {
>
>       override def processElement(element: StreamRecord[ACHEP]): Unit = {
>
>         val achep: ACHEP = element.getValue
>
>         val et = element.getTimestamp
>
>         val key = getCurrentKey.asInstanceOf[K]
>
>         collector.setAbsoluteTimestamp(et)
>
>         ...
>
>       }
>
>     }
>
>
>
>   val input2ceiep =
>
>     new AbstractInput[CEIEP, O](this, 2) {
>
>     ...
>
>   }
>
>
>
>   val input3v1ep =
>
>     new AbstractInput[V1EP, O](this, 3) {
>
>       ...
>
>     }
>
>
>
>   val input4amap =
>
>     new AbstractInput[AMAP, O](this, 4) {
>
>       ...
>
>     }
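>
>   // note: because this operator mixes in Triggerable and registers itself via
>   // getInternalTimerService(..., this), the timer callbacks also need to be
>   // implemented (bodies elided in this sketch):
>
>   override def onEventTime(timer: InternalTimer[K, VoidNamespace]): Unit = {
>
>     ...
>
>   }
>
>   override def onProcessingTime(timer: InternalTimer[K, VoidNamespace]): Unit = {
>
>     ...
>
>   }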
>
>
>
>   //implement getInputs(...) with all configured inputs
>
>   override def getInputs: util.List[Input[_]] =
>
>     util.List.of[Input[_]](input1achep, input2ceiep, input3v1ep, input4amap)
>
> }
>
>
>
> //implement the operator factory
>
> class YourEnrichmentOperatorFactory
>
>   extends AbstractStreamOperatorFactory[O]{
>
>   override def createStreamOperator[T <: StreamOperator[O]](parameters: StreamOperatorParameters[O]): T = {
>
>     val operator = new YourEnrichmentOperator(parameters)
>
>     operator.asInstanceOf[T]
>
>   }
>
>
>
>   override def getStreamOperatorClass(classLoader: ClassLoader): Class[_ <: StreamOperator[_]] = {
>
>     classOf[YourEnrichmentOperator]
>
>   }
>
> }
>
>
>
> //implement job setup
>
> ...
>
>   def setupJobFromInputs(achep: DataStream[ACHEP],
>
>             ceiep: DataStream[CEIEP],
>
>             v1ep: DataStream[V1EP],
>
>             amap: DataStream[AMAP]) = {
>
>
>
>     val par = your.parallelism
>
>     val factory = new YourEnrichmentOperatorFactory
>
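>     // (sketch note) oTI and kTI stand for the TypeInformation of the output
>     // type O and of the key type K, respectively; they are not shown here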
>     val yourEnrichmentTransformation =
>
>       new KeyedMultipleInputTransformation[O](
>
>         "yourEnrichment",
>
>         factory,
>
>         oTI,
>
>         par,
>
>         kTI
>
>       )
>
>
>
>     val achepKS = new KeySelector[ACHEP, K] {
>
>       override def getKey(value: ACHEP): K = value.yourId
>
>     }
>
>     yourEnrichmentTransformation.addInput(
>
>       achep.javaStream
>
>         .getTransformation(),
>
>       achepKS
>
>     )
>
>
>
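>     // the broadcast (second) input is not keyed, hence the null key selector below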
>     yourEnrichmentTransformation.addInput(
>
>       ceiep.broadcast.javaStream.getTransformation,
>
>       null
>
>     )
>
>
>
>     val v1epKS = new KeySelector[V1EP, K] {
>
>       override def getKey(value: V1EP): K = value.cardHolderId
>
>     }
>
>     yourEnrichmentTransformation.addInput(
>
>       v1ep
>
>         .javaStream
>
>         .getTransformation(),
>
>       v1epKS
>
>     )
>
>
>
>     val amapKS = new KeySelector[AMAP, K] {
>
>       override def getKey(value: AMAP): K = value.yourId
>
>     }
>
>     yourEnrichmentTransformation.addInput(
>
>       amap
>
>         .javaStream
>
>         .getTransformation(),
>
>       amapKS
>
>     )
>
>
>
>     val yourEnrichedStream =
>
>       new DataStream[O](
>
>         new MultipleConnectedStreams(env.getJavaEnv)
>
>           .transform(yourEnrichmentTransformation)
>
>       ).withScopedMetaData("yourEnriched")
>
>     yourEnrichedStream
>
>   }
>
> ...
>
>
>
> I hope this helps.
>
>
>
> Flink-Greetings
>
> Thias
>
>
>
>
>
>
>
>
>
> *From:* Salva Alcåntara <salcantara...@gmail.com>
> *Sent:* Wednesday, December 4, 2024 2:03 PM
> *To:* user <user@flink.apache.org>
> *Subject:* [External] Joining Streams: "One operator with N inputs" vs
> "N-1 co-processors"
>
>
>
> ⚠*EXTERNAL MESSAGE – **CAUTION: Think Before You Click *⚠
>
>
>
> I have a job which basically joins different inputs together, all
> partitioned by the same key.
>
>
>
> I originally took the typical approach and created a pipeline consisting
> of N-1 successive joins, each one implemented using a DataStream co-process
> function.
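>
> (For concreteness, a minimal sketch of that chained approach could look like this
> in Scala; the input types A, B, C, the key field "id" and the join functions below
> are hypothetical:)
>
> // first join: A with B, both keyed by the same id
> val ab: DataStream[AB] = a                 // a: DataStream[A]
>   .keyBy(_.id)
>   .connect(b.keyBy(_.id))                  // b: DataStream[B]
>   .process(new JoinAWithB())               // a KeyedCoProcessFunction[K, A, B, AB]
>
> // second join: the previous result with C, shuffled again by the same key
> val abc: DataStream[ABC] = ab
>   .keyBy(_.id)
>   .connect(c.keyBy(_.id))                  // c: DataStream[C]
>   .process(new JoinAbWithC())              // a KeyedCoProcessFunction[K, AB, C, ABC]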
>
>
>
> To avoid shuffling and also some state duplication across operators, I am
> now considering the following alternative design:
>
> - Collapse the whole pipeline into a single (fat) operator
>
> - This operator will process all the inputs, effectively performing all the joins in one place
>
>
>
> Since Flink does not support side inputs yet, they need to be simulated,
> e.g., by unioning all the different inputs into a sum type (a tuple or a
> POJO with one field for each type of input).
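>
> (For illustration, a minimal sketch of that union-based workaround; the input types
> A and B, the key field "id" and the process function below are hypothetical:)
>
> // wrap each input into one optional field of a common envelope type
> case class Envelope(key: String, a: Option[A] = None, b: Option[B] = None)
>
> val unioned = a.map(x => Envelope(x.id, a = Some(x)))      // a: DataStream[A]
>   .union(b.map(x => Envelope(x.id, b = Some(x))))          // b: DataStream[B]
>   .keyBy(_.key)
>   .process(new FatJoinFunction())  // a KeyedProcessFunction dispatching on whichever field is set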
>
>
>
> Has anyone experimented with these two (in some sense dual) approaches? If so,
> could you provide some guidance/advice to decide which one to use?
>
>
>
> On a related note, are there any plans to move FLIP-17
> <https://cwiki.apache.org/confluence/display/FLINK/FLIP-17%3A+Side+Inputs+for+DataStream+API>
> forward?
>
>
>
> Regards,
>
>
>
> Salva
>
