Hi Tony,

Operator state can only be kept on the heap.

One thing you could try is to add a random value to your data and keyBy on a
composite key that consists of your original key and the random value.
It is important, though, that you actually add the random value to your data
to ensure that the extracted key is always the same, i.e., deterministic
with respect to the data.
This should evenly distribute your data and allow you to use keyed MapState.
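The idea above can be sketched in a few lines of plain Python (the Flink-specific parts — the KeySelector and keyBy — are omitted; `NUM_SALTS` and the record layout are hypothetical names chosen for illustration). The point is that the salt is stored *in* the record, so extracting the key later is deterministic:

```python
import random

NUM_SALTS = 4  # hypothetical fan-out per hot key

def add_salt(record):
    # Attach the random value to the record itself, so that any later
    # key extraction is deterministic with respect to the data.
    return {**record, "salt": random.randrange(NUM_SALTS)}

def key_selector(record):
    # Composite key: original key plus the stored salt.
    return (record["key"], record["salt"])

records = [add_salt({"key": "hot", "value": i}) for i in range(100)]

# Extracting the key twice from the same record always yields the same key:
assert all(key_selector(r) == key_selector(r) for r in records)

# The single hot key is now spread over up to NUM_SALTS partitions:
partitions = {key_selector(r) for r in records}
assert 1 <= len(partitions) <= NUM_SALTS
```

To get per-original-key results back, you would aggregate per composite key first and then merge the partial results per original key in a second step.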

Hope this helps,
Fabian

2017-09-19 15:58 GMT+02:00 Tony Wei <tony19920...@gmail.com>:

> Hi,
>
> I have a basic streaming job that continuously persists data from Kafka to
> S3.
> The data is grouped by some dimensions and limited in amount.
>
> Originally, I used 'keyBy' and keyed state to fulfill the requirement.
> However, because the data is extremely skewed, I switched to using a map
> function to aggregate data for some partitions only, so that I can balance
> the amount of data in each subtask.
>
> I used a HashMap to store data by different dimensions inside the map
> function and convert it to operator list state when 'snapshot()' is called.
> But that creates another problem. Because I can't access operator list
> state directly, the way I would use keyed state in a KeyedStream, I have to
> use heap space to store that state. This limits the amount of data I can
> cache in the map function.
>
> I was wondering if there is any good suggestion for dealing with this
> problem, or a better way to use operator list state in this scenario.
> Thank you.
>
>
> Best Regards,
> Tony Wei
>
