Hi,
I am trying to query Flink's MapState from the Flink queryable state client (Flink 1.3.2). I was able to query ValueState, but when I try to query MapState I get the following exception:

java.io.IOException: Unconsumed bytes in the deserialized value. This indicates a mismatch in the value serializers used by the KvState instance and this access.
        at org.apache.flink.runtime.query.netty.message.KvStateRequestSerializer.deserializeValue(KvStateRequestSerializer.java:438)
        at com.paysafe.ss.flink.client.service.impl.QueryStateServiceImpl.getKeyValue(QueryStateServiceImpl.java:81)
        at com.paysafe.ss.flink.client.web.rest.controller.QueryStateController.getStateValue(QueryStateController.java:49)
        at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
        at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
        at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
        at java.lang.reflect.Method.invoke(Method.java:498)
        at org.springframework.web.method.support.InvocableHandlerMethod.doInvoke(InvocableHandlerMethod.java:205)
        at org.springframework.web.method.support.InvocableHandlerMethod.invokeForRequest(InvocableHandlerMethod.java:133)
        at org.springframework.web.servlet.mvc.method.annotation.ServletInvocableHandlerMethod.invokeAndHandle(ServletInvocableHandlerMethod.java:97)
        at org.springframework.web.servlet.mvc.method.annotation.RequestMappingHandlerAdapter.invokeHandlerMethod(RequestMappingHandlerAdapter.java:827)
        at org.springframework.web.servlet.mvc.method.annotation.RequestMappingHandlerAdapter.handleInternal(RequestMappingHandlerAdapter.java:738)

*Flink Job's Logic*

    FlinkKafkaConsumer09<MerchantApiEvent> consumer = new FlinkKafkaConsumer09<>(
        "/apps/application-stream:flink-demo", new MerchantApiSchema(), properties);

    DataStream<MerchantApiEvent> inputEventStream = env.addSource(consumer);

    DataStream<Tuple3<String, String, Long>> outputStream =
        inputEventStream.map(new CreateTuple()).keyBy(0).keyBy(1)
            .window(SlidingProcessingTimeWindows.of(Time.seconds(120), Time.milliseconds(1000)))
            .sum(2);

    DataStream<Long> output = outputStream.keyBy(0).flatMap(new CountEvent());

    output.writeAsText("/tmp/flink_out", FileSystem.WriteMode.OVERWRITE);

    // execute program
    env.execute("Filter Transformation Example");
  }

  public static class CreateTuple
      implements MapFunction<MerchantApiEvent, Tuple3<String, String, Long>> {
    @Override
    public Tuple3<String, String, Long> map(MerchantApiEvent input) throws Exception {
      return new Tuple3<String, String, Long>(input.getMerchantId(), input.getApiName(), 1L);
    }
  }

  public static class CountEvent extends RichFlatMapFunction<Tuple3<String, String, Long>, Long> {

    private transient MapState<String, Long> mapState;

    @Override
    public void flatMap(Tuple3<String, String, Long> input, Collector<Long> out) throws Exception {
      mapState.put(input.f1, input.f2);
    }

    @Override
    public void open(Configuration config) {
      MapStateDescriptor<String, Long> mapStateDesc = new MapStateDescriptor<String, Long>(
          "mapQuery",
          TypeInformation.of(new TypeHint<String>() {}),
          TypeInformation.of(new TypeHint<Long>() {}));
      mapStateDesc.setQueryable("mapQuery");

      mapState = getRuntimeContext().getMapState(mapStateDesc);
    }
  }
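
My (possibly wrong) understanding of why the client side fails: the bytes returned by queryable state for a MapState seem to be written entry by entry (user key, a null flag, user value), not as one serialized java.util.Map object. The snippet below is only a sketch of that layout as I understand it, not code taken from the Flink sources; the class name, the serializers and the sample map are made up for illustration.

    import java.io.ByteArrayOutputStream;
    import java.util.HashMap;
    import java.util.Map;

    import org.apache.flink.api.common.ExecutionConfig;
    import org.apache.flink.api.common.typeinfo.BasicTypeInfo;
    import org.apache.flink.api.common.typeutils.TypeSerializer;
    import org.apache.flink.core.memory.DataOutputViewStreamWrapper;

    public class MapStateWireFormatSketch {
      public static void main(String[] args) throws Exception {
        // Serializers for the MapState's user key (String) and user value (Long).
        TypeSerializer<String> userKeySerializer =
            BasicTypeInfo.STRING_TYPE_INFO.createSerializer(new ExecutionConfig());
        TypeSerializer<Long> userValueSerializer =
            BasicTypeInfo.LONG_TYPE_INFO.createSerializer(new ExecutionConfig());

        Map<String, Long> entries = new HashMap<>();
        entries.put("someApi", 1L);

        // My assumption of the layout: per entry, user key + null flag + user value,
        // concatenated for all entries of the map.
        ByteArrayOutputStream baos = new ByteArrayOutputStream();
        DataOutputViewStreamWrapper out = new DataOutputViewStreamWrapper(baos);
        for (Map.Entry<String, Long> entry : entries.entrySet()) {
          userKeySerializer.serialize(entry.getKey(), out);          // map entry's user key
          out.writeBoolean(entry.getValue() == null);                // null marker for the value
          if (entry.getValue() != null) {
            userValueSerializer.serialize(entry.getValue(), out);    // map entry's user value
          }
        }
        System.out.println("entry-by-entry bytes: " + baos.size());
      }
    }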


*Flink Query Client's Logic*

    final JobID jobId = JobID.fromHexString(jobIdParam);

    String key = queryStateRequestDto.getKey();

    final Configuration config = new Configuration();
    config.setString(JobManagerOptions.ADDRESS, jobManagerHost);
    config.setInteger(JobManagerOptions.PORT, jobManagerPort);

    HighAvailabilityServices highAvailabilityServices = null;
    try {
      highAvailabilityServices = HighAvailabilityServicesUtils.createHighAvailabilityServices(
          config, Executors.newSingleThreadScheduledExecutor(),
          HighAvailabilityServicesUtils.AddressResolution.TRY_ADDRESS_RESOLUTION);
    } catch (Exception e) {
      e.printStackTrace();
    }

    try {
      QueryableStateClient client = new QueryableStateClient(config, highAvailabilityServices);

      final TypeSerializer<String> keySerializer =
          TypeInformation.of(new TypeHint<String>() {}).createSerializer(new ExecutionConfig());
      final TypeSerializer<Map<String, Long>> valueSerializer =
          TypeInformation.of(new TypeHint<Map<String, Long>>() {}).createSerializer(new ExecutionConfig());

      final byte[] serializedKey = KvStateRequestSerializer.serializeKeyAndNamespace(
          key, keySerializer, VoidNamespace.INSTANCE, VoidNamespaceSerializer.INSTANCE);

      scala.concurrent.Future<byte[]> serializedResult =
          client.getKvState(jobId, "mapQuery", key.hashCode(), serializedKey);

      // now wait for the result and return it
      final FiniteDuration duration = new FiniteDuration(1, TimeUnit.SECONDS);
      byte[] serializedValue = Await.result(serializedResult, duration);
      Map<String, Long> value =
          KvStateRequestSerializer.deserializeValue(serializedValue, valueSerializer);
      System.out.println(value);
    } catch (Exception e) {
      e.printStackTrace();
    }
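
If the entry-by-entry layout above is right, then a serializer for a whole Map<String, Long> does not match those bytes, which would explain the "unconsumed bytes" message. What I think I would need instead is something along these lines. This is only a sketch: it assumes KvStateRequestSerializer in 1.3.2 has a deserializeMap(byte[], TypeSerializer, TypeSerializer) helper, which I have not verified in my build, and mapKeySerializer / mapValueSerializer are names I made up here.

      // Sketch only: decode entry by entry with the map's own key/value serializers,
      // instead of a single serializer for Map<String, Long>.
      final TypeSerializer<String> mapKeySerializer =
          TypeInformation.of(new TypeHint<String>() {}).createSerializer(new ExecutionConfig());
      final TypeSerializer<Long> mapValueSerializer =
          TypeInformation.of(new TypeHint<Long>() {}).createSerializer(new ExecutionConfig());

      // serializedValue is the byte[] returned by client.getKvState(...) above.
      Map<String, Long> mapValue =
          KvStateRequestSerializer.deserializeMap(serializedValue, mapKeySerializer, mapValueSerializer);
      System.out.println(mapValue);

Is that the intended way to read queryable MapState in 1.3.2, or should I be constructing a different value serializer on the client side?

Thanks.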
