Hi Christian,
As I didn’t find a well-documented reference for MultipleInputStreamOperator, 
let me sketch it (it’s Scala code, but easy enough to translate to Java):

  *   Implements 4 inputs; the second is a broadcast stream
  *   ACHEP, CEIEP, V1EP, AMAP are type aliases for the respective input types

//implement the operator
class YourEnrichmentOperator(params: StreamOperatorParameters[O])
  //specify number of inputs (4)
  extends AbstractStreamOperatorV2[O](params, 4)
    with MultipleInputStreamOperator[O]
    with Triggerable[K, VoidNamespace] {
  @transient var collector: TimestampedCollector[O] = null
  @transient var internalTimerService: InternalTimerService[VoidNamespace] = null

  override def open(): Unit = {
    require(configuration != null, "Missing configuration")

    //setup collector and timer services (modelled after other operators)
    collector = new TimestampedCollector[O](output)
    internalTimerService = getInternalTimerService(
      // ...
    )
  }

  //implement each input
  val input1achep =
    new AbstractInput[ACHEP, O](this, 1) {
      override def processElement(element: StreamRecord[ACHEP]): Unit = {
        val achep: ACHEP = element.getValue
        val et = element.getTimestamp
        val key = getCurrentKey.asInstanceOf[K]
        // ...
      }
    }

  val input2ceiep =
    new AbstractInput[CEIEP, O](this, 2) {
      // ...
    }

  val input3v1ep =
    new AbstractInput[V1EP, O](this, 3) {
      // ...
    }

  val input4amap =
    new AbstractInput[AMAP, O](this, 4) {
      // ...
    }

  //implement getInputs(...) with all configured inputs
  override def getInputs: util.List[Input[_]] =
    util.List.of[Input[_]](input1achep, input2ceiep, input3v1ep, input4amap)

  // ...
}

//implement the operator factory
class YourEnrichmentOperatorFactory
  extends AbstractStreamOperatorFactory[O] {
  override def createStreamOperator[T <: StreamOperator[O]](
      parameters: StreamOperatorParameters[O]): T = {
    val operator = new YourEnrichmentOperator(parameters)
    // ...
  }

  override def getStreamOperatorClass(
      classLoader: ClassLoader): Class[_ <: StreamOperator[_]] = {
    // ...
  }
}

//implement job setup
  def setupJobFromInputs(achep: DataStream[ACHEP],
            ceiep: DataStream[CEIEP],
            v1ep: DataStream[V1EP],
            amap: DataStream[AMAP]) = {

    val par = your.parallelism
    val factory = new YourEnrichmentOperatorFactory
    val yourEnrichmentTransformation =
      new KeyedMultipleInputTransformation[O](
        // ...
      )

    val achepKS = new KeySelector[ACHEP, K] {
      override def getKey(value: ACHEP): K = value.yourId
    }

    // ...

    val v1epKS = new KeySelector[V1EP, K] {
      override def getKey(value: V1EP): K = value.cardHolderId
    }

    val amapKS = new KeySelector[AMAP, K] {
      override def getKey(value: AMAP): K = value.yourId
    }

    // ...

    val yourEnrichedStream =
      new DataStream[O](
        new MultipleConnectedStreams(env.getJavaEnv)
        // ...
      )
    // ...
  }

I hope this helps.


Hi Matthias,

I am facing a similar issue: I need to keep state on two keyed input 
streams, but the state can only be cleared by events arriving on a third 
input stream.
Do you perhaps have some examples of how to use the 
MultipleInputStreamOperator you mentioned?

Thanks and kind regards,

Hi Sachin,

Just as an idea, while you cannot easily share state across operators, you can 
do so within the same operator:

  *   For two such input streams you could connect() the two streams into a 
ConnectedStreams and then process() by means of a KeyedCoProcessFunction
  *   For more than two input streams, implement some 
MultipleInputStreamOperator …
  *   In both cases you can yield multiple independent output streams (if need 
be), by means of multiple side outputs (see here e.g. 
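
For the two-stream case, the first and third points above could be sketched 
roughly as follows. This is only an illustration under assumptions, not a 
tested job: Event, SharedStateFunction, lastAmount and the "unmatched" side 
output are hypothetical names, the key f is assumed to be a String, and the 
code uses Flink's Java API classes from Scala, so Flink must be on the 
classpath. The state is initialised lazily to stay independent of the open() 
signature, which differs between Flink versions.

```scala
import org.apache.flink.api.common.state.{ValueState, ValueStateDescriptor}
import org.apache.flink.api.common.typeinfo.TypeInformation
import org.apache.flink.streaming.api.functions.co.KeyedCoProcessFunction
import org.apache.flink.util.{Collector, OutputTag}

// Hypothetical record type; both streams carry it and are keyed by field f
case class Event(f: String, amount: Long)

object SharedStateSketch {
  // Hypothetical side output for second-stream records that find no state yet
  val unmatched: OutputTag[Event] =
    new OutputTag[Event]("unmatched", TypeInformation.of(classOf[Event]))
}

class SharedStateFunction
  extends KeyedCoProcessFunction[String, Event, Event, Event] {

  // Keyed state: scoped to the current key, visible to both inputs
  @transient private lazy val lastAmount: ValueState[java.lang.Long] =
    getRuntimeContext.getState(
      new ValueStateDescriptor[java.lang.Long]("lastAmount", classOf[java.lang.Long]))

  // First stream: remember the latest amount for this key and forward the record
  override def processElement1(
      value: Event,
      ctx: KeyedCoProcessFunction[String, Event, Event, Event]#Context,
      out: Collector[Event]): Unit = {
    lastAmount.update(value.amount)
    out.collect(value)
  }

  // Second stream: read what the first stream stored for the same key
  override def processElement2(
      value: Event,
      ctx: KeyedCoProcessFunction[String, Event, Event, Event]#Context,
      out: Collector[Event]): Unit = {
    val previous = lastAmount.value() // null if nothing was stored for this key
    if (previous == null) {
      ctx.output(SharedStateSketch.unmatched, value)
    } else {
      val updated = value.copy(amount = value.amount + previous.longValue)
      lastAmount.update(updated.amount)
      out.collect(updated)
    }
  }
}
```

To wire it up, connect() the two streams, key both sides by f, and pass a 
SharedStateFunction to process(); the unmatched records are then available 
from the result via getSideOutput(SharedStateSketch.unmatched).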

I do that all the time 😊


Sincere Flink greetings


I have a stream which starts from a source and is keyed by a field f.
With the stream process function, I can emit the processed record downstream 
and also update state based on the records it received for the same key.

Now I have another stream which starts from another source and is of the same 
type as the first stream and it is also keyed by the same field f.

In its process function I want to access the last state updated by the first 
stream's process function for the same key, do some processing (update the 
state) and also send the record downstream.

Is there any way I can achieve this in Flink by connecting to the same state 
store?

Is there any concept of global state in Flink, in case I cannot achieve this 
using keyed state associated with an operator's process function?

Is there any other way you can think of to achieve the same?

