Hello,

Flink version: 1.20

I am just beginning to develop a Flink job with the following topology:

Kafka Source 1 -> KeyedCoProcessFunction -> Kafka Destination
Kafka Source 2 -------------^

The KeyedCoProcessFunction listens to two Kafka topics. It also maintains
state in the following variables (pseudocode):

class MyClass extends KeyedCoProcessFunction {

    // Not final: the state handles are assigned in open(), not in the constructor.
    private transient MapState<Long, Pojo1> pojo1Map;
    private transient MapState<String, Pojo2> pojo2Map;
    private transient MapState<Long, Map<String, String>> thirdStateMap;
    private transient MapState<String, Long> fourthMap;
    private transient MapState<String, Pojo3> pojo3Map;

    @Override
    public void open(Configuration parameters) throws Exception {
        // Flink-specific code to initialize the MapStates above
    }
}


This is only the beginning of development, so there are at most 5-10
Kafka messages.

On my development box, I noticed that for almost 12-15 minutes the *Flink
dashboard shows the KeyedCoProcessFunction task in initializing status.*
Until then, no Kafka message is processed. After about 12 minutes, it
processes both the new and the accumulated Kafka messages correctly.


I have checkpointing enabled with the following settings.
private void enableCheckpointing(StreamExecutionEnvironment env, ParameterTool parameters) {
    // Checkpointing every 10 seconds
    env.enableCheckpointing(10000);

    // Advanced options
    CheckpointConfig checkpointConfig = env.getCheckpointConfig();
    // Checkpoint storage location
    if (parameters.has(CHECKPOINT_PATH)) {
        String checkpointPath = parameters.get(CHECKPOINT_PATH);
        logger.info("Setting checkpoint path to: {}", checkpointPath);
        env.getCheckpointConfig().setCheckpointStorage(new FileSystemCheckpointStorage(checkpointPath));
    } else {
        logger.warn("No checkpoint path provided. Using default checkpointing.");
    }

    checkpointConfig.setCheckpointingMode(CheckpointingMode.EXACTLY_ONCE);
    checkpointConfig.setMinPauseBetweenCheckpoints(1000);
    checkpointConfig.setCheckpointTimeout(120000);
    checkpointConfig.setMaxConcurrentCheckpoints(1);
    checkpointConfig.enableExternalizedCheckpoints(CheckpointConfig.ExternalizedCheckpointCleanup.RETAIN_ON_CANCELLATION);
    checkpointConfig.enableUnalignedCheckpoints();

    // Fixed-delay restart strategy
    env.setRestartStrategy(RestartStrategies.fixedDelayRestart(
            // number of restart attempts
            3,
            // delay
            org.apache.flink.api.common.time.Time.seconds(10)
    ));
}

My questions:
What is happening? Am I doing something wrong? *Why does it take so long to
initialize the job, and this function in particular?* There is hardly any
*pre-existing state*, and it takes just as long even if I clear the
file-based snapshot folder. Any direction would be appreciated.

Thank You,
Santosh
