zhaohaidao commented on a change in pull request #8550:
URL: https://github.com/apache/kafka/pull/8550#discussion_r415334757



##########
File path: 
streams/src/main/java/org/apache/kafka/streams/processor/internals/InternalTopologyBuilder.java
##########
@@ -633,6 +634,45 @@ public final void copartitionSources(final 
Collection<String> sourceNodes) {
         copartitionSourceGroups.add(Collections.unmodifiableSet(new 
HashSet<>(sourceNodes)));
     }
 
+    public void validateCoPartition() {
+        final List<Set<String>> copartitionGroups =
+                copartitionSourceGroups
+                        .stream()
+                        .map(sourceGroup -> sourceGroup
+                                .stream()
+                                .flatMap(node -> 
nodeToSourceTopics.get(node).stream())
+                                .collect(Collectors.toSet())
+                        ).collect(Collectors.toList());
+        for (final Set<String> coPartition : copartitionGroups) {
+            final Map<String, InternalTopicProperties> coPartitionProperties = 
new HashMap<>();
+            internalTopicNamesWithProperties.forEach((topic, prop) -> {
+                if (coPartition.contains(topic) && 
prop.getNumberOfPartitions().isPresent()) {
+                    coPartitionProperties.put(topic, prop);
+                }
+            });
+            if (coPartition.size() == coPartitionProperties.size()) {

Review comment:
       It's my pleasure.
   It means that not all input topics have correspond internal topic if 
coPartition.size() != coPartitionProperties.size(), if not equal is true, we 
can just skip this validation. You can see the original validation in 
CopartitionedTopicsEnforcer#enforce
   ```
   if (copartitionGroup.equals(repartitionTopicConfigs.keySet())) {
       ...
       validateAndGetNumOfPartitions
       ...
   }
   ```
   If some of input topics don't have repartition operation, their internal 
topic partition number can be deducted by others which have repartition 
operation. You can see 
KStreamRepartitionIntegrationTest#shouldDeductNumberOfPartitionsFromRepartitionOperation
 for more details.
   
    So we can skip this validation if coPartition.size() != 
coPartitionProperties.size()




----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


Reply via email to