Timo,
I converted what I had to Java and ended up with the exact same issue
as before: it works if I only ever use it on one type, but not if I
use it on multiple types. Maybe this is a bug?
Dylan
On 1/20/21, 10:06 AM, "Dylan Forciea" <dy...@oseberg.io> wrote:
Oh, I think I might have a clue as to what is going on. I notice
that it will work properly when I only call it on Long. I think that
it is using the same generated code for the Converter for whatever was
called first.
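A minimal sketch of the kind of failure hypothesized here, in plain Java with no Flink dependency. It assumes, without confirmation from the Flink source, that a generated converter is cached under the accumulator's erased class name only; ConverterCacheSketch, Accumulator, CACHE, and converterFor are hypothetical names for illustration, not Flink APIs.

```java
import java.util.HashMap;
import java.util.Map;
import java.util.function.Function;

public class ConverterCacheSketch {

    // Stand-in for a generic accumulator; T is erased at runtime.
    public static class Accumulator<T> {
        public T value;

        public Accumulator(T value) {
            this.value = value;
        }
    }

    // Hypothetical cache keyed only by the erased class name (an assumption).
    static final Map<String, Function<Accumulator<?>, Object>> CACHE = new HashMap<>();

    // Returns the cached converter for accClass, creating it on first use only.
    // A later call with a different fieldType silently gets the first converter.
    public static Function<Accumulator<?>, Object> converterFor(
            Class<?> accClass, Class<?> fieldType) {
        return CACHE.computeIfAbsent(
                accClass.getName(),
                name -> acc -> fieldType.cast(acc.value));
    }

    // True if a Long accumulator fails after a String converter was cached first.
    public static boolean reproducesClassCast() {
        CACHE.clear();
        converterFor(Accumulator.class, String.class);
        Function<Accumulator<?>, Object> forLong =
                converterFor(Accumulator.class, Long.class);
        try {
            forLong.apply(new Accumulator<>(42L));
            return false;
        } catch (ClassCastException e) {
            return true;
        }
    }

    public static void main(String[] args) {
        System.out.println("ClassCastException reproduced: " + reproducesClassCast());
    }
}
```

In this sketch, whichever field type is requested first decides the converter for every later request under the same class name, which matches the symptom that the function works when it is only ever used with one type.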
Since in Scala I can't declare an object as static within the
class itself, I wonder if it won't generate appropriate Converter code
per subtype. I tried creating a subclass that is specific to the type
within my class and returning that as the accumulator, but that didn't
help. And, I can't refer to that class in the TypeInference since it
isn't static and I get an error from Flink because of that. I'm going
to see whether writing this UDF in Java with an embedded public static
class, like you have, will solve my problems. I'll report back to
let you know what I find. If that works, I'm not quite sure how to
make it work in Scala.
Regards,
Dylan Forciea
On 1/20/21, 9:34 AM, "Dylan Forciea" <dy...@oseberg.io> wrote:
As a side note, I also just tried to unify into a single
function registration and used _ as the type parameter in the classOf
calls there and within the TypeInference definition for the
accumulator and still ended up with the exact same stack trace.
Dylan
On 1/20/21, 9:22 AM, "Dylan Forciea" <dy...@oseberg.io> wrote:
Timo,
I appreciate it! I am using Flink 1.12.0 right now with
the Blink planner. What you proposed is roughly what I had come up
with the first time around that resulted in the stack trace with the
ClassCastException I had originally included. I saw that you had used
a Row instead of just the value in your example, but changing it that
way didn't seem to help, which makes sense since the problem seems to
be in the code generated for the accumulator Converter and not the
output.
Here is the exact code that caused that error (while
calling LatestNonNullLong):
The registration of the class below:
env.createTemporarySystemFunction("LatestNonNullLong", classOf[LatestNonNull[Long]])
env.createTemporarySystemFunction("LatestNonNullString", classOf[LatestNonNull[String]])
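Because the JVM erases type parameters, classOf[LatestNonNull[Long]] and classOf[LatestNonNull[String]] evaluate to the same runtime Class object, so the two registrations hand Flink the same class. A minimal Java sketch of that erasure, using a hypothetical stand-in class Holder rather than the real UDF:

```java
public class ErasureSketch {

    // Hypothetical generic class standing in for LatestNonNull[T].
    public static class Holder<T> {
        public T value;
    }

    // Compares the runtime classes of two differently parameterized instances.
    public static boolean sameRuntimeClass() {
        Holder<Long> longHolder = new Holder<>();
        Holder<String> stringHolder = new Holder<>();
        return longHolder.getClass() == stringHolder.getClass();
    }

    public static void main(String[] args) {
        System.out.println("Same runtime class: " + sameRuntimeClass());
    }
}
```

Any per-type behavior therefore has to come from information outside the class object, such as the argument types seen at call time.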
The class itself:
import java.time.LocalDate
import java.util.Optional

import org.apache.flink.table.api.DataTypes
import org.apache.flink.table.catalog.DataTypeFactory
import org.apache.flink.table.functions.AggregateFunction
import org.apache.flink.table.types.inference.{InputTypeStrategies, TypeInference}

case class LatestNonNullAccumulator[T](
    var value: T = null.asInstanceOf[T],
    var date: LocalDate = null)

class LatestNonNull[T] extends AggregateFunction[T, LatestNonNullAccumulator[T]] {

  override def createAccumulator(): LatestNonNullAccumulator[T] = {
    LatestNonNullAccumulator[T]()
  }

  override def getValue(acc: LatestNonNullAccumulator[T]): T = {
    acc.value
  }

  def accumulate(acc: LatestNonNullAccumulator[T], value: T, date: LocalDate): Unit = {
    if (value != null) {
      Option(acc.date).fold {
        acc.value = value
        acc.date = date
      } { accDate =>
        if (date != null && date.isAfter(accDate)) {
          acc.value = value
          acc.date = date
        }
      }
    }
  }

  def merge(
      acc: LatestNonNullAccumulator[T],
      it: java.lang.Iterable[LatestNonNullAccumulator[T]]): Unit = {
    val iter = it.iterator()
    while (iter.hasNext) {
      val a = iter.next()
      if (a.value != null) {
        Option(acc.date).fold {
          acc.value = a.value
          acc.date = a.date
        } { accDate =>
          Option(a.date).map { curDate =>
            if (curDate.isAfter(accDate)) {
              acc.value = a.value
              acc.date = a.date
            }
          }
        }
      }
    }
  }

  def resetAccumulator(acc: LatestNonNullAccumulator[T]): Unit = {
    acc.value = null.asInstanceOf[T]
    acc.date = null
  }

  override def getTypeInference(typeFactory: DataTypeFactory): TypeInference = {
    TypeInference
      .newBuilder()
      .inputTypeStrategy(InputTypeStrategies
        .sequence(InputTypeStrategies.ANY, InputTypeStrategies.explicit(DataTypes.DATE())))
      .accumulatorTypeStrategy { callContext =>
        val accDataType = DataTypes.STRUCTURED(
          classOf[LatestNonNullAccumulator[T]],
          DataTypes.FIELD("value", callContext.getArgumentDataTypes.get(0)),
          DataTypes.FIELD("date", DataTypes.DATE()))
        Optional.of(accDataType)
      }
      .outputTypeStrategy { callContext =>
        val outputDataType = callContext.getArgumentDataTypes().get(0);
        Optional.of(outputDataType);
      }
      .build()
  }
}
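The accumulate and merge rules in the class above can be checked in isolation from Flink. The following is a minimal plain-Java sketch of the same rules, assuming nothing beyond the standard library; LatestNonNullSketch and Acc are hypothetical names, and this is not a Flink AggregateFunction.

```java
import java.time.LocalDate;
import java.util.Collections;

public class LatestNonNullSketch {

    // Hypothetical accumulator mirroring the value/date pair of the UDF.
    public static class Acc<T> {
        public T value;
        public LocalDate date;
    }

    // Keeps the non-null value with the latest date. If the accumulator has
    // no date yet, any non-null value is accepted, even with a null date.
    public static <T> void accumulate(Acc<T> acc, T value, LocalDate date) {
        if (value == null) {
            return;
        }
        if (acc.date == null || (date != null && date.isAfter(acc.date))) {
            acc.value = value;
            acc.date = date;
        }
    }

    // Merging applies the same rule to each partial accumulator.
    public static <T> void merge(Acc<T> acc, Iterable<Acc<T>> others) {
        for (Acc<T> other : others) {
            accumulate(acc, other.value, other.date);
        }
    }

    public static void main(String[] args) {
        Acc<Long> acc = new Acc<>();
        accumulate(acc, 1L, LocalDate.of(2021, 1, 1));
        accumulate(acc, null, LocalDate.of(2021, 1, 5)); // ignored: null value
        accumulate(acc, 3L, LocalDate.of(2020, 12, 31)); // ignored: older date
        accumulate(acc, 7L, LocalDate.of(2021, 1, 3));   // accepted: newer date

        Acc<Long> other = new Acc<>();
        accumulate(other, 9L, LocalDate.of(2021, 2, 1));
        merge(acc, Collections.singletonList(other));

        System.out.println(acc.value + " @ " + acc.date);
    }
}
```

One consequence of these rules is that while the accumulator's date is still null, every later non-null value overwrites the stored one regardless of its date.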
Regards,
Dylan Forciea
On 1/20/21, 2:37 AM, "Timo Walther" <twal...@apache.org> wrote:
Hi Dylan,
I'm assuming you are using Flink 1.12 and the Blink planner?
Starting with 1.12, you can use the "new" aggregate functions, which
have better type inference, so TypeInformation is not used in this
stack.
I tried to come up with an example that should explain the rough design.
I will add this example to the Flink code base.
I hope this helps:
import java.time.LocalDate;
import java.util.Optional;

import org.apache.flink.table.api.DataTypes;
import org.apache.flink.table.catalog.DataTypeFactory;
import org.apache.flink.table.functions.AggregateFunction;
import org.apache.flink.table.types.DataType;
import org.apache.flink.table.types.inference.InputTypeStrategies;
import org.apache.flink.table.types.inference.TypeInference;
import org.apache.flink.types.Row;

// Declared static because it is meant to be nested inside an outer class.
public static class LastIfNotNull<T>
        extends AggregateFunction<Row, LastIfNotNull.Accumulator<T>> {

    public static class Accumulator<T> {
        public T value;
        public LocalDate date;
    }

    // Keeps the most recently seen non-null input and its date.
    public void accumulate(Accumulator<T> acc, T input, LocalDate date) {
        if (input != null) {
            acc.value = input;
            acc.date = date;
        }
    }

    @Override
    public Row getValue(Accumulator<T> acc) {
        return Row.of(acc.value, acc.date);
    }

    @Override
    public Accumulator<T> createAccumulator() {
        return new Accumulator<>();
    }

    @Override
    public TypeInference getTypeInference(DataTypeFactory typeFactory) {
        return TypeInference.newBuilder()
                .inputTypeStrategy(
                        InputTypeStrategies.sequence(
                                InputTypeStrategies.ANY,
                                InputTypeStrategies.explicit(DataTypes.DATE())))
                .accumulatorTypeStrategy(
                        callContext -> {
                            // The "value" field takes the type of the first argument.
                            DataType accDataType =
                                    DataTypes.STRUCTURED(
                                            Accumulator.class,
                                            DataTypes.FIELD(
                                                    "value",
                                                    callContext.getArgumentDataTypes().get(0)),
                                            DataTypes.FIELD("date", DataTypes.DATE()));
                            return Optional.of(accDataType);
                        })
                .outputTypeStrategy(
                        callContext -> {
                            DataType argDataType = callContext.getArgumentDataTypes().get(0);
                            DataType outputDataType =
                                    DataTypes.ROW(
                                            DataTypes.FIELD("value", argDataType),
                                            DataTypes.FIELD("date", DataTypes.DATE()));
                            return Optional.of(outputDataType);
                        })
                .build();
    }
}
Regards,
Timo
On 20.01.21 01:04, Dylan Forciea wrote:
> I am attempting to create an aggregate UDF that takes a generic
> parameter T, but for the life of me, I can’t seem to get it to work.
>
> The UDF I’m trying to implement takes two input arguments, a value that
> is generic, and a date. It will choose the non-null value with the
> latest associated date. I had originally done this with separate Top 1
> queries connected with a left join, but the memory usage seems far
> higher than doing this with a custom aggregate function.
>
> As a first attempt, I tried to use custom type inference to have it
> validate that the first argument type is the output type and have a
> single function, and also used DataTypes.STRUCTURED to try to define the
> shape of my accumulator. However, that resulted in an exception like
> this whenever I tried to use a non-string value as the first argument:
>
> [error] Caused by: java.lang.ClassCastException: java.lang.Long cannot be cast to java.lang.String
> [error] at io$oseberg$flink$udf$LatestNonNullAccumulator$Converter.toInternal(Unknown Source)
> [error] at org.apache.flink.table.data.conversion.StructuredObjectConverter.toInternal(StructuredObjectConverter.java:92)
> [error] at org.apache.flink.table.data.conversion.StructuredObjectConverter.toInternal(StructuredObjectConverter.java:47)
> [error] at org.apache.flink.table.data.conversion.DataStructureConverter.toInternalOrNull(DataStructureConverter.java:59)
> [error] at GroupAggsHandler$777.getAccumulators(Unknown Source)
> [error] at org.apache.flink.table.runtime.operators.aggregate.GroupAggFunction.processElement(GroupAggFunction.java:175)
> [error] at org.apache.flink.table.runtime.operators.aggregate.GroupAggFunction.processElement(GroupAggFunction.java:45)
> [error] at org.apache.flink.streaming.api.operators.KeyedProcessOperator.processElement(KeyedProcessOperator.java:85)
> [error] at org.apache.flink.streaming.runtime.tasks.OneInputStreamTask$StreamTaskNetworkOutput.emitRecord(OneInputStreamTask.java:193)
> [error] at org.apache.flink.streaming.runtime.io.StreamTaskNetworkInput.processElement(StreamTaskNetworkInput.java:179)
> [error] at org.apache.flink.streaming.runtime.io.StreamTaskNetworkInput.emitNext(StreamTaskNetworkInput.java:152)
> [error] at org.apache.flink.streaming.runtime.io.StreamOneInputProcessor.processInput(StreamOneInputProcessor.java:67)
> [error] at org.apache.flink.streaming.runtime.tasks.StreamTask.processInput(StreamTask.java:372)
> [error] at org.apache.flink.streaming.runtime.tasks.mailbox.MailboxProcessor.runMailboxLoop(MailboxProcessor.java:186)
> [error] at org.apache.flink.streaming.runtime.tasks.StreamTask.runMailboxLoop(StreamTask.java:575)
> [error] at org.apache.flink.streaming.runtime.tasks.StreamTask.invoke(StreamTask.java:539)
> [error] at org.apache.flink.runtime.taskmanager.Task.doRun(Task.java:722)
> [error] at org.apache.flink.runtime.taskmanager.Task.run(Task.java:547)
> [error] at java.lang.Thread.run(Thread.java:748)
>
> Figuring that I can’t do something of that sort, I tried to follow the
> general approach in the Sum accumulator[1] in the Flink source code
> where separate classes are derived from a base class, and each
> advertises its accumulator shape, but ended up with the exact same stack
> trace as above when I tried to create and use a function specifically
> for a non-string type like Long.
>
> Is there something I’m missing as far as how this is supposed to be
> done? Everything I try either results in a stack trace like the above,
> or type erasure issues when trying to get type information for the
> accumulator. If I just copy the generic code multiple times and just
> directly use Long or String rather than using subclassing, then it works
> just fine. I appreciate any help I can get on this!
>
> Regards,
>
> Dylan Forciea
>
> [1] https://github.com/apache/flink/blob/release-1.12.0/flink-table/flink-table-planner/src/main/scala/org/apache/flink/table/functions/aggfunctions/SumAggFunction.scala
>