lucasbru commented on code in PR #18233: URL: https://github.com/apache/kafka/pull/18233#discussion_r1922600453
########## streams/src/main/java/org/apache/kafka/streams/StreamsConfig.java: ########## @@ -592,6 +592,10 @@ public class StreamsConfig extends AbstractConfig { public static final String ENABLE_METRICS_PUSH_DOC = "Whether to enable pushing of internal client metrics for (main, restore, and global) consumers, producers, and admin clients." + " The cluster must have a client metrics subscription which corresponds to a client."; + /** {@code ensure.explicit.internal.resource.naming} */ + public static final String ENSURE_EXPLICIT_INTERNAL_RESOURCE_NAMING_CONFIG = "ensure.explicit.internal.resource.naming"; + public static final String ENSURE_EXPLICIT_INTERNAL_RESOURCE_NAMING_DOC = "Whether to enforce explicit naming for all internal resources of the topology, including internal topics (e.g., changelog and repartition topics) and their associated state stores." + + " When enabled, the application will fail to start if any internal resource has an auto-generated name."; Review Comment: ```suggestion " When enabled, the application will refuse to start if any internal resource has an auto-generated name."; ``` ########## streams/src/main/java/org/apache/kafka/streams/StreamsConfig.java: ########## @@ -869,6 +873,11 @@ public class StreamsConfig extends AbstractConfig { Importance.HIGH, STATE_DIR_DOC, "${java.io.tmpdir}") + .define(ENSURE_EXPLICIT_INTERNAL_RESOURCE_NAMING_CONFIG, + Type.BOOLEAN, Review Comment: nit: indentation is off ########## streams/src/main/java/org/apache/kafka/streams/StreamsConfig.java: ########## @@ -592,6 +592,10 @@ public class StreamsConfig extends AbstractConfig { public static final String ENABLE_METRICS_PUSH_DOC = "Whether to enable pushing of internal client metrics for (main, restore, and global) consumers, producers, and admin clients." 
+ " The cluster must have a client metrics subscription which corresponds to a client."; + /** {@code ensure.explicit.internal.resource.naming} */ + public static final String ENSURE_EXPLICIT_INTERNAL_RESOURCE_NAMING_CONFIG = "ensure.explicit.internal.resource.naming"; + public static final String ENSURE_EXPLICIT_INTERNAL_RESOURCE_NAMING_DOC = "Whether to enforce explicit naming for all internal resources of the topology, including internal topics (e.g., changelog and repartition topics) and their associated state stores." + Review Comment: nit: line wrap? ########## streams/src/main/java/org/apache/kafka/streams/kstream/internals/CogroupedStreamAggregateBuilder.java: ########## @@ -214,8 +215,7 @@ private void processRepartitions(final Map<KGroupedStreamImpl<K, ?>, Aggregator< final String repartitionNamePrefix = repartitionReqs.userProvidedRepartitionTopicName != null ? repartitionReqs.userProvidedRepartitionTopicName : storeName; - createRepartitionSource(repartitionNamePrefix, repartitionNodeBuilder, repartitionReqs.keySerde, repartitionReqs.valueSerde); - + createRepartitionSource(repartitionNamePrefix, repartitionNodeBuilder, repartitionReqs.keySerde, repartitionReqs.valueSerde, repartitionReqs.userProvidedRepartitionTopicName != null || queryableName != null); Review Comment: nit: Can we store the boolean expression in a local variable and give it a name, so it's easier to understand what you are passing (will also shorten the line). 
########## streams/src/main/java/org/apache/kafka/streams/processor/internals/InternalTopologyBuilder.java: ########## @@ -192,6 +196,10 @@ public InternalTopologyBuilder(final TopologyConfig topologyConfigs) { private boolean hasPersistentStores = false; + private boolean ensureExplicitInternalResourceNaming; + + private Set<InternalResourcesNaming> unprovidedInternalNames = new LinkedHashSet<>(); Review Comment: could be final ########## streams/src/main/java/org/apache/kafka/streams/kstream/internals/CogroupedStreamAggregateBuilder.java: ########## @@ -204,7 +204,8 @@ <KR> KTable<KR, VOut> build(final Map<KGroupedStreamImpl<K, ?>, Aggregator<? sup } private void processRepartitions(final Map<KGroupedStreamImpl<K, ?>, Aggregator<? super K, ? super Object, VOut>> groupPatterns, Review Comment: nit: if you put the last two arguments on a separate line, also put `groupPatterns` on a separate line. ########## streams/src/main/java/org/apache/kafka/streams/StreamsBuilder.java: ########## @@ -384,7 +384,7 @@ public synchronized <K, V> GlobalKTable<K, V> globalTable(final String topic, final MaterializedInternal<K, V, KeyValueStore<Bytes, byte[]>> materializedInternal = new MaterializedInternal<>( - Materialized.with(consumedInternal.keySerde(), consumedInternal.valueSerde()), + Materialized.<K, V, KeyValueStore<Bytes, byte[]>>with(consumedInternal.keySerde(), consumedInternal.valueSerde()).withLoggingDisabled(), Review Comment: Not sure I understand why you are disabling logging here ########## streams/src/test/java/org/apache/kafka/streams/StreamsBuilderTest.java: ########## @@ -2354,6 +2363,418 @@ public void shouldNowAllowStreamAndTableFromSameTopic() { assertThrows(TopologyException.class, builder::build); } + @ParameterizedTest + @CsvSource({ + "true, false, true, false", + "false, true, true, true", + "true, true, true, true", + "false, false, false, false" + }) + public void groupByWithAggregationTest(final boolean isGroupByKeyNamed, + final boolean 
isMaterializedNamed, + final boolean isLoggingEnabled, + final boolean isValid) { + final Map<Object, Object> props = dummyStreamsConfigMap(); + props.put(ENSURE_EXPLICIT_INTERNAL_RESOURCE_NAMING_CONFIG, true); + + final Grouped<String, String> grouped; + final Materialized<String, Long, KeyValueStore<Bytes, byte[]>> materialized; + if (isGroupByKeyNamed) { + grouped = Grouped.with("repartition-name", Serdes.String(), Serdes.String()); + } else { + grouped = Grouped.with(Serdes.String(), Serdes.String()); + } + if (isMaterializedNamed) { + materialized = Materialized.<String, Long, KeyValueStore<Bytes, byte[]>>as("materialized-name") + .withKeySerde(Serdes.String()).withValueSerde(Serdes.Long()); + } else { + if (isLoggingEnabled) { + materialized = Materialized.with(Serdes.String(), Serdes.Long()); + } else { + materialized = Materialized.<String, Long, KeyValueStore<Bytes, byte[]>>with(Serdes.String(), Serdes.Long()) + .withLoggingDisabled(); + } + } + + final StreamsBuilder builder = new StreamsBuilder(new TopologyConfig(new StreamsConfig(props))); + final KStream<String, String> stream = builder.stream("input1"); + stream + .groupBy((k, v) -> v, grouped) + .count(materialized) + .toStream() + .to("output", Produced.as("sink")); + + if (isValid) { + assertDoesNotThrow(() -> builder.build()); + } else { + final TopologyException e = assertThrows(TopologyException.class, () -> builder.build()); Review Comment: Can we check the exception message? It seems rather important here. Generally, if this becomes hard to implement, maybe we can use 4 separate tests instead of a parameterized test, plus a private method to avoid repeating everything 4 times. The same goes for the tests below. Alternatively, we could check the exception message in a unit test for `InternalTopologyBuilder`.
########## streams/src/test/java/org/apache/kafka/streams/StreamsBuilderTest.java: ########## @@ -2354,6 +2363,418 @@ public void shouldNowAllowStreamAndTableFromSameTopic() { assertThrows(TopologyException.class, builder::build); } + @ParameterizedTest + @CsvSource({ + "true, false, true, false", + "false, true, true, true", + "true, true, true, true", + "false, false, false, false" + }) + public void groupByWithAggregationTest(final boolean isGroupByKeyNamed, + final boolean isMaterializedNamed, + final boolean isLoggingEnabled, + final boolean isValid) { + final Map<Object, Object> props = dummyStreamsConfigMap(); + props.put(ENSURE_EXPLICIT_INTERNAL_RESOURCE_NAMING_CONFIG, true); + + final Grouped<String, String> grouped; + final Materialized<String, Long, KeyValueStore<Bytes, byte[]>> materialized; + if (isGroupByKeyNamed) { + grouped = Grouped.with("repartition-name", Serdes.String(), Serdes.String()); + } else { + grouped = Grouped.with(Serdes.String(), Serdes.String()); + } + if (isMaterializedNamed) { + materialized = Materialized.<String, Long, KeyValueStore<Bytes, byte[]>>as("materialized-name") + .withKeySerde(Serdes.String()).withValueSerde(Serdes.Long()); + } else { + if (isLoggingEnabled) { + materialized = Materialized.with(Serdes.String(), Serdes.Long()); + } else { + materialized = Materialized.<String, Long, KeyValueStore<Bytes, byte[]>>with(Serdes.String(), Serdes.Long()) + .withLoggingDisabled(); + } + } + + final StreamsBuilder builder = new StreamsBuilder(new TopologyConfig(new StreamsConfig(props))); + final KStream<String, String> stream = builder.stream("input1"); + stream + .groupBy((k, v) -> v, grouped) + .count(materialized) + .toStream() + .to("output", Produced.as("sink")); + + if (isValid) { + assertDoesNotThrow(() -> builder.build()); + } else { + final TopologyException e = assertThrows(TopologyException.class, () -> builder.build()); + // TODO check the error with loggingDisabled Review Comment: Do you plan to address this `TODO` 
in this PR? ########## streams/src/main/java/org/apache/kafka/streams/kstream/internals/InternalStreamsBuilder.java: ########## @@ -489,7 +492,8 @@ private void mergeRepartitionTopics() { //passing in the name of the first repartition topic, re-used to create the optimized repartition topic final GraphNode optimizedSingleRepartition = createRepartitionNode(repartitionTopicName, groupedInternal.keySerde(), - groupedInternal.valueSerde()); + groupedInternal.valueSerde(), + true); Review Comment: It seems we can remove the new boolean parameter here, since it's always true. It's also a bit misleading, because the repartition topic name isn't necessarily provided by the user; we just don't check it at this point. ########## streams/src/test/java/org/apache/kafka/streams/StreamsBuilderTest.java: ########## @@ -2354,6 +2363,418 @@ public void shouldNowAllowStreamAndTableFromSameTopic() { assertThrows(TopologyException.class, builder::build); } + @ParameterizedTest + @CsvSource({ + "true, false, true, false", + "false, true, true, true", + "true, true, true, true", + "false, false, false, false" + }) + public void groupByWithAggregationTest(final boolean isGroupByKeyNamed, Review Comment: The name of the test should probably follow the `should...` format like the other tests, and mention something about detected implicit internal resource names. The same goes for the other tests below. ########## streams/src/main/java/org/apache/kafka/streams/processor/internals/InternalTopologyBuilder.java: ########## @@ -2333,4 +2341,33 @@ public <KIn, VIn, KOut, VOut> WrappedProcessorSupplier<KIn, VIn, KOut, VOut> wra processorWrapper.wrapProcessorSupplier(name, processorSupplier) ); } + + public void addUnprovidedInternalTopics(final InternalResourcesNaming internalResourcesNaming) { Review Comment: Replace "Topics" with "Names", since you are also talking about state stores?
Additionally, I was wondering if you have considered using something like `implicit` instead of `unprovided`, since that makes it clearer that it is the opposite of `explicit` in the config? ########## streams/src/main/java/org/apache/kafka/streams/StreamsBuilder.java: ########## @@ -457,7 +457,7 @@ public synchronized <K, V> GlobalKTable<K, V> globalTable(final String topic, Objects.requireNonNull(materialized, "materialized can't be null"); final ConsumedInternal<K, V> consumedInternal = new ConsumedInternal<>(consumed); // always use the serdes from consumed - materialized.withKeySerde(consumedInternal.keySerde()).withValueSerde(consumedInternal.valueSerde()); + materialized.withKeySerde(consumedInternal.keySerde()).withValueSerde(consumedInternal.valueSerde()).withLoggingDisabled(); Review Comment: Same as above. ########## streams/src/main/java/org/apache/kafka/streams/processor/internals/InternalTopologyBuilder.java: ########## @@ -2333,4 +2341,33 @@ public <KIn, VIn, KOut, VOut> WrappedProcessorSupplier<KIn, VIn, KOut, VOut> wra processorWrapper.wrapProcessorSupplier(name, processorSupplier) ); } + + public void addUnprovidedInternalTopics(final InternalResourcesNaming internalResourcesNaming) { + unprovidedInternalNames.add(internalResourcesNaming); + } + + public void checkUnprovidedNames() { + if (!unprovidedInternalNames.isEmpty()) { + final StringBuilder result = new StringBuilder(); + for (final InternalResourcesNaming internalResourcesNaming : unprovidedInternalNames) { + if (!Utils.isBlank(internalResourcesNaming.changelogTopic())) { + result.append(String.format("Following changelog topic has not been named: %s%n", internalResourcesNaming.changelogTopic())); Review Comment: Would it be nicer to collect the resource names by type and output one line per type?
As in: ``` Following changelog topic has not been named: a, b, c ``` This is just a suggestion ########## streams/src/main/java/org/apache/kafka/streams/processor/internals/InternalTopologyBuilder.java: ########## @@ -192,6 +196,10 @@ public InternalTopologyBuilder(final TopologyConfig topologyConfigs) { private boolean hasPersistentStores = false; + private boolean ensureExplicitInternalResourceNaming; Review Comment: Could be final ########## streams/src/main/java/org/apache/kafka/streams/processor/internals/InternalResourcesNaming.java: ########## @@ -0,0 +1,58 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.kafka.streams.processor.internals; + +public final class InternalResourcesNaming { + + private String repartitionTopic; + private String changelogTopic; + private String stateStore; + + private InternalResourcesNaming() { + } + + public static InternalResourcesNaming build() { Review Comment: This class at first looks like a builder, but it's something different, since you build first and change values afterwards. I would add a proper builder, or just use a regular class with a public constructor and setters. -- This is an automated message from the Apache Git Service. 
To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org