Hi Flink users,

We have an issue where timeWindowAll() doesn't assign elements properly: sums that should land in the same window are emitted in separate windows.
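For reference on what we expect: with tumbling event-time windows, the window an element belongs to is derived purely from its timestamp, start = timestamp - (timestamp % windowSize) (assuming zero window offset), so two events whose timestamps fall within the same minute should map to the same window start. A plain-Java sketch of that arithmetic (no Flink dependencies; the class and method names here are made up for illustration, and the example timestamps are only roughly 2016-07-20T05:57 UTC):

```java
// Sketch of how a tumbling event-time window start is derived from an
// element timestamp, assuming zero window offset. WindowStart and
// windowStartFor are illustrative names, not Flink API.
public class WindowStart {

    // Start of the tumbling window of the given size that contains ts.
    public static long windowStartFor(long ts, long windowSizeMillis) {
        return ts - (ts % windowSizeMillis);
    }

    public static void main(String[] args) {
        long size = 60_000L; // 1-minute windows, as in timeWindowAll(Time.minutes(1))
        long a = windowStartFor(1_468_907_830_000L, size); // ~05:57:10 UTC
        long b = windowStartFor(1_468_907_850_000L, size); // ~05:57:30 UTC
        // Two timestamps inside the same minute share one window start,
        // so their events should be counted together in one window.
        System.out.println(a == b); // true
    }
}
```

This is why we expect a single result per minute rather than the duplicated windows shown below.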
For example, in the output below, window -832348384 has window start time 2016-07-20T05:57:00.000 with a count of 36, and there is another window -832348384 with the same start time 2016-07-20T05:57:00.000 and a count of 1. They should have been aggregated into a single window -832348384 with a count of 37.

...
// hashCode of the window, sum of events in the window, window start time
{"hashCode":-832348384,"count":36,"startDate":"2016-07-20T05:57:00.000"}
{"hashCode":-832348384,"count":1,"startDate":"2016-07-20T05:57:00.000"}
{"hashCode":-830444128,"count":452,"startDate":"2016-07-20T05:58:00.000"}
{"hashCode":-830444128,"count":1,"startDate":"2016-07-20T05:58:00.000"}
{"hashCode":-830444128,"count":1,"startDate":"2016-07-20T05:58:00.000"}
{"hashCode":-830444128,"count":1,"startDate":"2016-07-20T05:58:00.000"}
...

Example code is as follows:

StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
env.setStreamTimeCharacteristic(TimeCharacteristic.EventTime);

Properties properties = new Properties();
properties.setProperty("bootstrap.servers", Config.bootstrapServers);
properties.setProperty("group.id", parameter.getRequired("groupId"));
properties.setProperty("auto.offset.reset", "earliest");

FlinkKafkaConsumer09<JSONObject> kafkaConsumer =
        new FlinkKafkaConsumer09<>(Config.topic, new JSONSchema(), properties);

DataStream<JSONObject> streams = env.addSource(kafkaConsumer)
        .assignTimestampsAndWatermarks(new CorrelationWatermark())
        .rebalance();

DataStream<JSONObject> afterWindow = streams
        .timeWindowAll(Time.minutes(1))
        .apply(new SumAllWindow());

public static class SumAllWindow implements
        AllWindowFunction<JSONObject, JSONObject, TimeWindow> {

    @Override
    public void apply(TimeWindow timeWindow, Iterable<JSONObject> values,
                      Collector<JSONObject> collector) throws Exception {
        DateTime startTs = new DateTime(timeWindow.getStart());
        JSONObject jsonObject = new JSONObject();
        int sum = 0;
        // Count the elements in this window.
        for (JSONObject value : values) {
            sum += 1;
        }
        jsonObject.put("startDate", startTs.toString());
        jsonObject.put("count", sum);
        jsonObject.put("hashCode", timeWindow.hashCode());
        collector.collect(jsonObject);
    }
}

public class CorrelationWatermark implements AssignerWithPeriodicWatermarks<JSONObject> {

    private final long maxOutOfOrderness = 10000; // 10 seconds
    private long currentMaxTimestamp;

    @Override
    public long extractTimestamp(JSONObject element, long previousElementTimestamp) {
        long timestamp = DateTime.parse(element.get("occurredAt").toString(),
                Config.timeFormatter).getMillis();
        currentMaxTimestamp = Math.max(timestamp, currentMaxTimestamp);
        return timestamp;
    }

    @Override
    public Watermark getCurrentWatermark() {
        return new Watermark(currentMaxTimestamp - maxOutOfOrderness);
    }
}

We have no problem with a smaller Kafka topic on Flink 1.0.3. Did we make a mistake somewhere? Please let me know if any further information is required to resolve this issue.

Best,

Sendoh

--
View this message in context: http://apache-flink-user-mailing-list-archive.2336050.n4.nabble.com/TimeWindowAll-doeesn-t-assign-properly-tp8201.html
Sent from the Apache Flink User Mailing List archive. mailing list archive at Nabble.com.