Hi Yuta,
      Have you set a default value for the state ? If the state did not have a 
default value and the records from stream2 comes first for a specific key, then 
the state would never be set with a value, thus the return value will be null.

Best,
Yun



------------------------------------------------------------------
From:Yuta Morisawa <yu-moris...@kddi-research.jp>
Send Time:2019 Aug. 7 (Wed.) 08:56
To:user <user@flink.apache.org>
Subject:How to use Lo-level Joins API

Hi

I am trying to use low-level joins.
According to the doc, the way is creating a state and access it from 
both streams, but I can't.

https://ci.apache.org/projects/flink/flink-docs-release-1.8/dev/stream/operators/process_function.html

This is a snippet of my code.
It seems that the processElement1,2 have different ValueStates so that 
v1 in processElement2 is always null.

---
stream1.connect(stream2).keyBy(0,0).process(new MyCPF());

public class MyCPF extends CoProcessFunction{
  ValueState data;

  processElement1(v1){
    data.update(v1);
  }

  processElement2(v2){
    v1 = data.value() // v1 is always null
    out.collect(v1 + v2)
  }

  open(){
    data = getRuntimeContext().getState(descriptor);
  }

}
---

Can you tell me the collect way of the low-level joins and send me a 
sample code if you have?

--
Thank you
Yuta

Reply via email to