g1geordie commented on a change in pull request #10190:
URL: https://github.com/apache/kafka/pull/10190#discussion_r581597952



##########
File path: streams/src/main/java/org/apache/kafka/streams/kstream/internals/NamedInternal.java
##########
@@ -57,40 +59,28 @@ public String name() {
     public NamedInternal withName(final String name) {
         return new NamedInternal(name);
     }
-    
+
     String suffixWithOrElseGet(final String suffix, final String other) {
-        if (name != null) {
-            return name + suffix;
-        } else {
-            return other;
-        }
+        return Optional.ofNullable(name)
+                // Re-validate generated name as suffixed string could be too large.
+                .map(x -> withName(name + suffix).name)
+                .orElse(other);
     }
 
     String suffixWithOrElseGet(final String suffix, final InternalNameProvider provider, final String prefix) {
         // We actually do not need to generate processor names for operation if a name is specified.
         // But before returning, we still need to burn index for the operation to keep topology backward compatibility.
-        if (name != null) {
-            provider.newProcessorName(prefix);
+        final String defaultName = provider.newProcessorName(prefix);
 
-            final String suffixed = name + suffix;
-            // Re-validate generated name as suffixed string could be too large.
-            Named.validate(suffixed);
-
-            return suffixed;
-        } else {
-            return provider.newProcessorName(prefix);
-        }
+        return suffixWithOrElseGet(suffix, defaultName);
     }
 
     String orElseGenerateWithPrefix(final InternalNameProvider provider, final String prefix) {
         // We actually do not need to generate processor names for operation if a name is specified.
         // But before returning, we still need to burn index for the operation to keep topology backward compatibility.

Review comment:
   May I ask a question?

   Is backward compatibility still maintained in the current version?
   I see a hint in another test that a named node may not increment the index:

   <pre><code>
   SuppressTopologyTest.shouldApplyNameToSuppressionNode {
           ...
           .suppress(untilTimeLimit(Duration.ofSeconds(1), unbounded()).withName("asdf"))
           ...
           // <del>without</del>(with) the name, <b>the suppression node does not increment the topology index</b>
           assertThat(namedNodeTopology, is(NAMED_INTERMEDIATE_TOPOLOGY));
   }
   </code></pre>
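
   To make the question concrete, here is a minimal sketch of what "burning an index" means. It is not Kafka code: `CountingNameProvider` is a hypothetical, simplified stand-in for `InternalNameProvider`, and `nameOrGenerated` is a hypothetical helper that mirrors the shape of the refactored `suffixWithOrElseGet`. The zero-padded counter format is an assumption made for illustration.

```java
import java.util.Locale;
import java.util.concurrent.atomic.AtomicInteger;

class IndexBurningSketch {

    // Hypothetical stand-in for InternalNameProvider: every call consumes one index.
    static final class CountingNameProvider {
        private final AtomicInteger index = new AtomicInteger(0);

        String newProcessorName(final String prefix) {
            // Assumed format: prefix followed by a zero-padded, 10-digit counter.
            return prefix + String.format(Locale.ROOT, "%010d", index.getAndIncrement());
        }
    }

    // Hypothetical helper: always asks the provider for a name first, so the
    // index advances even when the user-supplied name is the one returned.
    static String nameOrGenerated(final String userName,
                                  final String suffix,
                                  final CountingNameProvider provider,
                                  final String prefix) {
        final String generated = provider.newProcessorName(prefix); // burns one index
        return userName != null ? userName + suffix : generated;
    }

    public static void main(final String[] args) {
        final CountingNameProvider provider = new CountingNameProvider();

        // Named operation: the generated name (index 0) is discarded.
        System.out.println(nameOrGenerated("asdf", "-suffix", provider, "KSTREAM-FILTER-"));
        // prints "asdf-suffix"

        // Unnamed operation: receives index 1, as it would if the first
        // operation had not been named at all.
        System.out.println(nameOrGenerated(null, "-suffix", provider, "KSTREAM-FILTER-"));
        // prints "KSTREAM-FILTER-0000000001"
    }
}
```

   If the index were consumed only in the unnamed branch, the second operation in this sketch would get index 0 instead of 1, so naming one node would shift the generated names of every node after it. That is the behaviour the test comment above appears to describe for the suppression node.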
   



