    --- Diff: 
    @@ -306,6 +307,85 @@ object AggregateUtil {
    +    * Create a 
[[org.apache.flink.api.common.functions.MapPartitionFunction]] that aggregation
    +    * for aggregates.
    +    * The function returns aggregate values of all aggregate function 
which are
    +    * organized by the following format:
    +    *
    +    * {{{
    +    *       avg(x) aggOffsetInRow = 2  count(z) aggOffsetInRow = 5
    +    *           |                          |          
    +    *           |                          |                   |
    +    *           v                          v                   v
    +    *        +--------+--------+--------+--------+-----------+---------+
    +    *        |  sum1  | count1 |  sum2  | count2 |windowStart|windowEnd|
    +    *        +--------+--------+--------+--------+-----------+---------+
    +    *                               ^                 ^
    +    *                               |                 |
    +    *             sum(y) aggOffsetInRow = 4    windowStart(min(rowtime))
    +    *
    +    * }}}
    +    *
    +    */
    +  def createDataSetWindowAggregationMapPartitionFunction(
    +    window: LogicalWindow,
    +    namedAggregates: Seq[CalcitePair[AggregateCall, String]],
    +    inputType: RelDataType,
    +    outputType: RelDataType = null,
    +    properties: Seq[NamedWindowProperty] = null,
    +    isPreMapPartition: Boolean = true,
    +    isInputCombined: Boolean = false): MapPartitionFunction[Row, Row] = {
    +    val aggregates = transformToAggregateFunctions(
    +      inputType,
    +      0)._2
    +    val intermediateRowArity =
    +    window match {
    +      case EventTimeSessionGroupWindow(_, _, gap) =>
    +        if (isPreMapPartition) {
    +          val preMapReturnType: RowTypeInfo =
    +            createAggregateBufferDataType(
    +              Array(),
    +              aggregates,
    +              inputType,
    +              Option(Array(BasicTypeInfo.LONG_TYPE_INFO, 
    +          new DataSetSessionWindowAggregatePreProcessor(
    +            aggregates,
    +            Array(),
    +            // the addition two fields are used to store window-start and 
window-end attributes
