Hi Dylan,

I'm assuming you are using Flink 1.12 and the Blink planner?

Starting with 1.12, you can use the "new" aggregate functions with improved type inference, so TypeInformation is no longer used in this stack.

I tried to come up with an example that should explain the rough design. I will include this example in the Flink code base. I hope this helps:



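import java.time.LocalDate;
import java.util.Optional;

import org.apache.flink.table.api.DataTypes;
import org.apache.flink.table.catalog.DataTypeFactory;
import org.apache.flink.table.functions.AggregateFunction;
import org.apache.flink.table.types.DataType;
import org.apache.flink.table.types.inference.InputTypeStrategies;
import org.apache.flink.table.types.inference.TypeInference;
import org.apache.flink.types.Row;

// Returns the last non-null value together with its date as ROW<value, date>.
// The value type is not fixed in Java; it is derived from the first argument at planning time.
public class LastIfNotNull<T>
        extends AggregateFunction<Row, LastIfNotNull.Accumulator<T>> {

    // Accumulator; its field types are declared explicitly in getTypeInference().
    public static class Accumulator<T> {
        public T value;
        public LocalDate date;
    }

    // Called once per input row; null values are skipped.
    public void accumulate(Accumulator<T> acc, T input, LocalDate date) {
        if (input != null) {
            acc.value = input;
            acc.date = date;
        }
    }

    @Override
    public Row getValue(Accumulator<T> acc) {
        return Row.of(acc.value, acc.date);
    }

    @Override
    public Accumulator<T> createAccumulator() {
        return new Accumulator<>();
    }

    @Override
    public TypeInference getTypeInference(DataTypeFactory typeFactory) {
        return TypeInference.newBuilder()
                // Arguments: (any type, DATE).
                .inputTypeStrategy(
                        InputTypeStrategies.sequence(
                                InputTypeStrategies.ANY,
                                InputTypeStrategies.explicit(DataTypes.DATE())))
                // Accumulator: STRUCTURED type whose "value" field takes the type of argument 0.
                .accumulatorTypeStrategy(
                        callContext -> {
                            DataType accDataType =
                                    DataTypes.STRUCTURED(
                                            Accumulator.class,
                                            DataTypes.FIELD(
                                                    "value",
                                                    callContext.getArgumentDataTypes().get(0)),
                                            DataTypes.FIELD("date", DataTypes.DATE()));
                            return Optional.of(accDataType);
                        })
                // Output: ROW<value, date> with the same value type as argument 0.
                .outputTypeStrategy(
                        callContext -> {
                            DataType argDataType = callContext.getArgumentDataTypes().get(0);
                            DataType outputDataType =
                                    DataTypes.ROW(
                                            DataTypes.FIELD("value", argDataType),
                                            DataTypes.FIELD("date", DataTypes.DATE()));
                            return Optional.of(outputDataType);
                        })
                .build();
    }
}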
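To make the semantics of the accumulate method concrete, here is a simplified plain-Java simulation of one group, with no Flink dependency. Input and run are hypothetical stand-ins for the runtime, not Flink classes. Note that this logic keeps the last non-null value in arrival order: the date is carried along but never compared.

```java
import java.time.LocalDate;
import java.util.Arrays;
import java.util.List;

// Simplified simulation (no Flink dependency) of one group: a fresh accumulator,
// accumulate(...) per input row; the final accumulator is what getValue(...) turns into ROW(value, date).
class LastIfNotNullSimulation {

    // Hypothetical input row: (value, date).
    static final class Input<T> {
        final T value;
        final LocalDate date;

        Input(T value, LocalDate date) {
            this.value = value;
            this.date = date;
        }
    }

    // Same shape as LastIfNotNull.Accumulator.
    static class Accumulator<T> {
        T value;
        LocalDate date;
    }

    // Same logic as LastIfNotNull.accumulate: skip null values, otherwise overwrite.
    static <T> void accumulate(Accumulator<T> acc, T input, LocalDate date) {
        if (input != null) {
            acc.value = input;
            acc.date = date;
        }
    }

    // Hypothetical driver standing in for the runtime; returns the final accumulator.
    static <T> Accumulator<T> run(List<Input<T>> rows) {
        Accumulator<T> acc = new Accumulator<>();
        for (Input<T> row : rows) {
            accumulate(acc, row.value, row.date);
        }
        return acc;
    }

    public static void main(String[] args) {
        List<Input<Long>> rows = Arrays.asList(
                new Input<>(2L, LocalDate.of(2021, 1, 2)),
                new Input<>(1L, LocalDate.of(2021, 1, 1)),        // older date, but arrives later
                new Input<Long>(null, LocalDate.of(2021, 1, 3))); // null value: skipped
        Accumulator<Long> acc = run(rows);
        System.out.println(acc.value + " " + acc.date); // prints "1 2021-01-01"
    }
}
```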

Regards,
Timo



On 20.01.21 01:04, Dylan Forciea wrote:
I am attempting to create an aggregate UDF that takes a generic parameter T, but for the life of me, I can’t seem to get it to work.

The UDF I’m trying to implement takes two input arguments: a generic value and a date. It should choose the non-null value with the latest associated date. I had originally done this with separate Top 1 queries combined with a left join, but the memory usage seemed far higher than with a custom aggregate function.
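A minimal plain-Java sketch of the accumulate logic described here, assuming that rows with a null value or a null date are ignored and that ties on the date keep the value seen first. The class and method names are hypothetical; they only mirror the shape of a Flink accumulator and do not depend on Flink. Unlike a plain "last if not null", the dates are compared, so an older row arriving late does not overwrite a newer value.

```java
import java.time.LocalDate;

// Plain-Java sketch (no Flink dependency): keep the non-null value with the latest date,
// independent of the order in which rows arrive.
class LatestNonNullSketch {

    static class Accumulator<T> {
        T value;
        LocalDate date;
    }

    static <T> void accumulate(Accumulator<T> acc, T input, LocalDate date) {
        // Assumption: rows with a null value or a null date are ignored.
        if (input == null || date == null) {
            return;
        }
        // Replace only if nothing is stored yet or this row is strictly newer (ties keep the first).
        if (acc.date == null || date.isAfter(acc.date)) {
            acc.value = input;
            acc.date = date;
        }
    }

    public static void main(String[] args) {
        Accumulator<String> acc = new Accumulator<>();
        accumulate(acc, "b", LocalDate.of(2021, 1, 2));
        accumulate(acc, "a", LocalDate.of(2021, 1, 1));  // older row arriving late: ignored
        accumulate(acc, null, LocalDate.of(2021, 1, 3)); // null value: ignored
        System.out.println(acc.value + " " + acc.date);  // prints "b 2021-01-02"
    }
}
```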

As a first attempt, I wrote a single function with custom type inference that validates that the first argument’s type is the output type, and I used DataTypes.STRUCTURED to define the shape of my accumulator. However, that resulted in an exception like this whenever I used a non-string value as the first argument:

[error] Caused by: java.lang.ClassCastException: java.lang.Long cannot be cast to java.lang.String
[error]   at io$oseberg$flink$udf$LatestNonNullAccumulator$Converter.toInternal(Unknown Source)
[error]   at org.apache.flink.table.data.conversion.StructuredObjectConverter.toInternal(StructuredObjectConverter.java:92)
[error]   at org.apache.flink.table.data.conversion.StructuredObjectConverter.toInternal(StructuredObjectConverter.java:47)
[error]   at org.apache.flink.table.data.conversion.DataStructureConverter.toInternalOrNull(DataStructureConverter.java:59)
[error]   at GroupAggsHandler$777.getAccumulators(Unknown Source)
[error]   at org.apache.flink.table.runtime.operators.aggregate.GroupAggFunction.processElement(GroupAggFunction.java:175)
[error]   at org.apache.flink.table.runtime.operators.aggregate.GroupAggFunction.processElement(GroupAggFunction.java:45)
[error]   at org.apache.flink.streaming.api.operators.KeyedProcessOperator.processElement(KeyedProcessOperator.java:85)
[error]   at org.apache.flink.streaming.runtime.tasks.OneInputStreamTask$StreamTaskNetworkOutput.emitRecord(OneInputStreamTask.java:193)
[error]   at org.apache.flink.streaming.runtime.io.StreamTaskNetworkInput.processElement(StreamTaskNetworkInput.java:179)
[error]   at org.apache.flink.streaming.runtime.io.StreamTaskNetworkInput.emitNext(StreamTaskNetworkInput.java:152)
[error]   at org.apache.flink.streaming.runtime.io.StreamOneInputProcessor.processInput(StreamOneInputProcessor.java:67)
[error]   at org.apache.flink.streaming.runtime.tasks.StreamTask.processInput(StreamTask.java:372)
[error]   at org.apache.flink.streaming.runtime.tasks.mailbox.MailboxProcessor.runMailboxLoop(MailboxProcessor.java:186)
[error]   at org.apache.flink.streaming.runtime.tasks.StreamTask.runMailboxLoop(StreamTask.java:575)
[error]   at org.apache.flink.streaming.runtime.tasks.StreamTask.invoke(StreamTask.java:539)
[error]   at org.apache.flink.runtime.taskmanager.Task.doRun(Task.java:722)
[error]   at org.apache.flink.runtime.taskmanager.Task.run(Task.java:547)
[error]   at java.lang.Thread.run(Thread.java:748)
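One plausible reading of the first two lines of the trace is that the converter generated for the accumulator expects its value field to be a String, while the accumulator instance actually holds a Long. The following plain-Java sketch reproduces only that mechanism; Acc and toInternalAsString are hypothetical stand-ins, not Flink classes, and the sketch does not show why the converter was generated for STRING in the first place.

```java
// Plain-Java sketch (no Flink dependency) of a type mismatch between a converter and
// the runtime contents of a generic field.
class CastMismatchSketch {

    // Generic holder: at runtime the field is just an Object.
    static class Acc<T> {
        T value;
    }

    // Hypothetical converter hard-coded for a STRING "value" field.
    static String toInternalAsString(Acc<?> acc) {
        Object raw = acc.value;
        return (String) raw; // throws ClassCastException if raw is, e.g., a Long
    }

    // Returns true if converting an accumulator holding storedValue fails with a ClassCastException.
    static boolean conversionFails(Object storedValue) {
        Acc<Object> acc = new Acc<>();
        acc.value = storedValue;
        try {
            toInternalAsString(acc);
            return false;
        } catch (ClassCastException e) {
            return true;
        }
    }

    public static void main(String[] args) {
        System.out.println(conversionFails("abc")); // prints "false": a String converts fine
        System.out.println(conversionFails(42L));   // prints "true": Long cannot be cast to String
    }
}
```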

Figuring that I can’t do something of that sort, I followed the general approach of the Sum aggregate function [1] in the Flink source code, where separate classes are derived from a base class and each advertises its own accumulator shape. However, I ended up with exactly the same stack trace when I created and used a function specifically for a non-string type like Long.

Is there something I’m missing about how this is supposed to be done? Everything I try results either in a stack trace like the one above or in type erasure issues when trying to get type information for the accumulator. If I copy the generic code multiple times and use Long or String directly instead of subclassing, it works just fine. I appreciate any help I can get on this!
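The type erasure issue can be seen with plain Java reflection. This is not Flink’s extraction logic; it only illustrates why the concrete type of the accumulator’s value field cannot be recovered from Accumulator&lt;T&gt; alone, which is the gap the explicit accumulatorTypeStrategy in Timo’s example fills per call. It also shows that a concrete subclass does record its type argument; whether Flink’s own extractor uses that information is not shown here. All class names are hypothetical.

```java
import java.lang.reflect.Field;
import java.lang.reflect.ParameterizedType;
import java.lang.reflect.Type;
import java.lang.reflect.TypeVariable;

// Plain-Java reflection demo (no Flink dependency) of what survives type erasure.
class ErasureDemo {

    // Generic accumulator, as in the question.
    static class Accumulator<T> {
        public T value;
    }

    // Concrete subclass, similar in spirit to per-type subclasses of a generic base class.
    static class LongAccumulator extends Accumulator<Long> {}

    static Field valueField() {
        try {
            return Accumulator.class.getField("value");
        } catch (NoSuchFieldException e) {
            throw new IllegalStateException(e);
        }
    }

    // Erased runtime class of the field: always Object, whatever T is.
    static Class<?> erasedFieldType() {
        return valueField().getType();
    }

    // Generic declaration of the field: only the type variable T, no concrete type.
    static String genericFieldType() {
        Type t = valueField().getGenericType();
        return t instanceof TypeVariable ? ((TypeVariable<?>) t).getName() : t.getTypeName();
    }

    // The type argument of a concrete subclass is recorded in its generic superclass.
    static Type subclassTypeArgument() {
        ParameterizedType p = (ParameterizedType) LongAccumulator.class.getGenericSuperclass();
        return p.getActualTypeArguments()[0];
    }

    public static void main(String[] args) {
        System.out.println(erasedFieldType().getName());          // prints "java.lang.Object"
        System.out.println(genericFieldType());                   // prints "T"
        System.out.println(subclassTypeArgument().getTypeName()); // prints "java.lang.Long"
    }
}
```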

Regards,

Dylan Forciea

[1] https://github.com/apache/flink/blob/release-1.12.0/flink-table/flink-table-planner/src/main/scala/org/apache/flink/table/functions/aggfunctions/SumAggFunction.scala

