vamossagar12 commented on code in PR #12383:
URL: https://github.com/apache/kafka/pull/12383#discussion_r914702189
##########
streams/src/main/java/org/apache/kafka/streams/TopologyConfig.java:
##########
@@ -137,54 +129,19 @@ public TopologyConfig(final String topologyName, final
StreamsConfig globalAppCo
maxBufferedSize = getInt(BUFFERED_RECORDS_PER_PARTITION_CONFIG);
log.info("Topology {} is overriding {} to {}", topologyName,
BUFFERED_RECORDS_PER_PARTITION_CONFIG, maxBufferedSize);
} else {
- // If the user hasn't explicitly set the
buffered.records.per.partition config, then leave it unbounded
- // and rely on the input.buffer.max.bytes instead to keep the
memory usage under control
- maxBufferedSize =
globalAppConfigs.originals().containsKey(StreamsConfig.BUFFERED_RECORDS_PER_PARTITION_CONFIG)
- ?
globalAppConfigs.getInt(StreamsConfig.BUFFERED_RECORDS_PER_PARTITION_CONFIG) :
-1;
+ maxBufferedSize =
globalAppConfigs.getInt(BUFFERED_RECORDS_PER_PARTITION_CONFIG);
}
- final boolean stateStoreCacheMaxBytesOverridden =
isTopologyOverride(STATESTORE_CACHE_MAX_BYTES_CONFIG, topologyOverrides);
- final boolean cacheMaxBytesBufferingOverridden =
isTopologyOverride(CACHE_MAX_BYTES_BUFFERING_CONFIG, topologyOverrides);
-
- if (!stateStoreCacheMaxBytesOverridden &&
!cacheMaxBytesBufferingOverridden) {
- cacheSize = getTotalCacheSize(globalAppConfigs);
+ if (isTopologyOverride(CACHE_MAX_BYTES_BUFFERING_CONFIG,
topologyOverrides)) {
+ cacheSize = getLong(CACHE_MAX_BYTES_BUFFERING_CONFIG);
+ log.info("Topology {} is overriding {} to {}", topologyName,
CACHE_MAX_BYTES_BUFFERING_CONFIG, cacheSize);
} else {
- if (stateStoreCacheMaxBytesOverridden &&
cacheMaxBytesBufferingOverridden) {
- cacheSize = getLong(STATESTORE_CACHE_MAX_BYTES_CONFIG);
- log.info("Topology {} is using both deprecated config {} and
new config {}, hence {} is ignored and the new config {} (value {}) is used",
- topologyName,
- CACHE_MAX_BYTES_BUFFERING_CONFIG,
- STATESTORE_CACHE_MAX_BYTES_CONFIG,
- CACHE_MAX_BYTES_BUFFERING_CONFIG,
- STATESTORE_CACHE_MAX_BYTES_CONFIG,
- cacheSize);
- } else if (cacheMaxBytesBufferingOverridden) {
- cacheSize = getLong(CACHE_MAX_BYTES_BUFFERING_CONFIG);
- log.info("Topology {} is using only deprecated config {}, and
will be used to set cache size to {}; " +
- "we suggest setting the new config {} instead as
deprecated {} would be removed in the future.",
- topologyName,
- CACHE_MAX_BYTES_BUFFERING_CONFIG,
- cacheSize,
- STATESTORE_CACHE_MAX_BYTES_CONFIG,
- CACHE_MAX_BYTES_BUFFERING_CONFIG);
- } else {
- cacheSize = getLong(STATESTORE_CACHE_MAX_BYTES_CONFIG);
- }
-
- if (cacheSize != 0) {
- log.warn("Topology {} is overriding cache size to {} but this
will not have any effect as the "
- + "topology-level cache size config only controls
whether record buffering is enabled "
- + "or disabled, thus the only valid override
value is 0",
- topologyName, cacheSize);
- } else {
- log.info("Topology {} is overriding cache size to {}, record
buffering will be disabled",
- topologyName, cacheSize);
- }
+ cacheSize =
globalAppConfigs.getLong(CACHE_MAX_BYTES_BUFFERING_CONFIG);
}
if (isTopologyOverride(MAX_TASK_IDLE_MS_CONFIG, topologyOverrides)) {
maxTaskIdleMs = getLong(MAX_TASK_IDLE_MS_CONFIG);
- log.info("Topology {} is overriding {} to {}", topologyName,
MAX_TASK_IDLE_MS_CONFIG, maxTaskIdleMs);
+ log.info("Topology {} is overridding {} to {}", topologyName,
MAX_TASK_IDLE_MS_CONFIG, maxTaskIdleMs);
Review Comment:
nit: typo in overriding.. I think that's how it was originally. Comment can
be ignored :)
##########
streams/src/main/java/org/apache/kafka/streams/processor/internals/RecordQueue.java:
##########
@@ -55,7 +55,6 @@ public class RecordQueue {
private final Sensor droppedRecordsSensor;
private final Sensor consumedSensor;
- private long totalBytesBuffered;
private long headRecordSizeInBytes;
Review Comment:
I think headRecordSizeInBytes was also added as part of the PR which should
be removed:
https://github.com/apache/kafka/pull/11796/files#diff-2c19d764cad8fcbe7da8046cf0a01e525bc41a5e12e08e8c71d76c0f27ffc550R56
##########
streams/src/test/java/org/apache/kafka/streams/StreamsConfigTest.java:
##########
@@ -1256,44 +1255,6 @@ public void
shouldThrowExceptionWhenClientTagValueExceedMaxLimit() {
);
}
- @Test
- @SuppressWarnings("deprecation")
- public void
shouldUseStateStoreCacheMaxBytesWhenBothOldAndNewConfigsAreSet() {
- props.put(StreamsConfig.STATESTORE_CACHE_MAX_BYTES_CONFIG, 100);
- props.put(StreamsConfig.CACHE_MAX_BYTES_BUFFERING_CONFIG, 10);
- final StreamsConfig config = new StreamsConfig(props);
- assertEquals(getTotalCacheSize(config), 100);
- }
-
- @Test
- @SuppressWarnings("deprecation")
- public void
shouldUseCacheMaxBytesBufferingConfigWhenOnlyDeprecatedConfigIsSet() {
- props.put(StreamsConfig.CACHE_MAX_BYTES_BUFFERING_CONFIG, 10);
- final StreamsConfig config = new StreamsConfig(props);
- assertEquals(getTotalCacheSize(config), 10);
- }
-
- @Test
- public void shouldUseStateStoreCacheMaxBytesWhenNewConfigIsSet() {
- props.put(StreamsConfig.STATESTORE_CACHE_MAX_BYTES_CONFIG, 10);
- final StreamsConfig config = new StreamsConfig(props);
- assertEquals(getTotalCacheSize(config), 10);
- }
-
- @Test
- public void
shouldUseDefaultStateStoreCacheMaxBytesConfigWhenNoConfigIsSet() {
- final StreamsConfig config = new StreamsConfig(props);
- assertEquals(getTotalCacheSize(config), 10 * 1024 * 1024);
- }
-
- @Test
- public void testInvalidSecurityProtocol() {
- props.put(CommonClientConfigs.SECURITY_PROTOCOL_CONFIG, "abc");
- final ConfigException ce = assertThrows(ConfigException.class,
- () -> new StreamsConfig(props));
-
assertTrue(ce.getMessage().contains(CommonClientConfigs.SECURITY_PROTOCOL_CONFIG));
- }
-
Review Comment:
This test should be there as it seems unconnected to the revert.
--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
To unsubscribe, e-mail: [email protected]
For queries about this service, please contact Infrastructure at:
[email protected]