I'm using Nabble, and it seems to have removed the code between the raw tags.
Here it is again:

import com.typesafe.scalalogging.LazyLogging
import org.apache.flink.api.common.functions.ReduceFunction
import org.apache.flink.api.common.state.{ReducingStateDescriptor,
ValueStateDescriptor}
import
org.apache.flink.streaming.api.datastream.CoGroupedStreams.TaggedUnion
import
org.apache.flink.streaming.api.windowing.triggers.Trigger.TriggerContext
import org.apache.flink.streaming.api.windowing.triggers.{Trigger,
TriggerResult}
import org.apache.flink.streaming.api.windowing.windows.Window

/** Companion object holding the helpers used by the `TriggerMerge` trigger class. */
object TriggerMerge {

  /** Reduce function that combines two `Long` values by adding them. */
  @SerialVersionUID(1L)
  private class Sum extends ReduceFunction[Long] {
    @throws[Exception]
    override def reduce(value1: Long, value2: Long): Long = {
      val total = value1 + value2
      total
    }
  }
}

/**
 * Trigger over a tagged union of one "main" element and any number of "other" elements.
 *
 * The main element (`getOne`) carries `elementsToReceive`, the number of other elements
 * (`getTwo`) that are expected. The trigger returns `FIRE_AND_PURGE` as soon as the number of
 * other elements counted equals that expected number, or when `timeout` milliseconds of
 * processing time pass without any new element. Event time is ignored.
 *
 * @param timeout inactivity timeout in milliseconds, restarted by every incoming element
 */
class TriggerMerge(val timeout: Long)
    extends Trigger[TaggedUnion[MainElement, OtherElement], Window]
    with LazyLogging {

  // Stored as a boxed java.lang.Integer on purpose: with a Scala `Int` state type a null
  // (unset) read is unboxed to 0, so "not received yet" could never be told apart from 0.
  val elementsToReceiveDesc =
    new ValueStateDescriptor[java.lang.Integer]("elements-to-receive", classOf[java.lang.Integer])

  // Running count of "other" elements; an empty state reads as 0 once unboxed to `Long`.
  val elementsReceivedDesc =
    new ReducingStateDescriptor[Long]("elements-received", new TriggerMerge.Sum, classOf[Long])

  // Timestamp of the currently registered processing-time timer, so it can be replaced.
  private val deadlineDesc =
    new ValueStateDescriptor[java.lang.Long]("deadline", classOf[java.lang.Long])

  /**
   * Updates the counters for the incoming element, restarts the inactivity deadline and
   * checks whether every expected element has arrived.
   *
   * @return `FIRE_AND_PURGE` when the received count equals the expected count,
   *         `CONTINUE` otherwise
   */
  override def onElement(
      element: TaggedUnion[MainElement, OtherElement],
      timestamp: Long,
      window: Window,
      ctx: TriggerContext): TriggerResult = {
    val expectedState = ctx.getPartitionedState(elementsToReceiveDesc)
    val receivedState = ctx.getPartitionedState(elementsReceivedDesc)

    // Update counters
    Option(element.getOne).foreach { main =>
      if (expectedState.value() != null) {
        logger.error("Received two main elements with the same UUID.")
      } else {
        expectedState.update(Int.box(main.elementsToReceive))
      }
    }
    if (element.getTwo != null) {
      receivedState.add(1L)
    }

    // Update deadline timeout: the previous timer has to be deleted, otherwise it would
    // still fire at the old deadline and purge the window while elements keep arriving.
    cancelDeadline(ctx)
    val newDeadline = ctx.getCurrentProcessingTime() + timeout
    ctx.registerProcessingTimeTimer(newDeadline)
    ctx.getPartitionedState(deadlineDesc).update(Long.box(newDeadline))

    // Complete once the main element is known and the counts agree.
    val complete =
      Option(expectedState.value()).exists(_.longValue == receivedState.get())

    if (complete) {
      // Release counters and the pending timer so the leftover timer cannot fire again.
      clear(window, ctx)
      TriggerResult.FIRE_AND_PURGE
    } else {
      TriggerResult.CONTINUE
    }
  }

  /** Removes the pending deadline timer and all state kept by this trigger. */
  override def clear(window: Window, ctx: TriggerContext): Unit = {
    cancelDeadline(ctx)
    ctx.getPartitionedState(elementsToReceiveDesc).clear()
    ctx.getPartitionedState(elementsReceivedDesc).clear()
  }

  /** Deadline reached without completion: clean up and emit whatever has been collected. */
  override def onProcessingTime(time: Long, window: Window, ctx: TriggerContext): TriggerResult = {
    clear(window, ctx)
    TriggerResult.FIRE_AND_PURGE
  }

  /** Event time plays no role for this trigger. */
  override def onEventTime(time: Long, window: Window, ctx: TriggerContext): TriggerResult =
    TriggerResult.CONTINUE

  /** Deletes the currently registered deadline timer, if any, and forgets its timestamp. */
  private def cancelDeadline(ctx: TriggerContext): Unit = {
    val deadlineState = ctx.getPartitionedState(deadlineDesc)
    Option(deadlineState.value()).foreach(d => ctx.deleteProcessingTimeTimer(d.longValue))
    deadlineState.clear()
  }
}



--
Sent from: http://apache-flink-user-mailing-list-archive.2336050.n4.nabble.com/

Reply via email to