Hi, Dominik. For data skew, you can refer to the tuning and optimization ideas from Flink SQL [1] and implement them manually through the DataStream API. If your processing logic and aggregations are simple, you could even use the Flink SQL API directly. In particular, the way you manually add a rolling number now is essentially what the split-distinct automatic optimization does [2].
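If the SQL route is an option for you, here is a minimal sketch of turning that optimization on. The table definition and query are hypothetical placeholders, not your actual pipeline (on Flink versions before 1.15, use getConfig().getConfiguration().setString(...) instead of getConfig().set(...)):

import org.apache.flink.table.api.EnvironmentSettings;
import org.apache.flink.table.api.TableEnvironment;

public class SplitDistinctSketch {
    public static void main(String[] args) {
        TableEnvironment tEnv =
                TableEnvironment.create(EnvironmentSettings.inStreamingMode());

        // Let the planner rewrite skewed distinct aggregates into a
        // two-level (partial -> final) aggregation automatically [2].
        tEnv.getConfig().set("table.optimizer.distinct-agg.split.enabled", "true");

        // Hypothetical source table; the built-in 'datagen' connector
        // just produces random rows so the sketch is runnable.
        tEnv.executeSql(
                "CREATE TABLE events (field_name STRING, user_id BIGINT) "
                        + "WITH ('connector' = 'datagen')");

        // Without the flag above, all rows for one field_name would be
        // funneled to a single subtask for the distinct aggregation.
        tEnv.executeSql(
                "SELECT field_name, COUNT(DISTINCT user_id) AS uniques "
                        + "FROM events GROUP BY field_name")
            .print();
    }
}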
> I see that some taskmanagers handle substantially more load than others
> (a few million records difference)

IIUC, this could be because the recombining operator you use downstream always shuffles the records for one field name to the same task manager. You can examine the processing throughput of each vertex in the Flink UI via the 'Records Received' metric, to check whether any nodes other than this aggregation node are causing data skew.

[1] https://nightlies.apache.org/flink/flink-docs-master/docs/dev/table/tuning/#performance-tuning
[2] https://nightlies.apache.org/flink/flink-docs-master/docs/dev/table/tuning/#split-distinct-aggregation

--
Best!
Xuyang

At 2024-02-22 15:44:26, "" <dominik.buen...@swisscom.com> wrote:

Hi all,

I am currently facing the problem of having a pipeline (DataStream API) where I need to split a GenericRecord into its fields and then aggregate all the values of a particular field into 30-minute windows. If I were to key only by the field name, all values of a field would be sent to the same parallel instance of the cluster. This is bad, as the stream is quite large (15k events/second). What I want to achieve is a more even distribution of events across the different taskmanagers.

Currently, I assign a "rolling" number (between 0 and the maximum parallelism) to the field name as a secondary key component and use this combination in the keyBy. This gives me "partitioned" events, which I then have to recombine in a second step by keying on only the field-name part of the composite key (see the sketch after this message). I tested this approach and it works, but when looking at the Flink WebUI, I see that some taskmanagers handle substantially more load than others (a few million records difference).

I also had a look at partitionCustom(), but that doesn't work for KeyedStreams, right?

Did someone else face a related issue? Any suggestions on how I can distribute events with the same key more evenly?

Kind Regards
Dominik
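Below is a minimal, self-contained sketch of the two-phase ("salted key") pattern described above, assuming a simple sum over (fieldName, value) pairs. All class, field, and source names are placeholders rather than the actual pipeline, and a random salt stands in for the rolling counter (a per-subtask counter works the same way):

import java.util.concurrent.ThreadLocalRandom;

import org.apache.flink.api.common.eventtime.WatermarkStrategy;
import org.apache.flink.api.common.typeinfo.Types;
import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.api.java.tuple.Tuple3;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.api.windowing.assigners.TumblingEventTimeWindows;
import org.apache.flink.streaming.api.windowing.time.Time;

public class TwoPhaseSaltedAggregation {

    private static final int NUM_BUCKETS = 16; // e.g. the job parallelism

    public static void main(String[] args) throws Exception {
        StreamExecutionEnvironment env =
                StreamExecutionEnvironment.getExecutionEnvironment();

        // Stand-in for the real source of (fieldName, value) records.
        DataStream<Tuple2<String, Long>> events =
                env.fromElements(
                        Tuple2.of("fieldA", 1L),
                        Tuple2.of("fieldA", 2L),
                        Tuple2.of("fieldB", 3L))
                   .assignTimestampsAndWatermarks(
                        WatermarkStrategy
                            .<Tuple2<String, Long>>forMonotonousTimestamps()
                            .withTimestampAssigner((e, ts) -> System.currentTimeMillis()));

        // Phase 1: attach a salt (bucket number) to the key so one hot
        // field name is spread over up to NUM_BUCKETS subtasks, then
        // pre-aggregate per (fieldName, bucket) in a 30-minute window.
        DataStream<Tuple3<String, Integer, Long>> partials = events
                .map(e -> Tuple3.of(e.f0,
                        ThreadLocalRandom.current().nextInt(NUM_BUCKETS), e.f1))
                .returns(Types.TUPLE(Types.STRING, Types.INT, Types.LONG))
                .keyBy(e -> e.f0 + "#" + e.f1)
                .window(TumblingEventTimeWindows.of(Time.minutes(30)))
                .reduce((a, b) -> Tuple3.of(a.f0, a.f1, a.f2 + b.f2));

        // Phase 2: drop the salt and combine the (at most NUM_BUCKETS)
        // partial results per field name in an aligned window. This step
        // only sees pre-aggregated records, so the hot key stays cheap.
        DataStream<Tuple2<String, Long>> totals = partials
                .map(p -> Tuple2.of(p.f0, p.f2))
                .returns(Types.TUPLE(Types.STRING, Types.LONG))
                .keyBy(p -> p.f0)
                .window(TumblingEventTimeWindows.of(Time.minutes(30)))
                .reduce((a, b) -> Tuple2.of(a.f0, a.f1 + b.f1));

        totals.print();
        env.execute("Two-phase salted aggregation (sketch)");
    }
}

One thing to keep in mind about the residual imbalance: even with salting, Flink maps keys to subtasks by hashing them into key groups, so a relatively small number of distinct composite keys may not spread uniformly across taskmanagers; that can account for part of the difference visible in the WebUI.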