zstan commented on code in PR #4629: URL: https://github.com/apache/ignite-3/pull/4629#discussion_r1830413143
########## modules/sql-engine/src/main/java/org/apache/ignite/internal/sql/engine/ExecutionDistributionProviderImpl.java: ########## @@ -44,61 +47,83 @@ import org.apache.ignite.internal.placementdriver.ReplicaMeta; import org.apache.ignite.internal.replicator.ReplicationGroupId; import org.apache.ignite.internal.replicator.TablePartitionId; -import org.apache.ignite.internal.sql.engine.exec.mapping.ExecutionTarget; -import org.apache.ignite.internal.sql.engine.exec.mapping.ExecutionTargetFactory; -import org.apache.ignite.internal.sql.engine.exec.mapping.ExecutionTargetProvider; -import org.apache.ignite.internal.sql.engine.schema.IgniteSystemView; import org.apache.ignite.internal.sql.engine.schema.IgniteTable; -import org.apache.ignite.internal.sql.engine.trait.IgniteDistributions; import org.apache.ignite.internal.systemview.api.SystemViewManager; -import org.apache.ignite.lang.ErrorGroups.Sql; -import org.apache.ignite.sql.SqlException; - -/** - * Implementation of {@link ExecutionTargetProvider} which takes assignments from {@link PlacementDriver} and {@link SystemViewManager}. - */ -public class ExecutionTargetProviderImpl implements ExecutionTargetProvider { - private static final IgniteLogger LOG = Loggers.forClass(ExecutionTargetProviderImpl.class); +/** Execution nodes information provider. */ +public class ExecutionDistributionProviderImpl implements ExecutionDistributionProvider { + private static final IgniteLogger LOG = Loggers.forClass(ExecutionDistributionProviderImpl.class); private final PlacementDriver placementDriver; private final SystemViewManager systemViewManager; - ExecutionTargetProviderImpl( - PlacementDriver placementDriver, SystemViewManager systemViewManager - ) { + ExecutionDistributionProviderImpl(PlacementDriver placementDriver, SystemViewManager systemViewManager) { this.placementDriver = placementDriver; this.systemViewManager = systemViewManager; } @Override - public CompletableFuture<ExecutionTarget> forTable( + public CompletableFuture<DistributionHolder> distribution( HybridTimestamp operationTime, - ExecutionTargetFactory factory, - IgniteTable table, - boolean includeBackups + boolean mapOnBackups, + Collection<IgniteTable> tables, + Collection<String> views, + String initiatorNode ) { - return collectAssignments(table, operationTime, includeBackups) - .thenApply(factory::partitioned); + if (tables.isEmpty() && views.isEmpty()) { + DistributionHolder holder = new DistributionHolderImpl(List.of(initiatorNode), Map.of(), Map.of()); + + return completedFuture(holder); + } else { + return distribution0(tables, views, operationTime, mapOnBackups, initiatorNode); + } } - @Override - public CompletableFuture<ExecutionTarget> forSystemView(ExecutionTargetFactory factory, IgniteSystemView view) { - List<String> nodes = systemViewManager.owningNodes(view.name()); - - if (nullOrEmpty(nodes)) { - return failedFuture( - new SqlException(Sql.MAPPING_ERR, format("The view with name '{}' could not be found on" - + " any active nodes in the cluster", view.name())) - ); + private CompletableFuture<DistributionHolder> distribution0( + Collection<IgniteTable> tables, + Collection<String> views, + HybridTimestamp operationTime, + boolean mapOnBackups, + String initiatorNode + ) { + Map<IgniteTable, CompletableFuture<List<TokenizedAssignments>>> mapResult = newHashMap(tables.size()); + Map<IgniteTable, List<TokenizedAssignments>> mapResultResolved = newHashMap(tables.size()); + + for (IgniteTable tbl : tables) { + CompletableFuture<List<TokenizedAssignments>> assignments = collectAssignments(tbl, operationTime, mapOnBackups); + + mapResult.put(tbl, assignments); } - return completedFuture( - view.distribution() == IgniteDistributions.single() - ? factory.oneOf(nodes) - : factory.allOf(nodes) + CompletableFuture<Void> all = CompletableFuture.allOf(mapResult.values().toArray(new CompletableFuture[0])); + + CompletableFuture<Map<IgniteTable, List<TokenizedAssignments>>> fut = all.thenApply(v -> mapResult.entrySet().stream() + .map(e -> Map.entry(e.getKey(), e.getValue().join())) + .collect(Collectors.toMap(Entry::getKey, Entry::getValue)) ); + + CompletableFuture<List<String>> participantNodes = fut.thenApply( + v -> v.values().stream().flatMap(List::stream).flatMap(i -> i.nodes().stream()).map(Assignment::consistentId) + .distinct() + .collect(Collectors.toList())); + + return participantNodes.thenApply(nodes -> { + nodes.add(initiatorNode); + + // this is a safe join, because we have waited for all futures to be completed + mapResult.forEach((k, v) -> mapResultResolved.put(k, v.join())); + + Map<String, List<String>> nodesPerView = views.stream().distinct() Review Comment: this code was reworked earlier -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: notifications-unsubscr...@ignite.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org