Hi all
When I use udf, it throws Unable to serialize Exception as follows:

Exception in thread "main" org.apache.flink.table.api.ValidationException: 
Unable to serialize object 'UserTableFunction' of class 
‘....udtf.UserTableFunction'.
        at 
org.apache.flink.table.utils.EncodingUtils.encodeObjectToString(EncodingUtils.java:72)
        at 
org.apache.flink.table.functions.UserDefinedFunction.functionIdentifier(UserDefinedFunction.java:45)
        at 
org.apache.flink.table.planner.codegen.CodeGenUtils$.udfFieldName(CodeGenUtils.scala:715)
        at 
org.apache.flink.table.planner.codegen.CodeGeneratorContext.addReusableFunction(CodeGeneratorContext.scala:615)


My udf as follows.  
public class UserTableFunction extends TableFunction<Row> {
    private static final long serialVersionUID = 1L;
    private HikariCPUtils dbUtils = new HikariCPUtils();
    protected Connection connection;


    protected PreparedStatement preparedStatement = null;
    @Override
    public void open(FunctionContext context) throws Exception {
        connection = dbUtils.getConnection();
    }

    @Override
    public void close() throws Exception {
        if (null != connection)
            connection.close();
        if (null != preparedStatement)
            preparedStatement.close();
    }

    public void eval(long uid, int countryId) {
        ...
        Row row = new Row(8);
        try {
            ...
            collect(row);
        } catch (SQLException e) {
            e.printStackTrace();
        }
    }

    @Override
    public TypeInformation<Row> getResultType() {
        return Types.ROW(Types.STRING, Types.STRING, Types.STRING, 
Types.STRING, Types.STRING, Types.STRING, Types.STRING, Types.STRING);
    }
}


polaris...@gmail.com




Reply via email to