Hi Chirag,

Which Flink version are you using? As far as I understand, the issue appears just from writing the initial data - no recovery was involved, right?
Could you try to change the code such that you have only a single read/update on the state? It should work the way you have done it, but I'd like to pinpoint the issue further.

On Thu, Jun 10, 2021 at 8:25 AM Yun Gao <yungao...@aliyun.com> wrote:

> Hi Chirag,
>
> Logically an Integer key should not have this issue. Sorry, from the
> current description I have not found other issues. Could you also share
> the code in the main method that adds the KeyedProcessFunction to the
> job? Many thanks!
>
> Best,
> Yun
>
> ------------------------------------------------------------------
> From: Chirag Dewan <chirag.dewa...@yahoo.in>
> Send Time: 2021 Jun. 9 (Wed.) 15:15
> To: User <user@flink.apache.org>; Yun Gao <yungao...@aliyun.com>
> Subject: Re: Multiple Exceptions during Load Test in State Access APIs with RocksDB
>
> Thanks for the reply, Yun.
>
> The key is an Integer type. Do you think there can be hash collisions for
> Integers?
>
> It somehow works on a single TM now - no errors for 1m records. But as
> soon as we move to 2 TMs, we get all sorts of errors - 'Position Out of
> Bound', key not in KeyGroup, etc.
>
> This also causes an NPE in the user-defined code:
>
> if (valueState != null)
>     valueState.value(); // the if check passed, yet reading the value caused an NPE
>
> Thanks,
> Chirag
>
> On Tuesday, 8 June, 2021, 08:29:04 pm IST, Yun Gao <yungao...@aliyun.com> wrote:
>
> Hi Chirag,
>
> As far as I know, if you are running a single job, all the pods sharing
> the same state.checkpoints.dir configuration should be fine, and it is
> not necessary to configure the RocksDB local dir since Flink will choose
> a default one.
>
> Regarding the latest exception, I think you might first check the key
> type used - the key type should have a stable hashCode method.
>
> Best,
> Yun
>
> ------------------Original Mail ------------------
> *Sender:* Chirag Dewan <chirag.dewa...@yahoo.in>
> *Send Date:* Tue Jun 8 18:06:07 2021
> *Recipients:* User <user@flink.apache.org>, Yun Gao <yungao...@aliyun.com>
> *Subject:* Re: Multiple Exceptions during Load Test in State Access APIs with RocksDB
>
> Hi,
>
> Although this looks like a problem to me, I still can't conclude it.
>
> I tried reducing my TM replicas from 2 to 1, with 4 slots and 4 cores
> each. I was hoping that with a single TM there would be no file write
> conflicts. But that doesn't seem to be the case, as I still get:
>
> Caused by: org.apache.flink.util.SerializedThrowable:
> java.lang.IllegalArgumentException: Key group 2 is not in
> KeyGroupRange{startKeyGroup=64, endKeyGroup=95}.
>
> I have checked that there's no concurrent access on the ValueState.
>
> Any more leads?
>
> Thanks,
> Chirag
>
> On Monday, 7 June, 2021, 06:56:56 pm IST, Chirag Dewan <chirag.dewa...@yahoo.in> wrote:
>
> Hi,
>
> I think I found my issue. It would help if someone could confirm it :)
>
> I am using an NFS filesystem for storing my checkpoints, and my Flink
> cluster is running on K8s with 2 TMs and 2 JMs.
>
> All my pods share the NFS PVC for state.checkpoints.dir, and we also
> missed setting the RocksDB local dir.
>
> Does this lead to state corruption?
>
> Thanks,
> Chirag
>
> On Monday, 7 June, 2021, 08:54:39 am IST, Chirag Dewan <chirag.dewa...@yahoo.in> wrote:
>
> Thanks for the reply, Yun. Strangely, I don't see any nulls. In fact,
> this exception comes on the first few records, and then the job starts
> processing normally.
>
> Also, I don't see any reason for concurrent access to the state in my
> code. Could more CPU cores than task slots on the Task Manager be the
> reason for it?
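Yun's earlier advice about the key type can be illustrated with a short sketch. Flink assigns keyed state to key groups based on the key's hashCode(), so the hash must be deterministic across JVMs; the PlayerKey class below is a hypothetical example, not code from this thread.

```java
// Integer is safe as a key: Integer.hashCode() is simply the int value
// itself, so it is stable across JVMs and restarts -- consistent with the
// point that an Integer key should not cause key-group errors by itself.
//
// A custom key type must override equals() and hashCode() based only on
// immutable fields. The default Object.hashCode() is identity-based and
// differs per object and per JVM, which yields exactly the
// "Key group X is not in KeyGroupRange" kind of failure.
final class PlayerKey {
    private final int playerId;

    PlayerKey(int playerId) {
        this.playerId = playerId;
    }

    @Override
    public boolean equals(Object o) {
        return o instanceof PlayerKey && ((PlayerKey) o).playerId == this.playerId;
    }

    @Override
    public int hashCode() {
        // Stable: depends only on the immutable playerId field.
        return Integer.hashCode(playerId);
    }
}
```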
>
> On Saturday, 5 June, 2021, 06:40:27 pm IST, Yun Gao <yungao...@aliyun.com> wrote:
>
> Hi Chirag,
>
> If you are able to reproduce the exception, could you first add some
> logs to print the values of valueState, valueState.value(), inEvent and
> inEvent.getPrizeDelta()? Either object being null would cause a
> NullPointerException here.
>
> For the second exception, I found a similar issue [1], caused by
> concurrent access to the value state. Do we have a similar situation
> here?
>
> Best,
> Yun
>
> [1] https://issues.apache.org/jira/browse/FLINK-18587
>
> ------------------Original Mail ------------------
> *Sender:* Chirag Dewan <chirag.dewa...@yahoo.in>
> *Send Date:* Sat Jun 5 20:29:37 2021
> *Recipients:* User <user@flink.apache.org>
> *Subject:* Multiple Exceptions during Load Test in State Access APIs with RocksDB
>
> Hi,
>
> I am getting multiple exceptions while trying to use RocksDB as a state
> backend.
>
> I have 2 Task Managers with 2 task slots and 4 cores each.
>
> Below is our setup:
>
> Kafka (topic with 2 partitions) ---> FlinkKafkaConsumer (parallelism 2) --->
> KeyedProcessFunction (parallelism 4) ---> FlinkKafkaProducer (parallelism 1)
> ---> Kafka topic
>
> public class Aggregator_KeyedExpression
>         extends KeyedProcessFunction<Object, GameZoneInput, GameZoneOutput> {
>
>     private ValueState<Integer> valueState;
>
>     @Override
>     public void open() throws Exception {
>         ValueStateDescriptor<Integer> descriptor =
>                 new ValueStateDescriptor<>("totalPrize", Integer.class);
>         valueState = getRuntimeContext().getState(descriptor);
>     }
>
>     @Override
>     public void processElement(GameZoneInput inEvent, Context ctx,
>             final List<GameZoneOutput> outEvents) throws Exception {
>
>         if (valueState.value() == null) {
>             valueState.update(0);
>         }
>
>         valueState.update(valueState.value() + inEvent.getPrizeDelta()); // -----> NullPointerException on this line
>
>         int sum = valueState.value();
>
>         GameZoneOutput output = new GameZoneOutput();
>         output.setPlayerId(inEvent.getPlayerId());
>         output.setNetPrize(sum);
>         outEvents.add(output);
>     }
>
>     @Override
>     public void close() throws Exception {
>         valueState.clear();
>     }
> }
>
> While doing a load test, I get a NullPointerException in
> valueState.value(), which seems strange as we would have just updated
> the value state above.
>
> Another strange thing is that this is observed only under load and
> works fine otherwise.
>
> We also see some serialization exceptions:
>
> Suppressed: java.lang.IllegalArgumentException: Position out of bounds.
>     at org.apache.flink.util.Preconditions.checkArgument(Preconditions.java:138)
>     at org.apache.flink.core.memory.DataOutputSerializer.setPosition(DataOutputSerializer.java:352)
>     at org.apache.flink.contrib.streaming.state.RocksDBSerializedCompositeKeyBuilder.resetToKey(RocksDBSerializedCompositeKeyBuilder.java:185)
>     at org.apache.flink.contrib.streaming.state.RocksDBSerializedCompositeKeyBuilder.buildCompositeKeyNamespace(RocksDBSerializedCompositeKeyBuilder.java:114)
>     at org.apache.flink.contrib.streaming.state.AbstractRocksDBState.serializeCurrentKeyWithGroupAndNamespace(AbstractRocksDBState.java:163)
>     at org.apache.flink.contrib.streaming.state.AbstractRocksDBState.clear(AbstractRocksDBState.java:113)
>
> Any leads would be appreciated. Thanks,
>
> Chirag
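For reference, the two directories discussed in this thread can be set explicitly in the Flink configuration. A minimal sketch of the relevant flink-conf.yaml entries (the paths are illustrative, not from the thread):

```yaml
state.backend: rocksdb
# Shared, durable location for checkpoints (e.g. the NFS PVC mounted in every pod).
state.checkpoints.dir: file:///mnt/nfs/checkpoints
# Node-local working directory for RocksDB's live SST files; this should point
# at local disk on each TaskManager, not at the shared NFS mount.
state.backend.rocksdb.localdir: /tmp/flink-rocksdb
```

Per Yun's note, leaving `state.backend.rocksdb.localdir` unset is fine, since Flink then picks a default local directory on each node.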