Re: Trying to create a generic aggregate UDF

2021-01-21 Thread Dylan Forciea
I wanted to report that I tried out your PR, and it does solve my issue. I am able to create a generic LatestNonNull and it appears to do what is expected. Thanks, Dylan Forciea On 1/21/21, 8:50 AM, "Timo Walther" wrote: I opened a PR. Feel free to try it out. https://github.com/apac

Re: Trying to create a generic aggregate UDF

2021-01-21 Thread Timo Walther
Hi Dylan, I can help with a review for your PR tomorrow. In general, I would recommend pinging people who have worked on the component before (see git blame) a couple of times to get a review. We are all busy and need a bit of pushing from time to time ;-) Thanks, Timo On 21.01.2

Re: Trying to create a generic aggregate UDF

2021-01-21 Thread Dylan Forciea
Timo, Will do! I have been patching in a change locally that I have a PR [1] out for, so if this will end up in the next 1.12 patch release, I may add this in with it once it has been approved and merged. On a side note, that PR has been out since the end of October (looks like I need to do a

Re: Trying to create a generic aggregate UDF

2021-01-21 Thread Timo Walther
I opened a PR. Feel free to try it out. https://github.com/apache/flink/pull/14720 Btw: >> env.createTemporarySystemFunction("LatestNonNullLong", >> classOf[LatestNonNull[Long]]) >> >> env.createTemporarySystemFunction("LatestNonNullString", >> classOf[LatestNonNull[String]]) don't make a diff
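
As background for the two registrations quoted above: on the JVM, generic type arguments are erased at runtime, so class objects that differ only in their type argument are the same object. The sketch below shows this with `ArrayList` in plain Java. It is only an illustration of type erasure; `ErasureSketch` and `sameRuntimeClass` are hypothetical names, and nothing here reflects Flink's internals or the content of the PR.

```java
import java.util.ArrayList;
import java.util.List;

/** Hypothetical demo class: shows that type arguments do not survive to runtime. */
final class ErasureSketch {

    /** Returns true when both lists share the same runtime class object. */
    static boolean sameRuntimeClass(List<?> a, List<?> b) {
        return a.getClass() == b.getClass();
    }

    public static void main(String[] args) {
        List<Long> longs = new ArrayList<>();
        List<String> strings = new ArrayList<>();
        // Both are plain java.util.ArrayList at runtime.
        System.out.println(sameRuntimeClass(longs, strings)); // prints "true"
    }
}
```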

Re: Trying to create a generic aggregate UDF

2021-01-21 Thread Timo Walther
Hi Dylan, thanks for the investigation. I can now also reproduce it in my code. Yes, this is a bug. I opened https://issues.apache.org/jira/browse/FLINK-21070 and will try to fix this asap. Thanks, Timo On 20.01.21 17:52, Dylan Forciea wrote: Timo, I converted what I had to Java, and ended u

Re: Trying to create a generic aggregate UDF

2021-01-20 Thread Dylan Forciea
Timo, I converted what I had to Java, and ended up with the exact same issue as before where it will work if I only ever use it on 1 type, but not if I use it on multiple. Maybe this is a bug? Dylan On 1/20/21, 10:06 AM, "Dylan Forciea" wrote: Oh, I think I might have a clue as to what

Re: Trying to create a generic aggregate UDF

2021-01-20 Thread Dylan Forciea
Oh, I think I might have a clue as to what is going on. I notice that it will work properly when I only call it on Long. I think that it is using the same generated code for the Converter for whatever was called first. Since in Scala I can't declare an object as static within the class itself, I

Re: Trying to create a generic aggregate UDF

2021-01-20 Thread Dylan Forciea
As a side note, I also just tried to unify into a single function registration and used _ as the type parameter in the classOf calls there and within the TypeInference definition for the accumulator and still ended up with the exact same stack trace. Dylan On 1/20/21, 9:22 AM, "Dylan Forciea"

Re: Trying to create a generic aggregate UDF

2021-01-20 Thread Dylan Forciea
Timo, I appreciate it! I am using Flink 1.12.0 right now with the Blink planner. What you proposed is roughly what I had come up with the first time around that resulted in the stack trace with the ClassCastException I had originally included. I saw that you had used a Row instead of just the v

Re: Trying to create a generic aggregate UDF

2021-01-20 Thread Timo Walther
Hi Dylan, I'm assuming you are using Flink 1.12 and the Blink planner? Starting with 1.12 you can use the "new" aggregate functions with better type inference. So TypeInformation will not be used in this stack. I tried to come up with an example that should explain the rough design. I wi

Trying to create a generic aggregate UDF

2021-01-19 Thread Dylan Forciea
I am attempting to create an aggregate UDF that takes a generic parameter T, but for the life of me, I can’t seem to get it to work. The UDF I’m trying to implement takes two input arguments, a value that is generic, and a date. It will choose the non-null value with the latest associated date.
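
The selection rule described above (keep the non-null value with the latest associated date) can be sketched in plain Java, independent of Flink. This is a minimal sketch under stated assumptions, not the code from the thread: the class name `LatestNonNullSketch`, the use of `java.time.LocalDate` for the date, ignoring pairs with a null date, and keeping the first value on a date tie are all choices made here for illustration. It deliberately omits Flink's `AggregateFunction` base class and any type inference setup.

```java
import java.time.LocalDate;

/** Hypothetical plain-Java accumulator for "latest non-null value"; not a Flink UDF. */
final class LatestNonNullSketch<T> {
    private T value;         // latest non-null value seen so far
    private LocalDate date;  // date associated with that value

    /** Considers one (value, date) pair; pairs with a null value or null date are ignored. */
    void accumulate(T candidate, LocalDate candidateDate) {
        if (candidate == null || candidateDate == null) {
            return;
        }
        // Assumption: on equal dates the earlier-seen value is kept.
        if (date == null || candidateDate.isAfter(date)) {
            value = candidate;
            date = candidateDate;
        }
    }

    /** Returns the chosen value, or null if no non-null value was seen. */
    T getValue() {
        return value;
    }
}
```

For example, accumulating `(1L, 2021-01-01)`, `(null, 2021-01-03)`, and `(2L, 2021-01-02)` in that order leaves `2L` as the result, because the null value is skipped even though it carries the latest date.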