alievmirza commented on code in PR #4675:
URL: https://github.com/apache/ignite-3/pull/4675#discussion_r1834632439


##########
modules/distribution-zones/src/main/java/org/apache/ignite/internal/distributionzones/DistributionZoneManager.java:
##########
@@ -375,6 +379,43 @@ private CompletableFuture<Void> 
onUpdateScaleUpBusy(AlterZoneEventParameters par
         return nullCompletedFuture();
     }
 
+    private CompletableFuture<Void> onUpdatePartitionDistributionResetBusy(int 
partitionDistributionReset, long causalityToken) {
+        // It is safe to zoneState.entrySet in term of ConcurrentModification 
and etc. because meta storage notifications are one-threaded
+        // and this map will be initialized on a manager start or with catalog 
notification or with distribution configuration changes.
+        for (Map.Entry<Integer, ZoneState> zoneStateEntry : 
zonesState.entrySet()) {
+            int zoneId = zoneStateEntry.getKey();
+
+            if (partitionDistributionReset == IMMEDIATE_TIMER_VALUE) {
+                // TODO: IGNITE-23599 Implement valid behaviour here.
+                return nullCompletedFuture();
+            }
+
+            ZoneState zoneState = zoneStateEntry.getValue();
+
+            if (partitionDistributionReset != INFINITE_TIMER_VALUE) {
+                Optional<Long> highestRevision = 
zoneState.highestRevision(true);
+
+                assert highestRevision.isEmpty() || causalityToken >= 
highestRevision.get() : IgniteStringFormatter.format(
+                        "Expected causalityToken that is greater or equal to 
already seen meta storage events: highestRevision={}, "
+                                + "causalityToken={}",
+                        highestRevision.orElse(null), causalityToken
+                );
+
+

Review Comment:
   redundant line



##########
modules/distribution-zones/src/test/java/org/apache/ignite/internal/distributionzones/configuration/DistributionZonesHighAvailabilityConfigurationTest.java:
##########
@@ -50,22 +56,44 @@ void testValidSystemPropertiesOnStart(
                     + PARTITION_DISTRIBUTION_RESET_TIMEOUT + ".propertyValue = 
\"5\"}")
             SystemDistributedConfiguration systemConfig
     ) {
-        var config = new 
DistributionZonesHighAvailabilityConfiguration(systemConfig);
+        var config = new 
DistributionZonesHighAvailabilityConfiguration(systemConfig, noOpConsumer);
         config.startAndInit();
 
         assertEquals(5, config.partitionDistributionResetTimeout());
     }
 
     @Test
     void testValidSystemPropertiesOnChange(@InjectConfiguration 
SystemDistributedConfiguration systemConfig) {
-        var config = new 
DistributionZonesHighAvailabilityConfiguration(systemConfig);
+        var config = new 
DistributionZonesHighAvailabilityConfiguration(systemConfig, noOpConsumer);
         config.startAndInit();
 
         changeSystemConfig(systemConfig, "10");
 
         assertEquals(10, config.partitionDistributionResetTimeout());
     }
 
+    @Test
+    void testUpdateConfigListener(@InjectConfiguration 
SystemDistributedConfiguration systemConfig) throws InterruptedException {
+        AtomicReference<Integer> partitionDistributionResetTimeoutValue = new 
AtomicReference<>();
+        AtomicReference<Long> revisionValue = new AtomicReference<>();
+
+        var config = new DistributionZonesHighAvailabilityConfiguration(
+                systemConfig,
+                (partitionDistributionResetTimeout, revision) -> {
+                    
partitionDistributionResetTimeoutValue.set(partitionDistributionResetTimeout);
+                    revisionValue.set(revision);
+                }
+        );
+        config.startAndInit();
+
+        changeSystemConfig(systemConfig, "10");
+
+        assertTrue(waitForCondition(() ->
+                partitionDistributionResetTimeoutValue.get() != null
+                        && partitionDistributionResetTimeoutValue.get() == 10, 
1_000));

Review Comment:
   I would expect `partitionDistributionResetTimeoutValue.get() != 10` before 
`changeSystemConfig`



##########
modules/configuration-system/src/main/java/org/apache/ignite/internal/configuration/validation/NonNegativeIntegerNumberSystemPropertyValueValidator.java:
##########
@@ -26,7 +26,8 @@
 import org.apache.ignite.internal.configuration.SystemPropertyView;
 
 /** Validator for system property values that are expected to be non-negative 
{@code long} number. */

Review Comment:
   not long, but int



##########
modules/distribution-zones/src/main/java/org/apache/ignite/internal/distributionzones/configuration/DistributionZonesHighAvailabilityConfiguration.java:
##########
@@ -61,32 +68,33 @@ public void start() {
     void startAndInit() {
         start();
 
-        updateSystemProperties(systemDistributedConfig.value());
+        updateSystemProperties(systemDistributedConfig.value(), 1);
     }
 
     /** Returns partition group reset timeout after a partition group majority 
loss. */

Review Comment:
   The javadoc sounds a bit odd to me. 



##########
modules/distribution-zones/src/main/java/org/apache/ignite/internal/distributionzones/DistributionZoneManager.java:
##########
@@ -1174,12 +1235,18 @@ public static class ZoneState {
         /** Schedule task for a scale down process. */
         private ScheduledFuture<?> scaleDownTask;
 
+        /** Schedule task for a partition distribution reset process. */
+        private ScheduledFuture<?> partitionDistributionResetTask;

Review Comment:
   don't we need to stop this in 
`org.apache.ignite.internal.distributionzones.DistributionZoneManager.ZoneState#stopTimers`?



##########
modules/distribution-zones/src/test/java/org/apache/ignite/internal/distributionzones/configuration/DistributionZonesHighAvailabilityConfigurationTest.java:
##########
@@ -50,22 +56,44 @@ void testValidSystemPropertiesOnStart(
                     + PARTITION_DISTRIBUTION_RESET_TIMEOUT + ".propertyValue = 
\"5\"}")
             SystemDistributedConfiguration systemConfig
     ) {
-        var config = new 
DistributionZonesHighAvailabilityConfiguration(systemConfig);
+        var config = new 
DistributionZonesHighAvailabilityConfiguration(systemConfig, noOpConsumer);
         config.startAndInit();
 
         assertEquals(5, config.partitionDistributionResetTimeout());
     }
 
     @Test
     void testValidSystemPropertiesOnChange(@InjectConfiguration 
SystemDistributedConfiguration systemConfig) {
-        var config = new 
DistributionZonesHighAvailabilityConfiguration(systemConfig);
+        var config = new 
DistributionZonesHighAvailabilityConfiguration(systemConfig, noOpConsumer);
         config.startAndInit();
 
         changeSystemConfig(systemConfig, "10");
 
         assertEquals(10, config.partitionDistributionResetTimeout());
     }
 
+    @Test
+    void testUpdateConfigListener(@InjectConfiguration 
SystemDistributedConfiguration systemConfig) throws InterruptedException {
+        AtomicReference<Integer> partitionDistributionResetTimeoutValue = new 
AtomicReference<>();
+        AtomicReference<Long> revisionValue = new AtomicReference<>();
+
+        var config = new DistributionZonesHighAvailabilityConfiguration(
+                systemConfig,
+                (partitionDistributionResetTimeout, revision) -> {
+                    
partitionDistributionResetTimeoutValue.set(partitionDistributionResetTimeout);
+                    revisionValue.set(revision);
+                }
+        );
+        config.startAndInit();
+
+        changeSystemConfig(systemConfig, "10");
+
+        assertTrue(waitForCondition(() ->
+                partitionDistributionResetTimeoutValue.get() != null
+                        && partitionDistributionResetTimeoutValue.get() == 10, 
1_000));
+        assertEquals(1, revisionValue.get());
+    }
+
     private static void changeSystemConfig(
             SystemDistributedConfiguration systemConfig,
             String partitionDistributionResetScaleDown

Review Comment:
   partitionDistributionResetScaleDown? 



##########
modules/distribution-zones/src/main/java/org/apache/ignite/internal/distributionzones/DistributionZoneManager.java:
##########
@@ -1255,6 +1322,22 @@ public synchronized void rescheduleScaleDown(long delay, 
Runnable runnable, int
             scaleDownTaskDelay = delay;
         }
 
+        /**
+         * Reschedules existing partition distribution reset task, if it is 
not started yet and the delay of this task is not immediate,
+         * or schedules new one, if the current task cannot be canceled.
+         *
+         * @param delay Delay to start runnable in seconds.
+         * @param runnable Custom logic to run.
+         * @param zoneId Unique id of a zone to determine the executor of the 
task.
+         */
+        public synchronized void reschedulePartitionDistributionReset(long 
delay, Runnable runnable, int zoneId) {

Review Comment:
   I expect to see similar tests for this method as we have in 
`org.apache.ignite.internal.distributionzones.DistributionZonesSchedulersTest`



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: notifications-unsubscr...@ignite.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org

Reply via email to