ableegoldman commented on a change in pull request #11796:
URL: https://github.com/apache/kafka/pull/11796#discussion_r837073268



##########
File path: streams/src/main/java/org/apache/kafka/streams/KafkaStreams.java
##########
@@ -1038,14 +1041,15 @@ private static Metrics getMetrics(final StreamsConfig 
config, final Time time, f
             final StreamThread streamThread;
             synchronized (changeThreadCount) {
                 final int threadIdx = getNextThreadIndex();
-                final int numLiveThreads = getNumLiveStreamThreads();
-                final long cacheSizePerThread = 
getCacheSizePerThread(numLiveThreads + 1);
-                log.info("Adding StreamThread-{}, there will now be {} live 
threads and the new cache size per thread is {}",
-                         threadIdx, numLiveThreads + 1, cacheSizePerThread);
-                resizeThreadCache(cacheSizePerThread);
                 // Creating thread should hold the lock in order to avoid 
duplicate thread index.
                 // If the duplicate index happen, the metadata of thread may 
be duplicate too.
-                streamThread = createAndAddStreamThread(cacheSizePerThread, 
threadIdx);
+                // Also, we create the new thread with initial values of cache 
size and max buffer size as 0
+                // and then resize them later
+                streamThread = createAndAddStreamThread(0L, 0L, threadIdx);
+                final int numLiveThreads = getNumLiveStreamThreads();
+                resizeThreadCacheAndBufferMemory(numLiveThreads + 1);

Review comment:
       I think this feedback got lost in the shuffle when we reverted the 
original PR, but this needs to be fixed -- the `+ 1` was only necessary in the 
old code because we resized the cache before adding the new thread/computing 
the thread count. Now that we first create the new thread, the `numLiveThreads` 
count should accurately reflect the number of current threads, so we shouldn't 
be adding to it anymore.
   
   I'll include a fix for this on the side in a PR I'm doing so no worries 🙂 

##########
File path: streams/src/main/java/org/apache/kafka/streams/KafkaStreams.java
##########
@@ -971,14 +972,16 @@ private KafkaStreams(final TopologyMetadata 
topologyMetadata,
 
         queryableStoreProvider = new 
QueryableStoreProvider(globalStateStoreProvider);
         for (int i = 1; i <= numStreamThreads; i++) {
-            createAndAddStreamThread(cacheSizePerThread, i);
+            createAndAddStreamThread(0L, 0L, i);
         }
+        // Initially, all Stream Threads are created with 0 cache size and max 
buffer size and then resized here.

Review comment:
       Why do we do it this way? ie rather than just computing the size upfront 
and creating the threads with that? I find this a bit confusing, mainly because 
I can't tell if there is a technical reason for doing it this way or it was 
just a design choice




-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


Reply via email to