keith-turner commented on code in PR #5708:
URL: https://github.com/apache/accumulo/pull/5708#discussion_r2183005955
##########
server/base/src/main/java/org/apache/accumulo/server/conf/SystemConfiguration.java:
##########
@@ -18,36 +18,144 @@
*/
package org.apache.accumulo.server.conf;
+import static
org.apache.accumulo.core.client.admin.servers.ServerId.Type.COMPACTOR;
+import static
org.apache.accumulo.core.client.admin.servers.ServerId.Type.GARBAGE_COLLECTOR;
+import static
org.apache.accumulo.core.client.admin.servers.ServerId.Type.MANAGER;
+import static
org.apache.accumulo.core.client.admin.servers.ServerId.Type.MONITOR;
+import static
org.apache.accumulo.core.client.admin.servers.ServerId.Type.SCAN_SERVER;
+import static
org.apache.accumulo.core.client.admin.servers.ServerId.Type.TABLET_SERVER;
+
+import java.util.HashSet;
+import java.util.List;
+import java.util.Map;
+import java.util.Optional;
+import java.util.Set;
+import java.util.concurrent.atomic.AtomicReference;
+
+import org.apache.accumulo.core.client.admin.servers.ServerId;
import org.apache.accumulo.core.conf.AccumuloConfiguration;
import org.apache.accumulo.core.conf.Property;
import org.apache.accumulo.server.ServerContext;
import org.apache.accumulo.server.conf.store.SystemPropKey;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
+import com.google.common.collect.MapDifference;
+import com.google.common.collect.Maps;
+
public class SystemConfiguration extends ZooBasedConfiguration {
private static final Logger log =
LoggerFactory.getLogger(SystemConfiguration.class);
+ private static class ChangedPropertyMonitor implements Runnable {
+
+ private static final Logger LOG =
LoggerFactory.getLogger(ChangedPropertyMonitor.class);
+ private static Map<ServerId.Type,List<Property>> SERVER_PROPERTY_PREFIXES
= Map.of(COMPACTOR,
+ List.of(Property.COMPACTOR_PREFIX, Property.RPC_PREFIX,
Property.INSTANCE_PREFIX),
+ GARBAGE_COLLECTOR,
+ List.of(Property.GC_PREFIX, Property.RPC_PREFIX,
Property.INSTANCE_PREFIX), MANAGER,
+ List.of(Property.MANAGER_PREFIX, Property.RPC_PREFIX,
Property.INSTANCE_PREFIX), MONITOR,
+ List.of(Property.MONITOR_PREFIX, Property.RPC_PREFIX,
Property.INSTANCE_PREFIX),
+ SCAN_SERVER, List.of(Property.SSERV_PREFIX, Property.RPC_PREFIX,
Property.INSTANCE_PREFIX),
+ TABLET_SERVER,
+ List.of(Property.TSERV_PREFIX, Property.RPC_PREFIX,
Property.INSTANCE_PREFIX));
+
+ private final AtomicReference<Set<String>> propsChanged = new
AtomicReference<>(Set.of());
+ private final List<Property> applicableProperties;
+ private volatile Map<String,String> currentProperties;
+ private volatile long currentVersion;
+
+ public ChangedPropertyMonitor(long initialVersion, Map<String,String>
initialProperties,
+ ServerId.Type serverType) {
+ this.applicableProperties = SERVER_PROPERTY_PREFIXES.get(serverType);
+ this.currentProperties = initialProperties;
+ this.currentVersion = initialVersion;
+ }
+
+ private void changedProperties(Set<String> props) {
+ final Set<String> changed = new HashSet<>();
+ props.forEach(p -> {
+ applicableProperties.forEach(ap -> {
+ if (p.startsWith(ap.getKey())) {
+ changed.add(p);
+ }
+ });
+ });
+ propsChanged.set(changed);
+ }
+
+ public void update(long version, Map<String,String> properties) {
Review Comment:
Multiple threads reading configuration could end up in this code at the same
time. There are multiple variables being read and written so it seems like this
method and other methods in the class should be synchronized to consistently
set all the variables.
```suggestion
public synchronized void update(long version, Map<String,String>
properties) {
```
##########
server/base/src/main/java/org/apache/accumulo/server/conf/SystemConfiguration.java:
##########
@@ -18,36 +18,144 @@
*/
package org.apache.accumulo.server.conf;
+import static
org.apache.accumulo.core.client.admin.servers.ServerId.Type.COMPACTOR;
+import static
org.apache.accumulo.core.client.admin.servers.ServerId.Type.GARBAGE_COLLECTOR;
+import static
org.apache.accumulo.core.client.admin.servers.ServerId.Type.MANAGER;
+import static
org.apache.accumulo.core.client.admin.servers.ServerId.Type.MONITOR;
+import static
org.apache.accumulo.core.client.admin.servers.ServerId.Type.SCAN_SERVER;
+import static
org.apache.accumulo.core.client.admin.servers.ServerId.Type.TABLET_SERVER;
+
+import java.util.HashSet;
+import java.util.List;
+import java.util.Map;
+import java.util.Optional;
+import java.util.Set;
+import java.util.concurrent.atomic.AtomicReference;
+
+import org.apache.accumulo.core.client.admin.servers.ServerId;
import org.apache.accumulo.core.conf.AccumuloConfiguration;
import org.apache.accumulo.core.conf.Property;
import org.apache.accumulo.server.ServerContext;
import org.apache.accumulo.server.conf.store.SystemPropKey;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
+import com.google.common.collect.MapDifference;
+import com.google.common.collect.Maps;
+
public class SystemConfiguration extends ZooBasedConfiguration {
private static final Logger log =
LoggerFactory.getLogger(SystemConfiguration.class);
+ private static class ChangedPropertyMonitor implements Runnable {
+
+ private static final Logger LOG =
LoggerFactory.getLogger(ChangedPropertyMonitor.class);
+ private static Map<ServerId.Type,List<Property>> SERVER_PROPERTY_PREFIXES
= Map.of(COMPACTOR,
+ List.of(Property.COMPACTOR_PREFIX, Property.RPC_PREFIX,
Property.INSTANCE_PREFIX),
+ GARBAGE_COLLECTOR,
+ List.of(Property.GC_PREFIX, Property.RPC_PREFIX,
Property.INSTANCE_PREFIX), MANAGER,
+ List.of(Property.MANAGER_PREFIX, Property.RPC_PREFIX,
Property.INSTANCE_PREFIX), MONITOR,
+ List.of(Property.MONITOR_PREFIX, Property.RPC_PREFIX,
Property.INSTANCE_PREFIX),
+ SCAN_SERVER, List.of(Property.SSERV_PREFIX, Property.RPC_PREFIX,
Property.INSTANCE_PREFIX),
+ TABLET_SERVER,
+ List.of(Property.TSERV_PREFIX, Property.RPC_PREFIX,
Property.INSTANCE_PREFIX));
+
+ private final AtomicReference<Set<String>> propsChanged = new
AtomicReference<>(Set.of());
+ private final List<Property> applicableProperties;
+ private volatile Map<String,String> currentProperties;
+ private volatile long currentVersion;
+
+ public ChangedPropertyMonitor(long initialVersion, Map<String,String>
initialProperties,
+ ServerId.Type serverType) {
+ this.applicableProperties = SERVER_PROPERTY_PREFIXES.get(serverType);
+ this.currentProperties = initialProperties;
+ this.currentVersion = initialVersion;
+ }
+
+ private void changedProperties(Set<String> props) {
+ final Set<String> changed = new HashSet<>();
+ props.forEach(p -> {
+ applicableProperties.forEach(ap -> {
+ if (p.startsWith(ap.getKey())) {
+ changed.add(p);
+ }
+ });
+ });
+ propsChanged.set(changed);
+ }
+
+ public void update(long version, Map<String,String> properties) {
+ if (currentVersion == version) {
+ return;
+ }
+ MapDifference<String,String> diff = Maps.difference(currentProperties,
properties);
+ currentProperties = properties;
+ currentVersion = version;
+ if (diff.areEqual()) {
+ return;
+ }
+ changedProperties(diff.entriesDiffering().keySet());
+ }
+
+ public void propChecked(Property p) {
+ final Set<String> changed = propsChanged.get();
+ if (changed.isEmpty()) {
+ return;
+ }
+ propsChanged.get().remove(p.getKey());
+ }
+
+ @Override
+ public void run() {
+ final Set<String> changed = propsChanged.get();
+ if (!changed.isEmpty()) {
+ LOG.warn("The following properties have changed, but have not yet been
read: {}", changed);
+ }
+ }
+
+ }
+
+ private final Map<String,String> initialProperties;
private final RuntimeFixedProperties runtimeFixedProps;
+ private final ChangedPropertyMonitor monitor;
public SystemConfiguration(ServerContext context, SystemPropKey propStoreKey,
AccumuloConfiguration parent) {
+ this(context, propStoreKey, parent, Optional.empty());
+ }
+
+ public SystemConfiguration(ServerContext context, SystemPropKey propStoreKey,
+ AccumuloConfiguration parent, Optional<ServerId.Type> serverType) {
super(log, context, propStoreKey, parent);
- runtimeFixedProps = new RuntimeFixedProperties(getSnapshot(),
context.getSiteConfiguration());
+ initialProperties = getSnapshot();
+ if (serverType.isPresent()) {
+ monitor =
+ new ChangedPropertyMonitor(getDataVersion(), initialProperties,
serverType.orElseThrow());
+ // start the monitor as a scheduled task at some interval.
Review Comment:
is this a todo?
##########
server/base/src/main/java/org/apache/accumulo/server/conf/SystemConfiguration.java:
##########
@@ -18,36 +18,144 @@
*/
package org.apache.accumulo.server.conf;
+import static
org.apache.accumulo.core.client.admin.servers.ServerId.Type.COMPACTOR;
+import static
org.apache.accumulo.core.client.admin.servers.ServerId.Type.GARBAGE_COLLECTOR;
+import static
org.apache.accumulo.core.client.admin.servers.ServerId.Type.MANAGER;
+import static
org.apache.accumulo.core.client.admin.servers.ServerId.Type.MONITOR;
+import static
org.apache.accumulo.core.client.admin.servers.ServerId.Type.SCAN_SERVER;
+import static
org.apache.accumulo.core.client.admin.servers.ServerId.Type.TABLET_SERVER;
+
+import java.util.HashSet;
+import java.util.List;
+import java.util.Map;
+import java.util.Optional;
+import java.util.Set;
+import java.util.concurrent.atomic.AtomicReference;
+
+import org.apache.accumulo.core.client.admin.servers.ServerId;
import org.apache.accumulo.core.conf.AccumuloConfiguration;
import org.apache.accumulo.core.conf.Property;
import org.apache.accumulo.server.ServerContext;
import org.apache.accumulo.server.conf.store.SystemPropKey;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
+import com.google.common.collect.MapDifference;
+import com.google.common.collect.Maps;
+
public class SystemConfiguration extends ZooBasedConfiguration {
private static final Logger log =
LoggerFactory.getLogger(SystemConfiguration.class);
+ private static class ChangedPropertyMonitor implements Runnable {
+
+ private static final Logger LOG =
LoggerFactory.getLogger(ChangedPropertyMonitor.class);
+ private static Map<ServerId.Type,List<Property>> SERVER_PROPERTY_PREFIXES
= Map.of(COMPACTOR,
+ List.of(Property.COMPACTOR_PREFIX, Property.RPC_PREFIX,
Property.INSTANCE_PREFIX),
+ GARBAGE_COLLECTOR,
+ List.of(Property.GC_PREFIX, Property.RPC_PREFIX,
Property.INSTANCE_PREFIX), MANAGER,
+ List.of(Property.MANAGER_PREFIX, Property.RPC_PREFIX,
Property.INSTANCE_PREFIX), MONITOR,
+ List.of(Property.MONITOR_PREFIX, Property.RPC_PREFIX,
Property.INSTANCE_PREFIX),
+ SCAN_SERVER, List.of(Property.SSERV_PREFIX, Property.RPC_PREFIX,
Property.INSTANCE_PREFIX),
+ TABLET_SERVER,
+ List.of(Property.TSERV_PREFIX, Property.RPC_PREFIX,
Property.INSTANCE_PREFIX));
+
+ private final AtomicReference<Set<String>> propsChanged = new
AtomicReference<>(Set.of());
+ private final List<Property> applicableProperties;
+ private volatile Map<String,String> currentProperties;
+ private volatile long currentVersion;
+
+ public ChangedPropertyMonitor(long initialVersion, Map<String,String>
initialProperties,
+ ServerId.Type serverType) {
+ this.applicableProperties = SERVER_PROPERTY_PREFIXES.get(serverType);
+ this.currentProperties = initialProperties;
+ this.currentVersion = initialVersion;
+ }
+
+ private void changedProperties(Set<String> props) {
+ final Set<String> changed = new HashSet<>();
+ props.forEach(p -> {
+ applicableProperties.forEach(ap -> {
+ if (p.startsWith(ap.getKey())) {
+ changed.add(p);
+ }
+ });
+ });
+ propsChanged.set(changed);
+ }
+
+ public void update(long version, Map<String,String> properties) {
+ if (currentVersion == version) {
+ return;
+ }
+ MapDifference<String,String> diff = Maps.difference(currentProperties,
properties);
+ currentProperties = properties;
+ currentVersion = version;
+ if (diff.areEqual()) {
+ return;
+ }
+ changedProperties(diff.entriesDiffering().keySet());
+ }
+
+ public void propChecked(Property p) {
+ final Set<String> changed = propsChanged.get();
+ if (changed.isEmpty()) {
+ return;
+ }
+ propsChanged.get().remove(p.getKey());
+ }
+
+ @Override
+ public void run() {
Review Comment:
Not seeing the code that starts a thread or timer task to call this.
##########
server/base/src/main/java/org/apache/accumulo/server/conf/SystemConfiguration.java:
##########
@@ -18,36 +18,144 @@
*/
package org.apache.accumulo.server.conf;
+import static
org.apache.accumulo.core.client.admin.servers.ServerId.Type.COMPACTOR;
+import static
org.apache.accumulo.core.client.admin.servers.ServerId.Type.GARBAGE_COLLECTOR;
+import static
org.apache.accumulo.core.client.admin.servers.ServerId.Type.MANAGER;
+import static
org.apache.accumulo.core.client.admin.servers.ServerId.Type.MONITOR;
+import static
org.apache.accumulo.core.client.admin.servers.ServerId.Type.SCAN_SERVER;
+import static
org.apache.accumulo.core.client.admin.servers.ServerId.Type.TABLET_SERVER;
+
+import java.util.HashSet;
+import java.util.List;
+import java.util.Map;
+import java.util.Optional;
+import java.util.Set;
+import java.util.concurrent.atomic.AtomicReference;
+
+import org.apache.accumulo.core.client.admin.servers.ServerId;
import org.apache.accumulo.core.conf.AccumuloConfiguration;
import org.apache.accumulo.core.conf.Property;
import org.apache.accumulo.server.ServerContext;
import org.apache.accumulo.server.conf.store.SystemPropKey;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
+import com.google.common.collect.MapDifference;
+import com.google.common.collect.Maps;
+
public class SystemConfiguration extends ZooBasedConfiguration {
private static final Logger log =
LoggerFactory.getLogger(SystemConfiguration.class);
+ private static class ChangedPropertyMonitor implements Runnable {
+
+ private static final Logger LOG =
LoggerFactory.getLogger(ChangedPropertyMonitor.class);
+ private static Map<ServerId.Type,List<Property>> SERVER_PROPERTY_PREFIXES
= Map.of(COMPACTOR,
+ List.of(Property.COMPACTOR_PREFIX, Property.RPC_PREFIX,
Property.INSTANCE_PREFIX),
+ GARBAGE_COLLECTOR,
+ List.of(Property.GC_PREFIX, Property.RPC_PREFIX,
Property.INSTANCE_PREFIX), MANAGER,
+ List.of(Property.MANAGER_PREFIX, Property.RPC_PREFIX,
Property.INSTANCE_PREFIX), MONITOR,
+ List.of(Property.MONITOR_PREFIX, Property.RPC_PREFIX,
Property.INSTANCE_PREFIX),
+ SCAN_SERVER, List.of(Property.SSERV_PREFIX, Property.RPC_PREFIX,
Property.INSTANCE_PREFIX),
+ TABLET_SERVER,
+ List.of(Property.TSERV_PREFIX, Property.RPC_PREFIX,
Property.INSTANCE_PREFIX));
+
+ private final AtomicReference<Set<String>> propsChanged = new
AtomicReference<>(Set.of());
+ private final List<Property> applicableProperties;
+ private volatile Map<String,String> currentProperties;
+ private volatile long currentVersion;
+
+ public ChangedPropertyMonitor(long initialVersion, Map<String,String>
initialProperties,
+ ServerId.Type serverType) {
+ this.applicableProperties = SERVER_PROPERTY_PREFIXES.get(serverType);
+ this.currentProperties = initialProperties;
+ this.currentVersion = initialVersion;
+ }
+
+ private void changedProperties(Set<String> props) {
+ final Set<String> changed = new HashSet<>();
+ props.forEach(p -> {
+ applicableProperties.forEach(ap -> {
+ if (p.startsWith(ap.getKey())) {
+ changed.add(p);
+ }
+ });
+ });
+ propsChanged.set(changed);
+ }
+
+ public void update(long version, Map<String,String> properties) {
+ if (currentVersion == version) {
+ return;
+ }
+ MapDifference<String,String> diff = Maps.difference(currentProperties,
properties);
+ currentProperties = properties;
+ currentVersion = version;
+ if (diff.areEqual()) {
+ return;
+ }
+ changedProperties(diff.entriesDiffering().keySet());
Review Comment:
javadoc for entriesDiffering is :
```
Returns an unmodifiable map describing keys that appear in both maps, but
with different values
```
So maybe if there is a new prop that was not present in the old map it would
not be seen. Should this be something like the following?
```java
Sets.union(diff.entriesDiffering().keySet(),
diff.entriesOnlyOnRight().keySet())
```
--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
To unsubscribe, e-mail: [email protected]
For queries about this service, please contact Infrastructure at:
[email protected]