lct45 commented on a change in pull request #10519: URL: https://github.com/apache/kafka/pull/10519#discussion_r611637854
########## File path: streams/streams-scala/src/main/scala/org/apache/kafka/streams/scala/kstream/CogroupedKStream.scala ########## @@ -82,6 +82,16 @@ class CogroupedKStream[KIn, VOut](val inner: CogroupedKStreamJ[KIn, VOut]) { def windowedBy[W <: Window](windows: Windows[W]): TimeWindowedCogroupedKStream[KIn, VOut] = new TimeWindowedCogroupedKStream(inner.windowedBy(windows)) + /** + * Create a new [[TimeWindowedCogroupedKStream]] instance that can be used to perform windowed aggregations. Review comment: nit: `perform sliding windowed aggregations` instead of `perform windowed aggregations` ########## File path: streams/streams-scala/src/main/scala/org/apache/kafka/streams/scala/kstream/KGroupedStream.scala ########## @@ -19,19 +19,8 @@ package kstream import org.apache.kafka.streams.kstream.internals.KTableImpl import org.apache.kafka.streams.scala.serialization.Serdes -import org.apache.kafka.streams.kstream.{ - SessionWindows, - Window, - Windows, - KGroupedStream => KGroupedStreamJ, - KTable => KTableJ -} -import org.apache.kafka.streams.scala.FunctionsCompatConversions.{ - AggregatorFromFunction, - InitializerFromFunction, - ReducerFromFunction, - ValueMapperFromFunction -} +import org.apache.kafka.streams.kstream.{SessionWindows, SlidingWindows, Window, Windows, KGroupedStream => KGroupedStreamJ, KTable => KTableJ} Review comment: nit: we want to keep this import and the one below in the same format as before, one import per line, to keep checkstyle happy ########## File path: streams/streams-scala/src/test/scala/org/apache/kafka/streams/scala/kstream/KTableTest.scala ########## @@ -17,15 +17,16 @@ package org.apache.kafka.streams.scala.kstream import org.apache.kafka.streams.kstream.Suppressed.BufferConfig -import org.apache.kafka.streams.kstream.{Named, SessionWindows, TimeWindows, Windowed, Suppressed => JSuppressed} +import org.apache.kafka.streams.kstream.{Named, SessionWindows, SlidingWindows, TimeWindows, Windowed, Suppressed => JSuppressed} Review comment: Checkstyle also flagged this because of line length ########## File path: streams/streams-scala/src/test/scala/org/apache/kafka/streams/scala/kstream/KTableTest.scala ########## @@ -211,6 +212,44 @@ class KTableTest extends TestDriver { testDriver.close() } + @Test + def testCorrectlyGroupBySlidingWindow(): Unit = { Review comment: Thanks for adding a test! ########## File path: streams/streams-scala/src/main/scala/org/apache/kafka/streams/scala/kstream/CogroupedKStream.scala ########## @@ -82,6 +82,16 @@ class CogroupedKStream[KIn, VOut](val inner: CogroupedKStreamJ[KIn, VOut]) { def windowedBy[W <: Window](windows: Windows[W]): TimeWindowedCogroupedKStream[KIn, VOut] = new TimeWindowedCogroupedKStream(inner.windowedBy(windows)) + /** + * Create a new [[TimeWindowedCogroupedKStream]] instance that can be used to perform windowed aggregations. + * + * @param windows the specification of the aggregation `SlidingWindows` + * @return an instance of [[TimeWindowedCogroupedKStream]] + * @see `org.apache.kafka.streams.kstream.CogroupedKStream#windowedBy` + */ + def windowedBy[W <: Window](windows: SlidingWindows): TimeWindowedCogroupedKStream[KIn, VOut] = Review comment: IIUC, the `W <; Window` is defining the generic `W`, but we don't use it in this method so I don't think we need that here -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. For queries about this service, please contact Infrastructure at: us...@infra.apache.org