This is an automated email from the ASF dual-hosted git repository.
dlmarion pushed a commit to branch 2.1
in repository https://gitbox.apache.org/repos/asf/accumulo.git
The following commit(s) were added to refs/heads/2.1 by this push:
new 189e9c10c7 Added PropStore.invalidate for immediate consistency on
prop change (#5582)
189e9c10c7 is described below
commit 189e9c10c779c5930ae87ba757ffbdcb0e9964c8
Author: Dave Marion <[email protected]>
AuthorDate: Wed May 28 10:21:59 2025 -0400
Added PropStore.invalidate for immediate consistency on prop change (#5582)
ZooPropStore is eventually consistent when a property is changed
as it waits for the PropStoreWatcher to receive the event which
calls remove on the Caffeine cache. Subsequent calls to
ZooPropStore.get will rely on the ZooPropLoader to re-read
the information from ZooKeeper and populate the Caffeine cache.
ZooBasedConfiguration does not read directly from ZooPropStore
and instead reads a snapshot of the data using PropSnapshot.
When ZooBasedConfiguration.invalidateCache is called it just
marks the PropSnapshot as needing an update, which will re-read
from the underlying ZooPropStore. However, if the PropStoreWatcher
has not yet fired, then the information read will be unchanged,
because the ZooPropStore cache has not yet been updated.
The new PropStore.invalidate method implementation in ZooPropStore
removes the node from the Caffeine cache, the same thing that the
PropStoreWatcher will do when it receives the event. When a property
is changed in the local process, then the cache entry will be
immediately invalidated and re-read. The downside to this change
is that the properties will be read twice: once after the local
invalidation and again when the PropStoreWatcher receives the event.
Closes #5541
---
.../accumulo/server/conf/store/PropStore.java | 7 ++
.../server/conf/store/impl/ZooPropStore.java | 5 ++
.../accumulo/server/conf/util/PropSnapshot.java | 1 +
.../server/conf/NamespaceConfigurationTest.java | 2 +
.../server/conf/SystemConfigurationTest.java | 2 +
.../server/conf/TableConfigurationTest.java | 14 ++++
.../server/conf/util/PropSnapshotTest.java | 6 ++
.../test/functional/AccumuloConfigurationIT.java | 87 ++++++++++++++++++++++
8 files changed, 124 insertions(+)
diff --git
a/server/base/src/main/java/org/apache/accumulo/server/conf/store/PropStore.java
b/server/base/src/main/java/org/apache/accumulo/server/conf/store/PropStore.java
index 3d586e4329..036c7a4e6a 100644
---
a/server/base/src/main/java/org/apache/accumulo/server/conf/store/PropStore.java
+++
b/server/base/src/main/java/org/apache/accumulo/server/conf/store/PropStore.java
@@ -129,4 +129,11 @@ public interface PropStore {
* @return true if the stored version matches the provided expected version.
*/
boolean validateDataVersion(PropStoreKey<?> storeKey, long expectedVersion);
+
+ /**
+ * Invalidate the properties associated with the provided key so that they are re-fetched.
+ *
+ * @param storeKey the prop cache key
+ */
+ void invalidate(PropStoreKey<?> storeKey);
}
diff --git
a/server/base/src/main/java/org/apache/accumulo/server/conf/store/impl/ZooPropStore.java
b/server/base/src/main/java/org/apache/accumulo/server/conf/store/impl/ZooPropStore.java
index d25cbd2bd4..6168e18d8c 100644
---
a/server/base/src/main/java/org/apache/accumulo/server/conf/store/impl/ZooPropStore.java
+++
b/server/base/src/main/java/org/apache/accumulo/server/conf/store/impl/ZooPropStore.java
@@ -450,4 +450,9 @@ public class ZooPropStore implements PropStore,
PropChangeListener {
return true;
}
+ @Override
+ public void invalidate(PropStoreKey<?> storeKey) {
+ cache.remove(storeKey); // drop the cached entry so a later get re-reads it; per the commit description the PropStoreWatcher does the same on a change event
+ }
+
}
diff --git
a/server/base/src/main/java/org/apache/accumulo/server/conf/util/PropSnapshot.java
b/server/base/src/main/java/org/apache/accumulo/server/conf/util/PropSnapshot.java
index 3f47cbbcbd..4f2f3fc32f 100644
---
a/server/base/src/main/java/org/apache/accumulo/server/conf/util/PropSnapshot.java
+++
b/server/base/src/main/java/org/apache/accumulo/server/conf/util/PropSnapshot.java
@@ -81,6 +81,7 @@ public class PropSnapshot implements PropChangeListener {
updateLock.lock();
try {
needsUpdate.set(true);
+ propStore.invalidate(propStoreKey);
} finally {
updateLock.unlock();
}
diff --git
a/server/base/src/test/java/org/apache/accumulo/server/conf/NamespaceConfigurationTest.java
b/server/base/src/test/java/org/apache/accumulo/server/conf/NamespaceConfigurationTest.java
index 7fce935b7e..08ebc9225a 100644
---
a/server/base/src/test/java/org/apache/accumulo/server/conf/NamespaceConfigurationTest.java
+++
b/server/base/src/test/java/org/apache/accumulo/server/conf/NamespaceConfigurationTest.java
@@ -81,6 +81,8 @@ public class NamespaceConfigurationTest {
Map.of(Property.INSTANCE_SECRET.getKey(), "sekrit"))).anyTimes();
propStore.registerAsListener(eq(nsPropStoreKey), anyObject());
expectLastCall().anyTimes();
+ propStore.invalidate(nsPropStoreKey);
+ expectLastCall().anyTimes();
replay(propStore, context);
diff --git
a/server/base/src/test/java/org/apache/accumulo/server/conf/SystemConfigurationTest.java
b/server/base/src/test/java/org/apache/accumulo/server/conf/SystemConfigurationTest.java
index 17a8a561fa..0b73b98563 100644
---
a/server/base/src/test/java/org/apache/accumulo/server/conf/SystemConfigurationTest.java
+++
b/server/base/src/test/java/org/apache/accumulo/server/conf/SystemConfigurationTest.java
@@ -117,6 +117,8 @@ public class SystemConfigurationTest {
Map.of(GC_PORT.getKey(), "3456", TSERV_SCAN_MAX_OPENFILES.getKey(),
"27",
TABLE_BLOOM_ENABLED.getKey(), "false", TABLE_BLOOM_SIZE.getKey(),
"2048"));
expect(propStore.get(eq(sysPropKey))).andReturn(sysUpdateProps).anyTimes();
+ propStore.invalidate(sysPropKey);
+ expectLastCall().atLeastOnce();
replay(propStore);
sysConfig.zkChangeEvent(sysPropKey);
diff --git
a/server/base/src/test/java/org/apache/accumulo/server/conf/TableConfigurationTest.java
b/server/base/src/test/java/org/apache/accumulo/server/conf/TableConfigurationTest.java
index 9c60673773..e01f2258d4 100644
---
a/server/base/src/test/java/org/apache/accumulo/server/conf/TableConfigurationTest.java
+++
b/server/base/src/test/java/org/apache/accumulo/server/conf/TableConfigurationTest.java
@@ -138,6 +138,8 @@ public class TableConfigurationTest {
expect(propStore.get(eq(propKey)))
.andReturn(new VersionedProperties(37, Instant.now(),
Map.of(p.getKey(), "sekrit")))
.anyTimes();
+ propStore.invalidate(propKey);
+ expectLastCall().atLeastOnce();
replay(propStore);
tableConfig.zkChangeEvent(propKey);
@@ -159,6 +161,8 @@ public class TableConfigurationTest {
.anyTimes();
expect(propStore.get(eq(TablePropKey.of(instanceId, TID))))
.andReturn(new VersionedProperties(Map.of())).anyTimes();
+ propStore.invalidate(NamespacePropKey.of(instanceId, NID));
+ expectLastCall().atLeastOnce();
replay(propStore);
nsConfig.zkChangeEvent(NamespacePropKey.of(instanceId, NID));
@@ -184,6 +188,10 @@ public class TableConfigurationTest {
expect(propStore.get(eq(TablePropKey.of(instanceId, TID))))
.andReturn(new VersionedProperties(4, Instant.now(), Map.of("foo",
"bar", "tick", "tock")))
.anyTimes();
+ propStore.invalidate(TablePropKey.of(instanceId, TID));
+ expectLastCall().atLeastOnce();
+ propStore.invalidate(NamespacePropKey.of(instanceId, NID));
+ expectLastCall().atLeastOnce();
replay(propStore);
@@ -221,6 +229,10 @@ public class TableConfigurationTest {
expect(propStore.get(eq(TablePropKey.of(instanceId, TID)))).andReturn(new
VersionedProperties(4,
Instant.now(), Map.of("filter", "not_returned_by_table", "foo", "bar",
"tick", "tock")))
.anyTimes();
+ propStore.invalidate(TablePropKey.of(instanceId, TID));
+ expectLastCall().atLeastOnce();
+ propStore.invalidate(NamespacePropKey.of(instanceId, NID));
+ expectLastCall().atLeastOnce();
replay(propStore);
@@ -257,6 +269,8 @@ public class TableConfigurationTest {
.once();
expect(propStore.get(eq(propKey)))
.andReturn(new VersionedProperties(39, Instant.now(),
Map.of(p.getKey(), "sekrit"))).once();
+ propStore.invalidate(propKey);
+ expectLastCall().atLeastOnce();
replay(propStore);
diff --git
a/server/base/src/test/java/org/apache/accumulo/server/conf/util/PropSnapshotTest.java
b/server/base/src/test/java/org/apache/accumulo/server/conf/util/PropSnapshotTest.java
index 155db98260..ae218626b0 100644
---
a/server/base/src/test/java/org/apache/accumulo/server/conf/util/PropSnapshotTest.java
+++
b/server/base/src/test/java/org/apache/accumulo/server/conf/util/PropSnapshotTest.java
@@ -70,6 +70,8 @@ class PropSnapshotTest {
// after update
expect(propStore.get(eq(SystemPropKey.of(instanceId))))
.andReturn(new VersionedProperties(124, Instant.now(), Map.of("k3",
"v3"))).once();
+ propStore.invalidate(SystemPropKey.of(instanceId));
+ expectLastCall().atLeastOnce();
replay(propStore);
PropSnapshot snapshot = PropSnapshot.create(SystemPropKey.of(instanceId),
propStore);
@@ -97,6 +99,8 @@ class PropSnapshotTest {
expect(propStore.get(eq(sysPropKey))).andReturn(
new VersionedProperties(100, Instant.now(),
Map.of(TABLE_BLOOM_ENABLED.getKey(), "false")))
.once();
+ propStore.invalidate(SystemPropKey.of(instanceId));
+ expectLastCall().atLeastOnce();
replay(propStore);
@@ -120,6 +124,8 @@ class PropSnapshotTest {
expect(propStore.get(eq(sysPropKey))).andThrow(new
IllegalStateException("Fake node delete"))
.once();
+ propStore.invalidate(sysPropKey);
+ expectLastCall().atLeastOnce();
replay(propStore);
PropSnapshot snapshot = PropSnapshot.create(sysPropKey, propStore);
diff --git
a/test/src/main/java/org/apache/accumulo/test/functional/AccumuloConfigurationIT.java
b/test/src/main/java/org/apache/accumulo/test/functional/AccumuloConfigurationIT.java
new file mode 100644
index 0000000000..4a2865f8dc
--- /dev/null
+++
b/test/src/main/java/org/apache/accumulo/test/functional/AccumuloConfigurationIT.java
@@ -0,0 +1,87 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * https://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.accumulo.test.functional;
+
+import static org.junit.jupiter.api.Assertions.assertEquals;
+
+import java.util.concurrent.TimeUnit;
+
+import org.apache.accumulo.core.client.Accumulo;
+import org.apache.accumulo.core.client.AccumuloClient;
+import org.apache.accumulo.core.util.Timer;
+import org.apache.accumulo.harness.MiniClusterConfigurationCallback;
+import org.apache.accumulo.harness.SharedMiniClusterBase;
+import org.apache.accumulo.miniclusterImpl.MiniAccumuloConfigImpl;
+import org.apache.accumulo.server.ServerContext;
+import org.apache.hadoop.conf.Configuration;
+import org.junit.jupiter.api.AfterAll;
+import org.junit.jupiter.api.BeforeAll;
+import org.junit.jupiter.api.Test;
+
+public class AccumuloConfigurationIT extends SharedMiniClusterBase {
+ // Integration test: a property set through the client API must be visible via ServerContext right after invalidateCache().
+ private static final String fakeProperty = "general.custom.fake.property";
+ // Seeds fakeProperty with "1" in the mini cluster configuration so the test starts from a known value.
+ private static class ConfigurationCallback implements
MiniClusterConfigurationCallback {
+
+ @Override
+ public void configureMiniCluster(MiniAccumuloConfigImpl cfg, Configuration
coreSite) {
+ cfg.setProperty(fakeProperty, "1");
+ }
+
+ }
+ // Starts the shared mini cluster once for the class, applying ConfigurationCallback.
+ @BeforeAll
+ public static void beforeTests() throws Exception {
+ SharedMiniClusterBase.startMiniClusterWithConfig(new
ConfigurationCallback());
+ }
+ // Stops the shared mini cluster after all tests have run.
+ @AfterAll
+ public static void afterTests() {
+ SharedMiniClusterBase.stopMiniCluster();
+ }
+ // Sets fakeProperty to "4", invalidates the cached configuration, then counts how many reads still return the old value.
+ @Test
+ public void testInvalidation() throws Exception {
+ // Capture the value before the change; despite its name, initialThreads holds the value of fakeProperty.
+ final ServerContext ctx = getCluster().getServerContext();
+ String initialThreads = ctx.getConfiguration().get(fakeProperty);
+ // The timer starts just before setProperty so the elapsed time printed below includes the property update itself.
+ Timer timer = null;
+ try (AccumuloClient c =
Accumulo.newClient().from(getClientProps()).build()) {
+ timer = Timer.startNew();
+ c.instanceOperations().setProperty(fakeProperty, "4");
+ }
+ // Expected to mark the snapshot as needing an update and invalidate the PropStore entry (see the PropSnapshot change in this commit).
+ ctx.getConfiguration().invalidateCache();
+ // Poll every 25 ms while the old value is still returned. NOTE(review): the loop has no upper bound of its own; presumably a harness timeout applies - confirm.
+ int oldValueReturned = 0;
+ while (ctx.getConfiguration().get(fakeProperty).equals(initialThreads)) {
+ oldValueReturned++;
+ Thread.sleep(25);
+ }
+ System.out.println("Configuration returned old value " + oldValueReturned
+ " times and took "
+ + timer.elapsed(TimeUnit.MILLISECONDS) + "ms");
+ // The new value must be visible and no stale read may have occurred, i.e. the first read after invalidateCache() was already current.
+ assertEquals("4", ctx.getConfiguration().get(fakeProperty));
+ assertEquals(0, oldValueReturned);
+
+ }
+
+}