[ https://issues.apache.org/jira/browse/FLINK-20772?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17799718#comment-17799718 ]
Zakelly Lan commented on FLINK-20772:
-------------------------------------

[~dorbae] Glad you are back! And sorry for rushing you. I hope you get the opportunity to contribute next time. :)

> RocksDBValueState with TTL throws NullPointerException when calling update(null)
> ---------------------------------------------------------------------------------
>
>                 Key: FLINK-20772
>                 URL: https://issues.apache.org/jira/browse/FLINK-20772
>             Project: Flink
>          Issue Type: Bug
>          Components: Runtime / State Backends
>    Affects Versions: 1.11.2
>         Environment: Flink version: 1.11.2
>                      Flink Cluster: Standalone cluster with 3 JobManagers and TaskManagers on CentOS 7
>            Reporter: Seongbae Chang
>            Assignee: Zakelly Lan
>            Priority: Not a Priority
>              Labels: auto-deprioritized-major, auto-deprioritized-minor, beginner, pull-request-available
>             Fix For: 1.19.0
>
>
> h2. Problem
> * I use ValueState in my custom trigger and set a TTL on these states in a RocksDB backend environment.
> * I found an error when I used the code below. I know that ValueState.update(null) normally behaves the same as ValueState.clear(). Unfortunately, this error occurs once TTL is enabled.
> {code:java}
> // My code
> ctx.getPartitionedState(batchTotalSizeStateDesc).update(null);
> {code}
> * I tested this on Flink 1.11.2, but I think it is also a problem in later versions.
> * Also, I'm a beginner, so if there is any problem with this issue, please give me advice about it, and I'll fix it!
> {code:java}
> // Error stacktrace
> Caused by: TimerException{org.apache.flink.util.FlinkRuntimeException: Error while adding data to RocksDB}
>     ... 12 more
> Caused by: org.apache.flink.util.FlinkRuntimeException: Error while adding data to RocksDB
>     at org.apache.flink.contrib.streaming.state.RocksDBValueState.update(RocksDBValueState.java:108)
>     at org.apache.flink.runtime.state.ttl.TtlValueState.update(TtlValueState.java:50)
>     at <MY-CLASS>.onProcessingTime(ActionBatchTimeTrigger.java:102)
>     at <MY-CLASS>.onProcessingTime(ActionBatchTimeTrigger.java:29)
>     at org.apache.flink.streaming.runtime.operators.windowing.WindowOperator$Context.onProcessingTime(WindowOperator.java:902)
>     at org.apache.flink.streaming.runtime.operators.windowing.WindowOperator.onProcessingTime(WindowOperator.java:498)
>     at org.apache.flink.streaming.api.operators.InternalTimerServiceImpl.onProcessingTime(InternalTimerServiceImpl.java:260)
>     at org.apache.flink.streaming.runtime.tasks.StreamTask.invokeProcessingTimeCallback(StreamTask.java:1220)
>     ... 11 more
> Caused by: java.lang.NullPointerException
>     at org.apache.flink.api.common.typeutils.base.IntSerializer.serialize(IntSerializer.java:69)
>     at org.apache.flink.api.common.typeutils.base.IntSerializer.serialize(IntSerializer.java:32)
>     at org.apache.flink.api.common.typeutils.CompositeSerializer.serialize(CompositeSerializer.java:142)
>     at org.apache.flink.contrib.streaming.state.AbstractRocksDBState.serializeValueInternal(AbstractRocksDBState.java:158)
>     at org.apache.flink.contrib.streaming.state.AbstractRocksDBState.serializeValue(AbstractRocksDBState.java:178)
>     at org.apache.flink.contrib.streaming.state.AbstractRocksDBState.serializeValue(AbstractRocksDBState.java:167)
>     at org.apache.flink.contrib.streaming.state.RocksDBValueState.update(RocksDBValueState.java:106)
>     ... 18 more
> {code}
>
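> For illustration, here is a minimal, self-contained sketch of the setup (this is *not* my actual trigger; the class and state names are made up, and it assumes the job is configured with the RocksDB state backend). Any keyed function that enables TTL on an Integer ValueState and then calls update(null) hits the same code path as the stacktrace above:
> {code:java}
> import org.apache.flink.api.common.state.StateTtlConfig;
> import org.apache.flink.api.common.state.ValueState;
> import org.apache.flink.api.common.state.ValueStateDescriptor;
> import org.apache.flink.api.common.time.Time;
> import org.apache.flink.api.common.typeinfo.Types;
> import org.apache.flink.configuration.Configuration;
> import org.apache.flink.streaming.api.functions.KeyedProcessFunction;
> import org.apache.flink.util.Collector;
>
> public class TtlNullUpdateRepro extends KeyedProcessFunction<String, Integer, Integer> {
>
>     private transient ValueState<Integer> batchTotalSizeState;
>
>     @Override
>     public void open(Configuration parameters) {
>         ValueStateDescriptor<Integer> descriptor =
>                 new ValueStateDescriptor<>("batchTotalSize", Types.INT);
>         // Enabling TTL wraps the RocksDB state in TtlValueState (see the "Reason" section below).
>         descriptor.enableTimeToLive(StateTtlConfig.newBuilder(Time.minutes(10)).build());
>         batchTotalSizeState = getRuntimeContext().getState(descriptor);
>     }
>
>     @Override
>     public void processElement(Integer value, Context ctx, Collector<Integer> out) throws Exception {
>         batchTotalSizeState.update(value);
>         // Without TTL this behaves like clear(); with TTL on the RocksDB backend it throws
>         // a FlinkRuntimeException caused by a NullPointerException in IntSerializer.serialize.
>         batchTotalSizeState.update(null);
>     }
> }
> {code}
>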
> h2. Reason
> * It comes down to the interaction between RocksDBValueState and TtlValueState.
> * In RocksDBValueState (as in the other ValueState implementations), *update(null)* is supposed to be caught by the null check below. With TTL enabled, however, that check is skipped and a null value ends up being serialized.
> {code:java}
> // https://github.com/apache/flink/blob/release-1.11/flink-state-backends/flink-statebackend-rocksdb/src/main/java/org/apache/flink/contrib/streaming/state/RocksDBValueState.java#L96-L110
> @Override
> public void update(V value) {
>     if (value == null) {
>         clear();
>         return;
>     }
>
>     try {
>         backend.db.put(columnFamily, writeOptions, serializeCurrentKeyWithGroupAndNamespace(), serializeValue(value));
>     } catch (Exception e) {
>         throw new FlinkRuntimeException("Error while adding data to RocksDB", e);
>     }
> }
> {code}
> * This happens because TtlValueState wraps the (null) value together with the last access timestamp into a new TtlValue object, so the argument that reaches RocksDBValueState.update() is never null itself.
> {code:java}
> // https://github.com/apache/flink/blob/release-1.11/flink-runtime/src/main/java/org/apache/flink/runtime/state/ttl/TtlValueState.java#L47-L51
> @Override
> public void update(T value) throws IOException {
>     accessCallback.run();
>     original.update(wrapWithTs(value));
> }
> {code}
> {code:java}
> // https://github.com/apache/flink/blob/release-1.11/flink-runtime/src/main/java/org/apache/flink/runtime/state/ttl/TtlUtils.java#L46-L48
> static <V> TtlValue<V> wrapWithTs(V value, long ts) {
>     return new TtlValue<>(value, ts);
> }
> {code}
> {code:java}
> // https://github.com/apache/flink/blob/release-1.11/flink-runtime/src/main/java/org/apache/flink/runtime/state/ttl/TtlValue.java
> public class TtlValue<T> implements Serializable {
>     private static final long serialVersionUID = 5221129704201125020L;
>
>     @Nullable
>     private final T userValue;
>     private final long lastAccessTimestamp;
>
>     public TtlValue(@Nullable T userValue, long lastAccessTimestamp) {
>         this.userValue = userValue;
>         this.lastAccessTimestamp = lastAccessTimestamp;
>     }
>
>     @Nullable
>     public T getUserValue() {
>         return userValue;
>     }
>
>     public long getLastAccessTimestamp() {
>         return lastAccessTimestamp;
>     }
> }
> {code}
> * In conclusion, I think the null-checking logic has to be changed so that it also covers the case where the userValue field inside TtlValue is null (a rough sketch of one possible way to do this follows in the P.S. below).
>
> I hope this is helpful for improving Flink, and if I get the chance, I would like to fix it!
> Thank you and have a happy Christmas all!
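> P.S. To make the suggestion above a bit more concrete, a rough sketch of one possible direction (only an illustration, not necessarily the change that was actually merged for 1.19.0) would be to short-circuit on null in TtlValueState.update() before the value is wrapped, keeping the usual update(null) == clear() behaviour:
> {code:java}
> // Sketch only: a possible adjustment to the TtlValueState#update method shown above.
> @Override
> public void update(T value) throws IOException {
>     accessCallback.run();
>     if (value == null) {
>         // Preserve the ValueState contract: update(null) behaves like clear().
>         original.clear();
>         return;
>     }
>     original.update(wrapWithTs(value));
> }
> {code}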