Hi Chirag,

Which Flink version are you using? As far as I understand, the issue appears
just from writing the initial data - no recovery happened, right?

Could you try to change the code so that you only have a single
read/update on the state? It should work as you have done it, but I'd like
to pinpoint the issue further.
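To make the change concrete, here is a minimal sketch of the single-read/single-write pattern I mean. It uses a tiny stand-in for Flink's ValueState so the snippet is self-contained outside a Flink runtime; in the real job the state would come from getRuntimeContext().getState(descriptor), and getPrizeDelta is the accessor from your earlier mail:

```java
public class SingleAccessSketch {

    // Minimal stand-in for Flink's ValueState<Integer>, scoped to one key.
    static class FakeValueState {
        private Integer value;
        Integer value() { return value; }
        void update(Integer v) { value = v; }
    }

    // One read and one write per element: read the current value once into
    // a local variable, compute the new sum, then write it back once.
    static int processElement(FakeValueState state, int prizeDelta) {
        Integer current = state.value();                 // single read
        int sum = (current == null ? 0 : current) + prizeDelta;
        state.update(sum);                               // single write
        return sum;                                      // emitted as netPrize
    }

    public static void main(String[] args) {
        FakeValueState state = new FakeValueState();
        System.out.println(processElement(state, 10));   // prints 10
        System.out.println(processElement(state, 5));    // prints 15
    }
}
```

With this shape there is exactly one state access of each kind per element, which should make it easier to see which access triggers the exception.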

On Thu, Jun 10, 2021 at 8:25 AM Yun Gao <yungao...@aliyun.com> wrote:

> Hi Chirag,
>
> Logically an Integer key should not have this issue. Sorry, from the
> current description I have not found other issues. Could you also share the
> code in the main method that adds the KeyedProcessFunction to the job?
> Many thanks!
>
> Best,
> Yun
>
> ------------------------------------------------------------------
> From:Chirag Dewan <chirag.dewa...@yahoo.in>
> Send Time:2021 Jun. 9 (Wed.) 15:15
> To:User <user@flink.apache.org>; Yun Gao <yungao...@aliyun.com>
> Subject:Re: Multiple Exceptions during Load Test in State Access APIs with
> RocksDB
>
> Thanks for the reply Yun.
>
> The key is an Integer type. Do you think there can be hash collisions for
> Integers?
>
> It somehow works on a single TM now. No errors for 1M records.
> But as soon as we move to 2 TMs, we get all sorts of errors - 'Position Out
> of Bounds', key not in KeyGroup, etc.
>
> This also causes an NPE in the user-defined code -
>
> if (valueState != null)
>     valueState.value() -> this returned null, so although the if check
> passed, reading the value still caused an NPE.
>
> Thanks,
> Chirag
>
> On Tuesday, 8 June, 2021, 08:29:04 pm IST, Yun Gao <yungao...@aliyun.com>
> wrote:
>
>
> Hi Chirag,
>
> As far as I know, if you are running a single job, all the pods sharing
> the same state.checkpoints.dir configuration should be as expected, and it
> is not necessary to configure the RocksDB local dir since Flink will choose
> a default dir.
>
> Regarding the latest exception, I think you might first check the key type
> used; the key type should have a stable hashCode method.
>
> Best,
> Yun
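> For illustration, a minimal sketch of a key class with a stable hashCode,
> derived only from an immutable field (the class name and field are
> assumptions for the example, not from the original job):

```java
// Sketch of a key type that is safe to use as a Flink key: equals/hashCode
// depend only on an immutable field, so the computed hash (and therefore
// the key group) never changes for the same logical key.
public final class PlayerKey {
    private final int playerId;

    public PlayerKey(int playerId) {
        this.playerId = playerId;
    }

    @Override
    public boolean equals(Object o) {
        return o instanceof PlayerKey && ((PlayerKey) o).playerId == playerId;
    }

    @Override
    public int hashCode() {
        // Stable: the same playerId always yields the same hash, on every JVM.
        return Integer.hashCode(playerId);
    }
}
```

> A key whose hashCode depends on mutable or identity-based state can hash
> into a key group outside the operator's KeyGroupRange, which matches the
> "Key group 2 is not in KeyGroupRange" error seen later in this thread.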
>
>
>
> ------------------Original Mail ------------------
> *Sender:*Chirag Dewan <chirag.dewa...@yahoo.in>
> *Send Date:*Tue Jun 8 18:06:07 2021
> *Recipients:*User <user@flink.apache.org>, Yun Gao <yungao...@aliyun.com>
> *Subject:*Re: Multiple Exceptions during Load Test in State Access APIs
> with RocksDB
> Hi,
>
> Although this looks like a problem to me, I still can't conclude it.
>
> I tried reducing my TM replicas from 2 to 1 with 4 slots and 4 cores each.
> I was hoping that with a single TM there would be no file write conflicts.
> But that doesn't seem to be the case, as I still get:
>
> Caused by: org.apache.flink.util.SerializedThrowable:
> java.lang.IllegalArgumentException: Key group 2 is not in
> KeyGroupRange{startKeyGroup=64, endKeyGroup=95}.
>
> I have checked that there's no concurrent access on the ValueState.
>
> Any more leads?
>
> Thanks,
> Chirag
>
>
> On Monday, 7 June, 2021, 06:56:56 pm IST, Chirag Dewan <
> chirag.dewa...@yahoo.in> wrote:
>
>
> Hi,
>
> I think I found my issue. It would help if someone could confirm it :)
>
> I am using an NFS filesystem for storing my checkpoints, and my Flink
> cluster is running on Kubernetes with 2 TMs and 2 JMs.
>
> All my pods share the NFS PVC via state.checkpoints.dir, and we also
> missed setting the RocksDB local dir.
>
> Does this lead to state corruption?
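> For reference, a minimal sketch of the two settings involved, in
> flink-conf.yaml (the paths are placeholders, not from the original setup):

```yaml
# Shared, durable location for checkpoints (e.g. the NFS PVC mount):
state.checkpoints.dir: file:///mnt/nfs/flink-checkpoints

# Node-local working directory for RocksDB; this should be fast local
# disk on each TM, not the shared NFS volume:
state.backend.rocksdb.localdir: /tmp/flink-rocksdb
```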
>
> Thanks,
> Chirag
>
>
>
> On Monday, 7 June, 2021, 08:54:39 am IST, Chirag Dewan <
> chirag.dewa...@yahoo.in> wrote:
>
>
> Thanks for the reply Yun. Strangely, I don't see any nulls. And in fact
> this exception comes on the first few records, after which the job starts
> processing normally.
>
> Also, I don't see any reason for concurrent access to the state in my
> code. Could more CPU cores than task slots on the Task Manager be the
> reason for it?
>
> On Saturday, 5 June, 2021, 06:40:27 pm IST, Yun Gao <yungao...@aliyun.com>
> wrote:
>
>
> Hi Chirag,
>
> If you are able to reproduce the exception, could you first add some logs
> to print the values of valueState, valueState.value(), inEvent and
> inEvent.getPrizeDelta()?
> Any of these objects being null would cause a NullPointerException here.
>
> For the second exception, I found a similar issue [1], caused by concurrent
> access to the value state. Do we have a similar situation here?
>
> Best,
> Yun
>
> [1] https://issues.apache.org/jira/browse/FLINK-18587
>
>
>
> ------------------Original Mail ------------------
> *Sender:*Chirag Dewan <chirag.dewa...@yahoo.in>
> *Send Date:*Sat Jun 5 20:29:37 2021
> *Recipients:*User <user@flink.apache.org>
> *Subject:*Multiple Exceptions during Load Test in State Access APIs with
> RocksDB
> Hi,
>
> I am getting multiple exceptions while trying to use RocksDB as a state
> backend.
>
> I have 2 Task Managers with 2 task slots and 4 cores each.
>
> Below is our setup:
>
>
>
> Kafka (topic with 2 partitions) ---> FlinkKafkaConsumer (2 parallelism) --->
> KeyedProcessFunction (4 parallelism) ---> FlinkKafkaProducer (1 parallelism)
> ---> Kafka topic
>
>
>
> public class Aggregator_KeyedExpression
>         extends KeyedProcessFunction<Object, GameZoneInput, GameZoneOutput> {
>
>     private ValueState<Integer> valueState;
>
>     @Override
>     public void open() throws Exception {
>         ValueStateDescriptor<Integer> descriptor =
>                 new ValueStateDescriptor<Integer>("totalPrize", Integer.class);
>         valueState = getRuntimeContext().getState(descriptor);
>     }
>
>     @Override
>     public void processElement(GameZoneInput inEvent, Context ctx,
>             final List<GameZoneOutput> outEvents) throws Exception {
>
>         if (valueState.value() == null) {
>             valueState.update(0);
>         }
>
>         valueState.update(valueState.value() + inEvent.getPrizeDelta());
>         // -----> NullPointerException on this line
>
>         int sum = valueState.value();
>
>         GameZoneOutput output = new GameZoneOutput();
>         output.setPlayerId(inEvent.getPlayerId());
>         output.setNetPrize(sum);
>         outEvents.add(output);
>     }
>
>     @Override
>     public void close() throws Exception {
>         valueState.clear();
>     }
> }
> While doing a load test, I get a NullPointerException in
> valueState.value(), which seems strange, as we would have updated the value
> state just above.
>
> Another strange thing is that this is observed only in load conditions and
> works fine otherwise.
>
> We also see some serialization exceptions:
>
> Suppressed: java.lang.IllegalArgumentException: Position out of bounds.
>     at org.apache.flink.util.Preconditions.checkArgument(Preconditions.java:138)
>     at org.apache.flink.core.memory.DataOutputSerializer.setPosition(DataOutputSerializer.java:352)
>     at org.apache.flink.contrib.streaming.state.RocksDBSerializedCompositeKeyBuilder.resetToKey(RocksDBSerializedCompositeKeyBuilder.java:185)
>     at org.apache.flink.contrib.streaming.state.RocksDBSerializedCompositeKeyBuilder.buildCompositeKeyNamespace(RocksDBSerializedCompositeKeyBuilder.java:114)
>     at org.apache.flink.contrib.streaming.state.AbstractRocksDBState.serializeCurrentKeyWithGroupAndNamespace(AbstractRocksDBState.java:163)
>     at org.apache.flink.contrib.streaming.state.AbstractRocksDBState.clear(AbstractRocksDBState.java:113)
>
>
> Any leads would be appreciated. Thanks
>
> Chirag
>
>
>
>
