denis-chudov commented on code in PR #5092: URL: https://github.com/apache/ignite-3/pull/5092#discussion_r1960130828
########## modules/distribution-zones/src/main/java/org/apache/ignite/internal/distributionzones/DistributionZoneManager.java: ########## @@ -829,86 +548,61 @@ private static String entryKeyAsString(EntryEvent entry) { /** * Reaction on an update of logical topology. In this method {@link DistributionZoneManager#logicalTopology}, - * {@link DistributionZoneManager#nodesAttributes}, {@link ZoneState#topologyAugmentationMap} are updated. + * {@link DistributionZoneManager#nodesAttributes} are updated. * This fields are saved to Meta Storage, also timers are scheduled. * Note that all futures of Meta Storage updates that happen in this method are returned from this method. * * @param newLogicalTopology New logical topology. - * @param revision Revision of the logical topology update. - * @param catalogVersion Actual version of the Catalog. + * @param oldLogicalTopology Old logical topology. + * @param revision Revision of the event. + * @param timestamp Event timestamp. * @return Future reflecting the completion of the actions needed when logical topology was updated. */ - private CompletableFuture<Void> onLogicalTopologyUpdate(Set<NodeWithAttributes> newLogicalTopology, long revision, int catalogVersion) { - Set<NodeWithAttributes> currentLogicalTopology = logicalTopology(revision); - - Set<Node> removedNodes = - currentLogicalTopology.stream() - .filter(node -> !newLogicalTopology.contains(node)) - .map(NodeWithAttributes::node) - .collect(toSet()); - - Set<Node> addedNodes = - newLogicalTopology.stream() - .filter(node -> !currentLogicalTopology.contains(node)) - .map(NodeWithAttributes::node) - .collect(toSet()); - - Set<Integer> zoneIds = new HashSet<>(); + private CompletableFuture<Void> onLogicalTopologyUpdate( + Set<NodeWithAttributes> newLogicalTopology, + Set<NodeWithAttributes> oldLogicalTopology, + long revision, + HybridTimestamp timestamp + ) { + logicalTopologyByRevision.put(revision, newLogicalTopology); List<CompletableFuture<Void>> futures = new ArrayList<>(); - logicalTopologyByRevision.put(revision, newLogicalTopology); + int catalogVersion = catalogManager.activeCatalogVersion(timestamp.longValue()); for (CatalogZoneDescriptor zone : catalogManager.catalog(catalogVersion).zones()) { - int zoneId = zone.id(); - - updateLocalTopologyAugmentationMap(addedNodes, removedNodes, revision, zoneId); - - futures.add(scheduleTimers(zone, !addedNodes.isEmpty(), !removedNodes.isEmpty(), revision)); + CompletableFuture<Void> f = dataNodesManager.onTopologyChange( + zone, + revision, + timestamp, + newLogicalTopology, + oldLogicalTopology + ); - zoneIds.add(zone.id()); + futures.add(f); } newLogicalTopology.forEach(n -> nodesAttributes.put(n.nodeId(), n)); - futures.add(saveRecoverableStateToMetastorage(zoneIds, revision, newLogicalTopology)); + futures.add(saveRecoverableStateToMetastorage(revision, newLogicalTopology)); return allOf(futures.toArray(CompletableFuture[]::new)); } - /** - * Update local topology augmentation map with newly added and removed nodes. - * - * @param addedNodes Nodes that was added to a topology and should be added to zones data nodes. - * @param removedNodes Nodes that was removed from a topology and should be removed from zones data nodes. - * @param revision Revision of the event that triggered this method. - * @param zoneId Zone's id. - */ - private void updateLocalTopologyAugmentationMap(Set<Node> addedNodes, Set<Node> removedNodes, long revision, int zoneId) { - if (!addedNodes.isEmpty()) { - zonesState.get(zoneId).nodesToAddToDataNodes(addedNodes, revision); - } - - if (!removedNodes.isEmpty()) { - zonesState.get(zoneId).nodesToRemoveFromDataNodes(removedNodes, revision); - } - } - /** * Saves recoverable state of the Distribution Zone Manager to Meta Storage atomically in one batch. * After restart it could be used to restore these fields. * - * @param zoneIds Set of zone id's, whose states will be saved in the Meta Storage. * @param revision Revision of the event. * @param newLogicalTopology New logical topology. * @return Future representing pending completion of the operation. */ private CompletableFuture<Void> saveRecoverableStateToMetastorage( - Set<Integer> zoneIds, long revision, Set<NodeWithAttributes> newLogicalTopology ) { - Operation[] puts = new Operation[3 + zoneIds.size()]; + // TODO + Operation[] puts = new Operation[3]; Review Comment: yes, this would be better -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: notifications-unsubscr...@ignite.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org