Hi Flink users,

We have an issue where timeWindowAll() doesn't assign elements properly. Counts
that should land in the same window are emitted as separate windows.

For example, in the output below, window -832348384 with start time
2016-07-20T05:57:00.000 has count 36, and there is another window
-832348384 with the same start time and count 1. They
should have been aggregated into a single window -832348384 with count 37.

...// hashCode of the window, sum of events in the window, window start time
{"hashCode":-832348384,"count":36,"startDate":"2016-07-20T05:57:00.000"}
{"hashCode":-832348384,"count":1,"startDate":"2016-07-20T05:57:00.000"}
{"hashCode":-830444128,"count":452,"startDate":"2016-07-20T05:58:00.000"}
{"hashCode":-830444128,"count":1,"startDate":"2016-07-20T05:58:00.000"}
{"hashCode":-830444128,"count":1,"startDate":"2016-07-20T05:58:00.000"}
{"hashCode":-830444128,"count":1,"startDate":"2016-07-20T05:58:00.000"}
...
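To make concrete what we expected, here is a plain-Java sketch that merges the rows above by (hashCode, startDate) and sums the counts — this is the aggregation we thought a single window firing would produce (the class and method names are ours, just for illustration; the numbers are copied from the log excerpt):

```java
import java.util.LinkedHashMap;
import java.util.Map;

public class MergeWindows {

    // The log rows above, reduced to (hashCode|startDate, count).
    static final String[][] ROWS = {
        {"-832348384|2016-07-20T05:57:00.000", "36"},
        {"-832348384|2016-07-20T05:57:00.000", "1"},
        {"-830444128|2016-07-20T05:58:00.000", "452"},
        {"-830444128|2016-07-20T05:58:00.000", "1"},
        {"-830444128|2016-07-20T05:58:00.000", "1"},
        {"-830444128|2016-07-20T05:58:00.000", "1"},
    };

    // Sum the counts per logical window, i.e. what one firing should emit.
    static Map<String, Integer> mergedCounts() {
        Map<String, Integer> merged = new LinkedHashMap<>();
        for (String[] row : ROWS) {
            merged.merge(row[0], Integer.parseInt(row[1]), Integer::sum);
        }
        return merged;
    }

    public static void main(String[] args) {
        // Expected: 37 for the 05:57 window and 455 for the 05:58 window.
        System.out.println(mergedCounts());
    }
}
```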

Example code is as follows:

StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
env.setStreamTimeCharacteristic(TimeCharacteristic.EventTime);

Properties properties = new Properties();
properties.setProperty("bootstrap.servers", Config.bootstrapServers);
properties.setProperty("group.id", parameter.getRequired("groupId"));
properties.setProperty("auto.offset.reset", "earliest");

FlinkKafkaConsumer09<JSONObject> kafkaConsumer =
        new FlinkKafkaConsumer09<>(Config.topic, new JSONSchema(), properties);

DataStream<JSONObject> streams = env.addSource(kafkaConsumer)
        .assignTimestampsAndWatermarks(new CorrelationWatermark())
        .rebalance();

DataStream<JSONObject> afterWindow = streams
        .timeWindowAll(Time.minutes(1))  // one global (non-keyed) 1-minute tumbling window
        .apply(new SumAllWindow());

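As far as we understand, tumbling event-time windows assign elements by timestamp arithmetic alone: every timestamp in the same minute maps to the same window start, so two output records with the same start time should be two firings of one logical window, not a mis-assignment. A minimal plain-Java sketch of that assignment rule (the class and method names are ours; the timestamps are made up, and 60000 ms matches the Time.minutes(1) above):

```java
public class TumblingAssignSketch {

    // Window start as a tumbling window (no offset) computes it:
    // every timestamp in [start, start + size) maps to the same start.
    static long windowStart(long timestampMillis, long sizeMillis) {
        return timestampMillis - (timestampMillis % sizeMillis);
    }

    public static void main(String[] args) {
        long size = 60_000L; // Time.minutes(1), as in the job above
        long a = 125_000L;   // two hypothetical event times in the same minute
        long b = 170_000L;
        System.out.println(windowStart(a, size)); // 120000
        System.out.println(windowStart(b, size)); // 120000
    }
}
```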

public static class SumAllWindow implements
        AllWindowFunction<JSONObject, JSONObject, TimeWindow> {

    @Override
    public void apply(TimeWindow timeWindow, Iterable<JSONObject> values,
                      Collector<JSONObject> collector) throws Exception {

        DateTime startTs = new DateTime(timeWindow.getStart());
        JSONObject jsonObject = new JSONObject();

        // count the elements in the window
        int sum = 0;
        for (JSONObject value : values) {
            sum += 1;
        }

        jsonObject.put("startDate", startTs.toString());
        jsonObject.put("count", sum);
        jsonObject.put("hashCode", timeWindow.hashCode());
        collector.collect(jsonObject);
    }
}


public class CorrelationWatermark implements
        AssignerWithPeriodicWatermarks<JSONObject> {

    private final long maxOutOfOrderness = 10000; // 10 seconds
    private long currentMaxTimestamp;

    @Override
    public long extractTimestamp(JSONObject element, long previousElementTimestamp) {
        long timestamp = DateTime.parse(element.get("occurredAt").toString(),
                Config.timeFormatter).getMillis();
        currentMaxTimestamp = Math.max(timestamp, currentMaxTimestamp);
        return timestamp;
    }

    @Override
    public Watermark getCurrentWatermark() {
        return new Watermark(currentMaxTimestamp - maxOutOfOrderness);
    }
}
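One thing we wondered about: with rebalance() and a parallel Kafka source, each parallel instance of CorrelationWatermark keeps its own currentMaxTimestamp, and downstream the operator's event-time clock is (as we understand it) the minimum of its inputs' watermarks. An instance whose field is still at its default 0 would keep reporting watermark -10000 and hold windows open. A plain-Java sketch of that arithmetic (no Flink types; all names here are ours, not Flink's):

```java
import java.util.Arrays;

public class WatermarkSketch {
    static final long MAX_OUT_OF_ORDERNESS = 10_000L; // same 10 s as above

    // Per-instance watermark, as CorrelationWatermark computes it.
    static long instanceWatermark(long currentMaxTimestamp) {
        return currentMaxTimestamp - MAX_OUT_OF_ORDERNESS;
    }

    // Downstream, the effective watermark is the minimum over all inputs.
    static long combined(long... instanceWatermarks) {
        return Arrays.stream(instanceWatermarks).min().getAsLong();
    }

    public static void main(String[] args) {
        long busy = instanceWatermark(1_469_000_000_000L); // instance that saw recent data
        long idle = instanceWatermark(0L);                 // instance whose field is still 0
        // The idle instance pins the combined watermark at -10000.
        System.out.println(combined(busy, idle)); // -10000
    }
}
```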

We had no such problem with a smaller Kafka topic on Flink 1.0.3. Did we make a
mistake somewhere?
Please let me know if any further information is required to resolve this
issue.

Best,

Sendoh



--
View this message in context: 
http://apache-flink-user-mailing-list-archive.2336050.n4.nabble.com/TimeWindowAll-doeesn-t-assign-properly-tp8201.html
Sent from the Apache Flink User Mailing List archive. mailing list archive at 
Nabble.com.
