Hi JingsongLee and Timo,

Thanks for taking a look and for the feedback!

All the best,
Morrisa

Morrisa Brenner
Software Engineer

225 Franklin St, Boston, MA 02110
klaviyo.com <https://www.klaviyo.com/>

> On May 21, 2019, at 12:10 AM, JingsongLee <lzljs3620...@aliyun.com> wrote:
>
> Hi Morrisa:
>
> It seems that the Flink planner does not support returning Object (or a
> generic type; as you say, type erasure) from a ScalarFunction.
> In ScalarFunctionCallGen:
>
>     val functionCallCode =
>       s"""
>         |${parameters.map(_.code).mkString("\n")}
>         |$resultTypeTerm $resultTerm = $functionReference.eval(
>         |  ${parameters.map(_.resultTerm).mkString(", ")});
>         |""".stripMargin
>
> There should be a cast applied to the eval return value to support this
> situation.
> I have no idea how to bypass it. If you can modify the source code, you can
> change it this way to support a generic return type:
>
>     val functionCallCode =
>       s"""
>         |${parameters.map(_.code).mkString("\n")}
>         |$resultTypeTerm $resultTerm = ($resultTypeTerm) $functionReference.eval(
>         |  ${parameters.map(_.resultTerm).mkString(", ")});
>         |""".stripMargin
>
> Best, JingsongLee
>
> ------------------------------------------------------------------
> From: Timo Walther <twal...@apache.org>
> Send Time: May 20, 2019 (Monday) 23:03
> To: user <user@flink.apache.org>
> Subject: Re: Generic return type on a user-defined scalar function
>
> Hi Morrisa,
>
> usually, this means that your class is not recognized as a POJO. Please
> check the requirements of a POJO again: default constructor, getters and
> setters for every field, etc. You can use
> org.apache.flink.api.common.typeinfo.Types.POJO(...) to verify whether your
> class is a POJO or not.
>
> I hope this helps.
>
> Regards,
> Timo
>
>
> On 16.05.19 at 23:18, Morrisa Brenner wrote:
> Hi Flink folks,
>
> In a Flink job using the SQL API that I’m working on, I have a custom POJO
> data type with a generic field, and I would like to be able to call a
> user-defined function on this field. I included a similar function below
> with the business logic stubbed out, but this example has the return type
> I'm looking for.
>
> I have no issues using custom functions of this type when they're used in a
> select statement and the `getResultType` method is excluded from the
> user-defined function class, but I am unable to get the type information to
> resolve correctly in contexts like order by and group by statements. It
> still doesn't work even if the `getResultType` method defines the specific
> type for a given object explicitly, because the job compiler within Flink
> seems to be assuming the return type from the `eval` method is just an
> Object (type erasure...), and it fails to generate the object code because
> it's detecting invalid casts to the desired output type. Without the
> `getResultType` method, it just fails to detect the type entirely. This
> seems to be fine when it's just a select, but if I try to make it do any
> operation (like group by) I get the following error:
> "org.apache.flink.api.common.InvalidProgramException: This type
> (GenericType<java.lang.Object>) cannot be used as key."
>
> Does anyone know if there's a way to get Flink to pay attention to the type
> information from `getResultType` when compiling the `eval` method so that
> the types work out? Or another way to work around the type erasure on the
> eval method without defining explicit user-defined function classes for
> each type?
>
> Thanks for your help!
>
> Morrisa
>
>
> Code snippet:
>
> package flink_generics_testing;
>
> import org.apache.flink.api.common.typeinfo.TypeInformation;
> import org.apache.flink.table.functions.ScalarFunction;
>
> /**
>  * Reads custom values from a table and performs a function on those values.
>  * T should be able to be a String, long, float, boolean, or Date
>  *
>  * @param <T> The expected type of the table column values.
>  */
> public class CustomScalarFunction<T> extends ScalarFunction {
>
>     private static final long serialVersionUID = -5537657771138360838L;
>
>     private final Class<T> desiredType;
>
>     /**
>      * Construct an instance.
>      *
>      * @param desiredType The type of the value that we're performing the
>      *                    function on.
>      */
>     public CustomScalarFunction(Class<T> desiredType) {
>         this.desiredType = desiredType;
>     }
>
>     public T eval(T value) {
>         return value;
>     }
>
>     @Override
>     public TypeInformation<?> getResultType(Class<?>[] signature) {
>         return TypeInformation.of(desiredType);
>     }
>
>     @Override
>     public TypeInformation<?>[] getParameterTypes(Class<?>[] signature) {
>         return new TypeInformation<?>[]{
>             TypeInformation.of(desiredType)
>         };
>     }
> }
>
>
> --
> Morrisa Brenner
> Software Engineer
>
> 225 Franklin St, Boston, MA 02110
> klaviyo.com <https://www.klaviyo.com/>
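
The erasure problem discussed in this thread can be reproduced without Flink. The sketch below is a minimal illustration in plain Java, not Flink code: `ErasureDemo`, `GenericFunction`, `evalReturnType`, and `evalWithCast` are hypothetical names introduced only for this example. It shows that reflection reports `Object` as the return type of a generic `eval` method, and that an explicit cast, analogous to the `($resultTypeTerm)` cast suggested for the generated code, is what allows the result to be assigned to the concrete type.

```java
import java.lang.reflect.Method;

public class ErasureDemo {

    /** Hypothetical stand-in for the generic function in the thread; not a Flink class. */
    static class GenericFunction<T> {
        public T eval(T value) {
            return value;
        }
    }

    /** Looks up the method named "eval" via reflection and returns its erased return type. */
    static Class<?> evalReturnType(Class<?> clazz) {
        for (Method m : clazz.getDeclaredMethods()) {
            if (m.getName().equals("eval")) {
                return m.getReturnType();
            }
        }
        throw new IllegalArgumentException("No eval method on " + clazz.getName());
    }

    /**
     * Calls eval through the raw type, the way generated code that knows nothing
     * about T would. The raw call returns Object, so the explicit cast is required.
     */
    @SuppressWarnings({"rawtypes", "unchecked"})
    static String evalWithCast(String input) {
        GenericFunction function = new GenericFunction<String>();
        // Without the cast this line does not compile:
        // "incompatible types: Object cannot be converted to String".
        return (String) function.eval(input);
    }

    public static void main(String[] args) {
        // The type parameter T is erased, so reflection only sees Object.
        System.out.println(evalReturnType(GenericFunction.class).getName());
        System.out.println(evalWithCast("hello"));
    }
}
```

The sketch only demonstrates the Java-level behaviour. Whether a particular Flink version consults `getResultType` instead of the reflected `eval` signature when generating code is not something it establishes.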