Good morning Salva,

The situation is much better than you seem to be aware of 😊
For quite some time now, there has been an implementation of keyed operators with
as many inputs as you like:

  *   MultipleInputStreamOperator/KeyedMultipleInputTransformation

I originally used the sum types with unions that you propose; that worked fine but
was far from elegant 😊


I posted a sketch of how to implement MultipleInputStreamOperator a while ago and
found it again (from around August 8, 2024):

Here are the essentials again:

Hi Christian,
As I didn’t find a well-documented reference to MultipleInputStreamOperator,
let me sketch it (it’s Scala code, easy enough to translate to Java):

  *   It implements 4 inputs; the second one is a broadcast stream
  *   ACHEP, CEIEP, V1EP, AMAP are type aliases for the respective input types



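//imports used by this sketch (Flink 1.x with the Scala DataStream API)
import java.util
import org.apache.flink.api.java.functions.KeySelector
import org.apache.flink.runtime.state.{VoidNamespace, VoidNamespaceSerializer}
import org.apache.flink.streaming.api.datastream.MultipleConnectedStreams
import org.apache.flink.streaming.api.operators._
import org.apache.flink.streaming.api.scala.DataStream
import org.apache.flink.streaming.api.transformations.KeyedMultipleInputTransformation
import org.apache.flink.streaming.runtime.streamrecord.StreamRecord

//implement the operator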
class YourEnrichmentOperator(params: StreamOperatorParameters[O])
  //specify number of inputs (4)
  extends AbstractStreamOperatorV2[O](params, 4)
    with MultipleInputStreamOperator[O]
    with Triggerable[K, VoidNamespace]
{
  @transient var collector: TimestampedCollector[O] = null
  @transient var internalTimerService: InternalTimerService[VoidNamespace] = null

  override def open(): Unit = {
    super.open()
    require(configuration != null, "Missing configuration")

    //setup collector and timer services (modelled after other operators)
    collector = new TimestampedCollector[O](output)
    internalTimerService = getInternalTimerService(
      "user-timers",
      VoidNamespaceSerializer.INSTANCE,
      this
    )
  }

  //implement each input
  val input1achep =
    new AbstractInput[ACHEP, O](this, 1) {
      override def processElement(element: StreamRecord[ACHEP]): Unit = {
        val achep: ACHEP = element.getValue
        val et = element.getTimestamp
        val key = getCurrentKey.asInstanceOf[K]
        collector.setAbsoluteTimestamp(et)
        ...
      }
    }

  val input2ceiep =
    new AbstractInput[CEIEP, O](this, 2) {
      ...
    }

  val input3v1ep =
    new AbstractInput[V1EP, O](this, 3) {
      ...
    }

  val input4amap =
    new AbstractInput[AMAP, O](this, 4) {
      ...
    }

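  //Triggerable callbacks: timers registered on internalTimerService fire here
  //with the timer's key already set as current key; the trait requires both
  override def onEventTime(timer: InternalTimer[K, VoidNamespace]): Unit = {
    ...
  }

  override def onProcessingTime(timer: InternalTimer[K, VoidNamespace]): Unit = {
    ...
  }

  //implement getInputs(...) with all configured inputs
  override def getInputs: util.List[Input[_]] =
    util.List.of[Input[_]](input1achep, input2ceiep, input3v1ep, input4amap)
}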

//implement the operator factory
class YourEnrichmentOperatorFactory
  extends AbstractStreamOperatorFactory[O] {
  override def createStreamOperator[T <: StreamOperator[O]](
      parameters: StreamOperatorParameters[O]): T = {
    val operator = new YourEnrichmentOperator(parameters)
    operator.asInstanceOf[T]
  }

  override def getStreamOperatorClass(
      classLoader: ClassLoader): Class[_ <: StreamOperator[_]] = {
    //classOf[...] yields the operator class itself; YourEnrichmentOperator.getClass
    //would refer to a (non-existent) companion object instead
    classOf[YourEnrichmentOperator]
  }
}

//implement job setup
...
  def setupJobFromInputs(achep: DataStream[ACHEP],
                         ceiep: DataStream[CEIEP],
                         v1ep: DataStream[V1EP],
                         amap: DataStream[AMAP]) = {

    val par = your.parallelism
    val factory = new YourEnrichmentOperatorFactory
    val yourEnrichmentTransformation =
      new KeyedMultipleInputTransformation[O](
        "yourEnrichment",
        factory,
        oTI,
        par,
        kTI
      )

    val achepKS = new KeySelector[ACHEP, K] {
      override def getKey(value: ACHEP): K = value.yourId
    }
    yourEnrichmentTransformation.addInput(
      achep.javaStream
        .getTransformation(),
      achepKS
    )

    //the broadcast input is not keyed, hence the null key selector
    yourEnrichmentTransformation.addInput(
      ceiep.broadcast.javaStream.getTransformation,
      null
    )

    val v1epKS = new KeySelector[V1EP, K] {
      override def getKey(value: V1EP): K = value.cardHolderId
    }
    yourEnrichmentTransformation.addInput(
      v1ep
        .javaStream
        .getTransformation(),
      v1epKS
    )

    val amapKS = new KeySelector[AMAP, K] {
      override def getKey(value: AMAP): K = value.yourId
    }
    yourEnrichmentTransformation.addInput(
      amap
        .javaStream
        .getTransformation(),
      amapKS
    )

    val yourEnrichedStream =
      new DataStream[O](
        new MultipleConnectedStreams(env.getJavaEnv)
          .transform(yourEnrichmentTransformation)
      ).withScopedMetaData("yourEnriched")
    yourEnrichedStream
  }
...
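To make the role of the key selectors concrete, here is a small plain-Scala model (no Flink dependency) of how records reach the subtasks of such an operator: each keyed input sends a record to exactly one subtask derived from its key, while the broadcast input (added with a null key selector) reaches every subtask. The modulo hash is a stand-in; Flink actually maps keys to key groups using a murmur hash and the max parallelism. The names InputRouting, Auth and Card are hypothetical.

```scala
// Simplified routing model for a keyed multiple-input operator.
// NOTE: stand-in hash; Flink's real assignment goes through key groups
// (murmur hash of key.hashCode, maxParallelism). Only the property
// "same key value => same subtask" is meant to carry over.
object InputRouting {
  def subtaskFor[K](key: K, parallelism: Int): Int =
    Math.floorMod(key.hashCode, parallelism)

  // Keyed input: exactly one subtask, chosen via the input's key selector.
  def routeKeyed[T, K](value: T, keyOf: T => K, parallelism: Int): Set[Int] =
    Set(subtaskFor(keyOf(value), parallelism))

  // Broadcast input (the one added with a null key selector): every subtask.
  def routeBroadcast[T](value: T, parallelism: Int): Set[Int] =
    (0 until parallelism).toSet

  // Hypothetical record types standing in for two keyed inputs.
  final case class Auth(yourId: String)
  final case class Card(cardHolderId: String)

  def main(args: Array[String]): Unit = {
    val par = 4
    val a = routeKeyed(Auth("k42"), (x: Auth) => x.yourId, par)
    val c = routeKeyed(Card("k42"), (x: Card) => x.cardHolderId, par)
    // The same key value arriving on two different inputs lands on the
    // same subtask, so that subtask's keyed state sees both records.
    println(s"auth -> $a, card -> $c, broadcast -> ${routeBroadcast("rule", par)}")
  }
}
```

This is why all keyed inputs have to produce the same key type K with matching values: only then do records for the same key meet in the same subtask and share its keyed state and timers.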

I hope this helps.

Flink-Greetings
Thias




From: Salva Alcántara <salcantara...@gmail.com>
Sent: Wednesday, December 4, 2024 2:03 PM
To: user <user@flink.apache.org>
Subject: [External] Joining Streams: "One operator with N inputs" vs "N-1 co-processors"



I have a job which basically joins different inputs together, all partitioned 
by the same key.

I originally took the typical approach and created a pipeline consisting of N-1 
successive joins, each one implemented using a DataStream co-process function.

To avoid shuffling and also some state duplication across operators, I am now 
considering the following alternative design:
- Collapse the whole pipeline into a single (fat) operator
- This operator will effectively process all the inputs
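A rough count illustrates the trade-off between the two designs. This is a back-of-the-envelope sketch under hypothetical assumptions: each co-process join keeps only the latest value of each of its two sides per key, and every join after the first re-keys (shuffles) the previous join's output with keyBy. The names PipelineCost and Cost are made up for illustration.

```scala
// Back-of-the-envelope comparison of per-key state entries and keyed shuffles.
// Hypothetical assumptions: each two-input join stores the latest value of
// both sides, and each join after the first re-keys the previous join's output.
object PipelineCost {
  final case class Cost(stateEntriesPerKey: Int, shuffles: Int)

  // N-1 successive co-process joins: every join stores two sides, and the
  // N-2 intermediate results are shuffled again on top of the N raw inputs.
  def chainedJoins(n: Int): Cost = {
    require(n >= 2, "a join needs at least two inputs")
    Cost(stateEntriesPerKey = 2 * (n - 1), shuffles = n + (n - 2))
  }

  // One keyed operator with N inputs: each input is stored and shuffled once.
  def singleOperator(n: Int): Cost = {
    require(n >= 2, "a join needs at least two inputs")
    Cost(stateEntriesPerKey = n, shuffles = n)
  }

  def main(args: Array[String]): Unit =
    for (n <- 2 to 5)
      println(s"N=$n chained=${chainedJoins(n)} single=${singleOperator(n)}")
}
```

Under these assumptions the two designs coincide for N = 2, and the gap grows by one state entry and one shuffle per additional input; the stored left side of each later join also duplicates data already held upstream.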

Since Flink does not support side inputs yet, they need to be simulated, e.g., 
by unioning all the different inputs into a sum type (a tuple or a POJO with 
one field for each type of input).
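The sum-type workaround can be modelled in plain Scala (no Flink dependency): each input is wrapped in one case of a sealed trait, the inputs are merged, and a single keyed handler keeps one state record per key covering all of them. The types OrderIn, CustomerIn, Joined and SumTypeUnion are hypothetical; in a real job the merge would roughly correspond to a union of the mapped DataStreams followed by keyBy and a process function.

```scala
import scala.collection.mutable

// Sum type: one case per kind of input, all sharing the join key.
sealed trait JoinInput { def key: String }
final case class OrderIn(key: String, amount: Int) extends JoinInput
final case class CustomerIn(key: String, name: String) extends JoinInput

final case class Joined(key: String, name: Option[String], total: Int)

object SumTypeUnion {
  // Per-key state held by the single "fat" operator for all inputs.
  final case class KeyState(name: Option[String] = None, total: Int = 0)

  // Processes the merged stream in arrival order and emits the current
  // join result for the affected key after every record.
  def process(merged: Seq[JoinInput]): Seq[Joined] = {
    val state = mutable.Map.empty[String, KeyState]
    merged.map { in =>
      val s = state.getOrElse(in.key, KeyState())
      val updated = in match {
        case OrderIn(_, amount)  => s.copy(total = s.total + amount)
        case CustomerIn(_, name) => s.copy(name = Some(name))
      }
      state.update(in.key, updated)
      Joined(in.key, updated.name, updated.total)
    }
  }

  def main(args: Array[String]): Unit = {
    val customers: Seq[JoinInput] = Seq(CustomerIn("c1", "Ada"))
    val orders: Seq[JoinInput] = Seq(OrderIn("c1", 10), OrderIn("c1", 5))
    process(customers ++ orders).foreach(println)
  }
}
```

Using a sealed trait rather than a tuple or a POJO with one nullable field per input lets the compiler warn when a pattern match forgets one of the input kinds.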

Has anyone experimented with these two (somewhat dual) approaches? If so, could
you provide some guidance/advice on deciding which one to use?

On a related note, are there any plans to move FLIP-17
(https://cwiki.apache.org/confluence/display/FLINK/FLIP-17%3A+Side+Inputs+for+DataStream+API)
forward?

Regards,

Salva