[ https://issues.apache.org/jira/browse/KAFKA-6364?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16364752#comment-16364752 ]
ASF GitHub Bot commented on KAFKA-6364: --------------------------------------- guozhangwang closed pull request #4511: KAFKA-6364: second check for ensuring changelog topic not changed during restore URL: https://github.com/apache/kafka/pull/4511 This is a PR merged from a forked repository. As GitHub hides the original diff on merge, it is displayed below for the sake of provenance: As this is a foreign pull request (from a fork), the diff is supplied below (as it won't show otherwise due to GitHub magic): diff --git a/clients/src/main/java/org/apache/kafka/clients/consumer/MockConsumer.java b/clients/src/main/java/org/apache/kafka/clients/consumer/MockConsumer.java index c4929959211..ceb7024b97b 100644 --- a/clients/src/main/java/org/apache/kafka/clients/consumer/MockConsumer.java +++ b/clients/src/main/java/org/apache/kafka/clients/consumer/MockConsumer.java @@ -51,7 +51,7 @@ private final Map<String, List<PartitionInfo>> partitions; private final SubscriptionState subscriptions; private final Map<TopicPartition, Long> beginningOffsets; - private final Map<TopicPartition, Long> endOffsets; + private final Map<TopicPartition, List<Long>> endOffsets; private final Map<TopicPartition, OffsetAndMetadata> committed; private final Queue<Runnable> pollTasks; private final Set<TopicPartition> paused; @@ -290,8 +290,26 @@ public synchronized void seekToEnd(Collection<TopicPartition> partitions) { subscriptions.requestOffsetReset(tp, OffsetResetStrategy.LATEST); } - public synchronized void updateEndOffsets(Map<TopicPartition, Long> newOffsets) { - endOffsets.putAll(newOffsets); + // needed for cases where you make a second call to endOffsets + public synchronized void addEndOffsets(final Map<TopicPartition, Long> newOffsets) { + innerUpdateEndOffsets(newOffsets, false); + } + + public synchronized void updateEndOffsets(final Map<TopicPartition, Long> newOffsets) { + innerUpdateEndOffsets(newOffsets, true); + } + + private void innerUpdateEndOffsets(final 
Map<TopicPartition, Long> newOffsets, + final boolean replace) { + + for (final Map.Entry<TopicPartition, Long> entry : newOffsets.entrySet()) { + List<Long> offsets = endOffsets.get(entry.getKey()); + if (replace || offsets == null) { + offsets = new ArrayList<>(); + } + offsets.add(entry.getValue()); + endOffsets.put(entry.getKey(), offsets); + } } @Override @@ -354,7 +372,7 @@ public synchronized void resume(Collection<TopicPartition> partitions) { public synchronized Map<TopicPartition, Long> endOffsets(Collection<TopicPartition> partitions) { Map<TopicPartition, Long> result = new HashMap<>(); for (TopicPartition tp : partitions) { - Long endOffset = endOffsets.get(tp); + Long endOffset = getEndOffset(endOffsets.get(tp)); if (endOffset == null) throw new IllegalStateException("The partition " + tp + " does not have an end offset."); result.put(tp, endOffset); @@ -430,7 +448,7 @@ private void resetOffsetPosition(TopicPartition tp) { if (offset == null) throw new IllegalStateException("MockConsumer didn't have beginning offset specified, but tried to seek to beginning"); } else if (strategy == OffsetResetStrategy.LATEST) { - offset = endOffsets.get(tp); + offset = getEndOffset(endOffsets.get(tp)); if (offset == null) throw new IllegalStateException("MockConsumer didn't have end offset specified, but tried to seek to end"); } else { @@ -438,4 +456,11 @@ private void resetOffsetPosition(TopicPartition tp) { } seek(tp, offset); } + + private Long getEndOffset(List<Long> offsets) { + if (offsets == null || offsets.isEmpty()) { + return null; + } + return offsets.size() > 1 ? 
offsets.remove(0) : offsets.get(0); + } } diff --git a/streams/src/main/java/org/apache/kafka/streams/processor/internals/StoreChangelogReader.java b/streams/src/main/java/org/apache/kafka/streams/processor/internals/StoreChangelogReader.java index b11c45ba313..5fcba76570e 100644 --- a/streams/src/main/java/org/apache/kafka/streams/processor/internals/StoreChangelogReader.java +++ b/streams/src/main/java/org/apache/kafka/streams/processor/internals/StoreChangelogReader.java @@ -255,6 +255,15 @@ private void restorePartition(final ConsumerRecords<byte[], byte[]> allRecords, throw new TaskMigratedException(task, topicPartition, endOffset, pos); } + // need to check for changelog topic + if (restorer.offsetLimit() == Long.MAX_VALUE) { + final Long updatedEndOffset = restoreConsumer.endOffsets(Collections.singletonList(topicPartition)).get(topicPartition); + if (!restorer.hasCompleted(pos, updatedEndOffset)) { + throw new TaskMigratedException(task, topicPartition, updatedEndOffset, pos); + } + } + + log.debug("Completed restoring state from changelog {} with {} records ranging from offset {} to {}", topicPartition, restorer.restoredNumRecords(), diff --git a/streams/src/test/java/org/apache/kafka/streams/processor/internals/StoreChangelogReaderTest.java b/streams/src/test/java/org/apache/kafka/streams/processor/internals/StoreChangelogReaderTest.java index e69cede23fd..c65d4efadb1 100644 --- a/streams/src/test/java/org/apache/kafka/streams/processor/internals/StoreChangelogReaderTest.java +++ b/streams/src/test/java/org/apache/kafka/streams/processor/internals/StoreChangelogReaderTest.java @@ -390,9 +390,50 @@ public void shouldThrowTaskMigratedExceptionIfEndOffsetGetsExceededDuringRestore try { changelogReader.restore(active); fail("Should have thrown TaskMigratedException"); - } catch (final TaskMigratedException expected) { /* ignore */ } + } catch (final TaskMigratedException expected) { + /* ignore */ + } } + + @Test + public void 
shouldThrowTaskMigratedExceptionIfChangelogTopicUpdatedDuringRestoreProcessFoundInSecondCheck() { + final int messages = 10; + setupConsumer(messages, topicPartition); + // in this case first call to endOffsets returns correct value, but a second thread has updated the changelog topic + // so a subsequent call to endOffsets returns a value exceeding the expected end value + consumer.addEndOffsets(Collections.singletonMap(topicPartition, 15L)); + changelogReader.register(new StateRestorer(topicPartition, restoreListener, null, Long.MAX_VALUE, true, "storeName")); + + expect(active.restoringTaskFor(topicPartition)).andReturn(task); + replay(active); + + try { + changelogReader.restore(active); + fail("Should have thrown TaskMigratedException"); + } catch (final TaskMigratedException expected) { + // verifies second block threw exception with updated end offset + assertTrue(expected.getMessage().contains("end offset 15, current offset 10")); + } + } + + + @Test + public void shouldNotThrowTaskMigratedExceptionIfSourceTopicUpdatedDuringRestoreProcess() { + final int messages = 10; + setupConsumer(messages, topicPartition); + // in this case first call to endOffsets returns correct value, but a second thread has updated the source topic + // but since it's a source topic, the second check should not fire hence no exception + consumer.addEndOffsets(Collections.singletonMap(topicPartition, 15L)); + changelogReader.register(new StateRestorer(topicPartition, restoreListener, null, 9L, true, "storeName")); + + expect(active.restoringTaskFor(topicPartition)).andReturn(task); + replay(active); + + changelogReader.restore(active); + } + + @Test public void shouldThrowTaskMigratedExceptionIfEndOffsetGetsExceededDuringRestoreForChangelogTopicEOSEnabled() { final int totalMessages = 10; ---------------------------------------------------------------- This is an automated message from the Apache Git Service. 
To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. For queries about this service, please contact Infrastructure at: us...@infra.apache.org > Add Second Check for End Offset During Restore > ---------------------------------------------- > > Key: KAFKA-6364 > URL: https://issues.apache.org/jira/browse/KAFKA-6364 > Project: Kafka > Issue Type: Improvement > Components: streams > Affects Versions: 1.0.0 > Reporter: Bill Bejeck > Assignee: Bill Bejeck > Priority: Major > Fix For: 1.0.2 > > > We need to re-check the ending offset when restoring a changelog topic to > guard against the race condition in which an additional record is appended to the log > immediately after the restore starts. We also need to add a check for the KTable source > topic and for whether an offset limit is set. -- This message was sent by Atlassian JIRA (v7.6.3#76005)