Hi Dylan,

I can help with a review of your PR tomorrow. In general, I would recommend pinging the people who have worked on the component before (see git blame) a couple of times to get a review. We are all busy and need a bit of a push from time to time ;-)

Thanks,
Timo

On 21.01.21 16:09, Dylan Forciea wrote:
Timo,

Will do! I have been patching in a change locally that I have a PR [1] out for, 
so if this will end up in the next 1.12 patch release, I may add this in with 
it once it has been approved and merged.

On a side note, that PR has been out since the end of October (it looks like I 
need to rebase to accommodate the code reformatting change that happened 
since). Is there a process for getting somebody to review it? I'm not sure 
whether, with the New Year and the 1.12 release and follow-up, it just got 
lost in the commotion.

Regards,
Dylan Forciea

[1] https://github.com/apache/flink/pull/13787

On 1/21/21, 8:50 AM, "Timo Walther" <twal...@apache.org> wrote:

     I opened a PR. Feel free to try it out.

     https://github.com/apache/flink/pull/14720

     Btw:

      >> env.createTemporarySystemFunction("LatestNonNullLong", classOf[LatestNonNull[Long]])
      >> env.createTemporarySystemFunction("LatestNonNullString", classOf[LatestNonNull[String]])

     make no difference: the generics are type-erased in the bytecode, so
     only the class name matters.
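
     To illustrate (a minimal, self-contained sketch; this stripped-down
     LatestNonNull stands in for your UDF class):

     // Generic parameters exist only at compile time; at runtime both
     // classOf expressions resolve to the very same Class object, so
     // Flink sees one function class regardless of the type parameter.
     class LatestNonNull[T]

     object ErasureDemo extends App {
       val longClass   = classOf[LatestNonNull[Long]]
       val stringClass = classOf[LatestNonNull[String]]
       println(longClass == stringClass) // prints: true
     }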

     Thanks,
     Timo

     On 21.01.21 11:36, Timo Walther wrote:
     > Hi Dylan,
     >
     > thanks for the investigation. I can now also reproduce it in my code. Yes,
     > this is a bug. I opened
     >
     > https://issues.apache.org/jira/browse/FLINK-21070
     >
     > and will try to fix this asap.
     >
     > Thanks,
     > Timo
     >
     > On 20.01.21 17:52, Dylan Forciea wrote:
     >> Timo,
     >>
     >> I converted what I had to Java and ended up with the exact same issue
     >> as before: it works if I only ever use it on one type, but not if I
     >> use it on multiple. Maybe this is a bug?
     >>
     >> Dylan
     >>
     >> On 1/20/21, 10:06 AM, "Dylan Forciea" <dy...@oseberg.io> wrote:
     >>
     >>      Oh, I think I might have a clue as to what is going on. I notice
     >>      that it works properly when I only ever call it on Long. I think
     >>      it is using the same generated Converter code for whatever type
     >>      was called first.
     >>
     >>      Since in Scala I can't declare an object as static within the
     >>      class itself, I wonder if it won't generate appropriate Converter
     >>      code per subtype. I tried creating a subclass that is specific to
     >>      the type within my class and returning that as the accumulator,
     >>      but that didn't help. And I can't refer to that class in the
     >>      TypeInference since it isn't static, and I get an error from
     >>      Flink because of that. I'm going to see if writing this UDF in
     >>      Java with an embedded public static class, like you have, will
     >>      solve my problems. I'll report back to let you know what I find.
     >>      If that works, I'm not quite sure how to make it work in Scala.
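     >>
     >>      (For what it's worth, the closest Scala analogue to a Java static
     >>      nested class is a class inside the companion object. This is a
     >>      hypothetical, untested sketch; I don't know yet whether Flink's
     >>      reflection accepts it:)
     >>
     >>      import java.time.LocalDate
     >>
     >>      class LatestNonNull[T] { /* UDF body as before */ }
     >>
     >>      object LatestNonNull {
     >>        // Compiles to its own class file (LatestNonNull$Accumulator)
     >>        // and needs no enclosing instance, much like a Java static
     >>        // nested class.
     >>        class Accumulator[T] {
     >>          var value: T = _
     >>          var date: LocalDate = _
     >>        }
     >>      }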
     >>
     >>      Regards,
     >>      Dylan Forciea
     >>
     >>      On 1/20/21, 9:34 AM, "Dylan Forciea" <dy...@oseberg.io> wrote:
     >>
     >>          As a side note, I also just tried unifying into a single
     >>          function registration, using _ as the type parameter in the
     >>          classOf calls there and within the TypeInference definition
     >>          for the accumulator, and still ended up with the exact same
     >>          stack trace.
     >>
     >>          Dylan
     >>
     >>          On 1/20/21, 9:22 AM, "Dylan Forciea" <dy...@oseberg.io> wrote:
     >>
     >>              Timo,
     >>
     >>              I appreciate it! I am using Flink 1.12.0 right now with
     >>              the Blink planner. What you proposed is roughly what I
     >>              had come up with the first time around, which resulted in
     >>              the stack trace with the ClassCastException I had
     >>              originally included. I saw that you used a Row instead of
     >>              just the value in your example, but changing it that way
     >>              didn't seem to help, which makes sense since the problem
     >>              appears to be in the code generated for the accumulator
     >>              Converter and not the output.
     >>
     >>              Here is the exact code that caused that error (while
     >>              calling LatestNonNullLong):
     >>
     >>              The registration of the class below:
     >>
     >>              env.createTemporarySystemFunction("LatestNonNullLong", classOf[LatestNonNull[Long]])
     >>              env.createTemporarySystemFunction("LatestNonNullString", classOf[LatestNonNull[String]])
     >>
     >>
     >>              The class itself:
     >>
     >>              import java.time.LocalDate
     >>              import java.util.Optional
     >>              import org.apache.flink.table.api.DataTypes
     >>              import org.apache.flink.table.catalog.DataTypeFactory
     >>              import org.apache.flink.table.functions.AggregateFunction
     >>              import org.apache.flink.table.types.inference.{InputTypeStrategies, TypeInference}
     >>
     >>              case class LatestNonNullAccumulator[T](
     >>                  var value: T = null.asInstanceOf[T],
     >>                  var date: LocalDate = null)
     >>
     >>              class LatestNonNull[T] extends AggregateFunction[T, LatestNonNullAccumulator[T]] {
     >>
     >>                override def createAccumulator(): LatestNonNullAccumulator[T] = {
     >>                  LatestNonNullAccumulator[T]()
     >>                }
     >>
     >>                override def getValue(acc: LatestNonNullAccumulator[T]): T = {
     >>                  acc.value
     >>                }
     >>
     >>                def accumulate(acc: LatestNonNullAccumulator[T], value: T, date: LocalDate): Unit = {
     >>                  if (value != null) {
     >>                    Option(acc.date).fold {
     >>                      acc.value = value
     >>                      acc.date = date
     >>                    } { accDate =>
     >>                      if (date != null && date.isAfter(accDate)) {
     >>                        acc.value = value
     >>                        acc.date = date
     >>                      }
     >>                    }
     >>                  }
     >>                }
     >>
     >>                def merge(
     >>                    acc: LatestNonNullAccumulator[T],
     >>                    it: java.lang.Iterable[LatestNonNullAccumulator[T]]): Unit = {
     >>                  val iter = it.iterator()
     >>                  while (iter.hasNext) {
     >>                    val a = iter.next()
     >>                    if (a.value != null) {
     >>                      Option(acc.date).fold {
     >>                        acc.value = a.value
     >>                        acc.date = a.date
     >>                      } { accDate =>
     >>                        Option(a.date).map { curDate =>
     >>                          if (curDate.isAfter(accDate)) {
     >>                            acc.value = a.value
     >>                            acc.date = a.date
     >>                          }
     >>                        }
     >>                      }
     >>                    }
     >>                  }
     >>                }
     >>
     >>                def resetAccumulator(acc: LatestNonNullAccumulator[T]): Unit = {
     >>                  acc.value = null.asInstanceOf[T]
     >>                  acc.date = null
     >>                }
     >>
     >>                override def getTypeInference(typeFactory: DataTypeFactory): TypeInference = {
     >>                  TypeInference
     >>                    .newBuilder()
     >>                    .inputTypeStrategy(InputTypeStrategies
     >>                      .sequence(InputTypeStrategies.ANY, InputTypeStrategies.explicit(DataTypes.DATE())))
     >>                    .accumulatorTypeStrategy { callContext =>
     >>                      val accDataType = DataTypes.STRUCTURED(
     >>                        classOf[LatestNonNullAccumulator[T]],
     >>                        DataTypes.FIELD("value", callContext.getArgumentDataTypes.get(0)),
     >>                        DataTypes.FIELD("date", DataTypes.DATE()))
     >>
     >>                      Optional.of(accDataType)
     >>                    }
     >>                    .outputTypeStrategy { callContext =>
     >>                      val outputDataType = callContext.getArgumentDataTypes().get(0)
     >>                      Optional.of(outputDataType)
     >>                    }
     >>                    .build()
     >>                }
     >>              }
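     >>
     >>              And, for completeness, a sketch of how it is invoked
     >>              (table and column names here are placeholders, not from
     >>              my real job):
     >>
     >>              // Calls the registered aggregate from SQL; source_table
     >>              // is assumed to have columns (id, amount BIGINT, updated DATE).
     >>              val result = env.sqlQuery(
     >>                """
     >>                SELECT id, LatestNonNullLong(amount, updated)
     >>                FROM source_table
     >>                GROUP BY id
     >>                """)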
     >>
     >>              Regards,
     >>              Dylan Forciea
     >>
     >>              On 1/20/21, 2:37 AM, "Timo Walther" <twal...@apache.org> wrote:
     >>
     >>                  Hi Dylan,
     >>
     >>                  I'm assuming you are using Flink 1.12 and the Blink
     >>                  planner?
     >>
     >>                  Starting with 1.12 you can use the "new" aggregate
     >>                  functions with better type inference, so
     >>                  TypeInformation is not used in this stack.
     >>
     >>                  I tried to come up with an example that explains the
     >>                  rough design. I will include this example in the
     >>                  Flink code base. I hope this helps:
     >>
     >>
     >>
     >>                  import org.apache.flink.table.types.inference.InputTypeStrategies;
     >>
     >>                  public static class LastIfNotNull<T>
     >>                          extends AggregateFunction<Row, LastIfNotNull.Accumulator<T>> {
     >>
     >>                      public static class Accumulator<T> {
     >>                          public T value;
     >>                          public LocalDate date;
     >>                      }
     >>
     >>                      public void accumulate(Accumulator<T> acc, T input, LocalDate date) {
     >>                          if (input != null) {
     >>                              acc.value = input;
     >>                              acc.date = date;
     >>                          }
     >>                      }
     >>
     >>                      @Override
     >>                      public Row getValue(Accumulator<T> acc) {
     >>                          return Row.of(acc.value, acc.date);
     >>                      }
     >>
     >>                      @Override
     >>                      public Accumulator<T> createAccumulator() {
     >>                          return new Accumulator<>();
     >>                      }
     >>
     >>                      @Override
     >>                      public TypeInference getTypeInference(DataTypeFactory typeFactory) {
     >>                          return TypeInference.newBuilder()
     >>                                  .inputTypeStrategy(
     >>                                          InputTypeStrategies.sequence(
     >>                                                  InputTypeStrategies.ANY,
     >>                                                  InputTypeStrategies.explicit(DataTypes.DATE())))
     >>                                  .accumulatorTypeStrategy(
     >>                                          callContext -> {
     >>                                              DataType accDataType =
     >>                                                      DataTypes.STRUCTURED(
     >>                                                              Accumulator.class,
     >>                                                              DataTypes.FIELD(
     >>                                                                      "value",
     >>                                                                      callContext.getArgumentDataTypes().get(0)),
     >>                                                              DataTypes.FIELD("date", DataTypes.DATE()));
     >>                                              return Optional.of(accDataType);
     >>                                          })
     >>                                  .outputTypeStrategy(
     >>                                          callContext -> {
     >>                                              DataType argDataType = callContext.getArgumentDataTypes().get(0);
     >>                                              DataType outputDataType =
     >>                                                      DataTypes.ROW(
     >>                                                              DataTypes.FIELD("value", argDataType),
     >>                                                              DataTypes.FIELD("date", DataTypes.DATE()));
     >>                                              return Optional.of(outputDataType);
     >>                                          })
     >>                                  .build();
     >>                      }
     >>                  }
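     >>
     >>                  Registration then works the same way as in your
     >>                  Scala snippet, e.g. (a sketch; given the erasure
     >>                  point above, a single registration suffices):
     >>
     >>                  env.createTemporarySystemFunction("LastIfNotNull", classOf[LastIfNotNull[_]])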
     >>
     >>                  Regards,
     >>                  Timo
     >>
     >>
     >>
     >>                  On 20.01.21 01:04, Dylan Forciea wrote:
     >>                  > I am attempting to create an aggregate UDF that takes a generic
     >>                  > parameter T, but for the life of me, I can’t seem to get it to work.
     >>                  >
     >>                  > The UDF I’m trying to implement takes two input arguments, a value
     >>                  > that is generic, and a date. It will choose the non-null value with
     >>                  > the latest associated date. I had originally done this with separate
     >>                  > Top 1 queries connected with a left join, but the memory usage seems
     >>                  > far higher than doing this with a custom aggregate function.
     >>                  >
     >>                  > As a first attempt, I tried to use custom type inference to have it
     >>                  > validate that the first argument type is the output type and have a
     >>                  > single function, and also used DataTypes.STRUCTURED to try to define
     >>                  > the shape of my accumulator. However, that resulted in an exception
     >>                  > like this whenever I tried to use a non-string value as the first
     >>                  > argument:
     >>                  >
     >>                  > [error] Caused by: java.lang.ClassCastException: java.lang.Long cannot be cast to java.lang.String
     >>                  > [error]   at io$oseberg$flink$udf$LatestNonNullAccumulator$Converter.toInternal(Unknown Source)
     >>                  > [error]   at org.apache.flink.table.data.conversion.StructuredObjectConverter.toInternal(StructuredObjectConverter.java:92)
     >>                  > [error]   at org.apache.flink.table.data.conversion.StructuredObjectConverter.toInternal(StructuredObjectConverter.java:47)
     >>                  > [error]   at org.apache.flink.table.data.conversion.DataStructureConverter.toInternalOrNull(DataStructureConverter.java:59)
     >>                  > [error]   at GroupAggsHandler$777.getAccumulators(Unknown Source)
     >>                  > [error]   at org.apache.flink.table.runtime.operators.aggregate.GroupAggFunction.processElement(GroupAggFunction.java:175)
     >>                  > [error]   at org.apache.flink.table.runtime.operators.aggregate.GroupAggFunction.processElement(GroupAggFunction.java:45)
     >>                  > [error]   at org.apache.flink.streaming.api.operators.KeyedProcessOperator.processElement(KeyedProcessOperator.java:85)
     >>                  > [error]   at org.apache.flink.streaming.runtime.tasks.OneInputStreamTask$StreamTaskNetworkOutput.emitRecord(OneInputStreamTask.java:193)
     >>                  > [error]   at org.apache.flink.streaming.runtime.io.StreamTaskNetworkInput.processElement(StreamTaskNetworkInput.java:179)
     >>                  > [error]   at org.apache.flink.streaming.runtime.io.StreamTaskNetworkInput.emitNext(StreamTaskNetworkInput.java:152)
     >>                  > [error]   at org.apache.flink.streaming.runtime.io.StreamOneInputProcessor.processInput(StreamOneInputProcessor.java:67)
     >>                  > [error]   at org.apache.flink.streaming.runtime.tasks.StreamTask.processInput(StreamTask.java:372)
     >>                  > [error]   at org.apache.flink.streaming.runtime.tasks.mailbox.MailboxProcessor.runMailboxLoop(MailboxProcessor.java:186)
     >>                  > [error]   at org.apache.flink.streaming.runtime.tasks.StreamTask.runMailboxLoop(StreamTask.java:575)
     >>                  > [error]   at org.apache.flink.streaming.runtime.tasks.StreamTask.invoke(StreamTask.java:539)
     >>                  > [error]   at org.apache.flink.runtime.taskmanager.Task.doRun(Task.java:722)
     >>                  > [error]   at org.apache.flink.runtime.taskmanager.Task.run(Task.java:547)
     >>                  > [error]   at java.lang.Thread.run(Thread.java:748)
     >>                  >
     >>                  > Figuring that I can’t do something of that sort, I tried to follow
     >>                  > the general approach of the Sum accumulator [1] in the Flink source
     >>                  > code, where separate classes are derived from a base class and each
     >>                  > advertises its accumulator shape, but I ended up with the exact same
     >>                  > stack trace as above when I tried to create and use a function
     >>                  > specifically for a non-string type like Long.
     >>                  >
     >>                  > Is there something I’m missing as far as how this is supposed to be
     >>                  > done? Everything I try either results in a stack trace like the
     >>                  > above, or in type erasure issues when trying to get type information
     >>                  > for the accumulator. If I just copy the generic code multiple times
     >>                  > and directly use Long or String rather than subclassing, then it
     >>                  > works just fine. I appreciate any help I can get on this!
     >>                  >
     >>                  > Regards,
     >>                  >
     >>                  > Dylan Forciea
     >>                  >
     >>                  > [1]
     >>                  > https://github.com/apache/flink/blob/release-1.12.0/flink-table/flink-table-planner/src/main/scala/org/apache/flink/table/functions/aggfunctions/SumAggFunction.scala


