Hello,

Flink version: 1.20.
I am just beginning to develop a Flink job with the following topology:

    Kafka Source 1 -> KeyedCoProcessFunction -> Kafka Destination
    Kafka Source 2 ----------^

The KeyedCoProcessFunction listens to both Kafka topics. It also maintains state in the following variables (pseudo-code):

    class MyClass extends KeyedCoProcessFunction {
        // Not final: these handles are assigned in open(), not in the constructor
        private transient MapState<Long, Pojo1> pojo1Map;
        private transient MapState<String, Pojo2> pojo2Map;
        private transient MapState<Long, Map<String, String>> thirdStateMap;
        private transient MapState<String, Long> fourthMap;
        private transient MapState<String, Pojo3> pojo3Map;

        @Override
        public void open(Configuration parameters) throws Exception {
            // Flink-specific code to initialize the MapStates above
        }
    }

This is the very beginning of development, so there are at most 5-10 Kafka messages. On my development box, I noticed that for almost 12-15 minutes the *Flink dashboard shows the KeyedCoProcessFunction task in INITIALIZING status.* Until then, no Kafka message is processed. After about 12 minutes, it correctly processes the new Kafka message as well as the accumulated ones.

I have checkpointing enabled with the following settings:

    private void enableCheckpointing(StreamExecutionEnvironment env, ParameterTool parameters) {
        // Checkpoint every 10 seconds
        env.enableCheckpointing(10000);

        // Advanced options
        CheckpointConfig checkpointConfig = env.getCheckpointConfig();

        // Checkpoint storage location
        if (parameters.has(CHECKPOINT_PATH)) {
            String checkpointPath = parameters.get(CHECKPOINT_PATH);
            logger.info("Setting checkpoint path to: {}", checkpointPath);
            checkpointConfig.setCheckpointStorage(new FileSystemCheckpointStorage(checkpointPath));
        } else {
            logger.warn("No checkpoint path provided. Using default checkpointing.");
        }

        checkpointConfig.setCheckpointingMode(CheckpointingMode.EXACTLY_ONCE);
        checkpointConfig.setMinPauseBetweenCheckpoints(1000);
        checkpointConfig.setCheckpointTimeout(120000);
        checkpointConfig.setMaxConcurrentCheckpoints(1);
        checkpointConfig.enableExternalizedCheckpoints(
                CheckpointConfig.ExternalizedCheckpointCleanup.RETAIN_ON_CANCELLATION);
        checkpointConfig.enableUnalignedCheckpoints();

        // Fixed-delay restart strategy
        env.setRestartStrategy(RestartStrategies.fixedDelayRestart(
                3,                                                  // number of restart attempts
                org.apache.flink.api.common.time.Time.seconds(10)   // delay between attempts
        ));
    }

My questions are: What's happening? Am I doing something wrong? *Why does it take so long to initialize the job, and in particular the function above?* There is hardly any *pre-existing state*, and it takes just as long even if I clear the file-based snapshot folder.

Any direction would be appreciated.

Thank you,
Santosh