1) ValueState can only return a non-null value if a value for the same key (in your case, "x.id") has previously been stored via update(). Have you double-checked that this is the case?
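For illustration, a minimal sketch of that behaviour (hypothetical class and state names, to be applied after a keyBy): value() returns null for the first element of a key and only returns a stored value for later elements of the same key.

import org.apache.flink.api.common.functions.RichFlatMapFunction;
import org.apache.flink.api.common.state.ValueState;
import org.apache.flink.api.common.state.ValueStateDescriptor;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.util.Collector;

public class FirstSeenFn extends RichFlatMapFunction<String, String> {

    private transient ValueState<Boolean> seen;

    @Override
    public void open(Configuration config) {
        seen = getRuntimeContext().getState(
                new ValueStateDescriptor<>("seen", Boolean.class));
    }

    @Override
    public void flatMap(String value, Collector<String> out) throws Exception {
        if (seen.value() == null) { // first element for this key
            seen.update(true);
            out.collect(value);
        }
        // any later element with the same key reads a non-null state here
    }
}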

2) Checkpointing does not alleviate the need to restart all operators; it alleviates having to reprocess all data. It is expected that the entire pipeline is restarted, not just the failed operator. In a nutshell this happens because:
a) downstream operators would otherwise receive duplicate messages (all messages from the last checkpoint until the operator failure), and
b) upstream operators need to reproduce the data to process.
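
For completeness, a minimal sketch of the configuration this recovery model assumes (the interval and restart values are arbitrary):

import org.apache.flink.api.common.restartstrategy.RestartStrategies;
import org.apache.flink.api.common.time.Time;
import org.apache.flink.streaming.api.CheckpointingMode;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;

StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
// take a checkpoint every 10s; on any task failure the whole job is reset
// to the latest completed checkpoint and the sources rewind to its offsets
env.enableCheckpointing(10_000);
env.getCheckpointConfig().setCheckpointingMode(CheckpointingMode.EXACTLY_ONCE);
// all operators are restarted together, up to 3 times, 10s apart
env.setRestartStrategy(RestartStrategies.fixedDelayRestart(3, Time.seconds(10)));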

On 11/07/2021 00:35, Marzi K wrote:
Hi All,

I have a simple POC project to understand Flink state management and integration with Kafka. Thanks in advance for helping me understand and resolve the following issues.

I have a FlinkKafkaConsumer that reads payloads written by a separate FlinkKafkaProducer. The final 3 operators in my pipeline are keyed and stateful, saving the content of the passed payload at 3 different stages. To verify checkpointing and the correctness of the ValueStates in my code, I kill one TaskManager manually and, as expected, the job restarts on a second TM.
But I have noticed the following:

(1) When I print out valueState.value(), it always shows as null, even though I can see some files and _metadata being saved in the checkpointing dir. I suspected it might be due to Kryo serialization, which kicks in because JsonNode.class is processed as a generic type. So I changed my state object to a POJO, but I am still getting null.
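
(For reference, my understanding is that Flink only avoids Kryo when it can analyse the state class as a POJO: a public class with a public no-arg constructor and public or getter/setter-accessible fields. A rough sketch of the kind of class I tried, with made-up field names:)

// POJO that Flink's type extractor should accept without falling back to Kryo:
// public class, public no-arg constructor, public fields
public class PayloadState {

    public String id;
    public String rawJson;

    public PayloadState() {} // required no-arg constructor

    public PayloadState(String id, String rawJson) {
        this.id = id;
        this.rawJson = rawJson;
    }
}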

(2) When the job restarts, a few of the payloads start from the very beginning of the pipeline and keep repeating over and over. I noticed this happens because one of the intermediate operators (the S3 upload) starts failing after the restart on the second TM. This raised the question: when one operator fails, shouldn't the failed message only retry the failed operator rather than restart from the beginning of the pipeline? If so, does this further suggest that the operator checkpoints are not happening properly and the message therefore has to start from the very beginning of the pipeline?

Including some semi-pseudo-code for reference:

_In Main class:_

//operators methodA and methodB don't have ValueState
DataStream<JsonNode> payloadStream =
  env.addSource(kafkaConsumer)
    .map((payload) -> methodA(payload))
    .map((payload) -> methodB(payload))
    .keyBy((payload) -> payload.get("x").get("id").asText());

DataStream<Tuple2<JsonNode, JsonNode>> responseStream =
  payloadStream
    .flatMap(new RichFlatMapA()).name("Post Payload1").uid("Post Payload1")
    .keyBy((jsonNodeTuple) -> jsonNodeTuple.f0.get("x").get("id").asText())
    .flatMap(new RichFlatMapB()).name("S3 upload").uid("S3 upload")
    .keyBy((jsonNodeTuple) -> jsonNodeTuple.f0.get("x").get("id").asText())
    .flatMap(new RichFlatMapC()).name("Post Payload2").uid("Post Payload2");
_In one of the operators, where I'd like to make the payload JSON stateful:_

import org.apache.flink.api.common.functions.RichFlatMapFunction;
import org.apache.flink.api.common.state.ValueState;
import org.apache.flink.api.common.state.ValueStateDescriptor;
import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.util.Collector;
import com.fasterxml.jackson.databind.JsonNode;

public class RichFlatMapA extends RichFlatMapFunction<JsonNode, Tuple2<JsonNode, JsonNode>> {

    private transient ValueState<JsonNode> payload;

    @Override
    public void open(Configuration config) {
        // the descriptor takes the state name and the type in one constructor
        payload = getRuntimeContext().getState(
                new ValueStateDescriptor<>("saved payload", JsonNode.class));
    }

    @Override
    public void flatMap(JsonNode jsonNode, Collector<Tuple2<JsonNode, JsonNode>> collector) throws Exception {
        JsonNode previous = payload.value(); // this is what always comes back null

        if (previous != null) {
            payload.clear();
        } else {
            payload.update(jsonNode); // store the incoming payload, not the (null) local copy
        }
        httpPost(jsonNode, collector);
    }
}

Thank you,
Marzi

