[ https://issues.apache.org/jira/browse/KAFKA-6711?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16513125#comment-16513125 ]
ASF GitHub Bot commented on KAFKA-6711: --------------------------------------- mjsax closed pull request #4789: KAFKA-6711: GlobalStateManagerImpl should not write offsets URL: https://github.com/apache/kafka/pull/4789 This is a PR merged from a forked repository. As GitHub hides the original diff on merge, it is displayed below for the sake of provenance: As this is a foreign pull request (from a fork), the diff is supplied below (as it won't show otherwise due to GitHub magic): diff --git a/streams/src/main/java/org/apache/kafka/streams/processor/internals/GlobalStateManagerImpl.java b/streams/src/main/java/org/apache/kafka/streams/processor/internals/GlobalStateManagerImpl.java index 6052f96053c..be160bd5a33 100644 --- a/streams/src/main/java/org/apache/kafka/streams/processor/internals/GlobalStateManagerImpl.java +++ b/streams/src/main/java/org/apache/kafka/streams/processor/internals/GlobalStateManagerImpl.java @@ -167,7 +167,7 @@ private void restoreState(final StateRestoreCallback stateRestoreCallback, for (final TopicPartition topicPartition : topicPartitions) { consumer.assign(Collections.singletonList(topicPartition)); final Long checkpoint = checkpointableOffsets.get(topicPartition); - if (checkpoint != null) { + if (checkpoint != null && checkpoint > StateRestorer.NO_CHECKPOINT) { consumer.seek(topicPartition, checkpoint); } else { consumer.seekToBeginning(Collections.singletonList(topicPartition)); @@ -249,10 +249,33 @@ public void close(final Map<TopicPartition, Long> offsets) throws IOException { @Override public void checkpoint(final Map<TopicPartition, Long> offsets) { + + // Find non persistent store's topics + final Map<String, String> storeToChangelogTopic = topology.storeToChangelogTopic(); + final Set<String> globalNonPersistentStoresTopics = new HashSet<>(); + for (final StateStore store : topology.globalStateStores()) { + if (!store.persistent() && storeToChangelogTopic.containsKey(store.name())) { + 
globalNonPersistentStoresTopics.add(storeToChangelogTopic.get(store.name())); + } + } + checkpointableOffsets.putAll(offsets); - if (!checkpointableOffsets.isEmpty()) { + + final Map<TopicPartition, Long> filteredOffsets = new HashMap<>(); + + // Skip non persistent store + for (final Map.Entry<TopicPartition, Long> topicPartitionOffset : checkpointableOffsets.entrySet()) { + final String topic = topicPartitionOffset.getKey().topic(); + if (globalNonPersistentStoresTopics.contains(topic)) { + filteredOffsets.put(topicPartitionOffset.getKey(), (long) StateRestorer.NO_CHECKPOINT); + } else { + filteredOffsets.put(topicPartitionOffset.getKey(), topicPartitionOffset.getValue()); + } + } + + if (!filteredOffsets.isEmpty()) { try { - checkpoint.write(checkpointableOffsets); + checkpoint.write(filteredOffsets); } catch (IOException e) { log.warn("Failed to write offsets checkpoint for global stores", e); } diff --git a/streams/src/test/java/org/apache/kafka/streams/processor/internals/GlobalStateManagerImplTest.java b/streams/src/test/java/org/apache/kafka/streams/processor/internals/GlobalStateManagerImplTest.java index e9d61f5ad64..7e838763031 100644 --- a/streams/src/test/java/org/apache/kafka/streams/processor/internals/GlobalStateManagerImplTest.java +++ b/streams/src/test/java/org/apache/kafka/streams/processor/internals/GlobalStateManagerImplTest.java @@ -68,12 +68,13 @@ private final MockStateRestoreListener stateRestoreListener = new MockStateRestoreListener(); private final TopicPartition t1 = new TopicPartition("t1", 1); private final TopicPartition t2 = new TopicPartition("t2", 1); + private final TopicPartition t3 = new TopicPartition("t3", 1); private GlobalStateManagerImpl stateManager; private NoOpProcessorContext context; private StateDirectory stateDirectory; private String stateDirPath; private NoOpReadOnlyStore<Object, Object> store1; - private NoOpReadOnlyStore store2; + private NoOpReadOnlyStore store2, store3; private MockConsumer<byte[], byte[]> 
consumer; private File checkpointFile; private ProcessorTopology topology; @@ -83,18 +84,21 @@ public void before() throws IOException { final Map<String, String> storeToTopic = new HashMap<>(); storeToTopic.put("t1-store", "t1"); storeToTopic.put("t2-store", "t2"); + storeToTopic.put("t3-store", "t3"); final Map<StateStore, ProcessorNode> storeToProcessorNode = new HashMap<>(); - store1 = new NoOpReadOnlyStore<>("t1-store"); + store1 = new NoOpReadOnlyStore<>("t1-store", true); storeToProcessorNode.put(store1, new MockProcessorNode(-1)); - store2 = new NoOpReadOnlyStore("t2-store"); + store2 = new NoOpReadOnlyStore("t2-store", true); storeToProcessorNode.put(store2, new MockProcessorNode(-1)); + store3 = new NoOpReadOnlyStore("t3-store", false); + storeToProcessorNode.put(store3, new MockProcessorNode(-1)); topology = new ProcessorTopology(Collections.<ProcessorNode>emptyList(), Collections.<String, SourceNode>emptyMap(), Collections.<String, SinkNode>emptyMap(), Collections.<StateStore>emptyList(), storeToTopic, - Arrays.<StateStore>asList(store1, store2)); + Arrays.<StateStore>asList(store1, store2, store3)); context = new NoOpProcessorContext(); stateDirPath = TestUtils.tempDirectory().getPath(); @@ -158,7 +162,7 @@ public void shouldInitializeStateStores() { @Test public void shouldReturnInitializedStoreNames() { final Set<String> storeNames = stateManager.initialize(context); - assertEquals(Utils.mkSet(store1.name(), store2.name()), storeNames); + assertEquals(Utils.mkSet(store1.name(), store2.name(), store3.name()), storeNames); } @Test @@ -462,6 +466,16 @@ public void shouldCheckpointRestoredOffsetsToFile() throws IOException { assertThat(readOffsetsCheckpoint(), equalTo(checkpointMap)); } + @Test + public void shouldSkipGlobalInMemoryStoreOffsetsToFile() throws IOException { + stateManager.initialize(context); + initializeConsumer(10, 1, t3); + stateManager.register(store3, stateRestoreCallback); + stateManager.close(Collections.<TopicPartition, 
Long>emptyMap()); + + assertThat(readOffsetsCheckpoint(), equalTo(Collections.singletonMap(t3, (long) StateRestorer.NO_CHECKPOINT))); + } + private Map<TopicPartition, Long> readOffsetsCheckpoint() throws IOException { final OffsetCheckpoint offsetCheckpoint = new OffsetCheckpoint(new File(stateManager.baseDir(), ProcessorStateManager.CHECKPOINT_FILE_NAME)); diff --git a/streams/src/test/java/org/apache/kafka/test/NoOpReadOnlyStore.java b/streams/src/test/java/org/apache/kafka/test/NoOpReadOnlyStore.java index 0ada2e4432d..0fcbfe5d760 100644 --- a/streams/src/test/java/org/apache/kafka/test/NoOpReadOnlyStore.java +++ b/streams/src/test/java/org/apache/kafka/test/NoOpReadOnlyStore.java @@ -28,14 +28,21 @@ private boolean open = true; public boolean initialized; public boolean flushed; + public boolean rocksdbStore; public NoOpReadOnlyStore() { - this(""); + this("", false); } + public NoOpReadOnlyStore(final String name) { + this(name, false); + } + + public NoOpReadOnlyStore(final String name, final boolean rocksdbStore) { this.name = name; + this.rocksdbStore = rocksdbStore; } @Override @@ -80,7 +87,7 @@ public void close() { @Override public boolean persistent() { - return false; + return rocksdbStore; } @Override ---------------------------------------------------------------- This is an automated message from the Apache Git Service. To respond to the message, please log on GitHub and use the URL above to go to the specific comment. 
For queries about this service, please contact Infrastructure at: us...@infra.apache.org > GlobalStateManagerImpl should not write offsets of in-memory stores in > checkpoint file > -------------------------------------------------------------------------------------- > > Key: KAFKA-6711 > URL: https://issues.apache.org/jira/browse/KAFKA-6711 > Project: Kafka > Issue Type: Bug > Components: streams > Affects Versions: 1.0.1 > Reporter: Cemalettin Koç > Assignee: Cemalettin Koç > Priority: Major > Labels: newbie > Fix For: 0.10.2.2, 2.0.0, 0.11.0.3, 1.0.2, 1.1.1 > > > We are using an InMemoryStore along with a GlobalKTable, and I noticed that > after each restart I am losing all my data. When I debugged it, I found that the > `/tmp/kafka-streams/category-client-1/global/.checkpoint` file contains > an offset for my GlobalKTable topic. I checked the GlobalStateManagerImpl > implementation and noticed that it is not guarded against cases similar to mine. > I am fairly new to Kafka land, so there might be another way to fix > the issue. -- This message was sent by Atlassian JIRA (v7.6.3#76005)