This is an automated email from the ASF dual-hosted git repository. frankgh pushed a commit to branch trunk in repository https://gitbox.apache.org/repos/asf/cassandra-sidecar.git
The following commit(s) were added to refs/heads/trunk by this push: new 259cffe2 CASSSIDECAR-232: Enhance the Cluster Lease Claim task feature (#211) 259cffe2 is described below commit 259cffe2885f7def2d1ef20a298fa7ec898eee71 Author: Francisco Guerrero <fran...@apache.org> AuthorDate: Wed Mar 19 20:59:06 2025 -0700 CASSSIDECAR-232: Enhance the Cluster Lease Claim task feature (#211) Patch by Francisco Guerrero; reviewed by Yifan Cai for CASSSIDECAR-232 --- CHANGES.txt | 1 + conf/sidecar.yaml | 27 ++- examples/conf/sidecar-ccm.yaml | 93 +++++++- ...on.java => ClusterLeaseClaimConfiguration.java} | 8 +- .../sidecar/config/CoordinationConfiguration.java | 2 +- .../sidecar/config/RandomInitialDelayDelta.java | 49 ++++ .../yaml/ClusterLeaseClaimConfigurationImpl.java | 141 ++++++++++++ .../config/yaml/CoordinationConfigurationImpl.java | 13 +- .../config/yaml/PeriodicTaskConfigurationImpl.java | 101 ++++++++- .../yaml/SchemaKeyspaceConfigurationImpl.java | 2 +- ...ctTokenZeroOfKeyspaceElectorateMembership.java} | 110 ++------- .../coordination/ClusterLeaseClaimTask.java | 25 ++- ...catedKeyspaceTokenZeroElectorateMembership.java | 104 +-------- ...decarInternalTokenZeroElectorateMembership.java | 46 ++++ .../sidecar/db/SidecarLeaseDatabaseAccessor.java | 14 +- .../sidecar/modules/CoordinationModule.java | 3 +- .../modules/ElectorateMembershipFactory.java | 62 ++++++ ...kenZeroElectorateMembershipIntegrationTest.java | 46 ++-- .../sidecar/testing/IntegrationTestModule.java | 11 +- .../sidecar/config/SidecarConfigurationTest.java | 57 ++++- .../coordination/ClusterLeaseClaimTaskTest.java | 10 +- .../cassandra/sidecar/db/SidecarSchemaTest.java | 4 +- .../modules/ElectorateMembershipFactoryTest.java | 93 ++++++++ .../resources/config/sidecar_coordination.yaml | 247 --------------------- .../config/sidecar_multiple_instances.yaml | 10 +- .../resources/config/sidecar_single_instance.yaml | 5 +- 26 files changed, 768 insertions(+), 516 deletions(-) diff --git a/CHANGES.txt 
b/CHANGES.txt index baef5ee7..b51f2024 100644 --- a/CHANGES.txt +++ b/CHANGES.txt @@ -1,5 +1,6 @@ 0.2.0 ----- + * Enhance the Cluster Lease Claim task feature (CASSSIDECAR-232) * Capture Metrics for Schema Reporting (CASSSIDECAR-216) * SidecarInstanceCodec is failing to find codec for type (CASSSIDECAR-229) * Retry Failed Schema Reports (CASSSIDECAR-217) diff --git a/conf/sidecar.yaml b/conf/sidecar.yaml index ada43657..246ed486 100644 --- a/conf/sidecar.yaml +++ b/conf/sidecar.yaml @@ -121,20 +121,42 @@ sidecar: replication_strategy: SimpleStrategy replication_factor: 1 # The TTL in seconds used to insert entries into the sidecar_lease schema - lease_schema_ttl: 2m + lease_schema_ttl: 5m coordination: # Captures configuration parameters for the task that performs the cluster lease claim process cluster_lease_claim: + # The name of the strategy used to determine the electorate membership (defaults to MostReplicatedKeyspaceTokenZeroElectorateMembership) + # Out of the box Sidecar provides the MostReplicatedKeyspaceTokenZeroElectorateMembership, and + # SidecarInternalTokenZeroElectorateMembership implementations. + # - MostReplicatedKeyspaceTokenZeroElectorateMembership the current Sidecar will be determined to be part + # of the electorate iff one of the Cassandra instances it + # manages owns token 0 for the user keyspace that has the + # highest replication factor. If multiple keyspaces have + # the highest replication factor, the keyspace to be used + # is decided by the keyspace with the name that sorts + # first in the lexicographic sort order. If no user + # keyspaces are created, the internal sidecar keyspace will + # be used. + # - SidecarInternalTokenZeroElectorateMembership the current Sidecar will be determined to be part of the + # electorate iff one of the Cassandra instances it manages + # owns token {@code 0} for the {@code sidecar_internal} + # keyspace. 
+ electorate_membership_strategy: MostReplicatedKeyspaceTokenZeroElectorateMembership # Whether the process is enabled enabled: true # The initial delay for the first execution of the cluster lease claim process task after being # scheduled or rescheduled. # The minimum value for the initial delay is 0ms. initial_delay: 1s + # A random delta value to add jitter to the initial delay for the first execution of the cluster + # lease claim process. The actual initial delay for the task will be a millisecond value of the + # initial_delay + RANDOM(initial_delay_random_delta) configuration. + # The minimum value for the initial delay random delta is 0ms, which in practice disables the jitter. + initial_delay_random_delta: 30s # How often the cluster lease claim process task will execute after the previous task has completed # the execution. # The minimum value allowed for the cluster lease claim process task implementation is 30 seconds - execute_interval: 1m + execute_interval: 100s vertx: filesystem_options: @@ -234,7 +256,6 @@ access_control: driver_parameters: contact_points: - "127.0.0.1:9042" - - "127.0.0.2:9042" username: cassandra password: cassandra ssl: diff --git a/examples/conf/sidecar-ccm.yaml b/examples/conf/sidecar-ccm.yaml index 7fb4f814..bbd41e12 100644 --- a/examples/conf/sidecar-ccm.yaml +++ b/examples/conf/sidecar-ccm.yaml @@ -213,26 +213,57 @@ sidecar: replication_strategy: SimpleStrategy replication_factor: 1 # The TTL in seconds used to insert entries into the sidecar_lease schema - lease_schema_ttl: 2m + lease_schema_ttl: 5m coordination: # Captures configuration parameters for the task that performs the cluster lease claim process cluster_lease_claim: + # The name of the strategy used to determine the electorate membership (defaults to MostReplicatedKeyspaceTokenZeroElectorateMembership) + # Out of the box Sidecar provides the MostReplicatedKeyspaceTokenZeroElectorateMembership, and + # SidecarInternalTokenZeroElectorateMembership 
implementations. + # - MostReplicatedKeyspaceTokenZeroElectorateMembership the current Sidecar will be determined to be part + # of the electorate iff one of the Cassandra instances it + # manages owns token 0 for the user keyspace that has the + # highest replication factor. If multiple keyspaces have + # the highest replication factor, the keyspace to be used + # is decided by the keyspace with the name that sorts + # first in the lexicographic sort order. If no user + # keyspaces are created, the internal sidecar keyspace will + # be used. + # - SidecarInternalTokenZeroElectorateMembership the current Sidecar will be determined to be part of the + # electorate iff one of the Cassandra instances it manages + # owns token {@code 0} for the {@code sidecar_internal} + # keyspace. + electorate_membership_strategy: MostReplicatedKeyspaceTokenZeroElectorateMembership # Whether the process is enabled enabled: true # The initial delay for the first execution of the cluster lease claim process task after being # scheduled or rescheduled. # The minimum value for the initial delay is 0ms. initial_delay: 1s + # A random delta value to add jitter to the initial delay for the first execution of the cluster + # lease claim process. The actual initial delay for the task will be a millisecond value of the + # initial_delay + RANDOM(initial_delay_random_delta) configuration. + # The minimum value for the initial delay random delta is 0ms, which in practice disables the jitter. + initial_delay_random_delta: 30s # How often the cluster lease claim process task will execute after the previous task has completed # the execution. 
# The minimum value allowed for the cluster lease claim process task implementation is 30 seconds - execute_interval: 1m + execute_interval: 100s vertx: filesystem_options: classpath_resolving_enabled: false file_caching_enabled: false +schema_reporting: # Schema Reporting configuration + enabled: false # Disabled by default + initial_delay: 6h # Maximum delay before the first schema report (actual delay is randomized) + execute_interval: 12h # Exact interval between two sequential schema reports + endpoint: http://localhost/schema # Endpoint address for schema reporting + method: PUT # HTTP verb to use for schema reporting + max_retries: 3 # Number of times a failing schema report is retried + retry_delay: 1m # Delay before a failing schema report is retried + # # Enable SSL configuration (Disabled by default) # @@ -257,8 +288,12 @@ vertx: access_control: # When enabled requests need to be authenticated and authorized before servicing. enabled: false - # Supports setting multiple authenticators, request is authenticated if it is authenticated by any of the - # configured authenticators + # Supports setting multiple authenticators, request is authenticated when the first authenticator allows the request + # to go through. + # Out of the box, Cassandra Sidecar provides following authenticator provider factories + # org.apache.cassandra.sidecar.acl.authentication.{MutualTlsAuthenticationHandlerFactory, JwtAuthenticationHandlerFactory}. + # - MutualTlsAuthenticationHandlerFactory allows authenticating based on user certificates + # - JwtAuthenticationHandlerFactory allows authenticating with user JWT tokens authenticators: - class_name: org.apache.cassandra.sidecar.acl.authentication.MutualTlsAuthenticationHandlerFactory parameters: @@ -277,6 +312,19 @@ access_control: # # other options are, io.vertx.ext.auth.mtls.impl.SpiffeIdentityExtractor. 
certificate_identity_extractor: org.apache.cassandra.sidecar.acl.authentication.CassandraIdentityExtractor + # JwtAuthenticationHandlerFactory adds support to authenticate users with their JWT tokens. It also includes + # support for OpenID discovery. + - class_name: org.apache.cassandra.sidecar.acl.authentication.JwtAuthenticationHandlerFactory + parameters: + # To selectively enable or disable JWT authentication + enabled: false + # Site for sidecar to dynamically retrieve the configuration information of an OpenID provider, without + # having to manually configure settings like issuer etc. + site: https://authorization.com + # Client Id is a unique identifier assigned by the OpenID provider. It is used to identify applications/users + # trying to connect. + client_id: recognized_client_id + config_discover_interval: 1h authorizer: # Authorization backend, implementing io.vertx.ext.auth.authorization.AuthorizationProvider; used to # provide permissions a user holds. @@ -321,6 +369,43 @@ healthcheck: initial_delay: 0ms execute_interval: 30s +# Sidecar Peer Health Monitor settings +# Enables a periodic task checking for the health of adjacent Sidecar peers in the token ring +sidecar_peer_health: + # Determines if the peer health monitor periodic task is enabled or not + enabled: false + # Time between peer health checks + execute_interval: 30s + # The amount of retries the client will attempt a request + max_retries: 5 + # The initial delay between the retries the client will attempt a request + retry_delay: 10s + +# Sidecar client settings used to interact with other sidecars +sidecar_client: + request_timeout: 30s # Time in which a request made by the sidecar client will time out + request_idle_timeout: 30s # How long the request can be idle + + # More advanced options to configure the sidecar client + + #connection_pool_max_size: 10 # Max size of the client connection pool + #connection_pool_clearing_period: 5s # Period of time for the connection pool to clear + 
#connection_pool_event_loop_size: 0 # Defines the size of the event loop pool, set to 0 to reuse current event-loop + #connection_pool_max_wait_queue_size: -1 # Connection pool max time to wait on the queue size, set to -1 for an unbounded queue + #max_retries: 5 # The amount of retries the client will attempt a request + #retry_delay: 500ms # The initial delay between the retries the client will attempt a request + #max_retry_delay: 10s # The max delay between the retries the client will attempt a request + #ssl: # if ssl is enabled, this is the ssl configuration used for the sidecar client + # enabled: false + # keystore: + # type: PKCS12 # Keystore type + # path: path/to/client/keystore.p12 # Path to the client keystore file + # password: password # Keystore password + # truststore: + # type: PKCS12 # Truststore type + # path: path/to/truststore.p12 # Path to the truststore file + # password: password # Truststore password + metrics: registry_name: cassandra_sidecar vertx: diff --git a/server/src/main/java/org/apache/cassandra/sidecar/config/CoordinationConfiguration.java b/server/src/main/java/org/apache/cassandra/sidecar/config/ClusterLeaseClaimConfiguration.java similarity index 74% copy from server/src/main/java/org/apache/cassandra/sidecar/config/CoordinationConfiguration.java copy to server/src/main/java/org/apache/cassandra/sidecar/config/ClusterLeaseClaimConfiguration.java index d6df84f7..cb8093f9 100644 --- a/server/src/main/java/org/apache/cassandra/sidecar/config/CoordinationConfiguration.java +++ b/server/src/main/java/org/apache/cassandra/sidecar/config/ClusterLeaseClaimConfiguration.java @@ -19,12 +19,12 @@ package org.apache.cassandra.sidecar.config; /** - * Configuration relevant to the coordination functionality of Sidecar + * Exposes configuration in Sidecar for the cluster lease claim task */ -public interface CoordinationConfiguration +public interface ClusterLeaseClaimConfiguration extends PeriodicTaskConfiguration, RandomInitialDelayDelta { 
/** - * @return configuration parameters for the cluster lease claim task + * @return the name of the strategy used to determine the electorate membership */ - PeriodicTaskConfiguration clusterLeaseClaimConfiguration(); + String electorateMembershipStrategy(); } diff --git a/server/src/main/java/org/apache/cassandra/sidecar/config/CoordinationConfiguration.java b/server/src/main/java/org/apache/cassandra/sidecar/config/CoordinationConfiguration.java index d6df84f7..08ea9da3 100644 --- a/server/src/main/java/org/apache/cassandra/sidecar/config/CoordinationConfiguration.java +++ b/server/src/main/java/org/apache/cassandra/sidecar/config/CoordinationConfiguration.java @@ -26,5 +26,5 @@ public interface CoordinationConfiguration /** * @return configuration parameters for the cluster lease claim task */ - PeriodicTaskConfiguration clusterLeaseClaimConfiguration(); + ClusterLeaseClaimConfiguration clusterLeaseClaimConfiguration(); } diff --git a/server/src/main/java/org/apache/cassandra/sidecar/config/RandomInitialDelayDelta.java b/server/src/main/java/org/apache/cassandra/sidecar/config/RandomInitialDelayDelta.java new file mode 100644 index 00000000..09e22a11 --- /dev/null +++ b/server/src/main/java/org/apache/cassandra/sidecar/config/RandomInitialDelayDelta.java @@ -0,0 +1,49 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. 
You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.cassandra.sidecar.config; + +import java.util.concurrent.ThreadLocalRandom; + +import org.apache.cassandra.sidecar.common.server.utils.MillisecondBoundConfiguration; + +/** + * A randomized delta value that adds jitter to the initial delay configuration of a + * {@link org.apache.cassandra.sidecar.tasks.PeriodicTask} + */ +public interface RandomInitialDelayDelta +{ + /** + * @return the configured value for the initial delay random delta + */ + MillisecondBoundConfiguration initialDelayRandomDelta(); + + /** + * Returns a random delta delay in milliseconds. Internally it uses the current thread's {@link ThreadLocalRandom} + * to calculate the next long value. + * + * @return a random delta delay in milliseconds + */ + default long randomDeltaDelayMillis() + { + long delta = initialDelayRandomDelta().toMillis(); + return delta > 0 + ? ThreadLocalRandom.current().nextLong(delta) + : 0; + } +} diff --git a/server/src/main/java/org/apache/cassandra/sidecar/config/yaml/ClusterLeaseClaimConfigurationImpl.java b/server/src/main/java/org/apache/cassandra/sidecar/config/yaml/ClusterLeaseClaimConfigurationImpl.java new file mode 100644 index 00000000..cb029bb8 --- /dev/null +++ b/server/src/main/java/org/apache/cassandra/sidecar/config/yaml/ClusterLeaseClaimConfigurationImpl.java @@ -0,0 +1,141 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. 
The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.cassandra.sidecar.config.yaml; + +import java.util.Objects; +import java.util.function.Consumer; + +import com.fasterxml.jackson.annotation.JsonCreator; +import com.fasterxml.jackson.annotation.JsonProperty; +import org.apache.cassandra.sidecar.common.DataObjectBuilder; +import org.apache.cassandra.sidecar.common.server.utils.MillisecondBoundConfiguration; +import org.apache.cassandra.sidecar.config.ClusterLeaseClaimConfiguration; + +/** + * Configuration for the {@link org.apache.cassandra.sidecar.coordination.ClusterLeaseClaimTask} + */ +public class ClusterLeaseClaimConfigurationImpl extends PeriodicTaskConfigurationImpl implements ClusterLeaseClaimConfiguration +{ + private static final String DEFAULT_ELECTORATE_MEMBERSHIP_STRATEGY = "MostReplicatedKeyspaceTokenZeroElectorateMembership"; + private static final MillisecondBoundConfiguration DEFAULT_INITIAL_DELAY_RANDOM_DELTA = MillisecondBoundConfiguration.parse("30s"); + public static final PeriodicTaskConfigurationImpl.Builder DEFAULT_PERIODIC_TASK_BUILDER = PeriodicTaskConfigurationImpl.Builder + .builder() + .enabled(true) + .initialDelay(MillisecondBoundConfiguration.parse("1s")) + .executeInterval(MillisecondBoundConfiguration.parse("100s")); + + @JsonProperty("electorate_membership_strategy") + private final String electorateMembershipStrategy; + + @JsonProperty("initial_delay_random_delta") + private 
final MillisecondBoundConfiguration initialDelayRandomDelta; + + @JsonCreator + public ClusterLeaseClaimConfigurationImpl() + { + super(DEFAULT_PERIODIC_TASK_BUILDER); + this.electorateMembershipStrategy = DEFAULT_ELECTORATE_MEMBERSHIP_STRATEGY; + this.initialDelayRandomDelta = DEFAULT_INITIAL_DELAY_RANDOM_DELTA; + } + + private ClusterLeaseClaimConfigurationImpl(Builder builder) + { + super(builder.periodicTaskBuilder); + electorateMembershipStrategy = Objects.requireNonNull(builder.electorateMembershipStrategy, "electorateMembershipStrategy is required"); + initialDelayRandomDelta = builder.initialDelayRandomDelta; + } + + @Override + @JsonProperty("electorate_membership_strategy") + public String electorateMembershipStrategy() + { + return electorateMembershipStrategy; + } + + @Override + @JsonProperty("initial_delay_random_delta") + public MillisecondBoundConfiguration initialDelayRandomDelta() + { + return initialDelayRandomDelta; + } + + public static Builder builder() + { + return new Builder(); + } + + /** + * {@code ClusterLeaseClaimConfigurationImpl} builder static inner class. + */ + public static final class Builder implements DataObjectBuilder<Builder, ClusterLeaseClaimConfigurationImpl> + { + private String electorateMembershipStrategy = DEFAULT_ELECTORATE_MEMBERSHIP_STRATEGY; + private MillisecondBoundConfiguration initialDelayRandomDelta = DEFAULT_INITIAL_DELAY_RANDOM_DELTA; + private final PeriodicTaskConfigurationImpl.Builder periodicTaskBuilder = DEFAULT_PERIODIC_TASK_BUILDER; + + private Builder() + { + } + + @Override + public Builder self() + { + return this; + } + + /** + * Sets the {@code electorateMembershipStrategy} and returns a reference to this Builder enabling method chaining. 
+ * + * @param electorateMembershipStrategy the {@code electorateMembershipStrategy} to set + * @return a reference to this Builder + */ + public Builder electorateMembershipStrategy(String electorateMembershipStrategy) + { + return update(b -> b.electorateMembershipStrategy = electorateMembershipStrategy); + } + + /** + * Sets the {@code initialDelayRandomDelta} and returns a reference to this Builder enabling method chaining. + * + * @param initialDelayRandomDelta the {@code initialDelayRandomDelta} to set + * @return a reference to this Builder + */ + public Builder initialDelayRandomDelta(MillisecondBoundConfiguration initialDelayRandomDelta) + { + return update(b -> b.initialDelayRandomDelta = initialDelayRandomDelta); + } + + public Builder overridePeriodicTaskConfiguration(Consumer<PeriodicTaskConfigurationImpl.Builder> overrides) + { + periodicTaskBuilder.update(overrides); + return self(); + } + + /** + * Returns a {@code ClusterLeaseClaimConfigurationImpl} built from the parameters previously set. 
+ * + * @return a {@code ClusterLeaseClaimConfigurationImpl} built with parameters of this {@code ClusterLeaseClaimConfigurationImpl.Builder} + */ + @Override + public ClusterLeaseClaimConfigurationImpl build() + { + return new ClusterLeaseClaimConfigurationImpl(this); + } + } +} diff --git a/server/src/main/java/org/apache/cassandra/sidecar/config/yaml/CoordinationConfigurationImpl.java b/server/src/main/java/org/apache/cassandra/sidecar/config/yaml/CoordinationConfigurationImpl.java index e0f6b7cb..c33231b2 100644 --- a/server/src/main/java/org/apache/cassandra/sidecar/config/yaml/CoordinationConfigurationImpl.java +++ b/server/src/main/java/org/apache/cassandra/sidecar/config/yaml/CoordinationConfigurationImpl.java @@ -19,9 +19,8 @@ package org.apache.cassandra.sidecar.config.yaml; import com.fasterxml.jackson.annotation.JsonProperty; -import org.apache.cassandra.sidecar.common.server.utils.MillisecondBoundConfiguration; +import org.apache.cassandra.sidecar.config.ClusterLeaseClaimConfiguration; import org.apache.cassandra.sidecar.config.CoordinationConfiguration; -import org.apache.cassandra.sidecar.config.PeriodicTaskConfiguration; /** * Configuration relevant to the coordination functionality of Sidecar @@ -29,16 +28,14 @@ import org.apache.cassandra.sidecar.config.PeriodicTaskConfiguration; public class CoordinationConfigurationImpl implements CoordinationConfiguration { @JsonProperty("cluster_lease_claim") - private final PeriodicTaskConfiguration clusterLeaseClaimConfiguration; + private final ClusterLeaseClaimConfiguration clusterLeaseClaimConfiguration; public CoordinationConfigurationImpl() { - this(new PeriodicTaskConfigurationImpl(true, - MillisecondBoundConfiguration.parse("1s"), - MillisecondBoundConfiguration.parse("1m"))); + this(ClusterLeaseClaimConfigurationImpl.builder().build()); } - public CoordinationConfigurationImpl(PeriodicTaskConfiguration clusterLeaseClaimConfiguration) + public 
CoordinationConfigurationImpl(ClusterLeaseClaimConfiguration clusterLeaseClaimConfiguration) { this.clusterLeaseClaimConfiguration = clusterLeaseClaimConfiguration; } @@ -48,7 +45,7 @@ public class CoordinationConfigurationImpl implements CoordinationConfiguration */ @Override @JsonProperty("cluster_lease_claim") - public PeriodicTaskConfiguration clusterLeaseClaimConfiguration() + public ClusterLeaseClaimConfiguration clusterLeaseClaimConfiguration() { return clusterLeaseClaimConfiguration; } diff --git a/server/src/main/java/org/apache/cassandra/sidecar/config/yaml/PeriodicTaskConfigurationImpl.java b/server/src/main/java/org/apache/cassandra/sidecar/config/yaml/PeriodicTaskConfigurationImpl.java index 78d115cb..6ee982c7 100644 --- a/server/src/main/java/org/apache/cassandra/sidecar/config/yaml/PeriodicTaskConfigurationImpl.java +++ b/server/src/main/java/org/apache/cassandra/sidecar/config/yaml/PeriodicTaskConfigurationImpl.java @@ -18,6 +18,7 @@ package org.apache.cassandra.sidecar.config.yaml; +import java.util.Objects; import java.util.concurrent.TimeUnit; import org.slf4j.Logger; @@ -25,13 +26,16 @@ import org.slf4j.LoggerFactory; import com.fasterxml.jackson.annotation.JsonAlias; import com.fasterxml.jackson.annotation.JsonProperty; +import org.apache.cassandra.sidecar.common.DataObjectBuilder; import org.apache.cassandra.sidecar.common.server.utils.MillisecondBoundConfiguration; import org.apache.cassandra.sidecar.config.PeriodicTaskConfiguration; +import org.apache.cassandra.sidecar.tasks.PeriodicTask; +import org.jetbrains.annotations.NotNull; import static org.apache.cassandra.sidecar.common.server.utils.MillisecondBoundConfiguration.ONE; /** - * Configuration for the {@link org.apache.cassandra.sidecar.tasks.PeriodicTask} + * Configuration for the {@link PeriodicTask} */ public class PeriodicTaskConfigurationImpl implements PeriodicTaskConfiguration { @@ -60,6 +64,13 @@ public class PeriodicTaskConfigurationImpl implements PeriodicTaskConfiguration 
this.executeInterval = executeInterval; } + protected PeriodicTaskConfigurationImpl(Builder builder) + { + enabled = builder.enabled; + setInitialDelay(builder.initialDelay); + setExecuteInterval(Objects.requireNonNull(builder.executeInterval, "executeInterval must be configured")); + } + /** * {@inheritDoc} */ @@ -75,6 +86,7 @@ public class PeriodicTaskConfigurationImpl implements PeriodicTaskConfiguration */ @Override @JsonProperty("initial_delay") + @NotNull public MillisecondBoundConfiguration initialDelay() { if (initialDelay == null) @@ -87,14 +99,17 @@ public class PeriodicTaskConfigurationImpl implements PeriodicTaskConfiguration @JsonProperty("initial_delay") public void setInitialDelay(MillisecondBoundConfiguration initialDelay) { - if (initialDelay.compareTo(MillisecondBoundConfiguration.ZERO) > 0) - { - this.initialDelay = initialDelay; - } - else + if (initialDelay != null) { - LOGGER.warn("Invalid initialDelay configuration {}, the minimum value is 0", initialDelay); - this.initialDelay = MillisecondBoundConfiguration.ZERO; + if (initialDelay.compareTo(MillisecondBoundConfiguration.ZERO) > 0) + { + this.initialDelay = initialDelay; + } + else + { + LOGGER.warn("Invalid initialDelay configuration {}, the minimum value is 0", initialDelay); + this.initialDelay = MillisecondBoundConfiguration.ZERO; + } } } @@ -117,6 +132,7 @@ public class PeriodicTaskConfigurationImpl implements PeriodicTaskConfiguration */ @Override @JsonProperty("execute_interval") + @NotNull public MillisecondBoundConfiguration executeInterval() { return executeInterval; @@ -150,4 +166,73 @@ public class PeriodicTaskConfigurationImpl implements PeriodicTaskConfiguration "use 'execute_interval' instead."); setExecuteInterval(new MillisecondBoundConfiguration(executeIntervalMillis, TimeUnit.MILLISECONDS)); } + + /** + * {@code PeriodicTaskConfigurationImpl} builder static inner class. 
+ */ + public static class Builder implements DataObjectBuilder<Builder, PeriodicTaskConfigurationImpl> + { + private boolean enabled; + private MillisecondBoundConfiguration initialDelay; + private MillisecondBoundConfiguration executeInterval; + + private Builder() + { + } + + public static Builder builder() + { + return new Builder(); + } + + @Override + public Builder self() + { + return this; + } + + /** + * Sets the {@code enabled} and returns a reference to this Builder enabling method chaining. + * + * @param enabled the {@code enabled} to set + * @return a reference to this Builder + */ + public Builder enabled(boolean enabled) + { + return update(b -> b.enabled = enabled); + } + + /** + * Sets the {@code initialDelay} and returns a reference to this Builder enabling method chaining. + * + * @param initialDelay the {@code initialDelay} to set + * @return a reference to this Builder + */ + public Builder initialDelay(MillisecondBoundConfiguration initialDelay) + { + return update(b -> b.initialDelay = initialDelay); + } + + /** + * Sets the {@code executeInterval} and returns a reference to this Builder enabling method chaining. + * + * @param executeInterval the {@code executeInterval} to set + * @return a reference to this Builder + */ + public Builder executeInterval(MillisecondBoundConfiguration executeInterval) + { + return update(b -> b.executeInterval = executeInterval); + } + + /** + * Returns a {@code PeriodicTaskConfigurationImpl} built from the parameters previously set. 
+ * + * @return a {@code PeriodicTaskConfigurationImpl} built with parameters of this {@code PeriodicTaskConfigurationImpl.Builder} + */ + @Override + public PeriodicTaskConfigurationImpl build() + { + return new PeriodicTaskConfigurationImpl(this); + } + } } diff --git a/server/src/main/java/org/apache/cassandra/sidecar/config/yaml/SchemaKeyspaceConfigurationImpl.java b/server/src/main/java/org/apache/cassandra/sidecar/config/yaml/SchemaKeyspaceConfigurationImpl.java index c8846e9a..31c0ffaa 100644 --- a/server/src/main/java/org/apache/cassandra/sidecar/config/yaml/SchemaKeyspaceConfigurationImpl.java +++ b/server/src/main/java/org/apache/cassandra/sidecar/config/yaml/SchemaKeyspaceConfigurationImpl.java @@ -39,7 +39,7 @@ public class SchemaKeyspaceConfigurationImpl implements SchemaKeyspaceConfigurat public static final String DEFAULT_KEYSPACE = "sidecar_internal"; public static final String DEFAULT_REPLICATION_STRATEGY = "SimpleStrategy"; public static final int DEFAULT_REPLICATION_FACTOR = 1; - public static final SecondBoundConfiguration DEFAULT_LEASE_SCHEMA_TTL = SecondBoundConfiguration.parse("2m"); + public static final SecondBoundConfiguration DEFAULT_LEASE_SCHEMA_TTL = SecondBoundConfiguration.parse("5m"); public static final SecondBoundConfiguration MINIMUM_LEASE_SCHEMA_TTL = SecondBoundConfiguration.parse("1m"); @JsonProperty(value = "is_enabled") diff --git a/server/src/main/java/org/apache/cassandra/sidecar/coordination/MostReplicatedKeyspaceTokenZeroElectorateMembership.java b/server/src/main/java/org/apache/cassandra/sidecar/coordination/AbstractTokenZeroOfKeyspaceElectorateMembership.java similarity index 52% copy from server/src/main/java/org/apache/cassandra/sidecar/coordination/MostReplicatedKeyspaceTokenZeroElectorateMembership.java copy to server/src/main/java/org/apache/cassandra/sidecar/coordination/AbstractTokenZeroOfKeyspaceElectorateMembership.java index 8a907830..163eaf3c 100644 --- 
a/server/src/main/java/org/apache/cassandra/sidecar/coordination/MostReplicatedKeyspaceTokenZeroElectorateMembership.java +++ b/server/src/main/java/org/apache/cassandra/sidecar/coordination/AbstractTokenZeroOfKeyspaceElectorateMembership.java @@ -20,7 +20,6 @@ package org.apache.cassandra.sidecar.coordination; import java.math.BigInteger; import java.net.InetSocketAddress; -import java.util.Comparator; import java.util.HashSet; import java.util.List; import java.util.Set; @@ -28,43 +27,28 @@ import java.util.Set; import org.slf4j.Logger; import org.slf4j.LoggerFactory; -import com.datastax.driver.core.KeyspaceMetadata; -import com.datastax.driver.core.Session; import org.apache.cassandra.sidecar.cluster.CassandraAdapterDelegate; import org.apache.cassandra.sidecar.cluster.instance.InstanceMetadata; import org.apache.cassandra.sidecar.common.response.NodeSettings; import org.apache.cassandra.sidecar.common.response.TokenRangeReplicasResponse; -import org.apache.cassandra.sidecar.common.server.CQLSessionProvider; import org.apache.cassandra.sidecar.common.server.StorageOperations; import org.apache.cassandra.sidecar.common.server.data.Name; import org.apache.cassandra.sidecar.common.server.utils.StringUtils; -import org.apache.cassandra.sidecar.config.SidecarConfiguration; import org.apache.cassandra.sidecar.exceptions.CassandraUnavailableException; import org.apache.cassandra.sidecar.utils.InstanceMetadataFetcher; /** - * An implementation of {@link ElectorateMembership} where the current Sidecar will - * be determined to be part of the electorate iff one of the Cassandra instances it - * manages owns token {@code 0} for the user keyspace that has the highest replication - * factor. If multiple keyspaces have the highest replication factor, the keyspace - * to be used is decided by the keyspace with the name that sorts first in the - * lexicographic sort order. If no user keyspaces are created, the internal sidecar - * keyspace will be used. 
+ * Provides common functionality for {@link ElectorateMembership} implementations + * that rely on token zero replication of a keyspace to determine eligibility. */ -public class MostReplicatedKeyspaceTokenZeroElectorateMembership implements ElectorateMembership +public abstract class AbstractTokenZeroOfKeyspaceElectorateMembership implements ElectorateMembership { - private static final Logger LOGGER = LoggerFactory.getLogger(MostReplicatedKeyspaceTokenZeroElectorateMembership.class); - private final InstanceMetadataFetcher instanceMetadataFetcher; - private final CQLSessionProvider cqlSessionProvider; - private final SidecarConfiguration configuration; + private static final Logger LOGGER = LoggerFactory.getLogger(AbstractTokenZeroOfKeyspaceElectorateMembership.class); + protected final InstanceMetadataFetcher instanceMetadataFetcher; - public MostReplicatedKeyspaceTokenZeroElectorateMembership(InstanceMetadataFetcher instanceMetadataFetcher, - CQLSessionProvider cqlSessionProvider, - SidecarConfiguration sidecarConfiguration) + public AbstractTokenZeroOfKeyspaceElectorateMembership(InstanceMetadataFetcher instanceMetadataFetcher) { this.instanceMetadataFetcher = instanceMetadataFetcher; - this.cqlSessionProvider = cqlSessionProvider; - this.configuration = sidecarConfiguration; } /** @@ -80,24 +64,30 @@ public class MostReplicatedKeyspaceTokenZeroElectorateMembership implements Elec return false; } - String userKeyspace = highestReplicationFactorKeyspace(); - if (userKeyspace == null) + String keyspace = keyspaceToDetermineElectorateMembership(); + if (keyspace == null) { // pre-checks failed return false; } + LOGGER.debug("Using keyspace={} to determine electorate membership", keyspace); TokenRangeReplicasResponse tokenRangeReplicas = instanceMetadataFetcher.callOnFirstAvailableInstance(instance -> { CassandraAdapterDelegate delegate = instance.delegate(); StorageOperations operations = delegate.storageOperations(); NodeSettings nodeSettings = 
delegate.nodeSettings(); - return operations.tokenRangeReplicas(new Name(userKeyspace), nodeSettings.partitioner()); + return operations.tokenRangeReplicas(new Name(keyspace), nodeSettings.partitioner()); }); return anyInstanceOwnsTokenZero(tokenRangeReplicas, localInstancesHostsAndPorts); } - Set<String> collectLocalInstancesHostsAndPorts() + /** + * @return the name of the keyspace that will be used to determine the electorate membership + */ + protected abstract String keyspaceToDetermineElectorateMembership(); + + protected Set<String> collectLocalInstancesHostsAndPorts() { Set<String> result = new HashSet<>(); for (InstanceMetadata instance : instanceMetadataFetcher.allLocalInstances()) @@ -116,56 +106,13 @@ public class MostReplicatedKeyspaceTokenZeroElectorateMembership implements Elec return result; } - /** - * Performs pre-checks ensuring local instances are configured; an active session to the database is present; - * and returns the keyspace with the highest replication factor. If multiple keyspaces have the highest - * replication factor, the keyspace to be used is decided by the keyspace with the name that sorts first in - * the lexicographic sort order. Defaults to the sidecar keyspace when there are no user keyspaces. 
- * - * @return user keyspace - */ - String highestReplicationFactorKeyspace() - { - if (instanceMetadataFetcher.allLocalInstances().isEmpty()) - { - LOGGER.warn("There are no local Cassandra instances managed by this Sidecar"); - return null; - } - - Session activeSession; - try - { - activeSession = cqlSessionProvider.get(); - } - catch (CassandraUnavailableException exception) - { - LOGGER.warn("There is no active session to Cassandra"); - return null; - } - - Set<String> forbiddenKeyspaces = configuration.cassandraInputValidationConfiguration().forbiddenKeyspaces(); - String sidecarKeyspaceName = configuration.serviceConfiguration().schemaKeyspaceConfiguration().keyspace(); - - return activeSession.getCluster().getMetadata().getKeyspaces().stream() - .filter(keyspace -> !forbiddenKeyspaces.contains(keyspace.getName())) - // Sort by the keyspace with the highest replication factor - // and then sort by the keyspace name to guarantee in the - // sorting order across all Sidecar instances - .sorted(Comparator.comparingInt(this::aggregateReplicationFactor) - .reversed() - .thenComparing(KeyspaceMetadata::getName)) - .map(KeyspaceMetadata::getName) - .findFirst() - .orElse(sidecarKeyspaceName); - } - /** * @param tokenRangeReplicas the token range replicas for a keyspace * @param localInstancesHostAndPorts local instance(s) IP(s) and port(s) * @return {@code true} if any of the local instances is a replica of token zero for a single keyspace, * {@code false} otherwise */ - boolean anyInstanceOwnsTokenZero(TokenRangeReplicasResponse tokenRangeReplicas, Set<String> localInstancesHostAndPorts) + protected boolean anyInstanceOwnsTokenZero(TokenRangeReplicasResponse tokenRangeReplicas, Set<String> localInstancesHostAndPorts) { return tokenRangeReplicas.readReplicas() .stream() @@ -192,32 +139,11 @@ public class MostReplicatedKeyspaceTokenZeroElectorateMembership implements Elec * @param replicaInfo the replica info * @return {@code true} if the replica info owns token 
zero, {@code false} otherwise */ - boolean replicaOwnsTokenZero(TokenRangeReplicasResponse.ReplicaInfo replicaInfo) + protected boolean replicaOwnsTokenZero(TokenRangeReplicasResponse.ReplicaInfo replicaInfo) { BigInteger start = new BigInteger(replicaInfo.start()); BigInteger end = new BigInteger(replicaInfo.end()); // start is exclusive; end is inclusive return start.compareTo(BigInteger.ZERO) < 0 && end.compareTo(BigInteger.ZERO) >= 0; } - - /** - * @param keyspace the keyspace - * @return the aggregate replication factor for the {@link KeyspaceMetadata keyspace} - */ - int aggregateReplicationFactor(KeyspaceMetadata keyspace) - { - int replicationFactor = 0; - for (String value : keyspace.getReplication().values()) - { - try - { - replicationFactor += Integer.parseInt(value); - } - catch (NumberFormatException ignored) - { - // skips the class property of the replication factor - } - } - return replicationFactor; - } } diff --git a/server/src/main/java/org/apache/cassandra/sidecar/coordination/ClusterLeaseClaimTask.java b/server/src/main/java/org/apache/cassandra/sidecar/coordination/ClusterLeaseClaimTask.java index 1cc62fb0..49ace97d 100644 --- a/server/src/main/java/org/apache/cassandra/sidecar/coordination/ClusterLeaseClaimTask.java +++ b/server/src/main/java/org/apache/cassandra/sidecar/coordination/ClusterLeaseClaimTask.java @@ -21,6 +21,7 @@ package org.apache.cassandra.sidecar.coordination; import java.time.Instant; import java.time.temporal.ChronoUnit; import java.util.Objects; +import java.util.concurrent.TimeUnit; import java.util.function.Function; import org.slf4j.Logger; @@ -32,8 +33,9 @@ import io.vertx.core.Promise; import io.vertx.core.Vertx; import io.vertx.core.eventbus.EventBus; import org.apache.cassandra.sidecar.common.server.utils.DurationSpec; +import org.apache.cassandra.sidecar.common.server.utils.MillisecondBoundConfiguration; import org.apache.cassandra.sidecar.common.server.utils.SecondBoundConfiguration; -import 
org.apache.cassandra.sidecar.config.PeriodicTaskConfiguration; +import org.apache.cassandra.sidecar.config.ClusterLeaseClaimConfiguration; import org.apache.cassandra.sidecar.config.ServiceConfiguration; import org.apache.cassandra.sidecar.db.SidecarLeaseDatabaseAccessor; import org.apache.cassandra.sidecar.metrics.CoordinationMetrics; @@ -75,7 +77,7 @@ public class ClusterLeaseClaimTask implements PeriodicTask private final SidecarLeaseDatabaseAccessor accessor; private final ClusterLease clusterLease; private final CoordinationMetrics metrics; - private final PeriodicTaskConfiguration periodicTaskConfiguration; + private final ClusterLeaseClaimConfiguration configuration; private final ServiceConfiguration config; private EventBus eventBus; private String currentLeaseholder; @@ -87,7 +89,7 @@ public class ClusterLeaseClaimTask implements PeriodicTask ClusterLease clusterLease, SidecarMetrics metrics) { - this.periodicTaskConfiguration = serviceConfiguration.coordinationConfiguration().clusterLeaseClaimConfiguration(); + this.configuration = serviceConfiguration.coordinationConfiguration().clusterLeaseClaimConfiguration(); this.config = serviceConfiguration; this.electorateMembership = electorateMembership; this.accessor = accessor; @@ -110,7 +112,7 @@ public class ClusterLeaseClaimTask implements PeriodicTask { // The Sidecar schema feature is required for this implementation // so skip when the feature is not enabled - boolean isEnabled = config.schemaKeyspaceConfiguration().isEnabled() && periodicTaskConfiguration.enabled(); + boolean isEnabled = config.schemaKeyspaceConfiguration().isEnabled() && configuration.enabled(); boolean isMember = false; if (isEnabled) { @@ -144,7 +146,16 @@ public class ClusterLeaseClaimTask implements PeriodicTask @Override public DurationSpec initialDelay() { - return periodicTaskConfiguration.initialDelay(); + // Return a randomized delay to introduce jitter among all the instances participating + // in the lease claim process + 
long randomDeltaDelayMillis = configuration.randomDeltaDelayMillis(); + if (randomDeltaDelayMillis == 0) + { + // no jitter + return configuration.initialDelay(); + } + long initialDelayMillis = configuration.initialDelay().toMillis() + randomDeltaDelayMillis; + return new MillisecondBoundConfiguration(initialDelayMillis, TimeUnit.MILLISECONDS); } /** @@ -153,7 +164,7 @@ public class ClusterLeaseClaimTask implements PeriodicTask @Override public DurationSpec delay() { - DurationSpec delay = periodicTaskConfiguration.executeInterval(); + DurationSpec delay = configuration.executeInterval(); if (delay.compareTo(MINIMUM_DELAY) < 0) { @@ -207,7 +218,7 @@ public class ClusterLeaseClaimTask implements PeriodicTask LOGGER.debug("Attempting to {} lease for sidecarHostId={}", actionName, sidecarHostId); return actionFn.apply(sidecarHostId).currentOwner; } - catch (QueryConsistencyException | NoHostAvailableException e) + catch (QueryConsistencyException | NoHostAvailableException | IllegalArgumentException e) { LOGGER.debug("Unable to {} lease for sidecarHostId={}", actionName, sidecarHostId, e); } diff --git a/server/src/main/java/org/apache/cassandra/sidecar/coordination/MostReplicatedKeyspaceTokenZeroElectorateMembership.java b/server/src/main/java/org/apache/cassandra/sidecar/coordination/MostReplicatedKeyspaceTokenZeroElectorateMembership.java index 8a907830..8d9f2fae 100644 --- a/server/src/main/java/org/apache/cassandra/sidecar/coordination/MostReplicatedKeyspaceTokenZeroElectorateMembership.java +++ b/server/src/main/java/org/apache/cassandra/sidecar/coordination/MostReplicatedKeyspaceTokenZeroElectorateMembership.java @@ -18,11 +18,7 @@ package org.apache.cassandra.sidecar.coordination; -import java.math.BigInteger; -import java.net.InetSocketAddress; import java.util.Comparator; -import java.util.HashSet; -import java.util.List; import java.util.Set; import org.slf4j.Logger; @@ -30,14 +26,7 @@ import org.slf4j.LoggerFactory; import 
com.datastax.driver.core.KeyspaceMetadata; import com.datastax.driver.core.Session; -import org.apache.cassandra.sidecar.cluster.CassandraAdapterDelegate; -import org.apache.cassandra.sidecar.cluster.instance.InstanceMetadata; -import org.apache.cassandra.sidecar.common.response.NodeSettings; -import org.apache.cassandra.sidecar.common.response.TokenRangeReplicasResponse; import org.apache.cassandra.sidecar.common.server.CQLSessionProvider; -import org.apache.cassandra.sidecar.common.server.StorageOperations; -import org.apache.cassandra.sidecar.common.server.data.Name; -import org.apache.cassandra.sidecar.common.server.utils.StringUtils; import org.apache.cassandra.sidecar.config.SidecarConfiguration; import org.apache.cassandra.sidecar.exceptions.CassandraUnavailableException; import org.apache.cassandra.sidecar.utils.InstanceMetadataFetcher; @@ -51,10 +40,9 @@ import org.apache.cassandra.sidecar.utils.InstanceMetadataFetcher; * lexicographic sort order. If no user keyspaces are created, the internal sidecar * keyspace will be used. 
*/ -public class MostReplicatedKeyspaceTokenZeroElectorateMembership implements ElectorateMembership +public class MostReplicatedKeyspaceTokenZeroElectorateMembership extends AbstractTokenZeroOfKeyspaceElectorateMembership { private static final Logger LOGGER = LoggerFactory.getLogger(MostReplicatedKeyspaceTokenZeroElectorateMembership.class); - private final InstanceMetadataFetcher instanceMetadataFetcher; private final CQLSessionProvider cqlSessionProvider; private final SidecarConfiguration configuration; @@ -62,58 +50,15 @@ public class MostReplicatedKeyspaceTokenZeroElectorateMembership implements Elec CQLSessionProvider cqlSessionProvider, SidecarConfiguration sidecarConfiguration) { - this.instanceMetadataFetcher = instanceMetadataFetcher; + super(instanceMetadataFetcher); this.cqlSessionProvider = cqlSessionProvider; this.configuration = sidecarConfiguration; } - /** - * {@inheritDoc} - */ @Override - public boolean isMember() + protected String keyspaceToDetermineElectorateMembership() { - Set<String> localInstancesHostsAndPorts = collectLocalInstancesHostsAndPorts(); - if (localInstancesHostsAndPorts.isEmpty()) - { - // Unable to retrieve local instances, maybe all Cassandra connections are down? 
- return false; - } - - String userKeyspace = highestReplicationFactorKeyspace(); - if (userKeyspace == null) - { - // pre-checks failed - return false; - } - - TokenRangeReplicasResponse tokenRangeReplicas = instanceMetadataFetcher.callOnFirstAvailableInstance(instance -> { - CassandraAdapterDelegate delegate = instance.delegate(); - StorageOperations operations = delegate.storageOperations(); - NodeSettings nodeSettings = delegate.nodeSettings(); - return operations.tokenRangeReplicas(new Name(userKeyspace), nodeSettings.partitioner()); - }); - - return anyInstanceOwnsTokenZero(tokenRangeReplicas, localInstancesHostsAndPorts); - } - - Set<String> collectLocalInstancesHostsAndPorts() - { - Set<String> result = new HashSet<>(); - for (InstanceMetadata instance : instanceMetadataFetcher.allLocalInstances()) - { - try - { - InetSocketAddress address = instance.delegate().localStorageBroadcastAddress(); - result.add(StringUtils.cassandraFormattedHostAndPort(address)); - } - catch (CassandraUnavailableException exception) - { - // Log a warning message and continue - LOGGER.warn("Unable to determine local storage broadcast address for instance. 
instance={}", instance, exception); - } - } - return result; + return highestReplicationFactorKeyspace(); } /** @@ -159,47 +104,6 @@ public class MostReplicatedKeyspaceTokenZeroElectorateMembership implements Elec .orElse(sidecarKeyspaceName); } - /** - * @param tokenRangeReplicas the token range replicas for a keyspace - * @param localInstancesHostAndPorts local instance(s) IP(s) and port(s) - * @return {@code true} if any of the local instances is a replica of token zero for a single keyspace, - * {@code false} otherwise - */ - boolean anyInstanceOwnsTokenZero(TokenRangeReplicasResponse tokenRangeReplicas, Set<String> localInstancesHostAndPorts) - { - return tokenRangeReplicas.readReplicas() - .stream() - // only returns replicas that contain token zero - .filter(this::replicaOwnsTokenZero) - // and then see if any of the replicas matches the - // local instance's host and port - .anyMatch(replicaInfo -> { - for (List<String> replicas : replicaInfo.replicasByDatacenter().values()) - { - for (String replica : replicas) - { - if (localInstancesHostAndPorts.contains(replica)) - { - return true; - } - } - } - return false; - }); - } - - /** - * @param replicaInfo the replica info - * @return {@code true} if the replica info owns token zero, {@code false} otherwise - */ - boolean replicaOwnsTokenZero(TokenRangeReplicasResponse.ReplicaInfo replicaInfo) - { - BigInteger start = new BigInteger(replicaInfo.start()); - BigInteger end = new BigInteger(replicaInfo.end()); - // start is exclusive; end is inclusive - return start.compareTo(BigInteger.ZERO) < 0 && end.compareTo(BigInteger.ZERO) >= 0; - } - /** * @param keyspace the keyspace * @return the aggregate replication factor for the {@link KeyspaceMetadata keyspace} diff --git a/server/src/main/java/org/apache/cassandra/sidecar/coordination/SidecarInternalTokenZeroElectorateMembership.java b/server/src/main/java/org/apache/cassandra/sidecar/coordination/SidecarInternalTokenZeroElectorateMembership.java new file mode 
100644 index 00000000..dca2ca0a --- /dev/null +++ b/server/src/main/java/org/apache/cassandra/sidecar/coordination/SidecarInternalTokenZeroElectorateMembership.java @@ -0,0 +1,46 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.cassandra.sidecar.coordination; + +import org.apache.cassandra.sidecar.config.SchemaKeyspaceConfiguration; +import org.apache.cassandra.sidecar.config.SidecarConfiguration; +import org.apache.cassandra.sidecar.utils.InstanceMetadataFetcher; + +/** + * An implementation of {@link ElectorateMembership} where the current Sidecar will + * be determined to be part of the electorate iff one of the Cassandra instances it + * manages owns token {@code 0} for the {@code sidecar_internal} keyspace. 
+ */ +public class SidecarInternalTokenZeroElectorateMembership extends AbstractTokenZeroOfKeyspaceElectorateMembership +{ + private final SchemaKeyspaceConfiguration configuration; + + public SidecarInternalTokenZeroElectorateMembership(InstanceMetadataFetcher instanceMetadataFetcher, + SidecarConfiguration sidecarConfiguration) + { + super(instanceMetadataFetcher); + configuration = sidecarConfiguration.serviceConfiguration().schemaKeyspaceConfiguration(); + } + + @Override + protected String keyspaceToDetermineElectorateMembership() + { + return configuration.keyspace(); + } +} diff --git a/server/src/main/java/org/apache/cassandra/sidecar/db/SidecarLeaseDatabaseAccessor.java b/server/src/main/java/org/apache/cassandra/sidecar/db/SidecarLeaseDatabaseAccessor.java index aed5b36c..94ce415e 100644 --- a/server/src/main/java/org/apache/cassandra/sidecar/db/SidecarLeaseDatabaseAccessor.java +++ b/server/src/main/java/org/apache/cassandra/sidecar/db/SidecarLeaseDatabaseAccessor.java @@ -50,6 +50,7 @@ public class SidecarLeaseDatabaseAccessor extends DatabaseAccessor<SidecarLeaseS * * @param leaseClaimer the identifier of the instances attempting to claim the lease * @return the results of performing the lease claim + * @throws IllegalArgumentException when the owner information is not available from the lease claim operation */ public LeaseClaimResult claimLease(String leaseClaimer) { @@ -63,6 +64,7 @@ public class SidecarLeaseDatabaseAccessor extends DatabaseAccessor<SidecarLeaseS * * @param currentOwner the current owner extending the lease * @return the results of performing the lease extension + * @throws IllegalArgumentException when the owner information is not available from the lease extension operation */ public LeaseClaimResult extendLease(String currentOwner) { @@ -76,20 +78,22 @@ public class SidecarLeaseDatabaseAccessor extends DatabaseAccessor<SidecarLeaseS */ public static class LeaseClaimResult { - public final boolean leaseAcquired; public final 
String currentOwner; - LeaseClaimResult(boolean leaseAcquired, String currentOwner) + LeaseClaimResult(String currentOwner) { - this.leaseAcquired = leaseAcquired; this.currentOwner = currentOwner; } static LeaseClaimResult from(ResultSet resultSet, String newOwner) { return resultSet.wasApplied() - ? new LeaseClaimResult(true, newOwner) - : new LeaseClaimResult(false, resultSet.one().getString("owner")); + ? new LeaseClaimResult(newOwner) + // In some rare cases, the resultSet will not contain the owner information + // even though the resultSet was not applied. This will translate into an + // IllegalArgumentException being thrown when trying to retrieve the non-existing + // owner string. This exception is left to be handled by the caller method + : new LeaseClaimResult(resultSet.one().getString("owner")); } } } diff --git a/server/src/main/java/org/apache/cassandra/sidecar/modules/CoordinationModule.java b/server/src/main/java/org/apache/cassandra/sidecar/modules/CoordinationModule.java index b1a3b81f..c291af0c 100644 --- a/server/src/main/java/org/apache/cassandra/sidecar/modules/CoordinationModule.java +++ b/server/src/main/java/org/apache/cassandra/sidecar/modules/CoordinationModule.java @@ -28,7 +28,6 @@ import org.apache.cassandra.sidecar.config.SidecarConfiguration; import org.apache.cassandra.sidecar.coordination.ClusterLease; import org.apache.cassandra.sidecar.coordination.ClusterLeaseClaimTask; import org.apache.cassandra.sidecar.coordination.ElectorateMembership; -import org.apache.cassandra.sidecar.coordination.MostReplicatedKeyspaceTokenZeroElectorateMembership; import org.apache.cassandra.sidecar.db.SidecarLeaseDatabaseAccessor; import org.apache.cassandra.sidecar.db.schema.SidecarLeaseSchema; import org.apache.cassandra.sidecar.db.schema.TableSchema; @@ -70,6 +69,6 @@ public class CoordinationModule extends AbstractModule CQLSessionProvider cqlSessionProvider, SidecarConfiguration configuration) { - return new 
MostReplicatedKeyspaceTokenZeroElectorateMembership(instanceMetadataFetcher, cqlSessionProvider, configuration); + return new ElectorateMembershipFactory().create(instanceMetadataFetcher, cqlSessionProvider, configuration); } } diff --git a/server/src/main/java/org/apache/cassandra/sidecar/modules/ElectorateMembershipFactory.java b/server/src/main/java/org/apache/cassandra/sidecar/modules/ElectorateMembershipFactory.java new file mode 100644 index 00000000..401e68b3 --- /dev/null +++ b/server/src/main/java/org/apache/cassandra/sidecar/modules/ElectorateMembershipFactory.java @@ -0,0 +1,62 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.cassandra.sidecar.modules; + +import org.apache.cassandra.sidecar.common.server.CQLSessionProvider; +import org.apache.cassandra.sidecar.config.SidecarConfiguration; +import org.apache.cassandra.sidecar.coordination.ElectorateMembership; +import org.apache.cassandra.sidecar.coordination.MostReplicatedKeyspaceTokenZeroElectorateMembership; +import org.apache.cassandra.sidecar.coordination.SidecarInternalTokenZeroElectorateMembership; +import org.apache.cassandra.sidecar.exceptions.ConfigurationException; +import org.apache.cassandra.sidecar.utils.InstanceMetadataFetcher; + +/** + * Factory class to initialize the {@link org.apache.cassandra.sidecar.coordination.ElectorateMembership} instance + * based on the configured strategy + */ +public class ElectorateMembershipFactory +{ + /** + * Creates the {@link ElectorateMembership} based on the strategy configuration + * + * @param fetcher the interface to retrieve instance metadata + * @param sessionProvider the provider for the CQL session + * @param config the configuration for running Sidecar + * @return the created {@link ElectorateMembership} instance + * @throws ConfigurationException when an invalid strategy is used + */ + public ElectorateMembership create(InstanceMetadataFetcher fetcher, + CQLSessionProvider sessionProvider, + SidecarConfiguration config) + { + String strategy = config.serviceConfiguration() + .coordinationConfiguration() + .clusterLeaseClaimConfiguration() + .electorateMembershipStrategy(); + switch (strategy) + { + case "MostReplicatedKeyspaceTokenZeroElectorateMembership": + return new MostReplicatedKeyspaceTokenZeroElectorateMembership(fetcher, sessionProvider, config); + case "SidecarInternalTokenZeroElectorateMembership": + return new SidecarInternalTokenZeroElectorateMembership(fetcher, config); + default: + throw new ConfigurationException("Invalid electorate membership strategy value '" + strategy + "'"); + } + } +} diff --git 
a/server/src/test/integration/org/apache/cassandra/sidecar/coordination/MostReplicatedKeyspaceTokenZeroElectorateMembershipIntegrationTest.java b/server/src/test/integration/org/apache/cassandra/sidecar/coordination/MostReplicatedKeyspaceTokenZeroElectorateMembershipIntegrationTest.java index b6a4c151..82759a45 100644 --- a/server/src/test/integration/org/apache/cassandra/sidecar/coordination/MostReplicatedKeyspaceTokenZeroElectorateMembershipIntegrationTest.java +++ b/server/src/test/integration/org/apache/cassandra/sidecar/coordination/MostReplicatedKeyspaceTokenZeroElectorateMembershipIntegrationTest.java @@ -20,6 +20,7 @@ package org.apache.cassandra.sidecar.coordination; import java.io.IOException; import java.net.InetSocketAddress; +import java.util.AbstractMap; import java.util.ArrayList; import java.util.Arrays; import java.util.Collections; @@ -79,6 +80,7 @@ class MostReplicatedKeyspaceTokenZeroElectorateMembershipIntegrationTest Vertx vertx = Vertx.vertx(); DriverUtils driverUtils = new DriverUtils(); CassandraVersionProvider cassandraVersionProvider = cassandraVersionProvider(DnsResolver.DEFAULT); + MetricRegistryFactory metricRegistryProvider = new MetricRegistryFactory("cassandra_sidecar", List.of(), List.of()); @ParameterizedTest(name = "{index} => version {0}") @MethodSource("org.apache.cassandra.testing.TestVersionSupplier#testVersions") @@ -107,12 +109,18 @@ class MostReplicatedKeyspaceTokenZeroElectorateMembershipIntegrationTest private void runTestScenario(AbstractCluster<?> cluster) { - List<MostReplicatedKeyspaceTokenZeroElectorateMembership> memberships = buildElectorateMembershipPerCassandraInstance(cluster); + AbstractMap.SimpleEntry<List<? extends ElectorateMembership>, List<? extends ElectorateMembership>> pair + = buildElectorateMembershipPerCassandraInstance(cluster); + List<? extends ElectorateMembership> mostReplicatedMemberships = pair.getKey(); + List<? 
extends ElectorateMembership> sidecarInternalMemberships = pair.getValue(); // When there are no user keyspaces, we default to the sidecar_internal keyspace // and therefore guaranteeing that we have at least one keyspace to use for the // determination of the membership, and that's why we expect the membership count // to be one, even if we have not created user keyspaces yet. - assertMembership(memberships, 1); + assertMembership(mostReplicatedMemberships, 1); + // For the sidecar internal keyspace based membership, the membership count will + // always be the same. + assertMembership(sidecarInternalMemberships, 1); // Now let's create keyspaces with RF 1-3 replicated in a single DC and validate String dc0 = "dc0"; @@ -121,7 +129,10 @@ class MostReplicatedKeyspaceTokenZeroElectorateMembershipIntegrationTest cluster.schemaChange(String.format("CREATE KEYSPACE ks_dc0_%d WITH REPLICATION={'class':'NetworkTopologyStrategy','%s':%d}", rf, dc0, rf)); // introduce delay until schema change information propagates sleepUninterruptibly(10, TimeUnit.SECONDS); - assertMembership(memberships, rf); + assertMembership(mostReplicatedMemberships, rf); + // For the sidecar internal keyspace based membership, the membership count will + // always be the same. + assertMembership(sidecarInternalMemberships, 1); } // Now let's create keyspaces with RF 1-4 replicated in DC2 and validate @@ -133,7 +144,10 @@ class MostReplicatedKeyspaceTokenZeroElectorateMembershipIntegrationTest cluster.schemaChange(String.format("CREATE KEYSPACE ks_dc1_%d WITH REPLICATION={'class':'NetworkTopologyStrategy','%s':%d}", rf, dc1, rf)); // introduce delay until schema change information propagates sleepUninterruptibly(10, TimeUnit.SECONDS); - assertMembership(memberships, Math.max(3, rf)); + assertMembership(mostReplicatedMemberships, Math.max(3, rf)); + // For the sidecar internal keyspace based membership, the membership count will + // always be the same. 
+ assertMembership(sidecarInternalMemberships, 1); } // Now let's create a keyspace with RF=3 replicated across both DCs @@ -142,13 +156,16 @@ class MostReplicatedKeyspaceTokenZeroElectorateMembershipIntegrationTest sleepUninterruptibly(10, TimeUnit.SECONDS); // We expect the same instances in the existing keyspaces to own token 0 as the new keyspace // so a total of 6 instances own token 0, 3 on each DC. - assertMembership(memberships, 6); + assertMembership(mostReplicatedMemberships, 6); + // For the sidecar internal keyspace based membership, the membership count will + // always be the same. + assertMembership(sidecarInternalMemberships, 1); } - static void assertMembership(List<MostReplicatedKeyspaceTokenZeroElectorateMembership> memberships, int expectedElectorateSize) + static void assertMembership(List<? extends ElectorateMembership> memberships, int expectedElectorateSize) { int localElectorateCount = 0; - for (MostReplicatedKeyspaceTokenZeroElectorateMembership membership : memberships) + for (ElectorateMembership membership : memberships) { boolean shouldParticipate = membership.isMember(); if (shouldParticipate) @@ -160,13 +177,13 @@ class MostReplicatedKeyspaceTokenZeroElectorateMembershipIntegrationTest .isEqualTo(expectedElectorateSize); } - List<MostReplicatedKeyspaceTokenZeroElectorateMembership> buildElectorateMembershipPerCassandraInstance(AbstractCluster<?> cluster) + AbstractMap.SimpleEntry<List<? extends ElectorateMembership>, List<? 
extends ElectorateMembership>> + buildElectorateMembershipPerCassandraInstance(AbstractCluster<?> cluster) { - MetricRegistryFactory metricRegistryProvider = new MetricRegistryFactory("cassandra_sidecar", - Collections.emptyList(), - Collections.emptyList()); - - List<MostReplicatedKeyspaceTokenZeroElectorateMembership> result = new ArrayList<>(); + List<MostReplicatedKeyspaceTokenZeroElectorateMembership> r1 = new ArrayList<>(); + List<SidecarInternalTokenZeroElectorateMembership> r2 = new ArrayList<>(); + AbstractMap.SimpleEntry<List<? extends ElectorateMembership>, List<? extends ElectorateMembership>> result + = new AbstractMap.SimpleEntry<>(r1, r2); for (IInstance instance : cluster) { List<InetSocketAddress> address = buildContactList(instance); @@ -174,7 +191,8 @@ class MostReplicatedKeyspaceTokenZeroElectorateMembershipIntegrationTest new CQLSessionProviderImpl(address, address, 500, instance.config().localDatacenter(), 0, SharedExecutorNettyOptions.INSTANCE); InstancesMetadata instancesMetadata = buildInstancesMetadata(instance, sessionProvider, metricRegistryProvider); InstanceMetadataFetcher instanceMetadataFetcher = new InstanceMetadataFetcher(instancesMetadata); - result.add(new MostReplicatedKeyspaceTokenZeroElectorateMembership(instanceMetadataFetcher, sessionProvider, CONFIG)); + r1.add(new MostReplicatedKeyspaceTokenZeroElectorateMembership(instanceMetadataFetcher, sessionProvider, CONFIG)); + r2.add(new SidecarInternalTokenZeroElectorateMembership(instanceMetadataFetcher, CONFIG)); } return result; } diff --git a/server/src/test/integration/org/apache/cassandra/sidecar/testing/IntegrationTestModule.java b/server/src/test/integration/org/apache/cassandra/sidecar/testing/IntegrationTestModule.java index 2f0560ab..16a0438e 100644 --- a/server/src/test/integration/org/apache/cassandra/sidecar/testing/IntegrationTestModule.java +++ b/server/src/test/integration/org/apache/cassandra/sidecar/testing/IntegrationTestModule.java @@ -45,6 +45,7 @@ import 
org.apache.cassandra.sidecar.config.SidecarConfiguration; import org.apache.cassandra.sidecar.config.SslConfiguration; import org.apache.cassandra.sidecar.config.yaml.AccessControlConfigurationImpl; import org.apache.cassandra.sidecar.config.yaml.CacheConfigurationImpl; +import org.apache.cassandra.sidecar.config.yaml.ClusterLeaseClaimConfigurationImpl; import org.apache.cassandra.sidecar.config.yaml.CoordinationConfigurationImpl; import org.apache.cassandra.sidecar.config.yaml.KeyStoreConfigurationImpl; import org.apache.cassandra.sidecar.config.yaml.ParameterizedClassConfigurationImpl; @@ -204,9 +205,13 @@ public class IntegrationTestModule extends AbstractModule @Singleton public CoordinationConfiguration clusterLeaseClaimTaskConfiguration() { - return new CoordinationConfigurationImpl(new PeriodicTaskConfigurationImpl(true, - MillisecondBoundConfiguration.parse("1s"), - MillisecondBoundConfiguration.parse("1s"))); + ClusterLeaseClaimConfigurationImpl configuration + = ClusterLeaseClaimConfigurationImpl.builder() + .overridePeriodicTaskConfiguration(b -> b.enabled(true) + .initialDelay(MillisecondBoundConfiguration.parse("1s")) + .executeInterval(MillisecondBoundConfiguration.parse("1s"))) + .build(); + return new CoordinationConfigurationImpl(configuration); } @Provides diff --git a/server/src/test/java/org/apache/cassandra/sidecar/config/SidecarConfigurationTest.java b/server/src/test/java/org/apache/cassandra/sidecar/config/SidecarConfigurationTest.java index 2eb1ffdf..cf75bb7d 100644 --- a/server/src/test/java/org/apache/cassandra/sidecar/config/SidecarConfigurationTest.java +++ b/server/src/test/java/org/apache/cassandra/sidecar/config/SidecarConfigurationTest.java @@ -228,7 +228,7 @@ class SidecarConfigurationTest assertThat(configuration.replicationFactor()).isEqualTo(3); assertThat(configuration.createReplicationStrategyString()) .isEqualTo("{'class':'SimpleStrategy', 'replication_factor':'3'}"); - 
assertThat(configuration.leaseSchemaTTL().toSeconds()).isEqualTo(120); + assertThat(configuration.leaseSchemaTTL().toSeconds()).isEqualTo(300L); } @Test @@ -427,17 +427,49 @@ class SidecarConfigurationTest @Test void testCoordinationConfiguration() throws Exception { - Path yamlPath = yaml("config/sidecar_coordination.yaml"); - SidecarConfiguration config = SidecarConfigurationImpl.readYamlConfiguration(yamlPath); + String yaml = "sidecar:\n" + + " coordination:\n" + + " cluster_lease_claim:\n" + + " electorate_membership_strategy: SidecarInternalTokenZeroElectorateMembership\n" + + " enabled: false\n" + + " initial_delay: 5s\n" + + " execute_interval: 31s\n" + + " initial_delay_random_delta: 10s"; + SidecarConfiguration config = SidecarConfigurationImpl.fromYamlString(yaml); + ServiceConfiguration serviceConfiguration = config.serviceConfiguration(); + assertThat(serviceConfiguration).isNotNull(); + + CoordinationConfiguration coordinationConfiguration = serviceConfiguration.coordinationConfiguration(); + assertThat(coordinationConfiguration).isNotNull(); + ClusterLeaseClaimConfiguration clusterLeaseConfig = coordinationConfiguration.clusterLeaseClaimConfiguration(); + assertThat(clusterLeaseConfig.enabled()).isFalse(); + assertThat(clusterLeaseConfig.initialDelay().toMillis()).isEqualTo(5_000L); + assertThat(clusterLeaseConfig.executeInterval().toMillis()).isEqualTo(31_000L); + assertThat(clusterLeaseConfig.initialDelayRandomDelta().toMillis()).isEqualTo(10_000L); + assertThat(clusterLeaseConfig.randomDeltaDelayMillis()).isBetween(0L, 10_000L); + assertThat(clusterLeaseConfig.electorateMembershipStrategy()).isEqualTo("SidecarInternalTokenZeroElectorateMembership"); + } + + @Test + void testCoordinationDefaultElectorateMembershipStrategy() throws Exception + { + String yaml = "sidecar:\n" + + " coordination:\n" + + " cluster_lease_claim:\n" + + " enabled: false\n" + + " initial_delay: 5s\n" + + " execute_interval: 31s"; + SidecarConfiguration config = 
SidecarConfigurationImpl.fromYamlString(yaml); ServiceConfiguration serviceConfiguration = config.serviceConfiguration(); assertThat(serviceConfiguration).isNotNull(); CoordinationConfiguration coordinationConfiguration = serviceConfiguration.coordinationConfiguration(); assertThat(coordinationConfiguration).isNotNull(); - PeriodicTaskConfiguration periodicTaskConfig = coordinationConfiguration.clusterLeaseClaimConfiguration(); - assertThat(periodicTaskConfig.enabled()).isFalse(); - assertThat(periodicTaskConfig.initialDelay().toMillis()).isEqualTo(5_000L); - assertThat(periodicTaskConfig.executeInterval().toMillis()).isEqualTo(31_000L); + ClusterLeaseClaimConfiguration clusterLeaseConfig = coordinationConfiguration.clusterLeaseClaimConfiguration(); + assertThat(clusterLeaseConfig.enabled()).isFalse(); + assertThat(clusterLeaseConfig.initialDelay().toMillis()).isEqualTo(5_000L); + assertThat(clusterLeaseConfig.executeInterval().toMillis()).isEqualTo(31_000L); + assertThat(clusterLeaseConfig.electorateMembershipStrategy()).isEqualTo("MostReplicatedKeyspaceTokenZeroElectorateMembership"); } void validateSingleInstanceSidecarConfiguration(SidecarConfiguration config) @@ -622,10 +654,13 @@ class SidecarConfigurationTest // Validate default configuration CoordinationConfiguration coordinationConfiguration = serviceConfiguration.coordinationConfiguration(); assertThat(coordinationConfiguration).isNotNull(); - PeriodicTaskConfiguration periodicTaskConfig = coordinationConfiguration.clusterLeaseClaimConfiguration(); - assertThat(periodicTaskConfig.enabled()).isTrue(); - assertThat(periodicTaskConfig.executeInterval().toMillis()).isEqualTo(60_000L); - assertThat(periodicTaskConfig.initialDelay().toMillis()).isEqualTo(1_000L); + ClusterLeaseClaimConfiguration clusterLeaseConfig = coordinationConfiguration.clusterLeaseClaimConfiguration(); + assertThat(clusterLeaseConfig.enabled()).isTrue(); + assertThat(clusterLeaseConfig.executeInterval().toMillis()).isEqualTo(100_000L); + 
assertThat(clusterLeaseConfig.initialDelay().toMillis()).isEqualTo(1_000L); + assertThat(clusterLeaseConfig.initialDelayRandomDelta().toMillis()).isEqualTo(30_000L); + assertThat(clusterLeaseConfig.randomDeltaDelayMillis()).isBetween(0L, 30_000L); + assertThat(clusterLeaseConfig.electorateMembershipStrategy()).isEqualTo("MostReplicatedKeyspaceTokenZeroElectorateMembership"); } private void validateHealthCheckConfigurationFromYaml(PeriodicTaskConfiguration config) diff --git a/server/src/test/java/org/apache/cassandra/sidecar/coordination/ClusterLeaseClaimTaskTest.java b/server/src/test/java/org/apache/cassandra/sidecar/coordination/ClusterLeaseClaimTaskTest.java index 1ea1888f..78c745d5 100644 --- a/server/src/test/java/org/apache/cassandra/sidecar/coordination/ClusterLeaseClaimTaskTest.java +++ b/server/src/test/java/org/apache/cassandra/sidecar/coordination/ClusterLeaseClaimTaskTest.java @@ -112,20 +112,26 @@ class ClusterLeaseClaimTaskTest @ParameterizedTest(name = "{index} => configuredInitialDelay {0} millis") @ValueSource(longs = { 0, 1, 2, 3, 4, 5, 6, 7, 8, 9, 10 }) - void testInitialDelayFromConfiguration(long configuredDelayMillis) + void testRandomizedInitialDelayFromConfiguration(long configuredDelayMillis) { ServiceConfiguration mockServiceConfiguration = mock(ServiceConfiguration.class, RETURNS_DEEP_STUBS); when(mockServiceConfiguration.coordinationConfiguration().clusterLeaseClaimConfiguration().initialDelay().quantity()) .thenReturn(configuredDelayMillis); when(mockServiceConfiguration.coordinationConfiguration().clusterLeaseClaimConfiguration().initialDelay().unit()) .thenReturn(TimeUnit.MILLISECONDS); + when(mockServiceConfiguration.coordinationConfiguration().clusterLeaseClaimConfiguration().initialDelay().toMillis()) + .thenCallRealMethod(); when(mockServiceConfiguration.coordinationConfiguration().clusterLeaseClaimConfiguration().initialDelay().to(TimeUnit.MILLISECONDS)) .thenCallRealMethod(); + 
when(mockServiceConfiguration.coordinationConfiguration().clusterLeaseClaimConfiguration().randomDeltaDelayMillis()) + .thenCallRealMethod(); + when(mockServiceConfiguration.coordinationConfiguration().clusterLeaseClaimConfiguration().initialDelayRandomDelta().toMillis()) + .thenReturn(30_000L); ClusterLeaseClaimTask task = new ClusterLeaseClaimTask(mockServiceConfiguration, mock(ElectorateMembership.class), mock(SidecarLeaseDatabaseAccessor.class), new ClusterLease(), mock(SidecarMetrics.class, RETURNS_DEEP_STUBS)); - assertThat(task.initialDelay().to(TimeUnit.MILLISECONDS)).isEqualTo(configuredDelayMillis); + assertThat(task.initialDelay().to(TimeUnit.MILLISECONDS)).isBetween(configuredDelayMillis, configuredDelayMillis + 30_000L); } @ParameterizedTest(name = "{index} => configuredDelayMillis {0} millis") diff --git a/server/src/test/java/org/apache/cassandra/sidecar/db/SidecarSchemaTest.java b/server/src/test/java/org/apache/cassandra/sidecar/db/SidecarSchemaTest.java index 9b6db6f5..e0271b77 100644 --- a/server/src/test/java/org/apache/cassandra/sidecar/db/SidecarSchemaTest.java +++ b/server/src/test/java/org/apache/cassandra/sidecar/db/SidecarSchemaTest.java @@ -184,9 +184,9 @@ public class SidecarSchemaTest "WHERE job_id = ? AND bucket_id = ? AND start_token = ? AND end_token = ?", "INSERT INTO sidecar_internal.sidecar_lease_v1 (name,owner) " + - "VALUES ('cluster_lease_holder',?) IF NOT EXISTS USING TTL 120", + "VALUES ('cluster_lease_holder',?) IF NOT EXISTS USING TTL 300", - "UPDATE sidecar_internal.sidecar_lease_v1 USING TTL 120 SET owner = ? " + + "UPDATE sidecar_internal.sidecar_lease_v1 USING TTL 300 SET owner = ? 
" + "WHERE name = 'cluster_lease_holder' IF owner = ?", "SELECT * FROM sidecar_internal.role_permissions_v1", diff --git a/server/src/test/java/org/apache/cassandra/sidecar/modules/ElectorateMembershipFactoryTest.java b/server/src/test/java/org/apache/cassandra/sidecar/modules/ElectorateMembershipFactoryTest.java new file mode 100644 index 00000000..46c5cfb5 --- /dev/null +++ b/server/src/test/java/org/apache/cassandra/sidecar/modules/ElectorateMembershipFactoryTest.java @@ -0,0 +1,93 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.cassandra.sidecar.modules; + +import org.junit.jupiter.api.BeforeEach; +import org.junit.jupiter.api.Test; +import org.junit.jupiter.params.ParameterizedTest; +import org.junit.jupiter.params.provider.ValueSource; + +import org.apache.cassandra.sidecar.common.server.CQLSessionProvider; +import org.apache.cassandra.sidecar.config.SidecarConfiguration; +import org.apache.cassandra.sidecar.coordination.ElectorateMembership; +import org.apache.cassandra.sidecar.coordination.MostReplicatedKeyspaceTokenZeroElectorateMembership; +import org.apache.cassandra.sidecar.coordination.SidecarInternalTokenZeroElectorateMembership; +import org.apache.cassandra.sidecar.exceptions.ConfigurationException; +import org.apache.cassandra.sidecar.utils.InstanceMetadataFetcher; + +import static org.assertj.core.api.Assertions.assertThat; +import static org.assertj.core.api.Assertions.assertThatExceptionOfType; +import static org.mockito.Mockito.RETURNS_DEEP_STUBS; +import static org.mockito.Mockito.mock; +import static org.mockito.Mockito.when; + +/** + * Unit tests for the {@link ElectorateMembershipFactory} class + */ +class ElectorateMembershipFactoryTest +{ + private ElectorateMembershipFactory factory; + + @BeforeEach + void setup() + { + factory = new ElectorateMembershipFactory(); + } + + @ParameterizedTest(name = "{index} => strategy=\"{0}\"") + @ValueSource(strings = { "invalid", "", "a" }) + void testInvalidStrategy(String strategy) + { + SidecarConfiguration config = mock(SidecarConfiguration.class, RETURNS_DEEP_STUBS); + when(config.serviceConfiguration() + .coordinationConfiguration() + .clusterLeaseClaimConfiguration() + .electorateMembershipStrategy()).thenReturn(strategy); + + assertThatExceptionOfType(ConfigurationException.class) + .isThrownBy(() -> factory.create(mock(InstanceMetadataFetcher.class), mock(CQLSessionProvider.class), config)) + .withMessage("Invalid electorate membership strategy value '" + strategy + "'"); + } + + @Test + void 
testMostReplicatedKeyspaceTokenZeroElectorateMembership() + { + SidecarConfiguration config = mock(SidecarConfiguration.class, RETURNS_DEEP_STUBS); + when(config.serviceConfiguration() + .coordinationConfiguration() + .clusterLeaseClaimConfiguration() + .electorateMembershipStrategy()).thenReturn(MostReplicatedKeyspaceTokenZeroElectorateMembership.class.getSimpleName()); + + ElectorateMembership electorateMembership = factory.create(mock(InstanceMetadataFetcher.class), mock(CQLSessionProvider.class), config); + assertThat(electorateMembership).isInstanceOf(MostReplicatedKeyspaceTokenZeroElectorateMembership.class); + } + + @Test + void testSidecarInternalTokenZeroElectorateMembership() + { + SidecarConfiguration config = mock(SidecarConfiguration.class, RETURNS_DEEP_STUBS); + when(config.serviceConfiguration() + .coordinationConfiguration() + .clusterLeaseClaimConfiguration() + .electorateMembershipStrategy()).thenReturn(SidecarInternalTokenZeroElectorateMembership.class.getSimpleName()); + + ElectorateMembership electorateMembership = factory.create(mock(InstanceMetadataFetcher.class), mock(CQLSessionProvider.class), config); + assertThat(electorateMembership).isInstanceOf(SidecarInternalTokenZeroElectorateMembership.class); + } +} diff --git a/server/src/test/resources/config/sidecar_coordination.yaml b/server/src/test/resources/config/sidecar_coordination.yaml deleted file mode 100644 index 59cae9dc..00000000 --- a/server/src/test/resources/config/sidecar_coordination.yaml +++ /dev/null @@ -1,247 +0,0 @@ -# -# Licensed to the Apache Software Foundation (ASF) under one -# or more contributor license agreements. See the NOTICE file -# distributed with this work for additional information -# regarding copyright ownership. The ASF licenses this file -# to you under the Apache License, Version 2.0 (the -# "License"); you may not use this file except in compliance -# with the License. 
You may obtain a copy of the License at -# -# http://www.apache.org/licenses/LICENSE-2.0 -# -# Unless required by applicable law or agreed to in writing, software -# distributed under the License is distributed on an "AS IS" BASIS, -# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. -# See the License for the specific language governing permissions and -# limitations under the License. -# - -# -# Cassandra SideCar configuration file -# -cassandra_instances: - - id: 1 - host: localhost1 - port: 9042 - storage_dir: /ccm/test/node1 - data_dirs: - - ~/.ccm/test/node1/data0 - staging_dir: ~/.ccm/test/node1/sstable-staging - jmx_host: 127.0.0.1 - jmx_port: 7100 - jmx_ssl_enabled: false -# jmx_role: -# jmx_role_password: - - id: 2 - host: localhost2 - port: 9042 - storage_dir: /ccm/test/node2 - data_dirs: - - ~/.ccm/test/node2/data0 - staging_dir: ~/.ccm/test/node2/sstable-staging - jmx_host: 127.0.0.1 - jmx_port: 7200 - jmx_ssl_enabled: false -# jmx_role: -# jmx_role_password: - - id: 3 - host: localhost3 - port: 9042 - storage_dir: /ccm/test/node3 - data_dirs: - - ~/.ccm/test/node3/data0 - staging_dir: ~/.ccm/test/node3/sstable-staging - jmx_host: 127.0.0.1 - jmx_port: 7300 - jmx_ssl_enabled: false -# jmx_role: -# jmx_role_password: - -sidecar: - host: 0.0.0.0 - port: 9043 - request_idle_timeout: 5m - request_timeout: 5m - tcp_keep_alive: false - accept_backlog: 1024 - server_verticle_instances: 1 - throttle: - stream_requests_per_sec: 5000 - timeout: 10s - traffic_shaping: - inbound_global_bandwidth_bps: 0 # 0 implies unthrottled, the inbound bandwidth in bytes per second - outbound_global_bandwidth_bps: 0 # 0 implies unthrottled, the outbound bandwidth in bytes per second - peak_outbound_global_bandwidth_bps: 419430400 # the peak outbound bandwidth in bytes per second. 
The default is 400 mebibytes per second - max_delay_to_wait: 15s # 15 seconds - check_interval_for_stats: 1s # 1 second - inbound_global_file_bandwidth_bps: 0 # 0 implies unthrottled, the inbound bandwidth allocated for incoming files in bytes per second, upper-bounded by inbound_global_bandwidth_bps - sstable_upload: - concurrent_upload_limit: 80 - min_free_space_percent: 10 - # file_permissions: "rw-r--r--" # when not specified, the default file permissions are owner read & write, group & others read - allowable_time_skew: 1h - sstable_import: - execute_interval: 100ms - cache: - expire_after_access: 2h # 2 hours - maximum_size: 10000 - sstable_snapshot: - snapshot_list_cache: - expire_after_access: 2h # 2 hours - maximum_size: 10000 - worker_pools: - service: - name: "sidecar-worker-pool" - size: 20 - max_execution_time: 1m # 1 minute - internal: - name: "sidecar-internal-worker-pool" - size: 20 - max_execution_time: 15m # 15 minutes - jmx: - max_retries: 3 - retry_delay: 200ms - schema: - is_enabled: false - keyspace: sidecar_internal - replication_strategy: SimpleStrategy - replication_factor: 1 - lease_schema_ttl_sec: 120 - coordination: - cluster_lease_claim: - enabled: false - initial_delay: 5s - execute_interval: 31s - -vertx: - filesystem_options: - classpath_resolving_enabled: false - file_caching_enabled: false - -# -# Enable SSL configuration (Disabled by default) -# -# ssl: -# enabled: true -# use_openssl: true -# handshake_timeout: 10s -# client_auth: NONE # valid options are NONE, REQUEST, REQUIRED -# accepted_protocols: -# - TLSv1.2 -# - TLSv1.3 -# cipher_suites: [] -# keystore: -# type: PKCS12 -# path: "path/to/keystore.p12" -# password: password -# check_interval: 5m -# truststore: -# path: "path/to/truststore.p12" -# password: password - -access_control: - # When enabled requests need to be authenticated and authorized before servicing. 
- enabled: false - # Supports setting multiple authenticators, request is authenticated if it is authenticated by any of the - # configured authenticators - authenticators: - - class_name: org.apache.cassandra.sidecar.acl.authentication.MutualTlsAuthenticationHandlerFactory - parameters: - # Certificate validator is used to validate details within a certificate, such as issuer organization, - # issuer country, CNs, certificate expiry etc. - # - # io.vertx.ext.auth.mtls.impl.AllowAllCertificateValidator performs no checks, it marks all certificates as valid. - # other options are, io.vertx.ext.auth.mtls.impl.CertificateValidatorImpl - certificate_validator: io.vertx.ext.auth.mtls.impl.AllowAllCertificateValidator - # CertificateIdentityExtractor is used to extract valid identities from certificates. These identities will be - # used for authorizing users. - # - # org.apache.cassandra.sidecar.acl.authentication.CassandraIdentityExtractor verifies that identity - # extracted from certificate is present in identity_to_role table. Identities in identity_to_role table in - # Cassandra are authenticated identities in Cassandra. - # - # other options are, io.vertx.ext.auth.mtls.impl.SpiffeIdentityExtractor. - certificate_identity_extractor: org.apache.cassandra.sidecar.acl.authentication.CassandraIdentityExtractor - # Identities that are authenticated and authorized. 
- admin_identities: -# - spiffe://authorized/admin/identities - permission_cache: - enabled: true - expire_after_access: 5m - maximum_size: 1000 - warmup_retries: 5 - warmup_retry_interval: 2s - -driver_parameters: - contact_points: - - "127.0.0.1:9042" - - "127.0.0.2:9042" - username: cassandra - password: cassandra - ssl: - enabled: false - keystore: - type: PKCS12 - path: path/to/keystore.p12 - password: password - truststore: - type: PKCS12 - path: path/to/keystore.p12 - password: password - num_connections: 6 -# local_dc: datacenter1 - -healthcheck: - initial_delay_millis: 0 - poll_freq_millis: 30000 - -metrics: - registry_name: cassandra_sidecar - vertx: - enabled: true - expose_via_jmx: false - jmx_domain_name: sidecar.vertx.jmx_domain - include: # empty include list means include all - - type: "regex" # possible filter types are "regex" and "equals" - value: "Sidecar.*" - - type: "regex" - value: "vertx.*" - exclude: # empty exclude list means exclude nothing -# - type: "regex" # possible filter types are "regex" and "equals" -# value: "vertx.eventbus.*" # exclude all metrics starts with vertx.eventbus - -cassandra_input_validation: - forbidden_keyspaces: - - system_schema - - system_traces - - system_distributed - - system - - system_auth - - system_views - - system_virtual_schema - - sidecar_internal - allowed_chars_for_directory: "[a-zA-Z][a-zA-Z0-9_]{0,47}" - allowed_chars_for_quoted_name: "[a-zA-Z_0-9]{1,48}" - allowed_chars_for_component_name: "[a-zA-Z0-9_-]+(.db|.cql|.json|.crc32|TOC.txt)" - allowed_chars_for_restricted_component_name: "[a-zA-Z0-9_-]+(.db|TOC.txt)" - -blob_restore: - job_discovery_active_loop_delay: 5m - job_discovery_idle_loop_delay: 10m - job_discovery_recency_days: 5 - slice_process_max_concurrency: 20 - restore_job_tables_ttl: 90d - slow_task_threshold: 10m - slow_task_report_delay: 1m - ring_topology_refresh_delay: 1m - -s3_client: - concurrency: 4 - thread_name_prefix: s3-client - thread_keep_alive: 1m - api_call_timeout: 1m # 
1 minute - range_get_object_bytes_size: 5242880 # 5 MiB -# proxy_config: -# uri: -# username: -# password: diff --git a/server/src/test/resources/config/sidecar_multiple_instances.yaml b/server/src/test/resources/config/sidecar_multiple_instances.yaml index ff39ee56..88b17b8e 100644 --- a/server/src/test/resources/config/sidecar_multiple_instances.yaml +++ b/server/src/test/resources/config/sidecar_multiple_instances.yaml @@ -115,11 +115,19 @@ sidecar: jmx: max_retries: 1 retry_delay: 1234ms + schema: + is_enabled: false + keyspace: sidecar_internal + replication_strategy: SimpleStrategy + replication_factor: 1 + lease_schema_ttl: 5m coordination: cluster_lease_claim: + electorate_membership_strategy: MostReplicatedKeyspaceTokenZeroElectorateMembership enabled: true initial_delay: 1s - execute_interval: 60s + initial_delay_random_delta: 30s + execute_interval: 100s vertx: filesystem_options: diff --git a/server/src/test/resources/config/sidecar_single_instance.yaml b/server/src/test/resources/config/sidecar_single_instance.yaml index a249ffcf..84b0a009 100644 --- a/server/src/test/resources/config/sidecar_single_instance.yaml +++ b/server/src/test/resources/config/sidecar_single_instance.yaml @@ -63,11 +63,14 @@ sidecar: keyspace: sidecar_internal replication_strategy: SimpleStrategy replication_factor: 1 + lease_schema_ttl: 5m coordination: cluster_lease_claim: + electorate_membership_strategy: MostReplicatedKeyspaceTokenZeroElectorateMembership enabled: true initial_delay: 1s - execute_interval: 60s + initial_delay_random_delta: 30s + execute_interval: 100s # # Enable SSL configuration (Disabled by default) --------------------------------------------------------------------- To unsubscribe, e-mail: commits-unsubscr...@cassandra.apache.org For additional commands, e-mail: commits-h...@cassandra.apache.org