Hi Timo,
Okay, then the aggregate function should look like this:
> public static class Agg extends AggregateFunction<Integer[], ArrayList<Integer>> {
>     @Override
>     public ArrayList<Integer> createAccumulator() {
>         return new ArrayList<>();
>     }
>
>     @Override
>     public Integer[] getValue(ArrayList<Integer> acc) {
>         return acc.toArray(new Integer[0]);
>     }
>
>     public void accumulate(ArrayList<Integer> acc, int i) {
>         acc.add(i);
>     }
>
>     @Override
>     public TypeInformation<Integer[]> getResultType() {
>         return Types.OBJECT_ARRAY(Types.INT);
>     }
> }
Now the program outputs:
> 2> +I([1, 2])
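
For anyone following along without a Flink setup, the accumulate/getValue logic above can be sketched in plain Java. This is only an illustration under a few assumptions: the class name AggSketch and its main method are hypothetical, the class does not extend Flink's AggregateFunction, and it only shows how the two input rows end up as [1, 2] in an Integer[].

```java
import java.util.ArrayList;
import java.util.Arrays;

// Hypothetical stand-alone sketch; this is NOT a Flink AggregateFunction.
public class AggSketch {
    // Mirrors createAccumulator(): an empty, mutable accumulator.
    public static ArrayList<Integer> createAccumulator() {
        return new ArrayList<>();
    }

    // Mirrors accumulate(): called once per input row of the group.
    public static void accumulate(ArrayList<Integer> acc, int i) {
        acc.add(i);
    }

    // Mirrors getValue(): copies the list into an array of boxed Integers.
    public static Integer[] getValue(ArrayList<Integer> acc) {
        return acc.toArray(new Integer[0]);
    }

    public static void main(String[] args) {
        ArrayList<Integer> acc = createAccumulator();
        accumulate(acc, 1); // row ('a', 1)
        accumulate(acc, 2); // row ('a', 2)
        System.out.println(Arrays.toString(getValue(acc))); // prints [1, 2]
    }
}
```

The sketch returns Integer[] rather than int[] because the declared result type is TypeInformation<Integer[]>, i.e. an object array of boxed elements.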
Thanks,
Dongwon
On Fri, Nov 27, 2020 at 5:38 PM Timo Walther wrote:
> Hi,
>
> First of all, we don't support ListTypeInfo in the Table API. Therefore, it
> is treated as a RAW type. The exception thrown while the error message is
> being created is a bug that should be fixed in a future version. But the
> type mismatch itself is valid:
>
> ARRAY<INT> does not correspond to a list type info but to
> `Types.OBJECT_ARRAY(Types.INT)`. Can you try this as the result type of
> your aggregate function?
>
> Regards,
> Timo
>
>
> On 26.11.20 18:13, Dongwon Kim wrote:
> > Hello,
> >
> > I'm using Flink-1.11.2.
> >
> > Let's assume that I want to store the result of the following UDAF in a
> > table:
> >
> > public class Agg extends AggregateFunction<List<Integer>, List<Integer>> {
> >     @Override
> >     public List<Integer> createAccumulator() {
> >         return new LinkedList<>();
> >     }
> >
> >     @Override
> >     public List<Integer> getValue(List<Integer> acc) {
> >         return acc;
> >     }
> >
> >     public void accumulate(List<Integer> acc, int i) {
> >         acc.add(i);
> >     }
> >
> >     @Override
> >     public TypeInformation<List<Integer>> getResultType() {
> >         return new ListTypeInfo<>(Integer.class);
> >     }
> > }
> >
> >
> > The main program looks as follows:
> >
> > public class TestMain {
> >     public static void main(String[] args) {
> >         EnvironmentSettings settings = EnvironmentSettings.newInstance()
> >             .inBatchMode()
> >             .build();
> >         TableEnvironment tEnv = TableEnvironment.create(settings);
> >         tEnv.executeSql(
> >             "CREATE TEMPORARY FUNCTION agg AS '" + Agg.class.getName() + "'"
> >         );
> >         Table t = tEnv.sqlQuery(
> >             "SELECT agg(c2)\n" +
> >             "FROM (VALUES (ROW('a',1)), (ROW('a',2))) AS T(c1,c2)\n" +
> >             "GROUP BY c1"
> >         );
> >         tEnv.executeSql(
> >             "CREATE TABLE output (a ARRAY<INT>) WITH ('connector' = 'print')"
> >         );
> >         /**
> >          * root
> >          *  |-- EXPR$0: RAW('java.util.List', ?)
> >          */
> >         t.printSchema();
> >         t.executeInsert("output");
> >     }
> > }
> >
> >
> > This program fails with the following exception:
> >
> > Exception in thread "main"
> > org.apache.flink.table.api.TableException: A raw type backed by type
> > information has no serializable string representation. It needs to
> > be resolved into a proper raw type.
> >   at org.apache.flink.table.types.logical.TypeInformationRawType.asSerializableString(TypeInformationRawType.java:101)
> >   at org.apache.flink.table.planner.sinks.TableSinkUtils$$anonfun$2.apply(TableSinkUtils.scala:92)
> >   at org.apache.flink.table.planner.sinks.TableSinkUtils$$anonfun$2.apply(TableSinkUtils.scala:92)
> >   at scala.collection.TraversableLike$$anonfun$map$1.apply(TraversableLike.scala:234)
> >   at scala.collection.TraversableLike$$anonfun$map$1.apply(TraversableLike.scala:234)
> >   at scala.collection.Iterator$class.foreach(Iterator.scala:891)
> >   at scala.collection.AbstractIterator.foreach(Iterator.scala:1334)
> >   at scala.collection.IterableLike$class.foreach(IterableLike.scala:72)
> >   at scala.collection.AbstractIterable.foreach(Iterable.scala:54)
> >   at scala.collection.TraversableLike$class.map(TraversableLike.scala:234)
> >   at scala.collection.AbstractTraversable.map(Traversable.scala:104)
> >   at org.apache.flink.table.planner.sinks.TableSinkUtils$.validateSchemaAndApplyImplicitCast(TableSinkUtils.scala:92)
> >   at org.apache.flink.table.planner.delegation.PlannerBase$$anonfun$2.apply(PlannerBase.scala:229)
> >   at org.apache.flink.table.planner.delegation.PlannerBase$$anonfun$2.apply(PlannerBase.scala:204)
> >   at scala.Option.map(Option.scala:146)
> >   at org.apache.flink.table.planner.delegation.PlannerBase.translateToRel(PlannerBase.scala:204)
> >   at org.apache.flink.table.planner.delegation.PlannerBase$$anonfun$1.apply(PlannerBase.scala:163)
> >   at org.apache.flink.table.planner.delegation.PlannerBase$$anonfun$1.apply(PlannerBase.scala:163)
> >   at scala.collection.TraversableLike$$anonfun$map$1.apply(TraversableLike.scala:234)
> >   at scala.collection.TraversableLike$$anonfun$map$1.apply(TraversableLike.scala:234)
> >   at scala.collection.Iterator$class.foreach(Iterator.scala:891)
> >   at scala.collection.AbstractIterator.foreach(Iterator.scala:1334)
> >   at scala.collection.IterableLike$class.foreach(IterableLike.scala:72)
> >   at scala.collection.AbstractIterable.foreach(Iterable.scala:54)
> >   at scala.collection.TraversableLike$class.map(TraversableLike.scala:234)
> >   at scala.collection.AbstractTraversable.map(Traversable.scala:104)
> >   at org.apache.flink.table.planner.delegation.PlannerBase.translate(PlannerBase.scala:163)
> >   at org.apache.flink.table.api.internal.TableEnvironmentImpl.translate(TableEnvironmentImpl.java:1264)
> >   at org.apache.flink.table.api.internal.TableEnvironmentImpl.executeInternal(TableEnvironmentImpl.java:700)
> >   at org.apache.flink.table.api.internal.TableImpl.executeInsert(TableImpl.java:565)
> >   at org.apache.flink.table.api.internal.TableImpl.executeInsert(TableImpl.java:549)
> >   at my.TestMain.main(TestMain.java:62)
> >
> >
> > I found that two types do not match:
> > - queryLogicalType : ROW<`EXPR$0` RAW('java.util.List', ?)>
> > - sinkLogicalType : ROW<`a` ARRAY<INT>>
> >
> > Why does the queryLogicalType contain 'RAW' instead of 'ARRAY'?
> > Is there no way for a UDAF to return java.util.List<T> and store it as
> > ARRAY?
> >
> > Thanks in advance,
> >
> > Dongwon
>
>