Hi,
I am getting multiple exceptions while trying to use RocksDB as a state backend.
I have 2 Task Managers with 2 task slots and 4 cores each.
Below is our setup:
Kafka (topic with 2 partitions) ----> FlinkKafkaConsumer (parallelism 2) ---->
KeyedProcessFunction (parallelism 4) ----> FlinkKafkaProducer (parallelism 1)
----> Kafka topic
public class Aggregator_KeyedExpression extends KeyedProcessFunction<Object,
        GameZoneInput, GameZoneOutput> {

    private ValueState<Integer> valueState;

    @Override
    public void open() throws Exception {
        ValueStateDescriptor<Integer> descriptor =
                new ValueStateDescriptor<Integer>(
                        "totalPrize", Integer.class);
        valueState = getRuntimeContext().getState(descriptor);
    }

    @Override
    public void processElement(GameZoneInput inEvent, Context ctx, final
            List<GameZoneOutput> outEvents)
            throws Exception {
        if (valueState.value() == null) {
            valueState.update(0);
        }
        valueState.update(valueState.value() + inEvent.getPrizeDelta()); // <----- NullPointerException on this line
        int sum = valueState.value();
        GameZoneOutput output = new GameZoneOutput();
        output.setPlayerId(inEvent.getPlayerId());
        output.setNetPrize(sum);
        outEvents.add(output);
    }

    @Override
    public void close() throws Exception {
        valueState.clear();
    }
}
While doing a load test, I get a NullPointerException at the valueState.value()
call marked above. This seems strange, as we would have updated the value state
just before it. Another strange thing is that this is observed only under load;
it works fine otherwise.
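
For reference, here is a minimal standalone sketch of how the exception can surface on that line. It assumes the NullPointerException comes from auto-unboxing: if value() returns null at the moment of the read, then value() + delta throws. FakeValueState is a hypothetical in-memory stand-in written only for this illustration. It is not Flink's ValueState and says nothing about RocksDB itself, and the sketch does not explain why the state would be null right after update(0).

```java
import java.util.HashMap;
import java.util.Map;

public class NullStateUnboxingSketch {

    /** Hypothetical stand-in for a keyed ValueState<Integer>; NOT the Flink class. */
    static final class FakeValueState {
        private final Map<String, Integer> store = new HashMap<>();
        private String currentKey;

        void setCurrentKey(String key) { currentKey = key; }

        /** Returns null when nothing is stored for the current key. */
        Integer value() { return store.get(currentKey); }

        void update(Integer v) { store.put(currentKey, v); }

        void clear() { store.remove(currentKey); }
    }

    /** Same shape as the failing line: value() is auto-unboxed, so null throws. */
    static int addUnsafe(FakeValueState state, int delta) {
        state.update(state.value() + delta);
        return state.value();
    }

    /** Reads the state once into a local and treats null as 0 before adding. */
    static int addSafe(FakeValueState state, int delta) {
        Integer current = state.value();
        int sum = (current == null ? 0 : current) + delta;
        state.update(sum);
        return sum;
    }

    public static void main(String[] args) {
        FakeValueState state = new FakeValueState();
        state.setCurrentKey("player-1"); // hypothetical key

        try {
            addUnsafe(state, 5);
        } catch (NullPointerException e) {
            System.out.println("unboxing a null state value threw: " + e);
        }

        System.out.println("null-safe sum: " + addSafe(state, 5));
    }
}
```

Reading the state once into a local, as in addSafe, would at least take the unboxing out of the picture while we look for the actual cause.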
We also see some serialization exceptions:
Suppressed: java.lang.IllegalArgumentException: Position out of bounds.
    at org.apache.flink.util.Preconditions.checkArgument(Preconditions.java:138)
    at org.apache.flink.core.memory.DataOutputSerializer.setPosition(DataOutputSerializer.java:352)
    at org.apache.flink.contrib.streaming.state.RocksDBSerializedCompositeKeyBuilder.resetToKey(RocksDBSerializedCompositeKeyBuilder.java:185)
    at org.apache.flink.contrib.streaming.state.RocksDBSerializedCompositeKeyBuilder.buildCompositeKeyNamespace(RocksDBSerializedCompositeKeyBuilder.java:114)
    at org.apache.flink.contrib.streaming.state.AbstractRocksDBState.serializeCurrentKeyWithGroupAndNamespace(AbstractRocksDBState.java:163)
    at org.apache.flink.contrib.streaming.state.AbstractRocksDBState.clear(AbstractRocksDBState.java:113)
Any leads would be appreciated. Thanks
Chirag