cadonna commented on code in PR #12279:
URL: https://github.com/apache/kafka/pull/12279#discussion_r899841693
##########
streams/src/test/java/org/apache/kafka/streams/processor/internals/DefaultStateUpdaterTest.java:
##########
@@ -69,15 +77,27 @@ class DefaultStateUpdaterTest {
private final static TaskId TASK_1_0 = new TaskId(1, 0);
private final static TaskId TASK_1_1 = new TaskId(1, 1);
+ private final StreamsConfig config = new
StreamsConfig(configProps(COMMIT_INTERVAL));
private final ChangelogReader changelogReader =
mock(ChangelogReader.class);
private final java.util.function.Consumer<Set<TopicPartition>>
offsetResetter = topicPartitions -> { };
- private final DefaultStateUpdater stateUpdater = new
DefaultStateUpdater(changelogReader, offsetResetter, Time.SYSTEM);
+ private final DefaultStateUpdater stateUpdater = new
DefaultStateUpdater(config, changelogReader, offsetResetter, Time.SYSTEM);
@AfterEach
public void tearDown() {
stateUpdater.shutdown(Duration.ofMinutes(1));
}
+ private Properties configProps(final int commitInterval) {
+ return mkObjectProperties(mkMap(
+ mkEntry(StreamsConfig.APPLICATION_ID_CONFIG,
safeUniqueClassTestName(getClass())),
+ mkEntry(StreamsConfig.BOOTSTRAP_SERVERS_CONFIG,
"localhost:2171"),
+ mkEntry(StreamsConfig.PROCESSING_GUARANTEE_CONFIG,
StreamsConfig.EXACTLY_ONCE_V2),
+ mkEntry(StreamsConfig.COMMIT_INTERVAL_MS_CONFIG,
commitInterval),
+ // we need to make sure that transaction timeout is not lower than commit interval for EOS
Review Comment:
Do we have a check for this somewhere in our production code?
--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
To unsubscribe, e-mail: [email protected]
For queries about this service, please contact Infrastructure at:
[email protected]