GitHub user sihuazhou commented on a diff in the pull request:

    https://github.com/apache/flink/pull/6173#discussion_r195903191

    --- Diff: flink-queryable-state/flink-queryable-state-client-java/src/main/java/org/apache/flink/queryablestate/client/QueryableStateClient.java ---
    @@ -244,7 +271,14 @@ public ExecutionConfig setExecutionConfig(ExecutionConfig config) {
             return getKvState(jobId, queryableStateName, key.hashCode(), serializedKeyAndNamespace).thenApply(
                     stateResponse -> {
                         try {
    -                        return stateDescriptor.bind(new ImmutableStateBinder(stateResponse.getContent()));
    +                        if (!STATE_FACTORIES.containsKey(stateDescriptor.getClass())) {
    +                            String message = String.format("State %s is not supported by %s",
    +                                    stateDescriptor.getClass(), this.getClass());
    +                            throw new FlinkRuntimeException(message);
    +                        }
    +                        return STATE_FACTORIES
    +                                .get(stateDescriptor.getClass())
    --- End diff --

Maybe we can merge the `containsKey()` and `get()` calls into a single `get()` and check the result for `null`. That way we don't need to query `STATE_FACTORIES` twice.
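
A minimal sketch of the single-lookup idea, not the actual Flink code: `StateFactoryLookup`, `createState`, and the `Function<String, String>` factory type are hypothetical placeholders, and `IllegalStateException` stands in for `FlinkRuntimeException` so the example compiles on its own. It assumes the map never stores `null` values, which is what makes a `null` result equivalent to a missing key.

```java
import java.util.HashMap;
import java.util.Map;
import java.util.function.Function;

// Hypothetical stand-in for the client class; only the lookup pattern matters here.
class StateFactoryLookup {

    // Simplified placeholder for STATE_FACTORIES: descriptor class -> factory.
    static final Map<Class<?>, Function<String, String>> STATE_FACTORIES = new HashMap<>();

    static {
        // Register one supported "descriptor" type for illustration.
        STATE_FACTORIES.put(String.class, content -> "state(" + content + ")");
    }

    static String createState(Object descriptor, String content) {
        // One map query: get() returns null when the key is absent
        // (assumes no null values are ever stored in the map).
        Function<String, String> factory = STATE_FACTORIES.get(descriptor.getClass());
        if (factory == null) {
            String message = String.format("State %s is not supported by %s",
                    descriptor.getClass(), StateFactoryLookup.class);
            // IllegalStateException is a stand-in for FlinkRuntimeException.
            throw new IllegalStateException(message);
        }
        return factory.apply(content);
    }

    public static void main(String[] args) {
        // Supported descriptor type: the factory is found with a single lookup.
        System.out.println(createState("descriptor", "payload"));
        try {
            // Unsupported descriptor type: the null check triggers the error path.
            createState(42, "payload");
        } catch (IllegalStateException e) {
            System.out.println(e.getMessage());
        }
    }
}
```

The behavior matches the `containsKey()` plus `get()` version as long as the map holds no `null` values; the only difference is that the hash lookup happens once.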
---