Hi,

> 1) The ValueState can only return a non-null value if a prior value with the
> same key (in your case, "x.id") has been received. Have you double-checked
> that this is the case?
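[Editorial aside: Chesnay's point 1 is easiest to see if keyed state is modeled as a per-key map: `value()` returns null until `update()` has run for the *current* key. A minimal plain-Java sketch, with a hypothetical `KeyedStateSketch` class standing in for Flink's keyed `ValueState` (no Flink dependency):]

```java
import java.util.HashMap;
import java.util.Map;

// Sketch only: models keyed ValueState as a per-key map. value() is null
// until update() has been called for the *current* key, so state can look
// empty even though values exist for other keys.
public class KeyedStateSketch {
    private final Map<String, String> statePerKey = new HashMap<>();
    private String currentKey; // in real Flink, set implicitly by keyBy

    public void setCurrentKey(String key) { currentKey = key; }

    public String value() { return statePerKey.get(currentKey); } // null on first access

    public void update(String v) { statePerKey.put(currentKey, v); }

    public static void main(String[] args) {
        KeyedStateSketch state = new KeyedStateSketch();
        state.setCurrentKey("id-1");
        System.out.println(state.value());   // null: no prior value for "id-1"
        state.update("payload-1");
        state.setCurrentKey("id-2");
        System.out.println(state.value());   // still null: different key
        state.setCurrentKey("id-1");
        System.out.println(state.value());   // prints "payload-1"
    }
}
```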
I use the payloads in each operator to make POST requests, so the payloads are
not null, and I keyBy on the id field of each payload. I'm not sure what else
could be causing the null values. Also, in the checkpointing tab of the Flink
Dashboard I see MB-sized amounts of data persisted, but when I print out the
ValueState in the code, it shows as null. So I'm wondering whether it is
actually saving data or is null.

Thanks again,
Marzi

> On Jul 12, 2021, at 3:21 AM, Chesnay Schepler <ches...@apache.org> wrote:
>
> 1) The ValueState can only return a non-null value if a prior value with the
> same key (in your case, "x.id") has been received. Have you double-checked
> that this is the case?
>
> 2) Checkpointing does not alleviate the need to restart all operators; it
> alleviates having to reprocess all data.
> It is expected that the entire pipeline is restarted and not just the failed
> operator. In a nutshell this happens because
> a) downstream operators would otherwise receive duplicate messages (all
> messages since the last checkpoint until the operator failure), and
> b) upstream operators need to reproduce the data to process.
>
> On 11/07/2021 00:35, Marzi K wrote:
>> Hi All,
>>
>> I have a simple POC project to understand Flink state management and
>> integration with Kafka. Thanks in advance for helping me understand and
>> resolve the following issues.
>>
>> I have a FlinkKafkaConsumer that reads payloads from a separate
>> FlinkKafkaProducer.
>> The final 3 operators in my pipeline are keyed and stateful, to save the
>> content of the passed payload at 3 different stages.
>> To verify checkpointing and the correctness of the ValueStates in my code,
>> I kill one TaskManager manually, and as expected, the job restarts on a
>> second TM. But I have noticed the following:
>>
>> (1) When I print out the valueState.value(), it always shows as null, even
>> though in the checkpointing dir I see some files and _metadata getting
>> saved.
>> I suspected that maybe it's due to Kryo serialization, which gets used
>> because JsonNode.class is processed as a GenericType. So I changed my state
>> object to a POJO, but I am still getting null.
>>
>> (2) When the job restarts, a few of the payloads keep starting from the
>> very beginning of the pipeline and keep repeating over and over. I noticed
>> this is happening because one of the intermediate operators (the S3 upload)
>> starts failing after the restart on the second TM. This raised the
>> question: when one operator fails, shouldn't the failed message only retry
>> the failed operator and not start from the beginning of the pipeline? If
>> so, does this further prove that the operator checkpoints are not happening
>> properly and thus the message needs to start from the very beginning of the
>> pipeline?
>>
>> Including semi-pseudocode for reference:
>>
>> In the Main class:
>>
>> // operators methodA and methodB don't have ValueState
>> DataStream<JsonNode> payloadStream =
>>     env.addSource(kafkaConsumer)
>>        .map((payload) -> methodA(payload))
>>        .map((payload) -> methodB(payload))
>>        .keyBy((payload) -> payload.get("x").get("id").asText());
>>
>> DataStream<Tuple2<JsonNode, JsonNode>> responseStream =
>>     payloadStream.flatMap(new RichFlatMapA()).name("Post Payload1").uid("Post Payload1")
>>         .keyBy((jsonNodeTuple) -> jsonNodeTuple.f0.get("x").get("id").asText())
>>         .flatMap(new RichFlatMapB()).name("S3 upload").uid("S3 upload")
>>         .keyBy((jsonNodeTuple) -> jsonNodeTuple.f0.get("x").get("id").asText())
>>         .flatMap(new RichFlatMapC()).name("Post Payload2").uid("Post Payload2");
>>
>> In one of the operators where I'd like to make the payload JSON stateful:
>>
>> public class RichFlatMapA extends RichFlatMapFunction<JsonNode, Tuple2<JsonNode, JsonNode>> {
>>
>>     private ValueState<JsonNode> payload;
>>
>>     public void open(Configuration config) {
>>         payload = getRuntimeContext().getState(
>>             new ValueStateDescriptor<>("saved payload", JsonNode.class));
>>     }
>>
>>     public
>>     void flatMap(JsonNode jsonNode, Collector<Tuple2<JsonNode, JsonNode>> collector) {
>>         JsonNode payload = this.payload.value(); // payload value is always null
>>
>>         if (payload != null) {
>>             this.payload.clear();
>>         } else {
>>             this.payload.update(payload);
>>         }
>>         httpPost(jsonNode, collector);
>>     }
>> }
>>
>> Thank you,
>> Marzi
>
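[Editorial aside: one detail worth noting in the quoted flatMap: `this.payload.update(payload)` writes back the local variable just read from the state, which is null on the first access for a key, so the state is never populated; `update(jsonNode)` may have been intended. A plain-Java sketch of the difference, with state modeled as a one-key map (hypothetical names, no Flink dependency):]

```java
import java.util.HashMap;
import java.util.Map;

// Sketch only: contrasts the quoted state-update logic with the presumed
// intent. State is modeled as a map with a single key "k".
public class StateUpdateSketch {

    // Mirrors the quoted logic: writes back the stale value just read,
    // which is null on first access, so the state stays empty forever.
    static String buggyStep(Map<String, String> state, String incoming) {
        String prior = state.get("k");   // null on first call
        if (prior != null) state.remove("k");
        else state.put("k", prior);      // stores null, not `incoming`
        return prior;
    }

    // Presumed intent: store the incoming record so the next call sees it.
    static String fixedStep(Map<String, String> state, String incoming) {
        String prior = state.get("k");
        if (prior != null) state.remove("k");
        else state.put("k", incoming);   // stores the payload
        return prior;
    }

    public static void main(String[] args) {
        Map<String, String> buggy = new HashMap<>();
        buggyStep(buggy, "p1");
        System.out.println(buggyStep(buggy, "p2")); // still null on second call

        Map<String, String> fixed = new HashMap<>();
        fixedStep(fixed, "p1");
        System.out.println(fixedStep(fixed, "p2")); // prints "p1"
    }
}
```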