lucasbru commented on code in PR #18233:
URL: https://github.com/apache/kafka/pull/18233#discussion_r1922600453


##########
streams/src/main/java/org/apache/kafka/streams/StreamsConfig.java:
##########
@@ -592,6 +592,10 @@ public class StreamsConfig extends AbstractConfig {
     public static final String ENABLE_METRICS_PUSH_DOC = "Whether to enable 
pushing of internal client metrics for (main, restore, and global) consumers, 
producers, and admin clients." +
         " The cluster must have a client metrics subscription which 
corresponds to a client.";
 
+    /** {@code ensure.explicit.internal.resource.naming} */
+    public static final String ENSURE_EXPLICIT_INTERNAL_RESOURCE_NAMING_CONFIG 
= "ensure.explicit.internal.resource.naming";
+    public static final String ENSURE_EXPLICIT_INTERNAL_RESOURCE_NAMING_DOC = 
"Whether to enforce explicit naming for all internal resources of the topology, 
including internal topics (e.g., changelog and repartition topics) and their 
associated state stores." +
+        " When enabled, the application will fail to start if any internal 
resource has an auto-generated name.";

Review Comment:
   ```suggestion
           " When enabled, the application will refuse to start if any internal 
resource has an auto-generated name.";
   
   ```



##########
streams/src/main/java/org/apache/kafka/streams/StreamsConfig.java:
##########
@@ -869,6 +873,11 @@ public class StreamsConfig extends AbstractConfig {
                     Importance.HIGH,
                     STATE_DIR_DOC,
                     "${java.io.tmpdir}")
+            .define(ENSURE_EXPLICIT_INTERNAL_RESOURCE_NAMING_CONFIG,
+                Type.BOOLEAN,

Review Comment:
   nit: indentation is off



##########
streams/src/main/java/org/apache/kafka/streams/StreamsConfig.java:
##########
@@ -592,6 +592,10 @@ public class StreamsConfig extends AbstractConfig {
     public static final String ENABLE_METRICS_PUSH_DOC = "Whether to enable 
pushing of internal client metrics for (main, restore, and global) consumers, 
producers, and admin clients." +
         " The cluster must have a client metrics subscription which 
corresponds to a client.";
 
+    /** {@code ensure.explicit.internal.resource.naming} */
+    public static final String ENSURE_EXPLICIT_INTERNAL_RESOURCE_NAMING_CONFIG 
= "ensure.explicit.internal.resource.naming";
+    public static final String ENSURE_EXPLICIT_INTERNAL_RESOURCE_NAMING_DOC = 
"Whether to enforce explicit naming for all internal resources of the topology, 
including internal topics (e.g., changelog and repartition topics) and their 
associated state stores." +

Review Comment:
   nit: line wrap?



##########
streams/src/main/java/org/apache/kafka/streams/kstream/internals/CogroupedStreamAggregateBuilder.java:
##########
@@ -214,8 +215,7 @@ private void processRepartitions(final 
Map<KGroupedStreamImpl<K, ?>, Aggregator<
                 final String repartitionNamePrefix = 
repartitionReqs.userProvidedRepartitionTopicName != null ?
                     repartitionReqs.userProvidedRepartitionTopicName : 
storeName;
 
-                createRepartitionSource(repartitionNamePrefix, 
repartitionNodeBuilder, repartitionReqs.keySerde, repartitionReqs.valueSerde);
-
+                createRepartitionSource(repartitionNamePrefix, 
repartitionNodeBuilder, repartitionReqs.keySerde, repartitionReqs.valueSerde, 
repartitionReqs.userProvidedRepartitionTopicName != null || queryableName != 
null);

Review Comment:
   nit: Can we store the boolean expression in a local variable and give it a 
name, so it's easier to understand what you are passing (will also shorten the 
line). 



##########
streams/src/main/java/org/apache/kafka/streams/processor/internals/InternalTopologyBuilder.java:
##########
@@ -192,6 +196,10 @@ public InternalTopologyBuilder(final TopologyConfig 
topologyConfigs) {
 
     private boolean hasPersistentStores = false;
 
+    private boolean ensureExplicitInternalResourceNaming;
+
+    private Set<InternalResourcesNaming> unprovidedInternalNames = new 
LinkedHashSet<>();

Review Comment:
   could be final



##########
streams/src/main/java/org/apache/kafka/streams/kstream/internals/CogroupedStreamAggregateBuilder.java:
##########
@@ -204,7 +204,8 @@ <KR> KTable<KR, VOut> build(final Map<KGroupedStreamImpl<K, 
?>, Aggregator<? sup
     }
 
     private void processRepartitions(final Map<KGroupedStreamImpl<K, ?>, 
Aggregator<? super K, ? super Object, VOut>> groupPatterns,

Review Comment:
   nit: if you put the last two arguments on a separate line, also put 
`groupPatterns` on a separate line.



##########
streams/src/main/java/org/apache/kafka/streams/StreamsBuilder.java:
##########
@@ -384,7 +384,7 @@ public synchronized <K, V> GlobalKTable<K, V> 
globalTable(final String topic,
 
         final MaterializedInternal<K, V, KeyValueStore<Bytes, byte[]>> 
materializedInternal =
             new MaterializedInternal<>(
-                Materialized.with(consumedInternal.keySerde(), 
consumedInternal.valueSerde()),
+                Materialized.<K, V, KeyValueStore<Bytes, 
byte[]>>with(consumedInternal.keySerde(), 
consumedInternal.valueSerde()).withLoggingDisabled(),

Review Comment:
   Not sure I understand why you are disabling logging here



##########
streams/src/test/java/org/apache/kafka/streams/StreamsBuilderTest.java:
##########
@@ -2354,6 +2363,418 @@ public void shouldNowAllowStreamAndTableFromSameTopic() 
{
         assertThrows(TopologyException.class, builder::build);
     }
 
+    @ParameterizedTest
+    @CsvSource({
+        "true, false, true, false",
+        "false, true, true, true",
+        "true, true, true, true",
+        "false, false, false, false"
+    })
+    public void groupByWithAggregationTest(final boolean isGroupByKeyNamed,
+                                           final boolean isMaterializedNamed,
+                                           final boolean isLoggingEnabled,
+                                           final boolean isValid) {
+        final Map<Object, Object> props = dummyStreamsConfigMap();
+        props.put(ENSURE_EXPLICIT_INTERNAL_RESOURCE_NAMING_CONFIG, true);
+
+        final Grouped<String, String> grouped;
+        final Materialized<String, Long, KeyValueStore<Bytes, byte[]>> 
materialized;
+        if (isGroupByKeyNamed) {
+            grouped = Grouped.with("repartition-name", Serdes.String(), 
Serdes.String());
+        } else {
+            grouped = Grouped.with(Serdes.String(), Serdes.String());
+        }
+        if (isMaterializedNamed) {
+            materialized = Materialized.<String, Long, KeyValueStore<Bytes, 
byte[]>>as("materialized-name")
+                .withKeySerde(Serdes.String()).withValueSerde(Serdes.Long());
+        } else {
+            if (isLoggingEnabled) {
+                materialized = Materialized.with(Serdes.String(), 
Serdes.Long());
+            } else {
+                materialized = Materialized.<String, Long, 
KeyValueStore<Bytes, byte[]>>with(Serdes.String(), Serdes.Long())
+                    .withLoggingDisabled();
+            }
+        }
+
+        final StreamsBuilder builder = new StreamsBuilder(new 
TopologyConfig(new StreamsConfig(props)));
+        final KStream<String, String> stream = builder.stream("input1");
+        stream
+            .groupBy((k, v) -> v, grouped)
+            .count(materialized)
+            .toStream()
+            .to("output", Produced.as("sink"));
+
+        if (isValid) {
+            assertDoesNotThrow(() -> builder.build());
+        } else {
+            final TopologyException e = assertThrows(TopologyException.class, 
() -> builder.build());

Review Comment:
   Can we check the exception message? It seems rather important here. 
Generally, if this will become hard to implement, maybe we can use 4 separate 
tests instead of a parametrized test, and a private method to avoid repeating 
everything 4 times. Same goes for the tests below.
   
   Alternatively, we could check the exception message in a unit test for 
`InternalTopicBuilder`.



##########
streams/src/test/java/org/apache/kafka/streams/StreamsBuilderTest.java:
##########
@@ -2354,6 +2363,418 @@ public void shouldNowAllowStreamAndTableFromSameTopic() 
{
         assertThrows(TopologyException.class, builder::build);
     }
 
+    @ParameterizedTest
+    @CsvSource({
+        "true, false, true, false",
+        "false, true, true, true",
+        "true, true, true, true",
+        "false, false, false, false"
+    })
+    public void groupByWithAggregationTest(final boolean isGroupByKeyNamed,
+                                           final boolean isMaterializedNamed,
+                                           final boolean isLoggingEnabled,
+                                           final boolean isValid) {
+        final Map<Object, Object> props = dummyStreamsConfigMap();
+        props.put(ENSURE_EXPLICIT_INTERNAL_RESOURCE_NAMING_CONFIG, true);
+
+        final Grouped<String, String> grouped;
+        final Materialized<String, Long, KeyValueStore<Bytes, byte[]>> 
materialized;
+        if (isGroupByKeyNamed) {
+            grouped = Grouped.with("repartition-name", Serdes.String(), 
Serdes.String());
+        } else {
+            grouped = Grouped.with(Serdes.String(), Serdes.String());
+        }
+        if (isMaterializedNamed) {
+            materialized = Materialized.<String, Long, KeyValueStore<Bytes, 
byte[]>>as("materialized-name")
+                .withKeySerde(Serdes.String()).withValueSerde(Serdes.Long());
+        } else {
+            if (isLoggingEnabled) {
+                materialized = Materialized.with(Serdes.String(), 
Serdes.Long());
+            } else {
+                materialized = Materialized.<String, Long, 
KeyValueStore<Bytes, byte[]>>with(Serdes.String(), Serdes.Long())
+                    .withLoggingDisabled();
+            }
+        }
+
+        final StreamsBuilder builder = new StreamsBuilder(new 
TopologyConfig(new StreamsConfig(props)));
+        final KStream<String, String> stream = builder.stream("input1");
+        stream
+            .groupBy((k, v) -> v, grouped)
+            .count(materialized)
+            .toStream()
+            .to("output", Produced.as("sink"));
+
+        if (isValid) {
+            assertDoesNotThrow(() -> builder.build());
+        } else {
+            final TopologyException e = assertThrows(TopologyException.class, 
() -> builder.build());
+            // TODO check the error with loggingDisabled

Review Comment:
   Do you plan to address this `TODO` in this PR?



##########
streams/src/main/java/org/apache/kafka/streams/kstream/internals/InternalStreamsBuilder.java:
##########
@@ -489,7 +492,8 @@ private void mergeRepartitionTopics() {
             //passing in the name of the first repartition topic, re-used to 
create the optimized repartition topic
             final GraphNode optimizedSingleRepartition = 
createRepartitionNode(repartitionTopicName,
                                                                                
groupedInternal.keySerde(),
-                                                                               
groupedInternal.valueSerde());
+                                                                               
groupedInternal.valueSerde(),
+                                                                               
true);

Review Comment:
   It seems we can remove the new boolean parameter here, since it's always 
true. It's also a bit misleading, because the repartition topic name isn't 
necessarily provided by the user, we just don't check it at this point.



##########
streams/src/test/java/org/apache/kafka/streams/StreamsBuilderTest.java:
##########
@@ -2354,6 +2363,418 @@ public void shouldNowAllowStreamAndTableFromSameTopic() 
{
         assertThrows(TopologyException.class, builder::build);
     }
 
+    @ParameterizedTest
+    @CsvSource({
+        "true, false, true, false",
+        "false, true, true, true",
+        "true, true, true, true",
+        "false, false, false, false"
+    })
+    public void groupByWithAggregationTest(final boolean isGroupByKeyNamed,

Review Comment:
   The name of the test should probably follow the `should...` format as the 
other tests, and mention something about detected implicit internal resource 
names. The same goes for the other tests below.



##########
streams/src/main/java/org/apache/kafka/streams/processor/internals/InternalTopologyBuilder.java:
##########
@@ -2333,4 +2341,33 @@ public <KIn, VIn, KOut, VOut> 
WrappedProcessorSupplier<KIn, VIn, KOut, VOut> wra
             processorWrapper.wrapProcessorSupplier(name, processorSupplier)
         );
     }
+
+    public void addUnprovidedInternalTopics(final InternalResourcesNaming 
internalResourcesNaming) {

Review Comment:
   Replace "Topics" by "Names" as you are also talking about state stores?
   
   Additionally, I was wondering if you have considered using something like 
`implicit` instead of `unprovided` as it's clearer that it's the opposite of 
`explicit` in the config?



##########
streams/src/main/java/org/apache/kafka/streams/StreamsBuilder.java:
##########
@@ -457,7 +457,7 @@ public synchronized <K, V> GlobalKTable<K, V> 
globalTable(final String topic,
         Objects.requireNonNull(materialized, "materialized can't be null");
         final ConsumedInternal<K, V> consumedInternal = new 
ConsumedInternal<>(consumed);
         // always use the serdes from consumed
-        
materialized.withKeySerde(consumedInternal.keySerde()).withValueSerde(consumedInternal.valueSerde());
+        
materialized.withKeySerde(consumedInternal.keySerde()).withValueSerde(consumedInternal.valueSerde()).withLoggingDisabled();

Review Comment:
   Same as above



##########
streams/src/main/java/org/apache/kafka/streams/processor/internals/InternalTopologyBuilder.java:
##########
@@ -2333,4 +2341,33 @@ public <KIn, VIn, KOut, VOut> 
WrappedProcessorSupplier<KIn, VIn, KOut, VOut> wra
             processorWrapper.wrapProcessorSupplier(name, processorSupplier)
         );
     }
+
+    public void addUnprovidedInternalTopics(final InternalResourcesNaming 
internalResourcesNaming) {
+        unprovidedInternalNames.add(internalResourcesNaming);
+    }
+
+    public void checkUnprovidedNames() {
+        if (!unprovidedInternalNames.isEmpty()) {
+            final StringBuilder result = new StringBuilder();
+            for (final InternalResourcesNaming internalResourcesNaming : 
unprovidedInternalNames) {
+                if (!Utils.isBlank(internalResourcesNaming.changelogTopic())) {
+                    result.append(String.format("Following changelog topic has 
not been named: %s%n", internalResourcesNaming.changelogTopic()));

Review Comment:
   It could be nicer to collect the resource names by type and output one line 
per type? As in:
   
   ```
   Following changelog topic has not been named: a, b, c
   ```
   
   This is just a suggestion



##########
streams/src/main/java/org/apache/kafka/streams/processor/internals/InternalTopologyBuilder.java:
##########
@@ -192,6 +196,10 @@ public InternalTopologyBuilder(final TopologyConfig 
topologyConfigs) {
 
     private boolean hasPersistentStores = false;
 
+    private boolean ensureExplicitInternalResourceNaming;

Review Comment:
   Could be final



##########
streams/src/main/java/org/apache/kafka/streams/processor/internals/InternalResourcesNaming.java:
##########
@@ -0,0 +1,58 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.kafka.streams.processor.internals;
+
+public final class InternalResourcesNaming {
+
+    private String repartitionTopic;
+    private String changelogTopic;
+    private String stateStore;
+
+    private InternalResourcesNaming() {
+    }
+
+    public static InternalResourcesNaming build() {

Review Comment:
   This class at first looks like a builder, but it's something different, 
since you build first and change values afterwards. I would add a proper 
builder, or just use a regular class with a public constructor and setters.



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org

Reply via email to