lct45 commented on a change in pull request #10519:
URL: https://github.com/apache/kafka/pull/10519#discussion_r611637854



##########
File path: 
streams/streams-scala/src/main/scala/org/apache/kafka/streams/scala/kstream/CogroupedKStream.scala
##########
@@ -82,6 +82,16 @@ class CogroupedKStream[KIn, VOut](val inner: 
CogroupedKStreamJ[KIn, VOut]) {
   def windowedBy[W <: Window](windows: Windows[W]): 
TimeWindowedCogroupedKStream[KIn, VOut] =
     new TimeWindowedCogroupedKStream(inner.windowedBy(windows))
 
+  /**
+   * Create a new [[TimeWindowedCogroupedKStream]] instance that can be used 
to perform windowed aggregations.

Review comment:
       nit: `perform sliding windowed aggregations` instead of `perform 
windowed aggregations`

##########
File path: 
streams/streams-scala/src/main/scala/org/apache/kafka/streams/scala/kstream/KGroupedStream.scala
##########
@@ -19,19 +19,8 @@ package kstream
 
 import org.apache.kafka.streams.kstream.internals.KTableImpl
 import org.apache.kafka.streams.scala.serialization.Serdes
-import org.apache.kafka.streams.kstream.{
-  SessionWindows,
-  Window,
-  Windows,
-  KGroupedStream => KGroupedStreamJ,
-  KTable => KTableJ
-}
-import org.apache.kafka.streams.scala.FunctionsCompatConversions.{
-  AggregatorFromFunction,
-  InitializerFromFunction,
-  ReducerFromFunction,
-  ValueMapperFromFunction
-}
+import org.apache.kafka.streams.kstream.{SessionWindows, SlidingWindows, 
Window, Windows, KGroupedStream => KGroupedStreamJ, KTable => KTableJ}

Review comment:
       nit: we want to keep this import and the one below in the same format as 
before, one import per line, to keep checkstyle happy

##########
File path: 
streams/streams-scala/src/test/scala/org/apache/kafka/streams/scala/kstream/KTableTest.scala
##########
@@ -17,15 +17,16 @@
 package org.apache.kafka.streams.scala.kstream
 
 import org.apache.kafka.streams.kstream.Suppressed.BufferConfig
-import org.apache.kafka.streams.kstream.{Named, SessionWindows, TimeWindows, 
Windowed, Suppressed => JSuppressed}
+import org.apache.kafka.streams.kstream.{Named, SessionWindows, 
SlidingWindows, TimeWindows, Windowed, Suppressed => JSuppressed}

Review comment:
       Checkstyle also flagged this because of line length

##########
File path: 
streams/streams-scala/src/test/scala/org/apache/kafka/streams/scala/kstream/KTableTest.scala
##########
@@ -211,6 +212,44 @@ class KTableTest extends TestDriver {
     testDriver.close()
   }
 
+  @Test
+  def testCorrectlyGroupBySlidingWindow(): Unit = {

Review comment:
       Thanks for adding a test!

##########
File path: 
streams/streams-scala/src/main/scala/org/apache/kafka/streams/scala/kstream/CogroupedKStream.scala
##########
@@ -82,6 +82,16 @@ class CogroupedKStream[KIn, VOut](val inner: 
CogroupedKStreamJ[KIn, VOut]) {
   def windowedBy[W <: Window](windows: Windows[W]): 
TimeWindowedCogroupedKStream[KIn, VOut] =
     new TimeWindowedCogroupedKStream(inner.windowedBy(windows))
 
+  /**
+   * Create a new [[TimeWindowedCogroupedKStream]] instance that can be used 
to perform windowed aggregations.
+   *
+   * @param windows the specification of the aggregation `SlidingWindows`
+   * @return an instance of [[TimeWindowedCogroupedKStream]]
+   * @see `org.apache.kafka.streams.kstream.CogroupedKStream#windowedBy`
+   */
+  def windowedBy[W <: Window](windows: SlidingWindows): 
TimeWindowedCogroupedKStream[KIn, VOut] =

Review comment:
       IIUC, the `W <; Window` is defining the generic `W`, but we don't use it 
in this method so I don't think we need that here




-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


Reply via email to