Hi David, could you please explain what you are actually trying to achieve?
It seems like you are reading some files from S3 in SinkFunction#open and putting them into state (bootstrapping?). How many instances of the sink are executed? How do you shard the buckets, i.e. how do you avoid reading the same file on multiple parallel sinks? Is your sink running in a keyed context? Maybe even provide the general pipeline.

On Thu, Mar 5, 2020 at 2:29 PM David Morin <morin.david....@gmail.com> wrote:

> Hello Arvid,
>
> After some investigations with the help of my colleague we finally found
> the root cause.
> In order to improve the init of the state, I've created some threads to
> parallelize the read of bucket files.
> This is a temporary solution because I've planned to use the State
> Processor API.
> Here is an abstract of the code:
>
>     ExecutorService executorService = Executors.newFixedThreadPool(initStateMaxThreads);
>     for (FileStatus bucketFile : xxx) {
>       executorService.submit(
>           () -> {
>             try {
>               readBucketFct(XXX); // Update the state with the bucket content...
>             } catch (Exception e) {
>               ....
>             }
>           });
>     }
>     executorService.shutdown();
>     boolean terminated = executorService.awaitTermination(initStateTimeoutSeconds, TimeUnit.SECONDS);
>     if ((!terminated) || (readMetaErrors.get() > 0)) {
>       throw new SinkException("Init state failed...");
>     }
>
> After some tests: if I use one thread in my executorService it works. But
> with 2 threads the job fails.
> Can I mitigate this behaviour (while waiting for the switch to the State
> Processor API)?
>
> Thanks
> David
>
>
> On Thu, Mar 5, 2020 at 8:06 AM Arvid Heise <ar...@ververica.com> wrote:
>
>> Hi David,
>>
>> the obvious reason is that your state stored an enum value that is no
>> longer present. It tries to deserialize the 512th entry in your enum,
>> which is not available.
>>
>> However, since it's highly unlikely that you actually have that many enum
>> values in the same enum class, we are actually looking at a corrupt stream,
>> which is hard to fix. Could you describe which state you have?
>>
>> Did you upgrade Flink or your application? If it's Flink, it's a bug. If
>> it's the application, it may be that the state is incompatible and would
>> need to be migrated.
>>
>> Did you restart from a checkpoint or a savepoint?
>>
>> On Thu, Mar 5, 2020 at 1:14 AM David Morin <morin.david....@gmail.com>
>> wrote:
>>
>>> Hello,
>>>
>>> I have this Exception in my datastream app and I can't find the root
>>> cause.
>>> I consume data from Kafka and it fails when I try to get a value from my
>>> MapState in RocksDB.
>>> It was working in a previous release of my app but I can't find the cause
>>> of this error.
>>>
>>> java.lang.ArrayIndexOutOfBoundsException: 512
>>>     at org.apache.flink.api.common.typeutils.base.EnumSerializer.deserialize(EnumSerializer.java:130)
>>>     at org.apache.flink.api.common.typeutils.base.EnumSerializer.deserialize(EnumSerializer.java:50)
>>>     at org.apache.flink.api.java.typeutils.runtime.TupleSerializer.deserialize(TupleSerializer.java:143)
>>>     at org.apache.flink.api.java.typeutils.runtime.TupleSerializer.deserialize(TupleSerializer.java:37)
>>>     at org.apache.flink.contrib.streaming.state.RocksDBMapState.deserializeUserValue(RocksDBMapState.java:344)
>>>     at org.apache.flink.contrib.streaming.state.RocksDBMapState.get(RocksDBMapState.java:123)
>>>     at org.apache.flink.runtime.state.UserFacingMapState.get(UserFacingMapState.java:47)
>>>     ..
>>>
>>> Flink version: 1.9.2
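A note for readers on the EnumSerializer stack trace quoted at the bottom: Flink's enum serializer writes a per-value integer index and resolves it against the enum's constants on read, so a corrupt or incompatibly written stream that yields an index such as 512 fails exactly as shown. A tiny standalone sketch of that failure mode (the Color enum and the EnumIndexSketch class are made up for illustration, not taken from the application):

    import java.io.ByteArrayInputStream;
    import java.io.DataInputStream;
    import java.io.IOException;

    public class EnumIndexSketch {

        // Made-up enum: only three constants, so any index >= 3 is invalid.
        enum Color { RED, GREEN, BLUE }

        public static void main(String[] args) throws IOException {
            // Pretend these four bytes came from the serialized state: they decode to 512.
            DataInputStream in = new DataInputStream(
                    new ByteArrayInputStream(new byte[] {0, 0, 2, 0}));
            int index = in.readInt();             // 512
            Color value = Color.values()[index];  // ArrayIndexOutOfBoundsException: 512
            System.out.println(value);
        }
    }

An out-of-range index therefore points at the bytes in the stream rather than at the enum class itself, which matches the reading given above.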
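And for the parallel bootstrap code further up: below is a minimal, self-contained sketch of one way to keep the parallel file reads while confining every update of the target structure to the submitting thread, with the worker threads only reading and parsing. All names here (BucketBootstrapSketch, readBucketFile, the plain HashMap standing in for the real sink state) are hypothetical placeholders, not the original pipeline code.

    import java.util.AbstractMap.SimpleEntry;
    import java.util.ArrayList;
    import java.util.Arrays;
    import java.util.HashMap;
    import java.util.List;
    import java.util.Map;
    import java.util.concurrent.ExecutorService;
    import java.util.concurrent.Executors;
    import java.util.concurrent.Future;
    import java.util.concurrent.TimeUnit;

    public class BucketBootstrapSketch {

        // Stand-in for the per-file read (the role played by readBucketFct above).
        // It only reads and parses; it does not touch any shared state.
        static List<SimpleEntry<String, String>> readBucketFile(String path) {
            return Arrays.asList(new SimpleEntry<>("key-from-" + path, "value"));
        }

        static void bootstrap(List<String> bucketFiles,
                              int maxThreads,
                              long perFileTimeoutSeconds,
                              Map<String, String> targetState) throws Exception {
            ExecutorService pool = Executors.newFixedThreadPool(maxThreads);
            try {
                // Workers only produce parsed entries.
                List<Future<List<SimpleEntry<String, String>>>> futures = new ArrayList<>();
                for (String file : bucketFiles) {
                    futures.add(pool.submit(() -> readBucketFile(file)));
                }
                // All writes to targetState happen here, on the calling thread.
                // Future.get() rethrows any read/parse failure and enforces a timeout.
                for (Future<List<SimpleEntry<String, String>>> future : futures) {
                    for (SimpleEntry<String, String> entry
                            : future.get(perFileTimeoutSeconds, TimeUnit.SECONDS)) {
                        targetState.put(entry.getKey(), entry.getValue());
                    }
                }
            } finally {
                pool.shutdownNow();
            }
        }

        public static void main(String[] args) throws Exception {
            Map<String, String> state = new HashMap<>();
            bootstrap(Arrays.asList("bucket-00000", "bucket-00001"), 2, 60, state);
            System.out.println(state.size() + " entries loaded");
        }
    }

Collecting results through Future.get() also surfaces read or parse failures directly, instead of tracking them through a shared error counter as in the snippet above.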