I am attempting to create an aggregate UDF that takes a generic parameter T, but for the life of me, I can’t seem to get it to work.
The UDF I’m trying to implement takes two input arguments: a value, which is generic, and a date. It picks the non-null value with the latest associated date. I had originally done this with separate Top 1 queries connected with a left join, but the memory usage seems far higher than doing it with a custom aggregate function.

As a first attempt, I used custom type inference to validate that the type of the first argument is also the output type, so that a single function could handle any value type, and I used DataTypes.STRUCTURED to define the shape of my accumulator. However, that resulted in an exception like this whenever I tried to use a non-string value as the first argument:

[error] Caused by: java.lang.ClassCastException: java.lang.Long cannot be cast to java.lang.String
[error] at io$oseberg$flink$udf$LatestNonNullAccumulator$Converter.toInternal(Unknown Source)
[error] at org.apache.flink.table.data.conversion.StructuredObjectConverter.toInternal(StructuredObjectConverter.java:92)
[error] at org.apache.flink.table.data.conversion.StructuredObjectConverter.toInternal(StructuredObjectConverter.java:47)
[error] at org.apache.flink.table.data.conversion.DataStructureConverter.toInternalOrNull(DataStructureConverter.java:59)
[error] at GroupAggsHandler$777.getAccumulators(Unknown Source)
[error] at org.apache.flink.table.runtime.operators.aggregate.GroupAggFunction.processElement(GroupAggFunction.java:175)
[error] at org.apache.flink.table.runtime.operators.aggregate.GroupAggFunction.processElement(GroupAggFunction.java:45)
[error] at org.apache.flink.streaming.api.operators.KeyedProcessOperator.processElement(KeyedProcessOperator.java:85)
[error] at org.apache.flink.streaming.runtime.tasks.OneInputStreamTask$StreamTaskNetworkOutput.emitRecord(OneInputStreamTask.java:193)
[error] at org.apache.flink.streaming.runtime.io.StreamTaskNetworkInput.processElement(StreamTaskNetworkInput.java:179)
[error] at org.apache.flink.streaming.runtime.io.StreamTaskNetworkInput.emitNext(StreamTaskNetworkInput.java:152)
[error] at org.apache.flink.streaming.runtime.io.StreamOneInputProcessor.processInput(StreamOneInputProcessor.java:67)
[error] at org.apache.flink.streaming.runtime.tasks.StreamTask.processInput(StreamTask.java:372)
[error] at org.apache.flink.streaming.runtime.tasks.mailbox.MailboxProcessor.runMailboxLoop(MailboxProcessor.java:186)
[error] at org.apache.flink.streaming.runtime.tasks.StreamTask.runMailboxLoop(StreamTask.java:575)
[error] at org.apache.flink.streaming.runtime.tasks.StreamTask.invoke(StreamTask.java:539)
[error] at org.apache.flink.runtime.taskmanager.Task.doRun(Task.java:722)
[error] at org.apache.flink.runtime.taskmanager.Task.run(Task.java:547)
[error] at java.lang.Thread.run(Thread.java:748)

Figuring that I can’t do something of that sort, I then tried to follow the general approach of the Sum accumulator [1] in the Flink source code, where separate concrete classes are derived from a generic base class and each advertises its own accumulator shape, but I ended up with the exact same stack trace as above when I created and used a function specifically for a non-string type like Long.

Is there something I’m missing as far as how this is supposed to be done? Everything I try results in either a stack trace like the one above or type-erasure issues when trying to get type information for the accumulator. If I just copy the generic code multiple times and use Long or String directly rather than subclassing, it works just fine.

I appreciate any help I can get on this!
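For reference, here is a trimmed-down sketch of roughly what the first attempt looks like. The class and field names are simplified, and the date argument is assumed to be a SQL DATE; treat it as an approximation of the approach rather than my exact code:

import java.time.LocalDate
import java.util.Optional

import org.apache.flink.table.api.DataTypes
import org.apache.flink.table.catalog.DataTypeFactory
import org.apache.flink.table.functions.AggregateFunction
import org.apache.flink.table.types.DataType
import org.apache.flink.table.types.inference.{CallContext, InputTypeStrategies, TypeInference, TypeStrategy}

// Accumulator: holds the latest-dated non-null value seen so far.
class LatestNonNullAccumulator[T] extends Serializable {
  var value: T = _
  var date: LocalDate = _
}

class LatestNonNull[T] extends AggregateFunction[T, LatestNonNullAccumulator[T]] {

  override def createAccumulator(): LatestNonNullAccumulator[T] =
    new LatestNonNullAccumulator[T]

  override def getValue(acc: LatestNonNullAccumulator[T]): T = acc.value

  // Keep the non-null value whose associated date is the latest seen so far.
  def accumulate(acc: LatestNonNullAccumulator[T], value: T, date: LocalDate): Unit = {
    if (value != null && date != null && (acc.date == null || date.isAfter(acc.date))) {
      acc.value = value
      acc.date = date
    }
  }

  // Derive both the accumulator shape and the output type from the first
  // argument's type at call time, so one function can serve every value type.
  override def getTypeInference(typeFactory: DataTypeFactory): TypeInference = {
    TypeInference.newBuilder()
      .inputTypeStrategy(
        InputTypeStrategies.sequence(
          InputTypeStrategies.ANY,
          InputTypeStrategies.explicit(DataTypes.DATE())))
      .accumulatorTypeStrategy(new TypeStrategy {
        override def inferType(callContext: CallContext): Optional[DataType] = {
          val valueType = callContext.getArgumentDataTypes.get(0)
          Optional.of(
            DataTypes.STRUCTURED(
              classOf[LatestNonNullAccumulator[_]],
              DataTypes.FIELD("value", valueType),
              DataTypes.FIELD("date", DataTypes.DATE())))
        }
      })
      .outputTypeStrategy(new TypeStrategy {
        override def inferType(callContext: CallContext): Optional[DataType] =
          Optional.of(callContext.getArgumentDataTypes.get(0))
      })
      .build()
  }
}

Calling this with a VARCHAR first argument works as expected, but as soon as the first argument is something like a BIGINT column, the ClassCastException above is thrown from the generated accumulator converter.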
Regards,
Dylan Forciea

[1] https://github.com/apache/flink/blob/release-1.12.0/flink-table/flink-table-planner/src/main/scala/org/apache/flink/table/functions/aggfunctions/SumAggFunction.scala