[ 
https://issues.apache.org/jira/browse/KAFKA-6711?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16513125#comment-16513125
 ] 

ASF GitHub Bot commented on KAFKA-6711:
---------------------------------------

mjsax closed pull request #4789: KAFKA-6711: GlobalStateManagerImpl should not 
write offsets
URL: https://github.com/apache/kafka/pull/4789
 
 
   

This is a PR merged from a forked repository.
As GitHub hides the original diff on merge, it is displayed below for
the sake of provenance:

As this is a foreign pull request (from a fork), the diff is supplied
below (as it won't show otherwise due to GitHub magic):

diff --git 
a/streams/src/main/java/org/apache/kafka/streams/processor/internals/GlobalStateManagerImpl.java
 
b/streams/src/main/java/org/apache/kafka/streams/processor/internals/GlobalStateManagerImpl.java
index 6052f96053c..be160bd5a33 100644
--- 
a/streams/src/main/java/org/apache/kafka/streams/processor/internals/GlobalStateManagerImpl.java
+++ 
b/streams/src/main/java/org/apache/kafka/streams/processor/internals/GlobalStateManagerImpl.java
@@ -167,7 +167,7 @@ private void restoreState(final StateRestoreCallback 
stateRestoreCallback,
         for (final TopicPartition topicPartition : topicPartitions) {
             consumer.assign(Collections.singletonList(topicPartition));
             final Long checkpoint = checkpointableOffsets.get(topicPartition);
-            if (checkpoint != null) {
+            if (checkpoint != null && checkpoint > 
StateRestorer.NO_CHECKPOINT) {
                 consumer.seek(topicPartition, checkpoint);
             } else {
                 
consumer.seekToBeginning(Collections.singletonList(topicPartition));
@@ -249,10 +249,33 @@ public void close(final Map<TopicPartition, Long> 
offsets) throws IOException {
 
     @Override
     public void checkpoint(final Map<TopicPartition, Long> offsets) {
+
+        // Find non persistent store's topics
+        final Map<String, String> storeToChangelogTopic = 
topology.storeToChangelogTopic();
+        final Set<String> globalNonPersistentStoresTopics = new HashSet<>();
+        for (final StateStore store : topology.globalStateStores()) {
+            if (!store.persistent() && 
storeToChangelogTopic.containsKey(store.name())) {
+                
globalNonPersistentStoresTopics.add(storeToChangelogTopic.get(store.name()));
+            }
+        }
+
         checkpointableOffsets.putAll(offsets);
-        if (!checkpointableOffsets.isEmpty()) {
+
+        final Map<TopicPartition, Long> filteredOffsets = new HashMap<>();
+
+        // Skip non persistent store
+        for (final Map.Entry<TopicPartition, Long> topicPartitionOffset : 
checkpointableOffsets.entrySet()) {
+            final String topic = topicPartitionOffset.getKey().topic();
+            if (globalNonPersistentStoresTopics.contains(topic)) {
+                filteredOffsets.put(topicPartitionOffset.getKey(), (long) 
StateRestorer.NO_CHECKPOINT);
+            } else {
+                filteredOffsets.put(topicPartitionOffset.getKey(), 
topicPartitionOffset.getValue());
+            }
+        }
+
+        if (!filteredOffsets.isEmpty()) {
             try {
-                checkpoint.write(checkpointableOffsets);
+                checkpoint.write(filteredOffsets);
             } catch (IOException e) {
                 log.warn("Failed to write offsets checkpoint for global 
stores", e);
             }
diff --git 
a/streams/src/test/java/org/apache/kafka/streams/processor/internals/GlobalStateManagerImplTest.java
 
b/streams/src/test/java/org/apache/kafka/streams/processor/internals/GlobalStateManagerImplTest.java
index e9d61f5ad64..7e838763031 100644
--- 
a/streams/src/test/java/org/apache/kafka/streams/processor/internals/GlobalStateManagerImplTest.java
+++ 
b/streams/src/test/java/org/apache/kafka/streams/processor/internals/GlobalStateManagerImplTest.java
@@ -68,12 +68,13 @@
     private final MockStateRestoreListener stateRestoreListener = new 
MockStateRestoreListener();
     private final TopicPartition t1 = new TopicPartition("t1", 1);
     private final TopicPartition t2 = new TopicPartition("t2", 1);
+    private final TopicPartition t3 = new TopicPartition("t3", 1);
     private GlobalStateManagerImpl stateManager;
     private NoOpProcessorContext context;
     private StateDirectory stateDirectory;
     private String stateDirPath;
     private NoOpReadOnlyStore<Object, Object> store1;
-    private NoOpReadOnlyStore store2;
+    private NoOpReadOnlyStore store2, store3;
     private MockConsumer<byte[], byte[]> consumer;
     private File checkpointFile;
     private ProcessorTopology topology;
@@ -83,18 +84,21 @@ public void before() throws IOException {
         final Map<String, String> storeToTopic = new HashMap<>();
         storeToTopic.put("t1-store", "t1");
         storeToTopic.put("t2-store", "t2");
+        storeToTopic.put("t3-store", "t3");
 
         final Map<StateStore, ProcessorNode> storeToProcessorNode = new 
HashMap<>();
-        store1 = new NoOpReadOnlyStore<>("t1-store");
+        store1 = new NoOpReadOnlyStore<>("t1-store", true);
         storeToProcessorNode.put(store1, new MockProcessorNode(-1));
-        store2 = new NoOpReadOnlyStore("t2-store");
+        store2 = new NoOpReadOnlyStore("t2-store", true);
         storeToProcessorNode.put(store2, new MockProcessorNode(-1));
+        store3 = new NoOpReadOnlyStore("t3-store", false);
+        storeToProcessorNode.put(store3, new MockProcessorNode(-1));
         topology = new 
ProcessorTopology(Collections.<ProcessorNode>emptyList(),
                                          Collections.<String, 
SourceNode>emptyMap(),
                                          Collections.<String, 
SinkNode>emptyMap(),
                                          Collections.<StateStore>emptyList(),
                                          storeToTopic,
-                                         Arrays.<StateStore>asList(store1, 
store2));
+                                         Arrays.<StateStore>asList(store1, 
store2, store3));
 
         context = new NoOpProcessorContext();
         stateDirPath = TestUtils.tempDirectory().getPath();
@@ -158,7 +162,7 @@ public void shouldInitializeStateStores() {
     @Test
     public void shouldReturnInitializedStoreNames() {
         final Set<String> storeNames = stateManager.initialize(context);
-        assertEquals(Utils.mkSet(store1.name(), store2.name()), storeNames);
+        assertEquals(Utils.mkSet(store1.name(), store2.name(), store3.name()), 
storeNames);
     }
 
     @Test
@@ -462,6 +466,16 @@ public void shouldCheckpointRestoredOffsetsToFile() throws 
IOException {
         assertThat(readOffsetsCheckpoint(), equalTo(checkpointMap));
     }
 
+    @Test
+    public void shouldSkipGlobalInMemoryStoreOffsetsToFile() throws 
IOException {
+        stateManager.initialize(context);
+        initializeConsumer(10, 1, t3);
+        stateManager.register(store3, stateRestoreCallback);
+        stateManager.close(Collections.<TopicPartition, Long>emptyMap());
+
+        assertThat(readOffsetsCheckpoint(), 
equalTo(Collections.singletonMap(t3, (long) StateRestorer.NO_CHECKPOINT)));
+    }
+
     private Map<TopicPartition, Long> readOffsetsCheckpoint() throws 
IOException {
         final OffsetCheckpoint offsetCheckpoint = new OffsetCheckpoint(new 
File(stateManager.baseDir(),
                                                                                
 ProcessorStateManager.CHECKPOINT_FILE_NAME));
diff --git a/streams/src/test/java/org/apache/kafka/test/NoOpReadOnlyStore.java 
b/streams/src/test/java/org/apache/kafka/test/NoOpReadOnlyStore.java
index 0ada2e4432d..0fcbfe5d760 100644
--- a/streams/src/test/java/org/apache/kafka/test/NoOpReadOnlyStore.java
+++ b/streams/src/test/java/org/apache/kafka/test/NoOpReadOnlyStore.java
@@ -28,14 +28,21 @@
     private boolean open = true;
     public boolean initialized;
     public boolean flushed;
+    public boolean rocksdbStore;
 
 
     public NoOpReadOnlyStore() {
-        this("");
+        this("", false);
     }
 
+
     public NoOpReadOnlyStore(final String name) {
+        this(name, false);
+    }
+
+    public NoOpReadOnlyStore(final String name, final boolean rocksdbStore) {
         this.name = name;
+        this.rocksdbStore = rocksdbStore;
     }
 
     @Override
@@ -80,7 +87,7 @@ public void close() {
 
     @Override
     public boolean persistent() {
-        return false;
+        return rocksdbStore;
     }
 
     @Override


 

----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


> GlobalStateManagerImpl should not write offsets of in-memory stores in 
> checkpoint file
> --------------------------------------------------------------------------------------
>
>                 Key: KAFKA-6711
>                 URL: https://issues.apache.org/jira/browse/KAFKA-6711
>             Project: Kafka
>          Issue Type: Bug
>          Components: streams
>    Affects Versions: 1.0.1
>            Reporter: Cemalettin Koç
>            Assignee: Cemalettin Koç
>            Priority: Major
>              Labels: newbie
>             Fix For: 0.10.2.2, 2.0.0, 0.11.0.3, 1.0.2, 1.1.1
>
>
> We are using an InMemoryStore along with GlobalKTable and I noticed that 
> after each restart I am losing all my data. When I debug it, 
> `/tmp/kafka-streams/category-client-1/global/.checkpoint` file contains 
> offset for my GlobalKTable topic. I had checked GlobalStateManagerImpl 
> implementation and noticed that it is not guarded for cases similar to mine. 
> I am fairly new to Kafka land and probably there might be another way to fix 
> the issue. 



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)

Reply via email to