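Hi Ken,

Regarding your first question: the fix you applied, calling client.setExecutionConfig(new ExecutionConfig()), is the right one. Regarding your second question: you are right, the "response.get()" line in the example does not account for the ValueState<> wrapper. Could you create a JIRA issue, or fix it yourself if you like?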
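To make the second point concrete, here is a minimal, self-contained sketch of the unwrapping step. It does not use Flink itself: the ValueState interface and the fakeGetKvState helper below are hypothetical stand-ins (assumptions for illustration only) that mimic a future completing with a state handle rather than the raw value, and a long[] stands in for the Tuple2<Long, Long>. It should be read as an illustration of the missing ".value()" call, not as the final wording of the docs fix.

```java
import java.io.IOException;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.CompletionException;

public class QueryableStateValueSketch {

    // Hypothetical stand-in for Flink's ValueState<T>, reduced to the single
    // accessor that matters here. Not the real Flink interface.
    public interface ValueState<T> {
        T value() throws IOException;
    }

    // Hypothetical stand-in for client.getKvState(...): the future completes
    // with a state handle, not with the stored value itself.
    public static CompletableFuture<ValueState<long[]>> fakeGetKvState(long count, long sum) {
        ValueState<long[]> state = () -> new long[] {count, sum};
        return CompletableFuture.completedFuture(state);
    }

    // The step the docs example skips: unwrap the handle with value().
    public static CompletableFuture<long[]> unwrap(CompletableFuture<ValueState<long[]>> resultFuture) {
        return resultFuture.thenApply(response -> {
            try {
                return response.value();
            } catch (IOException e) {
                // Surface the checked exception through the future.
                throw new CompletionException(e);
            }
        });
    }

    public static void main(String[] args) throws Exception {
        long[] res = unwrap(fakeGetKvState(3L, 12L)).get();
        System.out.println("count=" + res[0] + ", sum=" + res[1]);
    }
}
```

In the real example the same idea would apply inside the thenAccept callback, where the response is the ValueState and the tuple comes from calling value() on it.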
Thanks,
vino.

Ken Krugler <kkrugler_li...@transpac.com> wrote on Thu, Sep 20, 2018 at 6:27 AM:

> Hi all,
>
> I was looking at the Example
> <https://ci.apache.org/projects/flink/flink-docs-stable/dev/stream/state/queryable_state.html#example>
> section of Querying State
> <https://ci.apache.org/projects/flink/flink-docs-stable/dev/stream/state/queryable_state.html#querying-state>:
>
> QueryableStateClient client = new QueryableStateClient(tmHostname, proxyPort);
>
> // the state descriptor of the state to be fetched.
> ValueStateDescriptor<Tuple2<Long, Long>> descriptor =
>         new ValueStateDescriptor<>(
>                 "average",
>                 TypeInformation.of(new TypeHint<Tuple2<Long, Long>>() {}));
>
> CompletableFuture<ValueState<Tuple2<Long, Long>>> resultFuture =
>         client.getKvState(jobId, "query-name", key,
>                 BasicTypeInfo.LONG_TYPE_INFO, descriptor);
>
> // now handle the returned value
> resultFuture.thenAccept(response -> {
>     try {
>         Tuple2<Long, Long> res = response.get();
>     } catch (Exception e) {
>         e.printStackTrace();
>     }
> });
>
> The two issues I ran into while trying out this code snippet were:
>
> 1. The query request would fail with a NPE, because the client had a null
> ExecutionConfig.
>
> If I added:
>
> client.setExecutionConfig(new ExecutionConfig());
>
> Then everything seemed OK, but this doesn’t feel like the right way to
> solve that problem :)
>
> 2. The call to response.get() returns a ValueState<Tuple2<Long, Long>>,
> not the Tuple2 itself.
>
> So it seems like there’s a missing “.value()”.
>
> Regards,
>
> -- Ken
>
> --------------------------
> Ken Krugler
> +1 530-210-6378
> http://www.scaleunlimited.com
> Custom big data solutions & training
> Flink, Solr, Hadoop, Cascading & Cassandra