Hi JingsongLee and Timo,

Thanks for taking a look and for the feedback!

All the best,
Morrisa


Morrisa Brenner
Software Engineer
━━
225 Franklin St, Boston, MA 02110
klaviyo.com <https://www.klaviyo.com/>




> On May 21, 2019, at 12:10 AM, JingsongLee <lzljs3620...@aliyun.com> wrote:
> 
> Hi Morrisa:
> 
> It seems that the Flink planner does not support returning Object (or a 
> generic type, which, as you say, is subject to type erasure) from a ScalarFunction.
> In ScalarFunctionCallGen:
> val functionCallCode =
>   s"""
>     |${parameters.map(_.code).mkString("\n")}
>     |$resultTypeTerm $resultTerm = $functionReference.eval(
>     |  ${parameters.map(_.resultTerm).mkString(", ")});
>     |""".stripMargin
> The return value of eval needs an explicit cast to support this 
> situation.
> I don't know of a way to work around it. If you can modify the source code, 
> you can change it as follows to support a generic return type:
> val functionCallCode =
>   s"""
>     |${parameters.map(_.code).mkString("\n")}
>     |$resultTypeTerm $resultTerm = ($resultTypeTerm) $functionReference.eval(
>     |  ${parameters.map(_.resultTerm).mkString(", ")});
>     |""".stripMargin
> 
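A small, Flink-free sketch of why the cast in the suggested change matters. It assumes the generated code only sees the erased signature of `eval`, which is an assumption about the code generator rather than something confirmed in this thread. `IdentityFunction` and `ErasureDemo` are hypothetical names used only for illustration:

```java
import java.lang.reflect.Method;

// Hypothetical stand-in for a generic scalar function; it does not depend on Flink.
class IdentityFunction<T> {
    public T eval(T value) {
        return value;
    }
}

final class ErasureDemo {

    // Returns the return type of eval as reflection reports it after type erasure.
    static Class<?> erasedReturnType() {
        try {
            // After erasure the parameter type T is Object, so that is the signature to look up.
            Method eval = IdentityFunction.class.getMethod("eval", Object.class);
            return eval.getReturnType();
        } catch (NoSuchMethodException e) {
            throw new IllegalStateException(e);
        }
    }

    // Mimics a caller that only holds a raw reference to the function.
    @SuppressWarnings({"rawtypes", "unchecked"})
    static String callWithCast(String input) {
        IdentityFunction raw = new IdentityFunction<String>();
        // String result = raw.eval(input);        // would not compile: eval returns Object here
        String result = (String) raw.eval(input);  // the explicit cast restores the declared type
        return result;
    }

    public static void main(String[] args) {
        System.out.println(erasedReturnType().getName());
        System.out.println(callWithCast("hello"));
    }
}
```

Running `ErasureDemo.main` prints `java.lang.Object` followed by `hello`: the erased `eval` returns `Object`, and the assignment only compiles once the cast is added.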
> Best, JingsongLee
> 
> ------------------------------------------------------------------
> From: Timo Walther <twal...@apache.org>
> Send Time: May 20, 2019 (Monday) 23:03
> To: user <user@flink.apache.org>
> Subject: Re: Generic return type on a user-defined scalar function
> 
> Hi Morrisa,
> 
> Usually, this means that your class is not recognized as a POJO. Please check 
> the requirements of a POJO again: a default constructor, getters and setters 
> for every field, etc. You can use 
> org.apache.flink.api.common.typeinfo.Types.POJO(...) to verify whether your 
> class is a POJO.
> 
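As a rough, Flink-free illustration of the requirements listed above, the sketch below checks a class by reflection for a public no-argument constructor and a getter/setter pair for every non-public field. It is only an approximation: the authoritative check is Flink's own type extraction (for example via `Types.POJO(...)`), and `PojoCheck`, `GoodEvent`, and `BadEvent` are hypothetical names:

```java
import java.lang.reflect.Field;
import java.lang.reflect.Modifier;

final class PojoCheck {

    // Follows the rules: public class, public no-arg constructor, getter and setter for the field.
    public static class GoodEvent {
        private String name;
        public GoodEvent() {}
        public String getName() { return name; }
        public void setName(String name) { this.name = name; }
    }

    // Breaks the rules: no default constructor and no setter.
    public static class BadEvent {
        private final String name;
        public BadEvent(String name) { this.name = name; }
        public String getName() { return name; }
    }

    // Simplified approximation of the POJO requirements; "is"-style boolean getters are not handled.
    static boolean looksLikePojo(Class<?> type) {
        if (!Modifier.isPublic(type.getModifiers())) {
            return false;
        }
        try {
            type.getConstructor(); // requires a public no-argument constructor
        } catch (NoSuchMethodException e) {
            return false;
        }
        for (Field field : type.getDeclaredFields()) {
            int mods = field.getModifiers();
            if (Modifier.isStatic(mods) || Modifier.isTransient(mods) || field.isSynthetic()) {
                continue; // these fields are not part of the serialized state
            }
            if (!Modifier.isPublic(mods) && !hasAccessors(type, field)) {
                return false;
            }
        }
        return true;
    }

    // Looks for getX() and setX(fieldType) following JavaBeans naming.
    private static boolean hasAccessors(Class<?> type, Field field) {
        String name = field.getName();
        String suffix = Character.toUpperCase(name.charAt(0)) + name.substring(1);
        try {
            type.getMethod("get" + suffix);
            type.getMethod("set" + suffix, field.getType());
            return true;
        } catch (NoSuchMethodException e) {
            return false;
        }
    }
}
```

With these example classes, `PojoCheck.looksLikePojo(PojoCheck.GoodEvent.class)` returns `true` and `PojoCheck.looksLikePojo(PojoCheck.BadEvent.class)` returns `false`.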
> I hope this helps.
> 
> Regards,
> Timo
> 
> 
> On 16.05.19 at 23:18, Morrisa Brenner wrote:
> Hi Flink folks,
> 
> In a Flink job using the SQL API that I’m working on, I have a custom POJO 
> data type with a generic field, and I would like to be able to call a 
> user-defined function on this field. I included a similar function below with 
> the business logic stubbed out, but this example has the return type I'm 
> looking for.
> 
> I have no issues using custom functions of this type when they're used in a 
> select statement and the `getResultType` method is excluded from the 
> user-defined function class, but I am unable to get the type information to 
> resolve correctly in contexts like order by and group by statements. It still 
> doesn't work even if the `getResultType` method defines the specific type for 
> a given object explicitly because the job compiler within Flink seems to be 
> assuming the return type from the `eval` method is just an Object (type 
> erasure...), and it fails to generate the object code because it's detecting 
> invalid casts to the desired output type. Without the `getResultType` method, 
> it just fails to detect the type entirely. This seems to be fine when it's just a 
> select, but if I try to make it do any operation (like group by) I get the 
> following error: "org.apache.flink.api.common.InvalidProgramException: This 
> type (GenericType<java.lang.Object>) cannot be used as key."
> 
> Does anyone know if there's a way to get Flink to pay attention to the type 
> information from `getResultType` when compiling the `eval` method so that the 
> types work out? Or another way to work around the type erasure on the eval 
> method without defining explicit user-defined function classes for each type?
> 
> Thanks for your help!
> 
> Morrisa
> 
> 
> 
> Code snippet:
> 
> 
> package flink_generics_testing;
> 
> import org.apache.flink.api.common.typeinfo.TypeInformation;
> import org.apache.flink.table.functions.ScalarFunction;
> 
> /**
>  * Reads custom values from a table and performs a function on those values.
>  * T should be able to be a String, long, float, boolean, or Date.
>  *
>  * @param <T> The expected type of the table column values.
>  */
> public class CustomScalarFunction<T> extends ScalarFunction {
> 
>    private static final long serialVersionUID = -5537657771138360838L;
> 
>    private final Class<T> desiredType;
> 
>    /**
>     * Construct an instance.
>     *
>     * @param desiredType The type of the value that we're performing the
>     *                    function on.
>     */
>    public CustomScalarFunction(Class<T> desiredType) {
>        this.desiredType = desiredType;
>    }
> 
>    public T eval(T value) {
>        return value;
>    }
> 
>    @Override
>    public TypeInformation<?> getResultType(Class<?>[] signature) {
>        return TypeInformation.of(desiredType);
>    }
> 
>    @Override
>    public TypeInformation<?>[] getParameterTypes(Class<?>[] signature) {
>        return new TypeInformation<?>[]{
>                TypeInformation.of(desiredType)
>        };
>    }
> }
> 
> 
> 
> 
