Hi Stephan,

I faced a similar issue. The way I implemented this (though it is a workaround) was to make the input to the fold function, i.e. the initial value passed to fold, symmetric to what goes into the window function.
I made the initial value for the fold function a tuple in which every index that is not required or not yet available is set to null. The idea was to pass a consistent tuple to both functions and let each function update only the index of its choice. The tuple returned after both operations then has all of its index values set.

    val DEFAULT_ACCUMULATOR_VALUE = (null, List[String]())

    .apply(DEFAULT_ACCUMULATOR_VALUE,
           new MyFoldFunction(),    // folds values into index 1, i.e. the list
           new MyWindowFunction())  // simply puts the key at index 0

In the end I achieve the aggregation through the fold function and the element processing through the window function.

Hope this helps!

Regards,
Anchit
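For illustration, the pattern described above can be sketched in plain Scala without any Flink dependency. This is only a simulation under stated assumptions: `Acc`, `foldStep`, `windowStep` and `runWindow` are hypothetical names standing in for the accumulator type, `MyFoldFunction`, `MyWindowFunction` and the window operator, and the key is assumed to be a `String`.

```scala
// Plain-Scala simulation of the "symmetric accumulator" workaround.
// All names here are hypothetical; nothing below calls Flink.
object SymmetricAccumulatorSketch {
  // Tuple shape shared by both steps: (key, collected values).
  // Assumption: the key is a String.
  type Acc = (String, List[String])

  // The key is not known while folding, so index 0 starts as null.
  // The explicit type annotation keeps Scala from inferring Null
  // as the type of the first slot.
  val DefaultAccumulatorValue: Acc = (null, List[String]())

  // Stand-in for MyFoldFunction: updates only index 1 (the list).
  def foldStep(acc: Acc, value: String): Acc =
    (acc._1, acc._2 :+ value)

  // Stand-in for MyWindowFunction: updates only index 0 (the key).
  def windowStep(key: String, folded: Acc): Acc =
    (key, folded._2)

  // Mimics the processing of one key in one window:
  // fold every element first, then hand the result to the window step.
  def runWindow(key: String, elements: Seq[String]): Acc = {
    val folded = elements.foldLeft(DefaultAccumulatorValue)(foldStep)
    windowStep(key, folded)
  }
}
```

With these definitions, `SymmetricAccumulatorSketch.runWindow("user-1", Seq("a", "b", "c"))` evaluates to `("user-1", List("a", "b", "c"))`: the list comes from the fold step and the key from the window step.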