Are you on a local FS, from a checkpoint perspective?

G
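For reference, pointing checkpoints at the local filesystem looks roughly like the sketch below. This is a minimal illustration only; the file:///tmp/flink-checkpoints URI is a placeholder, not a path from this thread.

    import org.apache.flink.runtime.state.storage.FileSystemCheckpointStorage;
    import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;

    public class LocalCheckpointStorageSketch {
        public static void main(String[] args) {
            StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
            env.enableCheckpointing(10_000);
            // A file:// URI keeps checkpoint data on the machine's local
            // filesystem; hdfs:// or s3:// URIs would target remote storage,
            // where slow I/O could plausibly delay task initialization.
            env.getCheckpointConfig().setCheckpointStorage(
                    new FileSystemCheckpointStorage("file:///tmp/flink-checkpoints"));
        }
    }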
On Mon, Apr 7, 2025 at 7:05 PM santosh techie <santosh.tec...@gmail.com> wrote:

> Hi,
>
> The checkpoint is hardly anything, a few tens of KBs at most, and disk
> usage is also fine. On further investigation, I suspect it is the way
> exactly-once is handled with Kafka: the Kafka broker's CPU % is very high
> during Flink startup, with numerous log lines like the following in the
> broker:
>
> 2025-04-07 22:26:42 [2025-04-07 16:56:42,743] INFO [TransactionCoordinator id=1] Initialized transactionalId <my-topic1>-2-88 with producerId 16835 and producer epoch 162 on partition __transaction_state-0 (kafka.coordinator.transaction.TransactionCoordinator)
> 2025-04-07 22:26:42 [2025-04-07 16:56:42,743] INFO [TransactionCoordinator id=1] Initialized transactionalId <my-topic2>-1335-21 with producerId 1287059 and producer epoch 0 on partition __transaction_state-3 (kafka.coordinator.transaction.TransactionCoordinator)
> 2025-04-07 22:26:42 [2025-04-07 16:56:42,744] INFO [TransactionCoordinator id=1] Initialized transactionalId <my-topic1>-0-1246 with producerId 74060 and producer epoch 129 on partition __transaction_state-47 (kafka.coordinator.transaction.TransactionCoordinator)
> 2025-04-07 22:26:42 [2025-04-07 16:56:42,744] INFO [TransactionCoordinator id=1] Initialized transactionalId <my-topic2>-1-26 with producerId 8575 and producer epoch 186 on partition __
>
> Only when these log lines stop appearing in the broker does the function
> start accepting Kafka messages for processing.
>
> My Flink Kafka consumer and sink are as follows:
>
> public static <T> KafkaSource<T> createKafkaSource(KafkaPropertiesReader reader,
>         DeserializationSchema<T> deserializationSchema) {
>     Properties consumerProperties = new Properties();
>
>     return KafkaSource.<T>builder()
>             .setBootstrapServers(reader.getBootStrapServers())
>             .setTopics(reader.getTopic())
>             .setGroupId(reader.getGroupId())
>             // Start from committed offsets; fall back to latest if none exist
>             .setStartingOffsets(OffsetsInitializer.committedOffsets(OffsetResetStrategy.LATEST))
>             //.setStartingOffsets(OffsetsInitializer.latest())
>             .setProperties(consumerProperties)
>             .setProperty(KafkaSourceOptions.COMMIT_OFFSETS_ON_CHECKPOINT.key(), "true")
>             .setValueOnlyDeserializer(deserializationSchema)
>             .build();
> }
>
> public static <T> KafkaSink<T> createKafkaSink(KafkaOutputReader reader,
>         KafkaRecordSerializationSchema<T> serializationSchema) {
>     Properties properties = new Properties();
>     properties.setProperty("bootstrap.servers", reader.getBootStrapServers());
>     properties.setProperty("transaction.timeout.ms", reader.getTransactionTimeoutMs());
>     properties.setProperty("retries", reader.getRetries());
>     properties.setProperty("linger.ms", reader.getLingerMs());
>     properties.setProperty("batch.size", reader.getBatchSize());
>     properties.setProperty("enable.auto.commit", "false");
>     properties.setProperty("isolation.level", "read_committed"); // for transactional reads
>
>     return KafkaSink.<T>builder()
>             .setBootstrapServers(reader.getBootStrapServers())
>             .setDeliveryGuarantee(DeliveryGuarantee.EXACTLY_ONCE)
>             .setTransactionalIdPrefix(reader.getTransactionalIdPrefix())
>             .setKafkaProducerConfig(properties)
>             .setRecordSerializer(serializationSchema)
>             .build();
> }
>
> Am I missing anything while initializing Kafka?
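Those "Initialized transactionalId" broker entries are consistent with the exactly-once KafkaSink recovering and aborting transactional IDs derived from its prefix at startup; counters as high as the one in <my-topic2>-1335-21 suggest it is touching a large number of IDs. One quick way to test that hypothesis during development is to drop the sink to AT_LEAST_ONCE, which avoids Kafka transactions entirely. A minimal sketch, reusing the hypothetical KafkaRecordSerializationSchema from the snippet above:

    import org.apache.flink.connector.base.DeliveryGuarantee;
    import org.apache.flink.connector.kafka.sink.KafkaRecordSerializationSchema;
    import org.apache.flink.connector.kafka.sink.KafkaSink;

    public class DevSinkSketch {
        // Same shape as createKafkaSink above, minus the transactional pieces.
        // AT_LEAST_ONCE can emit duplicates after a failure, so this is a
        // diagnostic/development setting, not a production recommendation.
        public static <T> KafkaSink<T> createDevKafkaSink(
                String bootstrapServers,
                KafkaRecordSerializationSchema<T> serializationSchema) {
            return KafkaSink.<T>builder()
                    .setBootstrapServers(bootstrapServers)
                    .setDeliveryGuarantee(DeliveryGuarantee.AT_LEAST_ONCE)
                    .setRecordSerializer(serializationSchema)
                    .build();
        }
    }

If initialization becomes fast with this sink, the delay is likely spent on transaction recovery rather than in the state backend; keeping the transactionalIdPrefix stable and unique per job is the usual advice when returning to EXACTLY_ONCE.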
On Mon, Apr 7, 2025 at 9:59 AM Zakelly Lan <zakelly....@gmail.com> wrote:

>> Hi Santosh,
>>
>> May I ask which state backend you are using? And how large is the
>> checkpoint? You may also check the disk usage of the configured
>> checkpoint path.
>>
>> Best,
>> Zakelly
>>
>> On Sat, Apr 5, 2025 at 11:58 PM santosh techie <santosh.tec...@gmail.com> wrote:
>>
>>> Hello,
>>>
>>> Flink version: 1.20
>>>
>>> I am just beginning to develop a Flink job with the following topology:
>>>
>>> Kafka Source 1 -> KeyedCoProcessFunction -> Kafka Destination
>>> Kafka Source 2 ----------^
>>>
>>> The KeyedCoProcessFunction listens to two Kafka topics. It also
>>> maintains state with the following variables ((pseudo)code):
>>>
>>> class MyClass extends KeyedCoProcessFunction {
>>>
>>>     private MapState<Long, Pojo1> pojo1Map;
>>>     private MapState<String, Pojo2> pojo2Map;
>>>     private MapState<Long, Map<String, String>> thirdStateMap;
>>>     private MapState<String, Long> fourthMap;
>>>     private MapState<String, Pojo3> pojo3Map;
>>>
>>>     public void open(Configuration parameters) throws Exception {
>>>         // Flink-specific code to initialize the above MapStates
>>>     }
>>> }
>>>
>>> This is just the beginning of development, and there will be at most
>>> 5-10 Kafka messages.
>>>
>>> On my development box, I noticed that for almost 12-15 minutes the
>>> *Flink dashboard shows the KeyedCoProcessFunction task in INITIALIZING
>>> status*. Until then no Kafka message is processed; after those 12
>>> minutes, it processes new and accumulated Kafka messages correctly.
>>>
>>> I have checkpointing enabled with the following settings:
>>>
>>> private void enableCheckpointing(StreamExecutionEnvironment env, ParameterTool parameters) {
>>>     // Checkpoint every 10 seconds
>>>     env.enableCheckpointing(10000);
>>>
>>>     // Advanced options
>>>     CheckpointConfig checkpointConfig = env.getCheckpointConfig();
>>>
>>>     // Checkpoint storage location
>>>     if (parameters.has(CHECKPOINT_PATH)) {
>>>         String checkpointPath = parameters.get(CHECKPOINT_PATH);
>>>         logger.info("Setting checkpoint path to: {}", checkpointPath);
>>>         checkpointConfig.setCheckpointStorage(new FileSystemCheckpointStorage(checkpointPath));
>>>     } else {
>>>         logger.warn("No checkpoint path provided. Using default checkpointing.");
>>>     }
>>>
>>>     checkpointConfig.setCheckpointingMode(CheckpointingMode.EXACTLY_ONCE);
>>>     checkpointConfig.setMinPauseBetweenCheckpoints(1000);
>>>     checkpointConfig.setCheckpointTimeout(120000);
>>>     checkpointConfig.setMaxConcurrentCheckpoints(1);
>>>     checkpointConfig.enableExternalizedCheckpoints(
>>>             CheckpointConfig.ExternalizedCheckpointCleanup.RETAIN_ON_CANCELLATION);
>>>     checkpointConfig.enableUnalignedCheckpoints();
>>>
>>>     // Fixed-delay restart strategy: 3 attempts, 10 seconds apart
>>>     env.setRestartStrategy(RestartStrategies.fixedDelayRestart(
>>>             3, org.apache.flink.api.common.time.Time.seconds(10)));
>>> }
>>>
>>> My question is: what's happening? Am I doing something wrong? *Why is
>>> it taking so long to initialize the job, and this function in
>>> particular?* There is hardly any pre-existing state, and it takes the
>>> same time even if I clear the file-based snapshot folder. Any direction
>>> would be appreciated.
>>>
>>> Thank You,
>>> Santosh
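For completeness, the MapState initialization elided in the open() pseudocode above would normally go through MapStateDescriptors, roughly as in the sketch below. The descriptor name, the Pojo1 stub, and the type parameters are placeholders standing in for the types used in the thread, not code from it.

    import org.apache.flink.api.common.state.MapState;
    import org.apache.flink.api.common.state.MapStateDescriptor;
    import org.apache.flink.api.common.typeinfo.TypeInformation;
    import org.apache.flink.api.common.typeinfo.Types;
    import org.apache.flink.configuration.Configuration;
    import org.apache.flink.streaming.api.functions.co.KeyedCoProcessFunction;

    public abstract class MyClassSketch<K, IN1, IN2, OUT>
            extends KeyedCoProcessFunction<K, IN1, IN2, OUT> {

        public static class Pojo1 { }  // placeholder for the POJO in the thread

        // Keyed state handles are created in open(), so they cannot be final.
        private transient MapState<Long, Pojo1> pojo1Map;

        @Override
        public void open(Configuration parameters) throws Exception {
            // The descriptor name identifies this state in checkpoints, so it
            // must stay stable across restarts for the state to be restored.
            pojo1Map = getRuntimeContext().getMapState(new MapStateDescriptor<>(
                    "pojo1Map", Types.LONG, TypeInformation.of(Pojo1.class)));
            // ...repeat with one descriptor per remaining MapState.
        }
    }

A handful of small MapStates like these should restore in well under a second from a checkpoint of a few tens of KBs, which points away from state restore, and toward the Kafka transaction recovery discussed above, as the cause of a 12-15 minute INITIALIZING phase.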