Hello,
Flink version: 1.20
I have just started developing a Flink job with the following topology:

Kafka Source 1 --+
                 +--> KeyedCoProcessFunction --> Kafka Destination
Kafka Source 2 --+
The KeyedCoProcessFunction consumes from two Kafka topics. It also maintains
state in the following variables (pseudo-code):
// Type parameters and processElement1/processElement2 omitted for brevity
class MyClass extends KeyedCoProcessFunction {
    // Keyed state handles; assigned in open(), so they cannot be final
    private transient MapState<Long, Pojo1> pojo1Map;
    private transient MapState<String, Pojo2> pojo2Map;
    private transient MapState<Long, Map<String, String>> thirdStateMap;
    private transient MapState<String, Long> fourthMap;
    private transient MapState<String, Pojo3> pojo3Map;

    @Override
    public void open(Configuration parameters) throws Exception {
        // Flink-specific code to initialize the MapStates above
    }
}
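To make the intended per-key behaviour concrete, here is a framework-free sketch in plain Java. It is not Flink's API: a HashMap per key stands in for one keyed MapState, and the class name, method names, and the "look up by id" logic are hypothetical simplifications, not my actual processing logic. The point it illustrates is that each key owns isolated state, and elements from either input only touch the state of their own key.

```java
import java.util.HashMap;
import java.util.Map;

// Framework-free model of a keyed two-input operator (NOT Flink's API).
// Hypothetical: each key owns an isolated state map, like keyed MapState.
public class KeyedCoProcessModel {
    // key -> per-key state (id -> value); stands in for one MapState
    private final Map<String, Map<Long, String>> stateByKey = new HashMap<>();

    // Input 1 (e.g. first topic): store a value in this key's state
    public void processElement1(String key, long id, String value) {
        stateByKey.computeIfAbsent(key, k -> new HashMap<>()).put(id, value);
    }

    // Input 2 (e.g. second topic): read what input 1 stored for the same key
    public String processElement2(String key, long id) {
        Map<Long, String> state = stateByKey.get(key);
        return state == null ? null : state.get(id);
    }

    public static void main(String[] args) {
        KeyedCoProcessModel op = new KeyedCoProcessModel();
        op.processElement1("user-1", 42L, "from topic 1");
        System.out.println(op.processElement2("user-1", 42L)); // from topic 1
        System.out.println(op.processElement2("user-2", 42L)); // null: different key
    }
}
```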
Since development has only just begun, there are at most 5-10 Kafka
messages in the topics.
On my development machine, the *Flink dashboard shows the
KeyedCoProcessFunction task in INITIALIZING status* for almost 12-15
minutes. Until then, no Kafka message is processed. After about 12 minutes,
the job correctly processes new Kafka messages as well as the accumulated ones.
I have checkpointing enabled with the following settings:

import org.apache.flink.api.common.restartstrategy.RestartStrategies;
import org.apache.flink.api.common.time.Time;
import org.apache.flink.api.java.utils.ParameterTool;
import org.apache.flink.runtime.state.storage.FileSystemCheckpointStorage;
import org.apache.flink.streaming.api.CheckpointingMode;
import org.apache.flink.streaming.api.environment.CheckpointConfig;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;

// logger and CHECKPOINT_PATH are fields of the enclosing class
private void enableCheckpointing(StreamExecutionEnvironment env, ParameterTool parameters) {
    // Checkpoint every 10 seconds
    env.enableCheckpointing(10000);

    // Advanced options
    CheckpointConfig checkpointConfig = env.getCheckpointConfig();

    // Checkpoint storage location
    if (parameters.has(CHECKPOINT_PATH)) {
        String checkpointPath = parameters.get(CHECKPOINT_PATH);
        logger.info("Setting checkpoint path to: {}", checkpointPath);
        checkpointConfig.setCheckpointStorage(new FileSystemCheckpointStorage(checkpointPath));
    } else {
        logger.warn("No checkpoint path provided. Using default checkpointing.");
    }

    checkpointConfig.setCheckpointingMode(CheckpointingMode.EXACTLY_ONCE);
    checkpointConfig.setMinPauseBetweenCheckpoints(1000);
    checkpointConfig.setCheckpointTimeout(120000);
    checkpointConfig.setMaxConcurrentCheckpoints(1);
    checkpointConfig.enableExternalizedCheckpoints(
            CheckpointConfig.ExternalizedCheckpointCleanup.RETAIN_ON_CANCELLATION);
    checkpointConfig.enableUnalignedCheckpoints();

    // Fixed-delay restart strategy: 3 restart attempts, 10 seconds apart
    env.setRestartStrategy(RestartStrategies.fixedDelayRestart(3, Time.seconds(10)));
}
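For reference, this is how I understand these settings to interact. It is a simplified model I wrote for illustration, not Flink's actual scheduler code: with at most one concurrent checkpoint, the next checkpoint can start only once the 10 s interval since the previous trigger has elapsed AND the 1 s minimum pause after the previous checkpoint ended has elapsed, and a checkpoint running longer than the 120 s timeout is aborted. Treating an aborted checkpoint like a completed one for the pause rule is my assumption.

```java
// Simplified model (an assumption, NOT Flink's scheduler code) of when the
// next checkpoint can start with maxConcurrentCheckpoints = 1.
public class CheckpointScheduleModel {
    static long nextTriggerMs(long lastTriggerMs, long durationMs,
                              long intervalMs, long minPauseMs, long timeoutMs) {
        // A checkpoint that runs past the timeout is aborted at the timeout
        long endMs = lastTriggerMs + Math.min(durationMs, timeoutMs);
        // Assumption: min pause counts from the end (completed or aborted)
        return Math.max(lastTriggerMs + intervalMs, endMs + minPauseMs);
    }

    public static void main(String[] args) {
        long interval = 10_000, minPause = 1_000, timeout = 120_000; // values from the config above
        // Fast checkpoint (2 s): the 10 s interval dominates
        System.out.println(nextTriggerMs(0, 2_000, interval, minPause, timeout));   // 10000
        // Slow checkpoint (15 s): the next one starts 1 s after it ends
        System.out.println(nextTriggerMs(0, 15_000, interval, minPause, timeout));  // 16000
        // Stuck checkpoint: aborted at 120 s, the next one starts 1 s later
        System.out.println(nextTriggerMs(0, 600_000, interval, minPause, timeout)); // 121000
    }
}
```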
My questions are:
What is happening? Am I doing something wrong? *Why does it take so long to
initialize the job, and in particular the function above?* There is hardly
any *pre-existing state*, and it takes just as long even when I clear the
file-based checkpoint folder. Any direction would be appreciated.
Thank You,
Santosh