Hi Stephan,

I faced the similar issue, the way implemented this(though a workaround) is
by making the input to fold function i.e. the initial value to fold
symmetric to what goes into the window function.

I made the initial value to fold function a tuple with all non
required/available index values in that initial tuple as 'null'.

The idea was to have a consistent tuple pass onto both functions and let
individual functions operate/update the index of their choice in the tuple.

The last tuple i.e. returned tuple after both these operations would have
all the index values set up.

val DEFAULT_ACCUMULATOR_VALUE  = (null, List[String]())

.apply(DEFAULT_ACCUMULATOR_VALUE, 
         new MyFoldFunction(),                    // This operates/folds
values in index 1 i.e. the list
         new MyWindowFunction())              // This simply puts in the key
at index 0

In the end I achieve the aggregation task through fold function + element
processing through window function.

Hope this helps!

Regards,
Anchit





--
View this message in context: 
http://apache-flink-user-mailing-list-archive.2336050.n4.nabble.com/apply-with-fold-and-window-function-tp10092p10110.html
Sent from the Apache Flink User Mailing List archive. mailing list archive at 
Nabble.com.

Reply via email to