Hi 唐云
Below is the code that defines and initializes the state:
public class FlinkKeyedProcessFunction
        extends KeyedProcessFunction<String, Tuple2<String, ObjectNode>, Tuple2<String, JsonNode>> {
    private static final Logger LOG = LoggerFactory.getLogger(FlinkKeyedProcessFunction.class);
    ...
    private final ParameterTool params;
    private transient ListState<ObjectNode> unmatchedProbesState;
    ...
    FlinkKeyedProcessFunction(ParameterTool params) {
        this.params = params;
    }

    @Override
    public void open(Configuration parameters) {
        ListStateDescriptor<ObjectNode> descriptor = new ListStateDescriptor<>(
                "unmatchedProbes", TypeInformation.of(ObjectNode.class));
        unmatchedProbesState = getRuntimeContext().getListState(descriptor);
Below is the part that adds content to the state:
    ...
    List<ObjectNode> unmatchedProbes = mapMatching.getUnMatchedProbes(id);
    unmatchedProbesState.clear();
    if (unmatchedProbes.size() > 0) {
        try {
            unmatchedProbesState.addAll(unmatchedProbes);
        } catch (Exception e) {
            LOG.warn("Continue processing although failed to add unmatchedProbes to ListState. ID: " + id, e);
        }
    }
    ...
Below is the code that reads from the state:
    for (ObjectNode unmatchedProbe : unmatchedProbesState.get()) {
        LOG.info("Processing unmatched probe: " + unmatchedProbe);
        matchedValues.addAll(mapMatching.matchLocation(id, unmatchedProbe));
    }
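The earlier problem was that, when reading from the state, unmatchedProbesState.get() returned an empty result for some keys (even though values had actually been stored for them).
After removing the transient modifier from the state field declaration, the problem no longer occurs.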
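
For anyone following the thread, here is a minimal sketch of the clear() / addAll() / get() sequence from the snippets above, run against a plain in-memory stand-in. Everything in it is an assumption for illustration: KeyedListStateSketch, setCurrentKey and the sample keys are hypothetical, and this is not Flink's implementation. It only shows that a keyed list is read and written under whichever key is current, so get() yields nothing for a key that has had nothing added under it.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collections;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

/** Hypothetical in-memory stand-in for a keyed list state (illustration only, not Flink code). */
class KeyedListStateSketch<K, V> {
    private final Map<K, List<V>> valuesByKey = new HashMap<>();
    private K currentKey;

    /** Assumed helper: mimics the runtime selecting the key of the record being processed. */
    void setCurrentKey(K key) {
        this.currentKey = key;
    }

    /** Removes everything stored under the current key only. */
    void clear() {
        valuesByKey.remove(currentKey);
    }

    /** Appends the given values to the list stored under the current key. */
    void addAll(List<V> values) {
        valuesByKey.computeIfAbsent(currentKey, k -> new ArrayList<>()).addAll(values);
    }

    /** Returns the values stored under the current key, or an empty list if there are none. */
    Iterable<V> get() {
        return valuesByKey.getOrDefault(currentKey, Collections.emptyList());
    }

    public static void main(String[] args) {
        KeyedListStateSketch<String, String> state = new KeyedListStateSketch<>();

        // Write under one key, using the same clear-then-addAll sequence as the post.
        state.setCurrentKey("id-1");
        state.clear();
        state.addAll(Arrays.asList("probe-a", "probe-b"));

        // Reading under a different key sees nothing.
        state.setCurrentKey("id-2");
        System.out.println("id-2: " + state.get());

        // Reading under the original key sees the stored probes.
        state.setCurrentKey("id-1");
        System.out.println("id-1: " + state.get());
    }
}
```

When debugging this kind of symptom, logging the current key next to each write and each read makes it easy to compare the two sides.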
Thanks.
Rising
--
Sent from: http://apache-flink.147419.n8.nabble.com/