Thank you for your reply. Please let me know if other classes or the full code are needed.
/**
 * Count how many total events
 *
 * Pipeline: read BizEvents from Kafka, assign event-time timestamps and
 * periodic watermarks, map every event to (event, 1), then sum the ones
 * per 5-minute all-window and emit (window start time, count).
 */
StreamExecutionEnvironment env = StreamExecutionEnvironment.createLocalEnvironment(4, env_config);
env.setStreamTimeCharacteristic(TimeCharacteristic.EventTime);

Properties properties = new Properties();
properties.setProperty("bootstrap.servers", BOOTSTRAP_SERVERS);
properties.setProperty("group.id", "test");
properties.setProperty("client.id", "flink_test");
properties.setProperty("auto.offset.reset", "earliest");

final int maxEventDelay = 5; // events are out of order by max x seconds

DataStream<BizEvent> bizs = env
        .addSource(new FlinkKafkaConsumer09<>(KAFKA_TOPIC, new BizSchema(), properties))
        .assignTimestampsAndWatermarks(new AssignerWithPeriodicWatermarks<BizEvent>() {

            // Highest event time extracted so far. The watermark is derived from
            // this value so that it follows event time and never moves backwards
            // when an out-of-order (older) event arrives.
            long maxSeenTimestamp;

            @Override
            public long extractTimestamp(BizEvent biz, long previousElementTimestamp) {
                // NOTE(review): the second argument is the timestamp previously attached
                // to the element (if any), not the event time returned here, so it must
                // not drive the watermark - confirm against the Flink version in use.
                long eventTime = biz.time.getMillis();
                maxSeenTimestamp = Math.max(maxSeenTimestamp, eventTime);
                return eventTime;
            }

            @Override
            public long getCurrentWatermark() {
                // Trail the newest event by the tolerated out-of-orderness
                // (seconds -> milliseconds, computed in long arithmetic).
                return maxSeenTimestamp - (maxEventDelay * 1000L);
            }
        });

DataStream<Tuple2<BizEvent, Integer>> bizCnt = bizs.flatMap(new CountBiz());

DataStream<Tuple2<String, Integer>> bizWindowTotal = bizCnt
        .timeWindowAll(Time.of(5, TimeUnit.MINUTES))
        .apply(new SumStartTsAllWindow());

/**
 * Sums the per-event counts of one window.
 *
 * Output(start time of windows, counts)
 */
public static class SumStartTsAllWindow implements
        AllWindowFunction<Iterable<Tuple2<BizEvent, Integer>>, Tuple2<String, Integer>, TimeWindow> {

    // Shared formatter for the window start time; never reassigned, hence final.
    private static final DateTimeFormatter timeFormatter =
            DateTimeFormat.forPattern("yyyy-MM-dd'T'HH:mm:ss.SSSZ")
                    .withLocale(Locale.GERMAN)
                    .withZone(DateTimeZone.forID("Europe/Berlin"));

    /**
     * Adds up field f1 of every tuple in the window and emits a single
     * (formatted window start, sum) tuple.
     *
     * @param timeWindow the window being evaluated; only its start is used
     * @param values     the (event, count) tuples collected for the window
     * @param collector  receives the one result tuple for this window
     */
    @Override
    public void apply(TimeWindow timeWindow,
                      Iterable<Tuple2<BizEvent, Integer>> values,
                      Collector<Tuple2<String, Integer>> collector) throws Exception {
        DateTime startTs = new DateTime(timeWindow.getStart(), DateTimeZone.forID("Europe/Berlin"));

        int sum = 0;
        for (Tuple2<BizEvent, Integer> value : values) {
            sum += value.f1;
        }

        collector.collect(new Tuple2<>(startTs.toString(timeFormatter), sum));
    }
}

/**
 * Pairs every incoming event with the constant count 1.
 *
 * Output (BizEvent, 1)
 */
public static class CountBiz implements FlatMapFunction<BizEvent, Tuple2<BizEvent, Integer>> {

    @Override
    public void flatMap(BizEvent bizEvent, Collector<Tuple2<BizEvent, Integer>> collector) {
        collector.collect(new Tuple2<>(bizEvent, 1));
    }
}

-- View this message in context: http://apache-flink-user-mailing-list-archive.2336050.n4.nabble.com/The-way-to-itearte-instances-in-AllWindowFunction-in-current-Master-branch-tp5137p5151.html Sent from the Apache Flink User Mailing List archive. mailing list archive at Nabble.com.