Hi everybody,
Could you explain issue https://issues.apache.org/jira/browse/FLINK-4832, 
please?

I resolved this issue in a different way from the one proposed in the 
issue description.
In 
`org.apache.flink.api.common.operators.base.MapOperatorBase#executeOnCollections`
I added the following code: 
        // For empty input, map one default instance so the result is not empty
        if (inputData.isEmpty()) {
                IN inCopy = inSerializer.createInstance();
                OUT out = function.map(inCopy);
                result.add(outSerializer.copy(out));
        }
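
To make the effect of this change easier to discuss, here is a minimal, self-contained sketch of the same idea outside of Flink. It is only an illustration under assumptions: `EmptyInputMapSketch`, `mapOnCollection`, and the `Supplier`/`Function` parameters are hypothetical stand-ins for `inSerializer.createInstance()` and `function.map(...)`, not the real Flink classes.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;
import java.util.function.Function;
import java.util.function.Supplier;

// Simplified stand-in for the patched executeOnCollections logic.
// Supplier and Function are hypothetical substitutes for Flink's
// serializer and map function; they are assumptions of this sketch.
class EmptyInputMapSketch {

    static <IN, OUT> List<OUT> mapOnCollection(
            List<IN> inputData,
            Supplier<IN> createInstance,
            Function<IN, OUT> mapper) {
        List<OUT> result = new ArrayList<>(Math.max(inputData.size(), 1));
        if (inputData.isEmpty()) {
            // Proposed behaviour: feed one default record through the function.
            result.add(mapper.apply(createInstance.get()));
            return result;
        }
        // Regular case: map every input element.
        for (IN element : inputData) {
            result.add(mapper.apply(element));
        }
        return result;
    }

    public static void main(String[] args) {
        Function<Integer, Integer> doubler = x -> x * 2;
        // Empty input: one output record derived from the default instance 0.
        System.out.println(mapOnCollection(new ArrayList<Integer>(), () -> 0, doubler)); // [0]
        // Non-empty input: unchanged behaviour.
        System.out.println(mapOnCollection(Arrays.asList(1, 2), () -> 0, doubler)); // [2, 4]
    }
}
```

In this sketch every map function, not only one used for an aggregate, emits a record for empty input.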

I also changed 
`org.apache.flink.api.table.runtime.aggregate.SumAggregate#initiate` to:

        // Cast 0 to the type T of each [Type]SumAggregate class
        // that extends SumAggregate[T]
        override def initiate(partial: Row): Unit = {
                partial.setField(sumIndex, 0.asInstanceOf[T])
        }

With these changes, the following code now executes correctly: 
        val sqlQuery =
                "SELECT sum(_1),sum(_2),sum(_3),sum(_4),sum(_5),sum(_6) " +
                        "FROM (select * from MyTable where _1 = 4)"
        val ds = env.fromElements(
                (1: Byte, 2L, 1D, 1f, 1, 1: Short),
                (2: Byte, 2L, 1D, 1f, 1, 1: Short))

        val result = tEnv.sql(sqlQuery) // result == "0,0,0.0,0.0,0,0"

        val sqlQuery2 =
                "SELECT count(_1),count(_2),count(_3),count(_4),count(_5),count(_6) " +
                        "FROM (select * from MyTable where _1 = 4)"
        val result2 = tEnv.sql(sqlQuery2) // result2 == "0,0,0,0,0,0"

Is this a correct solution for this ticket?

Best regards,
Anton Mushin
