Hi Morrisa,
Usually, this means that your class is not recognized as a POJO. Please
check the requirements of a POJO again: a default constructor, getters and
setters for every field, etc. You can use
org.apache.flink.api.common.typeinfo.Types.POJO(...) to verify if your
class is a POJO or not.
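
To make the rules listed above concrete, here is a rough, Flink-independent sketch that checks them with plain reflection. It is only an illustration: the names `PojoRuleCheck`, `looksLikePojo`, `Good` and `Bad` are made up for this example, and the check is a simplification (it ignores inherited fields and `is`-style boolean getters, for instance). The authoritative check remains Flink's own type extraction, for example via Types.POJO(...) as mentioned above.

```java
import java.lang.reflect.Field;
import java.lang.reflect.Modifier;

// Hypothetical helper, not part of Flink: a simplified approximation of the POJO rules.
final class PojoRuleCheck {

    /** Follows the rules: public class, public no-arg constructor, getter and setter. */
    public static class Good {
        private String name;
        public Good() {}
        public String getName() { return name; }
        public void setName(String name) { this.name = name; }
    }

    /** Breaks the rules: no default constructor and no setter for its field. */
    public static class Bad {
        private final String name;
        public Bad(String name) { this.name = name; }
        public String getName() { return name; }
    }

    /** Returns true if the class passes this simplified version of the POJO rules. */
    static boolean looksLikePojo(Class<?> c) {
        if (!Modifier.isPublic(c.getModifiers())) {
            return false;
        }
        try {
            c.getConstructor(); // requires a public no-argument constructor
        } catch (NoSuchMethodException e) {
            return false;
        }
        for (Field f : c.getDeclaredFields()) {
            int mod = f.getModifiers();
            if (Modifier.isStatic(mod) || Modifier.isTransient(mod) || f.isSynthetic()) {
                continue; // these fields are not checked in this sketch
            }
            if (Modifier.isPublic(mod) && !Modifier.isFinal(mod)) {
                continue; // public, non-final fields need no accessors
            }
            if (!hasAccessors(c, f)) {
                return false;
            }
        }
        return true;
    }

    /** Looks for public getX() and setX(type) methods matching the field. */
    private static boolean hasAccessors(Class<?> c, Field f) {
        String name = f.getName();
        String suffix = Character.toUpperCase(name.charAt(0)) + name.substring(1);
        try {
            c.getMethod("get" + suffix);
            c.getMethod("set" + suffix, f.getType());
            return true;
        } catch (NoSuchMethodException e) {
            return false;
        }
    }

    public static void main(String[] args) {
        System.out.println(looksLikePojo(Good.class));
        System.out.println(looksLikePojo(Bad.class));
    }
}
```

If a class fails a check like this, it will most likely also be treated as a generic type by Flink, which is what leads to the "cannot be used as key" error.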
I hope this helps.
Regards,
Timo
On 16.05.19 at 23:18, Morrisa Brenner wrote:
Hi Flink folks,
In a Flink job using the SQL API that I’m working on, I have a custom
POJO data type with a generic field, and I would like to be able to
call a user-defined function on this field. I included a similar
function below with the business logic stubbed out, but this example
has the return type I'm looking for.
I have no issues using custom functions of this type when they're used
in a select statement and the `getResultType` method is excluded from
the user-defined function class, but I am unable to get the type
information to resolve correctly in contexts like order by and group
by statements. It still doesn't work even if the `getResultType`
method defines the specific type for a given object explicitly because
the job compiler within Flink seems to be assuming the return type
from the `eval` method is just an Object (type erasure...), and it
fails to generate the object code because it's detecting invalid casts
to the desired output type. Without the `getResultType` method, it
just fails to detect the type entirely. This seems to be fine when it's
just a select, but if I try to make it do any operation (like group
by) I get the following error:
"org.apache.flink.api.common.InvalidProgramException: This type
(GenericType<java.lang.Object>) cannot be used as key."
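
The erasure effect described above can be reproduced without Flink. The sketch below uses plain Java reflection to show what the declared return type of a generic `eval` method looks like at runtime, and how it changes in a subclass that redeclares `eval` with a concrete type. The names `ErasureDemo`, `GenericFn`, `StringFn` and `evalReturnType` are hypothetical and exist only for this illustration. It shows what reflection sees, not how Flink's code generation inspects the method, and whether the planner accepts such a subclass is not verified here.

```java
import java.lang.reflect.Method;

// Hypothetical demo class, independent of Flink.
final class ErasureDemo {

    /** Generic function shaped like the eval method in the question. */
    static class GenericFn<T> {
        public T eval(T value) {
            return value;
        }
    }

    /** Concrete subclass that redeclares eval with String instead of T. */
    static class StringFn extends GenericFn<String> {
        @Override
        public String eval(String value) {
            return value;
        }
    }

    /** Returns the declared return type of the non-bridge eval method of a class. */
    static Class<?> evalReturnType(Class<?> clazz) {
        for (Method m : clazz.getDeclaredMethods()) {
            // Skip the compiler-generated bridge method eval(Object) in subclasses.
            if (m.getName().equals("eval") && !m.isBridge()) {
                return m.getReturnType();
            }
        }
        throw new IllegalArgumentException("no eval method declared on " + clazz);
    }

    public static void main(String[] args) {
        // T is erased, so reflection only sees Object here.
        System.out.println(evalReturnType(GenericFn.class)); // class java.lang.Object
        // The subclass declares a concrete signature, so String is visible.
        System.out.println(evalReturnType(StringFn.class));  // class java.lang.String
    }
}
```

The `Class<T>` passed to the constructor in the snippet below is only known at runtime, whereas the `eval` signature is fixed at compile time, which is why the two can disagree.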
Does anyone know if there's a way to get Flink to pay attention to the
type information from `getResultType` when compiling the `eval` method
so that the types work out? Or another way to work around the type
erasure on the eval method without defining explicit user-defined
function classes for each type?
Thanks for your help!
Morrisa
Code snippet:
package flink_generics_testing;

import org.apache.flink.api.common.typeinfo.TypeInformation;
import org.apache.flink.table.functions.ScalarFunction;

/**
 * Reads custom values from a table and performs a function on those values.
 * T should be able to be a String, long, float, boolean, or Date
 *
 * @param <T> The expected type of the table column values.
 */
public class CustomScalarFunction<T> extends ScalarFunction {

    private static final long serialVersionUID = -5537657771138360838L;

    private final Class<T> desiredType;

    /**
     * Construct an instance.
     *
     * @param desiredType The type of the value that we're performing the function on.
     */
    public CustomScalarFunction(Class<T> desiredType) {
        this.desiredType = desiredType;
    }

    public T eval(T value) {
        return value;
    }

    @Override
    public TypeInformation<?> getResultType(Class<?>[] signature) {
        return TypeInformation.of(desiredType);
    }

    @Override
    public TypeInformation<?>[] getParameterTypes(Class<?>[] signature) {
        return new TypeInformation<?>[]{
            TypeInformation.of(desiredType)
        };
    }
}
--
Morrisa Brenner
Software Engineer
225 Franklin St, Boston, MA 02110
klaviyo.com <https://www.klaviyo.com>