Github user StefanRRichter commented on a diff in the pull request:

    https://github.com/apache/flink/pull/6173#discussion_r196043061
  
    --- Diff: flink-queryable-state/flink-queryable-state-client-java/src/main/java/org/apache/flink/queryablestate/client/QueryableStateClient.java ---
    @@ -244,7 +271,14 @@ public ExecutionConfig setExecutionConfig(ExecutionConfig config) {
                return getKvState(jobId, queryableStateName, key.hashCode(), serializedKeyAndNamespace).thenApply(
                                stateResponse -> {
                                        try {
    -                                           return stateDescriptor.bind(new ImmutableStateBinder(stateResponse.getContent()));
    +                                           StateFactory stateFactory = STATE_FACTORIES
    +                                                   .get(stateDescriptor.getClass());
    +                                           if (stateFactory == null) {
    +                                                   String message = String.format("State %s is not supported by %s",
    +                                                           stateDescriptor.getClass(), this.getClass());
    +                                                   throw new FlinkRuntimeException(message);
    +                                           }
    +                                           return stateFactory.createState(stateDescriptor, stateResponse.getContent());
                                        } catch (Exception e) {
    --- End diff --
    
    Maybe it would make sense to narrow the scope of the `try`/`catch` block: right now it also catches and wraps our own `FlinkRuntimeException` thrown at line 279.
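
    A minimal sketch of the narrower scope, not the actual Flink code: the class `TryScopeSketch`, the `FACTORIES` map, and the exception types are hypothetical stand-ins for `STATE_FACTORIES`, `StateFactory`, and `FlinkRuntimeException`, and the wrapping in the `catch` block is an assumption, since the diff cuts off before its body. The lookup and the "not supported" check sit before the `try`, so only the factory call is guarded:

```java
import java.util.HashMap;
import java.util.Map;
import java.util.function.Function;

class TryScopeSketch {

    // Hypothetical stand-in for STATE_FACTORIES, keyed by descriptor class.
    static final Map<Class<?>, Function<byte[], String>> FACTORIES = new HashMap<>();

    static {
        FACTORIES.put(String.class, content -> "state(" + content.length + " bytes)");
    }

    static String createState(Class<?> descriptorClass, byte[] content) {
        // Lookup and the "unsupported" check happen outside the try block,
        // so this exception reaches the caller without being wrapped.
        Function<byte[], String> factory = FACTORIES.get(descriptorClass);
        if (factory == null) {
            throw new IllegalStateException(
                    String.format("State %s is not supported", descriptorClass));
        }

        try {
            // Only the call that can fail for other reasons is guarded.
            return factory.apply(content);
        } catch (Exception e) {
            // Assumed behavior: wrap unexpected failures with their cause.
            throw new RuntimeException("Could not create state", e);
        }
    }
}
```

    With this shape, a caller asking for an unsupported descriptor sees the original exception and message directly, while failures inside the factory are still wrapped with their cause.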


---
