I wanted to report that I tried out your PR, and it does solve my issue. I am able to create a single generic LatestNonNull, and it behaves as expected.
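Concretely, this is roughly what it looks like on my end now (a minimal sketch; the table and column names are made up):

    // One registration of the generic class now covers every value type.
    env.createTemporarySystemFunction("LatestNonNull", classOf[LatestNonNull[_]])

    // Hypothetical table t(id STRING, amount BIGINT, status STRING, updated DATE):
    // the same function is applied to a BIGINT column and a STRING column.
    env.sqlQuery(
      """SELECT id,
        |       LatestNonNull(amount, updated) AS latest_amount,
        |       LatestNonNull(status, updated) AS latest_status
        |FROM t
        |GROUP BY id""".stripMargin)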
Thanks,
Dylan Forciea

On 1/21/21, 8:50 AM, "Timo Walther" <twal...@apache.org> wrote:

I opened a PR. Feel free to try it out.

https://github.com/apache/flink/pull/14720

Btw, the two separate registrations

    env.createTemporarySystemFunction("LatestNonNullLong",
      classOf[LatestNonNull[Long]])

    env.createTemporarySystemFunction("LatestNonNullString",
      classOf[LatestNonNull[String]])

don't make a difference. The generics will be type erased in bytecode and only the class name matters.
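As a quick illustration of the erasure (a standalone sketch): both classOf expressions resolve to the very same runtime Class object, so the planner cannot tell the two registrations apart.

    // Generic parameters do not survive compilation: both expressions
    // yield the same runtime Class object.
    val longClass: Class[_] = classOf[LatestNonNull[Long]]
    val stringClass: Class[_] = classOf[LatestNonNull[String]]

    println(longClass == stringClass) // prints: true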
Thanks,
Timo

On 21.01.21 11:36, Timo Walther wrote:

Hi Dylan,

thanks for the investigation. I can now also reproduce it in my code. Yes, this is a bug. I opened

https://issues.apache.org/jira/browse/FLINK-21070

and will try to fix this asap.

Thanks,
Timo

On 20.01.21 17:52, Dylan Forciea wrote:

Timo,

I converted what I had to Java and ended up with the exact same issue as before: it works if I only ever use it on one type, but not if I use it on multiple. Maybe this is a bug?

Dylan

On 1/20/21, 10:06 AM, "Dylan Forciea" <dy...@oseberg.io> wrote:

Oh, I think I might have a clue as to what is going on. I notice that it works properly when I only call it on Long. I think that it is using the same generated Converter code for whichever type was called first.

Since in Scala I can't declare an object as static within the class itself, I wonder if it won't generate appropriate Converter code per subtype. I tried creating a subclass that is specific to the type within my class and returning that as the accumulator, but that didn't help. And I can't refer to that class in the TypeInference since it isn't static, and I get an error from Flink because of that. I'm going to see whether writing this UDF in Java with an embedded public static class, like you have, solves my problems. I'll report back to let you know what I find. If that works, I'm not quite sure how to make it work in Scala.

Regards,
Dylan Forciea

On 1/20/21, 9:34 AM, "Dylan Forciea" <dy...@oseberg.io> wrote:

As a side note, I also just tried to unify into a single function registration, using _ as the type parameter in the classOf calls there and within the TypeInference definition for the accumulator, and still ended up with the exact same stack trace.

Dylan

On 1/20/21, 9:22 AM, "Dylan Forciea" <dy...@oseberg.io> wrote:

Timo,

I appreciate it! I am using Flink 1.12.0 right now with the Blink planner. What you proposed is roughly what I had come up with the first time around, which resulted in the stack trace with the ClassCastException I had originally included. I saw that you used a Row instead of just the value in your example, but changing it that way didn't seem to help, which makes sense since the problem seems to be in the code generated for the accumulator Converter and not the output.

Here is the exact code that caused that error (while calling LatestNonNullLong):

The registration:

    env.createTemporarySystemFunction("LatestNonNullLong",
      classOf[LatestNonNull[Long]])

    env.createTemporarySystemFunction("LatestNonNullString",
      classOf[LatestNonNull[String]])

The class itself:

    import java.time.LocalDate
    import java.util.Optional

    import org.apache.flink.table.api.DataTypes
    import org.apache.flink.table.catalog.DataTypeFactory
    import org.apache.flink.table.functions.AggregateFunction
    import org.apache.flink.table.types.inference.{InputTypeStrategies, TypeInference}

    case class LatestNonNullAccumulator[T](
        var value: T = null.asInstanceOf[T],
        var date: LocalDate = null)

    class LatestNonNull[T] extends AggregateFunction[T, LatestNonNullAccumulator[T]] {

      override def createAccumulator(): LatestNonNullAccumulator[T] = {
        LatestNonNullAccumulator[T]()
      }

      override def getValue(acc: LatestNonNullAccumulator[T]): T = {
        acc.value
      }

      def accumulate(acc: LatestNonNullAccumulator[T], value: T, date: LocalDate): Unit = {
        if (value != null) {
          Option(acc.date).fold {
            acc.value = value
            acc.date = date
          } { accDate =>
            if (date != null && date.isAfter(accDate)) {
              acc.value = value
              acc.date = date
            }
          }
        }
      }

      def merge(
          acc: LatestNonNullAccumulator[T],
          it: java.lang.Iterable[LatestNonNullAccumulator[T]]): Unit = {
        val iter = it.iterator()
        while (iter.hasNext) {
          val a = iter.next()
          if (a.value != null) {
            Option(acc.date).fold {
              acc.value = a.value
              acc.date = a.date
            } { accDate =>
              Option(a.date).map { curDate =>
                if (curDate.isAfter(accDate)) {
                  acc.value = a.value
                  acc.date = a.date
                }
              }
            }
          }
        }
      }

      def resetAccumulator(acc: LatestNonNullAccumulator[T]): Unit = {
        acc.value = null.asInstanceOf[T]
        acc.date = null
      }

      override def getTypeInference(typeFactory: DataTypeFactory): TypeInference = {
        TypeInference
          .newBuilder()
          .inputTypeStrategy(InputTypeStrategies
            .sequence(InputTypeStrategies.ANY,
              InputTypeStrategies.explicit(DataTypes.DATE())))
          .accumulatorTypeStrategy { callContext =>
            val accDataType = DataTypes.STRUCTURED(
              classOf[LatestNonNullAccumulator[T]],
              DataTypes.FIELD("value", callContext.getArgumentDataTypes.get(0)),
              DataTypes.FIELD("date", DataTypes.DATE()))

            Optional.of(accDataType)
          }
          .outputTypeStrategy { callContext =>
            val outputDataType = callContext.getArgumentDataTypes.get(0)
            Optional.of(outputDataType)
          }
          .build()
      }
    }
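And the shape of the query that triggers it, as a sketch (the table and column names are made up; amount is a BIGINT column):

    // Hypothetical table t(id STRING, amount BIGINT, updated DATE).
    // This call fails at runtime with:
    //   java.lang.ClassCastException: java.lang.Long cannot be cast to java.lang.String
    env.sqlQuery(
      """SELECT id, LatestNonNullLong(amount, updated) AS latest_amount
        |FROM t
        |GROUP BY id""".stripMargin)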
Regards,
Dylan Forciea

On 1/20/21, 2:37 AM, "Timo Walther" <twal...@apache.org> wrote:

Hi Dylan,

I'm assuming you are using Flink 1.12 and the Blink planner?

Beginning from 1.12 you can use the "new" aggregate functions with a better type inference, so TypeInformation will not be used in this stack.

I tried to come up with an example that should explain the rough design. I will include this example in the Flink code base. I hope this helps:

    import java.time.LocalDate;
    import java.util.Optional;

    import org.apache.flink.table.api.DataTypes;
    import org.apache.flink.table.catalog.DataTypeFactory;
    import org.apache.flink.table.functions.AggregateFunction;
    import org.apache.flink.table.types.DataType;
    import org.apache.flink.table.types.inference.InputTypeStrategies;
    import org.apache.flink.table.types.inference.TypeInference;
    import org.apache.flink.types.Row;

    public static class LastIfNotNull<T>
            extends AggregateFunction<Row, LastIfNotNull.Accumulator<T>> {

        public static class Accumulator<T> {
            public T value;
            public LocalDate date;
        }

        public void accumulate(Accumulator<T> acc, T input, LocalDate date) {
            if (input != null) {
                acc.value = input;
                acc.date = date;
            }
        }

        @Override
        public Row getValue(Accumulator<T> acc) {
            return Row.of(acc.value, acc.date);
        }

        @Override
        public Accumulator<T> createAccumulator() {
            return new Accumulator<>();
        }

        @Override
        public TypeInference getTypeInference(DataTypeFactory typeFactory) {
            return TypeInference.newBuilder()
                    .inputTypeStrategy(
                            InputTypeStrategies.sequence(
                                    InputTypeStrategies.ANY,
                                    InputTypeStrategies.explicit(DataTypes.DATE())))
                    .accumulatorTypeStrategy(
                            callContext -> {
                                DataType accDataType =
                                        DataTypes.STRUCTURED(
                                                Accumulator.class,
                                                DataTypes.FIELD(
                                                        "value",
                                                        callContext.getArgumentDataTypes().get(0)),
                                                DataTypes.FIELD("date", DataTypes.DATE()));
                                return Optional.of(accDataType);
                            })
                    .outputTypeStrategy(
                            callContext -> {
                                DataType argDataType =
                                        callContext.getArgumentDataTypes().get(0);
                                DataType outputDataType =
                                        DataTypes.ROW(
                                                DataTypes.FIELD("value", argDataType),
                                                DataTypes.FIELD("date", DataTypes.DATE()));
                                return Optional.of(outputDataType);
                            })
                    .build();
        }
    }
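As a rough usage sketch (the environment, table, and column names are illustrative, and referring to the function by classOf assumes it is reachable as a top-level class):

    import org.apache.flink.table.api.Expressions

    // Register the example function once by class.
    env.createTemporarySystemFunction("LastIfNotNull", classOf[LastIfNotNull[_]])

    // Hypothetical table t(id STRING, amount BIGINT, updated DATE); the
    // result column is a ROW<value, date>, per the output type strategy above.
    val result = env
      .from("t")
      .groupBy(Expressions.$("id"))
      .select(
        Expressions.$("id"),
        Expressions.call("LastIfNotNull", Expressions.$("amount"), Expressions.$("updated")))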
Regards,
Timo

On 20.01.21 01:04, Dylan Forciea wrote:

I am attempting to create an aggregate UDF that takes a generic parameter T, but for the life of me, I can't seem to get it to work.

The UDF I'm trying to implement takes two input arguments, a value that is generic and a date. It chooses the non-null value with the latest associated date. I had originally done this with separate Top 1 queries connected with a left join, but the memory usage seems far higher than doing this with a custom aggregate function.

As a first attempt, I tried to use custom type inference to have it validate that the first argument type is the output type and have a single function, and also used DataTypes.STRUCTURED to try to define the shape of my accumulator. However, that resulted in an exception like this whenever I tried to use a non-string value as the first argument:

    Caused by: java.lang.ClassCastException: java.lang.Long cannot be cast to java.lang.String
        at io$oseberg$flink$udf$LatestNonNullAccumulator$Converter.toInternal(Unknown Source)
        at org.apache.flink.table.data.conversion.StructuredObjectConverter.toInternal(StructuredObjectConverter.java:92)
        at org.apache.flink.table.data.conversion.StructuredObjectConverter.toInternal(StructuredObjectConverter.java:47)
        at org.apache.flink.table.data.conversion.DataStructureConverter.toInternalOrNull(DataStructureConverter.java:59)
        at GroupAggsHandler$777.getAccumulators(Unknown Source)
        at org.apache.flink.table.runtime.operators.aggregate.GroupAggFunction.processElement(GroupAggFunction.java:175)
        at org.apache.flink.table.runtime.operators.aggregate.GroupAggFunction.processElement(GroupAggFunction.java:45)
        at org.apache.flink.streaming.api.operators.KeyedProcessOperator.processElement(KeyedProcessOperator.java:85)
        at org.apache.flink.streaming.runtime.tasks.OneInputStreamTask$StreamTaskNetworkOutput.emitRecord(OneInputStreamTask.java:193)
        at org.apache.flink.streaming.runtime.io.StreamTaskNetworkInput.processElement(StreamTaskNetworkInput.java:179)
        at org.apache.flink.streaming.runtime.io.StreamTaskNetworkInput.emitNext(StreamTaskNetworkInput.java:152)
        at org.apache.flink.streaming.runtime.io.StreamOneInputProcessor.processInput(StreamOneInputProcessor.java:67)
        at org.apache.flink.streaming.runtime.tasks.StreamTask.processInput(StreamTask.java:372)
        at org.apache.flink.streaming.runtime.tasks.mailbox.MailboxProcessor.runMailboxLoop(MailboxProcessor.java:186)
        at org.apache.flink.streaming.runtime.tasks.StreamTask.runMailboxLoop(StreamTask.java:575)
        at org.apache.flink.streaming.runtime.tasks.StreamTask.invoke(StreamTask.java:539)
        at org.apache.flink.runtime.taskmanager.Task.doRun(Task.java:722)
        at org.apache.flink.runtime.taskmanager.Task.run(Task.java:547)
        at java.lang.Thread.run(Thread.java:748)

Figuring that I can't do something of that sort, I tried to follow the general approach of the Sum accumulator[1] in the Flink source code, where separate classes are derived from a base class and each advertises its accumulator shape, but ended up with the exact same stack trace as above when I tried to create and use a function specifically for a non-string type like Long.

Is there something I'm missing as far as how this is supposed to be done? Everything I try either results in a stack trace like the above, or in type erasure issues when trying to get type information for the accumulator. If I just copy the generic code multiple times and directly use Long or String rather than subclassing, then it works just fine. I appreciate any help I can get on this!
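For reference, the per-type copy that does work looks roughly like this (a trimmed sketch: merge, resetAccumulator, and the type inference are omitted here but mirror the generic version with T fixed to Long):

    import java.time.LocalDate
    import org.apache.flink.table.functions.AggregateFunction

    // A dedicated, non-generic copy for Long values.
    case class LatestNonNullLongAccumulator(
        var value: java.lang.Long = null,
        var date: LocalDate = null)

    class LatestNonNullLong
        extends AggregateFunction[java.lang.Long, LatestNonNullLongAccumulator] {

      override def createAccumulator(): LatestNonNullLongAccumulator =
        LatestNonNullLongAccumulator()

      override def getValue(acc: LatestNonNullLongAccumulator): java.lang.Long =
        acc.value

      def accumulate(
          acc: LatestNonNullLongAccumulator,
          value: java.lang.Long,
          date: LocalDate): Unit = {
        // Keep the value with the latest non-null date, as in the generic logic.
        if (value != null && (acc.date == null ||
            (date != null && date.isAfter(acc.date)))) {
          acc.value = value
          acc.date = date
        }
      }
    }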
Regards,

Dylan Forciea

[1] https://github.com/apache/flink/blob/release-1.12.0/flink-table/flink-table-planner/src/main/scala/org/apache/flink/table/functions/aggfunctions/SumAggFunction.scala