Hi everybody, Could you explain issue https://issues.apache.org/jira/browse/FLINK-4832, please?
Simple, I chose another option for resolve this issue, unlike as described in issue description In the `org.apache.flink.api.common.operators.base.MapOperatorBase#executeOnCollections` I added next code: if( inputData.size() == 0) { IN inCopy = inSerializer.createInstance(); OUT out = function.map(inCopy); result.add(outSerializer.copy(out)); } And I change `org.apache.flink.api.table.runtime.aggregate.SumAggregate#initiate` as override def initiate(partial: Row): Unit = { partial.setField(sumIndex, 0.asInstanceOf[T]) //cast 0 to type class in each [Type]SumAggregate class are extends SumAggregate[T] } And now next code is executing correct: val sqlQuery = "SELECT sum(_1),sum(_2),sum(_3),sum(_4),sum(_5),sum(_6) " + "FROM (select * from MyTable where _1 = 4)" val ds = env.fromElements( (1: Byte, 2l,1D,1f,1,1:Short ), (2: Byte, 2l,1D,1f,1,1:Short )) val result = tEnv.sql(sqlQuery) //result == "0,0,0.0,0.0,0,0" val sqlQuery2 = "SELECT count(_1),count(_2),count(_3),count(_4),count(_5),count(_6) " + "FROM (select * from MyTable where _1 = 4)" val result2 = tEnv.sql(sqlQuery2) //result == " 0,0,0,0,0,0" Is this the correct solution for this ticket or not? Best regards, Anton Mushin