Hi David,

could you please explain what you are actually trying to achieve?

It seems like you are reading some files from S3 in SinkFunction#open and
putting them into state (bootstrapping?).
How many instances of the sink are executed?
How do you shard the buckets, i.e. how do you avoid reading the same file
on multiple parallel sinks?
Is your sink running in a keyed context? Maybe even provide the general
pipeline.
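
To illustrate what I mean by sharding: if every parallel sink instance lists
the same S3 prefix in open(), each bucket file is read (and written into state)
once per subtask. A minimal, untested sketch of one way to avoid that, assuming
a RichSinkFunction and reusing your readBucketFct (bucketFiles is just a
placeholder for the listed files), would be to assign files by subtask index:

// Sketch: each bucket file is read by exactly one parallel sink subtask.
int subtaskIndex = getRuntimeContext().getIndexOfThisSubtask();
int parallelism = getRuntimeContext().getNumberOfParallelSubtasks();

for (int i = 0; i < bucketFiles.size(); i++) {
    if (i % parallelism == subtaskIndex) { // simple modulo assignment
        readBucketFct(bucketFiles.get(i));
    }
}

If the sink uses keyed state, the file-to-subtask assignment would additionally
have to match the key partitioning, which is why I'm asking about the keyed
context.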

On Thu, Mar 5, 2020 at 2:29 PM David Morin <morin.david....@gmail.com>
wrote:

> Hello Arvid,
>
> After some investigations with the help of my colleague we finally found
> the root cause.
> In order to improve the init of the state, I've created some threads to
> parallelize the reading of the bucket files.
> This is a temporary solution because I plan to use the State Processor API.
> Here is an excerpt of the code:
>
>
> ExecutorService executorService =
>     Executors.newFixedThreadPool(initStateMaxThreads);
> for (FileStatus bucketFile : xxx) {
>     executorService.submit(() -> {
>         try {
>             readBucketFct(XXX); // Update the state with the bucket content...
>         } catch (Exception e) {
>             ....
>         }
>     });
> }
> executorService.shutdown();
> boolean terminated =
>     executorService.awaitTermination(initStateTimeoutSeconds, TimeUnit.SECONDS);
> if ((!terminated) || (readMetaErrors.get() > 0)) {
>     throw new SinkException("Init state failed...");
> }
>
> After some tests: if I use one thread in my executorService it works, but
> with 2 threads the job fails.
> Can I mitigate this behaviour (while waiting for the switch to the State
> Processor API)?
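>
> For example, would something along these lines be a valid workaround? It is
> only an untested sketch (parseBucketFile, myMapState and the map types are
> placeholders): the worker threads only read and parse the files, and all
> state updates are applied afterwards from the task thread, since I suspect
> the state backend is not meant to be accessed from multiple threads.
>
> // Untested sketch: worker threads read/parse, the task thread updates state.
> // (Exception handling omitted, as above.)
> ExecutorService executorService =
>     Executors.newFixedThreadPool(initStateMaxThreads);
> List<Future<Map<String, String>>> futures = new ArrayList<>();
> for (FileStatus bucketFile : xxx) {
>     futures.add(executorService.submit(() -> parseBucketFile(bucketFile)));
> }
> executorService.shutdown();
> for (Future<Map<String, String>> future : futures) {
>     // Future#get blocks on the task thread, so the state update stays
>     // single-threaded.
>     for (Map.Entry<String, String> entry : future.get().entrySet()) {
>         myMapState.put(entry.getKey(), entry.getValue());
>     }
> }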
>
> Thanks
> David
>
>
> On Thu, Mar 5, 2020 at 8:06 AM Arvid Heise <ar...@ververica.com> wrote:
>
>> Hi David,
>>
>> the obvious reason is that your state stored an enum value that is not
>> present anymore: it tries to deserialize the 512th entry of your enum, which
>> does not exist.
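>>
>> To illustrate (a simplified sketch, not the actual EnumSerializer code): the
>> enum is written as an index and read back by looking that index up in the
>> enum's values, so a stale or corrupted index fails exactly like in your trace:
>>
>> // Simplified sketch of index-based enum deserialization (MyEnum and `in`
>> // are placeholders for your enum class and the state input stream).
>> MyEnum[] values = MyEnum.values();   // e.g. only a handful of constants
>> int index = in.readInt();            // 512 read from the (corrupt) stream
>> MyEnum value = values[index];        // -> ArrayIndexOutOfBoundsException: 512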
>>
>> However, since it's highly unlikely that you actually have that many values
>> in the same enum class, we are most likely looking at a corrupt stream,
>> which is hard to fix. Could you describe which state you have?
>>
>> Did you upgrade Flink or your application? If it's Flink, it's a bug. If
>> it's the application, the state may be incompatible and would need to be
>> migrated.
>>
>> Did you restart from checkpoint or savepoint?
>>
>> On Thu, Mar 5, 2020 at 1:14 AM David Morin <morin.david....@gmail.com>
>> wrote:
>>
>>> Hello,
>>>
>>> I have this Exception in my datastream app and I can't find the root
>>> cause.
>>> I consume data from Kafka and it fails when I try to get a value from my
>>> MapState in RocksDB.
>>> It was working in a previous release of my app, but I can't find the cause
>>> of this error.
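>>>
>>> The failing access is essentially a plain MapState#get in the processing
>>> path, roughly like this (simplified, with renamed placeholder types; per the
>>> stack trace the map value is a Tuple that contains an enum):
>>>
>>> // Simplified: get() makes RocksDB deserialize the stored value, which is
>>> // where it fails (MyEnum, mapState and someKey are placeholder names).
>>> Tuple2<String, MyEnum> value = mapState.get(someKey);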
>>>
>>> java.lang.ArrayIndexOutOfBoundsException: 512
>>>         at
>>> org.apache.flink.api.common.typeutils.base.EnumSerializer.deserialize(EnumSerializer.java:130)
>>>         at
>>> org.apache.flink.api.common.typeutils.base.EnumSerializer.deserialize(EnumSerializer.java:50)
>>>         at
>>> org.apache.flink.api.java.typeutils.runtime.TupleSerializer.deserialize(TupleSerializer.java:143)
>>>         at
>>> org.apache.flink.api.java.typeutils.runtime.TupleSerializer.deserialize(TupleSerializer.java:37)
>>>         at
>>> org.apache.flink.contrib.streaming.state.RocksDBMapState.deserializeUserValue(RocksDBMapState.java:344)
>>>         at
>>> org.apache.flink.contrib.streaming.state.RocksDBMapState.get(RocksDBMapState.java:123)
>>>         at
>>> org.apache.flink.runtime.state.UserFacingMapState.get(UserFacingMapState.java:47)
>>> ..
>>>
>>> Flink version: 1.9.2
>>>
>>>
>>>
