wcarlson5 commented on a change in pull request #9253:
URL: https://github.com/apache/kafka/pull/9253#discussion_r562150019

##########
File path: streams/src/test/java/org/apache/kafka/streams/integration/KStreamAggregationIntegrationTest.java
##########
@@ -235,25 +235,27 @@ public void shouldReduceWindowed() throws Exception {
                 .thenComparing(KeyValueTimestamp::value);
         windowedOutput.sort(comparator);
-        final long firstBatchWindow = firstBatchTimestamp / 500 * 500;
-        final long secondBatchWindow = secondBatchTimestamp / 500 * 500;
+        final long firstBatchWindowStart = firstBatchTimestamp / 500 * 500;
+        final long firstBatchWindowEnd = firstBatchWindowStart + 500;

Review comment:
Why don't you use `timeDifference` like you did below?

##########
File path: streams/src/main/java/org/apache/kafka/streams/kstream/TimeWindowedDeserializer.java
##########
@@ -33,22 +33,22 @@
  */
 public class TimeWindowedDeserializer<T> implements Deserializer<Windowed<T>> {
-    private final Long windowSize;
+    private Long windowSize;
     private boolean isChangelogTopic;
     private Deserializer<T> inner;
-
+
     // Default constructor needed by Kafka
     public TimeWindowedDeserializer() {
-        this(null, Long.MAX_VALUE);
+        this(null, null);
     }
-    // TODO: fix this part as last bits of KAFKA-4468
+    @Deprecated
     public TimeWindowedDeserializer(final Deserializer<T> inner) {
         this(inner, Long.MAX_VALUE);

Review comment:
Do you want to change this from `Long.MAX_VALUE` as well?

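On the `timeDifference` question for `shouldReduceWindowed`: a minimal sketch of the window arithmetic with the size passed in rather than hard-coded as `500`. It assumes `timeDifference` in the test holds the window size in milliseconds; `WindowBounds` and its method names are hypothetical and not part of the PR.

```java
// Hypothetical helper for illustration only; not part of the Kafka test.
final class WindowBounds {
    private WindowBounds() { }

    // Start of the tumbling window that contains `timestamp`.
    // Integer division truncates, so this rounds down to a multiple of
    // `windowSizeMs` for non-negative timestamps.
    static long windowStart(final long timestamp, final long windowSizeMs) {
        return timestamp / windowSizeMs * windowSizeMs;
    }

    // Exclusive end of the same window: start plus one window size.
    static long windowEnd(final long timestamp, final long windowSizeMs) {
        return windowStart(timestamp, windowSizeMs) + windowSizeMs;
    }
}
```

With a single named size, `firstBatchWindowStart` and `firstBatchWindowEnd` would both derive from the same value, so changing the window size in the test would not leave a stale literal behind.
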
##########
File path: streams/src/test/java/org/apache/kafka/streams/integration/KStreamAggregationIntegrationTest.java
##########
@@ -327,7 +329,7 @@ public void shouldAggregateWindowed() throws Exception {
         startStreams();
         final List<KeyValueTimestamp<Windowed<String>, Integer>> windowedMessages = receiveMessagesWithTimestamp(
-            new TimeWindowedDeserializer<>(),

Review comment:
I don't think we should get rid of the generics unless we have to.

##########
File path: streams/src/test/java/org/apache/kafka/streams/kstream/TimeWindowedDeserializerTest.java
##########
@@ -56,4 +57,23 @@ public void testWindowedValueDeserializerNoArgConstructors() {
         assertNotNull("Inner deserializer should be not null", inner);
         assertTrue("Inner deserializer type should be ByteArrayDeserializer", inner instanceof ByteArrayDeserializer);
     }
+
+    @Test
+    public void setWindowSizeThroughConfigs() {
+        props.put(StreamsConfig.WINDOW_SIZE_MS_CONFIG, "500");
+        final TimeWindowedDeserializer<?> deserializer = new TimeWindowedDeserializer<>();
+        deserializer.configure(props, false);

Review comment:
Is there some sort of check you can verify here?
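
On the last comment about `setWindowSizeThroughConfigs`: one possible check is to read the window size back after `configure` and compare it to the configured value. The sketch below uses a simplified stand-in class, not Kafka's `TimeWindowedDeserializer`; the class name, the `"window.size.ms"` key, and the rule that a configured value overrides the constructor value are all assumptions made for illustration.

```java
import java.util.Map;

// Simplified stand-in for illustration only; NOT Kafka's TimeWindowedDeserializer.
final class ConfiguredWindowSize {
    // Assumed key; the PR's test uses StreamsConfig.WINDOW_SIZE_MS_CONFIG instead.
    static final String WINDOW_SIZE_MS_CONFIG = "window.size.ms";

    private Long windowSize;

    // No-arg constructor leaves the size unset until configure() runs.
    ConfiguredWindowSize() {
        this(null);
    }

    ConfiguredWindowSize(final Long windowSize) {
        this.windowSize = windowSize;
    }

    // Reads the window size from the config map when present.
    // Assumption of this sketch: a configured value overrides the constructor value.
    void configure(final Map<String, ?> configs) {
        final Object raw = configs.get(WINDOW_SIZE_MS_CONFIG);
        if (raw != null) {
            windowSize = Long.parseLong(raw.toString());
        }
        if (windowSize == null) {
            throw new IllegalArgumentException("Window size must be set via constructor or config");
        }
    }

    Long getWindowSize() {
        return windowSize;
    }
}
```

In the real test, the analogous step would be an assertion on the configured deserializer after `deserializer.configure(props, false)`, assuming it exposes the window size through a getter.
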