mjsax commented on a change in pull request #8964:
URL: https://github.com/apache/kafka/pull/8964#discussion_r467291346
##########
File path:
streams/src/main/java/org/apache/kafka/streams/processor/internals/StandbyTask.java
##########
@@ -93,9 +90,7 @@ public boolean isActive() {
public void initializeIfNeeded() {
if (state() == State.CREATED) {
StateManagerUtil.registerStateStores(log, logPrefix, topology,
stateMgr, stateDirectory, processorContext);
-
- // initialize the snapshot with the current offsets as we don't
need to commit then until they change
- offsetSnapshotSinceLastCommit = new
HashMap<>(stateMgr.changelogOffsets());
+ initializeCheckpoint();
Review comment:
In the old code, we actually take a copy of the `Map`, while within
`initializeCheckpoint();` we don't -- is this on purpose? Is it safe?
Also, do we actually need the method? The old code was doing exactly the
same thing, and it's just a one-liner method -- what do we gain?
##########
File path:
streams/src/main/java/org/apache/kafka/streams/processor/internals/AbstractTask.java
##########
@@ -49,6 +59,30 @@
this.stateDirectory = stateDirectory;
}
+ protected void initializeCheckpoint() {
+ // we will delete the local checkpoint file after registering the
state stores and loading them into the
+ // state manager, therefore we should initialize the snapshot as empty
to indicate over-write checkpoint needed
Review comment:
It seems this comment is outdated: `we should initialize the snapshot as
empty`?
##########
File path:
streams/src/test/java/org/apache/kafka/streams/processor/internals/StreamTaskTest.java
##########
@@ -1638,21 +1689,22 @@ public void
shouldCheckpointOffsetsOnPostCommitIfCommitNeeded() {
task.suspend();
task.prepareCommit();
- task.postCommit();
+ task.postCommit(false);
assertEquals(Task.State.SUSPENDED, task.state());
EasyMock.verify(stateManager);
}
@Test
- public void shouldSwallowExceptionOnCloseCleanError() {
+ public void shouldThrowExceptionOnCloseCleanError() {
final long offset = 543L;
EasyMock.expect(recordCollector.offsets()).andReturn(emptyMap()).anyTimes();
-
stateManager.checkpoint(EasyMock.eq(Collections.singletonMap(partition1,
offset)));
- EasyMock.expectLastCall();
+ stateManager.checkpoint();
+ EasyMock.expectLastCall().andThrow(new AssertionError("Checkpoint
should not be called")).anyTimes();
Review comment:
Same as above? (There are more occurrences below; I won't comment on each one.)
##########
File path:
streams/src/test/java/org/apache/kafka/streams/processor/internals/ProcessorStateManagerTest.java
##########
@@ -412,7 +412,7 @@ public void shouldInitializeOffsetsFromCheckpointFile()
throws IOException {
stateMgr.registerStore(nonPersistentStore,
nonPersistentStore.stateRestoreCallback);
stateMgr.initializeStoreOffsetsFromCheckpoint(true);
- assertFalse(checkpointFile.exists());
+ assertTrue(checkpointFile.exists());
Review comment:
Should we add a test verifying that, with EOS enabled, the checkpoint file is deleted?
##########
File path:
streams/src/test/java/org/apache/kafka/streams/processor/internals/StreamTaskTest.java
##########
@@ -1259,13 +1262,47 @@ public void shouldReInitializeTopologyWhenResuming()
throws IOException {
}
@Test
- public void shouldCheckpointOffsetsOnCommit() {
+ public void
shouldNotCheckpointOffsetsAgainOnCommitIfSnapshotNotChangedMuch() {
final Long offset = 543L;
EasyMock.expect(recordCollector.offsets()).andReturn(Collections.singletonMap(changelogPartition,
offset)).anyTimes();
-
stateManager.checkpoint(EasyMock.eq(Collections.singletonMap(changelogPartition,
offset)));
+ stateManager.checkpoint();
+ EasyMock.expectLastCall().once();
+
EasyMock.expect(stateManager.changelogPartitions()).andReturn(Collections.singleton(changelogPartition));
+ EasyMock.expect(stateManager.changelogOffsets())
+ .andReturn(Collections.singletonMap(changelogPartition, 0L))
+ .andReturn(Collections.singletonMap(changelogPartition, 10L))
+ .andReturn(Collections.singletonMap(changelogPartition, 20L));
+ stateManager.registerStore(stateStore,
stateStore.stateRestoreCallback);
EasyMock.expectLastCall();
+ EasyMock.replay(stateManager, recordCollector);
+
+ task = createStatefulTask(createConfig(false, "100"), true);
+
+ task.initializeIfNeeded();
+ task.completeRestoration();
+
+ task.prepareCommit();
+ task.postCommit(true);
+
+ task.prepareCommit();
+ task.postCommit(false);
+
+ EasyMock.verify(recordCollector);
Review comment:
Should we verify `stateManager`, too?
##########
File path:
streams/src/test/java/org/apache/kafka/streams/processor/internals/StreamTaskTest.java
##########
@@ -1560,42 +1641,13 @@ public void shouldCheckpointWithCreatedStateOnClose() {
}
@Test
- public void shouldNotCommitAndThrowOnCloseDirty() {
-
EasyMock.expect(stateManager.changelogPartitions()).andReturn(Collections.emptySet()).anyTimes();
- stateManager.close();
- EasyMock.expectLastCall().andThrow(new
ProcessorStateException("KABOOM!")).anyTimes();
- stateManager.checkpoint(EasyMock.anyObject());
- EasyMock.expectLastCall().andThrow(new AssertionError("Checkpoint
should not be called")).anyTimes();
-
EasyMock.expect(recordCollector.offsets()).andReturn(Collections.emptyMap()).anyTimes();
- EasyMock.replay(stateManager, recordCollector);
-
- final MetricName metricName = setupCloseTaskMetric();
-
- task = createOptimizedStatefulTask(createConfig(false, "100"),
consumer);
-
- task.initializeIfNeeded();
- task.completeRestoration();
-
- task.suspend();
- task.closeDirty();
-
- assertEquals(Task.State.CLOSED, task.state());
- assertTrue(source1.initialized);
- assertTrue(source1.closed);
-
- final double expectedCloseTaskMetric = 1.0;
- verifyCloseTaskMetric(expectedCloseTaskMetric, streamsMetrics,
metricName);
-
- EasyMock.verify(stateManager);
- }
-
- @Test
- public void shouldCheckpointOnCloseRestoring() {
+ public void shouldNotCheckpointOnCloseRestoringIfNoProgress() {
stateManager.flush();
- EasyMock.expectLastCall();
- stateManager.checkpoint(EasyMock.eq(Collections.emptyMap()));
- EasyMock.expectLastCall();
+ EasyMock.expectLastCall().andThrow(new AssertionError("Flush should
not be called")).anyTimes();
+ stateManager.checkpoint();
+ EasyMock.expectLastCall().andThrow(new AssertionError("Checkpoint
should not be called")).anyTimes();
Review comment:
Similar to above: instead of throwing, wouldn't it be sufficient to just
not register any expected call?
##########
File path:
streams/src/test/java/org/apache/kafka/streams/processor/internals/StreamTaskTest.java
##########
@@ -1458,50 +1498,91 @@ public void shouldThrowIfPostCommittingOnIllegalState()
{
task.transitionTo(Task.State.SUSPENDED);
task.transitionTo(Task.State.CLOSED);
- assertThrows(IllegalStateException.class, task::postCommit);
+ assertThrows(IllegalStateException.class, () -> task.postCommit(true));
}
@Test
public void shouldSkipCheckpointingSuspendedCreatedTask() {
- stateManager.checkpoint(EasyMock.anyObject());
+ stateManager.checkpoint();
EasyMock.expectLastCall().andThrow(new AssertionError("Should not have
tried to checkpoint"));
EasyMock.replay(stateManager);
task = createStatefulTask(createConfig(false, "100"), true);
task.suspend();
- task.postCommit();
+ task.postCommit(true);
}
@Test
- public void shouldCheckpointWithEmptyOffsetsForSuspendedRestoringTask() {
- stateManager.checkpoint(emptyMap());
+ public void shouldCheckpointForSuspendedTask() {
+ stateManager.checkpoint();
+ EasyMock.expectLastCall().once();
+ EasyMock.expect(stateManager.changelogOffsets())
+ .andReturn(Collections.singletonMap(partition1, 0L))
+ .andReturn(Collections.singletonMap(partition1, 1L));
EasyMock.replay(stateManager);
task = createStatefulTask(createConfig(false, "100"), true);
task.initializeIfNeeded();
task.suspend();
- task.postCommit();
+ task.postCommit(true);
EasyMock.verify(stateManager);
}
@Test
- public void
shouldCheckpointWithEmptyOffsetsForSuspendedRunningTaskWithNoCommitNeeded() {
- stateManager.checkpoint(emptyMap());
+ public void shouldNotCheckpointForSuspendedRunningTaskWithSmallProgress() {
+ EasyMock.expect(stateManager.changelogOffsets())
+ .andReturn(Collections.singletonMap(partition1, 1L))
+ .andReturn(Collections.singletonMap(partition1, 2L))
+ .andReturn(Collections.singletonMap(partition1, 3L));
+ stateManager.checkpoint();
+ EasyMock.expectLastCall().andThrow(new AssertionError("Checkpoint
should not be called")).anyTimes();
Review comment:
Why do we need to set up an exception? If we don't set up any expected
call at all, shouldn't the test fail automatically?
##########
File path:
streams/src/test/java/org/apache/kafka/streams/processor/internals/StandbyTaskTest.java
##########
@@ -207,15 +207,25 @@ public void
shouldFlushAndCheckpointStateManagerOnCommit() {
EasyMock.expect(stateManager.changelogOffsets()).andStubReturn(Collections.emptyMap());
stateManager.flush();
EasyMock.expectLastCall();
- stateManager.checkpoint(EasyMock.eq(Collections.emptyMap()));
-
EasyMock.expect(stateManager.changelogOffsets()).andReturn(Collections.singletonMap(partition,
50L));
+ stateManager.checkpoint();
+ EasyMock.expectLastCall();
+ EasyMock.expect(stateManager.changelogOffsets())
+ .andReturn(Collections.singletonMap(partition, 50L))
+ .andReturn(Collections.singletonMap(partition, 11000L))
+ .andReturn(Collections.singletonMap(partition, 11000L));
EasyMock.expect(stateManager.changelogPartitions()).andReturn(Collections.singleton(partition)).anyTimes();
EasyMock.replay(stateManager);
task = createStandbyTask();
task.initializeIfNeeded();
task.prepareCommit();
- task.postCommit();
+ task.postCommit(false); // this should not checkpoint
Review comment:
It's unclear to me how we actually verify that the checkpointing
happened. Above, we have
```
stateManager.checkpoint();
EasyMock.expectLastCall();
```
but that only helps verify that we checkpoint exactly once, not which
of the three calls does the checkpointing.
----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
For queries about this service, please contact Infrastructure at:
[email protected]