Hi All, I have a simple POC project to understand Flink state management and integration with Kafka. Thanks in advance for helping me understand and resolve the following issues.
I have a FlinkKafkaConsumer that reads payloads written by a separate FlinkKafkaProducer. The final three operators in my pipeline are keyed and stateful, saving the content of the passed payload at three different stages. To verify checkpointing and the correctness of the ValueStates in my code, I kill one TaskManager manually, and as expected the job restarts on a second TM. I have noticed the following:

(1) When I print valueState.value(), it is always null, even though I can see state files and the _metadata file being written to the checkpoint directory. I suspected this might be caused by Kryo serialization, since JsonNode.class is handled as a generic type, so I changed my state object to a POJO, but I still get null.

(2) When the job restarts, a few of the payloads start again from the very beginning of the pipeline and keep repeating over and over. I noticed this happens because one of the intermediate operators (the S3 upload) starts failing after the restart on the second TM. This raised a question: when one operator fails, shouldn't the failed message only be retried at the failed operator rather than starting from the beginning of the pipeline? If so, does this further suggest that the operator checkpoints are not happening properly, which is why the message has to start from the very beginning of the pipeline?

Including semi-pseudo code for reference.

In the Main class:

    // operators methodA and methodB don't have ValueState
    DataStream<JsonNode> payloadStream = env.addSource(kafkaConsumer)
        .map((payload) -> methodA(payload))
        .map((payload) -> methodB(payload))
        .keyBy((payload) -> payload.get("x").get("id").asText());

    DataStream<Tuple2<JsonNode, JsonNode>> responseStream = payloadStream
        .flatMap(new RichFlatMapA()).name("Post Payload1").uid("Post Payload1")
        .keyBy((jsonNodeTuple) -> jsonNodeTuple.f0.get("x").get("id").asText())
        .flatMap(new RichFlatMapB()).name("S3 upload").uid("S3 upload")
        .keyBy((jsonNodeTuple) -> jsonNodeTuple.f0.get("x").get("id").asText())
        .flatMap(new RichFlatMapC()).name("Post Payload2").uid("Post Payload2");

In one of the operators where I'd like to keep the payload JSON in state:

    public class RichFlatMapA extends RichFlatMapFunction<JsonNode, Tuple2<JsonNode, JsonNode>> {

        private ValueState<JsonNode> payload;

        @Override
        public void open(Configuration config) {
            payload = getRuntimeContext().getState(
                new ValueStateDescriptor<>("saved payload", JsonNode.class));
        }

        @Override
        public void flatMap(JsonNode jsonNode, Collector<Tuple2<JsonNode, JsonNode>> collector) throws Exception {
            JsonNode payload = this.payload.value(); // payload value is always null
            if (payload != null) {
                this.payload.clear();
            } else {
                this.payload.update(payload);
            }
            httpPost(jsonNode, collector);
        }
    }

Thank you,
Marzi
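
P.S. In case it is relevant, checkpointing and restarts are configured roughly as below; the interval, backend path, and restart values shown here are placeholders rather than my exact settings:

    env.enableCheckpointing(10_000); // checkpoint interval in ms (placeholder)
    env.getCheckpointConfig().setCheckpointingMode(CheckpointingMode.EXACTLY_ONCE);
    env.setStateBackend(new FsStateBackend("file:///tmp/flink-checkpoints")); // placeholder checkpoint dir
    env.setRestartStrategy(RestartStrategies.fixedDelayRestart(3, Time.seconds(10))); // placeholder restart strategy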
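
The POJO variant of the state object I tried looks roughly like this (the class and field names are illustrative, not my real ones):

    // Illustrative POJO: public no-arg constructor and public fields so Flink's POJO serializer applies
    public class PayloadPojo {
        public String id;
        public String body;

        public PayloadPojo() {}
    }

    // in RichFlatMapA.open(), with payload declared as ValueState<PayloadPojo>
    payload = getRuntimeContext().getState(
        new ValueStateDescriptor<>("saved payload", PayloadPojo.class));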