Hi Santosh,

May I ask which state backend you are using? And how large is the checkpoint? You can check the disk usage of the configured checkpoint path.
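
For the disk usage check, here is a minimal sketch in plain Java. It assumes the checkpoint path points to a directory on the local filesystem (not HDFS or S3). The names `CheckpointDirSize` and `sizeOf` are hypothetical and used only for illustration; they are not part of Flink.

```java
import java.io.IOException;
import java.nio.file.Files;
import java.nio.file.Path;
import java.nio.file.Paths;
import java.util.stream.Stream;

// Hypothetical helper, not part of Flink: reports how many bytes are stored
// under a local checkpoint directory.
class CheckpointDirSize {

    // Walks the directory tree and sums the sizes of all regular files.
    static long sizeOf(Path root) throws IOException {
        try (Stream<Path> paths = Files.walk(root)) {
            return paths
                    .filter(Files::isRegularFile)
                    .mapToLong(p -> p.toFile().length())
                    .sum();
        }
    }

    public static void main(String[] args) throws IOException {
        // Assumption: the first argument is the local checkpoint directory;
        // falls back to the current directory when no argument is given.
        Path root = Paths.get(args.length > 0 ? args[0] : ".");
        System.out.println(root.toAbsolutePath() + ": " + sizeOf(root) + " bytes");
    }
}
```

Run it with the directory configured as the checkpoint path as the only argument. If the total is only a few kilobytes, the checkpoint size itself is unlikely to explain the long initialization.
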
Best,
Zakelly

On Sat, Apr 5, 2025 at 11:58 PM santosh techie <santosh.tec...@gmail.com> wrote:

> Hello,
>
> Flink Version : 1.20.
>
> I am just beginning to develop a Flink job with the following topology:
>
> Kafka Source 1 -> KeyedCoProcessFunction -> Kafka Destination
> Kafka Source 2            ^
>
> The KeyedCoProcessFunction listens to two Kafka topics. It also maintains
> state with the following variables (pseudocode).
>
> class MyClass extends KeyedCoProcessFunction {
>
>     private final MapState<Long, Pojo1> Pojo1Map;
>     private final MapState<String, Pojo2> Pojo2Map;
>     private final MapState<Long, Map<String, String>> 3rdStatemap;
>     private final MapState<String, Long> 4thMap;
>     private final MapState<String, Pojo3> pojo3Map;
>
>     public void open(Configuration parameters) throws Exception {
>         // Flink-specific code to initialize the above map states
>     }
> }
>
> This is just the beginning of my development, and there could be at most
> 5-10 Kafka messages.
>
> On my development box, I noticed that for almost 12-15 minutes the *Flink
> dashboard shows the KeyedCoProcessFunction task in initializing status.*
> Until then, no Kafka message is processed. After 12 minutes, it processes
> a new Kafka message and the accumulated messages correctly.
>
> I have checkpointing enabled with the following settings.
>
> private void enableCheckpointing(StreamExecutionEnvironment env,
>         ParameterTool parameters) {
>     // Checkpointing every 10 seconds
>     env.enableCheckpointing(10000);
>
>     // Advanced options
>     CheckpointConfig checkpointConfig = env.getCheckpointConfig();
>     // Checkpoint storage location
>     if (parameters.has(CHECKPOINT_PATH)) {
>         String checkpointPath = parameters.get(CHECKPOINT_PATH);
>         logger.info("Setting checkpoint path to: {}", checkpointPath);
>         env.getCheckpointConfig().setCheckpointStorage(
>                 new FileSystemCheckpointStorage(checkpointPath));
>     } else {
>         logger.warn("No checkpoint path provided. Using default checkpointing.");
>     }
>
>     checkpointConfig.setCheckpointingMode(CheckpointingMode.EXACTLY_ONCE);
>     checkpointConfig.setMinPauseBetweenCheckpoints(1000);
>     checkpointConfig.setCheckpointTimeout(120000);
>     checkpointConfig.setMaxConcurrentCheckpoints(1);
>
>     checkpointConfig.enableExternalizedCheckpoints(
>             CheckpointConfig.ExternalizedCheckpointCleanup.RETAIN_ON_CANCELLATION);
>     checkpointConfig.enableUnalignedCheckpoints();
>
>     // Fixed delay restart strategy
>     env.setRestartStrategy(RestartStrategies.fixedDelayRestart(
>             // number of restart attempts
>             3,
>             // delay
>             org.apache.flink.api.common.time.Time.seconds(10)
>     ));
> }
>
> My question is:
> What's happening? Am I doing something wrong? *Why is it taking so long
> to initialize the job, and in particular the above function?* There is
> hardly any *pre-existing state*. It also takes the same time even if I
> clear the file-based snapshot folder. Any direction would be appreciated.
>
> Thank you,
> Santosh