This is an automated email from the ASF dual-hosted git repository. frankgh pushed a commit to branch trunk in repository https://gitbox.apache.org/repos/asf/cassandra-sidecar.git
The following commit(s) were added to refs/heads/trunk by this push: new 041c33bb CASSSIDECAR-203: Created Endpoint that Triggers an Immediate Schema Report (#198) 041c33bb is described below commit 041c33bbc4dc2821a138a3c9d9fc7e4c77ef430e Author: Yuriy Semchyshyn <5...@users.noreply.github.com> AuthorDate: Tue Mar 11 15:34:00 2025 -0500 CASSSIDECAR-203: Created Endpoint that Triggers an Immediate Schema Report (#198) Patch by Yuriy Semchyshyn; reviewed by Saranya Krishnakumar, Yifan Cai, Francisco Guerrero for CASSSIDECAR-203 --- CHANGES.txt | 1 + .../cassandra/sidecar/common/ApiEndpointsV1.java | 5 +- .../common/request/ReportSchemaRequest.java | 41 +++++ .../cassandra/sidecar/client/RequestContext.java | 25 ++- .../cassandra/sidecar/client/SidecarClient.java | 42 +++-- .../sidecar/client/SidecarClientTest.java | 35 +++- .../acl/authorization/BasicPermissions.java | 3 + .../cassandra/sidecar/datahub/SchemaReporter.java | 43 +++-- .../sidecar/routes/ReportSchemaHandler.java | 104 +++++++++++ .../cassandra/sidecar/server/MainModule.java | 17 +- .../datahub/SchemaReporterIntegrationTest.java | 71 ++++---- .../sidecar/datahub/SchemaReporterTest.java | 17 +- .../sidecar/routes/ReportSchemaHandlerTest.java | 191 +++++++++++++++++++++ 13 files changed, 508 insertions(+), 87 deletions(-) diff --git a/CHANGES.txt b/CHANGES.txt index c9228296..d8f4c061 100644 --- a/CHANGES.txt +++ b/CHANGES.txt @@ -1,5 +1,6 @@ 0.1.0 ----- + * Create Endpoint that Triggers an Immediate Schema Report (CASSSIDECAR-203) * Add JWT Authentication support in Sidecar (CASSSIDECAR-160) * Add Token Ring Peer Health monitor (CASSSIDECAR-206) * Config APIs for storing CDC/Kafka configs for CDC feature (CASSSIDECAR-211) diff --git a/client-common/src/main/java/org/apache/cassandra/sidecar/common/ApiEndpointsV1.java b/client-common/src/main/java/org/apache/cassandra/sidecar/common/ApiEndpointsV1.java index faff4558..c38ee551 100644 --- a/client-common/src/main/java/org/apache/cassandra/sidecar/common/ApiEndpointsV1.java +++ b/client-common/src/main/java/org/apache/cassandra/sidecar/common/ApiEndpointsV1.java @@ -121,13 +121,16 @@ public final class ApiEndpointsV1 public static final String LIST_CDC_SEGMENTS_ROUTE = API_V1 + CDC_PATH + "/segments"; public static final String STREAM_CDC_SEGMENTS_ROUTE = LIST_CDC_SEGMENTS_ROUTE + "/" + SEGMENT_PATH_PARAM; + // Schema Reporting + private static final String REPORT_SCHEMA = "/datahub/schemas"; + public static final String REPORT_SCHEMA_ROUTE = API_V1 + REPORT_SCHEMA; + public static final String SERVICES_PATH = "/services"; public static final String SERVICE_PARAM = ":service"; public static final String CONFIG = "/config"; public static final String SERVICE_CONFIG_ROUTE = API_V1 + SERVICES_PATH + "/" + SERVICE_PARAM + CONFIG; public static final String SERVICES_CONFIG_ROUTE = API_V1 + SERVICES_PATH; - public static final String CONNECTED_CLIENT_STATS_ROUTE = API_V1 + CASSANDRA + "/stats/connected-clients"; public static final String OPERATIONAL_JOBS = "/operational-jobs"; diff --git a/client-common/src/main/java/org/apache/cassandra/sidecar/common/request/ReportSchemaRequest.java b/client-common/src/main/java/org/apache/cassandra/sidecar/common/request/ReportSchemaRequest.java new file mode 100644 index 00000000..91b5a7fc --- /dev/null +++ b/client-common/src/main/java/org/apache/cassandra/sidecar/common/request/ReportSchemaRequest.java @@ -0,0 +1,41 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.cassandra.sidecar.common.request; + +import io.netty.handler.codec.http.HttpMethod; +import org.apache.cassandra.sidecar.common.ApiEndpointsV1; +import org.apache.cassandra.sidecar.common.response.HealthResponse; + +/** + * A request to trigger an immediate, synchronous schema conversion + * and report regardless of the periodic task schedule or status + */ +public class ReportSchemaRequest extends JsonRequest<HealthResponse> +{ + public ReportSchemaRequest() + { + super(ApiEndpointsV1.REPORT_SCHEMA_ROUTE); + } + + @Override + public HttpMethod method() + { + return HttpMethod.PUT; + } +} diff --git a/client/src/main/java/org/apache/cassandra/sidecar/client/RequestContext.java b/client/src/main/java/org/apache/cassandra/sidecar/client/RequestContext.java index 4fde93a9..f782077d 100644 --- a/client/src/main/java/org/apache/cassandra/sidecar/client/RequestContext.java +++ b/client/src/main/java/org/apache/cassandra/sidecar/client/RequestContext.java @@ -43,6 +43,7 @@ import org.apache.cassandra.sidecar.common.request.ListSnapshotFilesRequest; import org.apache.cassandra.sidecar.common.request.NodeDecommissionRequest; import org.apache.cassandra.sidecar.common.request.NodeSettingsRequest; import org.apache.cassandra.sidecar.common.request.OperationalJobRequest; +import org.apache.cassandra.sidecar.common.request.ReportSchemaRequest; import org.apache.cassandra.sidecar.common.request.Request; import org.apache.cassandra.sidecar.common.request.RingRequest; import org.apache.cassandra.sidecar.common.request.SSTableComponentRequest; @@ -84,7 +85,7 @@ public class RequestContext protected static final NodeDecommissionRequest NODE_DECOMMISSION_REQUEST = new NodeDecommissionRequest(); protected static final StreamStatsRequest STREAM_STATS_REQUEST = new StreamStatsRequest(); - protected static final RetryPolicy DEFAULT_RETRY_POLICY = new NoRetryPolicy(); + protected static final RetryPolicy DEFAULT_NO_RETRY_POLICY = new NoRetryPolicy(); protected static final RetryPolicy DEFAULT_EXPONENTIAL_BACKOFF_RETRY_POLICY = new ExponentialBackoffRetryPolicy(10, 500L, 60_000L); @@ -140,7 +141,7 @@ public class RequestContext { private InstanceSelectionPolicy instanceSelectionPolicy; private Request request; - private RetryPolicy retryPolicy = DEFAULT_RETRY_POLICY; + private RetryPolicy retryPolicy = DEFAULT_NO_RETRY_POLICY; private final Map<String, String> customHeaders; public Builder() @@ -498,6 +499,16 @@ public class RequestContext return request(new UploadSSTableRequest(keyspace, tableName, uploadId, component, digest, filename)); } + /** + * Sets the {@code request} to be a new instance of a {@link ReportSchemaRequest} + * + * @return this {@link Builder} for method chaining + */ + public Builder reportSchemaRequest() + { + return request(new ReportSchemaRequest()); + } + /** * Sets the {@code request} to be a {@link ConnectedClientStatsRequest} and returns a reference to this Builder * enabling method chaining. @@ -564,6 +575,16 @@ public class RequestContext return request(STREAM_STATS_REQUEST); } + /** + * Sets the {@code retryPolicy} to be an instance of {@link NoRetryPolicy} + * + * @return this {@link Builder} for method chaining + */ + public Builder noRetryPolicy() + { + return retryPolicy(DEFAULT_NO_RETRY_POLICY); + } + /** * Sets the {@code retryPolicy} to be an * {@link org.apache.cassandra.sidecar.client.retry.ExponentialBackoffRetryPolicy} configured with diff --git a/client/src/main/java/org/apache/cassandra/sidecar/client/SidecarClient.java b/client/src/main/java/org/apache/cassandra/sidecar/client/SidecarClient.java index 6fe2f9c6..db1e2c85 100644 --- a/client/src/main/java/org/apache/cassandra/sidecar/client/SidecarClient.java +++ b/client/src/main/java/org/apache/cassandra/sidecar/client/SidecarClient.java @@ -24,7 +24,6 @@ import java.util.Map; import java.util.Objects; import java.util.UUID; import java.util.concurrent.CompletableFuture; - import org.slf4j.Logger; import org.slf4j.LoggerFactory; @@ -540,9 +539,9 @@ public class SidecarClient implements AutoCloseable, SidecarClientBlobRestoreExt public CompletableFuture<ListCdcSegmentsResponse> listCdcSegments(SidecarInstance sidecarInstance) { return executor.executeRequestAsync(requestBuilder() - .singleInstanceSelectionPolicy(sidecarInstance) - .request(new ListCdcSegmentsRequest()) - .build()); + .singleInstanceSelectionPolicy(sidecarInstance) + .request(new ListCdcSegmentsRequest()) + .build()); } /** @@ -562,9 +561,26 @@ public class SidecarClient implements AutoCloseable, SidecarClientBlobRestoreExt StreamConsumer streamConsumer) { executor.streamRequest(requestBuilder() - .singleInstanceSelectionPolicy(sidecarInstance) - .request(new StreamCdcSegmentRequest(segment, range)) - .build(), streamConsumer); + .singleInstanceSelectionPolicy(sidecarInstance) + .request(new StreamCdcSegmentRequest(segment, range)) + .build(), streamConsumer); + } + + /** + * Sends a request to trigger an immediate, synchronous schema + * conversion and report on the specified instance of the Sidecar + * regardless of the periodic task schedule or status + * + * @param instance the {@link SidecarInstance} to receive the request + * @return a {@link CompletableFuture} for the request + */ + public <T> CompletableFuture<T> reportSchema(SidecarInstance instance) + { + return executor.executeRequestAsync(requestBuilder() + .singleInstanceSelectionPolicy(instance) + .reportSchemaRequest() + .noRetryPolicy() // {@link NoRetryPolicy} is the preferred behavior here + .build()); } /** @@ -576,8 +592,8 @@ public class SidecarClient implements AutoCloseable, SidecarClientBlobRestoreExt public CompletableFuture<AllServicesConfigPayload> allServicesConfig() { return executor.executeRequestAsync(requestBuilder() - .request(new AllServicesConfigRequest()) - .build()); + .request(new AllServicesConfigRequest()) + .build()); } /** @@ -590,8 +606,8 @@ public class SidecarClient implements AutoCloseable, SidecarClientBlobRestoreExt public CompletableFuture<UpdateCdcServiceConfigPayload> updateCdcServiceConfig(Service service, Map<String, String> config) { return executor.executeRequestAsync(requestBuilder() - .request(new UpdateServiceConfigRequest(service, new UpdateCdcServiceConfigPayload(config))) - .build()); + .request(new UpdateServiceConfigRequest(service, new UpdateCdcServiceConfigPayload(config))) + .build()); } /** @@ -602,8 +618,8 @@ public class SidecarClient implements AutoCloseable, SidecarClientBlobRestoreExt public CompletableFuture<Void> deleteCdcServiceConfig(Service service) { return executor.executeRequestAsync(requestBuilder() - .request(new DeleteServiceConfigRequest(service)) - .build()); + .request(new DeleteServiceConfigRequest(service)) + .build()); } /** diff --git a/client/src/testFixtures/java/org/apache/cassandra/sidecar/client/SidecarClientTest.java b/client/src/testFixtures/java/org/apache/cassandra/sidecar/client/SidecarClientTest.java index 6ca140c9..fad89a38 100644 --- a/client/src/testFixtures/java/org/apache/cassandra/sidecar/client/SidecarClientTest.java +++ b/client/src/testFixtures/java/org/apache/cassandra/sidecar/client/SidecarClientTest.java @@ -40,7 +40,6 @@ import java.util.concurrent.TimeUnit; import java.util.concurrent.TimeoutException; import java.util.function.Consumer; import java.util.stream.Collectors; - import org.junit.jupiter.api.AfterEach; import org.junit.jupiter.api.BeforeEach; import org.junit.jupiter.api.Test; @@ -114,6 +113,7 @@ import static org.assertj.core.api.Assertions.assertThat; import static org.assertj.core.api.Assertions.assertThatException; import static org.assertj.core.api.Assertions.assertThatExceptionOfType; import static org.assertj.core.api.Assertions.assertThatIllegalArgumentException; +import static org.assertj.core.api.Assertions.assertThatNoException; import static org.assertj.core.api.Assertions.assertThatThrownBy; import static org.assertj.core.api.Assertions.fail; @@ -1607,7 +1607,7 @@ abstract class SidecarClientTest public void testListCdcSegments() throws ExecutionException, InterruptedException, JsonProcessingException { List<CdcSegmentInfo> segments = Arrays.asList(new CdcSegmentInfo("commit-log1", 100, 100, true, 1732148713725L), - new CdcSegmentInfo("commit-log2", 100, 10, false, 1732148713725L)); + new CdcSegmentInfo("commit-log2", 100, 10, false, 1732148713725L)); ListCdcSegmentsResponse listSegmentsResponse = new ListCdcSegmentsResponse("localhost", 9043, segments); ObjectMapper mapper = new ObjectMapper(); @@ -1673,6 +1673,35 @@ abstract class SidecarClientTest assertThat(new String(baos.toByteArray(), StandardCharsets.UTF_8)).isEqualTo("Test Content"); } + @Test + public void testReportSchemaSuccess() + { + MockResponse response = new MockResponse().setResponseCode(OK.code()) + .setBody("{\"status\":\"OK\"}"); + + enqueue(response); + + SidecarInstance instance = instances.get(0); + + assertThatNoException().isThrownBy(() -> client.reportSchema(instance).get()); + } + + @Test + public void testReportSchemaFailure() + { + MockResponse response = new MockResponse().setResponseCode(INTERNAL_SERVER_ERROR.code()) + .setBody(INTERNAL_SERVER_ERROR.reasonPhrase()); + + enqueue(response); + + SidecarInstance instance = instances.get(0); + + assertThatThrownBy(() -> client.reportSchema(instance).get()).isExactlyInstanceOf(ExecutionException.class) + .hasCauseInstanceOf(RetriesExhaustedException.class) + .hasMessageContaining(Integer.toString(INTERNAL_SERVER_ERROR.code())) + .hasMessageContaining(INTERNAL_SERVER_ERROR.reasonPhrase()); + } + @Test public void testAllServiceSuccessTests() throws IOException, ExecutionException, InterruptedException { @@ -1723,7 +1752,7 @@ abstract class SidecarClientTest enqueue(response); client.deleteCdcServiceConfig(Service.CDC).get(); validateResponseServed(ApiEndpointsV1.SERVICE_CONFIG_ROUTE.replaceAll(ApiEndpointsV1.SERVICE_PARAM, "cdc"), - request -> assertThat(request.getMethod()).isEqualTo("DELETE")); + request -> assertThat(request.getMethod()).isEqualTo("DELETE")); } @Test diff --git a/server/src/main/java/org/apache/cassandra/sidecar/acl/authorization/BasicPermissions.java b/server/src/main/java/org/apache/cassandra/sidecar/acl/authorization/BasicPermissions.java index 240cec81..76fb723b 100644 --- a/server/src/main/java/org/apache/cassandra/sidecar/acl/authorization/BasicPermissions.java +++ b/server/src/main/java/org/apache/cassandra/sidecar/acl/authorization/BasicPermissions.java @@ -63,6 +63,9 @@ public class BasicPermissions public static final Permission READ_OPERATIONAL_JOB = new DomainAwarePermission("OPERATIONAL_JOB:READ", OPERATION_SCOPE); public static final Permission DECOMMISSION_NODE = new DomainAwarePermission("NODE:DECOMMISSION", OPERATION_SCOPE); + // Permissions related to Schema Reporting + public static final Permission REPORT_SCHEMA = new DomainAwarePermission("SCHEMA:PUBLISH", CLUSTER_SCOPE); + // cassandra cluster related permissions public static final Permission READ_SCHEMA = new DomainAwarePermission("SCHEMA:READ", CLUSTER_SCOPE); public static final Permission READ_SCHEMA_KEYSPACE_SCOPED = new DomainAwarePermission("SCHEMA:READ", KEYSPACE_SCOPE); diff --git a/server/src/main/java/org/apache/cassandra/sidecar/datahub/SchemaReporter.java b/server/src/main/java/org/apache/cassandra/sidecar/datahub/SchemaReporter.java index ab2ddb54..4ad3e3f3 100644 --- a/server/src/main/java/org/apache/cassandra/sidecar/datahub/SchemaReporter.java +++ b/server/src/main/java/org/apache/cassandra/sidecar/datahub/SchemaReporter.java @@ -113,14 +113,23 @@ public class SchemaReporter /** * Public method for converting and reporting the Cassandra schema * - * @param cluster a {@link Cluster} to extract Cassandra schema from + * @param cluster the {@link Cluster} to extract Cassandra schema from */ public void process(@NotNull Cluster cluster) + { + process(cluster.getMetadata()); + } + + /** + * Public method for converting and reporting the Cassandra schema + * + * @param metadata the {@link Metadata} to extract Cassandra schema from + */ + public void process(@NotNull Metadata metadata) { try (Emitter emitter = emitterFactory.emitter()) { - stream(cluster.getMetadata()) - .forEach(ThrowableUtils.consumer(emitter::emit)); + stream(metadata).forEach(ThrowableUtils.consumer(emitter::emit)); } catch (Exception exception) { @@ -140,11 +149,12 @@ public class SchemaReporter protected Stream<MetadataChangeProposalWrapper<? extends RecordTemplate>> stream(@NotNull Metadata metadata) { return Streams.concat( - clusterConverters.stream() - .map(ThrowableUtils.function(converter -> converter.convert(metadata))), - metadata.getKeyspaces().stream() - .filter(this::neitherVirtualNorSystem) - .flatMap(this::stream)); + clusterConverters.stream() + .map(ThrowableUtils.function(converter -> converter.convert(metadata))), + metadata.getKeyspaces() + .stream() + .filter(this::neitherVirtualNorSystem) + .flatMap(this::stream)); } /** @@ -159,10 +169,11 @@ public class SchemaReporter protected Stream<MetadataChangeProposalWrapper<? extends RecordTemplate>> stream(@NotNull KeyspaceMetadata keyspace) { return Streams.concat( - keyspaceConverters.stream() - .map(ThrowableUtils.function(converter -> converter.convert(keyspace))), - keyspace.getTables().stream() - .flatMap(this::stream)); + keyspaceConverters.stream() + .map(ThrowableUtils.function(converter -> converter.convert(keyspace))), + keyspace.getTables() + .stream() + .flatMap(this::stream)); } /** @@ -176,7 +187,7 @@ public class SchemaReporter protected Stream<MetadataChangeProposalWrapper<? extends RecordTemplate>> stream(@NotNull TableMetadata table) { return tableConverters.stream() - .map(ThrowableUtils.function(converter -> converter.convert(table))); + .map(ThrowableUtils.function(converter -> converter.convert(table))); } /** @@ -195,8 +206,8 @@ public class SchemaReporter } String name = keyspace.getName(); - return !name.equals("system") - && !name.startsWith("system_") - && !name.equals("sidecar_internal"); + return !name.equals("system") && + !name.startsWith("system_") && + !name.equals("sidecar_internal"); } } diff --git a/server/src/main/java/org/apache/cassandra/sidecar/routes/ReportSchemaHandler.java b/server/src/main/java/org/apache/cassandra/sidecar/routes/ReportSchemaHandler.java new file mode 100644 index 00000000..5fe5d31a --- /dev/null +++ b/server/src/main/java/org/apache/cassandra/sidecar/routes/ReportSchemaHandler.java @@ -0,0 +1,104 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.cassandra.sidecar.routes; + +import java.util.Collections; +import java.util.Set; + +import com.datastax.driver.core.Metadata; +import com.google.inject.Inject; +import com.google.inject.Singleton; +import io.vertx.core.http.HttpServerRequest; +import io.vertx.core.net.SocketAddress; +import io.vertx.ext.auth.authorization.Authorization; +import io.vertx.ext.web.RoutingContext; +import org.apache.cassandra.sidecar.acl.authorization.BasicPermissions; +import org.apache.cassandra.sidecar.concurrent.ExecutorPools; +import org.apache.cassandra.sidecar.datahub.SchemaReporter; +import org.apache.cassandra.sidecar.server.MainModule; +import org.apache.cassandra.sidecar.utils.InstanceMetadataFetcher; +import org.jetbrains.annotations.NotNull; +import org.jetbrains.annotations.Nullable; + +/** + * An implementation of {@link AbstractHandler} used to trigger an immediate, + * synchronous conversion and report of the current schema + */ +@Singleton +public class ReportSchemaHandler extends AbstractHandler<Void> implements AccessProtected +{ + @NotNull + private final SchemaReporter schemaReporter; + + /** + * Constructs a new instance of {@link ReportSchemaHandler} using the provided instances + * of {@link InstanceMetadataFetcher}, {@link ExecutorPools}, and {@link SchemaReporter} + * + * @param metadata the metadata fetcher + * @param executor executor pools for blocking executions + * @param reporter schema reporter to use for the conversion + */ + @Inject + public ReportSchemaHandler(@NotNull InstanceMetadataFetcher metadata, + @NotNull ExecutorPools executor, + @NotNull SchemaReporter reporter) + { + super(metadata, executor, null); + + schemaReporter = reporter; + } + + /** + * {@inheritDoc} + */ + @Override + @NotNull + public Set<Authorization> requiredAuthorizations() + { + return Collections.singleton(BasicPermissions.REPORT_SCHEMA.toAuthorization()); + } + + /** + * {@inheritDoc} + */ + @Override + @Nullable + protected Void extractParamsOrThrow(@NotNull RoutingContext context) + { + return null; + } + + /** + * {@inheritDoc} + */ + @Override + protected void handleInternal(@NotNull RoutingContext context, + @NotNull HttpServerRequest http, + @NotNull String host, + @NotNull SocketAddress address, + @Nullable Void request) + { + Metadata metadata = metadataFetcher.callOnFirstAvailableInstance(instance -> instance.delegate().metadata()); + + executorPools.service() + .runBlocking(() -> schemaReporter.process(metadata)) + .onSuccess(ignored -> context.json(MainModule.OK_STATUS)) + .onFailure(throwable -> processFailure(throwable, context, host, address, request)); + } +} diff --git a/server/src/main/java/org/apache/cassandra/sidecar/server/MainModule.java b/server/src/main/java/org/apache/cassandra/sidecar/server/MainModule.java index c772cdcd..71d2294e 100644 --- a/server/src/main/java/org/apache/cassandra/sidecar/server/MainModule.java +++ b/server/src/main/java/org/apache/cassandra/sidecar/server/MainModule.java @@ -141,6 +141,7 @@ import org.apache.cassandra.sidecar.routes.KeyspaceSchemaHandler; import org.apache.cassandra.sidecar.routes.ListOperationalJobsHandler; import org.apache.cassandra.sidecar.routes.NodeDecommissionHandler; import org.apache.cassandra.sidecar.routes.OperationalJobHandler; +import org.apache.cassandra.sidecar.routes.ReportSchemaHandler; import org.apache.cassandra.sidecar.routes.RingHandler; import org.apache.cassandra.sidecar.routes.RoutingOrder; import org.apache.cassandra.sidecar.routes.SchemaHandler; @@ -181,7 +182,6 @@ import org.jetbrains.annotations.NotNull; import static org.apache.cassandra.sidecar.common.ApiEndpointsV1.API_V1_ALL_ROUTES; import static org.apache.cassandra.sidecar.common.server.utils.ByteUtils.bytesToHumanReadableBinaryPrefix; -import static org.apache.cassandra.sidecar.server.SidecarServerEvents.ON_ALL_CASSANDRA_CQL_READY; import static org.apache.cassandra.sidecar.server.SidecarServerEvents.ON_CASSANDRA_CQL_READY; import static org.apache.cassandra.sidecar.server.SidecarServerEvents.ON_SERVER_STOP; @@ -364,6 +364,7 @@ public class MainModule extends AbstractModule SSTableCleanupHandler ssTableCleanupHandler, StreamCdcSegmentHandler streamCdcSegmentHandler, ListCdcDirHandler listCdcDirHandler, + ReportSchemaHandler reportSchemaHandler, RestoreRequestValidationHandler validateRestoreJobRequest, DiskSpaceProtectionHandler diskSpaceProtection, ValidateTableExistenceHandler validateTableExistence, @@ -631,6 +632,14 @@ public class MainModule extends AbstractModule .handler(streamCdcSegmentHandler) .build(); + // Schema Reporting + protectedRouteBuilderFactory.get() + .router(router) + .method(HttpMethod.PUT) + .endpoint(ApiEndpointsV1.REPORT_SCHEMA_ROUTE) + .handler(reportSchemaHandler) + .build(); + protectedRouteBuilderFactory.get().router(router).method(HttpMethod.PUT) .endpoint(ApiEndpointsV1.SERVICE_CONFIG_ROUTE) .setBodyHandler(true) @@ -959,9 +968,8 @@ public class MainModule extends AbstractModule ignored -> { periodicTaskExecutor.schedule(clusterLeaseClaimTask); periodicTaskExecutor.schedule(sidecarPeerHealthMonitorTask); + periodicTaskExecutor.schedule(schemaReportingTask); }); - vertx.eventBus().localConsumer(ON_ALL_CASSANDRA_CQL_READY.address(), - message -> periodicTaskExecutor.schedule(schemaReportingTask)); return periodicTaskExecutor; } @@ -975,7 +983,8 @@ public class MainModule extends AbstractModule @NotNull protected String initialize() { - return fetcher.callOnFirstAvailableInstance(i -> i.delegate().storageOperations().clusterName()); + return fetcher.callOnFirstAvailableInstance(instance -> + instance.delegate().storageOperations().clusterName()); } }; diff --git a/server/src/test/integration/org/apache/cassandra/sidecar/datahub/SchemaReporterIntegrationTest.java b/server/src/test/integration/org/apache/cassandra/sidecar/datahub/SchemaReporterIntegrationTest.java index 08c31097..9650e87b 100644 --- a/server/src/test/integration/org/apache/cassandra/sidecar/datahub/SchemaReporterIntegrationTest.java +++ b/server/src/test/integration/org/apache/cassandra/sidecar/datahub/SchemaReporterIntegrationTest.java @@ -36,7 +36,7 @@ import static org.assertj.core.api.Assertions.assertThat; /** * Integration test for {@link SchemaReporter} */ -@SuppressWarnings({"try", "unused"}) +@SuppressWarnings("resource") final class SchemaReporterIntegrationTest extends IntegrationTestBase { private static final IdentifiersProvider IDENTIFIERS = new TestIdentifiers(); @@ -50,9 +50,9 @@ final class SchemaReporterIntegrationTest extends IntegrationTestBase private static String normalizeNames(@NotNull String schema) { return schema.replaceAll("(?is)(?<=\\b(" + DATA_CENTER_PREFIX + "|" - + TEST_CLUSTER_PREFIX + "|" - + TEST_KEYSPACE + "|" - + TEST_TABLE_PREFIX + "))\\d+\\b", ""); + + TEST_CLUSTER_PREFIX + "|" + + TEST_KEYSPACE + "|" + + TEST_TABLE_PREFIX + "))\\d+\\b", ""); } @CassandraIntegrationTest @@ -62,56 +62,53 @@ final class SchemaReporterIntegrationTest extends IntegrationTestBase // the goal is to cover all supported data types and their combinations waitForSchemaReady(1L, TimeUnit.MINUTES); createTestKeyspace(); - createTestUdt("numbers", "ti tinyint, " - + "si smallint, " - + "bi bigint, " - + "vi varint, " - + "sf float, " - + "df double, " - + "de decimal"); - createTestUdt("datetime", "dd date, " - + "ts timestamp, " - + "tt time"); - createTestUdt("strings", "tu timeuuid, " - + "ru uuid, " - + "ip inet, " - + "as ascii, " - + "us text, " - + "vc varchar"); - createTestUdt("collections", "t tuple<int, ascii>, " - + "s set<ascii>, " - + "l frozen<list<ascii>>, " - + "m map<ascii, frozen<map<ascii, int>>>"); + createTestUdt("numbers", "ti tinyint, " + + "si smallint, " + + "bi bigint, " + + "vi varint, " + + "sf float, " + + "df double, " + + "de decimal"); + createTestUdt("datetime", "dd date, " + + "ts timestamp, " + + "tt time"); + createTestUdt("strings", "tu timeuuid, " + + "ru uuid, " + + "ip inet, " + + "as ascii, " + + "us text, " + + "vc varchar"); + createTestUdt("collections", "t tuple<int, ascii>, " + + "s set<ascii>, " + + "l frozen<list<ascii>>, " + + "m map<ascii, frozen<map<ascii, int>>>"); createTestUdt("types", "b blob"); createTestUdt("other", "t frozen<types>"); - createTestTable("CREATE TABLE " + TEST_KEYSPACE + "." + TEST_TABLE_PREFIX + " (" - + "b boolean PRIMARY KEY, " - + "n numbers, " - + "t frozen<datetime>, " - + "s strings, " - + "c frozen<collections>, " - + "o other)" + WITH_COMPACTION_DISABLED + ";"); + createTestTable("CREATE TABLE " + TEST_KEYSPACE + "." + TEST_TABLE_PREFIX + " (" + + "b boolean PRIMARY KEY, " + + "n numbers, " + + "t frozen<datetime>, " + + "s strings, " + + "c frozen<collections>, " + + "o other)" + WITH_COMPACTION_DISABLED + ";"); // First, ensure the returned schema matches the reference one // (while ignoring name suffixes and whitespace characters) JsonEmitter emitter = new JsonEmitter(); try (Session session = maybeGetSession()) { - new SchemaReporter(IDENTIFIERS, () -> emitter) - .process(session.getCluster()); + new SchemaReporter(IDENTIFIERS, () -> emitter).process(session.getCluster()); } String actualJson = normalizeNames(emitter.content()); String expectedJson = IOUtils.readFully("/datahub/integration_test.json"); - assertThat(actualJson) - .isEqualToNormalizingWhitespace(expectedJson); + assertThat(actualJson).isEqualToNormalizingWhitespace(expectedJson); // Finally, make sure the returned schema produces the same tree of // DataHub objects after having been normalized and deserialized DataList actualData = CODEC.readList(new StringReader(actualJson)); DataList expectedData = CODEC.readList(new StringReader(expectedJson)); - assertThat(actualData) - .isEqualTo(expectedData); + assertThat(actualData).isEqualTo(expectedData); } } diff --git a/server/src/test/java/org/apache/cassandra/sidecar/datahub/SchemaReporterTest.java b/server/src/test/java/org/apache/cassandra/sidecar/datahub/SchemaReporterTest.java index 920aa9f6..7a7b0f2f 100644 --- a/server/src/test/java/org/apache/cassandra/sidecar/datahub/SchemaReporterTest.java +++ b/server/src/test/java/org/apache/cassandra/sidecar/datahub/SchemaReporterTest.java @@ -41,7 +41,7 @@ import static org.mockito.Mockito.when; /** * Unit tests for {@link SchemaReporter} */ -@SuppressWarnings("try") +@SuppressWarnings("resource") final class SchemaReporterTest { private static final IdentifiersProvider IDENTIFIERS = new TestIdentifiers(); @@ -56,8 +56,7 @@ final class SchemaReporterTest when(metadata.getKeyspaces()).thenReturn(Collections.emptyList()); JsonEmitter emitter = new JsonEmitter(); - new SchemaReporter(IDENTIFIERS, () -> emitter) - .process(cluster); + new SchemaReporter(IDENTIFIERS, () -> emitter).process(cluster); String actual = emitter.content(); String expected = IOUtils.readFully("/datahub/empty_cluster.json"); @@ -78,8 +77,7 @@ final class SchemaReporterTest when(keyspace.getTables()).thenReturn(Collections.emptyList()); JsonEmitter emitter = new JsonEmitter(); - new SchemaReporter(IDENTIFIERS, () -> emitter) - .process(cluster); + new SchemaReporter(IDENTIFIERS, () -> emitter).process(cluster); String actual = emitter.content(); String expected = IOUtils.readFully("/datahub/empty_keyspace.json"); @@ -107,8 +105,7 @@ final class SchemaReporterTest when(options.getComment()).thenReturn("table comment"); JsonEmitter emitter = new JsonEmitter(); - new SchemaReporter(IDENTIFIERS, () -> emitter) - .process(cluster); + new SchemaReporter(IDENTIFIERS, () -> emitter).process(cluster); String actual = emitter.content(); String expected = IOUtils.readFully("/datahub/empty_table.json"); @@ -187,8 +184,7 @@ final class SchemaReporterTest when(c8.getType()).thenReturn(DataType.map(DataType.timestamp(), DataType.inet(), false)); JsonEmitter emitter = new JsonEmitter(); - new SchemaReporter(IDENTIFIERS, () -> emitter) - .process(cluster); + new SchemaReporter(IDENTIFIERS, () -> emitter).process(cluster); String actual = emitter.content(); String expected = IOUtils.readFully("/datahub/primitive_types.json"); @@ -261,8 +257,7 @@ final class SchemaReporterTest when(udt2c2.getType()).thenReturn(DataType.cboolean()); JsonEmitter emitter = new JsonEmitter(); - new SchemaReporter(IDENTIFIERS, () -> emitter) - .process(cluster); + new SchemaReporter(IDENTIFIERS, () -> emitter).process(cluster); String actual = emitter.content(); String expected = IOUtils.readFully("/datahub/user_types.json"); diff --git a/server/src/test/java/org/apache/cassandra/sidecar/routes/ReportSchemaHandlerTest.java b/server/src/test/java/org/apache/cassandra/sidecar/routes/ReportSchemaHandlerTest.java new file mode 100644 index 00000000..333655b5 --- /dev/null +++ b/server/src/test/java/org/apache/cassandra/sidecar/routes/ReportSchemaHandlerTest.java @@ -0,0 +1,191 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.cassandra.sidecar.routes; + +import java.io.IOException; +import java.time.Duration; +import java.util.Collections; +import java.util.concurrent.Future; +import java.util.concurrent.TimeUnit; +import org.junit.jupiter.api.AfterEach; +import org.junit.jupiter.api.BeforeEach; +import org.junit.jupiter.api.Test; +import org.junit.jupiter.api.extension.ExtendWith; + +import com.datastax.driver.core.Metadata; +import com.google.inject.AbstractModule; +import com.google.inject.Guice; +import com.google.inject.Injector; +import com.google.inject.Provides; +import com.google.inject.Singleton; +import com.google.inject.util.Modules; +import com.linkedin.mxe.MetadataChangeProposal; +import datahub.client.Callback; +import datahub.client.MetadataWriteResponse; +import io.netty.handler.codec.http.HttpResponseStatus; +import io.vertx.core.Vertx; +import io.vertx.ext.web.client.WebClient; +import io.vertx.ext.web.client.predicate.ResponsePredicate; +import io.vertx.junit5.VertxExtension; +import io.vertx.junit5.VertxTestContext; +import org.apache.cassandra.sidecar.TestModule; +import org.apache.cassandra.sidecar.TestResourceReaper; +import org.apache.cassandra.sidecar.cluster.CassandraAdapterDelegate; +import org.apache.cassandra.sidecar.cluster.InstancesMetadata; +import org.apache.cassandra.sidecar.cluster.instance.InstanceMetadata; +import org.apache.cassandra.sidecar.common.server.StorageOperations; +import org.apache.cassandra.sidecar.common.server.utils.IOUtils; +import org.apache.cassandra.sidecar.datahub.EmitterFactory; +import org.apache.cassandra.sidecar.datahub.JsonEmitter; +import org.apache.cassandra.sidecar.server.MainModule; +import org.apache.cassandra.sidecar.server.Server; +import org.jetbrains.annotations.NotNull; +import org.jetbrains.annotations.Nullable; + +import static org.assertj.core.api.Assertions.assertThat; +import static org.mockito.Mockito.mock; +import static org.mockito.Mockito.when; + +/** + * Unit tests for {@link ReportSchemaHandler} + */ +@ExtendWith(VertxExtension.class) +final class ReportSchemaHandlerTest +{ + private static final String CLUSTER = "cluster"; + private static final String DIRECTORY = "/tmp"; + private static final int IDENTIFIER = 42; + private static final String LOCALHOST = "127.0.0.1"; + private static final int PORT = 9042; + private static final String ENDPOINT = "/api/v1/datahub/schemas"; + private static final Duration TIMEOUT = Duration.ofSeconds(5); + + private static final class ThrowingEmitter extends JsonEmitter + { + @Override + @NotNull + public synchronized Future<MetadataWriteResponse> emit(@NotNull MetadataChangeProposal proposal, + @Nullable Callback callback) throws IOException + { + throw new IOException(); + } + } + + private final class ReportSchemaHandlerTestModule extends AbstractModule + { + @Provides + @Singleton + @NotNull + public InstancesMetadata instancesMetadata() + { + Metadata metadata = mock(Metadata.class); + when(metadata.getKeyspaces()).thenReturn(Collections.emptyList()); + + StorageOperations operations = mock(StorageOperations.class); + when(operations.clusterName()).thenReturn(CLUSTER); + + CassandraAdapterDelegate delegate = mock(CassandraAdapterDelegate.class); + when(delegate.storageOperations()).thenReturn(operations); + when(delegate.metadata()).thenReturn(metadata); + + InstanceMetadata instanceMetadata = mock(InstanceMetadata.class); + when(instanceMetadata.stagingDir()).thenReturn(DIRECTORY); + when(instanceMetadata.id()).thenReturn(IDENTIFIER); + when(instanceMetadata.host()).thenReturn(LOCALHOST); + when(instanceMetadata.port()).thenReturn(PORT); + when(instanceMetadata.delegate()).thenReturn(delegate); + + InstancesMetadata instances = mock(InstancesMetadata.class); + when(instances.instances()).thenReturn(Collections.singletonList(instanceMetadata)); + when(instances.instanceFromId(IDENTIFIER)).thenReturn(instanceMetadata); + when(instances.instanceFromHost(LOCALHOST)).thenReturn(instanceMetadata); + return instances; + } + + @Provides + @Singleton + @NotNull + public EmitterFactory emitterFactory() + { + return () -> emitter; + } + } + + private final Injector injector = Guice.createInjector(Modules.override(new MainModule()) + .with(Modules.override(new TestModule()) + .with(new ReportSchemaHandlerTestModule()))); + private WebClient client; + private Server server; + private JsonEmitter emitter; + + @BeforeEach + void before() throws InterruptedException + { + VertxTestContext context = new VertxTestContext(); + server = injector.getInstance(Server.class); + server.start() + .onSuccess(result -> context.completeNow()) + .onFailure(context::failNow); + + client = WebClient.create(injector.getInstance(Vertx.class)); + + context.awaitCompletion(TIMEOUT.toMillis(), TimeUnit.MILLISECONDS); + } + + @AfterEach + void after() throws InterruptedException + { + TestResourceReaper.create() + .with(server) + .with(client) + .close(); + } + + @Test + void testSuccess(@NotNull VertxTestContext context) throws IOException + { + String expected = IOUtils.readFully("/datahub/empty_cluster.json"); + emitter = new JsonEmitter(); + assertThat(emitter.content().length()).isLessThanOrEqualTo(1); + + client.put(server.actualPort(), LOCALHOST, ENDPOINT) + .expect(ResponsePredicate.SC_OK) + .send(context.succeeding(response -> { + assertThat(response.statusCode()).isEqualTo(HttpResponseStatus.OK.code()); + assertThat(emitter.content()).isEqualTo(expected); + context.completeNow(); + })); + } + + @Test + void testFailure(@NotNull VertxTestContext context) + { + String expected = "[\n]"; + emitter = new ThrowingEmitter(); + assertThat(emitter.content().length()).isLessThanOrEqualTo(1); + + client.put(server.actualPort(), LOCALHOST, ENDPOINT) + .expect(ResponsePredicate.SC_INTERNAL_SERVER_ERROR) + .send(context.succeeding(response -> { + assertThat(response.statusCode()).isEqualTo(HttpResponseStatus.INTERNAL_SERVER_ERROR.code()); + assertThat(emitter.content()).isEqualTo(expected); + context.completeNow(); + })); + } +} --------------------------------------------------------------------- To unsubscribe, e-mail: commits-unsubscr...@cassandra.apache.org For additional commands, e-mail: commits-h...@cassandra.apache.org