Hi,

The checkpoint is hardly anything, a few tens of KBs at most, and disk usage is also fine. On further investigation, I suspect the cause is the way exactly-once delivery is set up with Kafka. I see that the Kafka broker CPU usage is very high during Flink startup, with numerous log lines like the following on the broker:
2025-04-07 22:26:42 [2025-04-07 16:56:42,743] INFO [TransactionCoordinator id=1] Initialized transactionalId <my-topic1>-2-88 with producerId 16835 and producer epoch 162 on partition __transaction_state-0 (kafka.coordinator.transaction.TransactionCoordinator)
2025-04-07 22:26:42 [2025-04-07 16:56:42,743] INFO [TransactionCoordinator id=1] Initialized transactionalId <my-topic2>-1335-21 with producerId 1287059 and producer epoch 0 on partition __transaction_state-3 (kafka.coordinator.transaction.TransactionCoordinator)
2025-04-07 22:26:42 [2025-04-07 16:56:42,744] INFO [TransactionCoordinator id=1] Initialized transactionalId <my-topic1>-0-1246 with producerId 74060 and producer epoch 129 on partition __transaction_state-47 (kafka.coordinator.transaction.TransactionCoordinator)
2025-04-07 22:26:42 [2025-04-07 16:56:42,744] INFO [TransactionCoordinator id=1] Initialized transactionalId <my-topic2>-1-26 with producerId 8575 and producer epoch 186 on partition __

Only when the above logs stop appearing in the broker logs does the function start accepting Kafka messages for processing.

My Flink Kafka consumer and sink are created as follows:

public static <T> KafkaSource<T> createKafkaSource(KafkaPropertiesReader reader,
        DeserializationSchema<T> deserializationSchema) {
    Properties consumerProperties = new Properties();
    return KafkaSource.<T>builder()
            .setBootstrapServers(reader.getBootStrapServers())
            .setTopics(reader.getTopic())
            .setGroupId(reader.getGroupId())
            // Start from committed offsets, falling back to LATEST;
            // offsets in checkpoints take precedence on restore
            .setStartingOffsets(OffsetsInitializer.committedOffsets(OffsetResetStrategy.LATEST))
            //.setStartingOffsets(OffsetsInitializer.latest())
            .setProperties(consumerProperties)
            .setProperty(KafkaSourceOptions.COMMIT_OFFSETS_ON_CHECKPOINT.key(), "true")
            .setValueOnlyDeserializer(deserializationSchema)
            .build();
}

public static <T> KafkaSink<T> createKafkaSink(KafkaOutputReader reader,
        KafkaRecordSerializationSchema<T> serializationSchema) {
    Properties properties = new Properties();
    properties.setProperty("bootstrap.servers", reader.getBootStrapServers());
    properties.setProperty("transaction.timeout.ms", reader.getTransactionTimeoutMs());
    properties.setProperty("retries", reader.getRetries());
    properties.setProperty("linger.ms", reader.getLingerMs());
    properties.setProperty("batch.size", reader.getBatchSize());
    properties.setProperty("enable.auto.commit", "false");
    properties.setProperty("isolation.level", "read_committed"); // for transactional reads
    return KafkaSink.<T>builder()
            .setBootstrapServers(reader.getBootStrapServers())
            .setDeliveryGuarantee(DeliveryGuarantee.EXACTLY_ONCE)
            .setTransactionalIdPrefix(reader.getTransactionalIdPrefix())
            .setKafkaProducerConfig(properties)
            .setRecordSerializer(serializationSchema)
            .build();
}

Am I missing anything while initializing Kafka?
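If I understand the connector correctly, an exactly-once KafkaSink has to fence and abort transactions left over from previous executions before it can run, probing transactional IDs of the form <prefix>-<subtaskId>-<counter>, which is exactly the shape of the IDs in the broker logs above. A rough sketch of what I believe happens during initialization (my own approximation, not the connector's actual code; abortLeftoverTransactions and maxCounter are made-up names):

import java.util.Properties;
import org.apache.kafka.clients.producer.KafkaProducer;
import org.apache.kafka.common.serialization.ByteArraySerializer;

static void abortLeftoverTransactions(String bootstrap, String prefix,
        int subtaskId, int maxCounter) {
    for (int counter = 0; counter < maxCounter; counter++) {
        Properties props = new Properties();
        props.setProperty("bootstrap.servers", bootstrap);
        // The sink derives IDs as "<prefix>-<subtaskId>-<counter>"
        props.setProperty("transactional.id", prefix + "-" + subtaskId + "-" + counter);
        try (KafkaProducer<byte[], byte[]> producer = new KafkaProducer<>(
                props, new ByteArraySerializer(), new ByteArraySerializer())) {
            // initTransactions() fences the ID and aborts any open
            // transaction; each call surfaces on the broker as an
            // "Initialized transactionalId ..." log line
            producer.initTransactions();
        }
        // The real connector inspects the producer epoch (via internal
        // APIs) to stop at the first unused ID; this sketch just bounds
        // the loop with maxCounter
    }
}

If that is what is going on, a 10-second checkpoint interval would leave many such IDs behind after every run, which would explain both the broker CPU spike and the 12-15 minute INITIALIZING phase I describe below.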
On Mon, Apr 7, 2025 at 9:59 AM Zakelly Lan <zakelly....@gmail.com> wrote:

> Hi Santosh,
>
> May I ask which state backend you are using? And how large is the
> checkpoint? You may check the disk usage of the configured checkpoint path.
>
>
> Best,
> Zakelly
>
> On Sat, Apr 5, 2025 at 11:58 PM santosh techie <santosh.tec...@gmail.com>
> wrote:
>
>> Hello,
>>
>> Flink version: 1.20.
>>
>> I am just beginning to develop a Flink job with the following topology:
>>
>> Kafka Source 1 -> KeyedCoProcessFunction -> Kafka Destination
>> Kafka Source 2 ---^
>>
>> The KeyedCoProcessFunction listens to two Kafka topics (see the wiring
>> sketch after the quoted thread). It also maintains state through the
>> following variables, in (pseudo) code:
>>
>> class MyClass extends KeyedCoProcessFunction {
>>
>>     private MapState<Long, Pojo1> pojo1Map;
>>     private MapState<String, Pojo2> pojo2Map;
>>     private MapState<Long, Map<String, String>> thirdStateMap;
>>     private MapState<String, Long> fourthMap;
>>     private MapState<String, Pojo3> pojo3Map;
>>
>>     public void open(Configuration parameters) throws Exception {
>>         // Flink-specific code to initialize the above map states
>>         // (see the sketch after the quoted thread)
>>     }
>> }
>>
>> This is just the beginning of development, and there are at most 5-10
>> Kafka messages.
>>
>> On my development box, I noticed that for almost 12-15 minutes the *Flink
>> dashboard shows the KeyedCoProcessFunction task in INITIALIZING status.*
>> Until then no Kafka message is processed. After 12 minutes, it processes
>> new and accumulated Kafka messages correctly.
>>
>> I have checkpointing enabled with the following settings:
>>
>> private void enableCheckpointing(StreamExecutionEnvironment env,
>>         ParameterTool parameters) {
>>     // Checkpoint every 10 seconds
>>     env.enableCheckpointing(10000);
>>
>>     // Advanced options
>>     CheckpointConfig checkpointConfig = env.getCheckpointConfig();
>>
>>     // Checkpoint storage location
>>     if (parameters.has(CHECKPOINT_PATH)) {
>>         String checkpointPath = parameters.get(CHECKPOINT_PATH);
>>         logger.info("Setting checkpoint path to: {}", checkpointPath);
>>         env.getCheckpointConfig().setCheckpointStorage(
>>                 new FileSystemCheckpointStorage(checkpointPath));
>>     } else {
>>         logger.warn("No checkpoint path provided. Using default checkpointing.");
>>     }
>>
>>     checkpointConfig.setCheckpointingMode(CheckpointingMode.EXACTLY_ONCE);
>>     checkpointConfig.setMinPauseBetweenCheckpoints(1000);
>>     checkpointConfig.setCheckpointTimeout(120000);
>>     checkpointConfig.setMaxConcurrentCheckpoints(1);
>>     checkpointConfig.enableExternalizedCheckpoints(
>>             CheckpointConfig.ExternalizedCheckpointCleanup.RETAIN_ON_CANCELLATION);
>>     checkpointConfig.enableUnalignedCheckpoints();
>>
>>     // Fixed-delay restart strategy
>>     env.setRestartStrategy(RestartStrategies.fixedDelayRestart(
>>             3,                                                   // number of restart attempts
>>             org.apache.flink.api.common.time.Time.seconds(10))); // delay
>> }
>>
>> My questions are: what's happening, and am I doing something wrong? *Why
>> is it taking so long to initialize the job, and this function in
>> particular?* There is hardly any *pre-existing state*; it also takes the
>> same time even if I clear the file-based snapshot folder. Any direction
>> would be appreciated.
>>
>> Thank You,
>> Santosh
>>
>
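For reference, the two-source topology quoted above is wired up roughly like this (a sketch only; source1, source2, the key selectors, MyClass, and kafkaSink are illustrative placeholders):

import org.apache.flink.api.common.eventtime.WatermarkStrategy;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;

StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();

// Two Kafka sources feeding one keyed two-input function
DataStream<Pojo1> stream1 =
        env.fromSource(source1, WatermarkStrategy.noWatermarks(), "Kafka Source 1");
DataStream<Pojo2> stream2 =
        env.fromSource(source2, WatermarkStrategy.noWatermarks(), "Kafka Source 2");

stream1.connect(stream2)
        // Both inputs must be keyed by a compatible key type
        .keyBy(p -> p.getKey(), p -> p.getKey())
        .process(new MyClass())   // the KeyedCoProcessFunction
        .sinkTo(kafkaSink);       // Kafka Destination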
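And the open() placeholder in the (pseudo) code quoted above initializes the map states along these lines (a sketch; the descriptor names are illustrative, and Pojo1/Pojo2/Pojo3 are assumed to be POJO-serializable):

import org.apache.flink.api.common.state.MapStateDescriptor;
import org.apache.flink.api.common.typeinfo.Types;
import org.apache.flink.configuration.Configuration;

@Override
public void open(Configuration parameters) throws Exception {
    // Each MapState is registered with the keyed state backend via a descriptor
    pojo1Map = getRuntimeContext().getMapState(
            new MapStateDescriptor<>("pojo1Map", Types.LONG, Types.POJO(Pojo1.class)));
    pojo2Map = getRuntimeContext().getMapState(
            new MapStateDescriptor<>("pojo2Map", Types.STRING, Types.POJO(Pojo2.class)));
    thirdStateMap = getRuntimeContext().getMapState(
            new MapStateDescriptor<>("thirdStateMap",
                    Types.LONG, Types.MAP(Types.STRING, Types.STRING)));
    fourthMap = getRuntimeContext().getMapState(
            new MapStateDescriptor<>("fourthMap", Types.STRING, Types.LONG));
    pojo3Map = getRuntimeContext().getMapState(
            new MapStateDescriptor<>("pojo3Map", Types.STRING, Types.POJO(Pojo3.class)));
}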