sanpwc commented on code in PR #4675:
URL: https://github.com/apache/ignite-3/pull/4675#discussion_r1836685487


##########
modules/distribution-zones/src/main/java/org/apache/ignite/internal/distributionzones/DistributionZoneManager.java:
##########
@@ -375,6 +379,42 @@ private CompletableFuture<Void> onUpdateScaleUpBusy(AlterZoneEventParameters par
         return nullCompletedFuture();
     }
 
+    private CompletableFuture<Void> onUpdatePartitionDistributionResetBusy(int partitionDistributionReset, long causalityToken) {
+        // It is safe to zoneState.entrySet in term of ConcurrentModification and etc. because meta storage notifications are one-threaded
+        // and this map will be initialized on a manager start or with catalog notification or with distribution configuration changes.
+        for (Map.Entry<Integer, ZoneState> zoneStateEntry : zonesState.entrySet()) {
+            int zoneId = zoneStateEntry.getKey();

Review Comment:
   Where do we check that the zone is in HA mode?
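   
   A rough sketch of the kind of guard I would expect inside the loop, assuming
   the zone's consistency mode can be looked up by zone id. The names
   `ConsistencyModeSketch`, `HaZoneFilterSketch` and `modeByZoneId` are made up
   for illustration and are not existing code:

```java
import java.util.ArrayList;
import java.util.List;
import java.util.Map;
import java.util.function.IntFunction;

/** Hypothetical stand-in for a zone's consistency mode. */
enum ConsistencyModeSketch {
    STRONG_CONSISTENCY,
    HIGH_AVAILABILITY
}

/** Illustration only: selects the zones whose reset timer should be (re)scheduled. */
class HaZoneFilterSketch {
    static List<Integer> zonesToReschedule(
            Map<Integer, ?> zonesState,
            IntFunction<ConsistencyModeSketch> modeByZoneId
    ) {
        List<Integer> result = new ArrayList<>();

        for (Integer zoneId : zonesState.keySet()) {
            // Skip zones that are not in HA mode: they must not get a reset timer.
            if (modeByZoneId.apply(zoneId) == ConsistencyModeSketch.HIGH_AVAILABILITY) {
                result.add(zoneId);
            }
        }

        return result;
    }
}
```

   Keeping the check in one small method would also make it easy to cover with
   a unit test.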



##########
modules/distribution-zones/src/main/java/org/apache/ignite/internal/distributionzones/DistributionZoneManager.java:
##########
@@ -1255,6 +1321,22 @@ public synchronized void rescheduleScaleDown(long delay, Runnable runnable, int
             scaleDownTaskDelay = delay;
         }
 
+        /**
+         * Reschedules existing partition distribution reset task, if it is not started yet and the delay of this task is not immediate,
+         * or schedules new one, if the current task cannot be canceled.
+         *
+         * @param delay Delay to start runnable in seconds.
+         * @param runnable Custom logic to run.
+         * @param zoneId Unique id of a zone to determine the executor of the task.
+         */
+        public synchronized void reschedulePartitionDistributionReset(long delay, Runnable runnable, int zoneId) {
+            stopPartitionDistributionReset();
+
+            partitionDistributionResetTask = executor.schedule(runnable, delay, SECONDS, zoneId);

Review Comment:
   It's worth adding a `Seconds` suffix to
   partitionDistributionResetTimeout/Delay wherever needed, e.g. in
   DistributionZonesHighAvailabilityConfiguration.



##########
modules/distribution-zones/src/main/java/org/apache/ignite/internal/distributionzones/configuration/DistributionZonesHighAvailabilityConfiguration.java:
##########
@@ -61,32 +68,33 @@ public void start() {
     void startAndInit() {
         start();
 
-        updateSystemProperties(systemDistributedConfig.value());
+        updateSystemProperties(systemDistributedConfig.value(), 1);
     }
 
     /** Returns partition group reset timeout after a partition group majority loss. */

Review Comment:
   I believe Mirza means that it reads as "returns ... after ... group
   majority loss". Are we going to return the value only after a majority loss
   has happened? ;)



##########
modules/distribution-zones/src/test/java/org/apache/ignite/internal/distributionzones/configuration/DistributionZonesHighAvailabilityConfigurationTest.java:
##########
@@ -50,28 +57,53 @@ void testValidSystemPropertiesOnStart(
                     + PARTITION_DISTRIBUTION_RESET_TIMEOUT + ".propertyValue = \"5\"}")
             SystemDistributedConfiguration systemConfig
     ) {
-        var config = new DistributionZonesHighAvailabilityConfiguration(systemConfig);
+        var config = new DistributionZonesHighAvailabilityConfiguration(systemConfig, noOpConsumer);
         config.startAndInit();
 
         assertEquals(5, config.partitionDistributionResetTimeout());
     }
 
     @Test
     void testValidSystemPropertiesOnChange(@InjectConfiguration SystemDistributedConfiguration systemConfig) {
-        var config = new DistributionZonesHighAvailabilityConfiguration(systemConfig);
+        var config = new DistributionZonesHighAvailabilityConfiguration(systemConfig, noOpConsumer);
         config.startAndInit();
 
         changeSystemConfig(systemConfig, "10");
 
         assertEquals(10, config.partitionDistributionResetTimeout());
     }
 
+    @Test
+    void testUpdateConfigListener(@InjectConfiguration SystemDistributedConfiguration systemConfig) throws InterruptedException {
+        AtomicReference<Integer> partitionDistributionResetTimeoutValue = new AtomicReference<>();
+        AtomicReference<Long> revisionValue = new AtomicReference<>();
+
+        var config = new DistributionZonesHighAvailabilityConfiguration(
+                systemConfig,
+                (partitionDistributionResetTimeout, revision) -> {
+                    partitionDistributionResetTimeoutValue.set(partitionDistributionResetTimeout);
+                    revisionValue.set(revision);
+                }
+        );
+        config.startAndInit();
+
+        assertNotEquals(10, partitionDistributionResetTimeoutValue.get());
+        assertNotEquals(1, revisionValue.get());
+
+        changeSystemConfig(systemConfig, "10");
+
+        assertTrue(waitForCondition(() ->
+                partitionDistributionResetTimeoutValue.get() != null
+                        && partitionDistributionResetTimeoutValue.get() == 10, 1_000));
+        assertEquals(1, revisionValue.get());
+    }
+
     private static void changeSystemConfig(
             SystemDistributedConfiguration systemConfig,
-            String partitionDistributionResetScaleDown
+            String partitionDistributionReset
     ) {
         CompletableFuture<Void> changeFuture = systemConfig.change(c0 -> c0.changeProperties()
-                .create(PARTITION_DISTRIBUTION_RESET_TIMEOUT, c1 -> c1.changePropertyValue(partitionDistributionResetScaleDown))
+                .create(PARTITION_DISTRIBUTION_RESET_TIMEOUT, c1 -> c1.changePropertyValue(partitionDistributionReset))
         );
 
         assertThat(changeFuture, willCompleteSuccessfully());

Review Comment:
   Where do you test that only zones in HA mode trigger reset timer
   scheduling?



##########
modules/distribution-zones/src/main/java/org/apache/ignite/internal/distributionzones/configuration/DistributionZonesHighAvailabilityConfiguration.java:
##########
@@ -35,22 +36,28 @@ public class DistributionZonesHighAvailabilityConfiguration {
     static final String PARTITION_DISTRIBUTION_RESET_TIMEOUT = "partitionDistributionResetTimeout";
 
     /** Default value for the {@link #PARTITION_DISTRIBUTION_RESET_TIMEOUT}. */
-    private static final long PARTITION_DISTRIBUTION_RESET_TIMEOUT_DEFAULT_VALUE = 0;
+    private static final int PARTITION_DISTRIBUTION_RESET_TIMEOUT_DEFAULT_VALUE = 0;
 
     private final SystemDistributedConfiguration systemDistributedConfig;
 
     /** Determines partition group reset timeout after a partition group majority loss. */
-    private volatile long partitionDistributionResetTimeout;
+    private volatile int partitionDistributionResetTimeout;
+
+    /** Listener, which receives (timeout, revision) on every configuration update. */
+    private final BiConsumer<Integer, Long> partitionDistributionResetListener;
 
     /** Constructor. */
-    public DistributionZonesHighAvailabilityConfiguration(SystemDistributedConfiguration systemDistributedConfig) {
+    public DistributionZonesHighAvailabilityConfiguration(
+            SystemDistributedConfiguration systemDistributedConfig,
+            BiConsumer<Integer, Long> partitionDistributionResetListener) {

Review Comment:
   I'd rather add the listener via a separate method instead of propagating it
   through the constructor + start(). Specifically, I mean adding
   `onPartitionDistributionResetTimeoutUpdate()`. In that case
   DistributionZonesHighAvailabilityConfiguration will have a clear contract:
   
   - partitionDistributionResetTimeout()
   - onPartitionDistributionResetTimeoutUpdate()
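   
   A minimal sketch of that contract, under the assumption that the listener
   keeps the (timeout, revision) shape used in this PR. `HaConfigurationSketch`
   and `applyUpdate` are hypothetical stand-ins for the real class and for the
   system-configuration callback:

```java
import java.util.List;
import java.util.concurrent.CopyOnWriteArrayList;
import java.util.function.BiConsumer;

/** Illustration only: not the actual DistributionZonesHighAvailabilityConfiguration. */
class HaConfigurationSketch {
    private volatile int partitionDistributionResetTimeout;

    private final List<BiConsumer<Integer, Long>> listeners = new CopyOnWriteArrayList<>();

    /** Returns the current partition distribution reset timeout. */
    int partitionDistributionResetTimeout() {
        return partitionDistributionResetTimeout;
    }

    /** Registers a listener that receives (timeout, revision) on every update. */
    void onPartitionDistributionResetTimeoutUpdate(BiConsumer<Integer, Long> listener) {
        listeners.add(listener);
    }

    /** Hypothetical hook that the system-configuration listener would call. */
    void applyUpdate(int newTimeout, long revision) {
        partitionDistributionResetTimeout = newTimeout;

        for (BiConsumer<Integer, Long> listener : listeners) {
            listener.accept(newTimeout, revision);
        }
    }
}
```

   With this shape the constructor takes only the system configuration, and
   DistributionZoneManager subscribes explicitly when it starts.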



##########
modules/distribution-zones/src/main/java/org/apache/ignite/internal/distributionzones/DistributionZoneManager.java:
##########
@@ -375,6 +379,42 @@ private CompletableFuture<Void> onUpdateScaleUpBusy(AlterZoneEventParameters par
         return nullCompletedFuture();
     }
 
+    private CompletableFuture<Void> onUpdatePartitionDistributionResetBusy(int partitionDistributionReset, long causalityToken) {

Review Comment:
   I do understand that you've just copy-pasted another listener (with some
   adjustments, of course); however, it's worth mentioning that the code looks
   a bit untidy:
   
   1. partitionDistributionReset -> partitionDistributionReset**TimeoutSeconds** or partitionDistributionReset**DelaySeconds**
   2. The code inside the for loop is almost the same as in
   onUpdateScaleUpBusy and onUpdateScaleDownBusy. Up to you whether to fix it
   here.
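   
   One possible way to remove the duplication from point 2, sketched under the
   assumption that the three listeners differ only in the per-zone action.
   `ZoneTimerLoopSketch` and `forEachZone` are hypothetical names, and the state
   type is left generic because the real loop body is not shown here:

```java
import java.util.Map;
import java.util.function.ObjIntConsumer;

/** Illustration only: a shared loop for the scale up, scale down and reset listeners. */
class ZoneTimerLoopSketch {
    /**
     * Applies the given action to every known zone.
     *
     * @param zonesState Zone states keyed by zone id.
     * @param action Per-zone logic; receives the zone state and the zone id.
     */
    static <S> void forEachZone(Map<Integer, S> zonesState, ObjIntConsumer<S> action) {
        for (Map.Entry<Integer, S> zoneStateEntry : zonesState.entrySet()) {
            action.accept(zoneStateEntry.getValue(), zoneStateEntry.getKey());
        }
    }
}
```

   Each listener would then pass only its own rescheduling call as the action.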



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
