Hi,

I have recently experimented a bit with windowing and event-time mechanism
in flink and either I do not understand how should it work or there is some
kind of a bug.

I have prepared two Source Functions. One that emits watermark itself and
one that does not, but I have prepared a TimestampExtractor that should
produce same results that the previous Source Function, at least from my
point of view.

Afterwards I've prepared a simple summing over an EventTimeTriggered
Sliding Window.

What I expected is a sum of 3*(t_sum) property of Event regardless of the
sleep time in Source Function. That is how the EventTimeSourceFunction
works, but for the SourceFunction it depends on the sleep and does not
equals 3*(t_sum).

I have done some debugging and for the SourceFunction the output of
ExtractTimestampsOperator does not chain to the aggregator operator(the
property output.allOutputs is empty).

Do I understand the mechanism correctly and should my code work as I
described? If not could you please explain a little bit? The code I've
attached to this email.

I would be grateful.

Regards
Dawid Wysakowicz
import java.util.concurrent.TimeUnit

import org.apache.flink.streaming.api.TimeCharacteristic
import org.apache.flink.streaming.api.functions.TimestampExtractor
import org.apache.flink.streaming.api.functions.source.{SourceFunction, EventTimeSourceFunction}
import org.apache.flink.streaming.api.functions.source.SourceFunction.SourceContext
import org.apache.flink.streaming.api.scala._
import org.apache.flink.streaming.api.watermark.Watermark
import org.apache.flink.streaming.api.windowing.assigners.SlidingTimeWindows
import org.apache.flink.streaming.api.windowing.time.Time
import org.apache.flink.streaming.api.windowing.triggers.EventTimeTrigger

object FlinkWindows {

  case class Event(name: String, toSum: Int, timestamp: Long)


  private var hostName: String = null
  private var port: Int = 0

  def main(args: Array[String]) {

    hostName = args(0)
    port = args(1).toInt

    val env = StreamExecutionEnvironment.getExecutionEnvironment
    env.setStreamTimeCharacteristic(TimeCharacteristic.EventTime)
//    env.getConfig.setAutoWatermarkInterval(0)



    //Read from a socket stream at map it to StockPrice objects
    //    val stream = env.socketTextStream(hostName, port).map(x => {
    //      val split = x.split(",")
    //      Event(split(0), split(1).toInt, split(2).toLong)
    //    })
    val stream = env.addSource(new EventSourceWithTimestamp)
      //    val stream = env.fromCollection(genCarStream())
      .keyBy("name")


    stream.assignTimestamps(new TimestampExtractor[Event] {

      private var currentWatermark: Long = Long.MinValue

      override def getCurrentWatermark: Long = currentWatermark

      override def extractWatermark(element: Event, currentTimestamp: Long): Long = {
        if (element.timestamp > currentWatermark) {
          currentWatermark = element.timestamp - 999
        }
        currentWatermark
      }

      override def extractTimestamp(element: Event, currentTimestamp: Long): Long = element.timestamp
    })

    val sumed = stream
      .window(SlidingTimeWindows.of(
        Time.of(3, TimeUnit.SECONDS),
        Time.of(1, TimeUnit.SECONDS)
      ))
      .trigger(EventTimeTrigger.create())
      .sum("toSum")
      .print()



    env.execute("Stock stream")
  }



  class EventSource extends SourceFunction[Event] {

    var isRunning = true
    val offset = System.currentTimeMillis()

    override def cancel(): Unit = isRunning = false

    override def run(ctx: SourceContext[Event]): Unit = {

      var idx = 0
      while (isRunning) {
        Thread.sleep(1500)
        ctx.collect(Event("a", 3, offset + idx * 999))
        ctx.collect(Event("b", 2, offset + idx * 999))
        idx += 1
      }
    }
  }

  class EventSourceWithTimestamp extends EventTimeSourceFunction[Event] {

    var isRunning = true
    val offset = System.currentTimeMillis()

    override def cancel(): Unit = isRunning = false

    override def run(ctx: SourceContext[Event]): Unit = {

      var idx = 0
      while (isRunning) {
        Thread.sleep(1500)
        ctx.collectWithTimestamp(Event("a", 3, offset + idx * 999), offset + idx * 999)
        ctx.collectWithTimestamp(Event("b", 2, offset + idx * 999), offset + idx * 999)
        ctx.emitWatermark(new Watermark(offset + (idx - 1) * 999))
        idx += 1
      }
    }
  }

}

Reply via email to