Hi,

The checkpoint is tiny, a few tens of KBs at most, and disk usage is fine. On
further investigation, I suspect the delay comes from the way exactly-once is
established with Kafka: the Kafka broker's CPU usage is very high during Flink
startup, and the broker log is flooded with entries like the following,

2025-04-07 22:26:42 [2025-04-07 16:56:42,743] INFO [TransactionCoordinator
id=1] Initialized transactionalId <my-topic1>-2-88 with producerId 16835
and producer epoch 162 on partition __transaction_state-0
(kafka.coordinator.transaction.TransactionCoordinator)
2025-04-07 22:26:42 [2025-04-07 16:56:42,743] INFO [TransactionCoordinator
id=1] Initialized transactionalId <my-topic2>-1335-21 with producerId
1287059 and producer epoch 0 on partition __transaction_state-3
(kafka.coordinator.transaction.TransactionCoordinator)
2025-04-07 22:26:42 [2025-04-07 16:56:42,744] INFO [TransactionCoordinator
id=1] Initialized transactionalId <my-topic1>-0-1246 with producerId 74060
and producer epoch 129 on partition __transaction_state-47
(kafka.coordinator.transaction.TransactionCoordinator)
2025-04-07 22:26:42 [2025-04-07 16:56:42,744] INFO [TransactionCoordinator
id=1] Initialized transactionalId <my-topic2>-1-26 with producerId 8575 and
producer epoch 186 on partition __

Only when the above logs stop appearing in the Kafka broker logs does the
function start accepting Kafka messages for processing.
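
If I read the log pattern correctly, each transactional id follows a
`<transactionalIdPrefix>-<subtaskIndex>-<counter>` naming scheme, so on every
start the broker appears to initialize one producer id per (subtask, counter)
pair. A small sketch of how quickly that id space grows (the naming scheme is
my assumption from the logs above, not confirmed Flink internals):

```java
import java.util.ArrayList;
import java.util.List;

public class TxnIdSweep {
    // Enumerate the transactional ids a restarted exactly-once sink might touch,
    // assuming (from the broker logs) a "<prefix>-<subtask>-<counter>" scheme.
    // All numbers here are illustrative, not Flink internals.
    static List<String> idsToFence(String prefix, int subtasks, int counters) {
        List<String> ids = new ArrayList<>();
        for (int subtask = 0; subtask < subtasks; subtask++) {
            for (int counter = 0; counter < counters; counter++) {
                ids.add(prefix + "-" + subtask + "-" + counter);
            }
        }
        return ids;
    }

    public static void main(String[] args) {
        // e.g. 3 subtasks that have each accumulated 1000 counters
        List<String> ids = idsToFence("my-topic1", 3, 1000);
        System.out.println(ids.size());   // prints 3000
        System.out.println(ids.get(0));   // prints my-topic1-0-0
    }
}
```

If that reading is right, it would explain why the coordinator logs thousands
of "Initialized transactionalId" lines before the job becomes responsive.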

My Flink Kafka source and sink are created as follows.

public static <T> KafkaSource<T> createKafkaSource(KafkaPropertiesReader reader,
        DeserializationSchema<T> deserializationSchema) {
    Properties consumerProperties = new Properties();

    return KafkaSource.<T>builder()
            .setBootstrapServers(reader.getBootStrapServers())
            .setTopics(reader.getTopic())
            .setGroupId(reader.getGroupId())
            // Start from committed offsets; fall back to latest if none exist
            .setStartingOffsets(OffsetsInitializer.committedOffsets(OffsetResetStrategy.LATEST))
            //.setStartingOffsets(OffsetsInitializer.latest())
            .setProperties(consumerProperties)
            .setProperty(KafkaSourceOptions.COMMIT_OFFSETS_ON_CHECKPOINT.key(), "true")
            .setValueOnlyDeserializer(deserializationSchema)
            .build();
}
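
For context, this is roughly how I attach the source in the job (a sketch; the
`reader` instance, the `SimpleStringSchema`, and the source name are
illustrative, not my exact code):

```java
// Sketch: attaching the source to the stream environment (names illustrative).
KafkaSource<String> source = createKafkaSource(reader, new SimpleStringSchema());
DataStream<String> stream = env.fromSource(
        source,
        WatermarkStrategy.noWatermarks(),  // no event-time watermarks needed here
        "kafka-source-1");
```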

public static <T> KafkaSink<T> createKafkaSink(KafkaOutputReader reader,
        KafkaRecordSerializationSchema<T> serializationSchema) {
    Properties properties = new Properties();
    properties.setProperty("bootstrap.servers", reader.getBootStrapServers());
    properties.setProperty("transaction.timeout.ms", reader.getTransactionTimeoutMs());
    properties.setProperty("retries", reader.getRetries());
    properties.setProperty("linger.ms", reader.getLingerMs());
    properties.setProperty("batch.size", reader.getBatchSize());
    // Note: enable.auto.commit and isolation.level are consumer settings;
    // the producer ignores them in this config.
    properties.setProperty("enable.auto.commit", "false");
    properties.setProperty("isolation.level", "read_committed");

    return KafkaSink.<T>builder()
            .setBootstrapServers(reader.getBootStrapServers())
            .setDeliveryGuarantee(DeliveryGuarantee.EXACTLY_ONCE)
            .setTransactionalIdPrefix(reader.getTransactionalIdPrefix())
            .setKafkaProducerConfig(properties)
            .setRecordSerializer(serializationSchema)
            .build();
}
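
As a cross-check, I am considering a sink variant with AT_LEAST_ONCE delivery,
which skips Kafka transactions entirely; if startup becomes fast with it, that
would confirm the exactly-once transaction recovery as the bottleneck. A sketch
only, reusing the same reader and serializer types as above:

```java
// Diagnostic variant: AT_LEAST_ONCE uses no Kafka transactions, so the
// TransactionCoordinator churn seen in the broker logs should disappear.
public static <T> KafkaSink<T> createAtLeastOnceSink(KafkaOutputReader reader,
        KafkaRecordSerializationSchema<T> serializationSchema) {
    return KafkaSink.<T>builder()
            .setBootstrapServers(reader.getBootStrapServers())
            .setDeliveryGuarantee(DeliveryGuarantee.AT_LEAST_ONCE)
            .setRecordSerializer(serializationSchema)
            .build();
}
```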


Am I missing anything in how I initialize Kafka?


On Mon, Apr 7, 2025 at 9:59 AM Zakelly Lan <zakelly....@gmail.com> wrote:

> Hi Santosh,
>
> May I ask which state backend you are using? And how huge is the
> checkpoint? You may check the disk usage of the configured checkpoint path.
>
>
> Best,
> Zakelly
>
> On Sat, Apr 5, 2025 at 11:58 PM santosh techie <santosh.tec...@gmail.com>
> wrote:
>
>> Hello,
>>
>> Flink Version : 1.20.
>>
>> I am just beginning to develop a Flink job with the following topology:
>>
>> Kafka Source 1 -> KeyedCoProcessFunction -> Kafka Destination
>> Kafka Source 2 ---------^
>>
>> The KeyedCoProcessFunction listens to two Kafka topics and maintains state
>> in the following variables (pseudocode):
>>
>> class MyClass extends KeyedCoProcessFunction {
>>
>>     private MapState<Long, Pojo1> pojo1Map;
>>     private MapState<String, Pojo2> pojo2Map;
>>     private MapState<Long, Map<String, String>> thirdStateMap;
>>     private MapState<String, Long> fourthMap;
>>     private MapState<String, Pojo3> pojo3Map;
>>
>>     public void open(Configuration parameters) throws Exception {
>>         // Flink-specific code to initialize the above MapStates
>>     }
>> }
>>
>>
>> I am just beginning development, and there are at most 5-10 Kafka messages
>> involved.
>>
>> On my development box, I noticed that for almost 12-15 minutes the *Flink
>> dashboard shows the KeyedCoProcessFunction task in initializing status.*
>> Until then no Kafka message is processed; after those 12 minutes, it
>> processes new and accumulated Kafka messages correctly.
>>
>>
>> I have checkpointing enabled with the following settings:
>>
>> private void enableCheckpointing(StreamExecutionEnvironment env,
>>         ParameterTool parameters) {
>>     // Checkpoint every 10 seconds
>>     env.enableCheckpointing(10000);
>>
>>     // Advanced options
>>     CheckpointConfig checkpointConfig = env.getCheckpointConfig();
>>
>>     // Checkpoint storage location
>>     if (parameters.has(CHECKPOINT_PATH)) {
>>         String checkpointPath = parameters.get(CHECKPOINT_PATH);
>>         logger.info("Setting checkpoint path to: {}", checkpointPath);
>>         env.getCheckpointConfig().setCheckpointStorage(
>>                 new FileSystemCheckpointStorage(checkpointPath));
>>     } else {
>>         logger.warn("No checkpoint path provided. Using default checkpointing.");
>>     }
>>
>>     checkpointConfig.setCheckpointingMode(CheckpointingMode.EXACTLY_ONCE);
>>     checkpointConfig.setMinPauseBetweenCheckpoints(1000);
>>     checkpointConfig.setCheckpointTimeout(120000);
>>     checkpointConfig.setMaxConcurrentCheckpoints(1);
>>     checkpointConfig.enableExternalizedCheckpoints(
>>             CheckpointConfig.ExternalizedCheckpointCleanup.RETAIN_ON_CANCELLATION);
>>     checkpointConfig.enableUnalignedCheckpoints();
>>
>>     // Fixed-delay restart strategy: 3 attempts, 10 seconds apart
>>     env.setRestartStrategy(RestartStrategies.fixedDelayRestart(
>>             3,
>>             org.apache.flink.api.common.time.Time.seconds(10)));
>> }
>>
>> My question is: what's happening, and am I doing something wrong? *Why does
>> it take so long to initialize the job, and this function in particular?*
>> There is hardly any *pre-existing state*, and it takes the same time even
>> if I clear the file-based snapshot folder. Any direction would be
>> appreciated.
>>
>> Thank You,
>> Santosh
>>
>
