zstan commented on code in PR #4629: URL: https://github.com/apache/ignite-3/pull/4629#discussion_r1828879749
########## modules/sql-engine/src/main/java/org/apache/ignite/internal/sql/engine/exec/mapping/MappingServiceImpl.java: ########## @@ -145,203 +143,212 @@ private CompletableFuture<List<MappedFragment>> map0(MultiStepPlan multiStepPlan } } - long topVer = topologyAware ? topology.version() : Long.MAX_VALUE; + long topVer = topologyAware ? logicalTopologyVerSupplier.get() : Long.MAX_VALUE; - return new MappingsCacheValue(topVer, tableIds, mapFragments(context, template, key.mapOnBackups)); + return new MappingsCacheValue(topVer, tableIds, mapFragments(template, mapOnBackups)); } - if (val.topVer < topology.version()) { - return new MappingsCacheValue(topology.version(), val.tableIds, mapFragments(context, template, key.mapOnBackups)); + long topologyVer = logicalTopologyVerSupplier.get(); + + if (val.topologyVersion < topologyVer) { + return new MappingsCacheValue(topologyVer, val.tableIds, mapFragments(template, mapOnBackups)); } return val; - } - ); + }); - return cacheValue.mappedFragments.thenApply(mappedFragments -> applyPartitionPruning(mappedFragments.fragments, parameters)); + return cacheValue.mappedFragments.thenApply(frags -> applyPartitionPruning(frags.fragments, parameters)); } - private CompletableFuture<MappedFragments> mapFragments( - MappingContext context, - FragmentsTemplate template, + CompletableFuture<DistributionHolder> composeDistributions( + List<IgniteSystemView> views, + Set<IgniteTable> tables, boolean mapOnBackups ) { - IdGenerator idGenerator = new IdGenerator(template.nextId); - List<Fragment> fragments = new ArrayList<>(template.fragments); - HybridTimestamp mappingTime = clock.now(); - - List<CompletableFuture<IntObjectPair<ExecutionTarget>>> targets = - fragments.stream().flatMap(fragment -> Stream.concat( - fragment.tables().values().stream() - .map(table -> targetProvider.forTable(mappingTime, context.targetFactory(), table, mapOnBackups) - .thenApply(target -> IntObjectPair.of(table.id(), target)) - ), - fragment.systemViews().stream() - .map(view -> targetProvider.forSystemView(context.targetFactory(), view) - .thenApply(target -> IntObjectPair.of(view.id(), target)) - ) - )) - .collect(Collectors.toList()); + if (tables.isEmpty() && views.isEmpty()) { + DistributionHolder holder = new DistributionHolder(List.of(localNodeName), Map.of(), Map.of()); - return allOf(targets.toArray(new CompletableFuture[0])) - .thenApply(ignored -> { - Int2ObjectMap<ExecutionTarget> targetsById = new Int2ObjectOpenHashMap<>(); + return completedFuture(holder); + } else { + Map<IgniteTable, CompletableFuture<List<TokenizedAssignments>>> tablesAssignments = newHashMap(tables.size()); + Map<IgniteTable, List<TokenizedAssignments>> tablesAssignmentsResolved = newHashMap(tables.size()); - for (CompletableFuture<IntObjectPair<ExecutionTarget>> fut : targets) { - // this is a safe join, because we have waited for all futures to be complete - IntObjectPair<ExecutionTarget> pair = fut.join(); + for (IgniteTable tbl : tables) { + CompletableFuture<List<TokenizedAssignments>> assignments = distributionProvider + .forTable(clock.now(), tbl, mapOnBackups); - targetsById.put(pair.firstInt(), pair.second()); - } + tablesAssignments.put(tbl, assignments); + } - FragmentMapper mapper = new FragmentMapper(template.cluster.getMetadataQuery(), context, targetsById); + CompletableFuture<Void> all = CompletableFuture.allOf(tablesAssignments.values().toArray(new CompletableFuture[0])); - List<FragmentMapping> mappings = mapper.map(fragments, idGenerator); + CompletableFuture<Map<IgniteTable, List<TokenizedAssignments>>> fut = all.thenApply(v -> tablesAssignments.entrySet().stream() + .map(e -> Map.entry(e.getKey(), e.getValue().join())) + .collect(Collectors.toMap(Entry::getKey, Entry::getValue)) + ); - Long2ObjectMap<ColocationGroup> groupsBySourceId = new Long2ObjectOpenHashMap<>(); - Long2ObjectMap<List<String>> allSourcesByExchangeId = new Long2ObjectOpenHashMap<>(); + CompletableFuture<List<String>> participantNodes = fut.thenApply( + v -> v.values().stream().flatMap(List::stream).flatMap(i -> i.nodes().stream()).map(Assignment::consistentId) + .distinct() + .collect(Collectors.toList())); - for (FragmentMapping mapping : mappings) { - Fragment fragment = mapping.fragment(); + return participantNodes.thenApply(nodes -> { + nodes.add(localNodeName); - for (ColocationGroup group : mapping.groups()) { - for (long sourceId : group.sourceIds()) { - groupsBySourceId.put(sourceId, group); - } - } + // this is a safe join, because we have waited for all futures to be completed + tablesAssignments.forEach((k, v) -> tablesAssignmentsResolved.put(k, v.join())); - if (!fragment.rootFragment()) { - IgniteSender sender = (IgniteSender) fragment.root(); + Map<String, List<String>> nodesPerView = views.stream().distinct() + .collect(Collectors.toMap(IgniteDataSource::name, distributionProvider::forSystemView)); - List<String> nodeNames = mapping.groups().stream() - .flatMap(g -> g.nodeNames().stream()) - .distinct().collect(Collectors.toList()); + List<String> viewNodes = nodesPerView.values().stream().flatMap(List::stream).collect(Collectors.toList()); - allSourcesByExchangeId.put(sender.exchangeId(), nodeNames); - } - } + List<String> nodes0 = Stream.concat(viewNodes.stream(), nodes.stream()).distinct().collect(Collectors.toUnmodifiableList()); - List<MappedFragment> mappedFragmentsList = new ArrayList<>(mappings.size()); - Set<String> targetNodes = new HashSet<>(); - for (FragmentMapping mapping : mappings) { - Fragment fragment = mapping.fragment(); - - ColocationGroup targetGroup = null; - if (!fragment.rootFragment()) { - IgniteSender sender = (IgniteSender) fragment.root(); + return new DistributionHolder(nodes0, tablesAssignmentsResolved, nodesPerView); + }); + } + } - targetGroup = groupsBySourceId.get(sender.exchangeId()); - } + private CompletableFuture<MappedFragments> mapFragments( + FragmentsTemplate template, + boolean mapOnBackups + ) { + List<IgniteSystemView> views = template.fragments.stream().flatMap(fragment -> fragment.systemViews().stream()) Review Comment: done ########## modules/sql-engine/src/main/java/org/apache/ignite/internal/sql/engine/exec/mapping/MappingServiceImpl.java: ########## @@ -124,13 +121,14 @@ public CompletableFuture<Boolean> onPrimaryReplicaExpired(PrimaryReplicaEventPar return CompletableFutures.falseCompletedFuture(); } - private CompletableFuture<List<MappedFragment>> map0(MultiStepPlan multiStepPlan, MappingParameters parameters) { - TopologySnapshot topology = topologyHolder.topology(); - MappingContext context = new MappingContext(localNodeName, topology.nodes()); + @Override + public CompletableFuture<List<MappedFragment>> map(MultiStepPlan multiStepPlan, MappingParameters parameters) { + RelOptCluster cluster = Commons.cluster(); Review Comment: done -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: notifications-unsubscr...@ignite.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org