zstan commented on code in PR #4629:
URL: https://github.com/apache/ignite-3/pull/4629#discussion_r1824867877


##########
modules/sql-engine/src/main/java/org/apache/ignite/internal/sql/engine/ExecutionDistributionProviderImpl.java:
##########
@@ -44,61 +47,83 @@
 import org.apache.ignite.internal.placementdriver.ReplicaMeta;
 import org.apache.ignite.internal.replicator.ReplicationGroupId;
 import org.apache.ignite.internal.replicator.TablePartitionId;
-import org.apache.ignite.internal.sql.engine.exec.mapping.ExecutionTarget;
-import 
org.apache.ignite.internal.sql.engine.exec.mapping.ExecutionTargetFactory;
-import 
org.apache.ignite.internal.sql.engine.exec.mapping.ExecutionTargetProvider;
-import org.apache.ignite.internal.sql.engine.schema.IgniteSystemView;
 import org.apache.ignite.internal.sql.engine.schema.IgniteTable;
-import org.apache.ignite.internal.sql.engine.trait.IgniteDistributions;
 import org.apache.ignite.internal.systemview.api.SystemViewManager;
-import org.apache.ignite.lang.ErrorGroups.Sql;
-import org.apache.ignite.sql.SqlException;
-
-/**
- * Implementation of {@link ExecutionTargetProvider} which takes assignments 
from {@link PlacementDriver} and {@link SystemViewManager}.
- */
-public class ExecutionTargetProviderImpl implements ExecutionTargetProvider {
-    private static final IgniteLogger LOG = 
Loggers.forClass(ExecutionTargetProviderImpl.class);
 
+/** Execution nodes information provider. */
+public class ExecutionDistributionProviderImpl implements 
ExecutionDistributionProvider {
+    private static final IgniteLogger LOG = 
Loggers.forClass(ExecutionDistributionProviderImpl.class);
     private final PlacementDriver placementDriver;
     private final SystemViewManager systemViewManager;
 
-    ExecutionTargetProviderImpl(
-            PlacementDriver placementDriver, SystemViewManager 
systemViewManager
-    ) {
+    ExecutionDistributionProviderImpl(PlacementDriver placementDriver, 
SystemViewManager systemViewManager) {
         this.placementDriver = placementDriver;
         this.systemViewManager = systemViewManager;
     }
 
     @Override
-    public CompletableFuture<ExecutionTarget> forTable(
+    public CompletableFuture<DistributionHolder> distribution(
             HybridTimestamp operationTime,
-            ExecutionTargetFactory factory,
-            IgniteTable table,
-            boolean includeBackups
+            boolean mapOnBackups,
+            Collection<IgniteTable> tables,
+            Collection<String> views,
+            String initiatorNode
     ) {
-        return collectAssignments(table, operationTime, includeBackups)
-                .thenApply(factory::partitioned);
+        if (tables.isEmpty() && views.isEmpty()) {
+            DistributionHolder holder = new 
DistributionHolderImpl(List.of(initiatorNode), Map.of(), Map.of());
+
+            return completedFuture(holder);
+        } else {
+            return distribution0(tables, views, operationTime, mapOnBackups, 
initiatorNode);
+        }
     }
 
-    @Override
-    public CompletableFuture<ExecutionTarget> 
forSystemView(ExecutionTargetFactory factory, IgniteSystemView view) {
-        List<String> nodes = systemViewManager.owningNodes(view.name());
-
-        if (nullOrEmpty(nodes)) {
-            return failedFuture(
-                    new SqlException(Sql.MAPPING_ERR, format("The view with 
name '{}' could not be found on"
-                            + " any active nodes in the cluster", view.name()))
-            );
+    private CompletableFuture<DistributionHolder> distribution0(
+            Collection<IgniteTable> tables,
+            Collection<String> views,
+            HybridTimestamp operationTime,
+            boolean mapOnBackups,
+            String initiatorNode
+    ) {
+        Map<IgniteTable, CompletableFuture<List<TokenizedAssignments>>> 
mapResult = newHashMap(tables.size());
+        Map<IgniteTable, List<TokenizedAssignments>> mapResultResolved = 
newHashMap(tables.size());
+
+        for (IgniteTable tbl : tables) {
+            CompletableFuture<List<TokenizedAssignments>> assignments = 
collectAssignments(tbl, operationTime, mapOnBackups);
+
+            mapResult.put(tbl, assignments);
         }
 
-        return completedFuture(
-                view.distribution() == IgniteDistributions.single()
-                        ? factory.oneOf(nodes)
-                        : factory.allOf(nodes)
+        CompletableFuture<Void> all = 
CompletableFuture.allOf(mapResult.values().toArray(new CompletableFuture[0]));
+
+        CompletableFuture<Map<IgniteTable, List<TokenizedAssignments>>> fut = 
all.thenApply(v -> mapResult.entrySet().stream()
+                .map(e -> Map.entry(e.getKey(), e.getValue().join()))
+                .collect(Collectors.toMap(Entry::getKey, Entry::getValue))
         );
+
+        CompletableFuture<List<String>> participantNodes = fut.thenApply(
+                v -> v.values().stream().flatMap(List::stream).flatMap(i -> 
i.nodes().stream()).map(Assignment::consistentId)
+                        .distinct()
+                        .collect(Collectors.toList()));
+
+        return participantNodes.thenApply(nodes -> {
+            nodes.add(initiatorNode);

Review Comment:
   It's not true; check the nearby lines:
   `List<String> nodes0 = Stream.concat(viewNodes.stream(), nodes.stream()).distinct().collect(Collectors.toList());`



##########
modules/sql-engine/src/main/java/org/apache/ignite/internal/sql/engine/ExecutionDistributionProviderImpl.java:
##########
@@ -44,61 +47,83 @@
 import org.apache.ignite.internal.placementdriver.ReplicaMeta;
 import org.apache.ignite.internal.replicator.ReplicationGroupId;
 import org.apache.ignite.internal.replicator.TablePartitionId;
-import org.apache.ignite.internal.sql.engine.exec.mapping.ExecutionTarget;
-import 
org.apache.ignite.internal.sql.engine.exec.mapping.ExecutionTargetFactory;
-import 
org.apache.ignite.internal.sql.engine.exec.mapping.ExecutionTargetProvider;
-import org.apache.ignite.internal.sql.engine.schema.IgniteSystemView;
 import org.apache.ignite.internal.sql.engine.schema.IgniteTable;
-import org.apache.ignite.internal.sql.engine.trait.IgniteDistributions;
 import org.apache.ignite.internal.systemview.api.SystemViewManager;
-import org.apache.ignite.lang.ErrorGroups.Sql;
-import org.apache.ignite.sql.SqlException;
-
-/**
- * Implementation of {@link ExecutionTargetProvider} which takes assignments 
from {@link PlacementDriver} and {@link SystemViewManager}.
- */
-public class ExecutionTargetProviderImpl implements ExecutionTargetProvider {
-    private static final IgniteLogger LOG = 
Loggers.forClass(ExecutionTargetProviderImpl.class);
 
+/** Execution nodes information provider. */
+public class ExecutionDistributionProviderImpl implements 
ExecutionDistributionProvider {
+    private static final IgniteLogger LOG = 
Loggers.forClass(ExecutionDistributionProviderImpl.class);
     private final PlacementDriver placementDriver;
     private final SystemViewManager systemViewManager;
 
-    ExecutionTargetProviderImpl(
-            PlacementDriver placementDriver, SystemViewManager 
systemViewManager
-    ) {
+    ExecutionDistributionProviderImpl(PlacementDriver placementDriver, 
SystemViewManager systemViewManager) {
         this.placementDriver = placementDriver;
         this.systemViewManager = systemViewManager;
     }
 
     @Override
-    public CompletableFuture<ExecutionTarget> forTable(
+    public CompletableFuture<DistributionHolder> distribution(
             HybridTimestamp operationTime,
-            ExecutionTargetFactory factory,
-            IgniteTable table,
-            boolean includeBackups
+            boolean mapOnBackups,
+            Collection<IgniteTable> tables,
+            Collection<String> views,
+            String initiatorNode
     ) {
-        return collectAssignments(table, operationTime, includeBackups)
-                .thenApply(factory::partitioned);
+        if (tables.isEmpty() && views.isEmpty()) {
+            DistributionHolder holder = new 
DistributionHolderImpl(List.of(initiatorNode), Map.of(), Map.of());
+
+            return completedFuture(holder);
+        } else {
+            return distribution0(tables, views, operationTime, mapOnBackups, 
initiatorNode);
+        }
     }
 
-    @Override
-    public CompletableFuture<ExecutionTarget> 
forSystemView(ExecutionTargetFactory factory, IgniteSystemView view) {
-        List<String> nodes = systemViewManager.owningNodes(view.name());
-
-        if (nullOrEmpty(nodes)) {
-            return failedFuture(
-                    new SqlException(Sql.MAPPING_ERR, format("The view with 
name '{}' could not be found on"
-                            + " any active nodes in the cluster", view.name()))
-            );
+    private CompletableFuture<DistributionHolder> distribution0(
+            Collection<IgniteTable> tables,
+            Collection<String> views,
+            HybridTimestamp operationTime,
+            boolean mapOnBackups,
+            String initiatorNode
+    ) {
+        Map<IgniteTable, CompletableFuture<List<TokenizedAssignments>>> 
mapResult = newHashMap(tables.size());
+        Map<IgniteTable, List<TokenizedAssignments>> mapResultResolved = 
newHashMap(tables.size());
+
+        for (IgniteTable tbl : tables) {
+            CompletableFuture<List<TokenizedAssignments>> assignments = 
collectAssignments(tbl, operationTime, mapOnBackups);
+
+            mapResult.put(tbl, assignments);
         }
 
-        return completedFuture(
-                view.distribution() == IgniteDistributions.single()
-                        ? factory.oneOf(nodes)
-                        : factory.allOf(nodes)
+        CompletableFuture<Void> all = 
CompletableFuture.allOf(mapResult.values().toArray(new CompletableFuture[0]));
+
+        CompletableFuture<Map<IgniteTable, List<TokenizedAssignments>>> fut = 
all.thenApply(v -> mapResult.entrySet().stream()
+                .map(e -> Map.entry(e.getKey(), e.getValue().join()))
+                .collect(Collectors.toMap(Entry::getKey, Entry::getValue))
         );
+
+        CompletableFuture<List<String>> participantNodes = fut.thenApply(
+                v -> v.values().stream().flatMap(List::stream).flatMap(i -> 
i.nodes().stream()).map(Assignment::consistentId)
+                        .distinct()
+                        .collect(Collectors.toList()));
+
+        return participantNodes.thenApply(nodes -> {
+            nodes.add(initiatorNode);

Review Comment:
   It's not true; check the nearby lines:
   `List<String> nodes0 = Stream.concat(viewNodes.stream(), nodes.stream()).distinct().collect(Collectors.toList());`



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: notifications-unsubscr...@ignite.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org

Reply via email to