I'm using Nabble, and it seems that it has removed the code between the raw tags. Here it is again:
import com.typesafe.scalalogging.LazyLogging
import org.apache.flink.api.common.functions.ReduceFunction
import org.apache.flink.api.common.state.{ReducingStateDescriptor, ValueStateDescriptor}
import org.apache.flink.streaming.api.datastream.CoGroupedStreams.TaggedUnion
import org.apache.flink.streaming.api.windowing.triggers.Trigger.TriggerContext
import org.apache.flink.streaming.api.windowing.triggers.{Trigger, TriggerResult}
import org.apache.flink.streaming.api.windowing.windows.Window

object TriggerMerge {

  /**
   * Reduce function that adds two counters.
   *
   * Operates on boxed `java.lang.Long` values (rather than Scala's `Long`) so that the
   * reducing state built on top of it can report "nothing added yet" as `null` instead
   * of having that `null` silently unboxed to `0` on the Scala side.
   */
  @SerialVersionUID(1L)
  private class Sum extends ReduceFunction[java.lang.Long] {
    @throws[Exception]
    override def reduce(value1: java.lang.Long, value2: java.lang.Long): java.lang.Long =
      java.lang.Long.valueOf(value1.longValue + value2.longValue)
  }
}

/**
 * Trigger for a tagged union of one `MainElement` and several `OtherElement`s.
 *
 * The main element announces (via its `elementsToReceive` field) how many other
 * elements are expected. The trigger fires and purges once the number of other elements
 * received equals that announced count, or when a processing-time timer registered by
 * [[onElement]] fires.
 *
 * @param timeout offset, in milliseconds, added to the current processing time to
 *                compute the timer registered on every incoming element
 */
class TriggerMerge(val timeout: Long)
    extends Trigger[TaggedUnion[MainElement, OtherElement], Window]
    with LazyLogging {

  // Both descriptors use boxed Java types on purpose. With Scala's primitive `Int` /
  // `Long` as the type argument, a `null` returned for unset state is unboxed to `0`
  // at the call site, so `Option(state.value())` would always be `Some(0)` and an unset
  // value could not be told apart from a stored zero.

  /** Expected number of other elements, as announced by the main element. */
  val elementsToReceiveDesc =
    new ValueStateDescriptor[java.lang.Integer](
      "elements-to-receive",
      classOf[java.lang.Integer])

  /** Running count of other elements received so far. */
  val elementsReceivedDesc =
    new ReducingStateDescriptor[java.lang.Long](
      "elements-received",
      new TriggerMerge.Sum,
      classOf[java.lang.Long])

  /**
   * Updates the counters for the incoming element, registers a processing-time timer
   * `timeout` milliseconds from now, and decides whether the window is complete.
   *
   * @param element   tagged union holding either the main element (`getOne`) or another
   *                  element (`getTwo`)
   * @param timestamp timestamp of the element (unused)
   * @param window    window the element was assigned to (unused)
   * @param ctx       trigger context used for partitioned state and timers
   * @return `FIRE_AND_PURGE` when both counters are set and equal, `CONTINUE` otherwise
   */
  override def onElement(
      element: TaggedUnion[MainElement, OtherElement],
      timestamp: Long,
      window: Window,
      ctx: TriggerContext): TriggerResult = {
    val expectedState = ctx.getPartitionedState(elementsToReceiveDesc)
    val receivedState = ctx.getPartitionedState(elementsReceivedDesc)

    // Update counters.
    Option(element.getOne).foreach { main =>
      if (Option(expectedState.value()).isDefined) {
        // An expected count is already stored: this is a second main element.
        logger.error("Received two main elements with the same UUID.")
      } else {
        expectedState.update(Int.box(main.elementsToReceive))
      }
    }
    if (element.getTwo != null) {
      receivedState.add(java.lang.Long.valueOf(1L))
    }

    // Register the timeout timer relative to the context's processing-time clock, which
    // is the clock that decides when the timer fires.
    // NOTE(review): timers registered for earlier elements are not deleted, so the
    // earliest one still fires; confirm whether the deadline is meant to slide.
    ctx.registerProcessingTimeTimer(ctx.getCurrentProcessingTime + timeout)

    // Fire only when both counters have been set and agree.
    val complete = for {
      expected <- Option(expectedState.value())
      received <- Option(receivedState.get())
    } yield expected.longValue == received.longValue

    if (complete.getOrElse(false)) TriggerResult.FIRE_AND_PURGE
    else TriggerResult.CONTINUE
  }

  /**
   * Clears both counters held in partitioned state.
   *
   * @param window window being cleared (unused)
   * @param ctx    trigger context giving access to the partitioned state
   */
  override def clear(window: Window, ctx: TriggerContext): Unit = {
    ctx.getPartitionedState(elementsToReceiveDesc).clear()
    ctx.getPartitionedState(elementsReceivedDesc).clear()
  }

  /**
   * Called when a processing-time timer registered by [[onElement]] fires: drops the
   * counters and emits whatever the window holds.
   *
   * @return always `FIRE_AND_PURGE`
   */
  override def onProcessingTime(time: Long, window: Window, ctx: TriggerContext): TriggerResult = {
    this.clear(window, ctx)
    TriggerResult.FIRE_AND_PURGE
  }

  /**
   * Event-time timers are not used by this trigger.
   *
   * @return always `CONTINUE`
   */
  override def onEventTime(time: Long, window: Window, ctx: TriggerContext): TriggerResult =
    TriggerResult.CONTINUE
}

-- Sent from: http://apache-flink-user-mailing-list-archive.2336050.n4.nabble.com/