Yes, it uses the local FS. But as I said in my follow-up email, I suspect my Kafka source/sink settings. Can anyone tell me what the ideal Kafka source and sink settings are for exactly-once processing?
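To make the question concrete, here is my current understanding of the minimum client-side settings for exactly-once, as a sketch with plain Properties. Please correct me if this is wrong; the transaction timeout value is a placeholder I picked to match Kafka's default broker-side limit, not something I have validated:

```java
import java.util.Properties;

public class ExactlyOnceKafkaSettings {

    // Consumer side: only read records from committed transactions,
    // and let Flink checkpoints drive offset commits instead of the client.
    static Properties consumerProps() {
        Properties p = new Properties();
        p.setProperty("isolation.level", "read_committed");
        p.setProperty("enable.auto.commit", "false");
        return p;
    }

    // Producer side: transactions themselves are enabled by the Flink sink
    // (DeliveryGuarantee.EXACTLY_ONCE plus a transactional id prefix); the
    // only knob I understand to matter here is the timeout, which must not
    // exceed the broker's transaction.max.timeout.ms (15 minutes by default).
    static Properties producerProps() {
        Properties p = new Properties();
        p.setProperty("transaction.timeout.ms", "900000"); // placeholder value
        return p;
    }

    public static void main(String[] args) {
        System.out.println(consumerProps().getProperty("isolation.level"));
        System.out.println(producerProps().getProperty("transaction.timeout.ms"));
    }
}
```

If anyone knows of further settings that are required (or harmful) for exactly-once, that is exactly what I am asking about.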
Thank you

On Fri, Apr 11, 2025 at 5:22 PM Gabor Somogyi <gabor.g.somo...@gmail.com> wrote:

> Are you on local FS from checkpoint perspective?
>
> G
>
> On Mon, Apr 7, 2025 at 7:05 PM santosh techie <santosh.tec...@gmail.com> wrote:
>
>> Hi,
>>
>> The checkpoint is hardly anything, a few tens of KBs at most. Disk is also OK. On further investigation, I suspect it is the way exactly-once is handled with Kafka: the Kafka broker CPU % is very high during Flink startup, with numerous log lines like the following in the broker log:
>>
>> [2025-04-07 16:56:42,743] INFO [TransactionCoordinator id=1] Initialized transactionalId <my-topic1>-2-88 with producerId 16835 and producer epoch 162 on partition __transaction_state-0 (kafka.coordinator.transaction.TransactionCoordinator)
>> [2025-04-07 16:56:42,743] INFO [TransactionCoordinator id=1] Initialized transactionalId <my-topic2>-1335-21 with producerId 1287059 and producer epoch 0 on partition __transaction_state-3 (kafka.coordinator.transaction.TransactionCoordinator)
>> [2025-04-07 16:56:42,744] INFO [TransactionCoordinator id=1] Initialized transactionalId <my-topic1>-0-1246 with producerId 74060 and producer epoch 129 on partition __transaction_state-47 (kafka.coordinator.transaction.TransactionCoordinator)
>> [2025-04-07 16:56:42,744] INFO [TransactionCoordinator id=1] Initialized transactionalId <my-topic2>-1-26 with producerId 8575 and producer epoch 186 on partition __
>>
>> Only when these log lines stop appearing in the Kafka broker log does the function start accepting Kafka messages for processing.
>>
>> My Flink Kafka source and sink are as follows.
>>
>> public static <T> KafkaSource<T> createKafkaSource(KafkaPropertiesReader reader, DeserializationSchema<T> deserializationSchema) {
>>     Properties consumerProperties = new Properties();
>>
>>     return KafkaSource.<T>builder()
>>             .setBootstrapServers(reader.getBootStrapServers())
>>             .setTopics(reader.getTopic())
>>             .setGroupId(reader.getGroupId())
>>             // Start from committed offsets (or checkpoints); fall back to latest
>>             .setStartingOffsets(OffsetsInitializer.committedOffsets(OffsetResetStrategy.LATEST))
>>             .setProperties(consumerProperties)
>>             .setProperty(KafkaSourceOptions.COMMIT_OFFSETS_ON_CHECKPOINT.key(), "true")
>>             .setValueOnlyDeserializer(deserializationSchema)
>>             .build();
>> }
>>
>> public static <T> KafkaSink<T> createKafkaSink(KafkaOutputReader reader, KafkaRecordSerializationSchema<T> serializationSchema) {
>>     Properties properties = new Properties();
>>     properties.setProperty("bootstrap.servers", reader.getBootStrapServers());
>>     properties.setProperty("transaction.timeout.ms", reader.getTransactionTimeoutMs());
>>     properties.setProperty("retries", reader.getRetries());
>>     properties.setProperty("linger.ms", reader.getLingerMs());
>>     properties.setProperty("batch.size", reader.getBatchSize());
>>     properties.setProperty("enable.auto.commit", "false");
>>     properties.setProperty("isolation.level", "read_committed"); // for transactional reads
>>
>>     return KafkaSink.<T>builder()
>>             .setBootstrapServers(reader.getBootStrapServers())
>>             .setDeliveryGuarantee(DeliveryGuarantee.EXACTLY_ONCE)
>>             .setTransactionalIdPrefix(reader.getTransactionalIdPrefix())
>>             .setKafkaProducerConfig(properties)
>>             .setRecordSerializer(serializationSchema)
>>             .build();
>> }
>>
>> Am I missing anything while initializing Kafka?
>>
>> On Mon, Apr 7, 2025 at 9:59 AM Zakelly Lan <zakelly....@gmail.com> wrote:
>>
>>> Hi Santosh,
>>>
>>> May I ask which state backend you are using?
And how big is the checkpoint? You may check the disk usage of the configured checkpoint path.
>>>
>>> Best,
>>> Zakelly
>>>
>>> On Sat, Apr 5, 2025 at 11:58 PM santosh techie <santosh.tec...@gmail.com> wrote:
>>>
>>>> Hello,
>>>>
>>>> Flink version: 1.20.
>>>>
>>>> I am just beginning to develop a Flink job with the following topology:
>>>>
>>>> Kafka Source 1 -> KeyedCoProcessFunction -> Kafka Destination
>>>> Kafka Source 2 ----^
>>>>
>>>> The KeyedCoProcessFunction listens to two Kafka topics. It also maintains state with the following variables ((pseudo) code):
>>>>
>>>> class MyClass extends KeyedCoProcessFunction {
>>>>
>>>>     private MapState<Long, Pojo1> pojo1Map;
>>>>     private MapState<String, Pojo2> pojo2Map;
>>>>     private MapState<Long, Map<String, String>> thirdStateMap;
>>>>     private MapState<String, Long> fourthMap;
>>>>     private MapState<String, Pojo3> pojo3Map;
>>>>
>>>>     public void open(Configuration parameters) throws Exception {
>>>>         // Flink-specific code to initialize the above MapStates
>>>>     }
>>>> }
>>>>
>>>> This is just the beginning of development, and there would be at most 5-10 Kafka messages.
>>>>
>>>> On my development box, I noticed that for almost 12-15 minutes the *Flink dashboard shows the KeyedCoProcessFunction task in initializing status.* Until then no Kafka message is processed. After 12 minutes, it processes new and accumulated Kafka messages correctly.
>>>>
>>>> I have checkpointing enabled with the following settings.
>>>> private void enableCheckpointing(StreamExecutionEnvironment env, ParameterTool parameters) {
>>>>     // Checkpoint every 10 seconds
>>>>     env.enableCheckpointing(10000);
>>>>
>>>>     // Advanced options
>>>>     CheckpointConfig checkpointConfig = env.getCheckpointConfig();
>>>>
>>>>     // Checkpoint storage location
>>>>     if (parameters.has(CHECKPOINT_PATH)) {
>>>>         String checkpointPath = parameters.get(CHECKPOINT_PATH);
>>>>         logger.info("Setting checkpoint path to: {}", checkpointPath);
>>>>         checkpointConfig.setCheckpointStorage(new FileSystemCheckpointStorage(checkpointPath));
>>>>     } else {
>>>>         logger.warn("No checkpoint path provided. Using default checkpointing.");
>>>>     }
>>>>
>>>>     checkpointConfig.setCheckpointingMode(CheckpointingMode.EXACTLY_ONCE);
>>>>     checkpointConfig.setMinPauseBetweenCheckpoints(1000);
>>>>     checkpointConfig.setCheckpointTimeout(120000);
>>>>     checkpointConfig.setMaxConcurrentCheckpoints(1);
>>>>     checkpointConfig.enableExternalizedCheckpoints(CheckpointConfig.ExternalizedCheckpointCleanup.RETAIN_ON_CANCELLATION);
>>>>     checkpointConfig.enableUnalignedCheckpoints();
>>>>
>>>>     // Fixed-delay restart strategy: 3 attempts, 10 seconds apart
>>>>     env.setRestartStrategy(RestartStrategies.fixedDelayRestart(
>>>>             3,
>>>>             org.apache.flink.api.common.time.Time.seconds(10)
>>>>     ));
>>>> }
>>>>
>>>> My questions are:
>>>> What's happening? Am I doing something wrong? *Why is it taking so long to initialize the job, and this function in particular?* There is hardly any *pre-existing state*; it also takes the same time even if I clear the file-based snapshot folder. Any direction would be appreciated.
>>>>
>>>> Thank You,
>>>> Santosh
>>>
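As a sanity check on how the settings in the quoted code interact, I wrote down the timing constraint I believe must hold: a transaction can stay open for roughly one checkpoint cycle, so transaction.timeout.ms should comfortably exceed the checkpoint interval plus worst-case checkpoint duration, while staying within the broker's transaction.max.timeout.ms. The interval and checkpoint timeout below come from my enableCheckpointing code; the transaction timeout and broker limit are placeholder assumptions on my part:

```java
public class CheckpointTxnTimeoutCheck {
    public static void main(String[] args) {
        long checkpointIntervalMs  = 10_000;   // from env.enableCheckpointing(10000)
        long checkpointTimeoutMs   = 120_000;  // from setCheckpointTimeout(120000)
        long transactionTimeoutMs  = 900_000;  // placeholder for reader.getTransactionTimeoutMs()
        long brokerMaxTxnTimeoutMs = 900_000;  // Kafka default transaction.max.timeout.ms (15 min)

        // The sink's transaction must survive one full checkpoint cycle,
        // and the broker rejects timeouts above its configured maximum.
        boolean ok = transactionTimeoutMs > checkpointIntervalMs + checkpointTimeoutMs
                && transactionTimeoutMs <= brokerMaxTxnTimeoutMs;
        System.out.println(ok);
    }
}
```

If this reasoning is right, my values should be fine, which is why I keep coming back to the flood of transactionalId initializations on startup rather than the timeouts themselves.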