Are you on a local filesystem from the checkpoint perspective?

G


On Mon, Apr 7, 2025 at 7:05 PM santosh techie <santosh.tec...@gmail.com>
wrote:

> Hi,
>
> The checkpoint is hardly anything, a few tens of KBs at most, and the disk
> is also OK. On further investigation, I suspect it is the way exactly-once
> is handled with Kafka: I see that the Kafka broker CPU % is very high
> during Flink startup, with numerous logs like the following in the broker:
>
> 2025-04-07 22:26:42 [2025-04-07 16:56:42,743] INFO [TransactionCoordinator id=1] Initialized transactionalId <my-topic1>-2-88 with producerId 16835 and producer epoch 162 on partition __transaction_state-0 (kafka.coordinator.transaction.TransactionCoordinator)
> 2025-04-07 22:26:42 [2025-04-07 16:56:42,743] INFO [TransactionCoordinator id=1] Initialized transactionalId <my-topic2>-1335-21 with producerId 1287059 and producer epoch 0 on partition __transaction_state-3 (kafka.coordinator.transaction.TransactionCoordinator)
> 2025-04-07 22:26:42 [2025-04-07 16:56:42,744] INFO [TransactionCoordinator id=1] Initialized transactionalId <my-topic1>-0-1246 with producerId 74060 and producer epoch 129 on partition __transaction_state-47 (kafka.coordinator.transaction.TransactionCoordinator)
> 2025-04-07 22:26:42 [2025-04-07 16:56:42,744] INFO [TransactionCoordinator id=1] Initialized transactionalId <my-topic2>-1-26 with producerId 8575 and producer epoch 186 on partition __
>
> Only when the above logs stop appearing in the Kafka broker logs does the
> function start accepting Kafka messages for processing.
>
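If it helps to read those logs: the transactionalId values match the Flink KafkaSink naming scheme `<prefix>-<subtaskId>-<checkpointId>`, and with EXACTLY_ONCE delivery the sink must, on startup, abort any transactions it might have left open, which is one plausible source of the burst of coordinator traffic. A minimal sketch of that id scheme (the helper below is mine for illustration, not Flink code):

```java
public class TransactionalIds {
    // Hypothetical helper mirroring the "<prefix>-<subtask>-<checkpoint>"
    // pattern visible in the broker logs above (e.g. "<my-topic1>-2-88").
    static String transactionalId(String prefix, int subtaskId, long checkpointId) {
        return prefix + "-" + subtaskId + "-" + checkpointId;
    }

    public static void main(String[] args) {
        // One id exists per (subtask, checkpoint) pair, so after an unclean
        // shutdown the number of ids to probe grows with parallelism and
        // with how many checkpoints the job has taken.
        System.out.println(transactionalId("my-topic1", 2, 88)); // my-topic1-2-88
    }
}
```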
> My Flink Kafka source and sink are set up as follows.
>
> public static <T> KafkaSource<T> createKafkaSource(KafkaPropertiesReader reader,
>         DeserializationSchema<T> deserializationSchema) {
>     Properties consumerProperties = new Properties();
>
>     return KafkaSource.<T>builder()
>             .setBootstrapServers(reader.getBootStrapServers())
>             .setTopics(reader.getTopic())
>             .setGroupId(reader.getGroupId())
>             // Start from committed offsets, or fall back to latest
>             .setStartingOffsets(OffsetsInitializer.committedOffsets(OffsetResetStrategy.LATEST))
>             //.setStartingOffsets(OffsetsInitializer.latest())
>             .setProperties(consumerProperties)
>             .setProperty(KafkaSourceOptions.COMMIT_OFFSETS_ON_CHECKPOINT.key(), "true")
>             .setValueOnlyDeserializer(deserializationSchema)
>             .build();
> }
>
> public static <T> KafkaSink<T> createKafkaSink(KafkaOutputReader reader,
>         KafkaRecordSerializationSchema<T> serializationSchema) {
>     Properties properties = new Properties();
>     properties.setProperty("bootstrap.servers", reader.getBootStrapServers());
>     properties.setProperty("transaction.timeout.ms", reader.getTransactionTimeoutMs());
>     properties.setProperty("retries", reader.getRetries());
>     properties.setProperty("linger.ms", reader.getLingerMs());
>     properties.setProperty("batch.size", reader.getBatchSize());
>     // Note: enable.auto.commit and isolation.level are consumer settings and
>     // have no effect in this producer config; isolation.level=read_committed
>     // belongs on the consumers that read the output topic.
>     properties.setProperty("enable.auto.commit", "false");
>     properties.setProperty("isolation.level", "read_committed");
>
>     return KafkaSink.<T>builder()
>             .setBootstrapServers(reader.getBootStrapServers())
>             .setDeliveryGuarantee(DeliveryGuarantee.EXACTLY_ONCE)
>             .setTransactionalIdPrefix(reader.getTransactionalIdPrefix())
>             .setKafkaProducerConfig(properties)
>             .setRecordSerializer(serializationSchema)
>             .build();
> }
>
>
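One thing worth checking alongside this config: Kafka brokers reject producers whose `transaction.timeout.ms` exceeds the broker-side `transaction.max.timeout.ms` (15 minutes by default), so validating the configured value early can save a confusing startup failure. A hedged sketch, assuming the default broker cap (the helper name and the constant are mine; check your broker's actual setting):

```java
public class TransactionTimeoutCheck {
    // Kafka's default broker-side transaction.max.timeout.ms is 900000 ms
    // (15 minutes); adjust this if your broker overrides it.
    static final long BROKER_MAX_TX_TIMEOUT_MS = 900_000L;

    // Hypothetical guard: the broker refuses a producer whose
    // transaction.timeout.ms exceeds transaction.max.timeout.ms,
    // so fail fast at job setup instead of at first transaction.
    static boolean isValidTransactionTimeout(long producerTimeoutMs) {
        return producerTimeoutMs > 0 && producerTimeoutMs <= BROKER_MAX_TX_TIMEOUT_MS;
    }

    public static void main(String[] args) {
        System.out.println(isValidTransactionTimeout(600_000L));   // true: within the default cap
        System.out.println(isValidTransactionTimeout(3_600_000L)); // false: would be rejected
    }
}
```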
> Am I missing anything while initializing Kafka?
>
>
> On Mon, Apr 7, 2025 at 9:59 AM Zakelly Lan <zakelly....@gmail.com> wrote:
>
>> Hi Santosh,
>>
>> May I ask which state backend you are using? And how large is the
>> checkpoint? You may check the disk usage of the configured checkpoint path.
>>
>>
>> Best,
>> Zakelly
>>
>> On Sat, Apr 5, 2025 at 11:58 PM santosh techie <santosh.tec...@gmail.com>
>> wrote:
>>
>>> Hello,
>>>
>>> Flink Version : 1.20.
>>>
>>> I am just beginning to develop a Flink job with the following topology:
>>>
>>> Kafka Source 1 --\
>>>                   +--> KeyedCoProcessFunction --> Kafka Destination
>>> Kafka Source 2 --/
>>>
>>> The KeyedCoProcessFunction listens to two Kafka topics and also
>>> maintains state via the following variables ((pseudo)code):
>>>
>>> class MyClass extends KeyedCoProcessFunction<Key, In1, In2, Out> {
>>>
>>>     private MapState<Long, Pojo1> pojo1Map;
>>>     private MapState<String, Pojo2> pojo2Map;
>>>     private MapState<Long, Map<String, String>> thirdStateMap;
>>>     private MapState<String, Long> fourthMap;
>>>     private MapState<String, Pojo3> pojo3Map;
>>>
>>>     public void open(Configuration parameters) throws Exception {
>>>         // Flink-specific code to initialize the above MapStates
>>>     }
>>> }
>>>
>>>
>>> This is just the beginning of development, and there are at most 5-10
>>> Kafka messages.
>>>
>>> On my development box, I noticed that for almost 12-15 minutes the *Flink
>>> dashboard shows the KeyedCoProcessFunction task in initializing status.*
>>> Until then no Kafka message is processed; after 12 minutes, it processes
>>> new and accumulated messages correctly.
>>>
>>>
>>> I have checkpointing enabled with the following settings.
>>>
>>> private void enableCheckpointing(StreamExecutionEnvironment env,
>>>         ParameterTool parameters) {
>>>     // Checkpoint every 10 seconds
>>>     env.enableCheckpointing(10000);
>>>
>>>     // Advanced options
>>>     CheckpointConfig checkpointConfig = env.getCheckpointConfig();
>>>
>>>     // Checkpoint storage location
>>>     if (parameters.has(CHECKPOINT_PATH)) {
>>>         String checkpointPath = parameters.get(CHECKPOINT_PATH);
>>>         logger.info("Setting checkpoint path to: {}", checkpointPath);
>>>         checkpointConfig.setCheckpointStorage(
>>>                 new FileSystemCheckpointStorage(checkpointPath));
>>>     } else {
>>>         logger.warn("No checkpoint path provided. Using default checkpointing.");
>>>     }
>>>
>>>     checkpointConfig.setCheckpointingMode(CheckpointingMode.EXACTLY_ONCE);
>>>     checkpointConfig.setMinPauseBetweenCheckpoints(1000);
>>>     checkpointConfig.setCheckpointTimeout(120000);
>>>     checkpointConfig.setMaxConcurrentCheckpoints(1);
>>>     checkpointConfig.enableExternalizedCheckpoints(
>>>             CheckpointConfig.ExternalizedCheckpointCleanup.RETAIN_ON_CANCELLATION);
>>>     checkpointConfig.enableUnalignedCheckpoints();
>>>
>>>     // Fixed-delay restart strategy: 3 attempts, 10 s apart
>>>     env.setRestartStrategy(RestartStrategies.fixedDelayRestart(3,
>>>             org.apache.flink.api.common.time.Time.seconds(10)));
>>> }
>>>
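For reference, the same checkpointing behavior can also be expressed declaratively via flink-conf.yaml, which keeps it out of job code. A sketch assuming Flink 1.20 configuration keys (values mirror the code above; verify the key names against your Flink version):

```yaml
execution.checkpointing.interval: 10 s
execution.checkpointing.mode: EXACTLY_ONCE
execution.checkpointing.min-pause: 1 s
execution.checkpointing.timeout: 2 min
execution.checkpointing.max-concurrent-checkpoints: 1
execution.checkpointing.externalized-checkpoint-retention: RETAIN_ON_CANCELLATION
execution.checkpointing.unaligned: true
restart-strategy.type: fixed-delay
restart-strategy.fixed-delay.attempts: 3
restart-strategy.fixed-delay.delay: 10 s
```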
>>> My questions:
>>> What's happening? Am I doing something wrong? *Why is it taking so long
>>> to initialize the job, and this function in particular?* There is hardly
>>> any *pre-existing state*, and it takes the same time even if I clear the
>>> file-based snapshot folder. Any direction would be appreciated.
>>>
>>> Thank You,
>>> Santosh
>>>
>>
