Hi,

I have a basic streaming job that continuously persists data from Kafka to S3. The data is grouped by some dimensions, and each group is limited to a certain amount.
Originally, I used 'keyBy' and keyed state to fulfill this requirement. However, because the data is extremely skewed, I switched to a map function that aggregates data for only some partitions, so that I can balance the amount of data across subtasks. Inside the map function I store the data in a HashMap keyed by the different dimensions, and convert it to operator list state when 'snapshot()' is called.

But that creates another problem. I can't access operator list state directly the way I can access keyed state in a KeyedStream, so I have to keep that state on the heap. This limits the amount of data I can cache in the map function.

Is there a good way to deal with this problem, or a better way to use operator list state in a scenario like this?

Thank you.

Best Regards,
Tony Wei
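To make the pattern above concrete, here is a framework-free Java sketch of it, under stated assumptions. All names (BufferedAggregator, add, flush, snapshot, restore) and the key-count cap are hypothetical illustrations, not Flink API. In a real Flink job, snapshot() and restore() would roughly correspond to CheckpointedFunction#snapshotState and #initializeState, with the entries written to a ListState obtained from getOperatorStateStore().getListState(...). The sketch shows why the HashMap is heap-bound: everything buffered between flushes lives in one in-memory map.

```java
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

// Hypothetical, framework-free stand-in for a map function that buffers
// aggregates in a HashMap and exposes them as list-shaped state on snapshot.
public class BufferedAggregator {
    // Hypothetical cap on distinct dimension keys kept on the heap.
    private final int maxKeys;
    // In-memory buffer: dimension key -> accumulated value.
    private final Map<String, Long> buffer = new HashMap<>();
    // Stand-in for a sink such as an S3 writer.
    private final List<Map.Entry<String, Long>> flushed = new ArrayList<>();

    public BufferedAggregator(int maxKeys) {
        this.maxKeys = maxKeys;
    }

    // Analogue of map(): aggregate one record, flush once the buffer reaches maxKeys.
    public void add(String dimensionKey, long value) {
        buffer.merge(dimensionKey, value, Long::sum);
        if (buffer.size() >= maxKeys) {
            flush();
        }
    }

    // Emit everything buffered so far to the sink and clear the heap buffer.
    public void flush() {
        for (Map.Entry<String, Long> e : buffer.entrySet()) {
            flushed.add(Map.entry(e.getKey(), e.getValue()));
        }
        buffer.clear();
    }

    // Analogue of snapshotState(): copy the map into a flat list,
    // the shape one would add entry by entry to operator ListState.
    public List<Map.Entry<String, Long>> snapshot() {
        List<Map.Entry<String, Long>> state = new ArrayList<>();
        for (Map.Entry<String, Long> e : buffer.entrySet()) {
            state.add(Map.entry(e.getKey(), e.getValue()));
        }
        return state;
    }

    // Analogue of initializeState(): rebuild the map from restored entries.
    // Entries are merged because redistributed list state may contain the
    // same key from several previous subtasks.
    public void restore(List<Map.Entry<String, Long>> state) {
        buffer.clear();
        for (Map.Entry<String, Long> e : state) {
            buffer.merge(e.getKey(), e.getValue(), Long::sum);
        }
    }

    public int bufferedKeys() {
        return buffer.size();
    }

    public List<Map.Entry<String, Long>> flushedEntries() {
        return flushed;
    }

    public static void main(String[] args) {
        BufferedAggregator agg = new BufferedAggregator(3);
        agg.add("us|mobile", 1);
        agg.add("us|mobile", 2);
        agg.add("eu|web", 5);
        BufferedAggregator restored = new BufferedAggregator(3);
        restored.restore(agg.snapshot());
        System.out.println(restored.bufferedKeys()); // 2
        agg.add("jp|tv", 1); // third distinct key triggers a flush
        System.out.println(agg.bufferedKeys()); // 0
        System.out.println(agg.flushedEntries().size()); // 3
    }
}
```

In this sketch maxKeys is the only thing bounding heap usage, which is the limitation described in the question: the whole buffer must fit in memory between flushes.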