vamossagar12 commented on code in PR #12383:
URL: https://github.com/apache/kafka/pull/12383#discussion_r914702189


##########
streams/src/main/java/org/apache/kafka/streams/TopologyConfig.java:
##########
@@ -137,54 +129,19 @@ public TopologyConfig(final String topologyName, final 
StreamsConfig globalAppCo
             maxBufferedSize = getInt(BUFFERED_RECORDS_PER_PARTITION_CONFIG);
             log.info("Topology {} is overriding {} to {}", topologyName, 
BUFFERED_RECORDS_PER_PARTITION_CONFIG, maxBufferedSize);
         } else {
-            // If the user hasn't explicitly set the 
buffered.records.per.partition config, then leave it unbounded
-            // and rely on the input.buffer.max.bytes instead to keep the 
memory usage under control
-            maxBufferedSize = 
globalAppConfigs.originals().containsKey(StreamsConfig.BUFFERED_RECORDS_PER_PARTITION_CONFIG)
-                    ? 
globalAppConfigs.getInt(StreamsConfig.BUFFERED_RECORDS_PER_PARTITION_CONFIG) : 
-1;
+            maxBufferedSize = 
globalAppConfigs.getInt(BUFFERED_RECORDS_PER_PARTITION_CONFIG);
         }
 
-        final boolean stateStoreCacheMaxBytesOverridden = 
isTopologyOverride(STATESTORE_CACHE_MAX_BYTES_CONFIG, topologyOverrides);
-        final boolean cacheMaxBytesBufferingOverridden = 
isTopologyOverride(CACHE_MAX_BYTES_BUFFERING_CONFIG, topologyOverrides);
-
-        if (!stateStoreCacheMaxBytesOverridden && 
!cacheMaxBytesBufferingOverridden) {
-            cacheSize = getTotalCacheSize(globalAppConfigs);
+        if (isTopologyOverride(CACHE_MAX_BYTES_BUFFERING_CONFIG, 
topologyOverrides)) {
+            cacheSize = getLong(CACHE_MAX_BYTES_BUFFERING_CONFIG);
+            log.info("Topology {} is overriding {} to {}", topologyName, 
CACHE_MAX_BYTES_BUFFERING_CONFIG, cacheSize);
         } else {
-            if (stateStoreCacheMaxBytesOverridden && 
cacheMaxBytesBufferingOverridden) {
-                cacheSize = getLong(STATESTORE_CACHE_MAX_BYTES_CONFIG);
-                log.info("Topology {} is using both deprecated config {} and 
new config {}, hence {} is ignored and the new config {} (value {}) is used",
-                         topologyName,
-                         CACHE_MAX_BYTES_BUFFERING_CONFIG,
-                         STATESTORE_CACHE_MAX_BYTES_CONFIG,
-                         CACHE_MAX_BYTES_BUFFERING_CONFIG,
-                         STATESTORE_CACHE_MAX_BYTES_CONFIG,
-                         cacheSize);
-            } else if (cacheMaxBytesBufferingOverridden) {
-                cacheSize = getLong(CACHE_MAX_BYTES_BUFFERING_CONFIG);
-                log.info("Topology {} is using only deprecated config {}, and 
will be used to set cache size to {}; " +
-                             "we suggest setting the new config {} instead as 
deprecated {} would be removed in the future.",
-                         topologyName,
-                         CACHE_MAX_BYTES_BUFFERING_CONFIG,
-                         cacheSize,
-                         STATESTORE_CACHE_MAX_BYTES_CONFIG,
-                         CACHE_MAX_BYTES_BUFFERING_CONFIG);
-            } else {
-                cacheSize = getLong(STATESTORE_CACHE_MAX_BYTES_CONFIG);
-            }
-
-            if (cacheSize != 0) {
-                log.warn("Topology {} is overriding cache size to {} but this 
will not have any effect as the "
-                             + "topology-level cache size config only controls 
whether record buffering is enabled "
-                             + "or disabled, thus the only valid override 
value is 0",
-                         topologyName, cacheSize);
-            } else {
-                log.info("Topology {} is overriding cache size to {}, record 
buffering will be disabled",
-                         topologyName, cacheSize);
-            }
+            cacheSize = 
globalAppConfigs.getLong(CACHE_MAX_BYTES_BUFFERING_CONFIG);
         }
 
         if (isTopologyOverride(MAX_TASK_IDLE_MS_CONFIG, topologyOverrides)) {
             maxTaskIdleMs = getLong(MAX_TASK_IDLE_MS_CONFIG);
-            log.info("Topology {} is overriding {} to {}", topologyName, 
MAX_TASK_IDLE_MS_CONFIG, maxTaskIdleMs);
+            log.info("Topology {} is overridding {} to {}", topologyName, 
MAX_TASK_IDLE_MS_CONFIG, maxTaskIdleMs);

Review Comment:
   nit: typo in overriding.. I think that's how it was originally. Comment can 
be ignored :) 



##########
streams/src/main/java/org/apache/kafka/streams/processor/internals/RecordQueue.java:
##########
@@ -55,7 +55,6 @@ public class RecordQueue {
 
     private final Sensor droppedRecordsSensor;
     private final Sensor consumedSensor;
-    private long totalBytesBuffered;
     private long headRecordSizeInBytes;

Review Comment:
   I think headRecordSizeInBytes was also added as part of the PR which should 
be removed:
   
   
https://github.com/apache/kafka/pull/11796/files#diff-2c19d764cad8fcbe7da8046cf0a01e525bc41a5e12e08e8c71d76c0f27ffc550R56



##########
streams/src/test/java/org/apache/kafka/streams/StreamsConfigTest.java:
##########
@@ -1256,44 +1255,6 @@ public void 
shouldThrowExceptionWhenClientTagValueExceedMaxLimit() {
         );
     }
 
-    @Test
-    @SuppressWarnings("deprecation")
-    public void 
shouldUseStateStoreCacheMaxBytesWhenBothOldAndNewConfigsAreSet() {
-        props.put(StreamsConfig.STATESTORE_CACHE_MAX_BYTES_CONFIG, 100);
-        props.put(StreamsConfig.CACHE_MAX_BYTES_BUFFERING_CONFIG, 10);
-        final StreamsConfig config = new StreamsConfig(props);
-        assertEquals(getTotalCacheSize(config), 100);
-    }
-
-    @Test
-    @SuppressWarnings("deprecation")
-    public void 
shouldUseCacheMaxBytesBufferingConfigWhenOnlyDeprecatedConfigIsSet() {
-        props.put(StreamsConfig.CACHE_MAX_BYTES_BUFFERING_CONFIG, 10);
-        final StreamsConfig config = new StreamsConfig(props);
-        assertEquals(getTotalCacheSize(config), 10);
-    }
-
-    @Test
-    public void shouldUseStateStoreCacheMaxBytesWhenNewConfigIsSet() {
-        props.put(StreamsConfig.STATESTORE_CACHE_MAX_BYTES_CONFIG, 10);
-        final StreamsConfig config = new StreamsConfig(props);
-        assertEquals(getTotalCacheSize(config), 10);
-    }
-
-    @Test
-    public void 
shouldUseDefaultStateStoreCacheMaxBytesConfigWhenNoConfigIsSet() {
-        final StreamsConfig config = new StreamsConfig(props);
-        assertEquals(getTotalCacheSize(config), 10 * 1024 * 1024);
-    }
-
-    @Test
-    public void testInvalidSecurityProtocol() {
-        props.put(CommonClientConfigs.SECURITY_PROTOCOL_CONFIG, "abc");
-        final ConfigException ce = assertThrows(ConfigException.class,
-                () -> new StreamsConfig(props));
-        
assertTrue(ce.getMessage().contains(CommonClientConfigs.SECURITY_PROTOCOL_CONFIG));
-    }
-

Review Comment:
   This test should be there as it seems unconnected to the revert. 



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: [email protected]

For queries about this service, please contact Infrastructure at:
[email protected]

Reply via email to