sebastienviale commented on code in PR #18233:
URL: https://github.com/apache/kafka/pull/18233#discussion_r1965507035


##########
streams/src/main/java/org/apache/kafka/streams/processor/internals/InternalTopologyBuilder.java:
##########
@@ -2289,4 +2298,45 @@ public <KIn, VIn, KOut, VOut> WrappedProcessorSupplier<KIn, VIn, KOut, VOut> wra
             processorWrapper.wrapProcessorSupplier(name, processorSupplier)
         );
     }
+
+    public void addImplicitInternalNames(final InternalResourcesNaming internalResourcesNaming) {
+        implicitInternalNames.add(internalResourcesNaming);
+    }
+
+    public void checkUnprovidedNames() {
+        if (!implicitInternalNames.isEmpty()) {
+            final StringBuilder result = new StringBuilder();
+            final List<String> changelogTopics = new ArrayList<>();
+            final List<String> stateStores = new ArrayList<>();
+            final List<String> repartitionTopics = new ArrayList<>();
+            for (final InternalResourcesNaming internalResourcesNaming : implicitInternalNames) {
+                if (!Utils.isBlank(internalResourcesNaming.changelogTopic())) {
+                    changelogTopics.add(internalResourcesNaming.changelogTopic());
+                }
+                if (!Utils.isBlank(internalResourcesNaming.stateStore())) {
+                    stateStores.add(internalResourcesNaming.stateStore());
+                }
+                if (!Utils.isBlank(internalResourcesNaming.repartitionTopic())) {
+                    repartitionTopics.add(internalResourcesNaming.repartitionTopic());
+                }
+            }
+            if (!changelogTopics.isEmpty()) {
+                result.append(String.format("Following changelog topic(s) has not been named: %s%n", String.join(", ", changelogTopics)));
+            }
+            if (!stateStores.isEmpty()) {
+                result.append(String.format("Following state store(s) has not been named: %s%n", String.join(", ", stateStores)));
+            }
+            if (!repartitionTopics.isEmpty()) {
+                result.append(String.format("Following repartition topic(s) has not been named: %s%n", String.join(", ", repartitionTopics)));
+            }
+            if (ensureExplicitInternalResourceNaming) {
+                throw new TopologyException(result.toString());
+            } else {
+                log.warn("Enforce explicit naming for all internal resources is set to false. If you want" +

Review Comment:
   @lucasbru @ableegoldman
   What do you think of this wording for the warning:

   Explicit naming for internal resources is currently disabled. If you want to enforce user-defined names for all internal resources, set `ENSURE_EXPLICIT_INTERNAL_RESOURCE_NAMING_CONFIG` to `true`.
   Note: If your application has already been deployed with auto-generated names, changing internal resource names may impact compatibility.
   If you need to apply this configuration to an existing application, consider resetting the Streams application using the Kafka Streams reset tool.
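
   To make the proposal easier to compare against the diff, here is a minimal, self-contained sketch of how the warning text could be assembled. It is not the PR's code: the class `ExplicitNamingMessageSketch`, the helpers `describeUnnamed`, `appendIfPresent` and `warning`, and the literal config key string are all assumptions made for illustration. The per-resource lines mirror the wording in the diff above.

```java
import java.util.List;

// Hypothetical helper class, not part of Kafka Streams.
final class ExplicitNamingMessageSketch {

    // Assumed value; the real key is defined in StreamsConfig and may differ.
    static final String ENSURE_EXPLICIT_INTERNAL_RESOURCE_NAMING_CONFIG =
        "ensure.explicit.internal.resource.naming";

    // Builds one line per resource kind that has at least one unnamed entry.
    static String describeUnnamed(final List<String> changelogTopics,
                                  final List<String> stateStores,
                                  final List<String> repartitionTopics) {
        final StringBuilder result = new StringBuilder();
        appendIfPresent(result, "changelog topic(s)", changelogTopics);
        appendIfPresent(result, "state store(s)", stateStores);
        appendIfPresent(result, "repartition topic(s)", repartitionTopics);
        return result.toString();
    }

    // Appends a line only when the list is non-empty, as in the diff.
    private static void appendIfPresent(final StringBuilder sb,
                                        final String kind,
                                        final List<String> names) {
        if (!names.isEmpty()) {
            sb.append(String.format("Following %s has not been named: %s%n",
                kind, String.join(", ", names)));
        }
    }

    // Combines the list of unnamed resources with the proposed warning text.
    static String warning(final String unnamed) {
        return unnamed
            + "Explicit naming for internal resources is currently disabled. "
            + "If you want to enforce user-defined names for all internal resources, set "
            + ENSURE_EXPLICIT_INTERNAL_RESOURCE_NAMING_CONFIG + " to true. "
            + "Note: If your application has already been deployed with auto-generated names, "
            + "changing internal resource names may impact compatibility. "
            + "If you need to apply this configuration to an existing application, consider "
            + "resetting the Streams application using the Kafka Streams reset tool.";
    }
}
```

   In this sketch, `warning(describeUnnamed(...))` would be the string handed to `log.warn` in the `else` branch, while the `TopologyException` branch would keep using only the `describeUnnamed` part.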



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org
