Hi, I am trying to query Flink's MapState from a Flink client (1.3.2). I was able to query ValueState successfully, but when I try to query MapState I get the following exception:
java.io.IOException: Unconsumed bytes in the deserialized value. This indicates a mismatch in the value serializers used by the KvState instance and this access. at org.apache.flink.runtime.query.netty.message.KvStateRequestSerializer.deserializeValue(KvStateRequestSerializer.java:438) at com.paysafe.ss.flink.client.service.impl.QueryStateServiceImpl.getKeyValue(QueryStateServiceImpl.java:81) at com.paysafe.ss.flink.client.web.rest.controller.QueryStateController.getStateValue(QueryStateController.java:49) at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method) at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62) at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43) at java.lang.reflect.Method.invoke(Method.java:498) at org.springframework.web.method.support.InvocableHandlerMethod.doInvoke(InvocableHandlerMethod.java:205) at org.springframework.web.method.support.InvocableHandlerMethod.invokeForRequest(InvocableHandlerMethod.java:133) at org.springframework.web.servlet.mvc.method.annotation.ServletInvocableHandlerMethod.invokeAndHandle(ServletInvocableHandlerMethod.java:97) at org.springframework.web.servlet.mvc.method.annotation.RequestMappingHandlerAdapter.invokeHandlerMethod(RequestMappingHandlerAdapter.java:827) at org.springframework.web.servlet.mvc.method.annotation.RequestMappingHandlerAdapter.handleInternal(RequestMappingHandlerAdapter.java:738) *Flink Job's Logic* * FlinkKafkaConsumer09<MerchantApiEvent> consumer = new FlinkKafkaConsumer09<>(* * "/apps/application-stream:flink-demo", new MerchantApiSchema(), properties);* * DataStream<MerchantApiEvent> inputEventStream = env.addSource(consumer);* * DataStream<Tuple3<String, String, Long>> outputStream =* * inputEventStream.map(new CreateTuple()).keyBy(0).keyBy(1)* * .window(SlidingProcessingTimeWindows.of(Time.seconds(120), Time.milliseconds(1000)))* * .sum(2);* * DataStream<Long> output = outputStream.keyBy(0).flatMap(new 
CountEvent());* * output.writeAsText("/tmp/flink_out", FileSystem.WriteMode.OVERWRITE);* * // execute program* * env.execute("Filter Transformation Example");* * }* * public static class CreateTuple* * implements MapFunction<MerchantApiEvent, Tuple3<String, String, Long>> {* * @Override* * public Tuple3<String, String, Long> map(MerchantApiEvent input) throws Exception {* * return new Tuple3<String, String, Long>(input.getMerchantId(), input.getApiName(), 1L);* * }* * }* * public static class CountEvent extends RichFlatMapFunction<Tuple3<String, String, Long>, Long> {* * private transient MapState<String, Long> mapState;* * @Override* * public void flatMap(Tuple3<String, String, Long> input, Collector<Long> out) throws Exception {* * mapState.put(input.f1, input.f2);* * }* * @Override* * public void open(Configuration config) {* * MapStateDescriptor<String, Long> mapStateDesc = new MapStateDescriptor<String, Long>(* * "mapQuery", TypeInformation.of(new TypeHint<String>() {* * }), TypeInformation.of(new TypeHint<Long>() {* * }));* * mapStateDesc.setQueryable("mapQuery");* * mapState = getRuntimeContext().getMapState(mapStateDesc);* * }* * }* *Flink Query Client's Logic* *final JobID jobId = JobID.fromHexString(jobIdParam);* * String key = queryStateRequestDto.getKey();* * final Configuration config = new Configuration();* * config.setString(JobManagerOptions.ADDRESS, jobManagerHost);* * config.setInteger(JobManagerOptions.PORT, jobManagerPort);* * HighAvailabilityServices highAvailabilityServices = null;* * try {* * highAvailabilityServices = HighAvailabilityServicesUtils.createHighAvailabilityServices(* * config, Executors.newSingleThreadScheduledExecutor(),* * HighAvailabilityServicesUtils.AddressResolution.TRY_ADDRESS_RESOLUTION);* * } catch (Exception e) {* * // TODO Auto-generated catch block* * e.printStackTrace();* * }* * try {* * QueryableStateClient client = new QueryableStateClient(config, highAvailabilityServices);* * final TypeSerializer<String> 
keySerializer = TypeInformation.of(new TypeHint<String>() {* * }).createSerializer(new ExecutionConfig());* * final TypeSerializer<Map<String, Long>> valueSerializer =* * TypeInformation.of(new TypeHint<Map<String, Long>>() {* * }).createSerializer(new ExecutionConfig());* * final byte[] serializedKey = KvStateRequestSerializer.serializeKeyAndNamespace(key,* * keySerializer, VoidNamespace.INSTANCE, VoidNamespaceSerializer.INSTANCE);* * scala.concurrent.Future<byte[]> serializedResult =* * client.getKvState(jobId, "mapQuery", key.hashCode(), serializedKey);* * // now wait for the result and return it* * final FiniteDuration duration = new FiniteDuration(1, TimeUnit.SECONDS);* * byte[] serializedValue = Await.result(serializedResult, duration);* * Map<String, Long> value =* * KvStateRequestSerializer.deserializeValue(serializedValue, valueSerializer);* * System.out.println(value);* * } catch (Exception e) {* * // TODO Auto-generated catch block* * e.printStackTrace();* * }*