Hi,
I think for now we cannot upgrade to Flink 2.0.
I understand that. I am registering my own POJO serializer because the
default one is not able to serialize lists.
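A small, hypothetical helper (plain Java reflection, not a Flink API) that lists the fields of a class declared as java.util.List or a subtype. These are the fields that, according to the error quoted below, Flink 1.x treats as generic types. It skips static and transient fields and walks up the superclass chain. This rests on the assumption that those rules roughly match what Flink's POJO analysis considers; it is a diagnostic sketch, not Flink's type extraction.

```java
import java.lang.reflect.Field;
import java.lang.reflect.Modifier;
import java.util.ArrayList;
import java.util.List;

/** Hypothetical helper: finds List-typed fields of a class, including inherited ones. */
public final class ListFieldFinder {

    private ListFieldFinder() {}

    /** Returns "SimpleClassName.fieldName" for each non-static, non-transient List field. */
    public static List<String> listTypedFields(Class<?> clazz) {
        List<String> result = new ArrayList<>();
        // Walk up the hierarchy, since inherited fields are part of the object's state too.
        for (Class<?> c = clazz; c != null && c != Object.class; c = c.getSuperclass()) {
            for (Field f : c.getDeclaredFields()) {
                int mods = f.getModifiers();
                if (Modifier.isStatic(mods) || Modifier.isTransient(mods)) {
                    continue; // assumed not to be serialized, so ignore them
                }
                // Matches fields declared as java.util.List or any subtype (e.g. ArrayList).
                if (List.class.isAssignableFrom(f.getType())) {
                    result.add(c.getSimpleName() + "." + f.getName());
                }
            }
        }
        return result;
    }
}
```

For the classes in this thread, ListFieldFinder.listTypedFields(A.class) should include "A.bList". The order of the returned names is not guaranteed, because getDeclaredFields() makes no ordering promise.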
Based on the documentation:
In a hierarchy of types, the closest factory will be chosen while
traversing upwards. However, a built-in factory has the highest precedence.
A factory also has higher precedence than Flink’s built-in types, therefore
you should know what you are doing.
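A simplified model of the traversal rule quoted above, assuming factories are keyed by exact class. The type itself is checked first, then each superclass in turn, and the first registered factory wins. It deliberately ignores the precedence between built-in factories and Flink's built-in types. The registry and factory names are hypothetical; this is not Flink's implementation.

```java
import java.util.HashMap;
import java.util.Map;
import java.util.Optional;

/** Simplified model (not Flink code) of "closest factory while traversing upwards". */
public final class ClosestFactoryLookup {

    // Hypothetical registry: exact class -> name of the factory registered for it.
    private final Map<Class<?>, String> factories = new HashMap<>();

    public void register(Class<?> type, String factoryName) {
        factories.put(type, factoryName);
    }

    /** Returns the factory registered for the nearest class in the superclass chain. */
    public Optional<String> closestFactory(Class<?> type) {
        for (Class<?> c = type; c != null; c = c.getSuperclass()) {
            String factory = factories.get(c);
            if (factory != null) {
                return Optional.of(factory); // the first hit is the closest one
            }
        }
        return Optional.empty(); // no factory anywhere up the hierarchy
    }
}
```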
Even though I have registered a custom POJO serializer for these types, Flink
still seems to pick up the default one.
I suspect that it is not picking up this configuration:
configuration.set(
    PipelineOptions.SERIALIZATION_CONFIG,
    List.of(
        "A: {type: typeinfo, class: ATypeInfoFactory}",
        "B: {type: typeinfo, class: BTypeInfoFactory}"));
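Each entry of pipeline.serialization-config above is a string of the form "<class name>: {key: value, ...}". A rough, hypothetical helper for inspecting such entries is sketched below. Flink parses these strings itself; this regex-based splitter only handles the flat "key: value" pairs used here and is not Flink's parser.

```java
import java.util.LinkedHashMap;
import java.util.Map;
import java.util.regex.Matcher;
import java.util.regex.Pattern;

/** Hypothetical helper that splits one serialization-config entry into its parts. */
public final class SerializationConfigEntry {

    // <class name> ':' '{' <comma-separated key: value pairs> '}'
    private static final Pattern ENTRY =
            Pattern.compile("^\\s*([^:\\s]+)\\s*:\\s*\\{(.*)\\}\\s*$");

    public final String className;
    public final Map<String, String> options;

    private SerializationConfigEntry(String className, Map<String, String> options) {
        this.className = className;
        this.options = options;
    }

    public static SerializationConfigEntry parse(String entry) {
        Matcher m = ENTRY.matcher(entry);
        if (!m.matches()) {
            throw new IllegalArgumentException("Unrecognized entry: " + entry);
        }
        Map<String, String> options = new LinkedHashMap<>();
        for (String pair : m.group(2).split(",")) {
            String[] kv = pair.split(":", 2); // split on the first colon only
            if (kv.length == 2) {
                options.put(kv[0].trim(), kv[1].trim());
            }
        }
        return new SerializationConfigEntry(m.group(1), options);
    }
}
```

Printing className and options.get("class") for each entry provides a value to compare against Class#getName() of the POJO and factory classes.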
Can you please check whether this configuration is correct? Is there any way
I can check via the logs whether the right serializers are registered for my
custom POJOs?
Thanks
Sachin
On Thu, Feb 13, 2025 at 12:06 PM Zhanghao Chen wrote:
> What you are doing is registering two third-party classes as POJO types,
> and this is actually the default behavior of Flink even without the
> registration. And for POJO types, Flink will create a POJO serializer for
> serialization.
>
> A side note: Flink 2.0 introduces built-in serialization support for
> java.util.List, so in this case you should not need any additional type
> registrations to keep generic types disabled.
>
> Best,
> Zhanghao Chen
> ------------------------------
> *From:* Sachin Mittal
> *Sent:* Thursday, February 13, 2025 12:23
> *To:* Zhanghao Chen
> *Cc:* user
> *Subject:* Re: How to register pojo type information for third party pojo
> classes
>
> Hi,
> So I have two classes (third-party POJOs):
> --------------------------------------------------------------------
>
> public class A {
>
> private List<B> bList;
>
> ...
>
> }
>
> public class B {
>
> ...
>
> }
>
> --------------------------------------------------------------------
>
> I have defined my type info factories as:
> --------------------------------------------------------------------
>
> import java.lang.reflect.Type;
> import java.util.HashMap;
> import java.util.Map;
>
> import org.apache.flink.api.common.typeinfo.TypeInfoFactory;
> import org.apache.flink.api.common.typeinfo.TypeInformation;
> import org.apache.flink.api.common.typeinfo.Types;
>
> public class ATypeInfoFactory extends TypeInfoFactory<A> {
>     @Override
>     public TypeInformation<A> createTypeInfo(
>             Type t, Map<String, TypeInformation<?>> genericParameters) {
>         // Map each field name of A to the type information Flink should use for it.
>         Map<String, TypeInformation<?>> fields = new HashMap<>();
>         fields.put("bList", Types.LIST(Types.POJO(B.class)));
>         // ...
>         return Types.POJO(A.class, fields);
>     }
> }
>
> public class BTypeInfoFactory extends TypeInfoFactory<B> {
>     @Override
>     public TypeInformation<B> createTypeInfo(
>             Type t, Map<String, TypeInformation<?>> genericParameters) {
>         Map<String, TypeInformation<?>> fields = new HashMap<>();
>         // ...
>         return Types.POJO(B.class, fields);
>     }
> }
>
> --------------------------------------------------------------------
>
> So I am setting this as:
> --------------------------------------------------------------------
>
> configuration.set(
>     PipelineOptions.SERIALIZATION_CONFIG,
>     List.of(
>         "A: {type: typeinfo, class: ATypeInfoFactory}",
>         "B: {type: typeinfo, class: BTypeInfoFactory}"));
>
> final StreamExecutionEnvironment env =
>     StreamExecutionEnvironment.getExecutionEnvironment(configuration);
>
> I have also set:
>
> env.getConfig().disableGenericTypes();
>
> And I am using this in my state as:
>
> ValueStateDescriptor<A> aStateDescriptor =
>     new ValueStateDescriptor<>("a.state", A.class);
>
> aState = getRuntimeContext().getState(aStateDescriptor);
>
>
> So now when I run my flink job I get this error:
>
>
> java.lang.UnsupportedOperationException: Generic types have been disabled in the ExecutionConfig and type java.util.List is treated as a generic type.
>     at org.apache.flink.api.java.typeutils.GenericTypeInfo.createSerializer(GenericTypeInfo.java:88)
>     at org.apache.flink.api.java.typeutils.PojoTypeInfo.createPojoSerializer(PojoTypeInfo.java:355)
>     at org.apache.flink.api.java.typeutils.PojoTypeInfo.createSerializer(PojoTypeInfo.java:347)
>     at org.apache.flink.api.common.functions.util.AbstractRuntimeUDFContext.createSerializer(AbstractRuntimeUDFContext.java:101)
>     at org.apache.flink.api.common.state.StateDescriptor.initializeSerializerUnlessSet(StateDescriptor.java:336)
>     at org.apache.flink.streaming.api.operators.StreamingRuntimeContext.getState(StreamingRuntimeContext.java:202)
>
> So my question is: if I have defined a serializer for class A, why is it
> still creating a POJO serializer?
>
> Thanks
> Sachin
>
>
> On Wed, Feb 12, 2025 at 8:52 PM Zhanghao Chen wrote:
>
> Hi, you may use the option "pipeline.serialization-config" [1], available
> since Flink 1.19, to register type info for any custom type.
>
> [1]
> https://nightlies.apache.org/flink/flink-docs-master/docs/deployment/config/#pipeline-serialization-config
>
> Best,
> Zhanghao Chen
> ------------------------------
> *From:* Sachin Mittal
> *Sent:* Wednesday, February 12, 2025 20:20
> *To:* user
> *Subject:* How to register pojo type information for third party pojo
> classes
>
> Hi,
> I have a POJO class provided by a third-party library,
> say A.class.
>
> I can create a type info factory of the same like:
>
> public class ATypeInfoFactory extends TypeInfoFactory<A> {
>     @Override
>     public TypeInformation<A> createTypeInfo(
>             Type t, Map<String, TypeInformation<?>> genericParameters) {
>         Map<String, TypeInformation<?>> fields = new HashMap<>();
>         // ...
>         return Types.POJO(A.class, fields);
>     }
> }
>
> Now I want this type information to be used whenever an object of class A is
> serialized or deserialized in Flink state.
>
> How can I register it with the StreamExecutionEnvironment?
>
> Thanks
> Sachin
>
>
>
>