Hi,

I used PopularPlacesFromKafka from
dataartisans.flinktraining.exercises as the basis. I made very minor
changes

and the session window is not triggered. If I use ProcessingTime
instead of EventTime it works. Here is my code.

Appreciate any help. Thanks

object KafkaEventTimeWindow {

  private val LOCAL_ZOOKEEPER_HOST = "localhost:2181"
  private val LOCAL_KAFKA_BROKER = "localhost:9092"
  private val CON_GROUP = "KafkaEventTimeSessionWindow"
  private val MAX_EVENT_DELAY = 60 // events are out of order by max 60 seconds

  def main(args: Array[String]) {

    val env = StreamExecutionEnvironment.getExecutionEnvironment
    env.setStreamTimeCharacteristic(TimeCharacteristic.EventTime)

    val kafkaProps = new Properties
    kafkaProps.setProperty("zookeeper.connect", LOCAL_ZOOKEEPER_HOST)
    kafkaProps.setProperty("bootstrap.servers", LOCAL_KAFKA_BROKER)
    kafkaProps.setProperty("group.id", CON_GROUP)
    kafkaProps.setProperty("auto.offset.reset", "earliest")

    val consumer = new FlinkKafkaConsumer011[PositionEventProto](
      "positionevent",
      new PositionEventProtoSchema,
      kafkaProps)
    consumer.assignTimestampsAndWatermarks(new PositionEventProtoTSAssigner)

    val posstream = env.addSource(consumer)

    def convtoepochmilli(cdt: String): Long = {
      val  odt:OffsetDateTime = OffsetDateTime.parse(cdt);
      val i:Instant  = odt.toInstant();
      val millis:Long = i.toEpochMilli();
      millis
    }

    val outputstream = posstream
      .mapWith{case(p) => (p.getConsumerUserId,
convtoepochmilli(p.getCreateDateTime.getInIso8601Format))}
      .keyBy(0)
      .window(EventTimeSessionWindows.withGap(Time.seconds(60)))
      .reduce { (v1, v2) => (v1._1, Math.max(v1._2 , v2._2)) }

    outputstream.print()

    // execute the transformation pipeline
    env.execute("Output Stream")
  }

}

class PositionEventProtoTSAssigner
  extends 
BoundedOutOfOrdernessTimestampExtractor[PositionEventProto](Time.seconds(60))
{

  override def extractTimestamp(pos: PositionEventProto): Long = {
    val  odt:OffsetDateTime =
OffsetDateTime.parse(pos.getCreateDateTime.getInIso8601Format);
    val i:Instant  = odt.toInstant();
    val millis:Long = i.toEpochMilli();
    millis
  }
}

Reply via email to