Hi Anton, executeOnCollection() is only meant for executing Flink Jobs in the local machine without bringing up a local (or actual) Flink cluster. So solving the problem there does not really solve the problem.
The underlying problem is this: in a Map-Reduce world the way to count elements of type T is to map those T to (T, 1) and then to group by T and sum up the ones. If you have no elements then you have no ones that you can sum up, i.e. you also don't realise that you have zero elements. Cheers, Aljoscha On Fri, 21 Oct 2016 at 13:55 Anton Mushin <anton_mus...@epam.com> wrote: > Hi everybody, > Could you explain issue https://issues.apache.org/jira/browse/FLINK-4832, > please? > > Simple, I chose another option for resolve this issue, unlike as described > in issue description > In the > `org.apache.flink.api.common.operators.base.MapOperatorBase#executeOnCollections` > I added next code: > if( inputData.size() == 0) { > IN inCopy = inSerializer.createInstance(); > OUT out = function.map(inCopy); > result.add(outSerializer.copy(out)); > } > > And I change > `org.apache.flink.api.table.runtime.aggregate.SumAggregate#initiate` as > > override def initiate(partial: Row): Unit = { > partial.setField(sumIndex, 0.asInstanceOf[T]) //cast 0 to > type class in each [Type]SumAggregate class are extends SumAggregate[T] > } > > And now next code is executing correct: > val sqlQuery = > "SELECT sum(_1),sum(_2),sum(_3),sum(_4),sum(_5),sum(_6) " + > "FROM (select * from MyTable where _1 = 4)" > val ds = env.fromElements( > (1: Byte, 2l,1D,1f,1,1:Short ), > (2: Byte, 2l,1D,1f,1,1:Short )) > > val result = tEnv.sql(sqlQuery) //result == "0,0,0.0,0.0,0,0" > > val sqlQuery2 = > "SELECT > count(_1),count(_2),count(_3),count(_4),count(_5),count(_6) " + > "FROM (select * from MyTable where _1 = 4)" > val result2 = tEnv.sql(sqlQuery2) //result == " 0,0,0,0,0,0" > > Is this the correct solution for this ticket or not? > > Best regards, > Anton Mushin >