Github user huawei-flink commented on the issue: https://github.com/apache/flink/pull/3783 @rtudoran @fhueske the first implementation I made was with the state in the ProcessFunction without code generated aggregation function. Second, I pushed a branch with the state in the process function using the code generated process function. Then, third I moved the state within the code generated function. It is not clear to me why the state cannot be within the code generated function. Could you please clarify so that we can understand whether it is worth working around it. This feature is quite important for us. Anyway, you could have a look at the branch that uses the state in the process function and uses the code generated aggregation functions. Basically, rather than generate one code generated function for all the aggregations, I create one class for each, and then I call the corresponding accumulate/retract using the distinct logic when marked in the process function.
--- If your project is set up for it, you can reply to this email and have your reply appear on GitHub as well. If your project does not have this feature enabled and wishes so, or if the feature is enabled but not working, please contact infrastructure at infrastruct...@apache.org or file a JIRA ticket with INFRA. ---