Hi shyla, I answered a similar question on stackoverflow[1], you can take a look first.
Best, Hequn [1] https://stackoverflow.com/questions/51691269/event-time-window-in-flink-does-not-trigger On Sun, Aug 5, 2018 at 11:24 AM, shyla deshpande <deshpandesh...@gmail.com> wrote: > Hi, > > I used PopularPlacesFromKafka from dataartisans.flinktraining.exercises as > the basis. I made very minor changes > > and the session window is not triggered. If I use ProcessingTime instead of > EventTime it works. Here is my code. > > Appreciate any help. Thanks > > object KafkaEventTimeWindow { > > private val LOCAL_ZOOKEEPER_HOST = "localhost:2181" > private val LOCAL_KAFKA_BROKER = "localhost:9092" > private val CON_GROUP = "KafkaEventTimeSessionWindow" > private val MAX_EVENT_DELAY = 60 // events are out of order by max 60 > seconds > > def main(args: Array[String]) { > > val env = StreamExecutionEnvironment.getExecutionEnvironment > env.setStreamTimeCharacteristic(TimeCharacteristic.EventTime) > > val kafkaProps = new Properties > kafkaProps.setProperty("zookeeper.connect", LOCAL_ZOOKEEPER_HOST) > kafkaProps.setProperty("bootstrap.servers", LOCAL_KAFKA_BROKER) > kafkaProps.setProperty("group.id", CON_GROUP) > kafkaProps.setProperty("auto.offset.reset", "earliest") > > val consumer = new FlinkKafkaConsumer011[PositionEventProto]( > "positionevent", > new PositionEventProtoSchema, > kafkaProps) > consumer.assignTimestampsAndWatermarks(new PositionEventProtoTSAssigner) > > val posstream = env.addSource(consumer) > > def convtoepochmilli(cdt: String): Long = { > val odt:OffsetDateTime = OffsetDateTime.parse(cdt); > val i:Instant = odt.toInstant(); > val millis:Long = i.toEpochMilli(); > millis > } > > val outputstream = posstream > .mapWith{case(p) => (p.getConsumerUserId, > convtoepochmilli(p.getCreateDateTime.getInIso8601Format))} > .keyBy(0) > .window(EventTimeSessionWindows.withGap(Time.seconds(60))) > .reduce { (v1, v2) => (v1._1, Math.max(v1._2 , v2._2)) } > > outputstream.print() > > // execute the transformation pipeline > env.execute("Output Stream") > } > > } > > class PositionEventProtoTSAssigner > extends > BoundedOutOfOrdernessTimestampExtractor[PositionEventProto](Time.seconds(60)) > { > > override def extractTimestamp(pos: PositionEventProto): Long = { > val odt:OffsetDateTime = > OffsetDateTime.parse(pos.getCreateDateTime.getInIso8601Format); > val i:Instant = odt.toInstant(); > val millis:Long = i.toEpochMilli(); > millis > } > } > > >