lct45 commented on a change in pull request #9039:
URL: https://github.com/apache/kafka/pull/9039#discussion_r465191505



##########
File path: streams/src/test/java/org/apache/kafka/streams/integration/KStreamAggregationIntegrationTest.java
##########
@@ -459,6 +460,95 @@ public void shouldGroupByKey() throws Exception {
         )));
     }
 
+
+
+    @Test
+    public void shouldReduceSlidingWindows() throws Exception {
+        streamsConfiguration.put(StreamsConfig.COMMIT_INTERVAL_MS_CONFIG, 0);
+        final long firstBatchTimestamp = 2000L;
+        final long timeDifference = 1000L;
+        produceMessages(firstBatchTimestamp);
+        final long secondBatchTimestamp = firstBatchTimestamp + timeDifference / 2;
+        produceMessages(secondBatchTimestamp);
+        final long thirdBatchTimestamp = secondBatchTimestamp + timeDifference - 100L;
+        produceMessages(thirdBatchTimestamp);
+
+        final Serde<Windowed<String>> windowedSerde = WindowedSerdes.timeWindowedSerdeFrom(String.class);
+        groupedStream
+                .windowedBy(SlidingWindows.withTimeDifferenceAndGrace(ofMillis(timeDifference), ofMillis(2000L)))
+                .reduce(reducer)
+                .toStream()
+                .to(outputTopic, Produced.with(windowedSerde, Serdes.String()));
+
+        startStreams();
+
+        final List<KeyValueTimestamp<Windowed<String>, String>> windowedOutput = receiveMessages(
+                new TimeWindowedDeserializer<>(),
+                new StringDeserializer(),
+                String.class,
+                25);
+
+        // read from ConsoleConsumer
+        final String resultFromConsoleConsumer = readWindowedKeyedMessagesViaConsoleConsumer(
+                new TimeWindowedDeserializer<String>(),
+                new StringDeserializer(),
+                String.class,
+                25,
+                true);
+
+        final Comparator<KeyValueTimestamp<Windowed<String>, String>> comparator =
+                Comparator.comparing((KeyValueTimestamp<Windowed<String>, String> o) -> o.key().key())
+                        .thenComparing(KeyValueTimestamp::value);
+
+        windowedOutput.sort(comparator);
+        final long firstBatchLeftWindow = firstBatchTimestamp - timeDifference;
+        final long firstBatchRightWindow = firstBatchTimestamp + 1;
+        final long secondBatchLeftWindow = secondBatchTimestamp - timeDifference;
+        final long secondBatchRightWindow = secondBatchTimestamp + 1;
+        final long thirdBatchLeftWindow = thirdBatchTimestamp - timeDifference;
+
+        final List<KeyValueTimestamp<Windowed<String>, String>> expectResult = Arrays.asList(
+                new KeyValueTimestamp<>(new Windowed<>("A", new TimeWindow(firstBatchLeftWindow, Long.MAX_VALUE)), "A", firstBatchTimestamp),

Review comment:
       There seems to be a bug in `TimeWindowedDeserializer`, related to
[this ticket](https://issues.apache.org/jira/browse/KAFKA-4468),
that ends up setting the windowSize to `Long.MAX_VALUE`, which is why the
expected windows here end at `Long.MAX_VALUE`. For the purposes of testing, I
don't think having it as the max value is totally awful (just somewhat awful),
since the window end calculations are all tested in a different set of tests
run through the topology driver. I'll make a ticket for this bug and try to get
it fixed when I'm done with testing.
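
To illustrate why the expected windows end at `Long.MAX_VALUE`, here is a minimal standalone sketch. It assumes the deserializer derives the window end as start plus window size and clamps to `Long.MAX_VALUE` when that addition overflows; I have not traced the exact code path, so treat that as an assumption. `WindowEndSketch` and `windowEnd` are hypothetical names for illustration, not Kafka APIs.

```java
public class WindowEndSketch {

    // Hypothetical helper (not a Kafka API): derive a window end from its start and size.
    // Assumes non-negative inputs, so a negative sum can only mean the addition overflowed;
    // in that case the end is clamped to Long.MAX_VALUE.
    public static long windowEnd(final long startMs, final long windowSizeMs) {
        final long endMs = startMs + windowSizeMs;
        return endMs < 0 ? Long.MAX_VALUE : endMs;
    }

    public static void main(final String[] args) {
        // Values taken from the test above.
        final long timeDifference = 1000L;
        final long firstBatchTimestamp = 2000L;
        final long firstBatchLeftWindow = firstBatchTimestamp - timeDifference;

        // Window size left at Long.MAX_VALUE: the sum overflows and the end is clamped.
        System.out.println(windowEnd(firstBatchLeftWindow, Long.MAX_VALUE));

        // Window size equal to the time difference: the end is start + timeDifference.
        System.out.println(windowEnd(firstBatchLeftWindow, timeDifference));
    }
}
```

Under that assumption, every deserialized window in this test ends at `Long.MAX_VALUE` regardless of its start, so the expected results can only meaningfully check the window start, the value, and the timestamp.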





