Hi,
I have two classes (third-party POJOs):
--------------------------------------------------------------------
public class A {
    private List<B> bList;
    ...
}

public class B {
    ...
}
--------------------------------------------------------------------
I have defined my type info factories as:
--------------------------------------------------------------------
public class ATypeInfoFactory extends TypeInfoFactory<A> {
    @Override
    public TypeInformation<A> createTypeInfo(
            Type t, Map<String, TypeInformation<?>> genericParameters) {
        Map<String, TypeInformation<?>> fields =
                new HashMap<>() {
                    {
                        put("bList", Types.LIST(Types.POJO(B.class)));
                        ...
                    }
                };
        return Types.POJO(A.class, fields);
    }
}

public class BTypeInfoFactory extends TypeInfoFactory<B> {
    @Override
    public TypeInformation<B> createTypeInfo(
            Type t, Map<String, TypeInformation<?>> genericParameters) {
        Map<String, TypeInformation<?>> fields =
                new HashMap<>() {
                    {
                        ...
                    }
                };
        return Types.POJO(B.class, fields);
    }
}
--------------------------------------------------------------------
I register the factories through the configuration as follows:
--------------------------------------------------------------------
configuration.set(
        PipelineOptions.SERIALIZATION_CONFIG,
        List.of(
                "A: {type: typeinfo, class: ATypeInfoFactory}",
                "B: {type: typeinfo, class: BTypeInfoFactory}"));
final StreamExecutionEnvironment env =
        StreamExecutionEnvironment.getExecutionEnvironment(configuration);
--------------------------------------------------------------------
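For anyone reproducing this: the names A and ATypeInfoFactory in the entries above are shortened. Assuming the option expects fully qualified class names, as the examples in its documentation use, the entry strings could be built from the class objects instead of being typed by hand. The following is only a sketch. The helper typeInfoEntry and the empty stand-in classes are hypothetical and not part of Flink or of my job.

```java
import java.util.List;

// Sketch only: builds "pipeline.serialization-config" entry strings.
// typeInfoEntry is a hypothetical helper, not a Flink API.
class SerializationConfigEntries {

    // Empty stand-ins for the third-party POJOs and their factories (assumption).
    static class A {}
    static class B {}
    static class ATypeInfoFactory {}
    static class BTypeInfoFactory {}

    // Formats one entry of the form "<type>: {type: typeinfo, class: <factory>}"
    // using Class.getName(), which returns the fully qualified binary name.
    static String typeInfoEntry(Class<?> type, Class<?> factory) {
        return String.format(
                "%s: {type: typeinfo, class: %s}", type.getName(), factory.getName());
    }

    public static void main(String[] args) {
        List<String> entries =
                List.of(
                        typeInfoEntry(A.class, ATypeInfoFactory.class),
                        typeInfoEntry(B.class, BTypeInfoFactory.class));
        // Nested classes print with a '$' in their binary name.
        entries.forEach(System.out::println);
    }
}
```

The resulting list would then be passed to configuration.set(PipelineOptions.SERIALIZATION_CONFIG, entries) in place of the hand-written strings.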
I have also set:

    env.getConfig().disableGenericTypes();

And I am using the type in my state as:

    ValueStateDescriptor<A> aStateDescriptor =
            new ValueStateDescriptor<>("a.state", A.class);
    aState = getRuntimeContext().getState(aStateDescriptor);

When I run my Flink job, I get this error:

java.lang.UnsupportedOperationException: Generic types have been disabled in the ExecutionConfig and type java.util.List is treated as a generic type.
    at org.apache.flink.api.java.typeutils.GenericTypeInfo.createSerializer(GenericTypeInfo.java:88)
    at org.apache.flink.api.java.typeutils.PojoTypeInfo.createPojoSerializer(PojoTypeInfo.java:355)
    at org.apache.flink.api.java.typeutils.PojoTypeInfo.createSerializer(PojoTypeInfo.java:347)
    at org.apache.flink.api.common.functions.util.AbstractRuntimeUDFContext.createSerializer(AbstractRuntimeUDFContext.java:101)
    at org.apache.flink.api.common.state.StateDescriptor.initializeSerializerUnlessSet(StateDescriptor.java:336)
    at org.apache.flink.streaming.api.operators.StreamingRuntimeContext.getState(StreamingRuntimeContext.java:202)

My question is: if I have registered a type info factory for class A, why is
a POJO serializer still being created for it?
Thanks
Sachin
On Wed, Feb 12, 2025 at 8:52 PM Zhanghao Chen <[email protected]>
wrote:
> Hi, you may use the option "pipeline.serialization-config" [1] to register
> type info for any custom type, which is available since Flink 1.19.
>
> [1]
> https://nightlies.apache.org/flink/flink-docs-master/docs/deployment/config/#pipeline-serialization-config
>
> Best,
> Zhanghao Chen
> ------------------------------
> *From:* Sachin Mittal <[email protected]>
> *Sent:* Wednesday, February 12, 2025 20:20
> *To:* user <[email protected]>
> *Subject:* How to register pojo type information for third party pojo
> classes
>
> Hi,
> I have a POJO class provided by a library, say A.class.
>
> I can create a type info factory for it like this:
>
> public class ATypeInfoFactory extends TypeInfoFactory<A> {
>     @Override
>     public TypeInformation<A> createTypeInfo(
>             Type t, Map<String, TypeInformation<?>> genericParameters) {
>         Map<String, TypeInformation<?>> fields =
>                 new HashMap<>() {
>                     {
>                         ...
>                     }
>                 };
>         return Types.POJO(A.class, fields);
>     }
> }
>
> Now I want this type information to be used whenever an object of class A
> is serialized or deserialized in Flink state.
>
> How can I register it with the StreamExecutionEnvironment?
>
> Thanks
> Sachin