Hi, I used PopularPlacesFromKafka from dataartisans.flinktraining.exercises as the basis. I made very minor changes
and the session window is not triggered. If I use ProcessingTime instead of EventTime it works. Here is my code. Appreciate any help. Thanks

/**
 * Reads position events from Kafka and, per consumer user id, emits the
 * latest event timestamp seen within each event-time session window.
 */
object KafkaEventTimeWindow {

  private val LOCAL_ZOOKEEPER_HOST = "localhost:2181"
  private val LOCAL_KAFKA_BROKER = "localhost:9092"
  private val CON_GROUP = "KafkaEventTimeSessionWindow"

  // Events are out of order by at most this many seconds. Passed to the
  // timestamp assigner so the bound is defined in exactly one place.
  private val MAX_EVENT_DELAY = 60

  // Inactivity gap (seconds) after which a per-key session is closed.
  private val SESSION_GAP_SECONDS = 60

  /**
   * Builds and runs the streaming pipeline.
   *
   * @param args command-line arguments (unused)
   */
  def main(args: Array[String]): Unit = {
    val env = StreamExecutionEnvironment.getExecutionEnvironment
    env.setStreamTimeCharacteristic(TimeCharacteristic.EventTime)

    val kafkaProps = new Properties
    kafkaProps.setProperty("zookeeper.connect", LOCAL_ZOOKEEPER_HOST)
    kafkaProps.setProperty("bootstrap.servers", LOCAL_KAFKA_BROKER)
    kafkaProps.setProperty("group.id", CON_GROUP)
    kafkaProps.setProperty("auto.offset.reset", "earliest")

    val consumer = new FlinkKafkaConsumer011[PositionEventProto](
      "positionevent",
      new PositionEventProtoSchema,
      kafkaProps)

    // NOTE(review): assigning the extractor on the consumer makes watermarks
    // per Kafka partition; the source then forwards the minimum across the
    // partitions it reads. If "positionevent" has partitions that receive no
    // data, the watermark presumably never advances and no event-time window
    // can fire (processing time is unaffected) -- confirm the partition count
    // and that every partition receives events.
    consumer.assignTimestampsAndWatermarks(
      new PositionEventProtoTSAssigner(Time.seconds(MAX_EVENT_DELAY)))

    val posstream = env.addSource(consumer)

    // NOTE(review): a session only fires once the watermark passes its end,
    // i.e. after an event arrives whose timestamp is later than the session's
    // last event by roughly SESSION_GAP_SECONDS + MAX_EVENT_DELAY. If the
    // input stops, the final sessions stay open -- verify with test data that
    // keeps advancing in event time.
    val outputstream = posstream
      .mapWith { p =>
        (p.getConsumerUserId, PositionEventProtoTSAssigner.epochMillisOf(p))
      }
      .keyBy(0)
      .window(EventTimeSessionWindows.withGap(Time.seconds(SESSION_GAP_SECONDS)))
      .reduce { (v1, v2) => (v1._1, Math.max(v1._2, v2._2)) }

    outputstream.print()

    // execute the transformation pipeline
    env.execute("Output Stream")
  }
}

/**
 * Assigns event timestamps from each event's creation time and emits
 * watermarks that trail the highest timestamp seen by `maxOutOfOrderness`.
 *
 * @param maxOutOfOrderness maximum expected lateness of events; defaults to
 *                          60 seconds, so `new PositionEventProtoTSAssigner`
 *                          behaves as before
 */
class PositionEventProtoTSAssigner(maxOutOfOrderness: Time = Time.seconds(60))
  extends BoundedOutOfOrdernessTimestampExtractor[PositionEventProto](maxOutOfOrderness) {

  /** Returns the event's creation time in epoch milliseconds. */
  override def extractTimestamp(pos: PositionEventProto): Long =
    PositionEventProtoTSAssigner.epochMillisOf(pos)
}

object PositionEventProtoTSAssigner {

  /**
   * Parses the event's ISO-8601 creation time into epoch milliseconds.
   * Shared by the timestamp assigner and the pipeline's map step so both
   * always agree on the event time.
   *
   * @param pos the position event to read the creation time from
   * @return creation time as milliseconds since the epoch
   */
  def epochMillisOf(pos: PositionEventProto): Long =
    OffsetDateTime.parse(pos.getCreateDateTime.getInIso8601Format).toInstant.toEpochMilli
}