cadonna commented on a change in pull request #9572:
URL: https://github.com/apache/kafka/pull/9572#discussion_r521249074
##########
File path: streams/src/main/java/org/apache/kafka/streams/KafkaStreams.java
##########
@@ -806,6 +803,17 @@ private KafkaStreams(final InternalTopologyBuilder internalTopologyBuilder,
rocksDBMetricsRecordingService = maybeCreateRocksDBMetricsRecordingService(clientId, config);
}
+ private long getCacheSizePerThread(final int numStreamThreads) {
+ return totalCacheSize / (numStreamThreads + ((globalTaskTopology != null) ? 1 : 0));
+ }
Review comment:
IMO, the code would be easier to navigate if you inlined this method.
With the check removed, there is no real reason to keep a separate method here.
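
To make the arithmetic behind this method concrete, here is a minimal standalone sketch of the per-thread split. The class name `CacheSizingSketch` and the `hasGlobalTopology` flag are hypothetical stand-ins for `KafkaStreams` and the `globalTaskTopology != null` check; this is not the actual Kafka Streams code.

```java
final class CacheSizingSketch {

    // Hypothetical stand-in for the computation in the diff above:
    // the total cache is split evenly across all stream threads, and the
    // global thread (when a global topology exists) takes one extra share.
    static long cacheSizePerThread(final long totalCacheSize,
                                   final int numStreamThreads,
                                   final boolean hasGlobalTopology) {
        final int numShares = numStreamThreads + (hasGlobalTopology ? 1 : 0);
        return totalCacheSize / numShares;
    }

    public static void main(final String[] args) {
        final long totalCacheSize = 10_485_760L; // 10 MiB, an assumed example value

        // Four stream threads plus a global thread: five shares.
        System.out.println(cacheSizePerThread(totalCacheSize, 4, true));  // prints 2097152

        // Four stream threads and no global thread: four shares.
        System.out.println(cacheSizePerThread(totalCacheSize, 4, false)); // prints 2621440
    }
}
```

Since the body is a single expression, inlining it at the call site costs little. Note that the sketch, like the diff, does not guard against zero shares.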
##########
File path: streams/src/main/java/org/apache/kafka/streams/processor/internals/StreamThread.java
##########
@@ -589,6 +589,10 @@ private void subscribeConsumer() {
}
}
+ public void resizeCache(final long size) {
+ taskManager.resizeCache(size);
Review comment:
> I was not planning on having the new thread resize the cache but the calling thread do so

That is what I am saying: "the thread that will add the new stream thread" is
the calling thread. The new stream thread cannot resize the caches of the other
stream threads because it is not aware of them. Still, we need synchronization
because the calling thread will access and modify the thread caches of all
stream threads, while each stream thread accesses and modifies its own thread
cache during normal processing.
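
A minimal sketch of the kind of synchronization I mean, assuming a simplified byte-bounded cache. `ResizableCacheSketch` and all of its methods are hypothetical and only illustrate the locking; they are not the real thread cache from Kafka Streams. Both the owning stream thread (`put`) and the calling thread (`resize`) go through the same monitor, so a resize can never interleave with a write.

```java
import java.util.Iterator;
import java.util.LinkedHashMap;
import java.util.Map;

final class ResizableCacheSketch {

    // Access-ordered map: iteration starts at the least recently used entry.
    private final LinkedHashMap<String, byte[]> entries = new LinkedHashMap<>(16, 0.75f, true);
    private long maxBytes;
    private long currentBytes;

    ResizableCacheSketch(final long maxBytes) {
        this.maxBytes = maxBytes;
    }

    // Called by the owning stream thread during normal processing.
    synchronized void put(final String key, final byte[] value) {
        final byte[] previous = entries.put(key, value);
        if (previous != null) {
            currentBytes -= previous.length;
        }
        currentBytes += value.length;
        evictUntilWithinLimit();
    }

    // Called by the thread that adds or removes a stream thread.
    synchronized void resize(final long newMaxBytes) {
        maxBytes = newMaxBytes;
        evictUntilWithinLimit();
    }

    synchronized long sizeInBytes() {
        return currentBytes;
    }

    // Must only be called while holding the monitor of this object.
    private void evictUntilWithinLimit() {
        final Iterator<Map.Entry<String, byte[]>> it = entries.entrySet().iterator();
        while (currentBytes > maxBytes && it.hasNext()) {
            currentBytes -= it.next().getValue().length;
            it.remove();
        }
    }

    public static void main(final String[] args) {
        final ResizableCacheSketch cache = new ResizableCacheSketch(100);
        for (int i = 0; i < 10; i++) {
            cache.put("key-" + i, new byte[10]);
        }
        System.out.println(cache.sizeInBytes()); // prints 100
        cache.resize(50);                        // shrinking evicts the oldest entries
        System.out.println(cache.sizeInBytes()); // prints 50
    }
}
```

Coarse `synchronized` methods are just the simplest option for the sketch; the real code may warrant something finer-grained.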
##########
File path: streams/src/main/java/org/apache/kafka/streams/KafkaStreams.java
##########
@@ -155,6 +155,7 @@
private final StreamsMetricsImpl streamsMetrics;
private final ProcessorTopology taskTopology;
private final ProcessorTopology globalTaskTopology;
+ private final Long totalCacheSize;
Review comment:
Why does this need to be a `Long` instead of a `long`? The numerical
value of the variable is only immutable if we use a `long` here.
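
As a side note, here is a small sketch of one practical difference between the two declarations. The class and method names are hypothetical and the example is independent of the PR code: a boxed `Long` can be `null` and is unboxed on every arithmetic use, whereas a primitive `long` always holds a value.

```java
final class BoxedVersusPrimitiveSketch {

    // Boxed parameter: the division unboxes the Long, which throws a
    // NullPointerException if the reference is null.
    static long perThreadBoxed(final Long totalCacheSize, final int numThreads) {
        return totalCacheSize / numThreads;
    }

    // Primitive parameter: there is no reference, so null is impossible.
    static long perThreadPrimitive(final long totalCacheSize, final int numThreads) {
        return totalCacheSize / numThreads;
    }

    public static void main(final String[] args) {
        System.out.println(perThreadPrimitive(100L, 4)); // prints 25
        try {
            perThreadBoxed(null, 4);
        } catch (final NullPointerException e) {
            System.out.println("unboxing a null Long failed");
        }
    }
}
```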