Hi Morrisa:

It seems that flink planner not support return Object(or generic, like you say, 
type erasure) in ScalarFunction.
In ScalarFunctionCallGen:
val functionCallCode =
  s"""
    |${parameters.map(_.code).mkString("\n")}
    |$resultTypeTerm $resultTerm = $functionReference.eval(
    |  ${parameters.map(_.resultTerm).mkString(", ")});
    |""".stripMargin
There should be a coercive transformation to eval return value to support this 
situation.
I have no ideas to bypass it. If you can modify the source code, you can change 
it to this way to support generic return type:
val functionCallCode =
  s"""
    |${parameters.map(_.code).mkString("\n")}
    |$resultTypeTerm $resultTerm = ($resultTypeTerm) $functionReference.eval(
    |  ${parameters.map(_.resultTerm).mkString(", ")});
    |""".stripMargin

Best, JingsongLee


------------------------------------------------------------------
From:Timo Walther <twal...@apache.org>
Send Time:2019年5月20日(星期一) 23:03
To:user <user@flink.apache.org>
Subject:Re: Generic return type on a user-defined scalar function

 
Hi Morrisa,

usually, this means that you class is not recognized as a POJO. Please check 
again the requirements of a POJO: Default constructor, getters and setters for 
every field etc. You can use 
org.apache.flink.api.common.typeinfo.Types.POJO(...) to verify if your class is 
a POJO or not.

I hope this helps.

Regards,
Timo


Am 16.05.19 um 23:18 schrieb Morrisa Brenner:


Hi Flink folks, 
In a Flink job using the SQL API that I’m working on, I have a custom POJO data 
type with a generic field, and I would like to be able to call a user-defined 
function on this field. I included a similar function below with the business 
logic stubbed out, but this example has the return type I'm looking for. 
I have no issues using custom functions of this type when they're used in a 
select statement and the `getResultType` method is excluded from the 
user-defined function class, but I am unable to get the type information to 
resolve correctly in contexts like order by and group by statements. It still 
doesn't work even if the `getResultType` method defines the specific type for a 
given object explicitly because the job compiler within Flink seems to be 
assuming the return type from the `eval` method is just an Object (type 
erasure...), and it fails to generate the object code because it's detecting 
invalid casts to the desired output type. Without the `getResultType` method, 
it just fails to detect type entirely. This seems to be fine when it's just a 
select, but if I try to make it do any operation (like group by) I get the 
following error: "org.apache.flink.api.common.InvalidProgramException: This 
type (GenericType<java.lang.Object>) cannot be used as key." 
Does anyone know if there's a way to get Flink to pay attention to the type 
information from `getResultType` when compiling the `eval` method so that the 
types work out? Or another way to work around the type erasure on the eval 
method without defining explicit user-defined function classes for each type? 
Thanks for your help! 
Morrisa 


Code snippet: 

package flink_generics_testing; 
import org.apache.flink.api.common.typeinfo.TypeInformation;
import org.apache.flink.table.functions.ScalarFunction; 
/**
* Reads custom values from a table and performs a function on those values.
* T should be able to be a String, long, float, boolean, or Date
*
* @param <T> The expected type of the table column values.
*/
public class CustomScalarFunction<T> extends ScalarFunction { 
 private static final long serialVersionUID = -5537657771138360838L; 
   private final Class<T> desiredType; 
 /**
    * Construct an instance.
    *
    * @param desiredType The type of the value that we're performing the 
function on.
    */
 public CustomScalarFunction(Class<T> desiredType) {
 this.desiredType = desiredType;
 }
 
 public T eval(T value) {
 return value;
 } 
 @Override
 public TypeInformation<?> getResultType(Class<?>[] signature) {
 return TypeInformation.of(desiredType);
 } 
 @Override
 public TypeInformation<?>[] getParameterTypes(Class<?>[] signature) {
 return new TypeInformation<?>[]{
               TypeInformation.of(desiredType)
       };
 }
} 

 -- 



                Morrisa Brenner
Software Engineer 


        
         225 Franklin St, Boston, MA 02110
klaviyo.com 
        [Klaviyo
                                  Logo]             

Reply via email to