Github user sihuazhou commented on a diff in the pull request: https://github.com/apache/flink/pull/6173#discussion_r195903638 --- Diff: flink-state-backends/flink-statebackend-rocksdb/src/main/java/org/apache/flink/contrib/streaming/state/RocksDBKeyedStateBackend.java --- @@ -1303,103 +1316,18 @@ private ColumnFamilyHandle createColumnFamily(String stateName) throws IOExcepti } @Override - protected <N, T> InternalValueState<K, N, T> createValueState( - TypeSerializer<N> namespaceSerializer, - ValueStateDescriptor<T> stateDesc) throws Exception { - - Tuple2<ColumnFamilyHandle, RegisteredKeyedBackendStateMetaInfo<N, T>> registerResult = - tryRegisterKvStateInformation(stateDesc, namespaceSerializer); - - return new RocksDBValueState<>( - registerResult.f0, - registerResult.f1.getNamespaceSerializer(), - registerResult.f1.getStateSerializer(), - stateDesc.getDefaultValue(), - this); - } - - @Override - protected <N, T> InternalListState<K, N, T> createListState( - TypeSerializer<N> namespaceSerializer, - ListStateDescriptor<T> stateDesc) throws Exception { - - Tuple2<ColumnFamilyHandle, RegisteredKeyedBackendStateMetaInfo<N, List<T>>> registerResult = - tryRegisterKvStateInformation(stateDesc, namespaceSerializer); - - return new RocksDBListState<>( - registerResult.f0, - registerResult.f1.getNamespaceSerializer(), - registerResult.f1.getStateSerializer(), - stateDesc.getDefaultValue(), - stateDesc.getElementSerializer(), - this); - } - - @Override - protected <N, T> InternalReducingState<K, N, T> createReducingState( - TypeSerializer<N> namespaceSerializer, - ReducingStateDescriptor<T> stateDesc) throws Exception { - - Tuple2<ColumnFamilyHandle, RegisteredKeyedBackendStateMetaInfo<N, T>> registerResult = - tryRegisterKvStateInformation(stateDesc, namespaceSerializer); - - return new RocksDBReducingState<>( - registerResult.f0, - registerResult.f1.getNamespaceSerializer(), - registerResult.f1.getStateSerializer(), - stateDesc.getDefaultValue(), - stateDesc.getReduceFunction(), - this); - } - - @Override - protected <N, T, ACC, R> InternalAggregatingState<K, N, T, ACC, R> createAggregatingState( - TypeSerializer<N> namespaceSerializer, - AggregatingStateDescriptor<T, ACC, R> stateDesc) throws Exception { - - Tuple2<ColumnFamilyHandle, RegisteredKeyedBackendStateMetaInfo<N, ACC>> registerResult = - tryRegisterKvStateInformation(stateDesc, namespaceSerializer); - - return new RocksDBAggregatingState<>( - registerResult.f0, - registerResult.f1.getNamespaceSerializer(), - registerResult.f1.getStateSerializer(), - stateDesc.getDefaultValue(), - stateDesc.getAggregateFunction(), - this); - } - - @Override - protected <N, T, ACC> InternalFoldingState<K, N, T, ACC> createFoldingState( + public <N, SV, S extends State, IS extends S> IS createState( TypeSerializer<N> namespaceSerializer, - FoldingStateDescriptor<T, ACC> stateDesc) throws Exception { - - Tuple2<ColumnFamilyHandle, RegisteredKeyedBackendStateMetaInfo<N, ACC>> registerResult = - tryRegisterKvStateInformation(stateDesc, namespaceSerializer); - - return new RocksDBFoldingState<>( - registerResult.f0, - registerResult.f1.getNamespaceSerializer(), - registerResult.f1.getStateSerializer(), - stateDesc.getDefaultValue(), - stateDesc.getFoldFunction(), - this); - } - - @Override - protected <N, UK, UV> InternalMapState<K, N, UK, UV> createMapState( - TypeSerializer<N> namespaceSerializer, - MapStateDescriptor<UK, UV> stateDesc) throws Exception { - - Tuple2<ColumnFamilyHandle, RegisteredKeyedBackendStateMetaInfo<N, Map<UK, UV>>> registerResult = - tryRegisterKvStateInformation(stateDesc, namespaceSerializer); - - return new RocksDBMapState<>( - registerResult.f0, - registerResult.f1.getNamespaceSerializer(), - registerResult.f1.getStateSerializer(), - stateDesc.getDefaultValue(), - this); + StateDescriptor<S, SV> stateDesc) throws Exception { + if (!STATE_FACTORIES.containsKey(stateDesc.getClass())) { + String message = String.format("State %s is not supported by %s", + stateDesc.getClass(), this.getClass()); + throw new UnsupportedOperationException(message); --- End diff -- The exception type is a bit inconsistent, in other place throw `FlinkRuntimeException`, maybe it better to make this consistent.
---