cadonna commented on a change in pull request #10507:
URL: https://github.com/apache/kafka/pull/10507#discussion_r642403697



##########
File path: streams/src/main/java/org/apache/kafka/streams/errors/LogAndContinueExceptionHandler.java
##########
@@ -32,7 +32,7 @@
     private static final Logger log = LoggerFactory.getLogger(LogAndContinueExceptionHandler.class);
 
     @Override
-    public DeserializationHandlerResponse handle(final ProcessorContext context,
+    public DeserializationHandlerResponse handle(final ProcessorContext<?, ?> context,

Review comment:
       Do we also need to deprecate this method and add a new one? Technically, 
this class is part of the public API and can be extended.
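
To make the concern concrete, here is a minimal sketch of the deprecate-and-add approach. Every name in it (`OldContext`, `NewContext`, `Response`, `LoggingHandlerSketch`, `UserHandler`) is a hypothetical stand-in rather than a Kafka class, and it assumes the new parameter type is a different type from the old one:

```java
// Hypothetical stand-ins for the old and new context types; not the Kafka classes.
interface OldContext {
    String taskName();
}

interface NewContext<K, V> {
    String taskName();
}

enum Response { CONTINUE, FAIL }

class LoggingHandlerSketch {
    /** Old signature, kept so that subclasses overriding it still compile. */
    @Deprecated
    public Response handle(final OldContext context, final byte[] record, final Exception exception) {
        return Response.CONTINUE;
    }

    /** New signature, added next to the old one instead of replacing it. */
    public Response handle(final NewContext<?, ?> context, final byte[] record, final Exception exception) {
        return Response.CONTINUE;
    }
}

// A user subclass written against the old signature.
class UserHandler extends LoggingHandlerSketch {
    @Override
    public Response handle(final OldContext context, final byte[] record, final Exception exception) {
        return Response.FAIL;
    }
}
```

If the old overload were changed in place, the `@Override` in `UserHandler` would no longer match any superclass method and the subclass would stop compiling.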

##########
File path: streams/src/main/java/org/apache/kafka/streams/kstream/internals/CogroupedStreamAggregateBuilder.java
##########
@@ -48,21 +48,20 @@
     CogroupedStreamAggregateBuilder(final InternalStreamsBuilder builder) {
         this.builder = builder;
     }
-    @SuppressWarnings("unchecked")
-    <KR> KTable<KR, VOut> build(final Map<KGroupedStreamImpl<K, ?>, Aggregator<? super K, ? super Object, VOut>> groupPatterns,
-                                final Initializer<VOut> initializer,
-                                final NamedInternal named,
-                                final StoreBuilder<?> storeBuilder,
-                                final Serde<KR> keySerde,
-                                final Serde<VOut> valueSerde,
-                                final String queryableName) {
+    <KOut> KTable<KOut, VOut> build(final Map<KGroupedStreamImpl<K, ?>, Aggregator<? super K, ? super Object, VOut>> groupPatterns,
+                                    final Initializer<VOut> initializer,
+                                    final NamedInternal named,
+                                    final StoreBuilder<?> storeBuilder,
+                                    final Serde<KOut> keySerde,
+                                    final Serde<VOut> valueSerde,
+                                    final String queryableName) {
         processRepartitions(groupPatterns, storeBuilder);
         final Collection<GraphNode> processors = new ArrayList<>();
-        final Collection<KStreamAggProcessorSupplier> parentProcessors = new ArrayList<>();
+        final Collection<KStreamAggregateProcessorSupplier> parentProcessors = new ArrayList<>();
         boolean stateCreated = false;
         int counter = 0;
         for (final Entry<KGroupedStreamImpl<K, ?>, Aggregator<? super K, Object, VOut>> kGroupedStream : groupPatterns.entrySet()) {
-            final KStreamAggProcessorSupplier<K, K, ?, ?> parentProcessor =
+            final KStreamAggregateProcessorSupplier<K, K, ?, ?> parentProcessor =

Review comment:
       Shouldn't this be `KStreamAggregateProcessorSupplier<K, ?, K, ?>`? The 
positions of the parameters `KOut` and `VIn` on 
`KStreamAggregateProcessorSupplier` changed with respect to 
`KStreamAggProcessorSupplier`.
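
A minimal sketch of why the position matters. `OldSupplier`, `NewSupplier`, and `mapKey` are hypothetical stand-ins, not the Kafka interfaces, and the sketch assumes the new interface orders its parameters as `KIn, VIn, KOut, VOut`:

```java
// Hypothetical stand-ins; not the Kafka interfaces.
// Old layout: the output key type sits in the second position.
interface OldSupplier<K, KOut, VIn, VOut> {
    KOut mapKey(K key);
}

// Assumed new layout: the input value type is second, the output key type third.
interface NewSupplier<KIn, VIn, KOut, VOut> {
    KOut mapKey(KIn key);
}

final class TypeParameterOrderSketch {
    // With <String, ?, String, ?> the third argument (KOut) is String,
    // so the result of mapKey can be returned as a String.
    static String outputKey(final NewSupplier<String, ?, String, ?> supplier, final String key) {
        return supplier.mapKey(key);
    }

    // With <String, String, ?, ?> the arguments keep their old positions:
    // KOut is now an unbounded wildcard, so the same call only yields Object.
    static Object outputKeyWithOldPositions(final NewSupplier<String, String, ?, ?> supplier, final String key) {
        return supplier.mapKey(key);
    }
}
```

Both declarations compile, which is why a stale argument order is easy to miss: the second one merely loses the output key type instead of failing.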

##########
File path: streams/src/main/java/org/apache/kafka/streams/kstream/internals/KGroupedStreamImpl.java
##########
@@ -183,7 +182,9 @@
 
         final String name = new NamedInternal(named).orElseGenerateWithPrefix(builder, AGGREGATE_NAME);
         return doAggregate(
-            new KStreamAggregate<>(materializedInternal.storeName(), aggregateBuilder.countInitializer, aggregateBuilder.countAggregator),
+            new KStreamAggregate<>(materializedInternal.storeName(),
+                aggregateBuilder.countInitializer,
+                aggregateBuilder.countAggregator),

Review comment:
       ```suggestion
               new KStreamAggregate<>(
                   materializedInternal.storeName(),
                   aggregateBuilder.countInitializer,
                   aggregateBuilder.countAggregator
               ),
   ```
   or
   ```suggestion
               new KStreamAggregate<>(
                   materializedInternal.storeName(),
                   aggregateBuilder.countInitializer,
                   aggregateBuilder.countAggregator),
   ```

##########
File path: streams/src/main/java/org/apache/kafka/streams/kstream/internals/KGroupedStreamImpl.java
##########
@@ -16,6 +16,8 @@
  */
 package org.apache.kafka.streams.kstream.internals;
 
+import java.util.Objects;
+import java.util.Set;

Review comment:
       See my comment on `AbstractStream.java` about the import order.

##########
File path: streams/src/main/java/org/apache/kafka/streams/kstream/internals/AbstractStream.java
##########
@@ -16,6 +16,10 @@
  */
 package org.apache.kafka.streams.kstream.internals;
 
+import java.util.Collection;
+import java.util.HashSet;
+import java.util.Objects;
+import java.util.Set;

Review comment:
       In [KAFKA-10787](https://issues.apache.org/jira/browse/KAFKA-10787) we 
agreed on the import order `kafka`, `org.apache.kafka`, `com`, `net`, `org`, 
`java`, `javax`, followed by static imports. Additionally, there should be an 
empty line between import blocks.
   
   Note that PR #10428 introduces a check and a formatter for this.
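
As a rough illustration of that order (this is not the check from PR #10428), the grouping could be sketched as below. `ImportOrderSketch` and its methods are hypothetical. Sorting alphabetically within a group and placing unknown packages just before static imports are assumptions of this sketch:

```java
import java.util.ArrayList;
import java.util.Comparator;
import java.util.List;

final class ImportOrderSketch {
    // Group prefixes in the agreed order; "org.apache.kafka." must be tested before "org.".
    private static final List<String> GROUPS = List.of(
        "kafka.", "org.apache.kafka.", "com.", "net.", "org.", "java.", "javax.");

    // Returns the rank of the group an import belongs to (lower ranks sort first).
    static int groupOf(final String importName) {
        if (importName.startsWith("static ")) {
            return GROUPS.size() + 1; // static imports go last
        }
        for (int i = 0; i < GROUPS.size(); i++) {
            if (importName.startsWith(GROUPS.get(i))) {
                return i;
            }
        }
        return GROUPS.size(); // assumption: unknown packages go just before static imports
    }

    // Sorts by group first, then alphabetically within a group (an assumption of this sketch).
    static List<String> sort(final List<String> imports) {
        final List<String> sorted = new ArrayList<>(imports);
        sorted.sort(Comparator.<String>comparingInt(ImportOrderSketch::groupOf)
            .thenComparing(Comparator.naturalOrder()));
        return sorted;
    }
}
```

With this ranking, `org.apache.kafka.streams.Topology` sorts ahead of `org.slf4j.Logger`, and `java.util.Set` ahead of `javax.net.ssl.SSLContext`.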

##########
File path: streams/src/main/java/org/apache/kafka/streams/kstream/KStream.java
##########
@@ -26,29 +26,29 @@
 import org.apache.kafka.streams.Topology;
 import org.apache.kafka.streams.processor.ConnectedStoreProvider;
 import org.apache.kafka.streams.processor.Processor;
-import org.apache.kafka.streams.processor.ProcessorContext;
-import org.apache.kafka.streams.processor.ProcessorSupplier;
 import org.apache.kafka.streams.processor.StreamPartitioner;
 import org.apache.kafka.streams.processor.TopicNameExtractor;
+import org.apache.kafka.streams.processor.api.ProcessorContext;
+import org.apache.kafka.streams.processor.api.ProcessorSupplier;
 import org.apache.kafka.streams.state.KeyValueStore;

Review comment:
       I skimmed over this interface, and the changes seem limited to line breaks 
in comments and renamed type parameters. In the interest of keeping this PR easy 
to review, I would not make those changes here but rather open a separate PR for 
this interface. However, I might have missed something important. @jeqo Could you 
clarify?
   
   Regarding the comments, we usually add a line break after each sentence.

##########
File path: streams/src/main/java/org/apache/kafka/streams/kstream/internals/CogroupedStreamAggregateBuilder.java
##########
@@ -80,24 +79,23 @@
         return createTable(processors, parentProcessors, named, keySerde, valueSerde, queryableName, storeBuilder.name());
     }
 
-    @SuppressWarnings("unchecked")
-    <KR, W extends Window> KTable<KR, VOut> build(final Map<KGroupedStreamImpl<K, ?>, Aggregator<? super K, ? super Object, VOut>> groupPatterns,
-                                                  final Initializer<VOut> initializer,
-                                                  final NamedInternal named,
-                                                  final StoreBuilder<?> storeBuilder,
-                                                  final Serde<KR> keySerde,
-                                                  final Serde<VOut> valueSerde,
-                                                  final String queryableName,
-                                                  final Windows<W> windows) {
+    <KOut, W extends Window> KTable<KOut, VOut> build(final Map<KGroupedStreamImpl<K, ?>, Aggregator<? super K, ? super Object, VOut>> groupPatterns,
+                                                      final Initializer<VOut> initializer,
+                                                      final NamedInternal named,
+                                                      final StoreBuilder<?> storeBuilder,
+                                                      final Serde<KOut> keySerde,
+                                                      final Serde<VOut> valueSerde,
+                                                      final String queryableName,
+                                                      final Windows<W> windows) {
         processRepartitions(groupPatterns, storeBuilder);
 
         final Collection<GraphNode> processors = new ArrayList<>();
-        final Collection<KStreamAggProcessorSupplier> parentProcessors = new ArrayList<>();
+        final Collection<KStreamAggregateProcessorSupplier> parentProcessors = new ArrayList<>();
         boolean stateCreated = false;
         int counter = 0;
         for (final Entry<KGroupedStreamImpl<K, ?>, Aggregator<? super K, Object, VOut>> kGroupedStream : groupPatterns.entrySet()) {
-            final KStreamAggProcessorSupplier<K, K, ?, ?>  parentProcessor =
-                (KStreamAggProcessorSupplier<K, K, ?, ?>) new KStreamWindowAggregate<K, K, VOut, W>(
+            final KStreamWindowAggregate<K, K, VOut, W> parentProcessor =

Review comment:
       Shouldn't this be `KStreamWindowAggregate<K, VOut, K, W>`? Here I am not 
sure if I am missing something since the type parameter positions did not 
change. Why is the type parameter for `V` in `KStreamWindowAggregate` `K` and 
not `?`?

##########
File path: streams/src/main/java/org/apache/kafka/streams/errors/LogAndFailExceptionHandler.java
##########
@@ -32,7 +32,7 @@
     private static final Logger log = LoggerFactory.getLogger(LogAndFailExceptionHandler.class);
 
     @Override
-    public DeserializationHandlerResponse handle(final ProcessorContext context,
+    public DeserializationHandlerResponse handle(final ProcessorContext<?, ?> context,

Review comment:
       Do we also need to deprecate this method and add a new one? Technically, 
this class is part of the public API and can be extended.





