This is an automated email from the ASF dual-hosted git repository.

frankgh pushed a commit to branch trunk
in repository https://gitbox.apache.org/repos/asf/cassandra-sidecar.git


The following commit(s) were added to refs/heads/trunk by this push:
     new 041c33bb CASSSIDECAR-203: Created Endpoint that Triggers an Immediate Schema Report (#198)
041c33bb is described below

commit 041c33bbc4dc2821a138a3c9d9fc7e4c77ef430e
Author: Yuriy Semchyshyn <5...@users.noreply.github.com>
AuthorDate: Tue Mar 11 15:34:00 2025 -0500

    CASSSIDECAR-203: Created Endpoint that Triggers an Immediate Schema Report (#198)
    
    Patch by Yuriy Semchyshyn; reviewed by Saranya Krishnakumar, Yifan Cai, Francisco Guerrero for CASSSIDECAR-203
---
 CHANGES.txt                                        |   1 +
 .../cassandra/sidecar/common/ApiEndpointsV1.java   |   5 +-
 .../common/request/ReportSchemaRequest.java        |  41 +++++
 .../cassandra/sidecar/client/RequestContext.java   |  25 ++-
 .../cassandra/sidecar/client/SidecarClient.java    |  42 +++--
 .../sidecar/client/SidecarClientTest.java          |  35 +++-
 .../acl/authorization/BasicPermissions.java        |   3 +
 .../cassandra/sidecar/datahub/SchemaReporter.java  |  43 +++--
 .../sidecar/routes/ReportSchemaHandler.java        | 104 +++++++++++
 .../cassandra/sidecar/server/MainModule.java       |  17 +-
 .../datahub/SchemaReporterIntegrationTest.java     |  71 ++++----
 .../sidecar/datahub/SchemaReporterTest.java        |  17 +-
 .../sidecar/routes/ReportSchemaHandlerTest.java    | 191 +++++++++++++++++++++
 13 files changed, 508 insertions(+), 87 deletions(-)

diff --git a/CHANGES.txt b/CHANGES.txt
index c9228296..d8f4c061 100644
--- a/CHANGES.txt
+++ b/CHANGES.txt
@@ -1,5 +1,6 @@
 0.1.0
 -----
+ * Create Endpoint that Triggers an Immediate Schema Report (CASSSIDECAR-203)
  * Add JWT Authentication support in Sidecar (CASSSIDECAR-160)
  * Add Token Ring Peer Health monitor (CASSSIDECAR-206)
  * Config APIs for storing CDC/Kafka configs for CDC feature (CASSSIDECAR-211)
diff --git a/client-common/src/main/java/org/apache/cassandra/sidecar/common/ApiEndpointsV1.java b/client-common/src/main/java/org/apache/cassandra/sidecar/common/ApiEndpointsV1.java
index faff4558..c38ee551 100644
--- a/client-common/src/main/java/org/apache/cassandra/sidecar/common/ApiEndpointsV1.java
+++ b/client-common/src/main/java/org/apache/cassandra/sidecar/common/ApiEndpointsV1.java
@@ -121,13 +121,16 @@ public final class ApiEndpointsV1
     public static final String LIST_CDC_SEGMENTS_ROUTE = API_V1 + CDC_PATH + "/segments";
     public static final String STREAM_CDC_SEGMENTS_ROUTE = LIST_CDC_SEGMENTS_ROUTE + "/" + SEGMENT_PATH_PARAM;
 
+    // Schema Reporting
+    private static final String REPORT_SCHEMA = "/datahub/schemas";
+    public static final String REPORT_SCHEMA_ROUTE = API_V1 + REPORT_SCHEMA;
+
     public static final String SERVICES_PATH = "/services";
     public static final String SERVICE_PARAM = ":service";
     public static final String CONFIG = "/config";
     public static final String SERVICE_CONFIG_ROUTE = API_V1 + SERVICES_PATH + "/" + SERVICE_PARAM + CONFIG;
     public static final String SERVICES_CONFIG_ROUTE = API_V1 + SERVICES_PATH;
 
-
     public static final String CONNECTED_CLIENT_STATS_ROUTE = API_V1 + CASSANDRA + "/stats/connected-clients";
 
     public static final String OPERATIONAL_JOBS = "/operational-jobs";
diff --git a/client-common/src/main/java/org/apache/cassandra/sidecar/common/request/ReportSchemaRequest.java b/client-common/src/main/java/org/apache/cassandra/sidecar/common/request/ReportSchemaRequest.java
new file mode 100644
index 00000000..91b5a7fc
--- /dev/null
+++ b/client-common/src/main/java/org/apache/cassandra/sidecar/common/request/ReportSchemaRequest.java
@@ -0,0 +1,41 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.cassandra.sidecar.common.request;
+
+import io.netty.handler.codec.http.HttpMethod;
+import org.apache.cassandra.sidecar.common.ApiEndpointsV1;
+import org.apache.cassandra.sidecar.common.response.HealthResponse;
+
+/**
+ * A request to trigger an immediate, synchronous schema conversion
+ * and report regardless of the periodic task schedule or status
+ */
+public class ReportSchemaRequest extends JsonRequest<HealthResponse>
+{
+    public ReportSchemaRequest()
+    {
+        super(ApiEndpointsV1.REPORT_SCHEMA_ROUTE);
+    }
+
+    @Override
+    public HttpMethod method()
+    {
+        return HttpMethod.PUT;
+    }
+}
diff --git a/client/src/main/java/org/apache/cassandra/sidecar/client/RequestContext.java b/client/src/main/java/org/apache/cassandra/sidecar/client/RequestContext.java
index 4fde93a9..f782077d 100644
--- a/client/src/main/java/org/apache/cassandra/sidecar/client/RequestContext.java
+++ b/client/src/main/java/org/apache/cassandra/sidecar/client/RequestContext.java
@@ -43,6 +43,7 @@ import org.apache.cassandra.sidecar.common.request.ListSnapshotFilesRequest;
 import org.apache.cassandra.sidecar.common.request.NodeDecommissionRequest;
 import org.apache.cassandra.sidecar.common.request.NodeSettingsRequest;
 import org.apache.cassandra.sidecar.common.request.OperationalJobRequest;
+import org.apache.cassandra.sidecar.common.request.ReportSchemaRequest;
 import org.apache.cassandra.sidecar.common.request.Request;
 import org.apache.cassandra.sidecar.common.request.RingRequest;
 import org.apache.cassandra.sidecar.common.request.SSTableComponentRequest;
@@ -84,7 +85,7 @@ public class RequestContext
     protected static final NodeDecommissionRequest NODE_DECOMMISSION_REQUEST = new NodeDecommissionRequest();
 
     protected static final StreamStatsRequest STREAM_STATS_REQUEST = new StreamStatsRequest();
-    protected static final RetryPolicy DEFAULT_RETRY_POLICY = new NoRetryPolicy();
+    protected static final RetryPolicy DEFAULT_NO_RETRY_POLICY = new NoRetryPolicy();
     protected static final RetryPolicy DEFAULT_EXPONENTIAL_BACKOFF_RETRY_POLICY =
     new ExponentialBackoffRetryPolicy(10, 500L, 60_000L);
 
@@ -140,7 +141,7 @@ public class RequestContext
     {
         private InstanceSelectionPolicy instanceSelectionPolicy;
         private Request request;
-        private RetryPolicy retryPolicy = DEFAULT_RETRY_POLICY;
+        private RetryPolicy retryPolicy = DEFAULT_NO_RETRY_POLICY;
         private final Map<String, String> customHeaders;
 
         public Builder()
@@ -498,6 +499,16 @@ public class RequestContext
             return request(new UploadSSTableRequest(keyspace, tableName, uploadId, component, digest, filename));
         }
 
+        /**
+         * Sets the {@code request} to be a new instance of a {@link ReportSchemaRequest}
+         *
+         * @return this {@link Builder} for method chaining
+         */
+        public Builder reportSchemaRequest()
+        {
+            return request(new ReportSchemaRequest());
+        }
+
         /**
          * Sets the {@code request} to be a {@link ConnectedClientStatsRequest} and returns a reference to this Builder
          * enabling method chaining.
@@ -564,6 +575,16 @@ public class RequestContext
             return request(STREAM_STATS_REQUEST);
         }
 
+        /**
+         * Sets the {@code retryPolicy} to be an instance of {@link NoRetryPolicy}
+         *
+         * @return this {@link Builder} for method chaining
+         */
+        public Builder noRetryPolicy()
+        {
+            return retryPolicy(DEFAULT_NO_RETRY_POLICY);
+        }
+
         /**
          * Sets the {@code retryPolicy} to be an
          * {@link org.apache.cassandra.sidecar.client.retry.ExponentialBackoffRetryPolicy} configured with
diff --git a/client/src/main/java/org/apache/cassandra/sidecar/client/SidecarClient.java b/client/src/main/java/org/apache/cassandra/sidecar/client/SidecarClient.java
index 6fe2f9c6..db1e2c85 100644
--- a/client/src/main/java/org/apache/cassandra/sidecar/client/SidecarClient.java
+++ b/client/src/main/java/org/apache/cassandra/sidecar/client/SidecarClient.java
@@ -24,7 +24,6 @@ import java.util.Map;
 import java.util.Objects;
 import java.util.UUID;
 import java.util.concurrent.CompletableFuture;
-
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
@@ -540,9 +539,9 @@ public class SidecarClient implements AutoCloseable, SidecarClientBlobRestoreExt
     public CompletableFuture<ListCdcSegmentsResponse> listCdcSegments(SidecarInstance sidecarInstance)
     {
         return executor.executeRequestAsync(requestBuilder()
-                       .singleInstanceSelectionPolicy(sidecarInstance)
-                       .request(new ListCdcSegmentsRequest())
-                       .build());
+                                            .singleInstanceSelectionPolicy(sidecarInstance)
+                                            .request(new ListCdcSegmentsRequest())
+                                            .build());
     }
 
     /**
@@ -562,9 +561,26 @@ public class SidecarClient implements AutoCloseable, SidecarClientBlobRestoreExt
                                   StreamConsumer streamConsumer)
     {
         executor.streamRequest(requestBuilder()
-                .singleInstanceSelectionPolicy(sidecarInstance)
-                .request(new StreamCdcSegmentRequest(segment, range))
-                .build(), streamConsumer);
+                               .singleInstanceSelectionPolicy(sidecarInstance)
+                               .request(new StreamCdcSegmentRequest(segment, range))
+                               .build(), streamConsumer);
+    }
+
+    /**
+     * Sends a request to trigger an immediate, synchronous schema
+     * conversion and report on the specified instance of the Sidecar
+     * regardless of the periodic task schedule or status
+     *
+     * @param instance the {@link SidecarInstance} to receive the request
+     * @return a {@link CompletableFuture} for the request
+     */
+    public <T> CompletableFuture<T> reportSchema(SidecarInstance instance)
+    {
+        return executor.executeRequestAsync(requestBuilder()
+                                            .singleInstanceSelectionPolicy(instance)
+                                            .reportSchemaRequest()
+                                            .noRetryPolicy()  // {@link NoRetryPolicy} is the preferred behavior here
+                                            .build());
     }
 
     /**
@@ -576,8 +592,8 @@ public class SidecarClient implements AutoCloseable, SidecarClientBlobRestoreExt
     public CompletableFuture<AllServicesConfigPayload> allServicesConfig()
     {
         return executor.executeRequestAsync(requestBuilder()
-                       .request(new AllServicesConfigRequest())
-                       .build());
+                                            .request(new AllServicesConfigRequest())
+                                            .build());
     }
 
     /**
@@ -590,8 +606,8 @@ public class SidecarClient implements AutoCloseable, SidecarClientBlobRestoreExt
     public CompletableFuture<UpdateCdcServiceConfigPayload> updateCdcServiceConfig(Service service, Map<String, String> config)
     {
         return executor.executeRequestAsync(requestBuilder()
-                       .request(new UpdateServiceConfigRequest(service, new UpdateCdcServiceConfigPayload(config)))
-                       .build());
+                                            .request(new UpdateServiceConfigRequest(service, new UpdateCdcServiceConfigPayload(config)))
+                                            .build());
     }
 
     /**
@@ -602,8 +618,8 @@ public class SidecarClient implements AutoCloseable, SidecarClientBlobRestoreExt
     public CompletableFuture<Void> deleteCdcServiceConfig(Service service)
     {
         return executor.executeRequestAsync(requestBuilder()
-                       .request(new DeleteServiceConfigRequest(service))
-                       .build());
+                                            .request(new DeleteServiceConfigRequest(service))
+                                            .build());
     }
 
     /**
diff --git a/client/src/testFixtures/java/org/apache/cassandra/sidecar/client/SidecarClientTest.java b/client/src/testFixtures/java/org/apache/cassandra/sidecar/client/SidecarClientTest.java
index 6ca140c9..fad89a38 100644
--- a/client/src/testFixtures/java/org/apache/cassandra/sidecar/client/SidecarClientTest.java
+++ b/client/src/testFixtures/java/org/apache/cassandra/sidecar/client/SidecarClientTest.java
@@ -40,7 +40,6 @@ import java.util.concurrent.TimeUnit;
 import java.util.concurrent.TimeoutException;
 import java.util.function.Consumer;
 import java.util.stream.Collectors;
-
 import org.junit.jupiter.api.AfterEach;
 import org.junit.jupiter.api.BeforeEach;
 import org.junit.jupiter.api.Test;
@@ -114,6 +113,7 @@ import static org.assertj.core.api.Assertions.assertThat;
 import static org.assertj.core.api.Assertions.assertThatException;
 import static org.assertj.core.api.Assertions.assertThatExceptionOfType;
 import static org.assertj.core.api.Assertions.assertThatIllegalArgumentException;
+import static org.assertj.core.api.Assertions.assertThatNoException;
 import static org.assertj.core.api.Assertions.assertThatThrownBy;
 import static org.assertj.core.api.Assertions.fail;
 
@@ -1607,7 +1607,7 @@ abstract class SidecarClientTest
     public void testListCdcSegments() throws ExecutionException, InterruptedException, JsonProcessingException
     {
         List<CdcSegmentInfo> segments = Arrays.asList(new CdcSegmentInfo("commit-log1", 100, 100, true, 1732148713725L),
-                new CdcSegmentInfo("commit-log2", 100, 10, false, 1732148713725L));
+                                                      new CdcSegmentInfo("commit-log2", 100, 10, false, 1732148713725L));
         ListCdcSegmentsResponse listSegmentsResponse = new ListCdcSegmentsResponse("localhost", 9043, segments);
         ObjectMapper mapper = new ObjectMapper();
 
@@ -1673,6 +1673,35 @@ abstract class SidecarClientTest
         assertThat(new String(baos.toByteArray(), StandardCharsets.UTF_8)).isEqualTo("Test Content");
     }
 
+    @Test
+    public void testReportSchemaSuccess()
+    {
+        MockResponse response = new MockResponse().setResponseCode(OK.code())
+                                                  .setBody("{\"status\":\"OK\"}");
+
+        enqueue(response);
+
+        SidecarInstance instance = instances.get(0);
+
+        assertThatNoException().isThrownBy(() -> client.reportSchema(instance).get());
+    }
+
+    @Test
+    public void testReportSchemaFailure()
+    {
+        MockResponse response = new MockResponse().setResponseCode(INTERNAL_SERVER_ERROR.code())
+                                                  .setBody(INTERNAL_SERVER_ERROR.reasonPhrase());
+
+        enqueue(response);
+
+        SidecarInstance instance = instances.get(0);
+
+        assertThatThrownBy(() -> client.reportSchema(instance).get()).isExactlyInstanceOf(ExecutionException.class)
+                                                                     .hasCauseInstanceOf(RetriesExhaustedException.class)
+                                                                     .hasMessageContaining(Integer.toString(INTERNAL_SERVER_ERROR.code()))
+                                                                     .hasMessageContaining(INTERNAL_SERVER_ERROR.reasonPhrase());
+    }
+
     @Test
     public void testAllServiceSuccessTests() throws IOException, ExecutionException, InterruptedException
     {
@@ -1723,7 +1752,7 @@ abstract class SidecarClientTest
         enqueue(response);
         client.deleteCdcServiceConfig(Service.CDC).get();
         validateResponseServed(ApiEndpointsV1.SERVICE_CONFIG_ROUTE.replaceAll(ApiEndpointsV1.SERVICE_PARAM, "cdc"),
-                request -> assertThat(request.getMethod()).isEqualTo("DELETE"));
+                               request -> assertThat(request.getMethod()).isEqualTo("DELETE"));
     }
 
     @Test
diff --git a/server/src/main/java/org/apache/cassandra/sidecar/acl/authorization/BasicPermissions.java b/server/src/main/java/org/apache/cassandra/sidecar/acl/authorization/BasicPermissions.java
index 240cec81..76fb723b 100644
--- a/server/src/main/java/org/apache/cassandra/sidecar/acl/authorization/BasicPermissions.java
+++ b/server/src/main/java/org/apache/cassandra/sidecar/acl/authorization/BasicPermissions.java
@@ -63,6 +63,9 @@ public class BasicPermissions
     public static final Permission READ_OPERATIONAL_JOB = new DomainAwarePermission("OPERATIONAL_JOB:READ", OPERATION_SCOPE);
     public static final Permission DECOMMISSION_NODE = new DomainAwarePermission("NODE:DECOMMISSION", OPERATION_SCOPE);
 
+    // Permissions related to Schema Reporting
+    public static final Permission REPORT_SCHEMA = new DomainAwarePermission("SCHEMA:PUBLISH", CLUSTER_SCOPE);
+
     // cassandra cluster related permissions
     public static final Permission READ_SCHEMA = new DomainAwarePermission("SCHEMA:READ", CLUSTER_SCOPE);
     public static final Permission READ_SCHEMA_KEYSPACE_SCOPED = new DomainAwarePermission("SCHEMA:READ", KEYSPACE_SCOPE);
diff --git a/server/src/main/java/org/apache/cassandra/sidecar/datahub/SchemaReporter.java b/server/src/main/java/org/apache/cassandra/sidecar/datahub/SchemaReporter.java
index ab2ddb54..4ad3e3f3 100644
--- a/server/src/main/java/org/apache/cassandra/sidecar/datahub/SchemaReporter.java
+++ b/server/src/main/java/org/apache/cassandra/sidecar/datahub/SchemaReporter.java
@@ -113,14 +113,23 @@ public class SchemaReporter
     /**
      * Public method for converting and reporting the Cassandra schema
      *
-     * @param cluster a {@link Cluster} to extract Cassandra schema from
+     * @param cluster the {@link Cluster} to extract Cassandra schema from
      */
     public void process(@NotNull Cluster cluster)
+    {
+        process(cluster.getMetadata());
+    }
+
+    /**
+     * Public method for converting and reporting the Cassandra schema
+     *
+     * @param metadata the {@link Metadata} to extract Cassandra schema from
+     */
+    public void process(@NotNull Metadata metadata)
     {
         try (Emitter emitter = emitterFactory.emitter())
         {
-            stream(cluster.getMetadata())
-                    .forEach(ThrowableUtils.consumer(emitter::emit));
+            stream(metadata).forEach(ThrowableUtils.consumer(emitter::emit));
         }
         catch (Exception exception)
         {
@@ -140,11 +149,12 @@ public class SchemaReporter
     protected Stream<MetadataChangeProposalWrapper<? extends RecordTemplate>> stream(@NotNull Metadata metadata)
     {
         return Streams.concat(
-                clusterConverters.stream()
-                        .map(ThrowableUtils.function(converter -> converter.convert(metadata))),
-                metadata.getKeyspaces().stream()
-                        .filter(this::neitherVirtualNorSystem)
-                        .flatMap(this::stream));
+        clusterConverters.stream()
+                         .map(ThrowableUtils.function(converter -> converter.convert(metadata))),
+        metadata.getKeyspaces()
+                .stream()
+                .filter(this::neitherVirtualNorSystem)
+                .flatMap(this::stream));
     }
 
     /**
@@ -159,10 +169,11 @@ public class SchemaReporter
     protected Stream<MetadataChangeProposalWrapper<? extends RecordTemplate>> stream(@NotNull KeyspaceMetadata keyspace)
     {
         return Streams.concat(
-                keyspaceConverters.stream()
-                        .map(ThrowableUtils.function(converter -> converter.convert(keyspace))),
-                keyspace.getTables().stream()
-                        .flatMap(this::stream));
+        keyspaceConverters.stream()
+                          .map(ThrowableUtils.function(converter -> converter.convert(keyspace))),
+        keyspace.getTables()
+                .stream()
+                .flatMap(this::stream));
     }
 
     /**
@@ -176,7 +187,7 @@ public class SchemaReporter
     protected Stream<MetadataChangeProposalWrapper<? extends RecordTemplate>> stream(@NotNull TableMetadata table)
     {
         return tableConverters.stream()
-                .map(ThrowableUtils.function(converter -> converter.convert(table)));
+                              .map(ThrowableUtils.function(converter -> converter.convert(table)));
     }
 
     /**
@@ -195,8 +206,8 @@ public class SchemaReporter
         }
 
         String name = keyspace.getName();
-        return !name.equals("system")
-            && !name.startsWith("system_")
-            && !name.equals("sidecar_internal");
+        return !name.equals("system") &&
+               !name.startsWith("system_") &&
+               !name.equals("sidecar_internal");
     }
 }
diff --git a/server/src/main/java/org/apache/cassandra/sidecar/routes/ReportSchemaHandler.java b/server/src/main/java/org/apache/cassandra/sidecar/routes/ReportSchemaHandler.java
new file mode 100644
index 00000000..5fe5d31a
--- /dev/null
+++ b/server/src/main/java/org/apache/cassandra/sidecar/routes/ReportSchemaHandler.java
@@ -0,0 +1,104 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.cassandra.sidecar.routes;
+
+import java.util.Collections;
+import java.util.Set;
+
+import com.datastax.driver.core.Metadata;
+import com.google.inject.Inject;
+import com.google.inject.Singleton;
+import io.vertx.core.http.HttpServerRequest;
+import io.vertx.core.net.SocketAddress;
+import io.vertx.ext.auth.authorization.Authorization;
+import io.vertx.ext.web.RoutingContext;
+import org.apache.cassandra.sidecar.acl.authorization.BasicPermissions;
+import org.apache.cassandra.sidecar.concurrent.ExecutorPools;
+import org.apache.cassandra.sidecar.datahub.SchemaReporter;
+import org.apache.cassandra.sidecar.server.MainModule;
+import org.apache.cassandra.sidecar.utils.InstanceMetadataFetcher;
+import org.jetbrains.annotations.NotNull;
+import org.jetbrains.annotations.Nullable;
+
+/**
+ * An implementation of {@link AbstractHandler} used to trigger an immediate,
+ * synchronous conversion and report of the current schema
+ */
+@Singleton
+public class ReportSchemaHandler extends AbstractHandler<Void> implements AccessProtected
+{
+    @NotNull
+    private final SchemaReporter schemaReporter;
+
+    /**
+     * Constructs a new instance of {@link ReportSchemaHandler} using the provided instances
+     * of {@link InstanceMetadataFetcher}, {@link ExecutorPools}, and {@link SchemaReporter}
+     *
+     * @param metadata the metadata fetcher
+     * @param executor executor pools for blocking executions
+     * @param reporter schema reporter to use for the conversion
+     */
+    @Inject
+    public ReportSchemaHandler(@NotNull InstanceMetadataFetcher metadata,
+                               @NotNull ExecutorPools executor,
+                               @NotNull SchemaReporter reporter)
+    {
+        super(metadata, executor, null);
+
+        schemaReporter = reporter;
+    }
+
+    /**
+     * {@inheritDoc}
+     */
+    @Override
+    @NotNull
+    public Set<Authorization> requiredAuthorizations()
+    {
+        return Collections.singleton(BasicPermissions.REPORT_SCHEMA.toAuthorization());
+    }
+
+    /**
+     * {@inheritDoc}
+     */
+    @Override
+    @Nullable
+    protected Void extractParamsOrThrow(@NotNull RoutingContext context)
+    {
+        return null;
+    }
+
+    /**
+     * {@inheritDoc}
+     */
+    @Override
+    protected void handleInternal(@NotNull RoutingContext context,
+                                  @NotNull HttpServerRequest http,
+                                  @NotNull String host,
+                                  @NotNull SocketAddress address,
+                                  @Nullable Void request)
+    {
+        Metadata metadata = metadataFetcher.callOnFirstAvailableInstance(instance -> instance.delegate().metadata());
+
+        executorPools.service()
+                     .runBlocking(() -> schemaReporter.process(metadata))
+                     .onSuccess(ignored ->  context.json(MainModule.OK_STATUS))
+                     .onFailure(throwable -> processFailure(throwable, context, host, address, request));
+    }
+}
diff --git a/server/src/main/java/org/apache/cassandra/sidecar/server/MainModule.java b/server/src/main/java/org/apache/cassandra/sidecar/server/MainModule.java
index c772cdcd..71d2294e 100644
--- a/server/src/main/java/org/apache/cassandra/sidecar/server/MainModule.java
+++ b/server/src/main/java/org/apache/cassandra/sidecar/server/MainModule.java
@@ -141,6 +141,7 @@ import org.apache.cassandra.sidecar.routes.KeyspaceSchemaHandler;
 import org.apache.cassandra.sidecar.routes.ListOperationalJobsHandler;
 import org.apache.cassandra.sidecar.routes.NodeDecommissionHandler;
 import org.apache.cassandra.sidecar.routes.OperationalJobHandler;
+import org.apache.cassandra.sidecar.routes.ReportSchemaHandler;
 import org.apache.cassandra.sidecar.routes.RingHandler;
 import org.apache.cassandra.sidecar.routes.RoutingOrder;
 import org.apache.cassandra.sidecar.routes.SchemaHandler;
@@ -181,7 +182,6 @@ import org.jetbrains.annotations.NotNull;
 
 import static org.apache.cassandra.sidecar.common.ApiEndpointsV1.API_V1_ALL_ROUTES;
 import static org.apache.cassandra.sidecar.common.server.utils.ByteUtils.bytesToHumanReadableBinaryPrefix;
-import static org.apache.cassandra.sidecar.server.SidecarServerEvents.ON_ALL_CASSANDRA_CQL_READY;
 import static org.apache.cassandra.sidecar.server.SidecarServerEvents.ON_CASSANDRA_CQL_READY;
 import static org.apache.cassandra.sidecar.server.SidecarServerEvents.ON_SERVER_STOP;
 
@@ -364,6 +364,7 @@ public class MainModule extends AbstractModule
                               SSTableCleanupHandler ssTableCleanupHandler,
                               StreamCdcSegmentHandler streamCdcSegmentHandler,
                               ListCdcDirHandler listCdcDirHandler,
+                              ReportSchemaHandler reportSchemaHandler,
                               RestoreRequestValidationHandler validateRestoreJobRequest,
                               DiskSpaceProtectionHandler diskSpaceProtection,
                               ValidateTableExistenceHandler validateTableExistence,
@@ -631,6 +632,14 @@ public class MainModule extends AbstractModule
                                     .handler(streamCdcSegmentHandler)
                                     .build();
 
+        // Schema Reporting
+        protectedRouteBuilderFactory.get()
+                                    .router(router)
+                                    .method(HttpMethod.PUT)
+                                    .endpoint(ApiEndpointsV1.REPORT_SCHEMA_ROUTE)
+                                    .handler(reportSchemaHandler)
+                                    .build();
+
         
protectedRouteBuilderFactory.get().router(router).method(HttpMethod.PUT)
                                     
.endpoint(ApiEndpointsV1.SERVICE_CONFIG_ROUTE)
                                     .setBodyHandler(true)
@@ -959,9 +968,8 @@ public class MainModule extends AbstractModule
                                        ignored -> {
                                            periodicTaskExecutor.schedule(clusterLeaseClaimTask);
                                            periodicTaskExecutor.schedule(sidecarPeerHealthMonitorTask);
+                                           periodicTaskExecutor.schedule(schemaReportingTask);
                                        });
-        vertx.eventBus().localConsumer(ON_ALL_CASSANDRA_CQL_READY.address(),
-                                       message -> periodicTaskExecutor.schedule(schemaReportingTask));
         return periodicTaskExecutor;
     }
 
@@ -975,7 +983,8 @@ public class MainModule extends AbstractModule
             @NotNull
             protected String initialize()
             {
-                return fetcher.callOnFirstAvailableInstance(i -> i.delegate().storageOperations().clusterName());
+                return fetcher.callOnFirstAvailableInstance(instance ->
+                                                            instance.delegate().storageOperations().clusterName());
             }
         };
 
diff --git a/server/src/test/integration/org/apache/cassandra/sidecar/datahub/SchemaReporterIntegrationTest.java b/server/src/test/integration/org/apache/cassandra/sidecar/datahub/SchemaReporterIntegrationTest.java
index 08c31097..9650e87b 100644
--- a/server/src/test/integration/org/apache/cassandra/sidecar/datahub/SchemaReporterIntegrationTest.java
+++ b/server/src/test/integration/org/apache/cassandra/sidecar/datahub/SchemaReporterIntegrationTest.java
@@ -36,7 +36,7 @@ import static org.assertj.core.api.Assertions.assertThat;
 /**
  * Integration test for {@link SchemaReporter}
  */
-@SuppressWarnings({"try", "unused"})
+@SuppressWarnings("resource")
 final class SchemaReporterIntegrationTest extends IntegrationTestBase
 {
     private static final IdentifiersProvider IDENTIFIERS = new TestIdentifiers();
@@ -50,9 +50,9 @@ final class SchemaReporterIntegrationTest extends IntegrationTestBase
     private static String normalizeNames(@NotNull String schema)
     {
         return schema.replaceAll("(?is)(?<=\\b(" + DATA_CENTER_PREFIX + "|"
-                                                 + TEST_CLUSTER_PREFIX + "|"
-                                                 + TEST_KEYSPACE + "|"
-                                                 + TEST_TABLE_PREFIX + "))\\d+\\b", "");
+                                 + TEST_CLUSTER_PREFIX + "|"
+                                 + TEST_KEYSPACE + "|"
+                                 + TEST_TABLE_PREFIX + "))\\d+\\b", "");
     }
 
     @CassandraIntegrationTest
@@ -62,56 +62,53 @@ final class SchemaReporterIntegrationTest extends IntegrationTestBase
         // the goal is to cover all supported data types and their combinations
         waitForSchemaReady(1L, TimeUnit.MINUTES);
         createTestKeyspace();
-        createTestUdt("numbers",     "ti tinyint, "
-                                   + "si smallint, "
-                                   + "bi bigint, "
-                                   + "vi varint, "
-                                   + "sf float, "
-                                   + "df double, "
-                                   + "de decimal");
-        createTestUdt("datetime",    "dd date, "
-                                   + "ts timestamp, "
-                                   + "tt time");
-        createTestUdt("strings",     "tu timeuuid, "
-                                   + "ru uuid, "
-                                   + "ip inet, "
-                                   + "as ascii, "
-                                   + "us text, "
-                                   + "vc varchar");
-        createTestUdt("collections", "t tuple<int, ascii>, "
-                                   + "s set<ascii>, "
-                                   + "l frozen<list<ascii>>, "
-                                   + "m map<ascii, frozen<map<ascii, int>>>");
+        createTestUdt("numbers",     "ti tinyint, " +
+                                     "si smallint, " +
+                                     "bi bigint, " +
+                                     "vi varint, " +
+                                     "sf float, " +
+                                     "df double, " +
+                                     "de decimal");
+        createTestUdt("datetime",    "dd date, " +
+                                     "ts timestamp, " +
+                                     "tt time");
+        createTestUdt("strings",     "tu timeuuid, " +
+                                     "ru uuid, " +
+                                     "ip inet, " +
+                                     "as ascii, " +
+                                     "us text, " +
+                                     "vc varchar");
+        createTestUdt("collections", "t tuple<int, ascii>, " +
+                                     "s set<ascii>, " +
+                                     "l frozen<list<ascii>>, " +
+                                     "m map<ascii, frozen<map<ascii, int>>>");
         createTestUdt("types",       "b blob");
         createTestUdt("other",       "t frozen<types>");
-        createTestTable("CREATE TABLE " + TEST_KEYSPACE + "." + TEST_TABLE_PREFIX + " ("
-                      + "b boolean PRIMARY KEY, "
-                      + "n numbers, "
-                      + "t frozen<datetime>, "
-                      + "s strings, "
-                      + "c frozen<collections>, "
-                      + "o other)" + WITH_COMPACTION_DISABLED + ";");
+        createTestTable("CREATE TABLE " + TEST_KEYSPACE + "." + TEST_TABLE_PREFIX + " (" +
+                        "b boolean PRIMARY KEY, " +
+                        "n numbers, " +
+                        "t frozen<datetime>, " +
+                        "s strings, " +
+                        "c frozen<collections>, " +
+                        "o other)" + WITH_COMPACTION_DISABLED + ";");
 
         // First, ensure the returned schema matches the reference one
         // (while ignoring name suffixes and whitespace characters)
         JsonEmitter emitter = new JsonEmitter();
         try (Session session = maybeGetSession())
         {
-            new SchemaReporter(IDENTIFIERS, () -> emitter)
-                    .process(session.getCluster());
+            new SchemaReporter(IDENTIFIERS, () -> emitter).process(session.getCluster());
         }
         String   actualJson = normalizeNames(emitter.content());
         String expectedJson = IOUtils.readFully("/datahub/integration_test.json");
 
-        assertThat(actualJson)
-                .isEqualToNormalizingWhitespace(expectedJson);
+        assertThat(actualJson).isEqualToNormalizingWhitespace(expectedJson);
 
         // Finally, make sure the returned schema produces the same tree of
         // DataHub objects after having been normalized and deserialized
         DataList   actualData = CODEC.readList(new StringReader(actualJson));
         DataList expectedData = CODEC.readList(new StringReader(expectedJson));
 
-        assertThat(actualData)
-                .isEqualTo(expectedData);
+        assertThat(actualData).isEqualTo(expectedData);
     }
 }
diff --git a/server/src/test/java/org/apache/cassandra/sidecar/datahub/SchemaReporterTest.java b/server/src/test/java/org/apache/cassandra/sidecar/datahub/SchemaReporterTest.java
index 920aa9f6..7a7b0f2f 100644
--- a/server/src/test/java/org/apache/cassandra/sidecar/datahub/SchemaReporterTest.java
+++ b/server/src/test/java/org/apache/cassandra/sidecar/datahub/SchemaReporterTest.java
@@ -41,7 +41,7 @@ import static org.mockito.Mockito.when;
 /**
  * Unit tests for {@link SchemaReporter}
  */
-@SuppressWarnings("try")
+@SuppressWarnings("resource")
 final class SchemaReporterTest
 {
     private static final IdentifiersProvider IDENTIFIERS = new TestIdentifiers();
@@ -56,8 +56,7 @@ final class SchemaReporterTest
         when(metadata.getKeyspaces()).thenReturn(Collections.emptyList());
 
         JsonEmitter emitter = new JsonEmitter();
-        new SchemaReporter(IDENTIFIERS, () -> emitter)
-                .process(cluster);
+        new SchemaReporter(IDENTIFIERS, () -> emitter).process(cluster);
 
         String actual = emitter.content();
         String expected = IOUtils.readFully("/datahub/empty_cluster.json");
@@ -78,8 +77,7 @@ final class SchemaReporterTest
         when(keyspace.getTables()).thenReturn(Collections.emptyList());
 
         JsonEmitter emitter = new JsonEmitter();
-        new SchemaReporter(IDENTIFIERS, () -> emitter)
-                .process(cluster);
+        new SchemaReporter(IDENTIFIERS, () -> emitter).process(cluster);
 
         String actual = emitter.content();
         String expected = IOUtils.readFully("/datahub/empty_keyspace.json");
@@ -107,8 +105,7 @@ final class SchemaReporterTest
         when(options.getComment()).thenReturn("table comment");
 
         JsonEmitter emitter = new JsonEmitter();
-        new SchemaReporter(IDENTIFIERS, () -> emitter)
-                .process(cluster);
+        new SchemaReporter(IDENTIFIERS, () -> emitter).process(cluster);
 
         String actual = emitter.content();
         String expected = IOUtils.readFully("/datahub/empty_table.json");
@@ -187,8 +184,7 @@ final class SchemaReporterTest
         when(c8.getType()).thenReturn(DataType.map(DataType.timestamp(), DataType.inet(), false));
 
         JsonEmitter emitter = new JsonEmitter();
-        new SchemaReporter(IDENTIFIERS, () -> emitter)
-                .process(cluster);
+        new SchemaReporter(IDENTIFIERS, () -> emitter).process(cluster);
 
         String actual = emitter.content();
         String expected = IOUtils.readFully("/datahub/primitive_types.json");
@@ -261,8 +257,7 @@ final class SchemaReporterTest
         when(udt2c2.getType()).thenReturn(DataType.cboolean());
 
         JsonEmitter emitter = new JsonEmitter();
-        new SchemaReporter(IDENTIFIERS, () -> emitter)
-                .process(cluster);
+        new SchemaReporter(IDENTIFIERS, () -> emitter).process(cluster);
 
         String actual = emitter.content();
         String expected = IOUtils.readFully("/datahub/user_types.json");
diff --git a/server/src/test/java/org/apache/cassandra/sidecar/routes/ReportSchemaHandlerTest.java b/server/src/test/java/org/apache/cassandra/sidecar/routes/ReportSchemaHandlerTest.java
new file mode 100644
index 00000000..333655b5
--- /dev/null
+++ b/server/src/test/java/org/apache/cassandra/sidecar/routes/ReportSchemaHandlerTest.java
@@ -0,0 +1,191 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.cassandra.sidecar.routes;
+
+import java.io.IOException;
+import java.time.Duration;
+import java.util.Collections;
+import java.util.concurrent.Future;
+import java.util.concurrent.TimeUnit;
+import org.junit.jupiter.api.AfterEach;
+import org.junit.jupiter.api.BeforeEach;
+import org.junit.jupiter.api.Test;
+import org.junit.jupiter.api.extension.ExtendWith;
+
+import com.datastax.driver.core.Metadata;
+import com.google.inject.AbstractModule;
+import com.google.inject.Guice;
+import com.google.inject.Injector;
+import com.google.inject.Provides;
+import com.google.inject.Singleton;
+import com.google.inject.util.Modules;
+import com.linkedin.mxe.MetadataChangeProposal;
+import datahub.client.Callback;
+import datahub.client.MetadataWriteResponse;
+import io.netty.handler.codec.http.HttpResponseStatus;
+import io.vertx.core.Vertx;
+import io.vertx.ext.web.client.WebClient;
+import io.vertx.ext.web.client.predicate.ResponsePredicate;
+import io.vertx.junit5.VertxExtension;
+import io.vertx.junit5.VertxTestContext;
+import org.apache.cassandra.sidecar.TestModule;
+import org.apache.cassandra.sidecar.TestResourceReaper;
+import org.apache.cassandra.sidecar.cluster.CassandraAdapterDelegate;
+import org.apache.cassandra.sidecar.cluster.InstancesMetadata;
+import org.apache.cassandra.sidecar.cluster.instance.InstanceMetadata;
+import org.apache.cassandra.sidecar.common.server.StorageOperations;
+import org.apache.cassandra.sidecar.common.server.utils.IOUtils;
+import org.apache.cassandra.sidecar.datahub.EmitterFactory;
+import org.apache.cassandra.sidecar.datahub.JsonEmitter;
+import org.apache.cassandra.sidecar.server.MainModule;
+import org.apache.cassandra.sidecar.server.Server;
+import org.jetbrains.annotations.NotNull;
+import org.jetbrains.annotations.Nullable;
+
+import static org.assertj.core.api.Assertions.assertThat;
+import static org.mockito.Mockito.mock;
+import static org.mockito.Mockito.when;
+
+/**
+ * Unit tests for {@link ReportSchemaHandler}
+ */
+@ExtendWith(VertxExtension.class)
+final class ReportSchemaHandlerTest
+{
+    private static final String CLUSTER = "cluster";
+    private static final String DIRECTORY = "/tmp";
+    private static final int IDENTIFIER = 42;
+    private static final String LOCALHOST = "127.0.0.1";
+    private static final int PORT = 9042;
+    private static final String ENDPOINT = "/api/v1/datahub/schemas";
+    private static final Duration TIMEOUT = Duration.ofSeconds(5);
+
+    private static final class ThrowingEmitter extends JsonEmitter
+    {
+        @Override
+        @NotNull
+        public synchronized Future<MetadataWriteResponse> emit(@NotNull MetadataChangeProposal proposal,
+                                                               @Nullable Callback callback) throws IOException
+        {
+            throw new IOException();
+        }
+    }
+
+    private final class ReportSchemaHandlerTestModule extends AbstractModule
+    {
+        @Provides
+        @Singleton
+        @NotNull
+        public InstancesMetadata instancesMetadata()
+        {
+            Metadata metadata = mock(Metadata.class);
+            when(metadata.getKeyspaces()).thenReturn(Collections.emptyList());
+
+            StorageOperations operations = mock(StorageOperations.class);
+            when(operations.clusterName()).thenReturn(CLUSTER);
+
+            CassandraAdapterDelegate delegate = mock(CassandraAdapterDelegate.class);
+            when(delegate.storageOperations()).thenReturn(operations);
+            when(delegate.metadata()).thenReturn(metadata);
+
+            InstanceMetadata instanceMetadata = mock(InstanceMetadata.class);
+            when(instanceMetadata.stagingDir()).thenReturn(DIRECTORY);
+            when(instanceMetadata.id()).thenReturn(IDENTIFIER);
+            when(instanceMetadata.host()).thenReturn(LOCALHOST);
+            when(instanceMetadata.port()).thenReturn(PORT);
+            when(instanceMetadata.delegate()).thenReturn(delegate);
+
+            InstancesMetadata instances = mock(InstancesMetadata.class);
+            when(instances.instances()).thenReturn(Collections.singletonList(instanceMetadata));
+            when(instances.instanceFromId(IDENTIFIER)).thenReturn(instanceMetadata);
+            when(instances.instanceFromHost(LOCALHOST)).thenReturn(instanceMetadata);
+            return instances;
+        }
+
+        @Provides
+        @Singleton
+        @NotNull
+        public EmitterFactory emitterFactory()
+        {
+            return () -> emitter;
+        }
+    }
+
+    private final Injector injector = Guice.createInjector(Modules.override(new MainModule())
+                                                                  .with(Modules.override(new TestModule())
+                                                                               .with(new ReportSchemaHandlerTestModule())));
+    private WebClient client;
+    private Server server;
+    private JsonEmitter emitter;
+
+    @BeforeEach
+    void before() throws InterruptedException
+    {
+        VertxTestContext context = new VertxTestContext();
+        server = injector.getInstance(Server.class);
+        server.start()
+              .onSuccess(result -> context.completeNow())
+              .onFailure(context::failNow);
+
+        client = WebClient.create(injector.getInstance(Vertx.class));
+
+        context.awaitCompletion(TIMEOUT.toMillis(), TimeUnit.MILLISECONDS);
+    }
+
+    @AfterEach
+    void after() throws InterruptedException
+    {
+        TestResourceReaper.create()
+                          .with(server)
+                          .with(client)
+                          .close();
+    }
+
+    @Test
+    void testSuccess(@NotNull VertxTestContext context) throws IOException
+    {
+        String expected = IOUtils.readFully("/datahub/empty_cluster.json");
+        emitter = new JsonEmitter();
+        assertThat(emitter.content().length()).isLessThanOrEqualTo(1);
+
+        client.put(server.actualPort(), LOCALHOST, ENDPOINT)
+              .expect(ResponsePredicate.SC_OK)
+              .send(context.succeeding(response -> {
+                  assertThat(response.statusCode()).isEqualTo(HttpResponseStatus.OK.code());
+                  assertThat(emitter.content()).isEqualTo(expected);
+                  context.completeNow();
+              }));
+    }
+
+    @Test
+    void testFailure(@NotNull VertxTestContext context)
+    {
+        String expected = "[\n]";
+        emitter = new ThrowingEmitter();
+        assertThat(emitter.content().length()).isLessThanOrEqualTo(1);
+
+        client.put(server.actualPort(), LOCALHOST, ENDPOINT)
+              .expect(ResponsePredicate.SC_INTERNAL_SERVER_ERROR)
+              .send(context.succeeding(response -> {
+                  assertThat(response.statusCode()).isEqualTo(HttpResponseStatus.INTERNAL_SERVER_ERROR.code());
+                  assertThat(emitter.content()).isEqualTo(expected);
+                  context.completeNow();
+              }));
+    }
+}


---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscr...@cassandra.apache.org
For additional commands, e-mail: commits-h...@cassandra.apache.org


Reply via email to