I'm currently using protobufs and registering the Kryo protobuf serializers using the following snippet of code:

    static void optionalRegisterProtobufSerializer(ExecutionConfig config, Class<?> clazz) {
        if (clazz != null) {
            config.registerTypeWithKryoSerializer(clazz, ProtobufSerializer.class);
        }
    }

    static void configureExecutionConfig(ExecutionConfig config) {
        optionalRegisterProtobufSerializer(config, User.class);
        optionalRegisterProtobufSerializer(config, View.class);
        optionalRegisterProtobufSerializer(config, Request.class);
        optionalRegisterProtobufSerializer(config, Insertion.class);
        optionalRegisterProtobufSerializer(config, Impression.class);
        optionalRegisterProtobufSerializer(config, Action.class);
        optionalRegisterProtobufSerializer(config, FlatEvent.class);
        optionalRegisterProtobufSerializer(config, LatestImpression.class);
    }

    // *TODO* - reuse with batch.
    void configureStreamExecutionEnvironment(StreamExecutionEnvironment env) {
        configureExecutionConfig(env.getConfig());
        if (checkpointInterval > 0) {
            env.enableCheckpointing(checkpointInterval);
        }
        env.getCheckpointConfig().setCheckpointingMode(checkpointingMode);
        // *TODO* - evaluate if we want setMinPauseBetweenCheckpoints.
        if (minPauseBetweenCheckpoints > 0) {
            env.getCheckpointConfig().setMinPauseBetweenCheckpoints(minPauseBetweenCheckpoints);
        }
        if (unalignedCheckpoints) {
            env.getCheckpointConfig().enableUnalignedCheckpoints();
        }
        if (checkpointTimeout > 0) {
            env.getCheckpointConfig().setCheckpointTimeout(checkpointTimeout);
        }
        env.getCheckpointConfig().enableExternalizedCheckpoints(
            CheckpointConfig.ExternalizedCheckpointCleanup.RETAIN_ON_CANCELLATION);
    }

What concerns me, and what I have a question about, is that I'm seeing these sorts of INFO logs in the TaskManager logs:

    org.apache.flink.api.java.typeutils.TypeExtractor [] - Class class ai.promoted.proto.event.FlatEvent cannot be used as a POJO type because not all fields are valid POJO fields, and must be processed as GenericType. Please read the Flink documentation on "Data Types & Serialization" for details of the effect on performance.
    2021-06-12 17:47:03,230 INFO org.apache.flink.api.java.typeutils.TypeExtractor [] - class ai.promoted.proto.event.LatestImpression does not contain a getter for field impressionId_
    2021-06-12 17:47:03,230 INFO org.apache.flink.api.java.typeutils.TypeExtractor [] - class ai.promoted.proto.event.LatestImpression does not contain a setter for field impressionId_
    2021-06-12 17:47:03,230 INFO org.apache.flink.api.java.typeutils.TypeExtractor [] - Class class ai.promoted.proto.event.LatestImpression cannot be used as a POJO type because not all fields are valid POJO fields, and must be processed as GenericType. Please read the Flink documentation on "Data Types & Serialization" for details of the effect on performance.
    2021-06-12 17:47:03,230 INFO org.apache.flink.api.java.typeutils.TypeExtractor [] - class ai.promoted.proto.event.LatestImpression does not contain a getter for field impressionId_
    2021-06-12 17:47:03,230 INFO org.apache.flink.api.java.typeutils.TypeExtractor [] - class ai.promoted.proto.event.LatestImpression does not contain a setter for field impressionId_
    2021-06-12 17:47:03,230 INFO org.apache.flink.api.java.typeutils.TypeExtractor [] - Class class ai.promoted.proto.event.LatestImpression cannot be used as a POJO type because not all fields are valid POJO fields, and must be processed as GenericType. Please read the Flink documentation on "Data Types & Serialization" for details of the effect on performance.

Can I safely ignore these? Is it telling me that it's doing the right thing, since Kryo should kick in for GenericType?
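
To illustrate what the "does not contain a getter/setter for field impressionId_" messages are complaining about, here is a minimal, self-contained sketch of a getter/setter check done with plain Java reflection. This is only a simplified approximation I wrote for illustration, not Flink's actual TypeExtractor logic (which applies its own, more detailed rules). The classes ProtoLike and PlainPojo and the helper invalidPojoFields are hypothetical names, and ProtoLike is just a hand-written stand-in for the shape of a generated protobuf message (private field with a trailing underscore, no setter).

```java
import java.lang.reflect.Field;
import java.lang.reflect.Method;
import java.lang.reflect.Modifier;
import java.util.ArrayList;
import java.util.List;

public class PojoFieldCheck {

    // Hypothetical stand-in for a generated protobuf message:
    // a private field with a trailing underscore, a getter named
    // without the underscore, and no setter at all.
    public static class ProtoLike {
        private Object impressionId_ = "";
        public String getImpressionId() { return (String) impressionId_; }
    }

    // Hypothetical conventional POJO, for comparison.
    public static class PlainPojo {
        private String impressionId;
        public PlainPojo() {}
        public String getImpressionId() { return impressionId; }
        public void setImpressionId(String value) { impressionId = value; }
    }

    // True if clazz has a public method with this exact name and parameter count.
    static boolean hasMethod(Class<?> clazz, String name, int paramCount) {
        for (Method m : clazz.getMethods()) {
            if (m.getName().equals(name) && m.getParameterCount() == paramCount) {
                return true;
            }
        }
        return false;
    }

    // Simplified approximation (an assumption, not Flink's real rule):
    // a non-public instance field counts as valid only if the class has
    // both getX()/isX() and setX(value) matching the exact field name.
    static List<String> invalidPojoFields(Class<?> clazz) {
        List<String> invalid = new ArrayList<>();
        for (Field f : clazz.getDeclaredFields()) {
            int mods = f.getModifiers();
            if (Modifier.isStatic(mods) || Modifier.isTransient(mods) || f.isSynthetic()) {
                continue; // not part of the serialized state in this sketch
            }
            if (Modifier.isPublic(mods)) {
                continue; // public fields need no accessors
            }
            String name = f.getName();
            String suffix = Character.toUpperCase(name.charAt(0)) + name.substring(1);
            boolean getter = hasMethod(clazz, "get" + suffix, 0) || hasMethod(clazz, "is" + suffix, 0);
            boolean setter = hasMethod(clazz, "set" + suffix, 1);
            if (!getter || !setter) {
                invalid.add(name);
            }
        }
        return invalid;
    }

    public static void main(String[] args) {
        System.out.println(invalidPojoFields(ProtoLike.class)); // prints [impressionId_]
        System.out.println(invalidPojoFields(PlainPojo.class)); // prints []
    }
}
```

In this sketch the protobuf-shaped class fails the check for impressionId_ while the conventional POJO passes, which mirrors the shape of the log lines above; whether the real messages can be ignored still depends on how Flink handles the resulting GenericType.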