I have a simple job that reads JSON messages from a Kafka topic and processes
them like this:

SingleOutputStreamOperator<Integer> result = ds
        .filter(ev -> ev.has(cookieFieldName))
        .map(ev -> ev.get(cookieFieldName).asText())
        .keyBy(new CookieKeySelector(env.getParallelism()))
        .timeWindow(Time.seconds(period))
        .aggregate(new CookieAggregate())
        .timeWindowAll(Time.seconds(period))
        .reduce((v1, v2) -> v1 + v2);
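
In case it matters, ds is produced roughly like this (a simplified sketch; the
topic name, consumer properties, and the Jackson parsing step are illustrative,
not my exact code):

import java.util.Properties;

import com.fasterxml.jackson.databind.JsonNode;
import com.fasterxml.jackson.databind.ObjectMapper;
import org.apache.flink.api.common.serialization.SimpleStringSchema;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.connectors.kafka.FlinkKafkaConsumer;

Properties props = new Properties();
props.setProperty("bootstrap.servers", "localhost:9092");
props.setProperty("group.id", "cookie-count");

// ObjectMapper is Serializable, so it can be captured by the map lambda.
ObjectMapper mapper = new ObjectMapper();

DataStream<JsonNode> ds = env
        // Consume raw JSON strings from the topic.
        .addSource(new FlinkKafkaConsumer<>("events", new SimpleStringSchema(), props))
        // Parse each record into a Jackson JsonNode.
        .map(value -> mapper.readTree(value))
        .returns(JsonNode.class);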

CookieKeySelector computes an MD5 hash of the cookie value and takes the
remainder of dividing it by the job parallelism. CookieAggregate counts unique
cookie values within a window (rough sketches of both are below). In the Flink
Dashboard I can see that only half of the windows receive messages to process,
and the number of working windows depends on the job parallelism. Why do only
some of the windows compute useful aggregates? I have tried using random
numbers as the key and still get the same result.
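
Roughly, the two classes look like this (a condensed sketch of the logic
described above; details such as how the digest is reduced to an int may
differ in my actual code):

import java.math.BigInteger;
import java.nio.charset.StandardCharsets;
import java.security.MessageDigest;
import java.util.HashSet;
import java.util.Set;

import org.apache.flink.api.common.functions.AggregateFunction;
import org.apache.flink.api.java.functions.KeySelector;

// Key selector: MD5-hash the cookie string, then take the remainder of
// division by the job parallelism.
public class CookieKeySelector implements KeySelector<String, Integer> {
    private final int parallelism;

    public CookieKeySelector(int parallelism) {
        this.parallelism = parallelism;
    }

    @Override
    public Integer getKey(String cookie) throws Exception {
        byte[] digest = MessageDigest.getInstance("MD5")
                .digest(cookie.getBytes(StandardCharsets.UTF_8));
        // Interpret the digest as a non-negative integer and reduce it mod parallelism.
        return new BigInteger(1, digest).mod(BigInteger.valueOf(parallelism)).intValue();
    }
}

// Aggregate: count distinct cookie values seen within one window.
public class CookieAggregate implements AggregateFunction<String, Set<String>, Integer> {
    @Override
    public Set<String> createAccumulator() {
        return new HashSet<>();
    }

    @Override
    public Set<String> add(String cookie, Set<String> accumulator) {
        accumulator.add(cookie);
        return accumulator;
    }

    @Override
    public Integer getResult(Set<String> accumulator) {
        return accumulator.size();
    }

    @Override
    public Set<String> merge(Set<String> a, Set<String> b) {
        a.addAll(b);
        return a;
    }
}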

Additional information: Flink 1.8.0, running on a single node with 56 CPUs,
256 GB of RAM, and a 10 GB/s network.


Anton Ustinov
ustinov....@gmail.com
