Hi,
I think I found my issue. It would help if someone could confirm it :)
I am using an NFS filesystem for storing my checkpoints, and my Flink cluster is
running on Kubernetes with 2 TMs and 2 JMs.
All my pods share the NFS PVC used for state.checkpoints.dir, and we also missed
setting the RocksDB local directory.
Does this lead to state corruption?
Thanks,
Chirag
On Monday, 7 June, 2021, 08:54:39 am IST, Chirag Dewan
<[email protected]> wrote:
Thanks for the reply, Yun. Strangely, I don't see any nulls. In fact, this
exception occurs on the first few records, and then the job starts processing
normally.
Also, I don't see any reason for concurrent access to the state in my code.
Could giving the Task Manager more CPU cores than task slots be the reason for it?
On Saturday, 5 June, 2021, 06:40:27 pm IST, Yun Gao <[email protected]>
wrote:
Hi Chirag,
If you are able to reproduce the exception, could you first add some logs to print
the value of valueState, valueState.value(), inEvent and inEvent.getPrizeDelta()?
I think any of these being null would cause the NullPointerException here.
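
The logging suggested above could be sketched as follows. NullDiagnostics is a hypothetical helper written for illustration and is not part of Flink; the labels passed to it simply mirror the four expressions named in the suggestion. In the real job, the resulting string would go to the job's logger just before the failing line.

```java
import java.util.LinkedHashMap;
import java.util.Map;

// Hypothetical helper (not a Flink API): reports which of several named values is null.
final class NullDiagnostics {
    private NullDiagnostics() {}

    /** Builds a one-line summary, marking every null value as NULL. */
    static String describe(Map<String, Object> namedValues) {
        StringBuilder sb = new StringBuilder();
        for (Map.Entry<String, Object> e : namedValues.entrySet()) {
            if (sb.length() > 0) {
                sb.append(", ");
            }
            sb.append(e.getKey()).append('=')
              .append(e.getValue() == null ? "NULL" : e.getValue());
        }
        return sb.toString();
    }

    public static void main(String[] args) {
        // LinkedHashMap keeps insertion order and accepts null values.
        Map<String, Object> values = new LinkedHashMap<>();
        values.put("valueState.value()", null);
        values.put("inEvent.getPrizeDelta()", 5);
        System.out.println(describe(values));
    }
}
```

Logging all candidates in one line makes it easier to correlate the null operand with the record that triggered the exception.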
For the second exception, I found a similar issue [1], caused by concurrent
access to the value state. Do we have a similar situation here?
Best,
Yun
[1] https://issues.apache.org/jira/browse/FLINK-18587
------------------ Original Mail ------------------
Sender: Chirag Dewan <[email protected]>
Send Date: Sat Jun 5 20:29:37 2021
Recipients: User <[email protected]>
Subject: Multiple Exceptions during Load Test in State Access APIs with RocksDB
Hi,
I am getting multiple exceptions while trying to use RocksDB as a state backend.
I have 2 Task Managers, each with 2 task slots and 4 cores.
Below is our setup:
Kafka (topic with 2 partitions)
  ---> FlinkKafkaConsumer (parallelism 2)
  ---> KeyedProcessFunction (parallelism 4)
  ---> FlinkKafkaProducer (parallelism 1)
  ---> Kafka topic
public class Aggregator_KeyedExpression
        extends KeyedProcessFunction<Object, GameZoneInput, GameZoneOutput> {

    private ValueState<Integer> valueState;

    @Override
    public void open() throws Exception {
        ValueStateDescriptor<Integer> descriptor =
                new ValueStateDescriptor<Integer>("totalPrize", Integer.class);
        valueState = getRuntimeContext().getState(descriptor);
    }

    @Override
    public void processElement(GameZoneInput inEvent, Context ctx,
                               final List<GameZoneOutput> outEvents)
            throws Exception {
        if (valueState.value() == null) {
            valueState.update(0);
        }
        // -----> NullPointerException on the next line
        valueState.update(valueState.value() + inEvent.getPrizeDelta());
        int sum = valueState.value();
        GameZoneOutput output = new GameZoneOutput();
        output.setPlayerId(inEvent.getPlayerId());
        output.setNetPrize(sum);
        outEvents.add(output);
    }

    @Override
    public void close() throws Exception {
        valueState.clear();
    }
}
While doing a load test, I get a NullPointerException in valueState.value(),
which seems strange because we would have updated the value state just above.
Another strange thing is that this is observed only under load; everything
works fine otherwise.
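
For reference, the failing line adds two values, and in Java an addition that auto-unboxes a null Integer throws a NullPointerException. The sketch below illustrates only that language behavior and a read-once, null-safe alternative. IntState is a hypothetical stand-in and is not Flink's ValueState. Whether getPrizeDelta() returns Integer or int is not shown in this thread, so the nullable delta is an assumption, and nothing here establishes the actual cause of the exception.

```java
// Hypothetical stand-in for a keyed Integer state; this is NOT Flink's ValueState.
final class IntState {
    private Integer stored;                 // null until the first update

    Integer value() { return stored; }

    void update(Integer v) { stored = v; }
}

final class UnboxingDemo {
    private UnboxingDemo() {}

    /** Reads the state once, treats null as 0, adds a possibly-null delta, writes back. */
    static int addDelta(IntState state, Integer delta) {
        Integer current = state.value();
        int base = (current == null) ? 0 : current;
        int safeDelta = (delta == null) ? 0 : delta;   // assumption: the delta may be null
        int sum = base + safeDelta;
        state.update(sum);
        return sum;
    }

    /** Returns true if the naive expression throws because an operand is null. */
    static boolean naiveAddThrows(IntState state, Integer delta) {
        try {
            state.update(state.value() + delta);       // unboxing a null Integer throws here
            return false;
        } catch (NullPointerException e) {
            return true;
        }
    }

    public static void main(String[] args) {
        IntState state = new IntState();
        System.out.println("naive add on empty state throws: " + naiveAddThrows(state, 1));
        System.out.println("null-safe sum: " + addDelta(state, 3));
    }
}
```

Reading the state into a local variable once also avoids the three separate value() calls per record in the original function.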
We also see some serialization exceptions:
Suppressed: java.lang.IllegalArgumentException: Position out of bounds.
    at org.apache.flink.util.Preconditions.checkArgument(Preconditions.java:138)
    at org.apache.flink.core.memory.DataOutputSerializer.setPosition(DataOutputSerializer.java:352)
    at org.apache.flink.contrib.streaming.state.RocksDBSerializedCompositeKeyBuilder.resetToKey(RocksDBSerializedCompositeKeyBuilder.java:185)
    at org.apache.flink.contrib.streaming.state.RocksDBSerializedCompositeKeyBuilder.buildCompositeKeyNamespace(RocksDBSerializedCompositeKeyBuilder.java:114)
    at org.apache.flink.contrib.streaming.state.AbstractRocksDBState.serializeCurrentKeyWithGroupAndNamespace(AbstractRocksDBState.java:163)
    at org.apache.flink.contrib.streaming.state.AbstractRocksDBState.clear(AbstractRocksDBState.java:113)
Any leads would be appreciated. Thanks
Chirag