Hi,

> 1) The ValueState can only return a non-null value if a prior value with the 
> same key (in your case, "x.id") has been received. Have you double-checked 
> that this is the case?

I use the payloads in each operator to make POST requests, so the payloads are 
not null, and I keyBy the id field in each payload. 
I'm not sure what else could be causing the null values. 

Also, in the Checkpoints tab of the Flink Dashboard, I see data sizes in the MB 
range being persisted. But when I print out the valueState in the code, it 
shows as null. 
So I'm wondering whether it is actually saving data or the state is really null.
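To make sure I understand the keyed-state semantics you describe in (1), here is a plain-Java sketch (no Flink; the class and method names are purely illustrative, not Flink APIs) of how I now understand per-key ValueState to behave: the first record seen for a key reads null, and only a later record for the same key sees a prior value.

```java
import java.util.HashMap;
import java.util.Map;

// Plain-Java sketch of keyed ValueState behavior: each key has its own
// state slot, so the FIRST record for a key always reads null.
public class KeyedStateSketch {
    private final Map<String, String> stateByKey = new HashMap<>();

    // Returns the previous value for this key (null on first sight),
    // then stores the new payload, mirroring value()/update().
    public String process(String key, String payload) {
        String previous = stateByKey.get(key); // like ValueState#value()
        stateByKey.put(key, payload);          // like ValueState#update()
        return previous;
    }

    public static void main(String[] args) {
        KeyedStateSketch op = new KeyedStateSketch();
        // First record for key "42": state is null, as in my logs.
        System.out.println(op.process("42", "payload-1")); // null
        // Second record for the SAME key: the prior payload is visible.
        System.out.println(op.process("42", "payload-2")); // payload-1
    }
}
```

So a persistent null would only be expected if each key is ever seen once, which is not my case.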

Thanks again, 
Marzi


> On Jul 12, 2021, at 3:21 AM, Chesnay Schepler <ches...@apache.org> wrote:
> 
> 1) The ValueState can only return a non-null value if a prior value with the 
> same key (in your case, "x.id") has been received. Have you double-checked 
> that this is the case?
> 
> 2) Checkpointing does not alleviate the need to restart all operators, it 
> alleviates having to reprocess all data.
> It is expected that the entire pipeline is restarted and not just the failed 
> operator. In a nutshell this happens because
> a) downstream operators would otherwise receive duplicate messages (all 
> message since the last checkpoint until the operator failure)
> b) upstream operators need to reproduce the data to process.
> 
> On 11/07/2021 00:35, Marzi K wrote:
>> Hi All,
>> 
>> I have a simple POC project to understand Flink state management and 
>> integration with Kafka. Thanks in advance for helping me understand and 
>> resolve the following issues.
>> 
>> I have a FlinkKafkaConsumer that reads payloads from a separate 
>> FlinkKafkaProducer. 
>> The final 3 operators in my pipeline are keyed and stateful to save the 
>> content of passed payload in 3 different stages.
>> To verify checkpointing and correctness of ValueStates in my code, I kill 
>> one taskManager manually, and as expected, the job restarts on a second TM. 
>> But I have noticed the following:
>> 
>> (1) When I print out the valueState.value, it always shows as null even 
>> though in the checkpointing dir, I see some files and _metaData getting 
>> saved. 
>> I suspected it might be due to the Kryo serialization that is used because 
>> JsonNode.class is handled as a generic type. So I changed my state 
>> object to a POJO, but I am still getting null.
>> 
>> (2) When the job restarts, a few of the payloads start over from the very 
>> beginning of the pipeline and keep repeating. I noticed this happens 
>> because one of the intermediate operators (S3 upload) starts failing after 
>> the restart on the second TM. This raised the question:
>> When one operator fails, shouldn’t the failed message only retry the failed 
>> operator and not start from the beginning of the pipeline? If so, does this 
>> further prove that the operator checkpoints are not happening properly and 
>> thus the message needs to start from the very beginning of the pipeline?
>> 
>> Including semi-pseudo-code for reference:
>> 
>> In Main class:
>> 
>> //operators methodA and methodB don't have ValueState
>> DataStream<JsonNode> payloadStream =
>>         env.addSource(kafkaConsumer)
>>         .map((payload) -> methodA(payload))
>>         .map((payload) -> methodB(payload))
>>         .keyBy((payload) -> payload.get("x").get("id").asText());
>> 
>> DataStream<Tuple2<JsonNode,JsonNode>> responseStream =
>>         payloadStream.flatMap(new RichFlatMapA()).name("Post 
>> Payload1").uid("Post Payload1")
>>         .keyBy((jsonNodeTuple) -> 
>> jsonNodeTuple.f0.get("x").get("id").asText())
>>         .flatMap(new RichFlatMapB()).name("S3 upload").uid("S3 upload")
>>         .keyBy((jsonNodeTuple) -> 
>> jsonNodeTuple.f0.get("x").get("id").asText())
>>         .flatMap(new RichFlatMapC()).name("Post Payload2").uid("Post 
>> Payload2");
>> 
>> In one of the operators where I’d like to make payload json stateful: 
>> 
>> public class RichFlatMapA extends RichFlatMapFunction<JsonNode, 
>> Tuple2<JsonNode,JsonNode>> {
>> 
>>     private ValueState<JsonNode> payload;
>> 
>>     public void open(Configuration config) {
>>         payload = getRuntimeContext().getState(new 
>> ValueStateDescriptor<>("saved payload", JsonNode.class));
>>     }
>> 
>>     public void flatMap(JsonNode jsonNode, Collector<Tuple2<JsonNode, 
>> JsonNode>> collector) {
>>         JsonNode previous = this.payload.value(); //previous value is 
>> always null
>> 
>>         if (previous != null) {
>>             this.payload.clear();
>>         } else {
>>             this.payload.update(jsonNode); //save the incoming payload, 
>> not the (null) state value
>>         }
>>         httpPost(jsonNode, collector); 
>>     }
>> }  
>> 
>> Thank you, 
>> Marzi
> 
