zstan commented on code in PR #4629:
URL: https://github.com/apache/ignite-3/pull/4629#discussion_r1828879749


##########
modules/sql-engine/src/main/java/org/apache/ignite/internal/sql/engine/exec/mapping/MappingServiceImpl.java:
##########
@@ -145,203 +143,212 @@ private CompletableFuture<List<MappedFragment>> map0(MultiStepPlan multiStepPlan
                             }
                         }
 
-                        long topVer = topologyAware ? topology.version() : 
Long.MAX_VALUE;
+                        long topVer = topologyAware ? 
logicalTopologyVerSupplier.get() : Long.MAX_VALUE;
 
-                        return new MappingsCacheValue(topVer, tableIds, 
mapFragments(context, template, key.mapOnBackups));
+                        return new MappingsCacheValue(topVer, tableIds, 
mapFragments(template, mapOnBackups));
                     }
 
-                    if (val.topVer < topology.version()) {
-                        return new MappingsCacheValue(topology.version(), 
val.tableIds, mapFragments(context, template, key.mapOnBackups));
+                    long topologyVer = logicalTopologyVerSupplier.get();
+
+                    if (val.topologyVersion < topologyVer) {
+                        return new MappingsCacheValue(topologyVer, 
val.tableIds, mapFragments(template, mapOnBackups));
                     }
 
                     return val;
-                }
-        );
+                });
 
-        return cacheValue.mappedFragments.thenApply(mappedFragments -> 
applyPartitionPruning(mappedFragments.fragments, parameters));
+        return cacheValue.mappedFragments.thenApply(frags -> 
applyPartitionPruning(frags.fragments, parameters));
     }
 
-    private CompletableFuture<MappedFragments> mapFragments(
-            MappingContext context,
-            FragmentsTemplate template,
+    CompletableFuture<DistributionHolder> composeDistributions(
+            List<IgniteSystemView> views,
+            Set<IgniteTable> tables,
             boolean mapOnBackups
     ) {
-        IdGenerator idGenerator = new IdGenerator(template.nextId);
-        List<Fragment> fragments = new ArrayList<>(template.fragments);
-        HybridTimestamp mappingTime = clock.now();
-
-        List<CompletableFuture<IntObjectPair<ExecutionTarget>>> targets =
-                fragments.stream().flatMap(fragment -> Stream.concat(
-                        fragment.tables().values().stream()
-                                .map(table -> 
targetProvider.forTable(mappingTime, context.targetFactory(), table, 
mapOnBackups)
-                                        .thenApply(target -> 
IntObjectPair.of(table.id(), target))
-                                ),
-                        fragment.systemViews().stream()
-                                .map(view -> 
targetProvider.forSystemView(context.targetFactory(), view)
-                                        .thenApply(target -> 
IntObjectPair.of(view.id(), target))
-                                )
-                ))
-                .collect(Collectors.toList());
+        if (tables.isEmpty() && views.isEmpty()) {
+            DistributionHolder holder = new 
DistributionHolder(List.of(localNodeName), Map.of(), Map.of());
 
-        return allOf(targets.toArray(new CompletableFuture[0]))
-                .thenApply(ignored -> {
-                    Int2ObjectMap<ExecutionTarget> targetsById = new 
Int2ObjectOpenHashMap<>();
+            return completedFuture(holder);
+        } else {
+            Map<IgniteTable, CompletableFuture<List<TokenizedAssignments>>> 
tablesAssignments = newHashMap(tables.size());
+            Map<IgniteTable, List<TokenizedAssignments>> 
tablesAssignmentsResolved = newHashMap(tables.size());
 
-                    for (CompletableFuture<IntObjectPair<ExecutionTarget>> fut 
: targets) {
-                        // this is a safe join, because we have waited for all futures to be complete
-                        IntObjectPair<ExecutionTarget> pair = fut.join();
+            for (IgniteTable tbl : tables) {
+                CompletableFuture<List<TokenizedAssignments>> assignments = 
distributionProvider
+                        .forTable(clock.now(), tbl, mapOnBackups);
 
-                        targetsById.put(pair.firstInt(), pair.second());
-                    }
+                tablesAssignments.put(tbl, assignments);
+            }
 
-                    FragmentMapper mapper = new 
FragmentMapper(template.cluster.getMetadataQuery(), context, targetsById);
+            CompletableFuture<Void> all = 
CompletableFuture.allOf(tablesAssignments.values().toArray(new 
CompletableFuture[0]));
 
-                    List<FragmentMapping> mappings = mapper.map(fragments, 
idGenerator);
+            CompletableFuture<Map<IgniteTable, List<TokenizedAssignments>>> 
fut = all.thenApply(v -> tablesAssignments.entrySet().stream()
+                    .map(e -> Map.entry(e.getKey(), e.getValue().join()))
+                    .collect(Collectors.toMap(Entry::getKey, Entry::getValue))
+            );
 
-                    Long2ObjectMap<ColocationGroup> groupsBySourceId = new 
Long2ObjectOpenHashMap<>();
-                    Long2ObjectMap<List<String>> allSourcesByExchangeId = new 
Long2ObjectOpenHashMap<>();
+            CompletableFuture<List<String>> participantNodes = fut.thenApply(
+                    v -> v.values().stream().flatMap(List::stream).flatMap(i 
-> i.nodes().stream()).map(Assignment::consistentId)
+                            .distinct()
+                            .collect(Collectors.toList()));
 
-                    for (FragmentMapping mapping : mappings) {
-                        Fragment fragment = mapping.fragment();
+            return participantNodes.thenApply(nodes -> {
+                nodes.add(localNodeName);
 
-                        for (ColocationGroup group : mapping.groups()) {
-                            for (long sourceId : group.sourceIds()) {
-                                groupsBySourceId.put(sourceId, group);
-                            }
-                        }
+                // this is a safe join, because we have waited for all futures to be completed
+                tablesAssignments.forEach((k, v) -> 
tablesAssignmentsResolved.put(k, v.join()));
 
-                        if (!fragment.rootFragment()) {
-                            IgniteSender sender = (IgniteSender) 
fragment.root();
+                Map<String, List<String>> nodesPerView = 
views.stream().distinct()
+                        .collect(Collectors.toMap(IgniteDataSource::name, 
distributionProvider::forSystemView));
 
-                            List<String> nodeNames = mapping.groups().stream()
-                                    .flatMap(g -> g.nodeNames().stream())
-                                    .distinct().collect(Collectors.toList());
+                List<String> viewNodes = 
nodesPerView.values().stream().flatMap(List::stream).collect(Collectors.toList());
 
-                            allSourcesByExchangeId.put(sender.exchangeId(), 
nodeNames);
-                        }
-                    }
+                List<String> nodes0 = Stream.concat(viewNodes.stream(), 
nodes.stream()).distinct().collect(Collectors.toUnmodifiableList());
 
-                    List<MappedFragment> mappedFragmentsList = new 
ArrayList<>(mappings.size());
-                    Set<String> targetNodes = new HashSet<>();
-                    for (FragmentMapping mapping : mappings) {
-                        Fragment fragment = mapping.fragment();
-
-                        ColocationGroup targetGroup = null;
-                        if (!fragment.rootFragment()) {
-                            IgniteSender sender = (IgniteSender) 
fragment.root();
+                return new DistributionHolder(nodes0, 
tablesAssignmentsResolved, nodesPerView);
+            });
+        }
+    }
 
-                            targetGroup = 
groupsBySourceId.get(sender.exchangeId());
-                        }
+    private CompletableFuture<MappedFragments> mapFragments(
+            FragmentsTemplate template,
+            boolean mapOnBackups
+    ) {
+        List<IgniteSystemView> views = 
template.fragments.stream().flatMap(fragment -> fragment.systemViews().stream())

Review Comment:
   done



##########
modules/sql-engine/src/main/java/org/apache/ignite/internal/sql/engine/exec/mapping/MappingServiceImpl.java:
##########
@@ -124,13 +121,14 @@ public CompletableFuture<Boolean> onPrimaryReplicaExpired(PrimaryReplicaEventPar
         return CompletableFutures.falseCompletedFuture();
     }
 
-    private CompletableFuture<List<MappedFragment>> map0(MultiStepPlan 
multiStepPlan, MappingParameters parameters) {
-        TopologySnapshot topology = topologyHolder.topology();
-        MappingContext context = new MappingContext(localNodeName, 
topology.nodes());
+    @Override
+    public CompletableFuture<List<MappedFragment>> map(MultiStepPlan 
multiStepPlan, MappingParameters parameters) {
+        RelOptCluster cluster = Commons.cluster();

Review Comment:
   done



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: notifications-unsubscr...@ignite.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org

Reply via email to