Hi Ken,

About the first question, the way of fixing is right.
About the second question, you are right, the "response.get()" missed
'ValueState<>'.
Can you create a JIRA issue or fix it if you want.

Thanks, vino.



Ken Krugler <kkrugler_li...@transpac.com> 于2018年9月20日周四 上午6:27写道:

> Hi all,
>
> I was looking at the Example
> <https://ci.apache.org/projects/flink/flink-docs-stable/dev/stream/state/queryable_state.html#example>
>  section
> of Querying State
> <https://ci.apache.org/projects/flink/flink-docs-stable/dev/stream/state/queryable_state.html#querying-state>
> :
>
> QueryableStateClient client = new QueryableStateClient(tmHostname, proxyPort);
> // the state descriptor of the state to be 
> fetched.ValueStateDescriptor<Tuple2<Long, Long>> descriptor =
>         new ValueStateDescriptor<>(
>           "average",
>           TypeInformation.of(new TypeHint<Tuple2<Long, Long>>() {}));
> CompletableFuture<ValueState<Tuple2<Long, Long>>> resultFuture =
>         client.getKvState(jobId, "query-name", key, 
> BasicTypeInfo.LONG_TYPE_INFO, descriptor);
> // now handle the returned valueresultFuture.thenAccept(response -> {
>         try {
>             Tuple2<Long, Long> res = response.get();
>         } catch (Exception e) {
>             e.printStackTrace();
>         }});
>
> The two issues I ran into while trying out this code snippet were:
>
> 1. The query request would fail with a NPE, because the client had a null
> ExecutionConfig.
>
> If I added:
>
>             client.setExecutionConfig(new ExecutionConfig());
>
> Then everything seemed OK, but this doesn’t feel like the right way to
> solve that problem :)
>
> 2. The call to response.get() returns a ValueState<Tuple2<Long, Long>>,
> not the Tuple2 itself.
>
> So it seems like there’s a missing “.value()”.
>
> Regards,
>
> — Ken
>
> --------------------------
> Ken Krugler
> +1 530-210-6378
> http://www.scaleunlimited.com
> Custom big data solutions & training
> Flink, Solr, Hadoop, Cascading & Cassandra
>
>

Reply via email to