Hi Dylan,
I'm assuming you are using Flink 1.12 and the Blink planner?
Starting with 1.12, you can use the "new" aggregate functions, which have
better type inference, so TypeInformation is no longer used in this stack.
I tried to come up with an example that explains the rough design.
I will include this example in the Flink code base. I hope this helps:
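import org.apache.flink.table.api.DataTypes;
import org.apache.flink.table.catalog.DataTypeFactory;
import org.apache.flink.table.functions.AggregateFunction;
import org.apache.flink.table.types.DataType;
import org.apache.flink.table.types.inference.InputTypeStrategies;
import org.apache.flink.table.types.inference.TypeInference;
import org.apache.flink.types.Row;
import java.time.LocalDate;
import java.util.Optional;

// Declared "static", so this is meant to be nested inside an enclosing class.
public static class LastIfNotNull<T>
        extends AggregateFunction<Row, LastIfNotNull.Accumulator<T>> {

    // Generic accumulator; its concrete field types are declared in getTypeInference().
    public static class Accumulator<T> {
        public T value;
        public LocalDate date;
    }

    // Overwrites the stored value and date with every non-null input.
    public void accumulate(Accumulator<T> acc, T input, LocalDate date) {
        if (input != null) {
            acc.value = input;
            acc.date = date;
        }
    }

    @Override
    public Row getValue(Accumulator<T> acc) {
        return Row.of(acc.value, acc.date);
    }

    @Override
    public Accumulator<T> createAccumulator() {
        return new Accumulator<>();
    }

    // Derives the accumulator and output types from the actual type of the first argument.
    @Override
    public TypeInference getTypeInference(DataTypeFactory typeFactory) {
        return TypeInference.newBuilder()
                // Accept (any type, DATE) as arguments.
                .inputTypeStrategy(
                        InputTypeStrategies.sequence(
                                InputTypeStrategies.ANY,
                                InputTypeStrategies.explicit(DataTypes.DATE())))
                .accumulatorTypeStrategy(
                        callContext -> {
                            DataType accDataType =
                                    DataTypes.STRUCTURED(
                                            Accumulator.class,
                                            DataTypes.FIELD(
                                                    "value",
                                                    callContext.getArgumentDataTypes().get(0)),
                                            DataTypes.FIELD("date", DataTypes.DATE()));
                            return Optional.of(accDataType);
                        })
                .outputTypeStrategy(
                        callContext -> {
                            DataType argDataType =
                                    callContext.getArgumentDataTypes().get(0);
                            DataType outputDataType =
                                    DataTypes.ROW(
                                            DataTypes.FIELD("value", argDataType),
                                            DataTypes.FIELD("date", DataTypes.DATE()));
                            return Optional.of(outputDataType);
                        })
                .build();
    }
}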
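Note that accumulate() in the example above keeps the most recently received non-null value, whatever its date. If rows can arrive out of date order and you want the value with the latest date, a date comparison could be added. Here is a minimal plain-Java sketch of that logic without any Flink dependencies. The class name LatestNonNullSketch is hypothetical, and skipping rows with a null date is an assumption on my side:

```java
import java.time.LocalDate;

public class LatestNonNullSketch {

    /** Stand-in for the accumulator of the aggregate function (same two fields). */
    public static class Accumulator<T> {
        public T value;
        public LocalDate date;
    }

    /**
     * Keeps the non-null input with the latest date.
     * Assumption: rows with a null value or a null date are skipped.
     */
    public static <T> void accumulate(Accumulator<T> acc, T input, LocalDate date) {
        if (input == null || date == null) {
            return;
        }
        // Only replace the stored value if nothing is stored yet or the new date is later.
        if (acc.date == null || date.isAfter(acc.date)) {
            acc.value = input;
            acc.date = date;
        }
    }

    public static void main(String[] args) {
        Accumulator<Long> acc = new Accumulator<>();
        accumulate(acc, 1L, LocalDate.of(2021, 1, 20));
        accumulate(acc, 2L, LocalDate.of(2021, 1, 19));   // older date: ignored
        accumulate(acc, null, LocalDate.of(2021, 1, 21)); // null value: ignored
        System.out.println(acc.value + " @ " + acc.date); // prints "1 @ 2021-01-20"
    }
}
```

With equal dates this sketch keeps the first value seen; using !date.isBefore(acc.date) instead would keep the last one.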
Regards,
Timo
On 20.01.21 01:04, Dylan Forciea wrote:
I am attempting to create an aggregate UDF that takes a generic
parameter T, but for the life of me, I can’t seem to get it to work.
The UDF I’m trying to implement takes two input arguments, a value that
is generic, and a date. It will choose the non-null value with the
latest associated date. I had originally done this with separate Top 1
queries connected with a left join, but the memory usage seems far
higher than doing this with a custom aggregate function.
As a first attempt, I tried to use custom type inference to have it
validate that the first argument type is the output type and have a
single function, and also used DataTypes.STRUCTURED to try to define the
shape of my accumulator. However, that resulted in an exception like
this whenever I tried to use a non-string value as the first argument:
[error] Caused by: java.lang.ClassCastException: java.lang.Long cannot be cast to java.lang.String
[error] at io$oseberg$flink$udf$LatestNonNullAccumulator$Converter.toInternal(Unknown Source)
[error] at org.apache.flink.table.data.conversion.StructuredObjectConverter.toInternal(StructuredObjectConverter.java:92)
[error] at org.apache.flink.table.data.conversion.StructuredObjectConverter.toInternal(StructuredObjectConverter.java:47)
[error] at org.apache.flink.table.data.conversion.DataStructureConverter.toInternalOrNull(DataStructureConverter.java:59)
[error] at GroupAggsHandler$777.getAccumulators(Unknown Source)
[error] at org.apache.flink.table.runtime.operators.aggregate.GroupAggFunction.processElement(GroupAggFunction.java:175)
[error] at org.apache.flink.table.runtime.operators.aggregate.GroupAggFunction.processElement(GroupAggFunction.java:45)
[error] at org.apache.flink.streaming.api.operators.KeyedProcessOperator.processElement(KeyedProcessOperator.java:85)
[error] at org.apache.flink.streaming.runtime.tasks.OneInputStreamTask$StreamTaskNetworkOutput.emitRecord(OneInputStreamTask.java:193)
[error] at org.apache.flink.streaming.runtime.io.StreamTaskNetworkInput.processElement(StreamTaskNetworkInput.java:179)
[error] at org.apache.flink.streaming.runtime.io.StreamTaskNetworkInput.emitNext(StreamTaskNetworkInput.java:152)
[error] at org.apache.flink.streaming.runtime.io.StreamOneInputProcessor.processInput(StreamOneInputProcessor.java:67)
[error] at org.apache.flink.streaming.runtime.tasks.StreamTask.processInput(StreamTask.java:372)
[error] at org.apache.flink.streaming.runtime.tasks.mailbox.MailboxProcessor.runMailboxLoop(MailboxProcessor.java:186)
[error] at org.apache.flink.streaming.runtime.tasks.StreamTask.runMailboxLoop(StreamTask.java:575)
[error] at org.apache.flink.streaming.runtime.tasks.StreamTask.invoke(StreamTask.java:539)
[error] at org.apache.flink.runtime.taskmanager.Task.doRun(Task.java:722)
[error] at org.apache.flink.runtime.taskmanager.Task.run(Task.java:547)
[error] at java.lang.Thread.run(Thread.java:748)
Figuring that I can’t do something of that sort, I tried to follow the
general approach in the Sum accumulator[1] in the Flink source code
where separate classes are derived from a base class, and each
advertises its accumulator shape, but ended up with the exact same stack
trace as above when I tried to create and use a function specifically
for a non-string type like Long.
Is there something I’m missing as far as how this is supposed to be
done? Everything I try either results in a stack trace like the above,
or type erasure issues when trying to get type information for the
accumulator. If I just copy the generic code multiple times and just
directly use Long or String rather than using subclassing, then it works
just fine. I appreciate any help I can get on this!
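The type erasure problem described here can be reproduced in plain Java without Flink. The sketch below is only an analogy: the class ErasureSketch and its methods are hypothetical, and it is an assumption that the generated converter in the stack trace fails for the same underlying reason. It shows that a generic field is seen as Object at runtime, and that code which assumes T = String fails with a ClassCastException once the accumulator actually holds a Long:

```java
public class ErasureSketch {

    /** Hypothetical generic accumulator, reduced to its generic field. */
    public static class Accumulator<T> {
        public T value;
    }

    /** Returns the type of the "value" field as reflection sees it after erasure. */
    public static Class<?> erasedFieldType() {
        try {
            return Accumulator.class.getField("value").getType();
        } catch (NoSuchFieldException e) {
            throw new IllegalStateException(e);
        }
    }

    /** Reads the value the way code that assumed T = String would. */
    @SuppressWarnings("unchecked")
    public static String readAsString(Accumulator<?> acc) {
        // The unchecked cast compiles; the actual check only happens
        // when the field value is used as a String.
        return ((Accumulator<String>) acc).value;
    }

    /** Returns true if reading a Long-valued accumulator as String throws ClassCastException. */
    public static boolean longReadAsStringFails() {
        Accumulator<Long> acc = new Accumulator<>();
        acc.value = 42L;
        try {
            readAsString(acc);
            return false;
        } catch (ClassCastException e) {
            return true;
        }
    }

    public static void main(String[] args) {
        System.out.println(erasedFieldType());
        System.out.println(longReadAsStringFails());
    }
}
```

Because the class itself carries no information about T, the field types have to be supplied from outside, which is why copying the code with Long or String written out directly works.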
Regards,
Dylan Forciea
[1] https://github.com/apache/flink/blob/release-1.12.0/flink-table/flink-table-planner/src/main/scala/org/apache/flink/table/functions/aggfunctions/SumAggFunction.scala