aljoscha commented on a change in pull request #14648: URL: https://github.com/apache/flink/pull/14648#discussion_r567776152
########## File path: flink-tests/src/test/java/org/apache/flink/test/state/SavepointStateBackendSwitchTest.java ########## @@ -0,0 +1,289 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.flink.test.state; + +import org.apache.flink.api.common.ExecutionConfig; +import org.apache.flink.api.common.state.ListStateDescriptor; +import org.apache.flink.api.common.state.MapStateDescriptor; +import org.apache.flink.api.common.state.ValueStateDescriptor; +import org.apache.flink.api.common.typeutils.base.IntSerializer; +import org.apache.flink.api.common.typeutils.base.StringSerializer; +import org.apache.flink.contrib.streaming.state.RocksDBKeyedStateBackend; +import org.apache.flink.contrib.streaming.state.RocksDBStateBackend; +import org.apache.flink.contrib.streaming.state.RocksDBTestUtils; +import org.apache.flink.core.fs.CloseableRegistry; +import org.apache.flink.runtime.checkpoint.CheckpointOptions; +import org.apache.flink.runtime.checkpoint.CheckpointType; +import org.apache.flink.runtime.checkpoint.StateObjectCollection; +import org.apache.flink.runtime.query.TaskKvStateRegistry; +import org.apache.flink.runtime.state.AbstractStateBackend; +import org.apache.flink.runtime.state.BackendBuildingException; +import org.apache.flink.runtime.state.CheckpointStorageLocationReference; +import org.apache.flink.runtime.state.KeyGroupRange; +import org.apache.flink.runtime.state.KeyGroupedInternalPriorityQueue; +import org.apache.flink.runtime.state.KeyedStateHandle; +import org.apache.flink.runtime.state.SnapshotResult; +import org.apache.flink.runtime.state.TestLocalRecoveryConfig; +import org.apache.flink.runtime.state.heap.HeapKeyedStateBackend; +import org.apache.flink.runtime.state.heap.HeapKeyedStateBackendBuilder; +import org.apache.flink.runtime.state.heap.HeapPriorityQueueSetFactory; +import org.apache.flink.runtime.state.heap.HeapStateBackendTestBase; +import org.apache.flink.runtime.state.internal.InternalListState; +import org.apache.flink.runtime.state.internal.InternalMapState; +import org.apache.flink.runtime.state.internal.InternalValueState; +import org.apache.flink.runtime.state.memory.MemCheckpointStreamFactory; +import org.apache.flink.runtime.state.ttl.TtlTimeProvider; +import org.apache.flink.streaming.api.operators.TimerHeapInternalTimer; +import org.apache.flink.streaming.api.operators.TimerSerializer; +import org.apache.flink.util.InstantiationUtil; + +import org.junit.Rule; +import org.junit.Test; +import org.junit.rules.TemporaryFolder; + +import java.io.BufferedInputStream; +import java.io.BufferedOutputStream; +import java.io.File; +import java.io.FileInputStream; +import java.io.FileOutputStream; +import java.util.Collection; +import java.util.Iterator; +import java.util.Map; +import java.util.concurrent.RunnableFuture; + +import static org.hamcrest.CoreMatchers.equalTo; +import static org.hamcrest.Matchers.contains; +import static org.junit.Assert.assertEquals; +import static org.junit.Assert.assertThat; +import static org.mockito.Mockito.mock; + +/** + * Tests for the unified savepoint format. They verify you can switch a state backend through a + * savepoint. + */ +public class SavepointStateBackendSwitchTest { + @Rule public TemporaryFolder tempFolder = new TemporaryFolder(); + + @Test + public void switchFromRocksToHeap() throws Exception { + + final File pathToWrite = tempFolder.newFile("tmp_rocks_map_state"); + + final MapStateDescriptor<Long, Long> mapStateDescriptor = + new MapStateDescriptor<>("my-map-state", Long.class, Long.class); + mapStateDescriptor.initializeSerializerUnlessSet(new ExecutionConfig()); + + final ValueStateDescriptor<Long> valueStateDescriptor = + new ValueStateDescriptor<>("my-value-state", Long.class); + valueStateDescriptor.initializeSerializerUnlessSet(new ExecutionConfig()); + + ListStateDescriptor<Long> listStateDescriptor = + new ListStateDescriptor<>("my-list-state", Long.class); + listStateDescriptor.initializeSerializerUnlessSet(new ExecutionConfig()); + + final Integer namespace1 = 1; + final Integer namespace2 = 2; + final Integer namespace3 = 3; + + takeRocksSavepoint( + pathToWrite, + mapStateDescriptor, + valueStateDescriptor, + listStateDescriptor, + namespace1, + namespace2, + namespace3); + + final SnapshotResult<KeyedStateHandle> stateHandles; + try (BufferedInputStream bis = + new BufferedInputStream((new FileInputStream(pathToWrite)))) { + stateHandles = + InstantiationUtil.deserializeObject( + bis, Thread.currentThread().getContextClassLoader()); + } + final KeyedStateHandle stateHandle = stateHandles.getJobManagerOwnedSnapshot(); + try (final HeapKeyedStateBackend<String> keyedBackend = + createHeapKeyedStateBackend(StateObjectCollection.singleton(stateHandle))) { + + InternalMapState<String, Integer, Long, Long> state = + keyedBackend.createInternalState(IntSerializer.INSTANCE, mapStateDescriptor); + + InternalValueState<String, Integer, Long> valueState = + keyedBackend.createInternalState(IntSerializer.INSTANCE, valueStateDescriptor); + + InternalListState<String, Integer, Long> listState = + keyedBackend.createInternalState(IntSerializer.INSTANCE, listStateDescriptor); + + keyedBackend.setCurrentKey("abc"); + state.setCurrentNamespace(namespace1); + assertEquals(33L, (long) state.get(33L)); + assertEquals(55L, (long) state.get(55L)); + assertEquals(2, getStateSize(state)); + + state.setCurrentNamespace(namespace2); + assertEquals(22L, (long) state.get(22L)); + assertEquals(11L, (long) state.get(11L)); + assertEquals(2, getStateSize(state)); + listState.setCurrentNamespace(namespace2); + assertThat(listState.get(), contains(4L, 5L, 6L)); + + state.setCurrentNamespace(namespace3); + assertEquals(44L, (long) state.get(44L)); + assertEquals(1, getStateSize(state)); + + keyedBackend.setCurrentKey("mno"); + state.setCurrentNamespace(namespace3); + assertEquals(11L, (long) state.get(11L)); + assertEquals(22L, (long) state.get(22L)); + assertEquals(33L, (long) state.get(33L)); + assertEquals(44L, (long) state.get(44L)); + assertEquals(55L, (long) state.get(55L)); + assertEquals(5, getStateSize(state)); + valueState.setCurrentNamespace(namespace3); + assertEquals(1239L, (long) valueState.value()); + listState.setCurrentNamespace(namespace3); + assertThat(listState.get(), contains(1L, 2L, 3L)); + + KeyGroupedInternalPriorityQueue<TimerHeapInternalTimer<String, Integer>> priorityQueue = + keyedBackend.create( + "event-time", + new TimerSerializer<>( + keyedBackend.getKeySerializer(), IntSerializer.INSTANCE)); + + assertThat(priorityQueue.size(), equalTo(1)); + assertThat( + priorityQueue.poll(), + equalTo(new TimerHeapInternalTimer<>(1234L, "mno", namespace3))); + } + } + + private HeapKeyedStateBackend<String> createHeapKeyedStateBackend( + Collection<KeyedStateHandle> stateHandles) throws BackendBuildingException { + // must be synchronized with the KeyGroup of the RocksDB state backend + final KeyGroupRange keyGroupRange = new KeyGroupRange(0, 1); + final int numKeyGroups = keyGroupRange.getNumberOfKeyGroups(); + ExecutionConfig executionConfig = new ExecutionConfig(); + + return new HeapKeyedStateBackendBuilder<>( + mock(TaskKvStateRegistry.class), + StringSerializer.INSTANCE, + HeapStateBackendTestBase.class.getClassLoader(), + numKeyGroups, + keyGroupRange, + executionConfig, + TtlTimeProvider.DEFAULT, + stateHandles, + AbstractStateBackend.getCompressionDecorator(executionConfig), + TestLocalRecoveryConfig.disabled(), + new HeapPriorityQueueSetFactory(keyGroupRange, numKeyGroups, 128), + true, + new CloseableRegistry()) + .build(); + } + + private <K, N, UK, UV> int getStateSize(InternalMapState<K, N, UK, UV> mapState) + throws Exception { + int i = 0; + Iterator<Map.Entry<UK, UV>> itt = mapState.iterator(); + while (itt.hasNext()) { + i++; + itt.next(); + } + return i; + } + + private void takeRocksSavepoint( + File pathToWrite, + MapStateDescriptor<Long, Long> stateDescr, + ValueStateDescriptor<Long> valueStateDescriptor, + ListStateDescriptor<Long> listStateDescriptor, + Integer namespace1, + Integer namespace2, + Integer namespace3) + throws Exception { + try (final RocksDBKeyedStateBackend<String> keyedBackend = + RocksDBTestUtils.builderForTestDefaults( + tempFolder.newFolder(), + StringSerializer.INSTANCE, + RocksDBStateBackend.PriorityQueueStateType.ROCKSDB) + .build()) { + InternalMapState<String, Integer, Long, Long> mapState = + keyedBackend.createInternalState(IntSerializer.INSTANCE, stateDescr); + + InternalValueState<String, Integer, Long> valueState = + keyedBackend.createInternalState(IntSerializer.INSTANCE, valueStateDescriptor); + + InternalListState<String, Integer, Long> listState = + keyedBackend.createInternalState(IntSerializer.INSTANCE, listStateDescriptor); + + keyedBackend.setCurrentKey("abc"); + mapState.setCurrentNamespace(namespace1); + mapState.put(33L, 33L); + mapState.put(55L, 55L); + + mapState.setCurrentNamespace(namespace2); + mapState.put(22L, 22L); + mapState.put(11L, 11L); + listState.setCurrentNamespace(namespace2); + listState.add(4L); + listState.add(5L); + listState.add(6L); + + mapState.setCurrentNamespace(namespace3); + mapState.put(44L, 44L); + + keyedBackend.setCurrentKey("mno"); + mapState.setCurrentNamespace(namespace3); + mapState.put(11L, 11L); + mapState.put(22L, 22L); + mapState.put(33L, 33L); + mapState.put(44L, 44L); + mapState.put(55L, 55L); + valueState.setCurrentNamespace(namespace3); + valueState.update(1239L); + listState.setCurrentNamespace(namespace3); + listState.add(1L); + listState.add(2L); + listState.add(3L); + + KeyGroupedInternalPriorityQueue<TimerHeapInternalTimer<String, Integer>> priorityQueue = + keyedBackend.create( + "event-time", + new TimerSerializer<>( + keyedBackend.getKeySerializer(), IntSerializer.INSTANCE)); + priorityQueue.add(new TimerHeapInternalTimer<>(1234L, "mno", namespace3)); Review comment: Should we maybe have more than one timer? ########## File path: flink-state-backends/flink-statebackend-rocksdb/src/main/java/org/apache/flink/contrib/streaming/state/restore/RocksDBFullRestoreOperation.java ########## @@ -124,129 +96,63 @@ public RocksDBFullRestoreOperation( writeBufferManagerCapacity); checkArgument(writeBatchSize >= 0, "Write batch size have to be no negative."); this.writeBatchSize = writeBatchSize; + this.savepointRestoreOperation = + new SavepointRestoreOperation<>( + keyGroupRange, + cancelStreamRegistry, + userCodeClassLoader, + restoreStateHandles, + keySerializerProvider); } /** Restores all key-groups data that is referenced by the passed state handles. */ @Override public RocksDBRestoreResult restore() throws IOException, StateMigrationException, RocksDBException { openDB(); - for (KeyedStateHandle keyedStateHandle : restoreStateHandles) { - if (keyedStateHandle != null) { - - if (!(keyedStateHandle instanceof KeyGroupsStateHandle)) { - throw unexpectedStateHandleException( - KeyGroupsStateHandle.class, keyedStateHandle.getClass()); - } - this.currentKeyGroupsStateHandle = (KeyGroupsStateHandle) keyedStateHandle; - restoreKeyGroupsInStateHandle(); - } + try (ThrowingIterator<SavepointRestoreResult> restore = + savepointRestoreOperation.restore()) { + applyRestoreResult(restore.next()); } return new RocksDBRestoreResult( this.db, defaultColumnFamilyHandle, nativeMetricMonitor, -1, null, null); } - /** Restore one key groups state handle. */ - private void restoreKeyGroupsInStateHandle() - throws IOException, StateMigrationException, RocksDBException { - try { - logger.info("Starting to restore from state handle: {}.", currentKeyGroupsStateHandle); - currentStateHandleInStream = currentKeyGroupsStateHandle.openInputStream(); - cancelStreamRegistry.registerCloseable(currentStateHandleInStream); - currentStateHandleInView = new DataInputViewStreamWrapper(currentStateHandleInStream); - restoreKVStateMetaData(); - restoreKVStateData(); - logger.info("Finished restoring from state handle: {}.", currentKeyGroupsStateHandle); - } finally { - if (cancelStreamRegistry.unregisterCloseable(currentStateHandleInStream)) { - IOUtils.closeQuietly(currentStateHandleInStream); - } - } - } - - /** - * Restore the KV-state / ColumnFamily meta data for all key-groups referenced by the current - * state handle. - */ - private void restoreKVStateMetaData() throws IOException, StateMigrationException { - KeyedBackendSerializationProxy<K> serializationProxy = - readMetaData(currentStateHandleInView); - - this.keygroupStreamCompressionDecorator = - serializationProxy.isUsingKeyGroupCompression() - ? SnappyStreamCompressionDecorator.INSTANCE - : UncompressedStreamCompressionDecorator.INSTANCE; - + private void applyRestoreResult(SavepointRestoreResult savepointRestoreResult) + throws IOException, RocksDBException, StateMigrationException { List<StateMetaInfoSnapshot> restoredMetaInfos = - serializationProxy.getStateMetaInfoSnapshots(); - currentStateHandleKVStateColumnFamilies = new ArrayList<>(restoredMetaInfos.size()); - - for (StateMetaInfoSnapshot restoredMetaInfo : restoredMetaInfos) { - RocksDbKvStateInfo registeredStateCFHandle = - getOrRegisterStateColumnFamilyHandle(null, restoredMetaInfo); - currentStateHandleKVStateColumnFamilies.add(registeredStateCFHandle.columnFamilyHandle); - } + savepointRestoreResult.getStateMetaInfoSnapshots(); + List<ColumnFamilyHandle> columnFamilyHandles = + restoredMetaInfos.stream() + .map( + stateMetaInfoSnapshot -> { + RocksDbKvStateInfo registeredStateCFHandle = + getOrRegisterStateColumnFamilyHandle( + null, stateMetaInfoSnapshot); + return registeredStateCFHandle.columnFamilyHandle; + }) + .collect(Collectors.toList()); + restoreKVStateData(savepointRestoreResult.getRestoredKeyGroups(), columnFamilyHandles); } /** * Restore the KV-state / ColumnFamily data for all key-groups referenced by the current state * handle. */ - private void restoreKVStateData() throws IOException, RocksDBException { + private void restoreKVStateData( + ThrowingIterator<KeyGroup> keyGroups, List<ColumnFamilyHandle> columnFamilies) + throws IOException, RocksDBException, StateMigrationException { // for all key-groups in the current state handle... try (RocksDBWriteBatchWrapper writeBatchWrapper = new RocksDBWriteBatchWrapper(db, writeBatchSize)) { - for (Tuple2<Integer, Long> keyGroupOffset : - currentKeyGroupsStateHandle.getGroupRangeOffsets()) { - int keyGroup = keyGroupOffset.f0; - - // Check that restored key groups all belong to the backend - Preconditions.checkState( - keyGroupRange.contains(keyGroup), - "The key group must belong to the backend"); - - long offset = keyGroupOffset.f1; - // not empty key-group? - if (0L != offset) { - currentStateHandleInStream.seek(offset); - try (InputStream compressedKgIn = - keygroupStreamCompressionDecorator.decorateWithCompression( - currentStateHandleInStream)) { - DataInputViewStreamWrapper compressedKgInputView = - new DataInputViewStreamWrapper(compressedKgIn); - // TODO this could be aware of keyGroupPrefixBytes and write only one byte - // if possible - int kvStateId = compressedKgInputView.readShort(); - ColumnFamilyHandle handle = - currentStateHandleKVStateColumnFamilies.get(kvStateId); - // insert all k/v pairs into DB - boolean keyGroupHasMoreKeys = true; - while (keyGroupHasMoreKeys) { - byte[] key = - BytePrimitiveArraySerializer.INSTANCE.deserialize( - compressedKgInputView); - byte[] value = - BytePrimitiveArraySerializer.INSTANCE.deserialize( - compressedKgInputView); - if (hasMetaDataFollowsFlag(key)) { - // clear the signal bit in the key to make it ready for insertion - // again - clearMetaDataFollowsFlag(key); - writeBatchWrapper.put(handle, key, value); - // TODO this could be aware of keyGroupPrefixBytes and write only - // one byte if possible - kvStateId = - END_OF_KEY_GROUP_MARK & compressedKgInputView.readShort(); - if (END_OF_KEY_GROUP_MARK == kvStateId) { - keyGroupHasMoreKeys = false; - } else { - handle = currentStateHandleKVStateColumnFamilies.get(kvStateId); - } - } else { - writeBatchWrapper.put(handle, key, value); - } - } - } + while (keyGroups.hasNext()) { + KeyGroup keyGroup = keyGroups.next(); + ThrowingIterator<KeyGroupEntry> groupEntries = keyGroup.getKeyGroupEntries(); + while (groupEntries.hasNext()) { + // TODO we call it more times than before Review comment: What's up with this one? ########## File path: flink-tests/src/test/java/org/apache/flink/test/state/SavepointStateBackendSwitchTest.java ########## @@ -0,0 +1,289 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.flink.test.state; + +import org.apache.flink.api.common.ExecutionConfig; +import org.apache.flink.api.common.state.ListStateDescriptor; +import org.apache.flink.api.common.state.MapStateDescriptor; +import org.apache.flink.api.common.state.ValueStateDescriptor; +import org.apache.flink.api.common.typeutils.base.IntSerializer; +import org.apache.flink.api.common.typeutils.base.StringSerializer; +import org.apache.flink.contrib.streaming.state.RocksDBKeyedStateBackend; +import org.apache.flink.contrib.streaming.state.RocksDBStateBackend; +import org.apache.flink.contrib.streaming.state.RocksDBTestUtils; +import org.apache.flink.core.fs.CloseableRegistry; +import org.apache.flink.runtime.checkpoint.CheckpointOptions; +import org.apache.flink.runtime.checkpoint.CheckpointType; +import org.apache.flink.runtime.checkpoint.StateObjectCollection; +import org.apache.flink.runtime.query.TaskKvStateRegistry; +import org.apache.flink.runtime.state.AbstractStateBackend; +import org.apache.flink.runtime.state.BackendBuildingException; +import org.apache.flink.runtime.state.CheckpointStorageLocationReference; +import org.apache.flink.runtime.state.KeyGroupRange; +import org.apache.flink.runtime.state.KeyGroupedInternalPriorityQueue; +import org.apache.flink.runtime.state.KeyedStateHandle; +import org.apache.flink.runtime.state.SnapshotResult; +import org.apache.flink.runtime.state.TestLocalRecoveryConfig; +import org.apache.flink.runtime.state.heap.HeapKeyedStateBackend; +import org.apache.flink.runtime.state.heap.HeapKeyedStateBackendBuilder; +import org.apache.flink.runtime.state.heap.HeapPriorityQueueSetFactory; +import org.apache.flink.runtime.state.heap.HeapStateBackendTestBase; +import org.apache.flink.runtime.state.internal.InternalListState; +import org.apache.flink.runtime.state.internal.InternalMapState; +import org.apache.flink.runtime.state.internal.InternalValueState; +import org.apache.flink.runtime.state.memory.MemCheckpointStreamFactory; +import org.apache.flink.runtime.state.ttl.TtlTimeProvider; +import org.apache.flink.streaming.api.operators.TimerHeapInternalTimer; +import org.apache.flink.streaming.api.operators.TimerSerializer; +import org.apache.flink.util.InstantiationUtil; + +import org.junit.Rule; +import org.junit.Test; +import org.junit.rules.TemporaryFolder; + +import java.io.BufferedInputStream; +import java.io.BufferedOutputStream; +import java.io.File; +import java.io.FileInputStream; +import java.io.FileOutputStream; +import java.util.Collection; +import java.util.Iterator; +import java.util.Map; +import java.util.concurrent.RunnableFuture; + +import static org.hamcrest.CoreMatchers.equalTo; +import static org.hamcrest.Matchers.contains; +import static org.junit.Assert.assertEquals; +import static org.junit.Assert.assertThat; +import static org.mockito.Mockito.mock; + +/** + * Tests for the unified savepoint format. They verify you can switch a state backend through a + * savepoint. + */ +public class SavepointStateBackendSwitchTest { + @Rule public TemporaryFolder tempFolder = new TemporaryFolder(); + + @Test + public void switchFromRocksToHeap() throws Exception { + + final File pathToWrite = tempFolder.newFile("tmp_rocks_map_state"); + + final MapStateDescriptor<Long, Long> mapStateDescriptor = + new MapStateDescriptor<>("my-map-state", Long.class, Long.class); + mapStateDescriptor.initializeSerializerUnlessSet(new ExecutionConfig()); + + final ValueStateDescriptor<Long> valueStateDescriptor = + new ValueStateDescriptor<>("my-value-state", Long.class); + valueStateDescriptor.initializeSerializerUnlessSet(new ExecutionConfig()); + + ListStateDescriptor<Long> listStateDescriptor = + new ListStateDescriptor<>("my-list-state", Long.class); + listStateDescriptor.initializeSerializerUnlessSet(new ExecutionConfig()); + + final Integer namespace1 = 1; + final Integer namespace2 = 2; + final Integer namespace3 = 3; + + takeRocksSavepoint( + pathToWrite, + mapStateDescriptor, + valueStateDescriptor, + listStateDescriptor, + namespace1, + namespace2, + namespace3); + + final SnapshotResult<KeyedStateHandle> stateHandles; + try (BufferedInputStream bis = + new BufferedInputStream((new FileInputStream(pathToWrite)))) { + stateHandles = + InstantiationUtil.deserializeObject( + bis, Thread.currentThread().getContextClassLoader()); + } + final KeyedStateHandle stateHandle = stateHandles.getJobManagerOwnedSnapshot(); + try (final HeapKeyedStateBackend<String> keyedBackend = + createHeapKeyedStateBackend(StateObjectCollection.singleton(stateHandle))) { + + InternalMapState<String, Integer, Long, Long> state = + keyedBackend.createInternalState(IntSerializer.INSTANCE, mapStateDescriptor); + + InternalValueState<String, Integer, Long> valueState = + keyedBackend.createInternalState(IntSerializer.INSTANCE, valueStateDescriptor); + + InternalListState<String, Integer, Long> listState = + keyedBackend.createInternalState(IntSerializer.INSTANCE, listStateDescriptor); + + keyedBackend.setCurrentKey("abc"); + state.setCurrentNamespace(namespace1); + assertEquals(33L, (long) state.get(33L)); + assertEquals(55L, (long) state.get(55L)); + assertEquals(2, getStateSize(state)); + + state.setCurrentNamespace(namespace2); + assertEquals(22L, (long) state.get(22L)); + assertEquals(11L, (long) state.get(11L)); + assertEquals(2, getStateSize(state)); + listState.setCurrentNamespace(namespace2); + assertThat(listState.get(), contains(4L, 5L, 6L)); + + state.setCurrentNamespace(namespace3); + assertEquals(44L, (long) state.get(44L)); + assertEquals(1, getStateSize(state)); + + keyedBackend.setCurrentKey("mno"); + state.setCurrentNamespace(namespace3); + assertEquals(11L, (long) state.get(11L)); + assertEquals(22L, (long) state.get(22L)); + assertEquals(33L, (long) state.get(33L)); + assertEquals(44L, (long) state.get(44L)); + assertEquals(55L, (long) state.get(55L)); + assertEquals(5, getStateSize(state)); + valueState.setCurrentNamespace(namespace3); + assertEquals(1239L, (long) valueState.value()); + listState.setCurrentNamespace(namespace3); + assertThat(listState.get(), contains(1L, 2L, 3L)); + + KeyGroupedInternalPriorityQueue<TimerHeapInternalTimer<String, Integer>> priorityQueue = + keyedBackend.create( + "event-time", + new TimerSerializer<>( + keyedBackend.getKeySerializer(), IntSerializer.INSTANCE)); + + assertThat(priorityQueue.size(), equalTo(1)); + assertThat( + priorityQueue.poll(), + equalTo(new TimerHeapInternalTimer<>(1234L, "mno", namespace3))); + } + } + + private HeapKeyedStateBackend<String> createHeapKeyedStateBackend( + Collection<KeyedStateHandle> stateHandles) throws BackendBuildingException { + // must be synchronized with the KeyGroup of the RocksDB state backend Review comment: Should this be a constant in the test that is passed also when creating the Rocks backend? ---------------------------------------------------------------- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. For queries about this service, please contact Infrastructure at: us...@infra.apache.org