Github user ggevay commented on the pull request: https://github.com/apache/flink/pull/684#issuecomment-105249478 I coded the grouped case and did the refactoring of SumAggregator and ComparableAggregator to use the new FieldAccessor class to access the user-specified field. The first commit does the refactoring and the second adds the median calculation. As part of the refactoring I also fixed a small bug: SimpleComparableAggregator.reduce was not handling the byAggregate case. I also moved the logic of DataStream.getClassAtPos and checkFieldRange to FieldAccessor. The refactoring also solves FLINK-2039. (by the first two lines of the second overload of FieldAccessor.create) There are three things left to do: deciding where will the median be placed, creating a Jira, and possibly adding the median to the Scala API. I will do these after speaking with @mbalassi tomorrow.
--- If your project is set up for it, you can reply to this email and have your reply appear on GitHub as well. If your project does not have this feature enabled and wishes so, or if the feature is enabled but not working, please contact infrastructure at infrastruct...@apache.org or file a JIRA ticket with INFRA. ---