Hi All,

I have a simple POC project to understand Flink state management and 
integration with Kafka. Thanks in advance for helping me understand and resolve 
the following issues.

I have a FlinkKafkaConsumer that reads payloads produced by a separate 
FlinkKafkaProducer. 
The final 3 operators in my pipeline are keyed and stateful, to save the 
content of the payload at 3 different stages.
To verify checkpointing and the correctness of the ValueStates in my code, I 
kill one TaskManager manually, and as expected, the job restarts on a second 
TM. But I have noticed the following:

(1) When I print out valueState.value(), it always shows as null, even though 
in the checkpointing dir I see some files and _metadata getting saved. 
I suspected it might be due to Kryo serialization kicking in because 
JsonNode.class is treated as a generic type, so I changed my state object to a 
POJO, but I am still getting null.
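For reference, my understanding is that Flink only uses its POJO serializer when the class follows the POJO rules (public, a public no-arg constructor, and all fields public or exposed via getters/setters); otherwise it silently falls back to Kryo. A minimal sketch of the kind of POJO I switched to (the class and field names here are made up for illustration):

```java
// Hypothetical state type used instead of JsonNode. For Flink to pick its
// POJO serializer, in the real job this class must be public, live at the
// top level (or be a public static nested class), have a public no-arg
// constructor, and have public fields or standard getters/setters.
class SavedPayload {
    public String id;       // the keyed field, e.g. payload.get("x").get("id")
    public String rawJson;  // the payload serialized as a JSON string

    public SavedPayload() {} // required no-arg constructor

    public SavedPayload(String id, String rawJson) {
        this.id = id;
        this.rawJson = rawJson;
    }
}
```

Even with this shape I still see null after restore, which is why I suspect the problem is elsewhere.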

(2) When the job restarts, a few of the payloads start again from the very 
beginning of the pipeline and keep repeating over and over. I noticed this 
happens because one of the intermediate operators (the S3 upload) starts 
failing after the restart on the second TM. This raised the question:
When one operator fails, shouldn't only the failed message be retried at the 
failed operator, rather than starting from the beginning of the pipeline? If 
so, does the replay further prove that the operator checkpoints are not being 
taken properly, and that is why each message has to start from the very 
beginning of the pipeline?
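From what I have read, Flink does not retry a single operator: on any task failure it cancels and restarts the whole job (or the affected failover region), restores every operator's state from the last completed checkpoint, and replays from the Kafka offsets stored in that checkpoint. So repeated replays from the source are expected while the S3 operator keeps failing. A sketch of the checkpoint/restart settings I am using (the interval, retry count, and delay values are illustrative):

```java
// Illustrative checkpoint + restart configuration. Recovery always goes back
// to the last *completed* checkpoint and replays from the Kafka offsets
// recorded there -- there is no per-operator retry.
StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
env.enableCheckpointing(10_000); // checkpoint every 10s
env.getCheckpointConfig().setCheckpointingMode(CheckpointingMode.EXACTLY_ONCE);
env.setRestartStrategy(RestartStrategies.fixedDelayRestart(
        3,                               // give up after 3 failed restarts
        Time.of(10, TimeUnit.SECONDS))); // wait 10s between restarts
```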

Including semi-pseudo code for reference:

In Main class:

//operators methodA and methodB don't have ValueState
DataStream<JsonNode> payloadStream =
        env.addSource(kafkaConsumer)
        .map((payload) -> methodA(payload))
        .map((payload) -> methodB(payload))
        .keyBy((payload) -> payload.get("x").get("id").asText());

DataStream<Tuple2<JsonNode,JsonNode>> responseStream =
        payloadStream.flatMap(new RichFlatMapA()).name("Post Payload1").uid("Post Payload1")
        .keyBy((jsonNodeTuple) -> jsonNodeTuple.f0.get("x").get("id").asText())
        .flatMap(new RichFlatMapB()).name("S3 upload").uid("S3 upload")
        .keyBy((jsonNodeTuple) -> jsonNodeTuple.f0.get("x").get("id").asText())
        .flatMap(new RichFlatMapC()).name("Post Payload2").uid("Post Payload2");

In one of the operators where I’d like to make the payload JSON stateful: 

public class RichFlatMapA extends RichFlatMapFunction<JsonNode, Tuple2<JsonNode, JsonNode>> {

    private transient ValueState<JsonNode> payload;

    @Override
    public void open(Configuration config) {
        // name and type both go into the descriptor's constructor
        payload = getRuntimeContext().getState(
                new ValueStateDescriptor<>("saved payload", JsonNode.class));
    }

    @Override
    public void flatMap(JsonNode jsonNode, Collector<Tuple2<JsonNode, JsonNode>> collector) throws Exception {
        JsonNode saved = this.payload.value(); // always null, even after a restore

        if (saved != null) {
            this.payload.clear();
        } else {
            // update with the incoming element; updating with the just-read
            // local value here would be a shadowing bug that stores null
            this.payload.update(jsonNode);
        }
        httpPost(jsonNode, collector);
    }
}

Thank you, 
Marzi
