cadonna commented on a change in pull request #9543:
URL: https://github.com/apache/kafka/pull/9543#discussion_r517426911



##########
File path: streams/src/main/java/org/apache/kafka/streams/KafkaStreams.java
##########
@@ -145,7 +145,7 @@
     private final String clientId;
     private final Metrics metrics;
     private final StreamsConfig config;
-    protected final StreamThread[] threads;
+    protected final ArrayList<StreamThread> threads;

Review comment:
       I would prefer to use `List` instead of `ArrayList` to be more generic.
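
A minimal sketch of what this would look like, assuming a hypothetical `Worker` class standing in for `StreamThread` and a hypothetical `WorkerHolder` standing in for `KafkaStreams`. Only the declared type of the field changes; the constructor call can still pick `ArrayList`.

```java
import java.util.ArrayList;
import java.util.List;

// Hypothetical stand-in for StreamThread; not Kafka's actual class.
final class Worker {
    private final String id;

    Worker(final String id) {
        this.id = id;
    }

    String getId() {
        return id;
    }
}

// Hypothetical holder that illustrates the field declaration only.
final class WorkerHolder {
    // Declared against the List interface, so swapping the implementation
    // later touches only the right-hand side of this line.
    protected final List<Worker> workers = new ArrayList<>();

    void add(final Worker worker) {
        workers.add(worker);
    }

    int size() {
        return workers.size();
    }
}
```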

##########
File path: streams/src/main/java/org/apache/kafka/streams/KafkaStreams.java
##########
@@ -719,8 +718,7 @@ private KafkaStreams(final InternalTopologyBuilder internalTopologyBuilder,
         }
 
         // create the stream thread, global update thread, and cleanup thread
-        threads = new StreamThread[numStreamThreads];
-
+        threads = new ArrayList<>(numStreamThreads);

Review comment:
       I think it is better to keep the default initial capacity of an
`ArrayList`. Otherwise, the list is exactly full once the constructor has added
its stream threads, and the first time another stream thread is added we
immediately run into a memory allocation. Since we do not know how many stream
threads to expect, let's use the default.
   We could also consider using a `LinkedList`, since we never access the list
by index in production code.
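
For illustration, a small sketch with hypothetical names (`CapacitySketch`, `fill`, and the thread-name strings are not from the PR). The argument to `new ArrayList<>(n)` only sizes the backing array; the list still starts empty and grows past `n` when needed, so the choice of capacity or of `LinkedList` changes allocation behavior but not the contents when elements are only appended.

```java
import java.util.ArrayList;
import java.util.LinkedList;
import java.util.List;

final class CapacitySketch {
    // Appends `count` hypothetical thread names without using an index,
    // mirroring how the constructor loop could populate the list.
    static List<String> fill(final List<String> target, final int count) {
        for (int i = 0; i < count; i++) {
            target.add("stream-thread-" + (i + 1));
        }
        return target;
    }

    public static void main(final String[] args) {
        // Exact initial capacity of 2: the third add forces the backing array to grow.
        final List<String> exact = fill(new ArrayList<>(2), 3);
        // Default capacity: no sizing decision is made up front.
        final List<String> byDefault = fill(new ArrayList<>(), 3);
        // LinkedList: no backing array at all, one node per element.
        final List<String> linked = fill(new LinkedList<>(), 3);

        System.out.println(exact.equals(byDefault) && byDefault.equals(linked));
    }
}
```

All three lists end up with the same elements in the same order, so callers that only iterate or stream over the list would not notice the difference.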

##########
File path: streams/src/main/java/org/apache/kafka/streams/KafkaStreams.java
##########
@@ -783,12 +781,13 @@ private KafkaStreams(final InternalTopologyBuilder internalTopologyBuilder,
                 stateDirectory,
                 delegatingStateRestoreListener,
                 i + 1);
-            threadState.put(threads[i].getId(), threads[i].state());
-            storeProviders.add(new StreamThreadStateStoreProvider(threads[i]));
+            threads.add(i, streamThread);
+            threadState.put(threads.get(i).getId(), threads.get(i).state());
+            storeProviders.add(new StreamThreadStateStoreProvider(threads.get(i)));
         }
 
         ClientMetrics.addNumAliveStreamThreadMetric(streamsMetrics, (metricsConfig, now) ->
-            Math.toIntExact(Arrays.stream(threads).filter(thread -> thread.state().isAlive()).count()));
+            Math.toIntExact(Arrays.stream(threads.toArray(new StreamThread[numStreamThreads])).filter(thread -> thread.state().isAlive()).count()));

Review comment:
       Please simplify to
   ```suggestion
               Math.toIntExact(threads.stream().filter(thread -> thread.state().isAlive()).count()));
   ```
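
A self-contained sketch of the same pattern, assuming a hypothetical `StubThread` with a plain `isAlive()` flag in place of `StreamThread.state().isAlive()`. Streaming the list directly avoids the intermediate array copy.

```java
import java.util.List;

// Hypothetical stand-in for StreamThread that exposes only liveness.
final class StubThread {
    private final boolean alive;

    StubThread(final boolean alive) {
        this.alive = alive;
    }

    boolean isAlive() {
        return alive;
    }
}

final class AliveCounter {
    // Counts live threads by streaming the list itself; Math.toIntExact
    // throws ArithmeticException if the long count does not fit in an int.
    static int countAlive(final List<StubThread> threads) {
        return Math.toIntExact(threads.stream().filter(StubThread::isAlive).count());
    }
}
```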

##########
File path: streams/src/main/java/org/apache/kafka/streams/KafkaStreams.java
##########
@@ -783,12 +781,13 @@ private KafkaStreams(final InternalTopologyBuilder internalTopologyBuilder,
                 stateDirectory,
                 delegatingStateRestoreListener,
                 i + 1);
-            threadState.put(threads[i].getId(), threads[i].state());
-            storeProviders.add(new StreamThreadStateStoreProvider(threads[i]));
+            threads.add(i, streamThread);
+            threadState.put(threads.get(i).getId(), threads.get(i).state());
+            storeProviders.add(new StreamThreadStateStoreProvider(threads.get(i)));

Review comment:
       You can simplify to
   ```suggestion
               threads.add(streamThread);
               threadState.put(streamThread.getId(), streamThread.state());
               storeProviders.add(new StreamThreadStateStoreProvider(streamThread));
   ```




----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org

