I'm currently using protobufs and registering them with Kryo's
ProtobufSerializer using the following snippet of code:

    static void optionalRegisterProtobufSerializer(ExecutionConfig config, Class<?> clazz) {
        if (clazz != null) {
            config.registerTypeWithKryoSerializer(clazz, ProtobufSerializer.class);
        }
    }


    static void configureExecutionConfig(ExecutionConfig config) {
        optionalRegisterProtobufSerializer(config, User.class);
        optionalRegisterProtobufSerializer(config, View.class);
        optionalRegisterProtobufSerializer(config, Request.class);
        optionalRegisterProtobufSerializer(config, Insertion.class);
        optionalRegisterProtobufSerializer(config, Impression.class);
        optionalRegisterProtobufSerializer(config, Action.class);
        optionalRegisterProtobufSerializer(config, FlatEvent.class);
        optionalRegisterProtobufSerializer(config, LatestImpression.class);
    }


    // TODO - reuse with batch.
    void configureStreamExecutionEnvironment(StreamExecutionEnvironment env) {
        configureExecutionConfig(env.getConfig());
        if (checkpointInterval > 0) {
            env.enableCheckpointing(checkpointInterval);
        }
        env.getCheckpointConfig().setCheckpointingMode(checkpointingMode);
        // TODO - evaluate if we want setMinPauseBetweenCheckpoints.
        if (minPauseBetweenCheckpoints > 0) {
            env.getCheckpointConfig().setMinPauseBetweenCheckpoints(minPauseBetweenCheckpoints);
        }
        if (unalignedCheckpoints) {
            env.getCheckpointConfig().enableUnalignedCheckpoints();
        }
        if (checkpointTimeout > 0) {
            env.getCheckpointConfig().setCheckpointTimeout(checkpointTimeout);
        }
        env.getCheckpointConfig().enableExternalizedCheckpoints(
                CheckpointConfig.ExternalizedCheckpointCleanup.RETAIN_ON_CANCELLATION);
    }

What concerns me is that I'm seeing INFO messages like these in the
TaskManager logs:

org.apache.flink.api.java.typeutils.TypeExtractor            [] - Class
class ai.promoted.proto.event.FlatEvent cannot be used as a POJO type
because not all fields are valid POJO fields, and must be processed as
GenericType. Please read the Flink documentation on "Data Types &
Serialization" for details of the effect on performance.

2021-06-12 17:47:03,230 INFO
org.apache.flink.api.java.typeutils.TypeExtractor            [] - class
ai.promoted.proto.event.LatestImpression does not contain a getter for
field impressionId_

2021-06-12 17:47:03,230 INFO
org.apache.flink.api.java.typeutils.TypeExtractor            [] - class
ai.promoted.proto.event.LatestImpression does not contain a setter for
field impressionId_

2021-06-12 17:47:03,230 INFO
org.apache.flink.api.java.typeutils.TypeExtractor            [] - Class
class ai.promoted.proto.event.LatestImpression cannot be used as a POJO
type because not all fields are valid POJO fields, and must be processed as
GenericType. Please read the Flink documentation on "Data Types &
Serialization" for details of the effect on performance.

2021-06-12 17:47:03,230 INFO
org.apache.flink.api.java.typeutils.TypeExtractor            [] - class
ai.promoted.proto.event.LatestImpression does not contain a getter for
field impressionId_

2021-06-12 17:47:03,230 INFO
org.apache.flink.api.java.typeutils.TypeExtractor            [] - class
ai.promoted.proto.event.LatestImpression does not contain a setter for
field impressionId_

2021-06-12 17:47:03,230 INFO
org.apache.flink.api.java.typeutils.TypeExtractor            [] - Class
class ai.promoted.proto.event.LatestImpression cannot be used as a POJO
type because not all fields are valid POJO fields, and must be processed as
GenericType. Please read the Flink documentation on "Data Types &
Serialization" for details of the effect on performance.

Can I safely ignore these? Are they telling me that Flink is doing the right
thing, since Kryo should kick in for GenericType?
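
For context, here is a minimal sketch of the kind of check I understand these messages to come from. It is only an approximation: it assumes the documented POJO rule that every non-public field needs a getter and a setter, and it matches accessor names strictly. Flink's actual TypeExtractor applies additional and somewhat different checks. The names PojoFieldCheck, ProtoLike and BeanLike are hypothetical and exist only for this illustration.

```java
import java.lang.reflect.Field;
import java.lang.reflect.Method;
import java.lang.reflect.Modifier;
import java.util.ArrayList;
import java.util.List;

// Simplified, hypothetical approximation of a POJO field check.
// This is NOT Flink's implementation; it only illustrates the rule
// "each non-public field needs a getter and a setter".
class PojoFieldCheck {

    // Hypothetical stand-in for a generated protobuf message: a private
    // field with a trailing underscore, a getter named without the
    // underscore, and no setter (messages are immutable).
    static class ProtoLike {
        private String impressionId_ = "";
        public String getImpressionId() { return impressionId_; }
    }

    // Hypothetical class that follows the bean convention for its field.
    static class BeanLike {
        private String impressionId;
        public BeanLike() {}
        public String getImpressionId() { return impressionId; }
        public void setImpressionId(String value) { this.impressionId = value; }
    }

    // Returns one message per missing accessor; an empty list means every
    // field passed this simplified check.
    static List<String> violations(Class<?> clazz) {
        List<String> out = new ArrayList<>();
        for (Field f : clazz.getDeclaredFields()) {
            int mods = f.getModifiers();
            // Static, transient and compiler-generated fields are skipped.
            if (Modifier.isStatic(mods) || Modifier.isTransient(mods) || f.isSynthetic()) {
                continue;
            }
            // Public fields need no accessors.
            if (Modifier.isPublic(mods)) {
                continue;
            }
            String name = f.getName();
            String suffix = Character.toUpperCase(name.charAt(0)) + name.substring(1);
            if (!hasMethod(clazz, "get" + suffix, 0)) {
                out.add("no getter for field " + name);
            }
            if (!hasMethod(clazz, "set" + suffix, 1)) {
                out.add("no setter for field " + name);
            }
        }
        return out;
    }

    // True if the class has a public method with this name and arity.
    static boolean hasMethod(Class<?> clazz, String name, int paramCount) {
        for (Method m : clazz.getMethods()) {
            if (m.getName().equals(name) && m.getParameterCount() == paramCount) {
                return true;
            }
        }
        return false;
    }

    public static void main(String[] args) {
        System.out.println(violations(ProtoLike.class));
        System.out.println(violations(BeanLike.class));
    }
}
```

Under this sketch, ProtoLike is reported as missing both a getter and a setter for impressionId_, while BeanLike reports nothing. If the real check works along similar lines, generated protobuf classes would always fail it and fall back to GenericType.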
