Hello Arvid,

After some investigation with the help of my colleague, we finally found
the root cause.
To speed up the initialization of the state, I created several threads to
parallelize the reading of the bucket files.
This is a temporary solution; I plan to switch to the State Processor
API.
Below is an excerpt of the code:

ExecutorService executorService = Executors.newFixedThreadPool(initStateMaxThreads);
for (FileStatus bucketFile : xxx) {
    executorService.submit(() -> {
        try {
            readBucketFct(XXX); // Update the state with the bucket content...
        } catch (Exception e) {
            ....
        }
    });
}
executorService.shutdown();
boolean terminated = executorService.awaitTermination(initStateTimeoutSeconds, TimeUnit.SECONDS);
if (!terminated || readMetaErrors.get() > 0) {
    throw new SinkException("Init state failed...");
}
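
Each submitted task calls readBucketFct, which updates the same state from
several threads at once. My suspicion is that these concurrent writes are
what corrupts the serialized values (which would match the corrupt stream
you mentioned), but I haven't confirmed it.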

After some tests: if I use one thread in my ExecutorService, it works, but
with two threads the job fails.
Can I mitigate this behaviour while waiting for the switch to the State
Processor API?
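
If concurrent state access is indeed the problem, one mitigation I could try
while keeping the parallel file reads: parse the files in the worker threads,
but apply every state update under a shared lock, so only one thread touches
the state at a time. A rough sketch, where parseBucketFct, updateStateFct and
BucketEntry are hypothetical splits of my readBucketFct (parsing vs. state
mutation):

import java.util.List;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicInteger;

// Sketch only: parseBucketFct / updateStateFct / BucketEntry are hypothetical.
final Object stateLock = new Object();
final AtomicInteger readMetaErrors = new AtomicInteger(0);

ExecutorService executorService = Executors.newFixedThreadPool(initStateMaxThreads);
for (FileStatus bucketFile : xxx) {
    executorService.submit(() -> {
        try {
            // Parsing stays parallel: no shared mutable data is touched here.
            List<BucketEntry> entries = parseBucketFct(bucketFile);
            // State updates are serialized behind the lock.
            synchronized (stateLock) {
                updateStateFct(entries);
            }
        } catch (Exception e) {
            readMetaErrors.incrementAndGet();
        }
    });
}
executorService.shutdown();
boolean terminated = executorService.awaitTermination(initStateTimeoutSeconds, TimeUnit.SECONDS);
if (!terminated || readMetaErrors.get() > 0) {
    throw new SinkException("Init state failed...");
}

That said, if this is Flink keyed state, access also depends on the current
key context of the operator, so a lock alone may not be enough; reading the
files in parallel but applying all updates from the single task thread (e.g.
via a queue that the task thread drains) would probably be safer.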

Thanks
David


On Thu, Mar 5, 2020 at 8:06 AM Arvid Heise <ar...@ververica.com> wrote:

> Hi David,
>
> the obvious reason is that your state stored an enum value that is no
> longer present. It tries to deserialize the 512th entry in your enum,
> which is not available.
>
> However, since it's highly unlikely that you actually have that many enum
> values in the same enum class, we are most likely looking at a corrupt
> stream, which is hard to fix. Could you describe which state you have?
>
> Did you upgrade Flink or your application? If it's Flink, it's a bug. If
> it's the application, the state may be incompatible and would need to be
> migrated.
>
> Did you restart from checkpoint or savepoint?
>
> On Thu, Mar 5, 2020 at 1:14 AM David Morin <morin.david....@gmail.com>
> wrote:
>
>> Hello,
>>
>> I have this Exception in my datastream app and I can't find the root
>> cause.
>> I consume data from Kafka and it fails when I try to get a value from my
>> MapState in RocksDB.
>> It was working in a previous release of my app.
>>
>> java.lang.ArrayIndexOutOfBoundsException: 512
>>         at
>> org.apache.flink.api.common.typeutils.base.EnumSerializer.deserialize(EnumSerializer.java:130)
>>         at
>> org.apache.flink.api.common.typeutils.base.EnumSerializer.deserialize(EnumSerializer.java:50)
>>         at
>> org.apache.flink.api.java.typeutils.runtime.TupleSerializer.deserialize(TupleSerializer.java:143)
>>         at
>> org.apache.flink.api.java.typeutils.runtime.TupleSerializer.deserialize(TupleSerializer.java:37)
>>         at
>> org.apache.flink.contrib.streaming.state.RocksDBMapState.deserializeUserValue(RocksDBMapState.java:344)
>>         at
>> org.apache.flink.contrib.streaming.state.RocksDBMapState.get(RocksDBMapState.java:123)
>>         at
>> org.apache.flink.runtime.state.UserFacingMapState.get(UserFacingMapState.java:47)
>> ..
>>
>> Flink version: 1.9.2
