This is an automated email from the ASF dual-hosted git repository. frankgh pushed a commit to branch trunk in repository https://gitbox.apache.org/repos/asf/cassandra-sidecar.git
The following commit(s) were added to refs/heads/trunk by this push: new a424d836 CASSSIDECAR-182: Refactor access to delegate methods to simplify (#168) a424d836 is described below commit a424d836811508078f0761fd4650c31e330e1886 Author: Francisco Guerrero <fran...@apache.org> AuthorDate: Mon Dec 30 15:28:42 2024 -0800 CASSSIDECAR-182: Refactor access to delegate methods to simplify (#168) Patch by Francisco Guerrero; Reviewed by Yifan Cai for CASSSIDECAR-182 --- CHANGES.txt | 1 + adapters/base/build.gradle | 1 + .../sidecar/adapters/base/CassandraAdapter.java | 96 +++++++++------------- .../base/GossipDependentStorageJmxOperations.java | 2 +- .../exception/OperationUnavailableException.java | 3 - .../adapters/cassandra41/Cassandra41Adapter.java | 2 + .../sidecar/common/server/CQLSessionProvider.java | 14 ++-- .../sidecar/common/server/ICassandraAdapter.java | 47 +++++++---- .../cassandra/sidecar/db/DatabaseAccessor.java | 16 +--- .../exceptions/CassandraUnavailableException.java | 59 +++++++++++++ .../sidecar/cluster/CQLSessionProviderImpl.java | 23 ++++-- .../sidecar/cluster/CassandraAdapterDelegate.java | 93 +++++++++++---------- .../sidecar/cluster/instance/InstanceMetadata.java | 22 +---- .../cluster/instance/InstanceMetadataImpl.java | 22 ++++- .../cluster/locator/CachedLocalTokenRanges.java | 11 ++- ...catedKeyspaceTokenZeroElectorateMembership.java | 38 +++++---- .../apache/cassandra/sidecar/db/RestoreRange.java | 18 ++-- .../cassandra/sidecar/db/schema/SidecarSchema.java | 13 ++- .../sidecar/metrics/SidecarMetricsImpl.java | 19 ++--- .../sidecar/restore/RestoreJobProgressTracker.java | 12 +-- .../sidecar/restore/RingTopologyRefresher.java | 15 +++- .../cassandra/sidecar/routes/AbstractHandler.java | 23 ------ .../routes/ConnectedClientStatsHandler.java | 23 +++--- .../cassandra/sidecar/routes/RingHandler.java | 20 +---- .../cassandra/sidecar/routes/SchemaHandler.java | 19 +---- .../routes/StreamSSTableComponentHandler.java | 22 +---- .../routes/TokenRangeReplicaMapHandler.java | 19 +---- .../routes/cassandra/NodeSettingsHandler.java | 20 +---- .../routes/snapshots/ClearSnapshotHandler.java | 10 +-- .../routes/snapshots/CreateSnapshotHandler.java | 38 ++++----- .../routes/snapshots/ListSnapshotHandler.java | 21 +---- .../sstableuploads/SSTableImportHandler.java | 25 ++---- .../sstableuploads/SSTableUploadHandler.java | 28 ++----- .../validations/ValidateTableExistenceHandler.java | 13 +-- .../cassandra/sidecar/server/MainModule.java | 3 +- .../cassandra/sidecar/utils/HttpExceptions.java | 20 ++--- .../sidecar/utils/InstanceMetadataFetcher.java | 24 ++++-- .../cassandra/sidecar/utils/SSTableImporter.java | 88 ++++++++------------ .../ClusterLeaseClaimTaskIntegrationTest.java | 17 +++- .../sidecar/testing/IntegrationTestModule.java | 6 +- .../sidecar/TestCassandraAdapterDelegate.java | 33 ++++++-- ...dKeyspaceTokenZeroElectorateMembershipTest.java | 4 +- .../sidecar/metrics/InstanceHealthMetricsTest.java | 3 + .../sidecar/restore/RestoreRangeTaskTest.java | 5 +- .../sidecar/utils/SSTableImporterTest.java | 10 ++- 45 files changed, 460 insertions(+), 561 deletions(-) diff --git a/CHANGES.txt b/CHANGES.txt index 9b47f892..66546f23 100644 --- a/CHANGES.txt +++ b/CHANGES.txt @@ -1,5 +1,6 @@ 1.0.0 ----- + * Refactor access to delegate methods to simplify (CASSSIDECAR-182) * Renaming InstancesConfig to InstancesMetadata (CASSSIDECAR-175) * Mechanism to have a reduced number of Sidecar instances run operations (CASSSIDECAR-174) * Adding support for CDC APIs into sidecar client (CASSSIDECAR-172) diff --git a/adapters/base/build.gradle b/adapters/base/build.gradle index db06faa2..37becd25 100644 --- a/adapters/base/build.gradle +++ b/adapters/base/build.gradle @@ -26,6 +26,7 @@ plugins { id 'java-library' id 'idea' id 'maven-publish' + id 'jacoco' id "com.github.spotbugs" } diff --git a/adapters/base/src/main/java/org/apache/cassandra/sidecar/adapters/base/CassandraAdapter.java b/adapters/base/src/main/java/org/apache/cassandra/sidecar/adapters/base/CassandraAdapter.java index 55a7b99b..f9a1f5c3 100644 --- a/adapters/base/src/main/java/org/apache/cassandra/sidecar/adapters/base/CassandraAdapter.java +++ b/adapters/base/src/main/java/org/apache/cassandra/sidecar/adapters/base/CassandraAdapter.java @@ -20,9 +20,6 @@ package org.apache.cassandra.sidecar.adapters.base; import java.net.InetSocketAddress; -import org.slf4j.Logger; -import org.slf4j.LoggerFactory; - import com.datastax.driver.core.ConsistencyLevel; import com.datastax.driver.core.Host; import com.datastax.driver.core.Metadata; @@ -39,14 +36,16 @@ import org.apache.cassandra.sidecar.common.server.StorageOperations; import org.apache.cassandra.sidecar.common.server.TableOperations; import org.apache.cassandra.sidecar.common.server.dns.DnsResolver; import org.apache.cassandra.sidecar.common.server.utils.DriverUtils; -import org.jetbrains.annotations.Nullable; +import org.apache.cassandra.sidecar.exceptions.CassandraUnavailableException; +import org.jetbrains.annotations.NotNull; + +import static org.apache.cassandra.sidecar.exceptions.CassandraUnavailableException.Service.CQL; /** * A {@link ICassandraAdapter} implementation for Cassandra 4.0 and later */ public class CassandraAdapter implements ICassandraAdapter { - private static final Logger LOGGER = LoggerFactory.getLogger(CassandraAdapter.class); protected final DnsResolver dnsResolver; protected final JmxClient jmxClient; protected final CQLSessionProvider cqlSessionProvider; @@ -71,93 +70,46 @@ public class CassandraAdapter implements ICassandraAdapter * {@inheritDoc} */ @Override - @Nullable - public Metadata metadata() + @NotNull + public Metadata metadata() throws CassandraUnavailableException { - Session activeSession = cqlSessionProvider.get(); - if (activeSession == null) - { - LOGGER.warn("There is no active session to Cassandra"); - return null; - } - - if (activeSession.getCluster() == null) - { - LOGGER.warn("There is no available cluster for session={}", activeSession); - return null; - } - - if (activeSession.getCluster().getMetadata() == null) - { - LOGGER.warn("There is no available metadata for session={}, cluster={}", - activeSession, activeSession.getCluster()); - } - - return activeSession.getCluster().getMetadata(); + return cqlSessionProvider.get().getCluster().getMetadata(); } /** * {@inheritDoc} */ @Override - @Nullable + @NotNull public NodeSettings nodeSettings() { throw new UnsupportedOperationException("Node settings are not provided by this adapter"); } @Override + @NotNull public ResultSet executeLocal(Statement statement) { Session activeSession = cqlSessionProvider.get(); Metadata metadata = metadata(); - // Both of the above log about lack of session/metadata, so no need to log again - if (activeSession == null || metadata == null) - { - return null; - } - Host host = getHost(metadata); - if (host == null) - { - LOGGER.debug("Could not find host in metadata for address {}", localNativeTransportAddress); - return null; - } statement.setConsistencyLevel(ConsistencyLevel.ONE); statement.setHost(host); return activeSession.execute(statement); } - protected Host getHost(Metadata metadata) - { - if (host == null) - { - synchronized (this) - { - if (host == null) - { - host = driverUtils.getHost(metadata, localNativeTransportAddress); - } - } - } - return host; - } - @Override + @NotNull public InetSocketAddress localNativeTransportAddress() { return localNativeTransportAddress; } @Override + @NotNull public InetSocketAddress localStorageBroadcastAddress() { Metadata metadata = metadata(); - if (metadata == null) - { - return null; - } - return getHost(metadata).getBroadcastSocketAddress(); } @@ -165,12 +117,14 @@ public class CassandraAdapter implements ICassandraAdapter * {@inheritDoc} */ @Override + @NotNull public StorageOperations storageOperations() { return new CassandraStorageOperations(jmxClient, dnsResolver); } @Override + @NotNull public MetricsOperations metricsOperations() { return new CassandraMetricsOperations(cqlSessionProvider); @@ -180,6 +134,7 @@ public class CassandraAdapter implements ICassandraAdapter * {@inheritDoc} */ @Override + @NotNull public ClusterMembershipOperations clusterMembershipOperations() { return new CassandraClusterMembershipOperations(jmxClient); @@ -189,6 +144,7 @@ public class CassandraAdapter implements ICassandraAdapter * {@inheritDoc} */ @Override + @NotNull public TableOperations tableOperations() { return new CassandraTableOperations(jmxClient); @@ -202,4 +158,26 @@ public class CassandraAdapter implements ICassandraAdapter { return "CassandraAdapter" + "@" + Integer.toHexString(hashCode()); } + + @NotNull + protected Host getHost(Metadata metadata) + { + if (host != null) + { + return host; + } + + synchronized (this) + { + if (host == null) + { + host = driverUtils.getHost(metadata, localNativeTransportAddress); + if (host == null) + { + throw new CassandraUnavailableException(CQL, "No Host available in Metadata for address: " + localNativeTransportAddress); + } + } + } + return host; + } } diff --git a/adapters/base/src/main/java/org/apache/cassandra/sidecar/adapters/base/GossipDependentStorageJmxOperations.java b/adapters/base/src/main/java/org/apache/cassandra/sidecar/adapters/base/GossipDependentStorageJmxOperations.java index 8982c1b9..5b641ee8 100644 --- a/adapters/base/src/main/java/org/apache/cassandra/sidecar/adapters/base/GossipDependentStorageJmxOperations.java +++ b/adapters/base/src/main/java/org/apache/cassandra/sidecar/adapters/base/GossipDependentStorageJmxOperations.java @@ -157,6 +157,6 @@ public class GossipDependentStorageJmxOperations implements StorageJmxOperations return; LOGGER.warn("Gossip is disabled and unavailable for the operation"); - throw OperationUnavailableException.GOSSIP_DISABLED; + throw new OperationUnavailableException("Gossip is required for the operation but it is disabled"); } } diff --git a/adapters/base/src/main/java/org/apache/cassandra/sidecar/adapters/base/exception/OperationUnavailableException.java b/adapters/base/src/main/java/org/apache/cassandra/sidecar/adapters/base/exception/OperationUnavailableException.java index 818b8783..d1897720 100644 --- a/adapters/base/src/main/java/org/apache/cassandra/sidecar/adapters/base/exception/OperationUnavailableException.java +++ b/adapters/base/src/main/java/org/apache/cassandra/sidecar/adapters/base/exception/OperationUnavailableException.java @@ -23,9 +23,6 @@ package org.apache.cassandra.sidecar.adapters.base.exception; */ public class OperationUnavailableException extends RuntimeException { - public static final OperationUnavailableException GOSSIP_DISABLED - = new OperationUnavailableException("Gossip is required for the operation but it is disabled"); - public OperationUnavailableException(String errorMessage) { super(errorMessage); diff --git a/adapters/cassandra41/src/main/java/org/apache/cassandra/sidecar/adapters/cassandra41/Cassandra41Adapter.java b/adapters/cassandra41/src/main/java/org/apache/cassandra/sidecar/adapters/cassandra41/Cassandra41Adapter.java index 0a16466e..7c6a4c2a 100644 --- a/adapters/cassandra41/src/main/java/org/apache/cassandra/sidecar/adapters/cassandra41/Cassandra41Adapter.java +++ b/adapters/cassandra41/src/main/java/org/apache/cassandra/sidecar/adapters/cassandra41/Cassandra41Adapter.java @@ -27,6 +27,7 @@ import org.apache.cassandra.sidecar.common.server.JmxClient; import org.apache.cassandra.sidecar.common.server.StorageOperations; import org.apache.cassandra.sidecar.common.server.dns.DnsResolver; import org.apache.cassandra.sidecar.common.server.utils.DriverUtils; +import org.jetbrains.annotations.NotNull; /** * A {@link ICassandraAdapter} implementation for Cassandra 4.1 and later @@ -46,6 +47,7 @@ public class Cassandra41Adapter extends CassandraAdapter * {@inheritDoc} */ @Override + @NotNull public StorageOperations storageOperations() { return new Cassandra41StorageOperations(jmxClient, dnsResolver); diff --git a/server-common/src/main/java/org/apache/cassandra/sidecar/common/server/CQLSessionProvider.java b/server-common/src/main/java/org/apache/cassandra/sidecar/common/server/CQLSessionProvider.java index bcbcce58..04a8bd15 100644 --- a/server-common/src/main/java/org/apache/cassandra/sidecar/common/server/CQLSessionProvider.java +++ b/server-common/src/main/java/org/apache/cassandra/sidecar/common/server/CQLSessionProvider.java @@ -19,6 +19,8 @@ package org.apache.cassandra.sidecar.common.server; import com.datastax.driver.core.Session; +import org.apache.cassandra.sidecar.exceptions.CassandraUnavailableException; +import org.jetbrains.annotations.NotNull; import org.jetbrains.annotations.Nullable; /** @@ -32,16 +34,18 @@ public interface CQLSessionProvider { /** * Provides a Session connected to the cluster; otherwise, it tries to connect to the cluster. - * Returning null means the connection can not be established. - * The session still might throw a NoHostAvailableException if the cluster is unreachable. + * {@link CassandraUnavailableException} is thrown when no CQL connection can be established. * - * @return Session or null + * @return the session that holds connections to a Cassandra cluster + * @throws CassandraUnavailableException when CQL connection is not successful */ - @Nullable Session get(); + @NotNull Session get() throws CassandraUnavailableException; /** * Gets the current Session object if it already exists. - * Unlike {@link #get()}, it does not attempt to connect to the cluster. + * Unlike {@link #get()}, it does not attempt to connect to the cluster, + * and it can return {@code null} when no connection is established. + * The call-sites are required to handle {@code null} value. * * @return the connected {@link Session} object if available. Null otherwise. */ diff --git a/server-common/src/main/java/org/apache/cassandra/sidecar/common/server/ICassandraAdapter.java b/server-common/src/main/java/org/apache/cassandra/sidecar/common/server/ICassandraAdapter.java index 4db86f49..e16acc24 100644 --- a/server-common/src/main/java/org/apache/cassandra/sidecar/common/server/ICassandraAdapter.java +++ b/server-common/src/main/java/org/apache/cassandra/sidecar/common/server/ICassandraAdapter.java @@ -25,11 +25,13 @@ import com.datastax.driver.core.ResultSet; import com.datastax.driver.core.SimpleStatement; import com.datastax.driver.core.Statement; import org.apache.cassandra.sidecar.common.response.NodeSettings; -import org.jetbrains.annotations.Nullable; +import org.apache.cassandra.sidecar.exceptions.CassandraUnavailableException; +import org.jetbrains.annotations.NotNull; /** - * Core Cassandra Adapter interface - * For now, this is just a placeholder. We will most likely want to define the interface to returns bits such as + * Core Cassandra Adapter interface. + * + * <p>For now, this is just a placeholder. We will most likely want to define the interface to returns bits such as * compaction(), clusterMembership(), etc., which return interfaces such as Compaction, ClusterMembership. * We will need different implementations due to the slow move away from JMX towards CQL for some, but not all, actions. */ @@ -37,64 +39,77 @@ public interface ICassandraAdapter { /** * @return metadata on the connected cluster, including known nodes and schema definitions + * @throws CassandraUnavailableException when no CQL connection is established */ - Metadata metadata(); + @NotNull Metadata metadata() throws CassandraUnavailableException; /** * The {@link NodeSettings} for this instance. + * * @return the {@link NodeSettings} instance for this instance. + * @throws CassandraUnavailableException when no JMX connection is established */ - NodeSettings nodeSettings(); + @NotNull NodeSettings nodeSettings() throws CassandraUnavailableException; /** * Execute the provided query on the locally-managed Cassandra instance + * * @param query the query to execute * @return the {@link ResultSet} + * @throws CassandraUnavailableException when CQL connection is not yet established */ - default ResultSet executeLocal(String query) + @NotNull + default ResultSet executeLocal(String query) throws CassandraUnavailableException { return executeLocal(new SimpleStatement(query)); } /** * Execute the provided statement on the locally-managed Cassandra instance + * * @param statement the statement to execute * @return the {@link ResultSet} + * @throws CassandraUnavailableException when CQL connection is not yet established */ - ResultSet executeLocal(Statement statement); + @NotNull ResultSet executeLocal(Statement statement) throws CassandraUnavailableException; /** * The address on which the local Cassandra instance is listening for CQL connections + * * @return the {@link InetSocketAddress} representing the address and port. + * @throws CassandraUnavailableException when CQL connection is not yet established */ - InetSocketAddress localNativeTransportAddress(); + @NotNull InetSocketAddress localNativeTransportAddress() throws CassandraUnavailableException; /** * The address on which the local Cassandra instance broadcasts the intra-cluster storage traffic + * * @return the {@link InetSocketAddress} representing the address and port. - * When CQL connection is not yet established, returns null + * @throws CassandraUnavailableException when CQL connection is not yet established */ - @Nullable - InetSocketAddress localStorageBroadcastAddress(); + @NotNull InetSocketAddress localStorageBroadcastAddress() throws CassandraUnavailableException; /** * @return the {@link StorageOperations} implementation for the Cassandra cluster + * @throws CassandraUnavailableException when Cassandra is not available */ - StorageOperations storageOperations(); + @NotNull StorageOperations storageOperations() throws CassandraUnavailableException; /** * @return the {@link MetricsOperations} implementation for the Cassandra cluster + * @throws CassandraUnavailableException when Cassandra is not available */ - MetricsOperations metricsOperations(); - + @NotNull MetricsOperations metricsOperations() throws CassandraUnavailableException; /** * @return the {@link ClusterMembershipOperations} implementation for handling cluster membership operations + * @throws CassandraUnavailableException when Cassandra is not available */ - ClusterMembershipOperations clusterMembershipOperations(); + @NotNull ClusterMembershipOperations clusterMembershipOperations() throws CassandraUnavailableException; /** * @return the {@link TableOperations} implementation for the Cassandra cluster + * @throws CassandraUnavailableException when Cassandra is not available */ - TableOperations tableOperations(); + @NotNull TableOperations tableOperations(); } diff --git a/server-common/src/main/java/org/apache/cassandra/sidecar/db/DatabaseAccessor.java b/server-common/src/main/java/org/apache/cassandra/sidecar/db/DatabaseAccessor.java index 37c1a1f3..75350d11 100644 --- a/server-common/src/main/java/org/apache/cassandra/sidecar/db/DatabaseAccessor.java +++ b/server-common/src/main/java/org/apache/cassandra/sidecar/db/DatabaseAccessor.java @@ -50,21 +50,7 @@ public abstract class DatabaseAccessor<T extends TableSchema> @NotNull public Session session() { - Session session; - try - { - session = cqlSessionProvider.get(); - } - catch (Exception e) - { - throw new IllegalStateException("Instance is not ready", e); - } - if (session == null) - { - logger.error("Unable to obtain session"); - throw new IllegalStateException("Could not obtain session"); - } - return session; + return cqlSessionProvider.get(); } protected ResultSet execute(Statement statement) diff --git a/server-common/src/main/java/org/apache/cassandra/sidecar/exceptions/CassandraUnavailableException.java b/server-common/src/main/java/org/apache/cassandra/sidecar/exceptions/CassandraUnavailableException.java new file mode 100644 index 00000000..8e963951 --- /dev/null +++ b/server-common/src/main/java/org/apache/cassandra/sidecar/exceptions/CassandraUnavailableException.java @@ -0,0 +1,59 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.cassandra.sidecar.exceptions; + +/** + * Exception thrown when Cassandra instance service is unavailable to Sidecar + */ +public class CassandraUnavailableException extends RuntimeException +{ + public CassandraUnavailableException(Service service, String additionalMessage) + { + super(service.toExceptionMessage() + ". " + additionalMessage); + } + + public CassandraUnavailableException(Service service, Throwable cause) + { + super(service.toExceptionMessage(), cause); + } + + /** + * Cassandra service types + */ + public enum Service + { + /** + * JMX service + */ + JMX, + /** + * CQL/native service + */ + CQL, + /** + * Both CQL and JMX + */ + CQL_AND_JMX; + + public String toExceptionMessage() + { + return "Cassandra " + this.name() + " service is unavailable"; + } + } +} diff --git a/server/src/main/java/org/apache/cassandra/sidecar/cluster/CQLSessionProviderImpl.java b/server/src/main/java/org/apache/cassandra/sidecar/cluster/CQLSessionProviderImpl.java index 3ec6b849..608b8e76 100644 --- a/server/src/main/java/org/apache/cassandra/sidecar/cluster/CQLSessionProviderImpl.java +++ b/server/src/main/java/org/apache/cassandra/sidecar/cluster/CQLSessionProviderImpl.java @@ -58,10 +58,14 @@ import org.apache.cassandra.sidecar.config.DriverConfiguration; import org.apache.cassandra.sidecar.config.KeyStoreConfiguration; import org.apache.cassandra.sidecar.config.SidecarConfiguration; import org.apache.cassandra.sidecar.config.SslConfiguration; +import org.apache.cassandra.sidecar.exceptions.CassandraUnavailableException; import org.apache.cassandra.sidecar.exceptions.ConfigurationException; +import org.jetbrains.annotations.NotNull; import org.jetbrains.annotations.Nullable; import org.jetbrains.annotations.VisibleForTesting; +import static org.apache.cassandra.sidecar.exceptions.CassandraUnavailableException.Service.CQL; + /** * Provides connections to the local Cassandra cluster as defined in the Configuration. Currently, it only supports * returning the local connection. @@ -79,7 +83,6 @@ public class CQLSessionProviderImpl implements CQLSessionProvider private final String username; private final String password; private final DriverUtils driverUtils; - @Nullable private volatile Session session; @VisibleForTesting @@ -169,13 +172,14 @@ public class CQLSessionProviderImpl implements CQLSessionProvider * @return Session */ @Override - @Nullable - public synchronized Session get() + @NotNull + public synchronized Session get() throws CassandraUnavailableException { if (session != null) { return session; } + Cluster cluster = null; try { @@ -219,26 +223,29 @@ public class CQLSessionProviderImpl implements CQLSessionProvider cluster = builder.build(); session = cluster.connect(); logger.info("Successfully connected to Cassandra!"); + return session; } - catch (Exception e) + catch (Exception connectionException) { - logger.error("Failed to reach Cassandra", e); + logger.error("Failed to reach Cassandra", connectionException); if (cluster != null) { try { cluster.close(); } - catch (Exception ex) + catch (Exception closeException) { - logger.error("Failed to close cluster in cleanup", ex); + logger.error("Failed to close cluster in cleanup", closeException); + connectionException.addSuppressed(closeException); } } + throw new CassandraUnavailableException(CQL, connectionException); } - return session; } @Override + @Nullable public Session getIfConnected() { return session; diff --git a/server/src/main/java/org/apache/cassandra/sidecar/cluster/CassandraAdapterDelegate.java b/server/src/main/java/org/apache/cassandra/sidecar/cluster/CassandraAdapterDelegate.java index c1de10cf..479a497b 100644 --- a/server/src/main/java/org/apache/cassandra/sidecar/cluster/CassandraAdapterDelegate.java +++ b/server/src/main/java/org/apache/cassandra/sidecar/cluster/CassandraAdapterDelegate.java @@ -54,20 +54,21 @@ import org.apache.cassandra.sidecar.common.server.MetricsOperations; import org.apache.cassandra.sidecar.common.server.StorageOperations; import org.apache.cassandra.sidecar.common.server.TableOperations; import org.apache.cassandra.sidecar.common.server.utils.DriverUtils; +import org.apache.cassandra.sidecar.common.utils.Preconditions; +import org.apache.cassandra.sidecar.exceptions.CassandraUnavailableException; import org.apache.cassandra.sidecar.metrics.instance.InstanceHealthMetrics; import org.apache.cassandra.sidecar.utils.CassandraVersionProvider; import org.apache.cassandra.sidecar.utils.SimpleCassandraVersion; import org.jetbrains.annotations.NotNull; -import org.jetbrains.annotations.Nullable; import static org.apache.cassandra.sidecar.adapters.base.EndpointSnitchJmxOperations.ENDPOINT_SNITCH_INFO_OBJ_NAME; import static org.apache.cassandra.sidecar.adapters.base.StorageJmxOperations.STORAGE_SERVICE_OBJ_NAME; +import static org.apache.cassandra.sidecar.exceptions.CassandraUnavailableException.Service.CQL_AND_JMX; import static org.apache.cassandra.sidecar.server.SidecarServerEvents.ON_CASSANDRA_CQL_DISCONNECTED; import static org.apache.cassandra.sidecar.server.SidecarServerEvents.ON_CASSANDRA_CQL_READY; import static org.apache.cassandra.sidecar.server.SidecarServerEvents.ON_CASSANDRA_JMX_DISCONNECTED; import static org.apache.cassandra.sidecar.server.SidecarServerEvents.ON_CASSANDRA_JMX_READY; - /** * Since it's possible for the version of Cassandra to change under us, we need this delegate to wrap the functionality * of the underlying Cassandra adapter. If a server reboots, we can swap out the right Adapter when the driver @@ -91,8 +92,8 @@ public class CassandraAdapterDelegate implements ICassandraAdapter, Host.StateLi private final JmxClient jmxClient; private final JmxNotificationListener notificationListener; private SimpleCassandraVersion currentVersion; - private ICassandraAdapter adapter; - private volatile boolean isNativeUp = false; + private volatile ICassandraAdapter adapter; + private final AtomicBoolean isNativeUp = new AtomicBoolean(false); private volatile NodeSettings nodeSettingsFromJmx = null; private final AtomicBoolean registered = new AtomicBoolean(false); private final AtomicBoolean isHealthCheckActive = new AtomicBoolean(false); @@ -137,6 +138,7 @@ public class CassandraAdapterDelegate implements ICassandraAdapter, Host.StateLi notificationListener = initializeJmxListener(); } + // TODO: re-organize the methods in the class to group the public/protected/private methods together protected JmxNotificationListener initializeJmxListener() { JmxNotificationListener notificationListener = new JmxNotificationListener(); @@ -205,7 +207,7 @@ public class CassandraAdapterDelegate implements ICassandraAdapter, Host.StateLi * Performs health checks by utilizing the JMX protocol. It uses a small subset of the exposed mBeans to * collect information needed to populate the {@link NodeSettings} object. */ - protected void jmxHealthCheck() + protected synchronized void jmxHealthCheck() { try { @@ -238,8 +240,12 @@ public class CassandraAdapterDelegate implements ICassandraAdapter, Host.StateLi */ protected void nativeProtocolHealthCheck() { - Session activeSession = cqlSessionProvider.get(); - if (activeSession == null) + Session activeSession; + try + { + activeSession = cqlSessionProvider.get(); + } + catch (CassandraUnavailableException cue) { LOGGER.info("No local CQL session is available for cassandraInstanceId={}. " + "Cassandra instance is down presumably.", cassandraInstanceId); @@ -264,24 +270,12 @@ public class CassandraAdapterDelegate implements ICassandraAdapter, Host.StateLi healthCheckStatement.setHost(host); healthCheckStatement.setConsistencyLevel(ConsistencyLevel.ONE); Row row = activeSession.execute(healthCheckStatement).one(); + // This should never happen but added for completeness + Preconditions.checkArgument(row != null, "Session execution result should never be null"); - if (row != null) - { - if (!isNativeUp) - { - isNativeUp = true; - notifyNativeConnection(); - } - } - else + if (isNativeUp.compareAndSet(false, true)) { - // This should never happen but added for completeness - LOGGER.error("Expected to query the release_version from system.local but encountered null {}", - cassandraInstanceId); - // The cassandra native protocol connection to the node is down. - markNativeDownAndMaybeNotifyDisconnection(); - // Unregister the host listener. - maybeUnregisterHostListener(activeSession); + notifyNativeConnection(); } } catch (IllegalArgumentException | NoHostAvailableException e) @@ -357,9 +351,9 @@ public class CassandraAdapterDelegate implements ICassandraAdapter, Host.StateLi * @return metadata on the connected cluster, including known nodes and schema definitions obtained from the * {@link ICassandraAdapter} */ - @Nullable @Override - public Metadata metadata() + @NotNull + public Metadata metadata() throws CassandraUnavailableException { return fromAdapter(ICassandraAdapter::metadata); } @@ -368,58 +362,61 @@ public class CassandraAdapterDelegate implements ICassandraAdapter, Host.StateLi * Returns the cached node settings value obtained during scheduled health checks. This method does not delegate * to the internal adapter, as the information is retrieved on the configured health check interval. * - * @return a cached {@link NodeSettings}. The returned value will be {@code null} when no JMX connection is - * established + * @return a cached {@link NodeSettings}. + * @throws CassandraUnavailableException when no JMX connection is established */ - @Nullable @Override - public NodeSettings nodeSettings() + @NotNull + public NodeSettings nodeSettings() throws CassandraUnavailableException { return nodeSettingsFromJmx; } @Override - public ResultSet executeLocal(Statement statement) + @NotNull + public ResultSet executeLocal(Statement statement) throws CassandraUnavailableException { return fromAdapter(adapter -> adapter.executeLocal(statement)); } @Override - public InetSocketAddress localNativeTransportAddress() + @NotNull + public InetSocketAddress localNativeTransportAddress() throws CassandraUnavailableException { return fromAdapter(ICassandraAdapter::localNativeTransportAddress); } @Override - public InetSocketAddress localStorageBroadcastAddress() + @NotNull + public InetSocketAddress localStorageBroadcastAddress() throws CassandraUnavailableException { return fromAdapter(ICassandraAdapter::localStorageBroadcastAddress); } - @Nullable @Override - public StorageOperations storageOperations() + @NotNull + public StorageOperations storageOperations() throws CassandraUnavailableException { return fromAdapter(ICassandraAdapter::storageOperations); } - @Nullable @Override - public MetricsOperations metricsOperations() + @NotNull + public MetricsOperations metricsOperations() throws CassandraUnavailableException { return fromAdapter(ICassandraAdapter::metricsOperations); } - @Nullable @Override - public ClusterMembershipOperations clusterMembershipOperations() + @NotNull + public ClusterMembershipOperations clusterMembershipOperations() throws CassandraUnavailableException { return fromAdapter(ICassandraAdapter::clusterMembershipOperations); } - @Nullable @Override - public TableOperations tableOperations() + @NotNull + public TableOperations tableOperations() throws CassandraUnavailableException { return fromAdapter(ICassandraAdapter::tableOperations); } @@ -467,7 +464,7 @@ public class CassandraAdapterDelegate implements ICassandraAdapter, Host.StateLi */ public boolean isNativeUp() { - return isNativeUp; + return isNativeUp.get(); } /** @@ -529,9 +526,7 @@ public class CassandraAdapterDelegate implements ICassandraAdapter, Host.StateLi protected void markNativeDownAndMaybeNotifyDisconnection() { healthMetrics.nativeDown.metric.setValue(1); - boolean wasCqlConnected = isNativeUp; - isNativeUp = false; - if (wasCqlConnected) + if (isNativeUp.compareAndSet(true, false)) { JsonObject disconnectMessage = new JsonObject() .put("cassandraInstanceId", cassandraInstanceId); @@ -556,11 +551,15 @@ public class CassandraAdapterDelegate implements ICassandraAdapter, Host.StateLi } } - @Nullable - private <T> T fromAdapter(Function<ICassandraAdapter, T> getter) + @NotNull + private <T> T fromAdapter(Function<ICassandraAdapter, T> getter) throws CassandraUnavailableException { ICassandraAdapter localAdapter = this.adapter; - return localAdapter == null ? null : getter.apply(localAdapter); + if (localAdapter == null) + { + throw new CassandraUnavailableException(CQL_AND_JMX, "CassandraAdapter is null"); + } + return getter.apply(localAdapter); } private void runIfThisHost(Host host, Runnable runnable) diff --git a/server/src/main/java/org/apache/cassandra/sidecar/cluster/instance/InstanceMetadata.java b/server/src/main/java/org/apache/cassandra/sidecar/cluster/instance/InstanceMetadata.java index 5d5ff5c0..3ce43697 100644 --- a/server/src/main/java/org/apache/cassandra/sidecar/cluster/instance/InstanceMetadata.java +++ b/server/src/main/java/org/apache/cassandra/sidecar/cluster/instance/InstanceMetadata.java @@ -19,12 +19,11 @@ package org.apache.cassandra.sidecar.cluster.instance; import java.util.List; -import java.util.function.Function; import org.apache.cassandra.sidecar.cluster.CassandraAdapterDelegate; +import org.apache.cassandra.sidecar.exceptions.CassandraUnavailableException; import org.apache.cassandra.sidecar.metrics.instance.InstanceMetrics; import org.jetbrains.annotations.NotNull; -import org.jetbrains.annotations.Nullable; /** * Metadata of an instance @@ -62,26 +61,13 @@ public interface InstanceMetadata String cdcDir(); /** - * @return a {@link CassandraAdapterDelegate} specific for the instance + * @return a {@link CassandraAdapterDelegate} specific for the instance, or throws when the delegate is unavailable + * @throws CassandraUnavailableException when the Cassandra service is unavailable */ - @Nullable - CassandraAdapterDelegate delegate(); + @NotNull CassandraAdapterDelegate delegate() throws CassandraUnavailableException; /** * @return {@link InstanceMetrics} metrics specific for the Cassandra instance */ @NotNull InstanceMetrics metrics(); - - /** - * Get value from {@link CassandraAdapterDelegate} - * @param mapper the function is evaluated only when delegate is not null - * @return value retrieved from {@link CassandraAdapterDelegate} or null - * @param <T> value type - */ - @Nullable - default <T> T applyFromDelegate(Function<CassandraAdapterDelegate, T> mapper) - { - CassandraAdapterDelegate delegate = delegate(); - return delegate == null ? null : mapper.apply(delegate); - } } diff --git a/server/src/main/java/org/apache/cassandra/sidecar/cluster/instance/InstanceMetadataImpl.java b/server/src/main/java/org/apache/cassandra/sidecar/cluster/instance/InstanceMetadataImpl.java index 8ab09b20..41d427e6 100644 --- a/server/src/main/java/org/apache/cassandra/sidecar/cluster/instance/InstanceMetadataImpl.java +++ b/server/src/main/java/org/apache/cassandra/sidecar/cluster/instance/InstanceMetadataImpl.java @@ -27,11 +27,15 @@ import java.util.stream.Collectors; import com.codahale.metrics.MetricRegistry; import org.apache.cassandra.sidecar.cluster.CassandraAdapterDelegate; import org.apache.cassandra.sidecar.common.DataObjectBuilder; +import org.apache.cassandra.sidecar.exceptions.CassandraUnavailableException; import org.apache.cassandra.sidecar.metrics.instance.InstanceMetrics; import org.apache.cassandra.sidecar.metrics.instance.InstanceMetricsImpl; import org.apache.cassandra.sidecar.utils.FileUtils; +import org.jetbrains.annotations.NotNull; import org.jetbrains.annotations.Nullable; +import static org.apache.cassandra.sidecar.exceptions.CassandraUnavailableException.Service.CQL_AND_JMX; + /** * Local implementation of InstanceMetadata. */ @@ -98,12 +102,18 @@ public class InstanceMetadataImpl implements InstanceMetadata } @Override - public @Nullable CassandraAdapterDelegate delegate() + @NotNull + public CassandraAdapterDelegate delegate() throws CassandraUnavailableException { + if (delegate == null) + { + throw new CassandraUnavailableException(CQL_AND_JMX, "CassandraAdapterDelegate is null"); + } return delegate; } @Override + @NotNull public InstanceMetrics metrics() { return metrics; @@ -114,6 +124,16 @@ public class InstanceMetadataImpl implements InstanceMetadata return new Builder(); } + @Override + public String toString() + { + return "InstanceMetadataImpl{" + + "id=" + id + + ", host='" + host + '\'' + + ", port=" + port + + '}'; + } + /** * {@code InstanceMetadataImpl} builder static inner class. */ diff --git a/server/src/main/java/org/apache/cassandra/sidecar/cluster/locator/CachedLocalTokenRanges.java b/server/src/main/java/org/apache/cassandra/sidecar/cluster/locator/CachedLocalTokenRanges.java index 1a1155ed..fadc2eca 100644 --- a/server/src/main/java/org/apache/cassandra/sidecar/cluster/locator/CachedLocalTokenRanges.java +++ b/server/src/main/java/org/apache/cassandra/sidecar/cluster/locator/CachedLocalTokenRanges.java @@ -41,11 +41,11 @@ import org.slf4j.LoggerFactory; import com.datastax.driver.core.Host; import com.datastax.driver.core.KeyspaceMetadata; import com.datastax.driver.core.Metadata; -import org.apache.cassandra.sidecar.cluster.CassandraAdapterDelegate; import org.apache.cassandra.sidecar.cluster.InstancesMetadata; import org.apache.cassandra.sidecar.cluster.instance.InstanceMetadata; import org.apache.cassandra.sidecar.common.server.cluster.locator.TokenRange; import org.apache.cassandra.sidecar.common.server.dns.DnsResolver; +import org.apache.cassandra.sidecar.exceptions.CassandraUnavailableException; import org.jetbrains.annotations.NotNull; /** @@ -88,9 +88,12 @@ public class CachedLocalTokenRanges implements LocalTokenRangesProvider return Collections.emptyMap(); } - CassandraAdapterDelegate delegate = localInstances.get(0).delegate(); - Metadata metadata = delegate == null ? null : delegate.metadata(); - if (metadata == null) + Metadata metadata; + try + { + metadata = localInstances.get(0).delegate().metadata(); + } + catch (CassandraUnavailableException ignored) { LOGGER.debug("Not yet connect to Cassandra cluster"); return Collections.emptyMap(); diff --git a/server/src/main/java/org/apache/cassandra/sidecar/coordination/MostReplicatedKeyspaceTokenZeroElectorateMembership.java b/server/src/main/java/org/apache/cassandra/sidecar/coordination/MostReplicatedKeyspaceTokenZeroElectorateMembership.java index 97a3c54c..58072661 100644 --- a/server/src/main/java/org/apache/cassandra/sidecar/coordination/MostReplicatedKeyspaceTokenZeroElectorateMembership.java +++ b/server/src/main/java/org/apache/cassandra/sidecar/coordination/MostReplicatedKeyspaceTokenZeroElectorateMembership.java @@ -41,6 +41,7 @@ import org.apache.cassandra.sidecar.common.server.StorageOperations; import org.apache.cassandra.sidecar.common.server.data.Name; import org.apache.cassandra.sidecar.common.server.utils.StringUtils; import org.apache.cassandra.sidecar.config.SidecarConfiguration; +import org.apache.cassandra.sidecar.exceptions.CassandraUnavailableException; /** * An implementation of {@link ElectorateMembership} where the current Sidecar will @@ -105,21 +106,16 @@ public class MostReplicatedKeyspaceTokenZeroElectorateMembership implements Elec Set<String> result = new HashSet<>(); for (InstanceMetadata instance : instancesMetadata.instances()) { - CassandraAdapterDelegate delegate = instance.delegate(); - if (delegate == null) + try { - LOGGER.debug("Delegate is unavailable for instance={}", instance); - continue; + InetSocketAddress address = instance.delegate().localStorageBroadcastAddress(); + result.add(StringUtils.cassandraFormattedHostAndPort(address)); } - - InetSocketAddress address = delegate.localStorageBroadcastAddress(); - if (address == null) + catch (CassandraUnavailableException exception) { - LOGGER.warn("Unable to determine local storage broadcast address for instance={}", instance); - continue; + // Log a warning message and continue + LOGGER.warn("Unable to determine local storage broadcast address for instance. instance={}", instance, exception); } - - result.add(StringUtils.cassandraFormattedHostAndPort(address)); } return result; } @@ -128,11 +124,15 @@ public class MostReplicatedKeyspaceTokenZeroElectorateMembership implements Elec { for (InstanceMetadata instance : instancesMetadata.instances()) { - CassandraAdapterDelegate delegate = instance.delegate(); - O applied = delegate == null ? null : mapper.apply(delegate); - if (applied != null) + try { - return applied; + CassandraAdapterDelegate delegate = instance.delegate(); + return mapper.apply(delegate); + } + catch (CassandraUnavailableException exception) + { + // no-op; try the next instance + LOGGER.debug("CassandraAdapterDelegate is not available for instance. instance={}", instance, exception); } } return null; @@ -154,8 +154,12 @@ public class MostReplicatedKeyspaceTokenZeroElectorateMembership implements Elec return null; } - Session activeSession = cqlSessionProvider.get(); - if (activeSession == null) + Session activeSession; + try + { + activeSession = cqlSessionProvider.get(); + } + catch (CassandraUnavailableException exception) { LOGGER.warn("There is no active session to Cassandra"); return null; diff --git a/server/src/main/java/org/apache/cassandra/sidecar/db/RestoreRange.java b/server/src/main/java/org/apache/cassandra/sidecar/db/RestoreRange.java index 78f390c1..debb6e3b 100644 --- a/server/src/main/java/org/apache/cassandra/sidecar/db/RestoreRange.java +++ b/server/src/main/java/org/apache/cassandra/sidecar/db/RestoreRange.java @@ -29,13 +29,14 @@ import java.util.UUID; import java.util.function.Function; import com.datastax.driver.core.Row; -import org.apache.cassandra.sidecar.cluster.CassandraAdapterDelegate; import org.apache.cassandra.sidecar.cluster.instance.InstanceMetadata; import org.apache.cassandra.sidecar.cluster.locator.LocalTokenRangesProvider; import org.apache.cassandra.sidecar.common.DataObjectBuilder; import org.apache.cassandra.sidecar.common.response.data.RestoreRangeJson; import org.apache.cassandra.sidecar.common.server.data.RestoreRangeStatus; +import org.apache.cassandra.sidecar.common.server.utils.StringUtils; import org.apache.cassandra.sidecar.concurrent.TaskExecutorPool; +import org.apache.cassandra.sidecar.exceptions.CassandraUnavailableException; import org.apache.cassandra.sidecar.exceptions.RestoreJobExceptions; import org.apache.cassandra.sidecar.exceptions.RestoreJobFatalException; import org.apache.cassandra.sidecar.metrics.SidecarMetrics; @@ -503,19 +504,10 @@ public class RestoreRange statusByReplica.put(storageAddressWithPort(instance), status); } - private String storageAddressWithPort(InstanceMetadata instance) + private String storageAddressWithPort(InstanceMetadata instance) throws CassandraUnavailableException { - try - { - InetSocketAddress storageAddress = instance.applyFromDelegate(CassandraAdapterDelegate::localStorageBroadcastAddress); - return storageAddress.getAddress().getHostAddress() + ':' + storageAddress.getPort(); - } - catch (NullPointerException npe) // various places can throw NPE. Catch them all in one single place. - { - NullPointerException e = new NullPointerException("Unexpected null storageBroadcastAddress from CassandraAdapter"); - e.addSuppressed(npe); - throw e; - } + InetSocketAddress storageAddress = instance.delegate().localStorageBroadcastAddress(); + return StringUtils.cassandraFormattedHostAndPort(storageAddress); } /** diff --git a/server/src/main/java/org/apache/cassandra/sidecar/db/schema/SidecarSchema.java b/server/src/main/java/org/apache/cassandra/sidecar/db/schema/SidecarSchema.java index 1ce41337..a06019e4 100644 --- a/server/src/main/java/org/apache/cassandra/sidecar/db/schema/SidecarSchema.java +++ b/server/src/main/java/org/apache/cassandra/sidecar/db/schema/SidecarSchema.java @@ -33,6 +33,7 @@ import org.apache.cassandra.sidecar.config.SchemaKeyspaceConfiguration; import org.apache.cassandra.sidecar.config.SidecarConfiguration; import org.apache.cassandra.sidecar.coordination.ClusterLease; import org.apache.cassandra.sidecar.coordination.ExecuteOnClusterLeaseholderOnly; +import org.apache.cassandra.sidecar.exceptions.CassandraUnavailableException; import org.apache.cassandra.sidecar.exceptions.SidecarSchemaModificationException; import org.apache.cassandra.sidecar.metrics.SchemaMetrics; @@ -141,15 +142,9 @@ public class SidecarSchema return; } - Session session = cqlSessionProvider.get(); - if (session == null) - { - LOGGER.debug("Cql session is not yet available. Skip initializing..."); - return; - } - try { + Session session = cqlSessionProvider.get(); isInitialized = sidecarInternalKeyspace.initialize(session, this::shouldCreateSchema); if (isInitialized()) @@ -159,6 +154,10 @@ public class SidecarSchema reportSidecarSchemaInitialized(); } } + catch (CassandraUnavailableException ignored) + { + LOGGER.debug("Cql session is not yet available. Skip initializing..."); + } catch (Exception ex) { LOGGER.warn("Failed to initialize schema", ex); diff --git a/server/src/main/java/org/apache/cassandra/sidecar/metrics/SidecarMetricsImpl.java b/server/src/main/java/org/apache/cassandra/sidecar/metrics/SidecarMetricsImpl.java index c5c06a3d..501ce62c 100644 --- a/server/src/main/java/org/apache/cassandra/sidecar/metrics/SidecarMetricsImpl.java +++ b/server/src/main/java/org/apache/cassandra/sidecar/metrics/SidecarMetricsImpl.java @@ -19,7 +19,8 @@ package org.apache.cassandra.sidecar.metrics; import com.codahale.metrics.MetricRegistry; -import org.apache.cassandra.sidecar.cluster.instance.InstanceMetadata; +import org.apache.cassandra.sidecar.exceptions.CassandraUnavailableException; +import org.apache.cassandra.sidecar.exceptions.NoSuchSidecarInstanceException; import org.apache.cassandra.sidecar.metrics.instance.InstanceMetrics; import org.apache.cassandra.sidecar.utils.InstanceMetadataFetcher; @@ -50,22 +51,12 @@ public class SidecarMetricsImpl implements SidecarMetrics @Override public InstanceMetrics instance(int instanceId) { - InstanceMetadata instanceMetadata = instanceMetadataFetcher.instance(instanceId); - if (instanceMetadata == null) - { - throw new IllegalArgumentException("Instance metrics requested for non existent instance id " + instanceId); - } - return instanceMetadata.metrics(); + return instanceMetadataFetcher.instance(instanceId).metrics(); } @Override - public InstanceMetrics instance(String host) + public InstanceMetrics instance(String host) throws NoSuchSidecarInstanceException, CassandraUnavailableException { - InstanceMetadata instanceMetadata = instanceMetadataFetcher.instance(host); - if (instanceMetadata == null) - { - throw new IllegalArgumentException("Instance metrics requested for non existent host " + host); - } - return instanceMetadata.metrics(); + return instanceMetadataFetcher.instance(host).metrics(); } } diff --git a/server/src/main/java/org/apache/cassandra/sidecar/restore/RestoreJobProgressTracker.java b/server/src/main/java/org/apache/cassandra/sidecar/restore/RestoreJobProgressTracker.java index 53281b8f..8e7057e7 100644 --- a/server/src/main/java/org/apache/cassandra/sidecar/restore/RestoreJobProgressTracker.java +++ b/server/src/main/java/org/apache/cassandra/sidecar/restore/RestoreJobProgressTracker.java @@ -26,7 +26,6 @@ import java.util.concurrent.atomic.AtomicReference; import org.slf4j.Logger; import org.slf4j.LoggerFactory; -import org.apache.cassandra.sidecar.cluster.CassandraAdapterDelegate; import org.apache.cassandra.sidecar.cluster.instance.InstanceMetadata; import org.apache.cassandra.sidecar.common.server.StorageOperations; import org.apache.cassandra.sidecar.db.RestoreJob; @@ -159,21 +158,14 @@ public class RestoreJobProgressTracker { if (cleanupOutOfRangeRequested) { - CassandraAdapterDelegate delegate = instanceMetadata.delegate(); - StorageOperations operations = delegate == null ? null : delegate.storageOperations(); - if (operations == null) - { - LOGGER.warn("Out of range data cleanup for the restore job is requested. It failed to start the operation. jobId={}", restoreJob.jobId); - return; - } - try { + StorageOperations operations = instanceMetadata.delegate().storageOperations(); operations.outOfRangeDataCleanup(restoreJob.keyspaceName, restoreJob.tableName); } catch (Throwable cause) { - LOGGER.warn("Clean up out of range data has failed", cause); + LOGGER.warn("Clean up out of range data has failed. jobId={}", restoreJob.jobId, cause); } } } diff --git a/server/src/main/java/org/apache/cassandra/sidecar/restore/RingTopologyRefresher.java b/server/src/main/java/org/apache/cassandra/sidecar/restore/RingTopologyRefresher.java index 91e34684..53ba5059 100644 --- a/server/src/main/java/org/apache/cassandra/sidecar/restore/RingTopologyRefresher.java +++ b/server/src/main/java/org/apache/cassandra/sidecar/restore/RingTopologyRefresher.java @@ -40,6 +40,7 @@ import org.apache.cassandra.sidecar.common.server.data.Name; import org.apache.cassandra.sidecar.config.RestoreJobConfiguration; import org.apache.cassandra.sidecar.config.SidecarConfiguration; import org.apache.cassandra.sidecar.db.RestoreJob; +import org.apache.cassandra.sidecar.exceptions.CassandraUnavailableException; import org.apache.cassandra.sidecar.tasks.PeriodicTask; import org.apache.cassandra.sidecar.utils.InstanceMetadataFetcher; import org.jetbrains.annotations.Nullable; @@ -121,10 +122,16 @@ public class RingTopologyRefresher implements PeriodicTask private void executeBlocking() { - CassandraAdapterDelegate delegate = metadataFetcher.anyInstance().delegate(); - StorageOperations storageOperations = delegate == null ? null : delegate.storageOperations(); - NodeSettings nodeSettings = delegate == null ? null : delegate.nodeSettings(); - if (storageOperations == null || nodeSettings == null) + CassandraAdapterDelegate delegate; + StorageOperations storageOperations; + NodeSettings nodeSettings; + try + { + delegate = metadataFetcher.anyInstance().delegate(); + storageOperations = delegate.storageOperations(); + nodeSettings = delegate.nodeSettings(); + } + catch (CassandraUnavailableException ignored) { LOGGER.debug("Not yet connect to Cassandra"); return; diff --git a/server/src/main/java/org/apache/cassandra/sidecar/routes/AbstractHandler.java b/server/src/main/java/org/apache/cassandra/sidecar/routes/AbstractHandler.java index 0c3aa763..976256a0 100644 --- a/server/src/main/java/org/apache/cassandra/sidecar/routes/AbstractHandler.java +++ b/server/src/main/java/org/apache/cassandra/sidecar/routes/AbstractHandler.java @@ -19,7 +19,6 @@ package org.apache.cassandra.sidecar.routes; import java.util.NoSuchElementException; -import java.util.function.Consumer; import org.slf4j.Logger; import org.slf4j.LoggerFactory; @@ -32,9 +31,7 @@ import io.vertx.core.net.SocketAddress; import io.vertx.ext.web.RoutingContext; import io.vertx.ext.web.handler.HttpException; import org.apache.cassandra.sidecar.adapters.base.exception.OperationUnavailableException; -import org.apache.cassandra.sidecar.cluster.CassandraAdapterDelegate; import org.apache.cassandra.sidecar.cluster.instance.InstanceMetadata; -import org.apache.cassandra.sidecar.common.server.MetricsOperations; import org.apache.cassandra.sidecar.common.server.data.Name; import org.apache.cassandra.sidecar.common.server.data.QualifiedTableName; import org.apache.cassandra.sidecar.common.server.exceptions.JmxAuthenticationException; @@ -43,7 +40,6 @@ import org.apache.cassandra.sidecar.exceptions.NoSuchSidecarInstanceException; import org.apache.cassandra.sidecar.utils.CassandraInputValidator; import org.apache.cassandra.sidecar.utils.InstanceMetadataFetcher; -import static org.apache.cassandra.sidecar.utils.HttpExceptions.cassandraServiceUnavailable; import static org.apache.cassandra.sidecar.utils.HttpExceptions.wrapHttpException; @@ -102,25 +98,6 @@ public abstract class AbstractHandler<T> implements Handler<RoutingContext> } } - protected void ifMetricsOpsAvailable(RoutingContext context, - String host, - Consumer<MetricsOperations> ifAvailable) - { - CassandraAdapterDelegate delegate = metadataFetcher.delegate(host); - if (delegate == null) - { - context.fail(cassandraServiceUnavailable()); - return; - } - MetricsOperations operations = delegate.metricsOperations(); - if (operations == null) - { - context.fail(cassandraServiceUnavailable()); - return; - } - ifAvailable.accept(operations); - } - /** * Extracts the request object from the {@code context}. * diff --git a/server/src/main/java/org/apache/cassandra/sidecar/routes/ConnectedClientStatsHandler.java b/server/src/main/java/org/apache/cassandra/sidecar/routes/ConnectedClientStatsHandler.java index dc59fba7..d58ca8f7 100644 --- a/server/src/main/java/org/apache/cassandra/sidecar/routes/ConnectedClientStatsHandler.java +++ b/server/src/main/java/org/apache/cassandra/sidecar/routes/ConnectedClientStatsHandler.java @@ -22,6 +22,7 @@ import com.google.inject.Inject; import io.vertx.core.http.HttpServerRequest; import io.vertx.core.net.SocketAddress; import io.vertx.ext.web.RoutingContext; +import org.apache.cassandra.sidecar.common.server.MetricsOperations; import org.apache.cassandra.sidecar.concurrent.ExecutorPools; import org.apache.cassandra.sidecar.utils.InstanceMetadataFetcher; @@ -30,7 +31,7 @@ import static org.apache.cassandra.sidecar.utils.RequestUtils.parseBooleanQueryP /** * Handler for retrieving stats for connected clients */ -public class ConnectedClientStatsHandler extends AbstractHandler<Void> +public class ConnectedClientStatsHandler extends AbstractHandler<Boolean> { /** * Constructs a handler with the provided {@code metadataFetcher} @@ -52,21 +53,17 @@ public class ConnectedClientStatsHandler extends AbstractHandler<Void> HttpServerRequest httpRequest, String host, SocketAddress remoteAddress, - Void request) + Boolean summaryOnly) { - - ifMetricsOpsAvailable(context, host, operations -> { - boolean summaryOnly = parseBooleanQueryParam(httpRequest, "summary", true); - - executorPools.service() - .executeBlocking(() -> operations.connectedClientStats(summaryOnly)) - .onSuccess(context::json) - .onFailure(cause -> processFailure(cause, context, host, remoteAddress, request)); - }); + MetricsOperations operations = metadataFetcher.delegate(host).metricsOperations(); + executorPools.service() + .executeBlocking(() -> operations.connectedClientStats(summaryOnly)) + .onSuccess(context::json) + .onFailure(cause -> processFailure(cause, context, host, remoteAddress, summaryOnly)); } - protected Void extractParamsOrThrow(RoutingContext context) + protected Boolean extractParamsOrThrow(RoutingContext context) { - return null; + return parseBooleanQueryParam(context.request(), "summary", true); } } diff --git a/server/src/main/java/org/apache/cassandra/sidecar/routes/RingHandler.java b/server/src/main/java/org/apache/cassandra/sidecar/routes/RingHandler.java index 785badc1..1e86cad7 100644 --- a/server/src/main/java/org/apache/cassandra/sidecar/routes/RingHandler.java +++ b/server/src/main/java/org/apache/cassandra/sidecar/routes/RingHandler.java @@ -26,14 +26,12 @@ import io.netty.handler.codec.http.HttpResponseStatus; import io.vertx.core.http.HttpServerRequest; import io.vertx.core.net.SocketAddress; import io.vertx.ext.web.RoutingContext; -import org.apache.cassandra.sidecar.cluster.CassandraAdapterDelegate; import org.apache.cassandra.sidecar.common.server.StorageOperations; import org.apache.cassandra.sidecar.common.server.data.Name; import org.apache.cassandra.sidecar.concurrent.ExecutorPools; import org.apache.cassandra.sidecar.utils.CassandraInputValidator; import org.apache.cassandra.sidecar.utils.InstanceMetadataFetcher; -import static org.apache.cassandra.sidecar.utils.HttpExceptions.cassandraServiceUnavailable; import static org.apache.cassandra.sidecar.utils.HttpExceptions.wrapHttpException; /** @@ -60,23 +58,9 @@ public class RingHandler extends AbstractHandler<Name> SocketAddress remoteAddress, Name keyspace) { - CassandraAdapterDelegate delegate = metadataFetcher.delegate(host); - if (delegate == null) - { - context.fail(cassandraServiceUnavailable()); - return; - } - - StorageOperations storageOperations = delegate.storageOperations(); - - if (storageOperations == null) - { - context.fail(cassandraServiceUnavailable()); - return; - } - + StorageOperations operations = metadataFetcher.delegate(host).storageOperations(); executorPools.service() - .executeBlocking(() -> storageOperations.ring(keyspace)) + .executeBlocking(() -> operations.ring(keyspace)) .onSuccess(context::json) .onFailure(cause -> processFailure(cause, context, host, remoteAddress, keyspace)); } diff --git a/server/src/main/java/org/apache/cassandra/sidecar/routes/SchemaHandler.java b/server/src/main/java/org/apache/cassandra/sidecar/routes/SchemaHandler.java index 95410f3f..fe1d96b7 100644 --- a/server/src/main/java/org/apache/cassandra/sidecar/routes/SchemaHandler.java +++ b/server/src/main/java/org/apache/cassandra/sidecar/routes/SchemaHandler.java @@ -26,7 +26,6 @@ import io.vertx.core.Future; import io.vertx.core.http.HttpServerRequest; import io.vertx.core.net.SocketAddress; import io.vertx.ext.web.RoutingContext; -import org.apache.cassandra.sidecar.cluster.CassandraAdapterDelegate; import org.apache.cassandra.sidecar.common.response.SchemaResponse; import org.apache.cassandra.sidecar.common.server.data.Name; import org.apache.cassandra.sidecar.concurrent.ExecutorPools; @@ -34,7 +33,6 @@ import org.apache.cassandra.sidecar.utils.CassandraInputValidator; import org.apache.cassandra.sidecar.utils.InstanceMetadataFetcher; import org.apache.cassandra.sidecar.utils.MetadataUtils; -import static org.apache.cassandra.sidecar.utils.HttpExceptions.cassandraServiceUnavailable; import static org.apache.cassandra.sidecar.utils.HttpExceptions.wrapHttpException; /** @@ -75,20 +73,12 @@ public class SchemaHandler extends AbstractHandler<Name> /** * Handles the request with the Cassandra {@link Metadata metadata}. * - * @param context the event to handle - * @param keyspace the keyspace parsed from the request - * @param metadata the metadata on the connected cluster, including known nodes and schema definitions + * @param context the event to handle + * @param keyspace the keyspace parsed from the request + * @param metadata the metadata on the connected cluster, including known nodes and schema definitions */ private void handleWithMetadata(RoutingContext context, Name keyspace, Metadata metadata) { - if (metadata == null) - { - // set request as failed and return - logger.error("Failed to obtain metadata on the connected cluster for request '{}'", keyspace); - context.fail(cassandraServiceUnavailable()); - return; - } - if (keyspace == null) { SchemaResponse schemaResponse = new SchemaResponse(metadata.exportSchemaAsString()); @@ -122,9 +112,8 @@ public class SchemaHandler extends AbstractHandler<Name> private Future<Metadata> metadata(String host) { return executorPools.service().executeBlocking(() -> { - CassandraAdapterDelegate delegate = metadataFetcher.delegate(host); // metadata can block so we need to run in a blocking thread - return delegate.metadata(); + return metadataFetcher.delegate(host).metadata(); }); } diff --git a/server/src/main/java/org/apache/cassandra/sidecar/routes/StreamSSTableComponentHandler.java b/server/src/main/java/org/apache/cassandra/sidecar/routes/StreamSSTableComponentHandler.java index 6e1cec49..37bd9e13 100644 --- a/server/src/main/java/org/apache/cassandra/sidecar/routes/StreamSSTableComponentHandler.java +++ b/server/src/main/java/org/apache/cassandra/sidecar/routes/StreamSSTableComponentHandler.java @@ -30,7 +30,6 @@ import io.vertx.core.Future; import io.vertx.core.http.HttpServerRequest; import io.vertx.core.net.SocketAddress; import io.vertx.ext.web.RoutingContext; -import org.apache.cassandra.sidecar.cluster.CassandraAdapterDelegate; import org.apache.cassandra.sidecar.common.server.StorageOperations; import org.apache.cassandra.sidecar.common.server.TableOperations; import org.apache.cassandra.sidecar.common.server.data.Name; @@ -42,7 +41,6 @@ import org.apache.cassandra.sidecar.snapshots.SnapshotPathBuilder; import org.apache.cassandra.sidecar.utils.CassandraInputValidator; import org.apache.cassandra.sidecar.utils.InstanceMetadataFetcher; -import static org.apache.cassandra.sidecar.utils.HttpExceptions.cassandraServiceUnavailable; import static org.apache.cassandra.sidecar.utils.HttpExceptions.wrapHttpException; /** @@ -81,21 +79,10 @@ public class StreamSSTableComponentHandler extends AbstractHandler<StreamSSTable private Future<String> resolveComponentPathFromRequest(String host, StreamSSTableComponentRequestParam request) { return executorPools.internal().executeBlocking(() -> { - CassandraAdapterDelegate delegate = metadataFetcher.delegate(host); - if (delegate == null) - { - throw cassandraServiceUnavailable(); - } - int dataDirIndex = request.dataDirectoryIndex(); if (request.tableId() != null) { - StorageOperations storageOperations = delegate.storageOperations(); - if (storageOperations == null) - { - throw cassandraServiceUnavailable(); - } - + StorageOperations storageOperations = metadataFetcher.delegate(host).storageOperations(); List<String> dataDirList = storageOperations.dataFileLocations(); if (dataDirIndex < 0 || dataDirIndex >= dataDirList.size()) { @@ -106,12 +93,7 @@ public class StreamSSTableComponentHandler extends AbstractHandler<StreamSSTable else { logger.debug("Streaming SSTable component without a table Id. request={}, instance={}", request, host); - TableOperations tableOperations = delegate.tableOperations(); - if (tableOperations == null) - { - throw cassandraServiceUnavailable(); - } - + TableOperations tableOperations = metadataFetcher.delegate(host).tableOperations(); // asking jmx to give us the path for keyspace/table - tableId // as opposed to storageOperations.dataFileLocations, the table directory can change // when someone drops a table and recreates it with the same name, the table id will change diff --git a/server/src/main/java/org/apache/cassandra/sidecar/routes/TokenRangeReplicaMapHandler.java b/server/src/main/java/org/apache/cassandra/sidecar/routes/TokenRangeReplicaMapHandler.java index ccf15dda..d53c4268 100644 --- a/server/src/main/java/org/apache/cassandra/sidecar/routes/TokenRangeReplicaMapHandler.java +++ b/server/src/main/java/org/apache/cassandra/sidecar/routes/TokenRangeReplicaMapHandler.java @@ -20,7 +20,6 @@ package org.apache.cassandra.sidecar.routes; import org.apache.commons.lang3.StringUtils; -import com.datastax.driver.core.Metadata; import com.google.inject.Inject; import com.google.inject.Singleton; import io.netty.handler.codec.http.HttpResponseStatus; @@ -28,6 +27,7 @@ import io.vertx.core.http.HttpServerRequest; import io.vertx.core.net.SocketAddress; import io.vertx.ext.web.RoutingContext; import org.apache.cassandra.sidecar.cluster.CassandraAdapterDelegate; +import org.apache.cassandra.sidecar.common.response.NodeSettings; import org.apache.cassandra.sidecar.common.server.StorageOperations; import org.apache.cassandra.sidecar.common.server.data.Name; import org.apache.cassandra.sidecar.concurrent.ExecutorPools; @@ -35,7 +35,6 @@ import org.apache.cassandra.sidecar.utils.CassandraInputValidator; import org.apache.cassandra.sidecar.utils.HttpExceptions; import org.apache.cassandra.sidecar.utils.InstanceMetadataFetcher; -import static org.apache.cassandra.sidecar.utils.HttpExceptions.cassandraServiceUnavailable; import static org.apache.cassandra.sidecar.utils.HttpExceptions.wrapHttpException; /** @@ -70,22 +69,10 @@ public class TokenRangeReplicaMapHandler extends AbstractHandler<Name> Name keyspace) { CassandraAdapterDelegate delegate = metadataFetcher.delegate(host); - if (delegate == null) - { - context.fail(cassandraServiceUnavailable()); - return; - } - + NodeSettings nodeSettings = delegate.nodeSettings(); StorageOperations operations = delegate.storageOperations(); - Metadata metadata = delegate.metadata(); - if (operations == null || metadata == null) - { - context.fail(cassandraServiceUnavailable()); - return; - } - executorPools.service() - .executeBlocking(() -> operations.tokenRangeReplicas(keyspace, metadata.getPartitioner())) + .executeBlocking(() -> operations.tokenRangeReplicas(keyspace, nodeSettings.partitioner())) .onSuccess(context::json) .onFailure(cause -> processFailure(cause, context, host, remoteAddress, keyspace)); } diff --git a/server/src/main/java/org/apache/cassandra/sidecar/routes/cassandra/NodeSettingsHandler.java b/server/src/main/java/org/apache/cassandra/sidecar/routes/cassandra/NodeSettingsHandler.java index 75e7ad5f..af696345 100644 --- a/server/src/main/java/org/apache/cassandra/sidecar/routes/cassandra/NodeSettingsHandler.java +++ b/server/src/main/java/org/apache/cassandra/sidecar/routes/cassandra/NodeSettingsHandler.java @@ -24,14 +24,10 @@ import com.google.inject.Singleton; import io.vertx.core.http.HttpServerRequest; import io.vertx.core.net.SocketAddress; import io.vertx.ext.web.RoutingContext; -import org.apache.cassandra.sidecar.cluster.CassandraAdapterDelegate; -import org.apache.cassandra.sidecar.common.response.NodeSettings; import org.apache.cassandra.sidecar.concurrent.ExecutorPools; import org.apache.cassandra.sidecar.routes.AbstractHandler; import org.apache.cassandra.sidecar.utils.InstanceMetadataFetcher; -import static org.apache.cassandra.sidecar.utils.HttpExceptions.cassandraServiceUnavailable; - /** * Provides REST endpoint to get the configured settings of a cassandra node */ @@ -59,21 +55,7 @@ public class NodeSettingsHandler extends AbstractHandler<Void> SocketAddress remoteAddress, Void request) { - CassandraAdapterDelegate delegate = metadataFetcher.delegate(host); - if (delegate == null) - { - context.fail(cassandraServiceUnavailable()); - return; - } - NodeSettings nodeSettings = delegate.nodeSettings(); - if (nodeSettings == null) - { - context.fail(cassandraServiceUnavailable()); - } - else - { - context.json(nodeSettings); - } + context.json(metadataFetcher.delegate(host).nodeSettings()); } /** diff --git a/server/src/main/java/org/apache/cassandra/sidecar/routes/snapshots/ClearSnapshotHandler.java b/server/src/main/java/org/apache/cassandra/sidecar/routes/snapshots/ClearSnapshotHandler.java index 43849bcf..3fdf21f4 100644 --- a/server/src/main/java/org/apache/cassandra/sidecar/routes/snapshots/ClearSnapshotHandler.java +++ b/server/src/main/java/org/apache/cassandra/sidecar/routes/snapshots/ClearSnapshotHandler.java @@ -27,7 +27,6 @@ import io.netty.handler.codec.http.HttpResponseStatus; import io.vertx.core.http.HttpServerRequest; import io.vertx.core.net.SocketAddress; import io.vertx.ext.web.RoutingContext; -import org.apache.cassandra.sidecar.cluster.CassandraAdapterDelegate; import org.apache.cassandra.sidecar.common.server.StorageOperations; import org.apache.cassandra.sidecar.concurrent.ExecutorPools; import org.apache.cassandra.sidecar.routes.AbstractHandler; @@ -35,7 +34,6 @@ import org.apache.cassandra.sidecar.routes.data.SnapshotRequestParam; import org.apache.cassandra.sidecar.utils.CassandraInputValidator; import org.apache.cassandra.sidecar.utils.InstanceMetadataFetcher; -import static org.apache.cassandra.sidecar.utils.HttpExceptions.cassandraServiceUnavailable; import static org.apache.cassandra.sidecar.utils.HttpExceptions.wrapHttpException; /** @@ -69,14 +67,8 @@ public class ClearSnapshotHandler extends AbstractHandler<SnapshotRequestParam> SocketAddress remoteAddress, SnapshotRequestParam requestParams) { + StorageOperations storageOperations = metadataFetcher.delegate(host).storageOperations(); executorPools.service().runBlocking(() -> { - CassandraAdapterDelegate delegate = metadataFetcher.delegate(host(context)); - StorageOperations storageOperations = delegate == null ? null : delegate.storageOperations(); - if (storageOperations == null) - { - throw cassandraServiceUnavailable(); - } - logger.debug("Clearing snapshot request={}, remoteAddress={}, instance={}", requestParams, remoteAddress, host); storageOperations.clearSnapshot(requestParams.snapshotName(), requestParams.keyspace(), diff --git a/server/src/main/java/org/apache/cassandra/sidecar/routes/snapshots/CreateSnapshotHandler.java b/server/src/main/java/org/apache/cassandra/sidecar/routes/snapshots/CreateSnapshotHandler.java index 4c58a7b3..9adb8e04 100644 --- a/server/src/main/java/org/apache/cassandra/sidecar/routes/snapshots/CreateSnapshotHandler.java +++ b/server/src/main/java/org/apache/cassandra/sidecar/routes/snapshots/CreateSnapshotHandler.java @@ -30,18 +30,15 @@ import io.vertx.core.http.HttpServerRequest; import io.vertx.core.json.JsonObject; import io.vertx.core.net.SocketAddress; import io.vertx.ext.web.RoutingContext; -import org.apache.cassandra.sidecar.cluster.CassandraAdapterDelegate; import org.apache.cassandra.sidecar.common.server.StorageOperations; import org.apache.cassandra.sidecar.common.server.exceptions.NodeBootstrappingException; import org.apache.cassandra.sidecar.common.server.exceptions.SnapshotAlreadyExistsException; import org.apache.cassandra.sidecar.concurrent.ExecutorPools; -import org.apache.cassandra.sidecar.concurrent.TaskExecutorPool; import org.apache.cassandra.sidecar.routes.AbstractHandler; import org.apache.cassandra.sidecar.routes.data.SnapshotRequestParam; import org.apache.cassandra.sidecar.utils.CassandraInputValidator; import org.apache.cassandra.sidecar.utils.InstanceMetadataFetcher; -import static org.apache.cassandra.sidecar.utils.HttpExceptions.cassandraServiceUnavailable; import static org.apache.cassandra.sidecar.utils.HttpExceptions.wrapHttpException; /** @@ -76,28 +73,21 @@ public class CreateSnapshotHandler extends AbstractHandler<SnapshotRequestParam> SocketAddress remoteAddress, SnapshotRequestParam requestParams) { - TaskExecutorPool pool = executorPools.service(); - pool.runBlocking(() -> { - CassandraAdapterDelegate delegate = metadataFetcher.delegate(host); - StorageOperations storageOperations = delegate == null ? null : delegate.storageOperations(); - if (storageOperations == null) - { - throw cassandraServiceUnavailable(); - } + StorageOperations storageOperations = metadataFetcher.delegate(host).storageOperations(); + executorPools.service().runBlocking(() -> { + logger.debug("Creating snapshot request={}, remoteAddress={}, instance={}", + requestParams, remoteAddress, host); + Map<String, String> options = requestParams.ttl() != null + ? ImmutableMap.of("ttl", requestParams.ttl()) + : ImmutableMap.of(); - logger.debug("Creating snapshot request={}, remoteAddress={}, instance={}", - requestParams, remoteAddress, host); - Map<String, String> options = requestParams.ttl() != null - ? ImmutableMap.of("ttl", requestParams.ttl()) - : ImmutableMap.of(); - - storageOperations.takeSnapshot(requestParams.snapshotName(), requestParams.keyspace(), - requestParams.tableName(), options); - JsonObject jsonObject = new JsonObject() - .put("result", "Success"); - context.json(jsonObject); - }) - .onFailure(cause -> processFailure(cause, context, host, remoteAddress, requestParams)); + storageOperations.takeSnapshot(requestParams.snapshotName(), requestParams.keyspace(), + requestParams.tableName(), options); + JsonObject jsonObject = new JsonObject() + .put("result", "Success"); + context.json(jsonObject); + }) + .onFailure(cause -> processFailure(cause, context, host, remoteAddress, requestParams)); } @Override diff --git a/server/src/main/java/org/apache/cassandra/sidecar/routes/snapshots/ListSnapshotHandler.java b/server/src/main/java/org/apache/cassandra/sidecar/routes/snapshots/ListSnapshotHandler.java index 8fa8d33d..63a16582 100644 --- a/server/src/main/java/org/apache/cassandra/sidecar/routes/snapshots/ListSnapshotHandler.java +++ b/server/src/main/java/org/apache/cassandra/sidecar/routes/snapshots/ListSnapshotHandler.java @@ -33,9 +33,7 @@ import io.vertx.core.Future; import io.vertx.core.http.HttpServerRequest; import io.vertx.core.net.SocketAddress; import io.vertx.ext.web.RoutingContext; -import org.apache.cassandra.sidecar.cluster.CassandraAdapterDelegate; import org.apache.cassandra.sidecar.common.response.ListSnapshotFilesResponse; -import org.apache.cassandra.sidecar.common.server.TableOperations; import org.apache.cassandra.sidecar.concurrent.ExecutorPools; import org.apache.cassandra.sidecar.config.CacheConfiguration; import org.apache.cassandra.sidecar.config.ServiceConfiguration; @@ -48,7 +46,6 @@ import org.apache.cassandra.sidecar.utils.CassandraInputValidator; import org.apache.cassandra.sidecar.utils.InstanceMetadataFetcher; import org.apache.cassandra.sidecar.utils.RequestUtils; -import static org.apache.cassandra.sidecar.utils.HttpExceptions.cassandraServiceUnavailable; import static org.apache.cassandra.sidecar.utils.HttpExceptions.wrapHttpException; /** @@ -193,21 +190,9 @@ public class ListSnapshotHandler extends AbstractHandler<SnapshotRequestParam> protected Future<List<String>> dataPaths(String host, String keyspace, String table) { - return executorPools.service().executeBlocking(() -> { - CassandraAdapterDelegate delegate = metadataFetcher.delegate(host); - if (delegate == null) - { - throw cassandraServiceUnavailable(); - } - - TableOperations tableOperations = delegate.tableOperations(); - if (tableOperations == null) - { - throw cassandraServiceUnavailable(); - } - - return tableOperations.getDataPaths(keyspace, table); - }); + return executorPools.service().executeBlocking(() -> metadataFetcher.delegate(host) + .tableOperations() + .getDataPaths(keyspace, table)); } protected Future<ListSnapshotFilesResponse> diff --git a/server/src/main/java/org/apache/cassandra/sidecar/routes/sstableuploads/SSTableImportHandler.java b/server/src/main/java/org/apache/cassandra/sidecar/routes/sstableuploads/SSTableImportHandler.java index 93684c1f..847fc688 100644 --- a/server/src/main/java/org/apache/cassandra/sidecar/routes/sstableuploads/SSTableImportHandler.java +++ b/server/src/main/java/org/apache/cassandra/sidecar/routes/sstableuploads/SSTableImportHandler.java @@ -28,10 +28,9 @@ import io.vertx.core.http.HttpServerRequest; import io.vertx.core.net.SocketAddress; import io.vertx.ext.web.RoutingContext; import io.vertx.ext.web.handler.HttpException; -import org.apache.cassandra.sidecar.cluster.CassandraAdapterDelegate; import org.apache.cassandra.sidecar.common.response.SSTableImportResponse; -import org.apache.cassandra.sidecar.common.server.TableOperations; import org.apache.cassandra.sidecar.concurrent.ExecutorPools; +import org.apache.cassandra.sidecar.exceptions.CassandraUnavailableException; import org.apache.cassandra.sidecar.routes.AbstractHandler; import org.apache.cassandra.sidecar.routes.data.SSTableImportRequestParam; import org.apache.cassandra.sidecar.utils.CacheFactory; @@ -40,7 +39,6 @@ import org.apache.cassandra.sidecar.utils.InstanceMetadataFetcher; import org.apache.cassandra.sidecar.utils.SSTableImporter; import org.apache.cassandra.sidecar.utils.SSTableUploadsPathBuilder; -import static org.apache.cassandra.sidecar.utils.HttpExceptions.cassandraServiceUnavailable; import static org.apache.cassandra.sidecar.utils.HttpExceptions.wrapHttpException; /** @@ -164,23 +162,18 @@ public class SSTableImportHandler extends AbstractHandler<SSTableImportRequestPa */ private Future<Void> importSSTablesAsync(SSTableImporter.ImportOptions importOptions) { - CassandraAdapterDelegate cassandra = metadataFetcher.delegate(importOptions.host()); - if (cassandra == null) - { - return Future.failedFuture(cassandraServiceUnavailable()); - } - - TableOperations tableOperations = cassandra.tableOperations(); - - if (tableOperations == null) - { - return Future.failedFuture(cassandraServiceUnavailable()); - } - else + try { + // ensure that table operations are available from the delegate before doing the import + // otherwise fail fast propagating the HttpException + metadataFetcher.delegate(importOptions.host()).tableOperations(); return uploadPathBuilder.isValidDirectory(importOptions.directory()) .compose(validDirectory -> importer.scheduleImport(importOptions)); } + catch (CassandraUnavailableException exception) + { + return Future.failedFuture(exception); + } } private static SSTableImporter.ImportOptions importOptions(String host, SSTableImportRequestParam request, diff --git a/server/src/main/java/org/apache/cassandra/sidecar/routes/sstableuploads/SSTableUploadHandler.java b/server/src/main/java/org/apache/cassandra/sidecar/routes/sstableuploads/SSTableUploadHandler.java index 2625abe4..67b6f653 100644 --- a/server/src/main/java/org/apache/cassandra/sidecar/routes/sstableuploads/SSTableUploadHandler.java +++ b/server/src/main/java/org/apache/cassandra/sidecar/routes/sstableuploads/SSTableUploadHandler.java @@ -21,7 +21,6 @@ package org.apache.cassandra.sidecar.routes.sstableuploads; import java.util.concurrent.TimeUnit; import com.datastax.driver.core.KeyspaceMetadata; -import com.datastax.driver.core.Metadata; import com.google.inject.Inject; import com.google.inject.Singleton; import io.netty.handler.codec.http.HttpResponseStatus; @@ -31,7 +30,6 @@ import io.vertx.core.file.FileSystem; import io.vertx.core.http.HttpServerRequest; import io.vertx.core.net.SocketAddress; import io.vertx.ext.web.RoutingContext; -import org.apache.cassandra.sidecar.cluster.CassandraAdapterDelegate; import org.apache.cassandra.sidecar.common.response.SSTableUploadResponse; import org.apache.cassandra.sidecar.concurrent.ConcurrencyLimiter; import org.apache.cassandra.sidecar.concurrent.ExecutorPools; @@ -51,7 +49,6 @@ import org.apache.cassandra.sidecar.utils.MetadataUtils; import org.apache.cassandra.sidecar.utils.SSTableUploader; import org.apache.cassandra.sidecar.utils.SSTableUploadsPathBuilder; -import static org.apache.cassandra.sidecar.utils.HttpExceptions.cassandraServiceUnavailable; import static org.apache.cassandra.sidecar.utils.HttpExceptions.wrapHttpException; import static org.apache.cassandra.sidecar.utils.MetricUtils.parseSSTableComponent; @@ -71,13 +68,13 @@ public class SSTableUploadHandler extends AbstractHandler<SSTableUploadRequestPa /** * Constructs a handler with the provided params. * - * @param vertx the vertx instance - * @param serviceConfiguration configuration object holding config details of Sidecar - * @param metadataFetcher the interface to retrieve metadata - * @param uploader a class that uploads the components - * @param uploadPathBuilder a class that provides SSTableUploads directories - * @param executorPools executor pools for blocking executions - * @param validator a validator instance to validate Cassandra-specific input + * @param vertx the vertx instance + * @param serviceConfiguration configuration object holding config details of Sidecar + * @param metadataFetcher the interface to retrieve metadata + * @param uploader a class that uploads the components + * @param uploadPathBuilder a class that provides SSTableUploads directories + * @param executorPools executor pools for blocking executions + * @param validator a validator instance to validate Cassandra-specific input * @param digestVerifierFactory a factory of checksum verifiers */ @Inject @@ -191,16 +188,7 @@ public class SSTableUploadHandler extends AbstractHandler<SSTableUploadRequestPa SSTableUploadRequestParam request) { TaskExecutorPool pool = executorPools.service(); - return pool.executeBlocking(() -> { - CassandraAdapterDelegate delegate = metadataFetcher.delegate(host); - Metadata metadata = delegate == null ? null : delegate.metadata(); - if (metadata == null) - { - throw cassandraServiceUnavailable(); - } - - return metadata; - }) + return pool.executeBlocking(() -> metadataFetcher.delegate(host).metadata()) .compose(metadata -> { KeyspaceMetadata keyspaceMetadata = MetadataUtils.keyspace(metadata, request.keyspace()); if (keyspaceMetadata == null) diff --git a/server/src/main/java/org/apache/cassandra/sidecar/routes/validations/ValidateTableExistenceHandler.java b/server/src/main/java/org/apache/cassandra/sidecar/routes/validations/ValidateTableExistenceHandler.java index 27422fc0..4fef5ab7 100644 --- a/server/src/main/java/org/apache/cassandra/sidecar/routes/validations/ValidateTableExistenceHandler.java +++ b/server/src/main/java/org/apache/cassandra/sidecar/routes/validations/ValidateTableExistenceHandler.java @@ -19,7 +19,6 @@ package org.apache.cassandra.sidecar.routes.validations; import com.datastax.driver.core.KeyspaceMetadata; -import com.datastax.driver.core.Metadata; import com.datastax.driver.core.TableMetadata; import com.google.inject.Inject; import com.google.inject.Singleton; @@ -28,7 +27,6 @@ import io.vertx.core.Future; import io.vertx.core.http.HttpServerRequest; import io.vertx.core.net.SocketAddress; import io.vertx.ext.web.RoutingContext; -import org.apache.cassandra.sidecar.cluster.CassandraAdapterDelegate; import org.apache.cassandra.sidecar.common.server.data.QualifiedTableName; import org.apache.cassandra.sidecar.concurrent.ExecutorPools; import org.apache.cassandra.sidecar.routes.AbstractHandler; @@ -36,7 +34,6 @@ import org.apache.cassandra.sidecar.routes.RoutingContextUtils; import org.apache.cassandra.sidecar.utils.CassandraInputValidator; import org.apache.cassandra.sidecar.utils.InstanceMetadataFetcher; -import static org.apache.cassandra.sidecar.utils.HttpExceptions.cassandraServiceUnavailable; import static org.apache.cassandra.sidecar.utils.HttpExceptions.wrapHttpException; /** @@ -114,14 +111,6 @@ public class ValidateTableExistenceHandler extends AbstractHandler<QualifiedTabl private Future<KeyspaceMetadata> getKeyspaceMetadata(String host, String keyspace) { - return executorPools.service().executeBlocking(() -> { - CassandraAdapterDelegate delegate = metadataFetcher.delegate(host); - Metadata metadata = delegate == null ? null : delegate.metadata(); - if (metadata == null) - { - throw cassandraServiceUnavailable(); - } - return metadata.getKeyspace(keyspace); - }); + return executorPools.service().executeBlocking(() -> metadataFetcher.delegate(host).metadata().getKeyspace(keyspace)); } } diff --git a/server/src/main/java/org/apache/cassandra/sidecar/server/MainModule.java b/server/src/main/java/org/apache/cassandra/sidecar/server/MainModule.java index 5a4b4d73..e8051006 100644 --- a/server/src/main/java/org/apache/cassandra/sidecar/server/MainModule.java +++ b/server/src/main/java/org/apache/cassandra/sidecar/server/MainModule.java @@ -250,7 +250,6 @@ public class MainModule extends AbstractModule public Router vertxRouter(Vertx vertx, SidecarConfiguration sidecarConfiguration, ChainAuthHandler chainAuthHandler, - ServiceConfiguration conf, CassandraHealthHandler cassandraHealthHandler, StreamSSTableComponentHandler streamSSTableComponentHandler, FileStreamHandler fileStreamHandler, @@ -287,7 +286,7 @@ public class MainModule extends AbstractModule router.route() .order(RoutingOrder.HIGHEST.order) .handler(loggerHandler) - .handler(TimeoutHandler.create(conf.requestTimeoutMillis(), + .handler(TimeoutHandler.create(sidecarConfiguration.serviceConfiguration().requestTimeoutMillis(), HttpResponseStatus.REQUEST_TIMEOUT.code())); // chain authentication before all requests diff --git a/server/src/main/java/org/apache/cassandra/sidecar/utils/HttpExceptions.java b/server/src/main/java/org/apache/cassandra/sidecar/utils/HttpExceptions.java index 45a65bf7..b27766a4 100644 --- a/server/src/main/java/org/apache/cassandra/sidecar/utils/HttpExceptions.java +++ b/server/src/main/java/org/apache/cassandra/sidecar/utils/HttpExceptions.java @@ -20,6 +20,7 @@ package org.apache.cassandra.sidecar.utils; import io.netty.handler.codec.http.HttpResponseStatus; import io.vertx.ext.web.handler.HttpException; +import org.apache.cassandra.sidecar.exceptions.CassandraUnavailableException; /** * This class consists exclusively of static methods that operate on or return {@link Exception}s of type @@ -32,18 +33,6 @@ public class HttpExceptions { } - /** - * Returns an {@link HttpException} with the {@link HttpResponseStatus#SERVICE_UNAVAILABLE} response code - * when the Cassandra service is unavailable. - * - * @return an {@link HttpException} with the {@link HttpResponseStatus#SERVICE_UNAVAILABLE} response code - * when the Cassandra service is unavailable - */ - public static HttpException cassandraServiceUnavailable() - { - return new HttpException(HttpResponseStatus.SERVICE_UNAVAILABLE.code(), "Cassandra service is unavailable"); - } - /** * Convenience method that returns a {@link HttpException} with the provided {@link HttpResponseStatus status} and * {@code cause} @@ -85,6 +74,13 @@ public class HttpExceptions { return (HttpException) cause; } + + if (cause instanceof CassandraUnavailableException) + { + String actualPayload = payload == null ? cause.getMessage() : payload; + return new HttpException(HttpResponseStatus.SERVICE_UNAVAILABLE.code(), actualPayload, cause); + } + if (payload != null) { return new HttpException(status.code(), payload, cause); diff --git a/server/src/main/java/org/apache/cassandra/sidecar/utils/InstanceMetadataFetcher.java b/server/src/main/java/org/apache/cassandra/sidecar/utils/InstanceMetadataFetcher.java index a63f03bb..c12b1710 100644 --- a/server/src/main/java/org/apache/cassandra/sidecar/utils/InstanceMetadataFetcher.java +++ b/server/src/main/java/org/apache/cassandra/sidecar/utils/InstanceMetadataFetcher.java @@ -26,6 +26,9 @@ import com.google.inject.Singleton; import org.apache.cassandra.sidecar.cluster.CassandraAdapterDelegate; import org.apache.cassandra.sidecar.cluster.InstancesMetadata; import org.apache.cassandra.sidecar.cluster.instance.InstanceMetadata; +import org.apache.cassandra.sidecar.exceptions.CassandraUnavailableException; +import org.apache.cassandra.sidecar.exceptions.NoSuchSidecarInstanceException; +import org.jetbrains.annotations.NotNull; import org.jetbrains.annotations.Nullable; /** @@ -50,7 +53,8 @@ public class InstanceMetadataFetcher * @return the {@link InstanceMetadata} for the given {@code host}, or the first instance when {@code host} is * {@code null} */ - public InstanceMetadata instance(@Nullable String host) + @NotNull + public InstanceMetadata instance(@Nullable String host) throws NoSuchSidecarInstanceException { return host == null ? firstInstance() @@ -64,8 +68,10 @@ public class InstanceMetadataFetcher * @param instanceId the identifier for the Cassandra instance * @return the {@link InstanceMetadata} for the given {@code instanceId}, or the first instance when * {@code instanceId} is {@code null} + * @throws NoSuchSidecarInstanceException when the Cassandra instance with {@code instanceId} does not exist */ - public InstanceMetadata instance(int instanceId) + @NotNull + public InstanceMetadata instance(int instanceId) throws NoSuchSidecarInstanceException { return instancesMetadata.instanceFromId(instanceId); } @@ -77,9 +83,11 @@ public class InstanceMetadataFetcher * @param host the Cassandra instance host * @return the {@link CassandraAdapterDelegate} for the given {@code host}, or the first instance when {@code host} * is {@code null} + * @throws NoSuchSidecarInstanceException when the Cassandra instance with {@code host} does not exist + * @throws CassandraUnavailableException when Cassandra is not yet connected */ - @Nullable - public CassandraAdapterDelegate delegate(String host) + @NotNull + public CassandraAdapterDelegate delegate(@Nullable String host) throws NoSuchSidecarInstanceException, CassandraUnavailableException { return instance(host).delegate(); } @@ -88,10 +96,12 @@ public class InstanceMetadataFetcher * Returns the {@link CassandraAdapterDelegate} for the given {@code instanceId} * * @param instanceId the identifier for the Cassandra instance - * @return the {@link CassandraAdapterDelegate} for the given {@code instanceId}, or the first instance when - * {@code instanceId} is {@code null} + * @return the {@link CassandraAdapterDelegate} for the given {@code instanceId} + * @throws NoSuchSidecarInstanceException when the Cassandra instance with {@code instanceId} does not exist + * @throws CassandraUnavailableException when Cassandra is not yet connected */ - public CassandraAdapterDelegate delegate(int instanceId) + @NotNull + public CassandraAdapterDelegate delegate(int instanceId) throws NoSuchSidecarInstanceException, CassandraUnavailableException { return instance(instanceId).delegate(); } diff --git a/server/src/main/java/org/apache/cassandra/sidecar/utils/SSTableImporter.java b/server/src/main/java/org/apache/cassandra/sidecar/utils/SSTableImporter.java index 5c902d38..f0b3fbd9 100644 --- a/server/src/main/java/org/apache/cassandra/sidecar/utils/SSTableImporter.java +++ b/server/src/main/java/org/apache/cassandra/sidecar/utils/SSTableImporter.java @@ -38,7 +38,6 @@ import io.vertx.core.Future; import io.vertx.core.Promise; import io.vertx.core.Vertx; import io.vertx.ext.web.handler.HttpException; -import org.apache.cassandra.sidecar.cluster.CassandraAdapterDelegate; import org.apache.cassandra.sidecar.cluster.instance.InstanceMetadata; import org.apache.cassandra.sidecar.common.server.TableOperations; import org.apache.cassandra.sidecar.concurrent.ExecutorPools; @@ -212,68 +211,51 @@ public class SSTableImporter ImportOptions options = pair.getValue(); InstanceMetadata instance = metadataFetcher.instance(options.host); - CassandraAdapterDelegate delegate = instance.delegate(); if (instanceMetrics == null) { instanceMetrics = instance.metrics(); } - - if (delegate == null) - { - failureCount++; - promise.fail(HttpExceptions.cassandraServiceUnavailable()); - continue; - } - - TableOperations tableOperations = delegate.tableOperations(); - if (tableOperations == null) - { - failureCount++; - promise.fail(HttpExceptions.cassandraServiceUnavailable()); - } - else + try { - try + TableOperations tableOperations = instance.delegate().tableOperations(); + long startTime = System.nanoTime(); + List<String> failedDirectories = + tableOperations.importNewSSTables(options.keyspace, + options.tableName, + options.directory, + options.resetLevel, + options.clearRepaired, + options.verifySSTables, + options.verifyTokens, + options.invalidateCaches, + options.extendedVerify, + options.copyData); + long serviceTimeNanos = System.nanoTime() - startTime; + if (!failedDirectories.isEmpty()) { - long startTime = System.nanoTime(); - List<String> failedDirectories = - tableOperations.importNewSSTables(options.keyspace, - options.tableName, - options.directory, - options.resetLevel, - options.clearRepaired, - options.verifySSTables, - options.verifyTokens, - options.invalidateCaches, - options.extendedVerify, - options.copyData); - long serviceTimeNanos = System.nanoTime() - startTime; - if (!failedDirectories.isEmpty()) - { - failureCount++; - LOGGER.error("Failed to import SSTables with options={}, serviceTimeMillis={}, " + - "failedDirectories={}", options, TimeUnit.NANOSECONDS.toMillis(serviceTimeNanos), - failedDirectories); - // TODO: HttpException should not be thrown by importer, as it is not at the transport layer - promise.fail(new HttpException(HttpResponseStatus.INTERNAL_SERVER_ERROR.code(), - "Failed to import from directories: " + failedDirectories)); - } - else - { - successCount++; - LOGGER.info("Successfully imported SSTables with options={}, serviceTimeMillis={}", - options, TimeUnit.NANOSECONDS.toMillis(serviceTimeNanos)); - promise.complete(); - cleanup(options); - } + failureCount++; + LOGGER.error("Failed to import SSTables with options={}, serviceTimeMillis={}, " + + "failedDirectories={}", options, TimeUnit.NANOSECONDS.toMillis(serviceTimeNanos), + failedDirectories); + // TODO: HttpException should not be thrown by importer, as it is not at the transport layer + promise.fail(new HttpException(HttpResponseStatus.INTERNAL_SERVER_ERROR.code(), + "Failed to import from directories: " + failedDirectories)); } - catch (Exception exception) + else { - failureCount++; - LOGGER.error("Failed to import SSTables with options={}", options, exception); - promise.fail(exception); + successCount++; + LOGGER.info("Successfully imported SSTables with options={}, serviceTimeMillis={}", + options, TimeUnit.NANOSECONDS.toMillis(serviceTimeNanos)); + promise.complete(); + cleanup(options); } } + catch (Exception exception) + { + failureCount++; + LOGGER.error("Failed to import SSTables with options={}", options, exception); + promise.fail(exception); + } } if (successCount > 0 || failureCount > 0) diff --git a/server/src/test/integration/org/apache/cassandra/sidecar/coordination/ClusterLeaseClaimTaskIntegrationTest.java b/server/src/test/integration/org/apache/cassandra/sidecar/coordination/ClusterLeaseClaimTaskIntegrationTest.java index d7a2425f..6f67424a 100644 --- a/server/src/test/integration/org/apache/cassandra/sidecar/coordination/ClusterLeaseClaimTaskIntegrationTest.java +++ b/server/src/test/integration/org/apache/cassandra/sidecar/coordination/ClusterLeaseClaimTaskIntegrationTest.java @@ -58,6 +58,7 @@ import org.apache.cassandra.sidecar.config.yaml.SchemaKeyspaceConfigurationImpl; import org.apache.cassandra.sidecar.config.yaml.ServiceConfigurationImpl; import org.apache.cassandra.sidecar.db.SidecarLeaseDatabaseAccessor; import org.apache.cassandra.sidecar.db.schema.SidecarLeaseSchema; +import org.apache.cassandra.sidecar.exceptions.CassandraUnavailableException; import org.apache.cassandra.sidecar.metrics.CoordinationMetrics; import org.apache.cassandra.sidecar.metrics.MetricRegistryFactory; import org.apache.cassandra.sidecar.metrics.SidecarMetrics; @@ -65,14 +66,17 @@ import org.apache.cassandra.sidecar.metrics.SidecarMetricsImpl; import org.apache.cassandra.sidecar.tasks.ExecutionDetermination; import org.apache.cassandra.sidecar.testing.SharedExecutorNettyOptions; import org.apache.cassandra.testing.TestVersion; +import org.jetbrains.annotations.NotNull; import org.jetbrains.annotations.Nullable; import static com.google.common.util.concurrent.Uninterruptibles.awaitUninterruptibly; import static com.google.common.util.concurrent.Uninterruptibles.sleepUninterruptibly; import static org.apache.cassandra.sidecar.AssertionUtils.loopAssert; +import static org.apache.cassandra.sidecar.exceptions.CassandraUnavailableException.Service.CQL; import static org.apache.cassandra.sidecar.testing.CassandraSidecarTestContext.tryGetIntConfig; import static org.apache.cassandra.sidecar.utils.TestMetricUtils.registry; import static org.assertj.core.api.Assertions.assertThat; +import static org.assertj.core.api.Assertions.assertThatExceptionOfType; import static org.assertj.core.api.Assertions.fail; import static org.mockito.Mockito.mock; import static org.mockito.Mockito.when; @@ -395,7 +399,8 @@ class ClusterLeaseClaimTaskIntegrationTest { DisconnectableCQLSessionProvider sessionProvider = instance.sessionProvider; sessionProvider.disconnect(); - assertThat(sessionProvider.get()).as("Simulating disable binary of instance %s", (i + 1)).isNull(); + assertThatExceptionOfType(CassandraUnavailableException.class).as("Simulating disable binary of instance %s", (i + 1)) + .isThrownBy(sessionProvider::get); return i; } } @@ -532,9 +537,15 @@ class ClusterLeaseClaimTaskIntegrationTest } @Override - public @Nullable Session get() + @NotNull + public Session get() throws CassandraUnavailableException { - return isConnected ? delegate.get() : null; + if (isConnected) + { + return delegate.get(); + } + + throw new CassandraUnavailableException(CQL, "Simulated CQL disconnection"); } @Override diff --git a/server/src/test/integration/org/apache/cassandra/sidecar/testing/IntegrationTestModule.java b/server/src/test/integration/org/apache/cassandra/sidecar/testing/IntegrationTestModule.java index 4655abd9..b9746163 100644 --- a/server/src/test/integration/org/apache/cassandra/sidecar/testing/IntegrationTestModule.java +++ b/server/src/test/integration/org/apache/cassandra/sidecar/testing/IntegrationTestModule.java @@ -51,7 +51,6 @@ import org.apache.cassandra.sidecar.coordination.ClusterLease; import org.apache.cassandra.sidecar.exceptions.NoSuchSidecarInstanceException; import org.apache.cassandra.sidecar.tasks.ExecutionDetermination; import org.jetbrains.annotations.NotNull; -import org.jetbrains.annotations.Nullable; import static org.apache.cassandra.sidecar.server.SidecarServerEvents.ON_SERVER_STOP; @@ -127,7 +126,8 @@ public class IntegrationTestModule extends AbstractModule CQLSessionProvider cqlSessionProvider = new CQLSessionProvider() { @Override - public @Nullable Session get() + @NotNull + public Session get() { return cassandraTestContext.session(); } @@ -139,7 +139,7 @@ public class IntegrationTestModule extends AbstractModule } @Override - public @Nullable Session getIfConnected() + public Session getIfConnected() { return get(); } diff --git a/server/src/test/java/org/apache/cassandra/sidecar/TestCassandraAdapterDelegate.java b/server/src/test/java/org/apache/cassandra/sidecar/TestCassandraAdapterDelegate.java index 77a8e7cd..d0bda397 100644 --- a/server/src/test/java/org/apache/cassandra/sidecar/TestCassandraAdapterDelegate.java +++ b/server/src/test/java/org/apache/cassandra/sidecar/TestCassandraAdapterDelegate.java @@ -24,9 +24,11 @@ import org.apache.cassandra.sidecar.cluster.CassandraAdapterDelegate; import org.apache.cassandra.sidecar.common.response.NodeSettings; import org.apache.cassandra.sidecar.common.server.StorageOperations; import org.apache.cassandra.sidecar.common.server.TableOperations; +import org.apache.cassandra.sidecar.exceptions.CassandraUnavailableException; import org.apache.cassandra.sidecar.metrics.instance.InstanceHealthMetrics; -import org.jetbrains.annotations.Nullable; +import org.jetbrains.annotations.NotNull; +import static org.apache.cassandra.sidecar.exceptions.CassandraUnavailableException.Service.CQL_AND_JMX; import static org.apache.cassandra.sidecar.utils.TestMetricUtils.registry; /** @@ -58,9 +60,10 @@ public class TestCassandraAdapterDelegate extends CassandraAdapterDelegate } @Override - public @Nullable Metadata metadata() + @NotNull + public Metadata metadata() { - return metadata; + return throwOnNull(metadata); } public void setMetadata(Metadata metadata) @@ -69,9 +72,10 @@ public class TestCassandraAdapterDelegate extends CassandraAdapterDelegate } @Override - public @Nullable TableOperations tableOperations() + @NotNull + public TableOperations tableOperations() { - return tableOperations; + return throwOnNull(tableOperations); } public void setTableOperations(TableOperations tableOperations) @@ -80,9 +84,10 @@ public class TestCassandraAdapterDelegate extends CassandraAdapterDelegate } @Override - public @Nullable NodeSettings nodeSettings() + @NotNull + public NodeSettings nodeSettings() { - return nodeSettings; + return throwOnNull(nodeSettings); } public void setNodeSettings(NodeSettings nodeSettings) @@ -102,9 +107,10 @@ public class TestCassandraAdapterDelegate extends CassandraAdapterDelegate } @Override - public @Nullable StorageOperations storageOperations() + @NotNull + public StorageOperations storageOperations() { - return storageOperations; + return throwOnNull(storageOperations); } public void setStorageOperations(StorageOperations storageOperations) @@ -116,4 +122,13 @@ public class TestCassandraAdapterDelegate extends CassandraAdapterDelegate public void close() { } + + private <T> T throwOnNull(T value) + { + if (value == null) + { + throw new CassandraUnavailableException(CQL_AND_JMX, "Cassandra unavailable"); + } + return value; + } } diff --git a/server/src/test/java/org/apache/cassandra/sidecar/coordination/MostReplicatedKeyspaceTokenZeroElectorateMembershipTest.java b/server/src/test/java/org/apache/cassandra/sidecar/coordination/MostReplicatedKeyspaceTokenZeroElectorateMembershipTest.java index ab3f4778..4fbd452f 100644 --- a/server/src/test/java/org/apache/cassandra/sidecar/coordination/MostReplicatedKeyspaceTokenZeroElectorateMembershipTest.java +++ b/server/src/test/java/org/apache/cassandra/sidecar/coordination/MostReplicatedKeyspaceTokenZeroElectorateMembershipTest.java @@ -32,7 +32,9 @@ import org.apache.cassandra.sidecar.common.response.NodeSettings; import org.apache.cassandra.sidecar.common.server.CQLSessionProvider; import org.apache.cassandra.sidecar.common.server.StorageOperations; import org.apache.cassandra.sidecar.config.yaml.SidecarConfigurationImpl; +import org.apache.cassandra.sidecar.exceptions.CassandraUnavailableException; +import static org.apache.cassandra.sidecar.exceptions.CassandraUnavailableException.Service.CQL; import static org.assertj.core.api.Assertions.assertThat; import static org.mockito.Mockito.mock; import static org.mockito.Mockito.when; @@ -67,7 +69,7 @@ class MostReplicatedKeyspaceTokenZeroElectorateMembershipTest when(mockInstancesMetadata.instances()).thenReturn(Collections.singletonList(instanceMetadata)); CQLSessionProvider mockCQLSessionProvider = mock(CQLSessionProvider.class); // the session is not available so we return null - when(mockCQLSessionProvider.get()).thenReturn(null); + when(mockCQLSessionProvider.get()).thenThrow(new CassandraUnavailableException(CQL, new RuntimeException("connection failed"))); ElectorateMembership membership = new MostReplicatedKeyspaceTokenZeroElectorateMembership(mockInstancesMetadata, mockCQLSessionProvider, CONFIG); assertThat(membership.isMember()).as("When the CQL connection is unavailable, we can't determine participation") .isFalse(); diff --git a/server/src/test/java/org/apache/cassandra/sidecar/metrics/InstanceHealthMetricsTest.java b/server/src/test/java/org/apache/cassandra/sidecar/metrics/InstanceHealthMetricsTest.java index 55860700..157e60e6 100644 --- a/server/src/test/java/org/apache/cassandra/sidecar/metrics/InstanceHealthMetricsTest.java +++ b/server/src/test/java/org/apache/cassandra/sidecar/metrics/InstanceHealthMetricsTest.java @@ -27,9 +27,11 @@ import org.apache.cassandra.sidecar.cluster.CassandraAdapterDelegate; import org.apache.cassandra.sidecar.common.server.CQLSessionProvider; import org.apache.cassandra.sidecar.common.server.JmxClient; import org.apache.cassandra.sidecar.common.server.utils.DriverUtils; +import org.apache.cassandra.sidecar.exceptions.CassandraUnavailableException; import org.apache.cassandra.sidecar.metrics.instance.InstanceHealthMetrics; import org.apache.cassandra.sidecar.utils.CassandraVersionProvider; +import static org.apache.cassandra.sidecar.exceptions.CassandraUnavailableException.Service.JMX; import static org.apache.cassandra.sidecar.utils.TestMetricUtils.registry; import static org.assertj.core.api.AssertionsForClassTypes.assertThat; import static org.mockito.ArgumentMatchers.any; @@ -52,6 +54,7 @@ public class InstanceHealthMetricsTest Vertx vertx = Vertx.vertx(); CassandraVersionProvider mockCassandraVersionProvider = mock(CassandraVersionProvider.class); CQLSessionProvider mockCqlSessionProvider = mock(CQLSessionProvider.class); + when(mockCqlSessionProvider.get()).thenThrow(new CassandraUnavailableException(JMX, "not available")); jmxClient = mock(JmxClient.class); metrics = new InstanceHealthMetrics(registry(1)); delegate = new CassandraAdapterDelegate(vertx, 1, mockCassandraVersionProvider, diff --git a/server/src/test/java/org/apache/cassandra/sidecar/restore/RestoreRangeTaskTest.java b/server/src/test/java/org/apache/cassandra/sidecar/restore/RestoreRangeTaskTest.java index c6a10979..fe140d19 100644 --- a/server/src/test/java/org/apache/cassandra/sidecar/restore/RestoreRangeTaskTest.java +++ b/server/src/test/java/org/apache/cassandra/sidecar/restore/RestoreRangeTaskTest.java @@ -82,6 +82,7 @@ import static org.assertj.core.api.Assertions.assertThat; import static org.assertj.core.api.Assertions.assertThatThrownBy; import static org.mockito.ArgumentMatchers.any; import static org.mockito.ArgumentMatchers.eq; +import static org.mockito.Mockito.RETURNS_DEEP_STUBS; import static org.mockito.Mockito.doNothing; import static org.mockito.Mockito.doReturn; import static org.mockito.Mockito.mock; @@ -104,11 +105,11 @@ class RestoreRangeTaskTest @BeforeEach void setup() { - InstanceMetadata instanceMetadata = mock(InstanceMetadata.class); + InstanceMetadata instanceMetadata = mock(InstanceMetadata.class, RETURNS_DEEP_STUBS); when(instanceMetadata.id()).thenReturn(1); when(instanceMetadata.host()).thenReturn("host-1"); when(instanceMetadata.metrics()).thenReturn(new InstanceMetricsImpl(registry(1))); - when(instanceMetadata.applyFromDelegate(any())).thenReturn(new InetSocketAddress(9043)); + when(instanceMetadata.delegate().localStorageBroadcastAddress()).thenReturn(new InetSocketAddress(9043)); InstanceMetadataFetcher mockInstanceMetadataFetcher = mock(InstanceMetadataFetcher.class); when(mockInstanceMetadataFetcher.instance(1)).thenReturn(instanceMetadata); RestoreSlice slice = RestoreSlice.builder() diff --git a/server/src/test/java/org/apache/cassandra/sidecar/utils/SSTableImporterTest.java b/server/src/test/java/org/apache/cassandra/sidecar/utils/SSTableImporterTest.java index 141320d3..404b13e3 100644 --- a/server/src/test/java/org/apache/cassandra/sidecar/utils/SSTableImporterTest.java +++ b/server/src/test/java/org/apache/cassandra/sidecar/utils/SSTableImporterTest.java @@ -42,10 +42,12 @@ import org.apache.cassandra.sidecar.concurrent.ExecutorPools; import org.apache.cassandra.sidecar.config.ServiceConfiguration; import org.apache.cassandra.sidecar.config.yaml.SSTableImportConfigurationImpl; import org.apache.cassandra.sidecar.config.yaml.TestServiceConfiguration; +import org.apache.cassandra.sidecar.exceptions.CassandraUnavailableException; import org.apache.cassandra.sidecar.metrics.instance.InstanceMetrics; import org.apache.cassandra.sidecar.metrics.instance.InstanceMetricsImpl; import static org.apache.cassandra.sidecar.AssertionUtils.loopAssert; +import static org.apache.cassandra.sidecar.exceptions.CassandraUnavailableException.Service.CQL_AND_JMX; import static org.apache.cassandra.sidecar.utils.TestMetricUtils.registry; import static org.assertj.core.api.Assertions.assertThat; import static org.mockito.ArgumentMatchers.anyString; @@ -107,6 +109,7 @@ class SSTableImporterTest when(mockTableOperations2.importNewSSTables("ks", "tbl", "/dir", true, true, true, true, true, true, false)) .thenThrow(new RuntimeException("Exception during import")); + when(mockCassandraAdapterDelegate3.tableOperations()).thenThrow(new CassandraUnavailableException(CQL_AND_JMX, "Cassandra unavailable")); executorPools = new ExecutorPools(vertx, serviceConfiguration); mockUploadPathBuilder = mock(SSTableUploadsPathBuilder.class); @@ -180,10 +183,9 @@ class SSTableImporterTest }); importFuture.onComplete(context.failing(p -> { - assertThat(p).isInstanceOf(HttpException.class); - HttpException exception = (HttpException) p; - assertThat(exception.getStatusCode()).isEqualTo(HttpResponseStatus.SERVICE_UNAVAILABLE.code()); - assertThat(exception.getPayload()).isEqualTo("Cassandra service is unavailable"); + assertThat(p).isInstanceOf(CassandraUnavailableException.class); + CassandraUnavailableException exception = (CassandraUnavailableException) p; + assertThat(exception.getMessage()).isEqualTo("Cassandra CQL_AND_JMX service is unavailable. Cassandra unavailable"); assertThat(importer.importQueuePerHost).isNotEmpty(); assertThat(importer.importQueuePerHost).containsKey(new SSTableImporter.ImportId("127.0.0.3", "ks", "tbl")); --------------------------------------------------------------------- To unsubscribe, e-mail: commits-unsubscr...@cassandra.apache.org For additional commands, e-mail: commits-h...@cassandra.apache.org