Yes, it uses the local FS. But as I said in my follow-up email, I suspect
the problem is my Kafka source/sink settings.
Can anyone tell me what the ideal Kafka source and sink settings are for
exactly-once processing?
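
For concreteness, these are the knobs I currently believe matter for
exactly-once, as a sketch of my understanding (the values and the
topic/server/prefix names are placeholders, not verified recommendations):

// Sink: transactional writes, committed when a checkpoint completes
KafkaSink<String> sink = KafkaSink.<String>builder()
        .setBootstrapServers("localhost:9092")                // placeholder
        .setDeliveryGuarantee(DeliveryGuarantee.EXACTLY_ONCE)
        .setTransactionalIdPrefix("my-app")                   // placeholder, unique per job
        // must cover checkpoint interval + duration, and stay <= the broker's
        // transaction.max.timeout.ms (15 min by default)
        .setProperty("transaction.timeout.ms", "900000")
        .setRecordSerializer(KafkaRecordSerializationSchema.builder()
                .setTopic("out-topic")                        // placeholder
                .setValueSerializationSchema(new SimpleStringSchema())
                .build())
        .build();

// Source: read only records from committed transactions upstream
KafkaSource<String> source = KafkaSource.<String>builder()
        .setBootstrapServers("localhost:9092")                // placeholder
        .setTopics("in-topic")                                // placeholder
        .setGroupId("my-group")                               // placeholder
        .setProperty("isolation.level", "read_committed")
        .setValueOnlyDeserializer(new SimpleStringSchema())
        .build();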

Thank you

On Fri, Apr 11, 2025 at 5:22 PM Gabor Somogyi <gabor.g.somo...@gmail.com>
wrote:

> Are you on a local FS from the checkpoint perspective?
>
> G
>
>
> On Mon, Apr 7, 2025 at 7:05 PM santosh techie <santosh.tec...@gmail.com>
> wrote:
>
>> Hi,
>>
>> The checkpoint is hardly anything, a few tens of KBs at most, and disk
>> usage is also OK. On further investigation, I suspect the way exactly-once
>> is handled with Kafka: the Kafka broker CPU % is very high during Flink
>> startup, with numerous logs like the following in the broker:
>>
>> 2025-04-07 22:26:42 [2025-04-07 16:56:42,743] INFO
>> [TransactionCoordinator id=1] Initialized transactionalId <my-topic1>-2-88
>> with producerId 16835 and producer epoch 162 on partition
>> __transaction_state-0 (kafka.coordinator.transaction.TransactionCoordinator)
>> 2025-04-07 22:26:42 [2025-04-07 16:56:42,743] INFO
>> [TransactionCoordinator id=1] Initialized transactionalId
>> <my-topic2>-1335-21 with producerId 1287059 and producer epoch 0 on
>> partition __transaction_state-3
>> (kafka.coordinator.transaction.TransactionCoordinator)
>> 2025-04-07 22:26:42 [2025-04-07 16:56:42,744] INFO
>> [TransactionCoordinator id=1] Initialized transactionalId
>> <my-topic1>-0-1246 with producerId 74060 and producer epoch 129 on
>> partition __transaction_state-47
>> (kafka.coordinator.transaction.TransactionCoordinator)
>> 2025-04-07 22:26:42 [2025-04-07 16:56:42,744] INFO
>> [TransactionCoordinator id=1] Initialized transactionalId <my-topic2>-1-26
>> with producerId 8575 and producer epoch 186 on partition __
>>
>> Only when the above logs stop appearing in the Kafka broker logs does the
>> function start accepting Kafka messages for processing.
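>>
>> To see which transactional IDs the coordinator is tracking while this is
>> happening, something like the following admin-client sketch should work
>> (assuming kafka-clients 2.8+ for listTransactions; the bootstrap address
>> is a placeholder):
>>
>> public static void main(String[] args) throws Exception {
>>     Properties props = new Properties();
>>     props.setProperty("bootstrap.servers", "localhost:9092"); // placeholder
>>     try (Admin admin = Admin.create(props)) {
>>         // List transactional IDs known to the coordinators, with their
>>         // producer IDs and states (Ongoing, PrepareCommit, ...)
>>         for (TransactionListing t : admin.listTransactions().all().get()) {
>>             System.out.printf("%s producerId=%d state=%s%n",
>>                     t.transactionalId(), t.producerId(), t.state());
>>         }
>>     }
>> }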
>>
>> My Flink Kafka source and sink are created as follows:
>>
>> public static <T> KafkaSource<T> createKafkaSource(
>>         KafkaPropertiesReader reader, DeserializationSchema<T> deserializationSchema) {
>>     Properties consumerProperties = new Properties();
>>
>>     return KafkaSource.<T>builder()
>>             .setBootstrapServers(reader.getBootStrapServers())
>>             .setTopics(reader.getTopic())
>>             .setGroupId(reader.getGroupId())
>>             // Start from committed offsets or checkpoints; fall back to latest
>>             .setStartingOffsets(OffsetsInitializer.committedOffsets(OffsetResetStrategy.LATEST))
>>             //.setStartingOffsets(OffsetsInitializer.latest())
>>             .setProperties(consumerProperties)
>>             .setProperty(KafkaSourceOptions.COMMIT_OFFSETS_ON_CHECKPOINT.key(), "true")
>>             .setValueOnlyDeserializer(deserializationSchema)
>>             .build();
>> }
>>
>> public static <T> KafkaSink<T> createKafkaSink(
>>         KafkaOutputReader reader, KafkaRecordSerializationSchema<T> serializationSchema) {
>>     Properties properties = new Properties();
>>     properties.setProperty("bootstrap.servers", reader.getBootStrapServers());
>>     properties.setProperty("transaction.timeout.ms", reader.getTransactionTimeoutMs());
>>     properties.setProperty("retries", reader.getRetries());
>>     properties.setProperty("linger.ms", reader.getLingerMs());
>>     properties.setProperty("batch.size", reader.getBatchSize());
>>     properties.setProperty("enable.auto.commit", "false");
>>     properties.setProperty("isolation.level", "read_committed"); // For transactional reads
>>
>>     return KafkaSink.<T>builder()
>>             .setBootstrapServers(reader.getBootStrapServers())
>>             .setDeliveryGuarantee(DeliveryGuarantee.EXACTLY_ONCE)
>>             .setTransactionalIdPrefix(reader.getTransactionalIdPrefix())
>>             .setKafkaProducerConfig(properties)
>>             .setRecordSerializer(serializationSchema)
>>             .build();
>> }
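>>
>> (A side note on the producer config above: as far as I understand,
>> "enable.auto.commit" and "isolation.level" are consumer properties, so the
>> producer presumably just ignores them. A sketch of moving read_committed
>> into createKafkaSource instead:)
>>
>> return KafkaSource.<T>builder()
>>         .setBootstrapServers(reader.getBootStrapServers())
>>         .setTopics(reader.getTopic())
>>         .setGroupId(reader.getGroupId())
>>         // consumer-side setting: only read records from committed transactions
>>         .setProperty("isolation.level", "read_committed")
>>         .setValueOnlyDeserializer(deserializationSchema)
>>         .build();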
>>
>>
>> Am I missing anything while initializing Kafka?
>>
>>
>> On Mon, Apr 7, 2025 at 9:59 AM Zakelly Lan <zakelly....@gmail.com> wrote:
>>
>>> Hi Santosh,
>>>
>>> May I ask which state backend you are using, and how large the
>>> checkpoint is? You may check the disk usage of the configured checkpoint path.
>>>
>>>
>>> Best,
>>> Zakelly
>>>
>>> On Sat, Apr 5, 2025 at 11:58 PM santosh techie <santosh.tec...@gmail.com>
>>> wrote:
>>>
>>>> Hello,
>>>>
>>>> Flink version: 1.20.
>>>>
>>>> I am just beginning to develop a Flink job with the following topology:
>>>>
>>>> Kafka Source 1 --\
>>>>                   +--> KeyedCoProcessFunction --> Kafka Destination
>>>> Kafka Source 2 --/
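>>>>
>>>> The wiring is roughly this (a sketch; the stream variables and key
>>>> selectors are made-up names for illustration):
>>>>
>>>> DataStream<Pojo1> stream1 =
>>>>         env.fromSource(source1, WatermarkStrategy.noWatermarks(), "source-1");
>>>> DataStream<Pojo2> stream2 =
>>>>         env.fromSource(source2, WatermarkStrategy.noWatermarks(), "source-2");
>>>>
>>>> stream1.connect(stream2)
>>>>         .keyBy(Pojo1::getKey, Pojo2::getKey)  // hypothetical key selectors
>>>>         .process(new MyClass())
>>>>         .sinkTo(kafkaSink);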
>>>>
>>>> The KeyedCoProcessFunction listens to two Kafka topics and maintains
>>>> state in the following variables ((pseudo)code):
>>>>
>>>> class MyClass extends KeyedCoProcessFunction {
>>>>
>>>>     private transient MapState<Long, Pojo1> pojo1Map;
>>>>     private transient MapState<String, Pojo2> pojo2Map;
>>>>     private transient MapState<Long, Map<String, String>> thirdStateMap;
>>>>     private transient MapState<String, Long> fourthMap;
>>>>     private transient MapState<String, Pojo3> pojo3Map;
>>>>
>>>>     public void open(Configuration parameters) throws Exception {
>>>>         // Flink-specific code to initialize the above map states
>>>>     }
>>>> }
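>>>>
>>>> For reference, the initialization in open() is roughly the following (a
>>>> sketch; the descriptor names are made up):
>>>>
>>>> @Override
>>>> public void open(Configuration parameters) throws Exception {
>>>>     pojo1Map = getRuntimeContext().getMapState(new MapStateDescriptor<>(
>>>>             "pojo1Map", Types.LONG, TypeInformation.of(Pojo1.class)));
>>>>     // ...same pattern for the other four MapState fields
>>>> }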
>>>>
>>>>
>>>> This is just the beginning of development, and there are at most 5-10
>>>> Kafka messages.
>>>>
>>>> On my development box, I noticed that for almost 12-15 minutes the *Flink
>>>> dashboard shows the KeyedCoProcessFunction task in INITIALIZING status.*
>>>> Until then no Kafka message is processed; after 12 minutes, it processes
>>>> new and accumulated messages correctly.
>>>>
>>>>
>>>> I have checkpointing enabled with the following settings:
>>>>
>>>> private void enableCheckpointing(StreamExecutionEnvironment env, ParameterTool parameters) {
>>>>     // Checkpoint every 10 seconds
>>>>     env.enableCheckpointing(10000);
>>>>
>>>>     // Advanced options
>>>>     CheckpointConfig checkpointConfig = env.getCheckpointConfig();
>>>>
>>>>     // Checkpoint storage location
>>>>     if (parameters.has(CHECKPOINT_PATH)) {
>>>>         String checkpointPath = parameters.get(CHECKPOINT_PATH);
>>>>         logger.info("Setting checkpoint path to: {}", checkpointPath);
>>>>         checkpointConfig.setCheckpointStorage(new FileSystemCheckpointStorage(checkpointPath));
>>>>     } else {
>>>>         logger.warn("No checkpoint path provided. Using default checkpointing.");
>>>>     }
>>>>
>>>>     checkpointConfig.setCheckpointingMode(CheckpointingMode.EXACTLY_ONCE);
>>>>     checkpointConfig.setMinPauseBetweenCheckpoints(1000);
>>>>     checkpointConfig.setCheckpointTimeout(120000);
>>>>     checkpointConfig.setMaxConcurrentCheckpoints(1);
>>>>     checkpointConfig.enableExternalizedCheckpoints(
>>>>             CheckpointConfig.ExternalizedCheckpointCleanup.RETAIN_ON_CANCELLATION);
>>>>     checkpointConfig.enableUnalignedCheckpoints();
>>>>
>>>>     // Fixed-delay restart strategy: 3 attempts, 10 seconds apart
>>>>     env.setRestartStrategy(RestartStrategies.fixedDelayRestart(
>>>>             3, org.apache.flink.api.common.time.Time.seconds(10)));
>>>> }
>>>>
>>>> My question: What's happening? Am I doing something wrong? *Why is it
>>>> taking so long to initialize the job, and this function in particular?*
>>>> There is hardly any *pre-existing state*, and it takes the same time even
>>>> if I clear the file-based snapshot folder. Any direction would be
>>>> appreciated.
>>>>
>>>> Thank You,
>>>> Santosh
>>>>
>>>
