Hi Yuta,

Have you set a default value for the state? If the state has no default value and a record from stream2 arrives first for a given key, the state has not been set yet when processElement2 runs, so data.value() returns null.
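In case a concrete picture helps, here is a minimal sketch of one way to handle either arrival order. It is plain Java and deliberately not the Flink API: the class name LowLevelJoinSketch, the two HashMaps, and the out list are hypothetical stand-ins for two keyed ValueState fields and the Collector. The idea is that each side stores its own value and only emits once the other side's value is present for the same key.

```java
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

// Plain-Java model of a keyed two-input join. This is NOT Flink code;
// all names here are hypothetical and chosen only for illustration.
class LowLevelJoinSketch {
    // Stand-ins for two keyed ValueState fields: one slot per key and per input.
    private final Map<String, String> left = new HashMap<>();
    private final Map<String, String> right = new HashMap<>();

    // Stand-in for the Collector that processElement1/2 would receive.
    final List<String> out = new ArrayList<>();

    // Mirrors processElement1: remember v1, then join if stream2 already delivered this key.
    void processElement1(String key, String v1) {
        left.put(key, v1);
        String v2 = right.get(key); // null if stream2 has not delivered this key yet
        if (v2 != null) {
            out.add(v1 + v2);
        }
    }

    // Mirrors processElement2: remember v2, then join if stream1 already delivered this key.
    void processElement2(String key, String v2) {
        right.put(key, v2);
        String v1 = left.get(key); // null if stream1 has not delivered this key yet
        if (v1 != null) {
            out.add(v1 + v2);
        }
    }

    public static void main(String[] args) {
        LowLevelJoinSketch join = new LowLevelJoinSketch();
        join.processElement2("k", "b"); // stream2 first: nothing is emitted, the value is kept
        join.processElement1("k", "a"); // stream1 arrives: the pair is emitted now
        System.out.println(join.out);   // prints [ab]
    }
}
```

In a real CoProcessFunction the two maps would presumably become two ValueState fields obtained in open() via getRuntimeContext().getState(...), read with value() and written with update(). Note that this sketch keeps only the latest value per key and side, and emits again whenever either side is updated after both are present; whether that is the join semantics you want is an assumption on my part.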
Best,
Yun

------------------------------------------------------------------
From: Yuta Morisawa <yu-moris...@kddi-research.jp>
Send Time: 2019 Aug. 7 (Wed.) 08:56
To: user <user@flink.apache.org>
Subject: How to use Lo-level Joins API

Hi

I am trying to use low-level joins. According to the docs, the way to do this is to create a state and access it from both streams, but I can't get it to work.

https://ci.apache.org/projects/flink/flink-docs-release-1.8/dev/stream/operators/process_function.html

This is a snippet of my code. It seems that processElement1 and processElement2 have different ValueStates, so v1 in processElement2 is always null.

---
stream1.connect(stream2).keyBy(0,0).process(new MyCPF());

public class MyCPF extends CoProcessFunction{
  ValueState data;

  processElement1(v1){
    data.update(v1);
  }

  processElement2(v2){
    v1 = data.value() // v1 is always null
    out.collect(v1 + v2)
  }

  open(){
    data = getRuntimeContext().getState(descriptor);
  }
}
---

Can you tell me the correct way to use low-level joins, and send me some sample code if you have any?

--
Thank you
Yuta