Timo,
Will do! I have been patching in a change locally that I have a PR [1] out for,
so if this fix ends up in the next 1.12 patch release, I may add it in along
with that change once it has been approved and merged.
On a side note, that PR has been out since the end of October (looks like I
need to do a rebase to accommodate the code reformatting change that occurred
since). Is there a process for getting somebody to review it? Not sure if,
with the New Year and the 1.12 release and follow-up, it just got lost in the
commotion.
Regards,
Dylan Forciea
[1] https://github.com/apache/flink/pull/13787
On 1/21/21, 8:50 AM, "Timo Walther" <twal...@apache.org> wrote:
I opened a PR. Feel free to try it out.
https://github.com/apache/flink/pull/14720
Btw:
>> env.createTemporarySystemFunction("LatestNonNullLong", classOf[LatestNonNull[Long]])
>>
>> env.createTemporarySystemFunction("LatestNonNullString", classOf[LatestNonNull[String]])
don't make a difference. The generics will be type erased in bytecode
and only the class name matters.
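A minimal sketch of what erasure means here, just for illustration:

    // Both expressions evaluate to the same runtime Class object because the
    // type argument is erased; Flink sees one and the same class in both
    // registrations above.
    val longClass = classOf[LatestNonNull[Long]]
    val stringClass = classOf[LatestNonNull[String]]
    assert(longClass == stringClass)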
Thanks,
Timo
On 21.01.21 11:36, Timo Walther wrote:
> Hi Dylan,
>
> thanks for the investigation. I can now also reproduce it my code. Yes,
> this is a bug. I opened
>
> https://issues.apache.org/jira/browse/FLINK-21070
>
> and will try to fix this asap.
>
> Thanks,
> Timo
>
> On 20.01.21 17:52, Dylan Forciea wrote:
>> Timo,
>>
>> I converted what I had to Java and ended up with the exact same issue as
>> before: it will work if I only ever use it on one type, but not if I use it
>> on multiple types. Maybe this is a bug?
>>
>> Dylan
>>
>> On 1/20/21, 10:06 AM, "Dylan Forciea" <dy...@oseberg.io> wrote:
>>
>> Oh, I think I might have a clue as to what is going on. I notice that it
>> will work properly when I only call it on Long. I think that it is using the
>> same generated Converter code for whatever type was called first.
>>
>> Since in Scala I can't declare an object as static within the class itself,
>> I wonder if it won't generate appropriate Converter code per subtype. I
>> tried creating a subclass specific to the type within my class and returning
>> that as the accumulator, but that didn't help. And I can't refer to that
>> class in the TypeInference since it isn't static, and I get an error from
>> Flink because of that. I'm going to see whether writing this UDF in Java
>> with an embedded public static class, like you have, will solve my problems.
>> I'll report back to let you know what I find. If that works, I'm not quite
>> sure how to make it work in Scala.
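>> If it helps to picture what I mean, the nearest Scala approximation I can
>> think of is an untested sketch like this, nesting the accumulator in the
>> companion object rather than in the class itself:
>>
>>     object LatestNonNull {
>>       // A class nested in the companion object compiles without an outer
>>       // instance, which is the closest Scala gets to a Java "public static"
>>       // nested class.
>>       case class Accumulator[T](
>>           var value: T = null.asInstanceOf[T],
>>           var date: LocalDate = null)
>>     }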
>>
>> Regards,
>> Dylan Forciea
>>
>> On 1/20/21, 9:34 AM, "Dylan Forciea" <dy...@oseberg.io> wrote:
>>
>> As a side note, I also just tried to unify this into a single function
>> registration, using _ as the type parameter in the classOf call there and
>> within the TypeInference definition for the accumulator, and still ended up
>> with the exact same stack trace.
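>> For reference, that unified registration looked roughly like this (the
>> registered name here is just illustrative):
>>
>>     env.createTemporarySystemFunction("LatestNonNull", classOf[LatestNonNull[_]])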
>>
>> Dylan
>>
>> On 1/20/21, 9:22 AM, "Dylan Forciea" <dy...@oseberg.io> wrote:
>>
>> Timo,
>>
>> I appreciate it! I am using Flink 1.12.0 right now with the Blink planner.
>> What you proposed is roughly what I had come up with the first time around,
>> which resulted in the stack trace with the ClassCastException I had
>> originally included. I saw that you had used a Row instead of just the value
>> in your example, but changing it that way didn't seem to help, which makes
>> sense since the problem seems to be in the code generated for the
>> accumulator Converter and not the output.
>>
>> Here is the exact code that caused that error (while
>> calling LatestNonNullLong):
>>
>> The registration of the class below:
>>
>> env.createTemporarySystemFunction("LatestNonNullLong", classOf[LatestNonNull[Long]])
>>
>> env.createTemporarySystemFunction("LatestNonNullString", classOf[LatestNonNull[String]])
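>>
>> For context, it gets invoked along these lines (the table and column names
>> here are made up for illustration):
>>
>>     val result = env.sqlQuery(
>>       "SELECT id, LatestNonNullLong(amount, updated_date) FROM my_table GROUP BY id")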
>>
>>
>> The class itself:
>>
>> import java.time.LocalDate
>> import java.util.Optional
>> import org.apache.flink.table.api.DataTypes
>> import org.apache.flink.table.catalog.DataTypeFactory
>> import org.apache.flink.table.functions.AggregateFunction
>> import org.apache.flink.table.types.inference.{InputTypeStrategies, TypeInference}
>>
>> case class LatestNonNullAccumulator[T](
>>     var value: T = null.asInstanceOf[T],
>>     var date: LocalDate = null)
>>
>> class LatestNonNull[T] extends AggregateFunction[T, LatestNonNullAccumulator[T]] {
>>
>>   override def createAccumulator(): LatestNonNullAccumulator[T] = {
>>     LatestNonNullAccumulator[T]()
>>   }
>>
>>   override def getValue(acc: LatestNonNullAccumulator[T]): T = {
>>     acc.value
>>   }
>>
>>   def accumulate(acc: LatestNonNullAccumulator[T], value: T, date: LocalDate): Unit = {
>>     if (value != null) {
>>       Option(acc.date).fold {
>>         acc.value = value
>>         acc.date = date
>>       } { accDate =>
>>         if (date != null && date.isAfter(accDate)) {
>>           acc.value = value
>>           acc.date = date
>>         }
>>       }
>>     }
>>   }
>>
>>   def merge(
>>       acc: LatestNonNullAccumulator[T],
>>       it: java.lang.Iterable[LatestNonNullAccumulator[T]]): Unit = {
>>     val iter = it.iterator()
>>     while (iter.hasNext) {
>>       val a = iter.next()
>>       if (a.value != null) {
>>         Option(acc.date).fold {
>>           acc.value = a.value
>>           acc.date = a.date
>>         } { accDate =>
>>           Option(a.date).map { curDate =>
>>             if (curDate.isAfter(accDate)) {
>>               acc.value = a.value
>>               acc.date = a.date
>>             }
>>           }
>>         }
>>       }
>>     }
>>   }
>>
>>   def resetAccumulator(acc: LatestNonNullAccumulator[T]): Unit = {
>>     acc.value = null.asInstanceOf[T]
>>     acc.date = null
>>   }
>>
>>   override def getTypeInference(typeFactory: DataTypeFactory): TypeInference = {
>>     TypeInference
>>       .newBuilder()
>>       .inputTypeStrategy(InputTypeStrategies
>>         .sequence(InputTypeStrategies.ANY, InputTypeStrategies.explicit(DataTypes.DATE())))
>>       .accumulatorTypeStrategy { callContext =>
>>         val accDataType = DataTypes.STRUCTURED(
>>           classOf[LatestNonNullAccumulator[T]],
>>           DataTypes.FIELD("value", callContext.getArgumentDataTypes.get(0)),
>>           DataTypes.FIELD("date", DataTypes.DATE()))
>>
>>         Optional.of(accDataType)
>>       }
>>       .outputTypeStrategy { callContext =>
>>         val outputDataType = callContext.getArgumentDataTypes().get(0)
>>         Optional.of(outputDataType)
>>       }
>>       .build()
>>   }
>> }
>>
>> Regards,
>> Dylan Forciea
>>
>> On 1/20/21, 2:37 AM, "Timo Walther" <twal...@apache.org> wrote:
>>
>> Hi Dylan,
>>
>> I'm assuming you are using Flink 1.12 and the Blink planner?
>>
>> Beginning from 1.12 you can use the "new" aggregate functions with a better
>> type inference, so TypeInformation will not be used in this stack.
>>
>> I tried to come up with an example that should explain the rough design. I
>> will include this example in the Flink code base. I hope this helps:
>>
>>
>>
>> import org.apache.flink.table.types.inference.InputTypeStrategies;
>>
>> public static class LastIfNotNull<T>
>>         extends AggregateFunction<Row, LastIfNotNull.Accumulator<T>> {
>>
>>     public static class Accumulator<T> {
>>         public T value;
>>         public LocalDate date;
>>     }
>>
>>     public void accumulate(Accumulator<T> acc, T input, LocalDate date) {
>>         if (input != null) {
>>             acc.value = input;
>>             acc.date = date;
>>         }
>>     }
>>
>>     @Override
>>     public Row getValue(Accumulator<T> acc) {
>>         return Row.of(acc.value, acc.date);
>>     }
>>
>>     @Override
>>     public Accumulator<T> createAccumulator() {
>>         return new Accumulator<>();
>>     }
>>
>>     @Override
>>     public TypeInference getTypeInference(DataTypeFactory typeFactory) {
>>         return TypeInference.newBuilder()
>>                 .inputTypeStrategy(
>>                         InputTypeStrategies.sequence(
>>                                 InputTypeStrategies.ANY,
>>                                 InputTypeStrategies.explicit(DataTypes.DATE())))
>>                 .accumulatorTypeStrategy(
>>                         callContext -> {
>>                             DataType accDataType =
>>                                     DataTypes.STRUCTURED(
>>                                             Accumulator.class,
>>                                             DataTypes.FIELD(
>>                                                     "value",
>>                                                     callContext.getArgumentDataTypes().get(0)),
>>                                             DataTypes.FIELD("date", DataTypes.DATE()));
>>                             return Optional.of(accDataType);
>>                         })
>>                 .outputTypeStrategy(
>>                         callContext -> {
>>                             DataType argDataType = callContext.getArgumentDataTypes().get(0);
>>                             DataType outputDataType =
>>                                     DataTypes.ROW(
>>                                             DataTypes.FIELD("value", argDataType),
>>                                             DataTypes.FIELD("date", DataTypes.DATE()));
>>                             return Optional.of(outputDataType);
>>                         })
>>                 .build();
>>     }
>> }
>>
>> Regards,
>> Timo
>>
>>
>>
>> On 20.01.21 01:04, Dylan Forciea wrote:
>> > I am attempting to create an aggregate UDF that takes a generic parameter
>> > T, but for the life of me, I can't seem to get it to work.
>> >
>> > The UDF I'm trying to implement takes two input arguments: a value that is
>> > generic, and a date. It will choose the non-null value with the latest
>> > associated date. I had originally done this with separate Top 1 queries
>> > connected with a left join, but the memory usage seems far higher than
>> > doing this with a custom aggregate function.
>> >
>> > As a first attempt, I tried to use custom type inference to have it
>> > validate that the first argument type is the output type and have a single
>> > function, and also used DataTypes.STRUCTURED to try to define the shape of
>> > my accumulator. However, that resulted in an exception like this whenever
>> > I tried to use a non-string value as the first argument:
>> >
>> > [error] Caused by: java.lang.ClassCastException: java.lang.Long cannot be cast to java.lang.String
>> > [error]   at io$oseberg$flink$udf$LatestNonNullAccumulator$Converter.toInternal(Unknown Source)
>> > [error]   at org.apache.flink.table.data.conversion.StructuredObjectConverter.toInternal(StructuredObjectConverter.java:92)
>> > [error]   at org.apache.flink.table.data.conversion.StructuredObjectConverter.toInternal(StructuredObjectConverter.java:47)
>> > [error]   at org.apache.flink.table.data.conversion.DataStructureConverter.toInternalOrNull(DataStructureConverter.java:59)
>> > [error]   at GroupAggsHandler$777.getAccumulators(Unknown Source)
>> > [error]   at org.apache.flink.table.runtime.operators.aggregate.GroupAggFunction.processElement(GroupAggFunction.java:175)
>> > [error]   at org.apache.flink.table.runtime.operators.aggregate.GroupAggFunction.processElement(GroupAggFunction.java:45)
>> > [error]   at org.apache.flink.streaming.api.operators.KeyedProcessOperator.processElement(KeyedProcessOperator.java:85)
>> > [error]   at org.apache.flink.streaming.runtime.tasks.OneInputStreamTask$StreamTaskNetworkOutput.emitRecord(OneInputStreamTask.java:193)
>> > [error]   at org.apache.flink.streaming.runtime.io.StreamTaskNetworkInput.processElement(StreamTaskNetworkInput.java:179)
>> > [error]   at org.apache.flink.streaming.runtime.io.StreamTaskNetworkInput.emitNext(StreamTaskNetworkInput.java:152)
>> > [error]   at org.apache.flink.streaming.runtime.io.StreamOneInputProcessor.processInput(StreamOneInputProcessor.java:67)
>> > [error]   at org.apache.flink.streaming.runtime.tasks.StreamTask.processInput(StreamTask.java:372)
>> > [error]   at org.apache.flink.streaming.runtime.tasks.mailbox.MailboxProcessor.runMailboxLoop(MailboxProcessor.java:186)
>> > [error]   at org.apache.flink.streaming.runtime.tasks.StreamTask.runMailboxLoop(StreamTask.java:575)
>> > [error]   at org.apache.flink.streaming.runtime.tasks.StreamTask.invoke(StreamTask.java:539)
>> > [error]   at org.apache.flink.runtime.taskmanager.Task.doRun(Task.java:722)
>> > [error]   at org.apache.flink.runtime.taskmanager.Task.run(Task.java:547)
>> > [error]   at java.lang.Thread.run(Thread.java:748)
>> >
>> > Figuring that I can't do something of that sort, I tried to follow the
>> > general approach of the Sum accumulator [1] in the Flink source code,
>> > where separate classes are derived from a base class and each advertises
>> > its accumulator shape, but I ended up with the exact same stack trace as
>> > above when I tried to create and use a function specifically for a
>> > non-string type like Long.
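>> >
>> > Concretely, that attempt looked something like this sketch, fixing the
>> > type parameter in concrete subclasses:
>> >
>> >     class LatestNonNullLong extends LatestNonNull[Long]     // Long accumulator shape
>> >     class LatestNonNullString extends LatestNonNull[String] // String accumulator shape
>> >
>> >     env.createTemporarySystemFunction("LatestNonNullLong", classOf[LatestNonNullLong])
>> >     env.createTemporarySystemFunction("LatestNonNullString", classOf[LatestNonNullString])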
>> >
>> > Is there something I'm missing as far as how this is supposed to be done?
>> > Everything I try either results in a stack trace like the above, or type
>> > erasure issues when trying to get type information for the accumulator. If
>> > I just copy the generic code multiple times and directly use Long or
>> > String rather than subclassing, then it works just fine. I appreciate any
>> > help I can get on this!
>> >
>> > Regards,
>> >
>> > Dylan Forciea
>> >
>> > [1]
>> > https://github.com/apache/flink/blob/release-1.12.0/flink-table/flink-table-planner/src/main/scala/org/apache/flink/table/functions/aggfunctions/SumAggFunction.scala
>> >
>>
>>
>>
>>
>>
>