Hi all,

I was looking at the Example section
<https://ci.apache.org/projects/flink/flink-docs-stable/dev/stream/state/queryable_state.html#example>
of Querying State
<https://ci.apache.org/projects/flink/flink-docs-stable/dev/stream/state/queryable_state.html#querying-state>:

QueryableStateClient client = new QueryableStateClient(tmHostname, proxyPort);

// the state descriptor of the state to be fetched.
ValueStateDescriptor<Tuple2<Long, Long>> descriptor =
        new ValueStateDescriptor<>(
          "average",
          TypeInformation.of(new TypeHint<Tuple2<Long, Long>>() {}));

CompletableFuture<ValueState<Tuple2<Long, Long>>> resultFuture =
        client.getKvState(jobId, "query-name", key,
          BasicTypeInfo.LONG_TYPE_INFO, descriptor);

// now handle the returned value
resultFuture.thenAccept(response -> {
        try {
            Tuple2<Long, Long> res = response.get();
        } catch (Exception e) {
            e.printStackTrace();
        }
});

The two issues I ran into while trying out this code snippet were:

1. The query request would fail with an NPE, because the client had a null 
ExecutionConfig.

If I added:

            client.setExecutionConfig(new ExecutionConfig());

Then everything seemed OK, but this doesn’t feel like the right way to solve 
that problem :)

2. The call to response.get() returns a ValueState<Tuple2<Long, Long>>, not the 
Tuple2 itself.

So it seems like there’s a missing “.value()”.
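
To illustrate the second point, here is a minimal sketch of the unwrapping step. It deliberately avoids a Flink dependency: the ValueState interface, the fakeQuery method and the long[] pair are hypothetical stand-ins (for Flink's ValueState, for client.getKvState(...) and for Tuple2<Long, Long>), so this only shows the shape of the fix and is not the actual docs code.

```java
import java.io.IOException;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.CompletionException;

public class UnwrapQueriedState {

    // Hypothetical stand-in for Flink's ValueState<T>; only the accessor used here.
    interface ValueState<T> {
        T value() throws IOException;
    }

    // Hypothetical stand-in for client.getKvState(...): the future completes
    // with a state handle, not with the stored value itself.
    static CompletableFuture<ValueState<long[]>> fakeQuery(long count, long sum) {
        ValueState<long[]> state = () -> new long[] {count, sum};
        return CompletableFuture.completedFuture(state);
    }

    // Read the stored value out of the handle once the future completes.
    static CompletableFuture<long[]> queryValue(long count, long sum) {
        return fakeQuery(count, sum).thenApply(state -> {
            try {
                return state.value(); // the extra step the snippet seems to lack
            } catch (IOException e) {
                throw new CompletionException(e);
            }
        });
    }

    public static void main(String[] args) {
        long[] res = queryValue(2L, 10L).join();
        System.out.println("count=" + res[0] + ", sum=" + res[1]);
    }
}
```

In the snippet from the docs, the equivalent change would presumably be to read the Tuple2 through value() on the state object inside the thenAccept callback.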

Regards,

— Ken

--------------------------
Ken Krugler
+1 530-210-6378
http://www.scaleunlimited.com
Custom big data solutions & training
Flink, Solr, Hadoop, Cascading & Cassandra
