lihaosky commented on code in PR #11896:
URL: https://github.com/apache/kafka/pull/11896#discussion_r847582057
##########
streams/src/main/java/org/apache/kafka/streams/kstream/internals/TimeWindowedKStreamImpl.java:
##########
@@ -232,11 +247,19 @@
);
break;
case ROCKS_DB:
- supplier = Stores.persistentTimestampedWindowStore(
- materialized.storeName(),
- Duration.ofMillis(retentionPeriod),
- Duration.ofMillis(windows.size()),
- false
+ supplier = emitStrategy.type() ==
StrategyType.ON_WINDOW_CLOSE ?
+
RocksDbIndexedTimeOrderedWindowBytesStoreSupplier.create(
+ materialized.storeName(),
Review Comment:
If a user simply switches an existing topology from `emit eager` to `emit final`,
will the new topology reuse the existing state store, which would then have the
wrong data format?
##########
streams/src/main/java/org/apache/kafka/streams/kstream/EmitStrategy.java:
##########
@@ -0,0 +1,65 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.kafka.streams.kstream;
+
+import org.apache.kafka.streams.kstream.internals.UnlimitedWindow;
+import
org.apache.kafka.streams.kstream.internals.emitstrategy.WindowCloseStrategy;
+import
org.apache.kafka.streams.kstream.internals.emitstrategy.WindowUpdateStrategy;
+
+/**
+ * This interface controls the strategy that can be used to control how we
emit results in a processor.
+ */
+public interface EmitStrategy {
+
+ enum StrategyType {
+ ON_WINDOW_CLOSE,
+ ON_WINDOW_UPDATE
+ }
+
+ /**
+ * Returns the strategy type
+ * @return Emit strategy type
+ */
+ StrategyType type();
+
+ /**
+ * This strategy indicates that the aggregated result for a window will
only be outputted when the
+ * window closes instead of when there's an update to the window.
Review Comment:
Sure. I don't think this is related to caching.
##########
streams/src/main/java/org/apache/kafka/streams/kstream/EmitStrategy.java:
##########
@@ -0,0 +1,65 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.kafka.streams.kstream;
+
+import org.apache.kafka.streams.kstream.internals.UnlimitedWindow;
+import
org.apache.kafka.streams.kstream.internals.emitstrategy.WindowCloseStrategy;
+import
org.apache.kafka.streams.kstream.internals.emitstrategy.WindowUpdateStrategy;
+
+/**
+ * This interface controls the strategy that can be used to control how we
emit results in a processor.
+ */
+public interface EmitStrategy {
+
+ enum StrategyType {
+ ON_WINDOW_CLOSE,
+ ON_WINDOW_UPDATE
+ }
+
+ /**
+ * Returns the strategy type
+ * @return Emit strategy type
+ */
+ StrategyType type();
+
+ /**
+ * This strategy indicates that the aggregated result for a window will
only be outputted when the
+ * window closes instead of when there's an update to the window.
+ *
+ * <p>This strategy should only be used for window which can close. For
example, it doesn't make sense
+ * to be used with {@link UnlimitedWindow}
Review Comment:
Will update with details.
##########
streams/src/main/java/org/apache/kafka/streams/kstream/internals/KStreamWindowAggregate.java:
##########
@@ -80,22 +109,54 @@ public void enableSendingOldValues() {
private TimestampedWindowStore<KIn, VAgg> windowStore;
private TimestampedTupleForwarder<Windowed<KIn>, VAgg> tupleForwarder;
private Sensor droppedRecordsSensor;
+ private Sensor emittedRecordsSensor;
+ private Sensor emitFinalLatencySensor;
private long observedStreamTime = ConsumerRecord.NO_TIMESTAMP;
+ private long lastEmitCloseTime = ConsumerRecord.NO_TIMESTAMP;
+ private InternalProcessorContext<Windowed<KIn>, Change<VAgg>>
internalProcessorContext;
+ private final TimeTracker timeTracker = new TimeTracker();
+ private final Time time = Time.SYSTEM;
@Override
public void init(final ProcessorContext<Windowed<KIn>, Change<VAgg>>
context) {
super.init(context);
- final InternalProcessorContext<Windowed<KIn>, Change<VAgg>>
internalProcessorContext =
- (InternalProcessorContext<Windowed<KIn>, Change<VAgg>>)
context;
+ internalProcessorContext =
(InternalProcessorContext<Windowed<KIn>, Change<VAgg>>) context;
final StreamsMetricsImpl metrics =
internalProcessorContext.metrics();
final String threadId = Thread.currentThread().getName();
droppedRecordsSensor = droppedRecordsSensor(threadId,
context.taskId().toString(), metrics);
+ emittedRecordsSensor = emittedRecordsSensor(threadId,
context.taskId().toString(), metrics);
+ emitFinalLatencySensor = emitFinalLatencySensor(threadId,
context.taskId().toString(), metrics);
windowStore = context.getStateStore(storeName);
- tupleForwarder = new TimestampedTupleForwarder<>(
- windowStore,
- context,
- new TimestampedCacheFlushListener<>(context),
- sendOldValues);
+
+ if (emitStrategy.type() == StrategyType.ON_WINDOW_CLOSE) {
+ // Don't set flush lister which emit cache results
+ tupleForwarder = new TimestampedTupleForwarder<>(
+ windowStore,
+ context,
+ sendOldValues);
+ } else {
+ tupleForwarder = new TimestampedTupleForwarder<>(
+ windowStore,
+ context,
+ new TimestampedCacheFlushListener<>(context),
+ sendOldValues);
+ }
+
+ log.info("EmitStrategy=" + emitStrategy.type());
+ // Restore last emit close time for ON_WINDOW_CLOSE strategy
+ if (emitStrategy.type() == StrategyType.ON_WINDOW_CLOSE) {
+ final Long lastEmitTime =
internalProcessorContext.processorMetadataForKey(storeName);
+ if (lastEmitTime != null) {
+ lastEmitCloseTime = lastEmitTime;
+ }
+ final long emitInterval = StreamsConfig.InternalConfig.getLong(
+ context.appConfigs(),
+ EMIT_INTERVAL_MS_KSTREAMS_WINDOWED_AGGREGATION,
+ 1000L
+ );
+ timeTracker.setEmitInterval(emitInterval);
+ log.info("EmitInterval=" + emitInterval);
Review Comment:
Similar to the previous comment: I think this won't be called many times (just
once, when the processor is created?). Also, this config won't be printed when we
print all configs, because it is an internal config, so logging it here will be
convenient for debugging.
##########
streams/src/main/java/org/apache/kafka/streams/kstream/internals/TimeWindowedKStreamImpl.java:
##########
@@ -202,12 +207,22 @@
return aggregateBuilder.build(
new NamedInternal(reduceName),
materialize(materializedInternal),
- new KStreamWindowAggregate<>(windows,
materializedInternal.storeName(), aggregateBuilder.reduceInitializer,
aggregatorForReducer(reducer)),
+ new KStreamWindowAggregate<>(windows,
materializedInternal.storeName(), emitStrategy,
aggregateBuilder.reduceInitializer, aggregatorForReducer(reducer)),
materializedInternal.queryableStoreName(),
materializedInternal.keySerde() != null ? new
FullTimeWindowedSerde<>(materializedInternal.keySerde(), windows.size()) : null,
materializedInternal.valueSerde());
}
+ //@Override
Review Comment:
This method hasn't been added to the `TimeWindowedKStream` interface yet, because
`SlidingWindowedKStreamImpl` also implements that interface. I plan to add it to
the interface together with the `SlidingWindowedKStreamImpl` implementation.
##########
streams/src/main/java/org/apache/kafka/streams/kstream/internals/KStreamWindowAggregate.java:
##########
@@ -80,22 +109,54 @@ public void enableSendingOldValues() {
private TimestampedWindowStore<KIn, VAgg> windowStore;
private TimestampedTupleForwarder<Windowed<KIn>, VAgg> tupleForwarder;
private Sensor droppedRecordsSensor;
+ private Sensor emittedRecordsSensor;
+ private Sensor emitFinalLatencySensor;
private long observedStreamTime = ConsumerRecord.NO_TIMESTAMP;
+ private long lastEmitCloseTime = ConsumerRecord.NO_TIMESTAMP;
+ private InternalProcessorContext<Windowed<KIn>, Change<VAgg>>
internalProcessorContext;
+ private final TimeTracker timeTracker = new TimeTracker();
+ private final Time time = Time.SYSTEM;
@Override
public void init(final ProcessorContext<Windowed<KIn>, Change<VAgg>>
context) {
super.init(context);
- final InternalProcessorContext<Windowed<KIn>, Change<VAgg>>
internalProcessorContext =
- (InternalProcessorContext<Windowed<KIn>, Change<VAgg>>)
context;
+ internalProcessorContext =
(InternalProcessorContext<Windowed<KIn>, Change<VAgg>>) context;
final StreamsMetricsImpl metrics =
internalProcessorContext.metrics();
final String threadId = Thread.currentThread().getName();
droppedRecordsSensor = droppedRecordsSensor(threadId,
context.taskId().toString(), metrics);
+ emittedRecordsSensor = emittedRecordsSensor(threadId,
context.taskId().toString(), metrics);
+ emitFinalLatencySensor = emitFinalLatencySensor(threadId,
context.taskId().toString(), metrics);
windowStore = context.getStateStore(storeName);
- tupleForwarder = new TimestampedTupleForwarder<>(
- windowStore,
- context,
- new TimestampedCacheFlushListener<>(context),
- sendOldValues);
+
+ if (emitStrategy.type() == StrategyType.ON_WINDOW_CLOSE) {
+ // Don't set flush lister which emit cache results
+ tupleForwarder = new TimestampedTupleForwarder<>(
+ windowStore,
+ context,
+ sendOldValues);
+ } else {
+ tupleForwarder = new TimestampedTupleForwarder<>(
+ windowStore,
+ context,
+ new TimestampedCacheFlushListener<>(context),
+ sendOldValues);
+ }
+
+ log.info("EmitStrategy=" + emitStrategy.type());
Review Comment:
I don't think this will be called often enough to flood the log, and it is
helpful information. Is it printed anywhere else (e.g., in the topology)?
##########
streams/src/main/java/org/apache/kafka/streams/kstream/internals/KStreamWindowAggregate.java:
##########
@@ -80,22 +109,54 @@ public void enableSendingOldValues() {
private TimestampedWindowStore<KIn, VAgg> windowStore;
private TimestampedTupleForwarder<Windowed<KIn>, VAgg> tupleForwarder;
private Sensor droppedRecordsSensor;
+ private Sensor emittedRecordsSensor;
+ private Sensor emitFinalLatencySensor;
private long observedStreamTime = ConsumerRecord.NO_TIMESTAMP;
+ private long lastEmitCloseTime = ConsumerRecord.NO_TIMESTAMP;
+ private InternalProcessorContext<Windowed<KIn>, Change<VAgg>>
internalProcessorContext;
+ private final TimeTracker timeTracker = new TimeTracker();
+ private final Time time = Time.SYSTEM;
@Override
public void init(final ProcessorContext<Windowed<KIn>, Change<VAgg>>
context) {
super.init(context);
- final InternalProcessorContext<Windowed<KIn>, Change<VAgg>>
internalProcessorContext =
- (InternalProcessorContext<Windowed<KIn>, Change<VAgg>>)
context;
+ internalProcessorContext =
(InternalProcessorContext<Windowed<KIn>, Change<VAgg>>) context;
final StreamsMetricsImpl metrics =
internalProcessorContext.metrics();
final String threadId = Thread.currentThread().getName();
droppedRecordsSensor = droppedRecordsSensor(threadId,
context.taskId().toString(), metrics);
+ emittedRecordsSensor = emittedRecordsSensor(threadId,
context.taskId().toString(), metrics);
Review Comment:
I'm OK with removing this if keeping it would require another round of discussion
and voting 😃
##########
streams/src/main/java/org/apache/kafka/streams/kstream/internals/KStreamWindowAggregate.java:
##########
@@ -184,6 +247,75 @@ public void process(final Record<KIn, VIn> record) {
droppedRecordsSensor.record();
}
}
+
+ maybeMeasureLatency(() -> tryEmitFinalResult(record, closeTime),
time, emitFinalLatencySensor);
+ }
+
+ private void tryEmitFinalResult(final Record<KIn, VIn> record, final
long closeTime) {
+ if (emitStrategy.type() != StrategyType.ON_WINDOW_CLOSE) {
+ return;
+ }
+
+ final long now = internalProcessorContext.currentSystemTimeMs();
+ // Throttle emit frequency
+ if (now < timeTracker.nextTimeToEmit) {
+ return;
+ }
+
+ // Schedule next emit time based on now to avoid the case that if
system time jumps a lot,
+ // this can be triggered everytime
+ timeTracker.nextTimeToEmit = now;
+ timeTracker.advanceNextTimeToEmit();
+
+ // Close time does not progress
+ if (lastEmitCloseTime != ConsumerRecord.NO_TIMESTAMP &&
lastEmitCloseTime >= closeTime) {
+ return;
+ }
+
+ final long emitWindowStart = closeTime - windows.size();
+ if (emitWindowStart < 0) {
+ // If emitWindowStart is 0, it means first window closes since
windowEndTime
+ // is exclusive
+ return;
+ }
+
+ // Because we only get here when emitWindowStart > 0 which means
closeTime > windows.size()
+ // Since we set lastEmitCloseTime to closeTime before storing to
processor metadata
+ // lastEmitCloseTime - windows.size() is always > 0
Review Comment:
I'm trying to say that `lastEmitCloseTime - windows.size()` won't be less than or
equal to 0, and to give the reason why.
##########
streams/src/main/java/org/apache/kafka/streams/kstream/internals/KStreamWindowAggregate.java:
##########
@@ -184,6 +247,75 @@ public void process(final Record<KIn, VIn> record) {
droppedRecordsSensor.record();
}
}
+
+ maybeMeasureLatency(() -> tryEmitFinalResult(record, closeTime),
time, emitFinalLatencySensor);
+ }
+
+ private void tryEmitFinalResult(final Record<KIn, VIn> record, final
long closeTime) {
+ if (emitStrategy.type() != StrategyType.ON_WINDOW_CLOSE) {
+ return;
+ }
+
+ final long now = internalProcessorContext.currentSystemTimeMs();
+ // Throttle emit frequency
+ if (now < timeTracker.nextTimeToEmit) {
+ return;
+ }
+
+ // Schedule next emit time based on now to avoid the case that if
system time jumps a lot,
+ // this can be triggered everytime
+ timeTracker.nextTimeToEmit = now;
+ timeTracker.advanceNextTimeToEmit();
+
+ // Close time does not progress
+ if (lastEmitCloseTime != ConsumerRecord.NO_TIMESTAMP &&
lastEmitCloseTime >= closeTime) {
Review Comment:
If a task has multiple source topics, and `lastEmitCloseTime` is committed
successfully for some sources while the offset commit fails for others, is it
possible that we restore `lastEmitCloseTime` but resume processing from an earlier
time? In that case, could `lastEmitCloseTime` be larger than `closeTime`? Maybe we
should log a warning if `lastEmitCloseTime` is larger?
##########
streams/src/main/java/org/apache/kafka/streams/kstream/internals/KStreamWindowAggregate.java:
##########
@@ -184,6 +247,75 @@ public void process(final Record<KIn, VIn> record) {
droppedRecordsSensor.record();
}
}
+
+ maybeMeasureLatency(() -> tryEmitFinalResult(record, closeTime),
time, emitFinalLatencySensor);
+ }
+
+ private void tryEmitFinalResult(final Record<KIn, VIn> record, final
long closeTime) {
+ if (emitStrategy.type() != StrategyType.ON_WINDOW_CLOSE) {
+ return;
+ }
+
+ final long now = internalProcessorContext.currentSystemTimeMs();
+ // Throttle emit frequency
+ if (now < timeTracker.nextTimeToEmit) {
+ return;
+ }
+
+ // Schedule next emit time based on now to avoid the case that if
system time jumps a lot,
+ // this can be triggered everytime
+ timeTracker.nextTimeToEmit = now;
+ timeTracker.advanceNextTimeToEmit();
+
+ // Close time does not progress
+ if (lastEmitCloseTime != ConsumerRecord.NO_TIMESTAMP &&
lastEmitCloseTime >= closeTime) {
+ return;
+ }
+
+ final long emitWindowStart = closeTime - windows.size();
+ if (emitWindowStart < 0) {
+ // If emitWindowStart is 0, it means first window closes since
windowEndTime
+ // is exclusive
+ return;
+ }
+
+ // Because we only get here when emitWindowStart > 0 which means
closeTime > windows.size()
+ // Since we set lastEmitCloseTime to closeTime before storing to
processor metadata
+ // lastEmitCloseTime - windows.size() is always > 0
+ // Set lastEmitWindowStart to -1L if not set so that when we
fetchAll, we fetch from 0L
+ final long lastEmitWindowStart = lastEmitCloseTime ==
ConsumerRecord.NO_TIMESTAMP ?
+ -1L : lastEmitCloseTime - windows.size();
+
+ if (lastEmitCloseTime != ConsumerRecord.NO_TIMESTAMP) {
+ final Map<Long, W> matchedCloseWindows =
windows.windowsFor(emitWindowStart);
+ final Map<Long, W> matchedEmitWindows =
windows.windowsFor(lastEmitWindowStart);
+
+ // Don't fetch store if the new emit window close time doesn't
progress enough to cover next
+ // window
+ if (matchedCloseWindows.equals(matchedEmitWindows)) {
+ log.debug("no new windows to emit. LastEmitCloseTime={},
newCloseTime={}",
+ lastEmitCloseTime, closeTime);
+ return;
+ }
+ }
+
+ final KeyValueIterator<Windowed<KIn>, ValueAndTimestamp<VAgg>>
windowToEmit = windowStore
+ .fetchAll(lastEmitWindowStart + 1, emitWindowStart);
+
+ int emittedCount = 0;
+ while (windowToEmit.hasNext()) {
+ emittedCount++;
+ final KeyValue<Windowed<KIn>, ValueAndTimestamp<VAgg>> kv =
windowToEmit.next();
+ tupleForwarder.maybeForward(
+ record.withKey(kv.key)
+ .withValue(new Change<>(kv.value.value(), null))
+ .withTimestamp(kv.value.timestamp())
+ .withHeaders(null)); // Don't set header
+ }
+ emittedRecordsSensor.record(emittedCount);
+
+ lastEmitCloseTime = closeTime;
+ internalProcessorContext.addProcessorMetadataKeyValue(storeName,
closeTime);
Review Comment:
I thought about this, and I think it is more consistent to shift both
`lastEmitCloseTime` and `closeTime`; if we stored only `lastEmitStartTime`, we
would shift only `closeTime` during processing. I also think storing `closeTime`
is simpler, because it lets us see whether a window has closed without having to
shift it first.
##########
streams/src/test/java/org/apache/kafka/streams/integration/TimeWindowedKStreamIntegrationTest.java:
##########
@@ -0,0 +1,497 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.kafka.streams.integration;
+
+import java.util.Collection;
+import java.util.Optional;
+import org.apache.kafka.clients.consumer.ConsumerConfig;
+import org.apache.kafka.common.serialization.Deserializer;
+import org.apache.kafka.common.serialization.Serde;
+import org.apache.kafka.common.serialization.Serdes;
+import org.apache.kafka.common.serialization.Serdes.StringSerde;
+import org.apache.kafka.common.serialization.StringDeserializer;
+import org.apache.kafka.common.serialization.StringSerializer;
+import org.apache.kafka.streams.KafkaStreams;
+import org.apache.kafka.streams.StreamsBuilder;
+import org.apache.kafka.streams.StreamsConfig;
+import org.apache.kafka.streams.KeyValueTimestamp;
+import org.apache.kafka.streams.StreamsConfig.InternalConfig;
+import org.apache.kafka.streams.integration.utils.EmbeddedKafkaCluster;
+import org.apache.kafka.streams.integration.utils.IntegrationTestUtils;
+import org.apache.kafka.streams.kstream.Consumed;
+import org.apache.kafka.streams.kstream.EmitStrategy;
+import org.apache.kafka.streams.kstream.EmitStrategy.StrategyType;
+import org.apache.kafka.streams.kstream.JoinWindows;
+import org.apache.kafka.streams.kstream.KStream;
+import org.apache.kafka.streams.kstream.Materialized;
+import org.apache.kafka.streams.kstream.Produced;
+import org.apache.kafka.streams.kstream.SessionWindowedDeserializer;
+import org.apache.kafka.streams.kstream.TimeWindowedDeserializer;
+import org.apache.kafka.streams.kstream.TimeWindows;
+import org.apache.kafka.streams.kstream.UnlimitedWindows;
+import org.apache.kafka.streams.kstream.Windowed;
+import org.apache.kafka.streams.kstream.WindowedSerdes;
+import org.apache.kafka.streams.kstream.internals.TimeWindow;
+import org.apache.kafka.streams.kstream.internals.TimeWindowedKStreamImpl;
+import org.apache.kafka.test.IntegrationTest;
+import org.apache.kafka.test.MockAggregator;
+import org.apache.kafka.test.MockInitializer;
+import org.apache.kafka.test.TestUtils;
+import org.junit.After;
+import org.junit.AfterClass;
+import org.junit.Before;
+import org.junit.BeforeClass;
+import org.junit.Rule;
+import org.junit.Test;
+import org.junit.experimental.categories.Category;
+import org.junit.rules.TestName;
+
+import java.io.IOException;
+import java.util.Arrays;
+import java.util.List;
+import java.util.Properties;
+import org.junit.runner.RunWith;
+import org.junit.runners.Parameterized;
+import org.junit.runners.Parameterized.Parameter;
+
+import static java.time.Duration.ofMillis;
+import static java.time.Instant.ofEpochMilli;
+import static java.util.Arrays.asList;
+import static org.apache.kafka.common.utils.Utils.mkEntry;
+import static org.apache.kafka.common.utils.Utils.mkMap;
+import static org.apache.kafka.common.utils.Utils.mkProperties;
+import static
org.apache.kafka.streams.integration.utils.IntegrationTestUtils.safeUniqueTestName;
+import static org.hamcrest.MatcherAssert.assertThat;
+import static org.hamcrest.core.Is.is;
+import static org.junit.Assert.assertThrows;
+
+@SuppressWarnings({"unchecked"})
+@Category({IntegrationTest.class})
+@RunWith(Parameterized.class)
+public class TimeWindowedKStreamIntegrationTest {
+ private static final int NUM_BROKERS = 1;
+
+ public static final EmbeddedKafkaCluster CLUSTER = new
EmbeddedKafkaCluster(NUM_BROKERS,
+ mkProperties(
+ mkMap(mkEntry("log.retention.hours", "-1"),
mkEntry("log.retention.bytes", "-1")) // Don't expire records since we
manipulate timestamp
+ )
+ );
+
+ @BeforeClass
+ public static void startCluster() throws IOException {
+ CLUSTER.start();
+ }
+
+ @AfterClass
+ public static void closeCluster() {
+ CLUSTER.stop();
+ }
+
+
+ private StreamsBuilder builder;
+ private Properties streamsConfiguration;
+ private KafkaStreams kafkaStreams;
+ private String streamOneInput;
+ private String streamTwoInput;
+ private String outputTopic;
+
+ @Rule
+ public TestName testName = new TestName();
+
+ @Parameter
+ public StrategyType type;
+
+ @Parameter(1)
+ public EmitStrategy emitStrategy;
+
+ private boolean emitFinal;
+
+ @Parameterized.Parameters(name = "{0}")
+ public static Collection<Object[]> getEmitStrategy() {
+ return asList(new Object[][] {
+ {StrategyType.ON_WINDOW_UPDATE, EmitStrategy.onWindowUpdate()},
+ {StrategyType.ON_WINDOW_CLOSE, EmitStrategy.onWindowClose()}
+ });
+ }
+
+ @Before
+ public void before() throws InterruptedException {
+ builder = new StreamsBuilder();
+ createTopics();
+ streamsConfiguration = new Properties();
+ final String safeTestName = safeUniqueTestName(getClass(), testName);
+ streamsConfiguration.put(StreamsConfig.APPLICATION_ID_CONFIG, "app-" +
safeTestName);
+ streamsConfiguration.put(StreamsConfig.BOOTSTRAP_SERVERS_CONFIG,
CLUSTER.bootstrapServers());
+ streamsConfiguration.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG,
"earliest");
+ streamsConfiguration.put(StreamsConfig.STATE_DIR_CONFIG,
TestUtils.tempDirectory().getPath());
+
streamsConfiguration.put(StreamsConfig.STATESTORE_CACHE_MAX_BYTES_CONFIG, 0);
+ streamsConfiguration.put(StreamsConfig.COMMIT_INTERVAL_MS_CONFIG,
100L);
+ streamsConfiguration.put(StreamsConfig.DEFAULT_KEY_SERDE_CLASS_CONFIG,
Serdes.String().getClass());
+
streamsConfiguration.put(StreamsConfig.DEFAULT_VALUE_SERDE_CLASS_CONFIG,
Serdes.String().getClass());
+
streamsConfiguration.put(InternalConfig.EMIT_INTERVAL_MS_KSTREAMS_WINDOWED_AGGREGATION,
0); // Always process
+
streamsConfiguration.put(StreamsConfig.WINDOW_STORE_CHANGE_LOG_ADDITIONAL_RETENTION_MS_CONFIG,
Long.MAX_VALUE); // Don't expire changelog
+
+ emitFinal = emitStrategy.type() == StrategyType.ON_WINDOW_CLOSE;
+ }
+
+ @After
+ public void whenShuttingDown() throws IOException {
+ if (kafkaStreams != null) {
+ kafkaStreams.close();
+ kafkaStreams.cleanUp();
+ }
+ IntegrationTestUtils.purgeLocalStreamsState(streamsConfiguration);
+ }
+
+ @Test
+ public void shouldAggregateWindowedWithNoGrace() throws Exception {
+ produceMessages(
+ streamOneInput,
+ new KeyValueTimestamp<>("A", "1", 0),
+ new KeyValueTimestamp<>("A", "1", 5),
+ new KeyValueTimestamp<>("A", "1", 10), // close [0, 10)
+ new KeyValueTimestamp<>("B", "2", 6), // late and skip
+ new KeyValueTimestamp<>("B", "2", 11), // close [0, 10)
Review Comment:
Yeah, it's closed. Will update the comment.
##########
streams/src/test/java/org/apache/kafka/streams/integration/TimeWindowedKStreamIntegrationTest.java:
##########
@@ -0,0 +1,497 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.kafka.streams.integration;
+
+import java.util.Collection;
+import java.util.Optional;
+import org.apache.kafka.clients.consumer.ConsumerConfig;
+import org.apache.kafka.common.serialization.Deserializer;
+import org.apache.kafka.common.serialization.Serde;
+import org.apache.kafka.common.serialization.Serdes;
+import org.apache.kafka.common.serialization.Serdes.StringSerde;
+import org.apache.kafka.common.serialization.StringDeserializer;
+import org.apache.kafka.common.serialization.StringSerializer;
+import org.apache.kafka.streams.KafkaStreams;
+import org.apache.kafka.streams.StreamsBuilder;
+import org.apache.kafka.streams.StreamsConfig;
+import org.apache.kafka.streams.KeyValueTimestamp;
+import org.apache.kafka.streams.StreamsConfig.InternalConfig;
+import org.apache.kafka.streams.integration.utils.EmbeddedKafkaCluster;
+import org.apache.kafka.streams.integration.utils.IntegrationTestUtils;
+import org.apache.kafka.streams.kstream.Consumed;
+import org.apache.kafka.streams.kstream.EmitStrategy;
+import org.apache.kafka.streams.kstream.EmitStrategy.StrategyType;
+import org.apache.kafka.streams.kstream.JoinWindows;
+import org.apache.kafka.streams.kstream.KStream;
+import org.apache.kafka.streams.kstream.Materialized;
+import org.apache.kafka.streams.kstream.Produced;
+import org.apache.kafka.streams.kstream.SessionWindowedDeserializer;
+import org.apache.kafka.streams.kstream.TimeWindowedDeserializer;
+import org.apache.kafka.streams.kstream.TimeWindows;
+import org.apache.kafka.streams.kstream.UnlimitedWindows;
+import org.apache.kafka.streams.kstream.Windowed;
+import org.apache.kafka.streams.kstream.WindowedSerdes;
+import org.apache.kafka.streams.kstream.internals.TimeWindow;
+import org.apache.kafka.streams.kstream.internals.TimeWindowedKStreamImpl;
+import org.apache.kafka.test.IntegrationTest;
+import org.apache.kafka.test.MockAggregator;
+import org.apache.kafka.test.MockInitializer;
+import org.apache.kafka.test.TestUtils;
+import org.junit.After;
+import org.junit.AfterClass;
+import org.junit.Before;
+import org.junit.BeforeClass;
+import org.junit.Rule;
+import org.junit.Test;
+import org.junit.experimental.categories.Category;
+import org.junit.rules.TestName;
+
+import java.io.IOException;
+import java.util.Arrays;
+import java.util.List;
+import java.util.Properties;
+import org.junit.runner.RunWith;
+import org.junit.runners.Parameterized;
+import org.junit.runners.Parameterized.Parameter;
+
+import static java.time.Duration.ofMillis;
+import static java.time.Instant.ofEpochMilli;
+import static java.util.Arrays.asList;
+import static org.apache.kafka.common.utils.Utils.mkEntry;
+import static org.apache.kafka.common.utils.Utils.mkMap;
+import static org.apache.kafka.common.utils.Utils.mkProperties;
+import static
org.apache.kafka.streams.integration.utils.IntegrationTestUtils.safeUniqueTestName;
+import static org.hamcrest.MatcherAssert.assertThat;
+import static org.hamcrest.core.Is.is;
+import static org.junit.Assert.assertThrows;
+
+@SuppressWarnings({"unchecked"})
+@Category({IntegrationTest.class})
+@RunWith(Parameterized.class)
+public class TimeWindowedKStreamIntegrationTest {
+ private static final int NUM_BROKERS = 1;
+
+ public static final EmbeddedKafkaCluster CLUSTER = new
EmbeddedKafkaCluster(NUM_BROKERS,
Review Comment:
My understanding is that `EmbeddedKafkaCluster` is always used for integration
tests, while `TopologyTestDriver` is used for unit tests. Is that right?
##########
streams/src/main/java/org/apache/kafka/streams/kstream/internals/KStreamWindowAggregate.java:
##########
@@ -184,6 +247,75 @@ public void process(final Record<KIn, VIn> record) {
droppedRecordsSensor.record();
}
}
+
+ maybeMeasureLatency(() -> tryEmitFinalResult(record, closeTime),
time, emitFinalLatencySensor);
+ }
+
+ private void tryEmitFinalResult(final Record<KIn, VIn> record, final
long closeTime) {
+ if (emitStrategy.type() != StrategyType.ON_WINDOW_CLOSE) {
+ return;
+ }
+
+ final long now = internalProcessorContext.currentSystemTimeMs();
+ // Throttle emit frequency
+ if (now < timeTracker.nextTimeToEmit) {
+ return;
+ }
+
+ // Schedule next emit time based on now to avoid the case that if
system time jumps a lot,
+ // this can be triggered everytime
+ timeTracker.nextTimeToEmit = now;
+ timeTracker.advanceNextTimeToEmit();
+
+ // Close time does not progress
+ if (lastEmitCloseTime != ConsumerRecord.NO_TIMESTAMP &&
lastEmitCloseTime >= closeTime) {
+ return;
+ }
+
+ final long emitWindowStart = closeTime - windows.size();
Review Comment:
Yeah, will rename. Naming is hard...
##########
streams/src/test/java/org/apache/kafka/streams/integration/TimeWindowedKStreamIntegrationTest.java:
##########
@@ -0,0 +1,497 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.kafka.streams.integration;
+
+import java.util.Collection;
+import java.util.Optional;
+import org.apache.kafka.clients.consumer.ConsumerConfig;
+import org.apache.kafka.common.serialization.Deserializer;
+import org.apache.kafka.common.serialization.Serde;
+import org.apache.kafka.common.serialization.Serdes;
+import org.apache.kafka.common.serialization.Serdes.StringSerde;
+import org.apache.kafka.common.serialization.StringDeserializer;
+import org.apache.kafka.common.serialization.StringSerializer;
+import org.apache.kafka.streams.KafkaStreams;
+import org.apache.kafka.streams.StreamsBuilder;
+import org.apache.kafka.streams.StreamsConfig;
+import org.apache.kafka.streams.KeyValueTimestamp;
+import org.apache.kafka.streams.StreamsConfig.InternalConfig;
+import org.apache.kafka.streams.integration.utils.EmbeddedKafkaCluster;
+import org.apache.kafka.streams.integration.utils.IntegrationTestUtils;
+import org.apache.kafka.streams.kstream.Consumed;
+import org.apache.kafka.streams.kstream.EmitStrategy;
+import org.apache.kafka.streams.kstream.EmitStrategy.StrategyType;
+import org.apache.kafka.streams.kstream.JoinWindows;
+import org.apache.kafka.streams.kstream.KStream;
+import org.apache.kafka.streams.kstream.Materialized;
+import org.apache.kafka.streams.kstream.Produced;
+import org.apache.kafka.streams.kstream.SessionWindowedDeserializer;
+import org.apache.kafka.streams.kstream.TimeWindowedDeserializer;
+import org.apache.kafka.streams.kstream.TimeWindows;
+import org.apache.kafka.streams.kstream.UnlimitedWindows;
+import org.apache.kafka.streams.kstream.Windowed;
+import org.apache.kafka.streams.kstream.WindowedSerdes;
+import org.apache.kafka.streams.kstream.internals.TimeWindow;
+import org.apache.kafka.streams.kstream.internals.TimeWindowedKStreamImpl;
+import org.apache.kafka.test.IntegrationTest;
+import org.apache.kafka.test.MockAggregator;
+import org.apache.kafka.test.MockInitializer;
+import org.apache.kafka.test.TestUtils;
+import org.junit.After;
+import org.junit.AfterClass;
+import org.junit.Before;
+import org.junit.BeforeClass;
+import org.junit.Rule;
+import org.junit.Test;
+import org.junit.experimental.categories.Category;
+import org.junit.rules.TestName;
+
+import java.io.IOException;
+import java.util.Arrays;
+import java.util.List;
+import java.util.Properties;
+import org.junit.runner.RunWith;
+import org.junit.runners.Parameterized;
+import org.junit.runners.Parameterized.Parameter;
+
+import static java.time.Duration.ofMillis;
+import static java.time.Instant.ofEpochMilli;
+import static java.util.Arrays.asList;
+import static org.apache.kafka.common.utils.Utils.mkEntry;
+import static org.apache.kafka.common.utils.Utils.mkMap;
+import static org.apache.kafka.common.utils.Utils.mkProperties;
+import static
org.apache.kafka.streams.integration.utils.IntegrationTestUtils.safeUniqueTestName;
+import static org.hamcrest.MatcherAssert.assertThat;
+import static org.hamcrest.core.Is.is;
+import static org.junit.Assert.assertThrows;
+
+@SuppressWarnings({"unchecked"})
+@Category({IntegrationTest.class})
+@RunWith(Parameterized.class)
+public class TimeWindowedKStreamIntegrationTest {
+ private static final int NUM_BROKERS = 1;
+
+ public static final EmbeddedKafkaCluster CLUSTER = new
EmbeddedKafkaCluster(NUM_BROKERS,
+ mkProperties(
+ mkMap(mkEntry("log.retention.hours", "-1"),
mkEntry("log.retention.bytes", "-1")) // Don't expire records since we
manipulate timestamp
+ )
+ );
+
+ @BeforeClass
+ public static void startCluster() throws IOException {
+ CLUSTER.start();
+ }
+
+ @AfterClass
+ public static void closeCluster() {
+ CLUSTER.stop();
+ }
+
+
+ private StreamsBuilder builder;
+ private Properties streamsConfiguration;
+ private KafkaStreams kafkaStreams;
+ private String streamOneInput;
+ private String streamTwoInput;
+ private String outputTopic;
+
+ @Rule
+ public TestName testName = new TestName();
+
+ @Parameter
+ public StrategyType type;
+
+ @Parameter(1)
+ public EmitStrategy emitStrategy;
+
+ private boolean emitFinal;
+
+ @Parameterized.Parameters(name = "{0}")
+ public static Collection<Object[]> getEmitStrategy() {
+ return asList(new Object[][] {
+ {StrategyType.ON_WINDOW_UPDATE, EmitStrategy.onWindowUpdate()},
+ {StrategyType.ON_WINDOW_CLOSE, EmitStrategy.onWindowClose()}
+ });
+ }
+
+ @Before
+ public void before() throws InterruptedException {
+ builder = new StreamsBuilder();
+ createTopics();
+ streamsConfiguration = new Properties();
+ final String safeTestName = safeUniqueTestName(getClass(), testName);
+ streamsConfiguration.put(StreamsConfig.APPLICATION_ID_CONFIG, "app-" +
safeTestName);
+ streamsConfiguration.put(StreamsConfig.BOOTSTRAP_SERVERS_CONFIG,
CLUSTER.bootstrapServers());
+ streamsConfiguration.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG,
"earliest");
+ streamsConfiguration.put(StreamsConfig.STATE_DIR_CONFIG,
TestUtils.tempDirectory().getPath());
+
streamsConfiguration.put(StreamsConfig.STATESTORE_CACHE_MAX_BYTES_CONFIG, 0);
+ streamsConfiguration.put(StreamsConfig.COMMIT_INTERVAL_MS_CONFIG,
100L);
+ streamsConfiguration.put(StreamsConfig.DEFAULT_KEY_SERDE_CLASS_CONFIG,
Serdes.String().getClass());
+
streamsConfiguration.put(StreamsConfig.DEFAULT_VALUE_SERDE_CLASS_CONFIG,
Serdes.String().getClass());
+
streamsConfiguration.put(InternalConfig.EMIT_INTERVAL_MS_KSTREAMS_WINDOWED_AGGREGATION,
0); // Always process
+
streamsConfiguration.put(StreamsConfig.WINDOW_STORE_CHANGE_LOG_ADDITIONAL_RETENTION_MS_CONFIG,
Long.MAX_VALUE); // Don't expire changelog
+
+ emitFinal = emitStrategy.type() == StrategyType.ON_WINDOW_CLOSE;
+ }
+
+ @After
+ public void whenShuttingDown() throws IOException {
+ if (kafkaStreams != null) {
+ kafkaStreams.close();
+ kafkaStreams.cleanUp();
+ }
+ IntegrationTestUtils.purgeLocalStreamsState(streamsConfiguration);
+ }
+
+ @Test
+ public void shouldAggregateWindowedWithNoGrace() throws Exception {
+ produceMessages(
+ streamOneInput,
+ new KeyValueTimestamp<>("A", "1", 0),
+ new KeyValueTimestamp<>("A", "1", 5),
+ new KeyValueTimestamp<>("A", "1", 10), // close [0, 10)
+ new KeyValueTimestamp<>("B", "2", 6), // late and skip
Review Comment:
Yes, it does go to the second window. Will update the comment.
##########
streams/src/test/java/org/apache/kafka/streams/kstream/internals/TimeWindowedKStreamImplTest.java:
##########
@@ -158,21 +235,23 @@ public void shouldMaterializeCount() {
final List<KeyValue<Windowed<String>, Long>> data =
StreamsTestUtils.toList(windowStore.fetch("1", "2",
ofEpochMilli(0), ofEpochMilli(1000L)));
- assertThat(data, equalTo(Arrays.asList(
+ assertThat(data, equalTo(asList(
KeyValue.pair(new Windowed<>("1", new TimeWindow(0, 500)),
2L),
KeyValue.pair(new Windowed<>("1", new TimeWindow(500,
1000)), 1L),
- KeyValue.pair(new Windowed<>("2", new TimeWindow(500,
1000)), 2L))));
+ KeyValue.pair(new Windowed<>("2", new TimeWindow(500,
1000)), 2L),
+ KeyValue.pair(new Windowed<>("2", new TimeWindow(1000,
1500)), 1L))));
Review Comment:
I added one more input to `processData`: `inputTopic.pipeInput("2", "30", 1000L);`
##########
streams/src/main/java/org/apache/kafka/streams/kstream/internals/KStreamWindowAggregate.java:
##########
@@ -184,6 +247,75 @@ public void process(final Record<KIn, VIn> record) {
droppedRecordsSensor.record();
}
}
+
+ maybeMeasureLatency(() -> tryEmitFinalResult(record, closeTime),
time, emitFinalLatencySensor);
+ }
+
+ private void tryEmitFinalResult(final Record<KIn, VIn> record, final
long closeTime) {
+ if (emitStrategy.type() != StrategyType.ON_WINDOW_CLOSE) {
+ return;
+ }
+
+ final long now = internalProcessorContext.currentSystemTimeMs();
+ // Throttle emit frequency
+ if (now < timeTracker.nextTimeToEmit) {
+ return;
+ }
+
+ // Schedule next emit time based on now to avoid the case that if
system time jumps a lot,
+ // this can be triggered everytime
+ timeTracker.nextTimeToEmit = now;
Review Comment:
The stream-stream join doesn't do this. Created https://issues.apache.org/jira/browse/KAFKA-13817 to track it.
##########
streams/src/test/java/org/apache/kafka/streams/kstream/internals/TimeWindowedKStreamImplTest.java:
##########
@@ -42,84 +47,142 @@
import org.junit.Before;
import org.junit.Test;
-import java.util.Arrays;
import java.util.List;
import java.util.Properties;
+import org.junit.runner.RunWith;
+import org.junit.runners.Parameterized;
+import org.junit.runners.Parameterized.Parameter;
import static java.time.Duration.ofMillis;
import static java.time.Instant.ofEpochMilli;
+import static java.util.Arrays.asList;
import static org.hamcrest.CoreMatchers.equalTo;
import static org.hamcrest.MatcherAssert.assertThat;
+import static org.junit.Assert.assertEquals;
import static org.junit.Assert.assertThrows;
+@RunWith(Parameterized.class)
public class TimeWindowedKStreamImplTest {
private static final String TOPIC = "input";
+ private static final Windowed<String> KEY_1_WINDOW_0 = new Windowed<>("1",
new TimeWindow(0L, 500L));
+ private static final Windowed<String> KEY_1_WINDOW_1 = new Windowed<>("1",
new TimeWindow(500L, 1000L));
+ private static final Windowed<String> KEY_2_WINDOW_1 = new Windowed<>("2",
new TimeWindow(500L, 1000L));
+ private static final Windowed<String> KEY_2_WINDOW_2 = new Windowed<>("2",
new TimeWindow(1000L, 1500L));
+
private final StreamsBuilder builder = new StreamsBuilder();
private final Properties props =
StreamsTestUtils.getStreamsConfig(Serdes.String(), Serdes.String());
- private TimeWindowedKStream<String, String> windowedStream;
+ private TimeWindowedKStreamImpl<String, String, TimeWindow> windowedStream;
+
+ @Parameter
+ public StrategyType type;
+
+ @Parameter(1)
+ public EmitStrategy emitStrategy;
+
+ private boolean emitFinal;
+ @Parameterized.Parameters(name = "{0}")
+ public static Collection<Object[]> getKeySchema() {
+ return asList(new Object[][] {
+ {StrategyType.ON_WINDOW_UPDATE, EmitStrategy.onWindowUpdate()},
+ {StrategyType.ON_WINDOW_CLOSE, EmitStrategy.onWindowClose()}
+ });
+ }
+
+ @SuppressWarnings("unchecked")
@Before
public void before() {
+ emitFinal = type.equals(StrategyType.ON_WINDOW_CLOSE);
+ // Set interval to 0 so that it always tries to emit
+
props.setProperty(InternalConfig.EMIT_INTERVAL_MS_KSTREAMS_WINDOWED_AGGREGATION,
"0");
final KStream<String, String> stream = builder.stream(TOPIC,
Consumed.with(Serdes.String(), Serdes.String()));
- windowedStream = stream.
+ // TODO: remove this cast
https://issues.apache.org/jira/browse/KAFKA-13800
+ windowedStream = (TimeWindowedKStreamImpl<String, String, TimeWindow>)
(stream.
groupByKey(Grouped.with(Serdes.String(), Serdes.String()))
- .windowedBy(TimeWindows.ofSizeWithNoGrace(ofMillis(500L)));
+ .windowedBy(TimeWindows.ofSizeWithNoGrace(ofMillis(500L))));
}
@Test
public void shouldCountWindowed() {
final MockApiProcessorSupplier<Windowed<String>, Long, Void, Void>
supplier = new MockApiProcessorSupplier<>();
windowedStream
+ .emitStrategy(emitStrategy)
.count()
.toStream()
.process(supplier);
try (final TopologyTestDriver driver = new
TopologyTestDriver(builder.build(), props)) {
processData(driver);
}
- assertThat(
- supplier.theCapturedProcessor().lastValueAndTimestampPerKey()
- .get(new Windowed<>("1", new TimeWindow(0L, 500L))),
- equalTo(ValueAndTimestamp.make(2L, 15L)));
- assertThat(
- supplier.theCapturedProcessor().lastValueAndTimestampPerKey()
- .get(new Windowed<>("2", new TimeWindow(500L, 1000L))),
- equalTo(ValueAndTimestamp.make(2L, 550L)));
- assertThat(
- supplier.theCapturedProcessor().lastValueAndTimestampPerKey()
- .get(new Windowed<>("1", new TimeWindow(500L, 1000L))),
- equalTo(ValueAndTimestamp.make(1L, 500L)));
+ final ArrayList<KeyValueTimestamp<Windowed<String>, Long>> processed =
supplier.theCapturedProcessor().processed();
+
+ if (emitFinal) {
+ assertEquals(
+ asList(
+ new KeyValueTimestamp<>(KEY_1_WINDOW_0, 2L, 15L),
+ new KeyValueTimestamp<>(KEY_1_WINDOW_1, 1L, 500L),
+ new KeyValueTimestamp<>(KEY_2_WINDOW_1, 2L, 550L)
Review Comment:
I didn't understand your question. For emit-final, the fetch returns results ordered by `windowStart`.
##########
streams/src/main/java/org/apache/kafka/streams/kstream/internals/KStreamWindowAggregate.java:
##########
@@ -184,6 +247,75 @@ public void process(final Record<KIn, VIn> record) {
droppedRecordsSensor.record();
}
}
+
+ maybeMeasureLatency(() -> tryEmitFinalResult(record, closeTime),
time, emitFinalLatencySensor);
+ }
+
+ private void tryEmitFinalResult(final Record<KIn, VIn> record, final
long closeTime) {
+ if (emitStrategy.type() != StrategyType.ON_WINDOW_CLOSE) {
+ return;
+ }
+
+ final long now = internalProcessorContext.currentSystemTimeMs();
+ // Throttle emit frequency
+ if (now < timeTracker.nextTimeToEmit) {
+ return;
+ }
+
+ // Schedule next emit time based on now to avoid the case that if
system time jumps a lot,
+ // this can be triggered everytime
+ timeTracker.nextTimeToEmit = now;
+ timeTracker.advanceNextTimeToEmit();
+
+ // Close time does not progress
+ if (lastEmitCloseTime != ConsumerRecord.NO_TIMESTAMP &&
lastEmitCloseTime >= closeTime) {
+ return;
+ }
+
+ final long emitWindowStart = closeTime - windows.size();
+ if (emitWindowStart < 0) {
+ // If emitWindowStart is 0, it means first window closes since
windowEndTime
+ // is exclusive
+ return;
+ }
+
+ // Because we only get here when emitWindowStart > 0 which means
closeTime > windows.size()
+ // Since we set lastEmitCloseTime to closeTime before storing to
processor metadata
+ // lastEmitCloseTime - windows.size() is always > 0
+ // Set lastEmitWindowStart to -1L if not set so that when we
fetchAll, we fetch from 0L
+ final long lastEmitWindowStart = lastEmitCloseTime ==
ConsumerRecord.NO_TIMESTAMP ?
+ -1L : lastEmitCloseTime - windows.size();
+
+ if (lastEmitCloseTime != ConsumerRecord.NO_TIMESTAMP) {
+ final Map<Long, W> matchedCloseWindows =
windows.windowsFor(emitWindowStart);
+ final Map<Long, W> matchedEmitWindows =
windows.windowsFor(lastEmitWindowStart);
+
+ // Don't fetch store if the new emit window close time doesn't
progress enough to cover next
+ // window
+ if (matchedCloseWindows.equals(matchedEmitWindows)) {
Review Comment:
> Is this the same as emitWindowStart == lastEmitWindowStart
Nope, they are not necessarily the same. `emitWindowStart` is based on the current `closeTime`, while `lastEmitWindowStart` is based on the last emitted close time. The logic here detects when `closeTime` has not progressed far enough to emit any new windows.
For example, with a tumbling window of length 10 and a close time of 11, window [0, 10) is emitted. If the close time then progresses to 12, the window to emit is still [0, 10), so nothing new needs to be fetched.
##########
streams/src/test/java/org/apache/kafka/streams/integration/TimeWindowedKStreamIntegrationTest.java:
##########
@@ -0,0 +1,497 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.kafka.streams.integration;
+
+import java.util.Collection;
+import java.util.Optional;
+import org.apache.kafka.clients.consumer.ConsumerConfig;
+import org.apache.kafka.common.serialization.Deserializer;
+import org.apache.kafka.common.serialization.Serde;
+import org.apache.kafka.common.serialization.Serdes;
+import org.apache.kafka.common.serialization.Serdes.StringSerde;
+import org.apache.kafka.common.serialization.StringDeserializer;
+import org.apache.kafka.common.serialization.StringSerializer;
+import org.apache.kafka.streams.KafkaStreams;
+import org.apache.kafka.streams.StreamsBuilder;
+import org.apache.kafka.streams.StreamsConfig;
+import org.apache.kafka.streams.KeyValueTimestamp;
+import org.apache.kafka.streams.StreamsConfig.InternalConfig;
+import org.apache.kafka.streams.integration.utils.EmbeddedKafkaCluster;
+import org.apache.kafka.streams.integration.utils.IntegrationTestUtils;
+import org.apache.kafka.streams.kstream.Consumed;
+import org.apache.kafka.streams.kstream.EmitStrategy;
+import org.apache.kafka.streams.kstream.EmitStrategy.StrategyType;
+import org.apache.kafka.streams.kstream.JoinWindows;
+import org.apache.kafka.streams.kstream.KStream;
+import org.apache.kafka.streams.kstream.Materialized;
+import org.apache.kafka.streams.kstream.Produced;
+import org.apache.kafka.streams.kstream.SessionWindowedDeserializer;
+import org.apache.kafka.streams.kstream.TimeWindowedDeserializer;
+import org.apache.kafka.streams.kstream.TimeWindows;
+import org.apache.kafka.streams.kstream.UnlimitedWindows;
+import org.apache.kafka.streams.kstream.Windowed;
+import org.apache.kafka.streams.kstream.WindowedSerdes;
+import org.apache.kafka.streams.kstream.internals.TimeWindow;
+import org.apache.kafka.streams.kstream.internals.TimeWindowedKStreamImpl;
+import org.apache.kafka.test.IntegrationTest;
+import org.apache.kafka.test.MockAggregator;
+import org.apache.kafka.test.MockInitializer;
+import org.apache.kafka.test.TestUtils;
+import org.junit.After;
+import org.junit.AfterClass;
+import org.junit.Before;
+import org.junit.BeforeClass;
+import org.junit.Rule;
+import org.junit.Test;
+import org.junit.experimental.categories.Category;
+import org.junit.rules.TestName;
+
+import java.io.IOException;
+import java.util.Arrays;
+import java.util.List;
+import java.util.Properties;
+import org.junit.runner.RunWith;
+import org.junit.runners.Parameterized;
+import org.junit.runners.Parameterized.Parameter;
+
+import static java.time.Duration.ofMillis;
+import static java.time.Instant.ofEpochMilli;
+import static java.util.Arrays.asList;
+import static org.apache.kafka.common.utils.Utils.mkEntry;
+import static org.apache.kafka.common.utils.Utils.mkMap;
+import static org.apache.kafka.common.utils.Utils.mkProperties;
+import static
org.apache.kafka.streams.integration.utils.IntegrationTestUtils.safeUniqueTestName;
+import static org.hamcrest.MatcherAssert.assertThat;
+import static org.hamcrest.core.Is.is;
+import static org.junit.Assert.assertThrows;
+
+@SuppressWarnings({"unchecked"})
+@Category({IntegrationTest.class})
+@RunWith(Parameterized.class)
+public class TimeWindowedKStreamIntegrationTest {
+ private static final int NUM_BROKERS = 1;
+
+ public static final EmbeddedKafkaCluster CLUSTER = new
EmbeddedKafkaCluster(NUM_BROKERS,
+ mkProperties(
+ mkMap(mkEntry("log.retention.hours", "-1"),
mkEntry("log.retention.bytes", "-1")) // Don't expire records since we
manipulate timestamp
+ )
+ );
+
+ @BeforeClass
+ public static void startCluster() throws IOException {
+ CLUSTER.start();
+ }
+
+ @AfterClass
+ public static void closeCluster() {
+ CLUSTER.stop();
+ }
+
+
+ private StreamsBuilder builder;
+ private Properties streamsConfiguration;
+ private KafkaStreams kafkaStreams;
+ private String streamOneInput;
+ private String streamTwoInput;
+ private String outputTopic;
+
+ @Rule
+ public TestName testName = new TestName();
+
+ @Parameter
+ public StrategyType type;
+
+ @Parameter(1)
+ public EmitStrategy emitStrategy;
+
+ private boolean emitFinal;
+
+ @Parameterized.Parameters(name = "{0}")
+ public static Collection<Object[]> getEmitStrategy() {
+ return asList(new Object[][] {
+ {StrategyType.ON_WINDOW_UPDATE, EmitStrategy.onWindowUpdate()},
+ {StrategyType.ON_WINDOW_CLOSE, EmitStrategy.onWindowClose()}
+ });
+ }
+
+ @Before
+ public void before() throws InterruptedException {
+ builder = new StreamsBuilder();
+ createTopics();
+ streamsConfiguration = new Properties();
+ final String safeTestName = safeUniqueTestName(getClass(), testName);
+ streamsConfiguration.put(StreamsConfig.APPLICATION_ID_CONFIG, "app-" +
safeTestName);
+ streamsConfiguration.put(StreamsConfig.BOOTSTRAP_SERVERS_CONFIG,
CLUSTER.bootstrapServers());
+ streamsConfiguration.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG,
"earliest");
+ streamsConfiguration.put(StreamsConfig.STATE_DIR_CONFIG,
TestUtils.tempDirectory().getPath());
+
streamsConfiguration.put(StreamsConfig.STATESTORE_CACHE_MAX_BYTES_CONFIG, 0);
+ streamsConfiguration.put(StreamsConfig.COMMIT_INTERVAL_MS_CONFIG,
100L);
+ streamsConfiguration.put(StreamsConfig.DEFAULT_KEY_SERDE_CLASS_CONFIG,
Serdes.String().getClass());
+
streamsConfiguration.put(StreamsConfig.DEFAULT_VALUE_SERDE_CLASS_CONFIG,
Serdes.String().getClass());
+
streamsConfiguration.put(InternalConfig.EMIT_INTERVAL_MS_KSTREAMS_WINDOWED_AGGREGATION,
0); // Always process
+
streamsConfiguration.put(StreamsConfig.WINDOW_STORE_CHANGE_LOG_ADDITIONAL_RETENTION_MS_CONFIG,
Long.MAX_VALUE); // Don't expire changelog
+
+ emitFinal = emitStrategy.type() == StrategyType.ON_WINDOW_CLOSE;
+ }
+
+ @After
+ public void whenShuttingDown() throws IOException {
+ if (kafkaStreams != null) {
+ kafkaStreams.close();
+ kafkaStreams.cleanUp();
+ }
+ IntegrationTestUtils.purgeLocalStreamsState(streamsConfiguration);
+ }
+
+ @Test
+ public void shouldAggregateWindowedWithNoGrace() throws Exception {
+ produceMessages(
+ streamOneInput,
+ new KeyValueTimestamp<>("A", "1", 0),
+ new KeyValueTimestamp<>("A", "1", 5),
+ new KeyValueTimestamp<>("A", "1", 10), // close [0, 10)
+ new KeyValueTimestamp<>("B", "2", 6), // late and skip
+ new KeyValueTimestamp<>("B", "2", 11), // close [0, 10)
+ new KeyValueTimestamp<>("B", "2", 15), // close [5, 15)
+ new KeyValueTimestamp<>("C", "3", 25) // close [10, 20), [15, 25)
+ );
+
+ final Serde<Windowed<String>> windowedSerde =
WindowedSerdes.timeWindowedSerdeFrom(String.class, 10L);
+ // TODO: remove this cast
https://issues.apache.org/jira/browse/KAFKA-13800
+ final TimeWindowedKStreamImpl<String, String, TimeWindow>
windowedStream = (TimeWindowedKStreamImpl<String, String, TimeWindow>) builder
+ .stream(streamOneInput, Consumed.with(Serdes.String(),
Serdes.String()))
+ .groupByKey()
+
.windowedBy(TimeWindows.ofSizeWithNoGrace(ofMillis(10L)).advanceBy(ofMillis(5L)));
+ windowedStream.emitStrategy(emitStrategy)
+ .aggregate(
+ MockInitializer.STRING_INIT,
+ MockAggregator.TOSTRING_ADDER,
+ Materialized.with(null, new StringSerde())
+ )
+ .toStream()
+ .to(outputTopic, Produced.with(windowedSerde, new StringSerde()));
+
+ startStreams();
+
+ final List<KeyValueTimestamp<Windowed<String>, String>>
windowedMessages = receiveMessagesWithTimestamp(
+ new TimeWindowedDeserializer<>(new StringDeserializer(), 10L),
+ new StringDeserializer(),
+ 10L,
+ String.class,
+ emitFinal ? 6 : 12);
+
+ final List<KeyValueTimestamp<Windowed<String>, String>> expectResult;
+ if (emitFinal) {
+ expectResult = asList(
+ new KeyValueTimestamp<>(new Windowed<>("A", new TimeWindow(0L,
10L)), "0+1+1", 5),
+ new KeyValueTimestamp<>(new Windowed<>("A", new TimeWindow(5L,
15L)), "0+1+1", 10),
+ new KeyValueTimestamp<>(new Windowed<>("B", new TimeWindow(5L,
15L)), "0+2+2", 11),
+ new KeyValueTimestamp<>(new Windowed<>("A", new
TimeWindow(10L, 20L)), "0+1", 10),
+ new KeyValueTimestamp<>(new Windowed<>("B", new
TimeWindow(10L, 20L)), "0+2+2", 15),
+ new KeyValueTimestamp<>(new Windowed<>("B", new
TimeWindow(15L, 25L)), "0+2", 15)
+ );
+ } else {
+ expectResult = asList(
+ new KeyValueTimestamp<>(new Windowed<>("A", new TimeWindow(0L,
10L)), "0+1", 0),
+ new KeyValueTimestamp<>(new Windowed<>("A", new TimeWindow(0L,
10L)), "0+1+1", 5),
+ new KeyValueTimestamp<>(new Windowed<>("A", new TimeWindow(5L,
15L)), "0+1", 5),
+ new KeyValueTimestamp<>(new Windowed<>("A", new TimeWindow(5L,
15L)), "0+1+1", 10),
+ new KeyValueTimestamp<>(new Windowed<>("A", new
TimeWindow(10L, 20L)), "0+1", 10),
+ new KeyValueTimestamp<>(new Windowed<>("B", new TimeWindow(5L,
15L)), "0+2", 6),
+ new KeyValueTimestamp<>(new Windowed<>("B", new TimeWindow(5L,
15L)), "0+2+2", 11),
+ new KeyValueTimestamp<>(new Windowed<>("B", new
TimeWindow(10L, 20L)), "0+2", 11),
+ new KeyValueTimestamp<>(new Windowed<>("B", new
TimeWindow(10L, 20L)), "0+2+2", 15),
+ new KeyValueTimestamp<>(new Windowed<>("B", new
TimeWindow(15L, 25L)), "0+2", 15),
+ new KeyValueTimestamp<>(new Windowed<>("C", new
TimeWindow(20L, 30L)), "0+3", 25),
+ new KeyValueTimestamp<>(new Windowed<>("C", new
TimeWindow(25L, 35L)), "0+3", 25)
+ );
+ }
+
+ assertThat(windowedMessages, is(expectResult));
+ }
+
+ @Test
+ public void shouldAggregateWindowedWithGrace() throws Exception {
+ produceMessages(
+ streamOneInput,
+ new KeyValueTimestamp<>("A", "1", 0),
+ new KeyValueTimestamp<>("A", "1", 5),
+ new KeyValueTimestamp<>("A", "1", 10), // close [-5, 5)
Review Comment:
Not really 🥲 . Will update
--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
To unsubscribe, e-mail: [email protected]
For queries about this service, please contact Infrastructure at:
[email protected]