Hi all, I was looking at the Example <https://ci.apache.org/projects/flink/flink-docs-stable/dev/stream/state/queryable_state.html#example> section of Querying State <https://ci.apache.org/projects/flink/flink-docs-stable/dev/stream/state/queryable_state.html#querying-state>:
QueryableStateClient client = new QueryableStateClient(tmHostname, proxyPort); // the state descriptor of the state to be fetched. ValueStateDescriptor<Tuple2<Long, Long>> descriptor = new ValueStateDescriptor<>( "average", TypeInformation.of(new TypeHint<Tuple2<Long, Long>>() {})); CompletableFuture<ValueState<Tuple2<Long, Long>>> resultFuture = client.getKvState(jobId, "query-name", key, BasicTypeInfo.LONG_TYPE_INFO, descriptor); // now handle the returned value resultFuture.thenAccept(response -> { try { Tuple2<Long, Long> res = response.get(); } catch (Exception e) { e.printStackTrace(); } }); The two issues I ran into while trying out this code snippet were: 1. The query request would fail with a NPE, because the client had a null ExecutionConfig. If I added: client.setExecutionConfig(new ExecutionConfig()); Then everything seemed OK, but this doesn’t feel like the right way to solve that problem :) 2. The call to response.get() returns a ValueState<Tuple2<Long, Long>>, not the Tuple2 itself. So it seems like there’s a missing “.value()”. Regards, — Ken -------------------------- Ken Krugler +1 530-210-6378 http://www.scaleunlimited.com Custom big data solutions & training Flink, Solr, Hadoop, Cascading & Cassandra