Hi Ustinov,

I think you have mixed up the remainder with the target subtask: a key whose
remainder is 0 will not necessarily be processed by the 0th subtask after
keyBy. Flink applies its own hash function to the key you provide, assigns
the result to a key group, and routes the record to the subtask that owns
that key group's range.

keyBy only guarantees that records with the same key go to the same subtask.
If you want to balance the workload, you need many more distinct keys than
parallel subtasks.
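To make this concrete, here is a rough sketch of the assignment logic. It follows the shape of Flink's key-group assignment (hash the key, take the remainder modulo maxParallelism to get a key group, then map the group range onto subtasks), but it substitutes a trivial bit-mix for Flink's internal murmur hash, so the exact distribution will differ from a real job:

```java
import java.util.HashSet;
import java.util.Set;

public class KeyGroupSketch {
    // Simplified stand-in for Flink's internal hash; NOT the real
    // murmur-based implementation, just enough to show the mechanics.
    static int hash(Object key) {
        int h = key.hashCode();
        h ^= (h >>> 16);            // cheap bit mix
        return h & Integer.MAX_VALUE;
    }

    // Key group = hash(key) % maxParallelism (128 is a common default
    // for small-parallelism jobs).
    static int keyGroup(Object key, int maxParallelism) {
        return hash(key) % maxParallelism;
    }

    // Each subtask owns a contiguous range of key groups.
    static int operatorIndex(int keyGroup, int maxParallelism, int parallelism) {
        return keyGroup * parallelism / maxParallelism;
    }

    public static void main(String[] args) {
        int maxParallelism = 128, parallelism = 8;
        Set<Integer> usedSubtasks = new HashSet<>();
        // Use the remainders 0..7 themselves as keys, as the original
        // key selector effectively does: they do NOT spread across
        // subtasks 0..7, because they are re-hashed first.
        for (int remainder = 0; remainder < parallelism; remainder++) {
            int kg = keyGroup(remainder, maxParallelism);
            usedSubtasks.add(operatorIndex(kg, maxParallelism, parallelism));
        }
        System.out.println("distinct subtasks used: " + usedSubtasks.size());
    }
}
```

With this toy hash all eight "remainder" keys collapse onto a single subtask; with a real hash they would spread somewhat better, but with only 8 distinct keys and 8 subtasks some subtasks will almost certainly receive nothing. That is why several of your windows stay empty.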

Best, Hequn


On Fri, Jun 21, 2019 at 6:23 PM Ustinov Anton <ustinov....@gmail.com> wrote:

> I have a simple job that reads JSON messages from a Kafka topic and
> processes them like this:
>
> SingleOutputStreamOperator<Integer> result = ds
>         .filter(ev -> ev.has(cookieFieldName))
>         .map(ev -> ev.get(cookieFieldName).asText())
>         .keyBy(new CookieKeySelector(env.getParallelism()))
>         .timeWindow(Time.seconds(period))
>         .aggregate(new CookieAggregate())
>         .timeWindowAll(Time.seconds(period))
>         .reduce((v1, v2) -> v1 + v2);
>
> CookieKeySelector computes an MD5 hash of the cookie value and takes the
> remainder of dividing it by the job parallelism. CookieAggregate counts
> unique cookie values in a window. I see in the Flink Dashboard that only
> half of the windows receive messages to process. The number of working
> windows depends on the job parallelism. Why do only some of the windows
> compute useful aggregates? I’ve tried using random numbers as a key and
> still get the same result.
>
> Additional information: Flink 1.8.0, runs on a single node with 56 CPUs,
> 256G RAM, 10GB/s network.
>
>
> Anton Ustinov
> ustinov....@gmail.com
>
>
