cadonna commented on a change in pull request #9572:
URL: https://github.com/apache/kafka/pull/9572#discussion_r521249074



##########
File path: streams/src/main/java/org/apache/kafka/streams/KafkaStreams.java
##########
@@ -806,6 +803,17 @@ private KafkaStreams(final InternalTopologyBuilder internalTopologyBuilder,
         rocksDBMetricsRecordingService = maybeCreateRocksDBMetricsRecordingService(clientId, config);
     }
 
+    private long getCacheSizePerThread(final int numStreamThreads) {
+        return totalCacheSize / (numStreamThreads + ((globalTaskTopology != null) ? 1 : 0));
+    }

Review comment:
       IMO, the code would be easier to navigate if you inlined this method. 
Without the removed check, there is not really a reason to have a method here.
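
To illustrate what inlining could look like, here is a minimal sketch. The class `InlinedCacheSizeSketch`, its constructor, and the caller `addStreamThread()` are hypothetical stand-ins, not the real `KafkaStreams` code; only the division itself is taken from the diff above.

```java
// Hypothetical stand-in for the relevant KafkaStreams state; not the real class.
final class InlinedCacheSizeSketch {
    private final long totalCacheSize;
    private final Object globalTaskTopology; // null when there is no global topology
    private int numStreamThreads;

    InlinedCacheSizeSketch(final long totalCacheSize,
                           final Object globalTaskTopology,
                           final int numStreamThreads) {
        this.totalCacheSize = totalCacheSize;
        this.globalTaskTopology = globalTaskTopology;
        this.numStreamThreads = numStreamThreads;
    }

    // Hypothetical caller: the per-thread size is computed inline at the
    // point of use instead of through a one-line helper method.
    long addStreamThread() {
        numStreamThreads++;
        final long cacheSizePerThread =
            totalCacheSize / (numStreamThreads + ((globalTaskTopology != null) ? 1 : 0));
        return cacheSizePerThread;
    }

    public static void main(final String[] args) {
        // Three existing stream threads plus a global topology, then one thread is added.
        System.out.println(new InlinedCacheSizeSketch(100L, new Object(), 3).addStreamThread());
    }
}
```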

##########
File path: streams/src/main/java/org/apache/kafka/streams/processor/internals/StreamThread.java
##########
@@ -589,6 +589,10 @@ private void subscribeConsumer() {
         }
     }
 
+    public void resizeCache(final long size) {
+        taskManager.resizeCache(size);

Review comment:
       > I was not planning on having the new thread resize the cache but the 
calling thread do so
   
   That is what I am saying: "the thread that will add the new stream thread" 
is the calling thread. The new stream thread cannot resize the caches of the 
other stream threads because it is not aware of them. We still need 
synchronization, because the calling thread will access and modify the thread 
caches of all stream threads while each stream thread accesses and modifies 
its own thread cache during normal processing.  
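
A rough sketch of the kind of synchronization meant here, under stated assumptions: `ThreadCacheSketch`, `put`, `resize`, and `resizeAll` are hypothetical names, the byte accounting is a placeholder for real eviction, and a plain `synchronized` monitor is only one possible choice. It is not the actual `ThreadCache` implementation.

```java
import java.util.ArrayList;
import java.util.List;

// Hypothetical, simplified per-thread cache; only tracks sizes in bytes.
final class ThreadCacheSketch {
    private long maxCacheSizeBytes;
    private long currentSizeBytes;

    ThreadCacheSketch(final long maxCacheSizeBytes) {
        this.maxCacheSizeBytes = maxCacheSizeBytes;
    }

    // Called by the owning stream thread during normal processing.
    synchronized boolean put(final long entrySizeBytes) {
        if (currentSizeBytes + entrySizeBytes > maxCacheSizeBytes) {
            return false; // a real cache would evict entries instead of rejecting
        }
        currentSizeBytes += entrySizeBytes;
        return true;
    }

    // Called by the thread that adds the new stream thread (the calling thread).
    synchronized void resize(final long newMaxCacheSizeBytes) {
        maxCacheSizeBytes = newMaxCacheSizeBytes;
        if (currentSizeBytes > newMaxCacheSizeBytes) {
            currentSizeBytes = newMaxCacheSizeBytes; // placeholder for eviction
        }
    }

    synchronized long maxSize() {
        return maxCacheSizeBytes;
    }

    // The calling thread knows all caches, so it redistributes the budget.
    // Assumes a non-empty list; an empty list would divide by zero.
    static void resizeAll(final List<ThreadCacheSketch> caches, final long totalCacheSize) {
        final long perThread = totalCacheSize / caches.size();
        for (final ThreadCacheSketch cache : caches) {
            cache.resize(perThread);
        }
    }

    public static void main(final String[] args) {
        final List<ThreadCacheSketch> caches = new ArrayList<>();
        caches.add(new ThreadCacheSketch(45L));
        caches.add(new ThreadCacheSketch(45L));
        caches.add(new ThreadCacheSketch(0L)); // cache of the newly added thread
        resizeAll(caches, 90L);
        System.out.println(caches.get(0).maxSize());
    }
}
```

Because `put` and `resize` lock the same monitor, a resize issued by the calling thread cannot interleave with an update made by the owning stream thread.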

##########
File path: streams/src/main/java/org/apache/kafka/streams/KafkaStreams.java
##########
@@ -155,6 +155,7 @@
     private final StreamsMetricsImpl streamsMetrics;
     private final ProcessorTopology taskTopology;
     private final ProcessorTopology globalTaskTopology;
+    private final Long totalCacheSize;

Review comment:
       Why does this need to be a `Long` instead of a `long`? The numerical 
value of the variable is only immutable if we use a `long` here.  
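
One practical difference between the two declarations, sketched with hypothetical helper names (`divideBoxed`, `dividePrimitive`): a `Long` can hold `null` and is unboxed on every arithmetic use, while a `long` cannot be `null`.

```java
// Hypothetical helpers contrasting a boxed Long with a primitive long.
final class LongFieldSketch {
    // Auto-unboxing happens at the division; a null argument throws NullPointerException.
    static long divideBoxed(final Long total, final int threads) {
        return total / threads;
    }

    // A primitive long can never be null, so no such failure mode exists.
    static long dividePrimitive(final long total, final int threads) {
        return total / threads;
    }

    public static void main(final String[] args) {
        System.out.println(dividePrimitive(100L, 4));
        try {
            divideBoxed(null, 4);
        } catch (final NullPointerException e) {
            System.out.println("unboxing null failed");
        }
    }
}
```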




----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
[email protected]

