This is an automated email from the ASF dual-hosted git repository.
absurdfarce pushed a commit to branch 4.x
in repository https://gitbox.apache.org/repos/asf/cassandra-java-driver.git
The following commit(s) were added to refs/heads/4.x by this push:
new e2c7ad4d1 CASSJAVA-97: Let users inject an ID for each request and
write to the custom payload
e2c7ad4d1 is described below
commit e2c7ad4d11555eeacce6bd436547b403f83eb24f
Author: janehe <[email protected]>
AuthorDate: Tue Apr 15 13:18:34 2025 -0700
CASSJAVA-97: Let users inject an ID for each request and write to the
custom payload
patch by Jane He; reviewed by Abe Ratnofsky and Bret McGuire for CASSJAVA-97
---
core/revapi.json | 5 +
.../api/core/config/DefaultDriverOption.java | 6 +
.../driver/api/core/config/TypedDriverOption.java | 4 +
.../oss/driver/api/core/context/DriverContext.java | 5 +
.../api/core/session/ProgrammaticArguments.java | 17 +++
.../driver/api/core/session/SessionBuilder.java | 22 ++++
.../api/core/tracker/RequestIdGenerator.java | 77 +++++++++++++
.../driver/api/core/tracker/RequestTracker.java | 41 ++++---
.../core/context/DefaultDriverContext.java | 24 ++++
.../internal/core/cql/CqlRequestHandler.java | 59 +++++++---
.../loadbalancing/DefaultLoadBalancingPolicy.java | 4 +-
.../core/tracker/MultiplexingRequestTracker.java | 31 +++--
.../internal/core/tracker/NoopRequestTracker.java | 8 +-
.../internal/core/tracker/RequestLogger.java | 12 +-
.../core/tracker/UuidRequestIdGenerator.java | 43 +++++++
.../core/tracker/W3CContextRequestIdGenerator.java | 67 +++++++++++
core/src/main/resources/reference.conf | 7 ++
.../core/cql/RequestHandlerTestHarness.java | 3 +
.../core/tracker/RequestIdGeneratorTest.java | 80 +++++++++++++
.../driver/core/tracker/RequestIdGeneratorIT.java | 125 +++++++++++++++++++++
.../core/tracker/RequestNodeLoggerExample.java | 8 +-
manual/core/request_id/README.md | 48 ++++++++
22 files changed, 637 insertions(+), 59 deletions(-)
diff --git a/core/revapi.json b/core/revapi.json
index f39c7d4a7..8c707659c 100644
--- a/core/revapi.json
+++ b/core/revapi.json
@@ -7407,6 +7407,11 @@
{
"code": "java.method.varargOverloadsOnlyDifferInVarargParameter",
"justification": "CASSJAVA-102: Migrate revapi config into dedicated
config files, ported from pom.xml"
+ },
+ {
+ "code": "java.method.addedToInterface",
+ "new": "method
java.util.Optional<com.datastax.oss.driver.api.core.tracker.RequestIdGenerator>
com.datastax.oss.driver.api.core.context.DriverContext::getRequestIdGenerator()",
+ "justification": "CASSJAVA-97: Let users inject an ID for each request
and write to the custom payload"
}
]
}
diff --git
a/core/src/main/java/com/datastax/oss/driver/api/core/config/DefaultDriverOption.java
b/core/src/main/java/com/datastax/oss/driver/api/core/config/DefaultDriverOption.java
index 4e45bf7b1..60c441935 100644
---
a/core/src/main/java/com/datastax/oss/driver/api/core/config/DefaultDriverOption.java
+++
b/core/src/main/java/com/datastax/oss/driver/api/core/config/DefaultDriverOption.java
@@ -995,6 +995,12 @@ public enum DefaultDriverOption implements DriverOption {
* <p>Value-type: boolean
*/
SSL_ALLOW_DNS_REVERSE_LOOKUP_SAN("advanced.ssl-engine-factory.allow-dns-reverse-lookup-san"),
+ /**
+ * The class of session-wide component that generates request IDs.
+ *
+ * <p>Value-type: {@link String}
+ */
+ REQUEST_ID_GENERATOR_CLASS("advanced.request-id.generator.class"),
/**
* An address to always translate all node addresses to that same proxy
hostname no matter what IP
* address a node has, but still using its native transport port.
diff --git
a/core/src/main/java/com/datastax/oss/driver/api/core/config/TypedDriverOption.java
b/core/src/main/java/com/datastax/oss/driver/api/core/config/TypedDriverOption.java
index aa4e4af12..182753300 100644
---
a/core/src/main/java/com/datastax/oss/driver/api/core/config/TypedDriverOption.java
+++
b/core/src/main/java/com/datastax/oss/driver/api/core/config/TypedDriverOption.java
@@ -281,6 +281,10 @@ public class TypedDriverOption<ValueT> {
new TypedDriverOption<>(
DefaultDriverOption.REQUEST_TRACKER_CLASSES,
GenericType.listOf(String.class));
+ /** The class of a session-wide component that generates request IDs. */
+ public static final TypedDriverOption<String> REQUEST_ID_GENERATOR_CLASS =
+ new TypedDriverOption<>(DefaultDriverOption.REQUEST_ID_GENERATOR_CLASS,
GenericType.STRING);
+
/** Whether to log successful requests. */
public static final TypedDriverOption<Boolean>
REQUEST_LOGGER_SUCCESS_ENABLED =
new TypedDriverOption<>(
diff --git
a/core/src/main/java/com/datastax/oss/driver/api/core/context/DriverContext.java
b/core/src/main/java/com/datastax/oss/driver/api/core/context/DriverContext.java
index 5b32389e3..6f0afd3df 100644
---
a/core/src/main/java/com/datastax/oss/driver/api/core/context/DriverContext.java
+++
b/core/src/main/java/com/datastax/oss/driver/api/core/context/DriverContext.java
@@ -33,6 +33,7 @@ import
com.datastax.oss.driver.api.core.session.throttling.RequestThrottler;
import com.datastax.oss.driver.api.core.specex.SpeculativeExecutionPolicy;
import com.datastax.oss.driver.api.core.ssl.SslEngineFactory;
import com.datastax.oss.driver.api.core.time.TimestampGenerator;
+import com.datastax.oss.driver.api.core.tracker.RequestIdGenerator;
import com.datastax.oss.driver.api.core.tracker.RequestTracker;
import edu.umd.cs.findbugs.annotations.NonNull;
import java.util.Map;
@@ -139,6 +140,10 @@ public interface DriverContext extends AttachmentPoint {
@NonNull
RequestTracker getRequestTracker();
+ /** @return The driver's request ID generator; never {@code null}. */
+ @NonNull
+ Optional<RequestIdGenerator> getRequestIdGenerator();
+
/** @return The driver's request throttler; never {@code null}. */
@NonNull
RequestThrottler getRequestThrottler();
diff --git
a/core/src/main/java/com/datastax/oss/driver/api/core/session/ProgrammaticArguments.java
b/core/src/main/java/com/datastax/oss/driver/api/core/session/ProgrammaticArguments.java
index 4e08bd543..5e10fb4d9 100644
---
a/core/src/main/java/com/datastax/oss/driver/api/core/session/ProgrammaticArguments.java
+++
b/core/src/main/java/com/datastax/oss/driver/api/core/session/ProgrammaticArguments.java
@@ -23,6 +23,7 @@ import com.datastax.oss.driver.api.core.metadata.Node;
import com.datastax.oss.driver.api.core.metadata.NodeStateListener;
import com.datastax.oss.driver.api.core.metadata.schema.SchemaChangeListener;
import com.datastax.oss.driver.api.core.ssl.SslEngineFactory;
+import com.datastax.oss.driver.api.core.tracker.RequestIdGenerator;
import com.datastax.oss.driver.api.core.tracker.RequestTracker;
import com.datastax.oss.driver.api.core.type.codec.TypeCodec;
import
com.datastax.oss.driver.api.core.type.codec.registry.MutableCodecRegistry;
@@ -59,6 +60,7 @@ public class ProgrammaticArguments {
private final NodeStateListener nodeStateListener;
private final SchemaChangeListener schemaChangeListener;
private final RequestTracker requestTracker;
+ private final RequestIdGenerator requestIdGenerator;
private final Map<String, String> localDatacenters;
private final Map<String, Predicate<Node>> nodeFilters;
private final Map<String, NodeDistanceEvaluator> nodeDistanceEvaluators;
@@ -77,6 +79,7 @@ public class ProgrammaticArguments {
@Nullable NodeStateListener nodeStateListener,
@Nullable SchemaChangeListener schemaChangeListener,
@Nullable RequestTracker requestTracker,
+ @Nullable RequestIdGenerator requestIdGenerator,
@NonNull Map<String, String> localDatacenters,
@NonNull Map<String, Predicate<Node>> nodeFilters,
@NonNull Map<String, NodeDistanceEvaluator> nodeDistanceEvaluators,
@@ -94,6 +97,7 @@ public class ProgrammaticArguments {
this.nodeStateListener = nodeStateListener;
this.schemaChangeListener = schemaChangeListener;
this.requestTracker = requestTracker;
+ this.requestIdGenerator = requestIdGenerator;
this.localDatacenters = localDatacenters;
this.nodeFilters = nodeFilters;
this.nodeDistanceEvaluators = nodeDistanceEvaluators;
@@ -128,6 +132,11 @@ public class ProgrammaticArguments {
return requestTracker;
}
+ @Nullable
+ public RequestIdGenerator getRequestIdGenerator() {
+ return requestIdGenerator;
+ }
+
@NonNull
public Map<String, String> getLocalDatacenters() {
return localDatacenters;
@@ -196,6 +205,7 @@ public class ProgrammaticArguments {
private NodeStateListener nodeStateListener;
private SchemaChangeListener schemaChangeListener;
private RequestTracker requestTracker;
+ private RequestIdGenerator requestIdGenerator;
private ImmutableMap.Builder<String, String> localDatacentersBuilder =
ImmutableMap.builder();
private final ImmutableMap.Builder<String, Predicate<Node>>
nodeFiltersBuilder =
ImmutableMap.builder();
@@ -294,6 +304,12 @@ public class ProgrammaticArguments {
return this;
}
+ @NonNull
+ public Builder withRequestIdGenerator(@Nullable RequestIdGenerator
requestIdGenerator) {
+ this.requestIdGenerator = requestIdGenerator;
+ return this;
+ }
+
@NonNull
public Builder withLocalDatacenter(
@NonNull String profileName, @NonNull String localDatacenter) {
@@ -417,6 +433,7 @@ public class ProgrammaticArguments {
nodeStateListener,
schemaChangeListener,
requestTracker,
+ requestIdGenerator,
localDatacentersBuilder.build(),
nodeFiltersBuilder.build(),
nodeDistanceEvaluatorsBuilder.build(),
diff --git
a/core/src/main/java/com/datastax/oss/driver/api/core/session/SessionBuilder.java
b/core/src/main/java/com/datastax/oss/driver/api/core/session/SessionBuilder.java
index cbf896a08..255001190 100644
---
a/core/src/main/java/com/datastax/oss/driver/api/core/session/SessionBuilder.java
+++
b/core/src/main/java/com/datastax/oss/driver/api/core/session/SessionBuilder.java
@@ -35,6 +35,7 @@ import
com.datastax.oss.driver.api.core.metadata.NodeStateListener;
import com.datastax.oss.driver.api.core.metadata.schema.SchemaChangeListener;
import com.datastax.oss.driver.api.core.ssl.ProgrammaticSslEngineFactory;
import com.datastax.oss.driver.api.core.ssl.SslEngineFactory;
+import com.datastax.oss.driver.api.core.tracker.RequestIdGenerator;
import com.datastax.oss.driver.api.core.tracker.RequestTracker;
import com.datastax.oss.driver.api.core.type.codec.TypeCodec;
import
com.datastax.oss.driver.api.core.type.codec.registry.MutableCodecRegistry;
@@ -47,6 +48,7 @@ import
com.datastax.oss.driver.internal.core.context.DefaultDriverContext;
import com.datastax.oss.driver.internal.core.context.InternalDriverContext;
import com.datastax.oss.driver.internal.core.metadata.DefaultEndPoint;
import com.datastax.oss.driver.internal.core.session.DefaultSession;
+import
com.datastax.oss.driver.internal.core.tracker.W3CContextRequestIdGenerator;
import com.datastax.oss.driver.internal.core.util.concurrent.BlockingOperation;
import
com.datastax.oss.driver.internal.core.util.concurrent.CompletableFutures;
import edu.umd.cs.findbugs.annotations.NonNull;
@@ -83,6 +85,8 @@ import org.slf4j.LoggerFactory;
@NotThreadSafe
public abstract class SessionBuilder<SelfT extends SessionBuilder, SessionT> {
+ public static final String ASTRA_PAYLOAD_KEY = "traceparent";
+
private static final Logger LOG =
LoggerFactory.getLogger(SessionBuilder.class);
@SuppressWarnings("unchecked")
@@ -318,6 +322,17 @@ public abstract class SessionBuilder<SelfT extends
SessionBuilder, SessionT> {
return self;
}
+ /**
+ * Registers a request ID generator. The driver will use the generated ID in
the logs and
+ * optionally add to the custom payload so that users can correlate logs
about the same request
+ * from the Cassandra side.
+ */
+ @NonNull
+ public SelfT withRequestIdGenerator(@NonNull RequestIdGenerator
requestIdGenerator) {
+
this.programmaticArgumentsBuilder.withRequestIdGenerator(requestIdGenerator);
+ return self;
+ }
+
/**
* Registers an authentication provider to use with the session.
*
@@ -861,6 +876,13 @@ public abstract class SessionBuilder<SelfT extends
SessionBuilder, SessionT> {
List<String> configContactPoints =
defaultConfig.getStringList(DefaultDriverOption.CONTACT_POINTS,
Collections.emptyList());
if (cloudConfigInputStream != null) {
+ // override request id generator, unless user has already set it
+ if (programmaticArguments.getRequestIdGenerator() == null) {
+ programmaticArgumentsBuilder.withRequestIdGenerator(
+ new W3CContextRequestIdGenerator(ASTRA_PAYLOAD_KEY));
+ LOG.debug(
+ "A secure connect bundle is provided, using
W3CContextRequestIdGenerator as request ID generator.");
+ }
if (!programmaticContactPoints.isEmpty() ||
!configContactPoints.isEmpty()) {
LOG.info(
"Both a secure connect bundle and contact points were provided.
These are mutually exclusive. The contact points from the secure bundle will
have priority.");
diff --git
a/core/src/main/java/com/datastax/oss/driver/api/core/tracker/RequestIdGenerator.java
b/core/src/main/java/com/datastax/oss/driver/api/core/tracker/RequestIdGenerator.java
new file mode 100644
index 000000000..59ac3fdac
--- /dev/null
+++
b/core/src/main/java/com/datastax/oss/driver/api/core/tracker/RequestIdGenerator.java
@@ -0,0 +1,77 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package com.datastax.oss.driver.api.core.tracker;
+
+import com.datastax.oss.driver.api.core.cql.Statement;
+import com.datastax.oss.driver.api.core.session.Request;
+import
com.datastax.oss.protocol.internal.util.collection.NullAllowingImmutableMap;
+import edu.umd.cs.findbugs.annotations.NonNull;
+import java.nio.ByteBuffer;
+import java.nio.charset.StandardCharsets;
+import java.util.Map;
+
+/**
+ * Interface responsible for generating request IDs.
+ *
+ * <p>Note that all request IDs have a parent/child relationship. A "parent
ID" can loosely be
+ * thought of as encompassing a sequence of a request + any attendant retries,
speculative
+ * executions etc. It's scope is identical to that of a {@link
+ * com.datastax.oss.driver.internal.core.cql.CqlRequestHandler}. A "request
ID" represents a single
+ * request within this larger scope. Note that a request corresponding to a
request ID may be
+ * retried; in that case the retry count will be appended to the corresponding
identifier in the
+ * logs.
+ */
+public interface RequestIdGenerator {
+
+ String DEFAULT_PAYLOAD_KEY = "request-id";
+
+ /**
+ * Generates a unique identifier for the session request. This will be the
identifier for the
+ * entire `session.execute()` call. This identifier will be added to logs,
and propagated to
+ * request trackers.
+ *
+ * @return a unique identifier for the session request
+ */
+ String getSessionRequestId();
+
+ /**
+ * Generates a unique identifier for the node request. This will be the
identifier for the CQL
+ * request against a particular node. There can be one or more node requests
for a single session
+ * request, due to retries or speculative executions. This identifier will
be added to logs, and
+ * propagated to request trackers.
+ *
+ * @param statement the statement to be executed
+ * @param parentId the session request identifier
+ * @return a unique identifier for the node request
+ */
+ String getNodeRequestId(@NonNull Request statement, @NonNull String
parentId);
+
+ default String getCustomPayloadKey() {
+ return DEFAULT_PAYLOAD_KEY;
+ }
+
+ default Statement<?> getDecoratedStatement(
+ @NonNull Statement<?> statement, @NonNull String requestId) {
+ Map<String, ByteBuffer> customPayload =
+ NullAllowingImmutableMap.<String, ByteBuffer>builder()
+ .putAll(statement.getCustomPayload())
+ .put(getCustomPayloadKey(),
ByteBuffer.wrap(requestId.getBytes(StandardCharsets.UTF_8)))
+ .build();
+ return statement.setCustomPayload(customPayload);
+ }
+}
diff --git
a/core/src/main/java/com/datastax/oss/driver/api/core/tracker/RequestTracker.java
b/core/src/main/java/com/datastax/oss/driver/api/core/tracker/RequestTracker.java
index d29ee48d3..065b41e49 100644
---
a/core/src/main/java/com/datastax/oss/driver/api/core/tracker/RequestTracker.java
+++
b/core/src/main/java/com/datastax/oss/driver/api/core/tracker/RequestTracker.java
@@ -47,21 +47,22 @@ public interface RequestTracker extends AutoCloseable {
@NonNull Node node) {}
/**
- * Invoked each time a request succeeds.
+ * Invoked each time a session request succeeds. A session request is a
`session.execute()` call
*
* @param latencyNanos the overall execution time (from the {@link
Session#execute(Request,
* GenericType) session.execute} call until the result is made available
to the client).
* @param executionProfile the execution profile of this request.
* @param node the node that returned the successful response.
- * @param requestLogPrefix the dedicated log prefix for this request
+ * @param sessionRequestLogPrefix the dedicated log prefix for this request
*/
default void onSuccess(
@NonNull Request request,
long latencyNanos,
@NonNull DriverExecutionProfile executionProfile,
@NonNull Node node,
- @NonNull String requestLogPrefix) {
- // If client doesn't override onSuccess with requestLogPrefix delegate
call to the old method
+ @NonNull String sessionRequestLogPrefix) {
+ // If client doesn't override onSuccess with sessionRequestLogPrefix
delegate call to the old
+ // method
onSuccess(request, latencyNanos, executionProfile, node);
}
@@ -78,13 +79,13 @@ public interface RequestTracker extends AutoCloseable {
@Nullable Node node) {}
/**
- * Invoked each time a request fails.
+ * Invoked each time a session request fails. A session request is a
`session.execute()` call
*
* @param latencyNanos the overall execution time (from the {@link
Session#execute(Request,
* GenericType) session.execute} call until the error is propagated to
the client).
* @param executionProfile the execution profile of this request.
* @param node the node that returned the error response, or {@code null} if
the error occurred
- * @param requestLogPrefix the dedicated log prefix for this request
+ * @param sessionRequestLogPrefix the dedicated log prefix for this request
*/
default void onError(
@NonNull Request request,
@@ -92,8 +93,9 @@ public interface RequestTracker extends AutoCloseable {
long latencyNanos,
@NonNull DriverExecutionProfile executionProfile,
@Nullable Node node,
- @NonNull String requestLogPrefix) {
- // If client doesn't override onError with requestLogPrefix delegate call
to the old method
+ @NonNull String sessionRequestLogPrefix) {
+ // If client doesn't override onError with sessionRequestLogPrefix
delegate call to the old
+ // method
onError(request, error, latencyNanos, executionProfile, node);
}
@@ -110,14 +112,15 @@ public interface RequestTracker extends AutoCloseable {
@NonNull Node node) {}
/**
- * Invoked each time a request fails at the node level. Similar to {@link
#onError(Request,
- * Throwable, long, DriverExecutionProfile, Node, String)} but at a per node
level.
+ * Invoked each time a node request fails. A node request is a CQL request
sent to a particular
+ * node. There can be one or more node requests for a single session
request, due to retries or
+ * speculative executions.
*
* @param latencyNanos the overall execution time (from the {@link
Session#execute(Request,
* GenericType) session.execute} call until the error is propagated to
the client).
* @param executionProfile the execution profile of this request.
* @param node the node that returned the error response.
- * @param requestLogPrefix the dedicated log prefix for this request
+ * @param nodeRequestLogPrefix the dedicated log prefix for this request
*/
default void onNodeError(
@NonNull Request request,
@@ -125,8 +128,9 @@ public interface RequestTracker extends AutoCloseable {
long latencyNanos,
@NonNull DriverExecutionProfile executionProfile,
@NonNull Node node,
- @NonNull String requestLogPrefix) {
- // If client doesn't override onNodeError with requestLogPrefix delegate
call to the old method
+ @NonNull String nodeRequestLogPrefix) {
+ // If client doesn't override onNodeError with nodeRequestLogPrefix
delegate call to the old
+ // method
onNodeError(request, error, latencyNanos, executionProfile, node);
}
@@ -142,22 +146,23 @@ public interface RequestTracker extends AutoCloseable {
@NonNull Node node) {}
/**
- * Invoked each time a request succeeds at the node level. Similar to {@link
#onSuccess(Request,
- * long, DriverExecutionProfile, Node, String)} but at per node level.
+ * Invoked each time a node request succeeds. A node request is a CQL
request sent to a particular
+ * node. There can be one or more node requests for a single session
request, due to retries or
+ * speculative executions.
*
* @param latencyNanos the overall execution time (from the {@link
Session#execute(Request,
* GenericType) session.execute} call until the result is made available
to the client).
* @param executionProfile the execution profile of this request.
* @param node the node that returned the successful response.
- * @param requestLogPrefix the dedicated log prefix for this request
+ * @param nodeRequestLogPrefix the dedicated log prefix for this request
*/
default void onNodeSuccess(
@NonNull Request request,
long latencyNanos,
@NonNull DriverExecutionProfile executionProfile,
@NonNull Node node,
- @NonNull String requestLogPrefix) {
- // If client doesn't override onNodeSuccess with requestLogPrefix delegate
call to the old
+ @NonNull String nodeRequestLogPrefix) {
+ // If client doesn't override onNodeSuccess with nodeRequestLogPrefix
delegate call to the old
// method
onNodeSuccess(request, latencyNanos, executionProfile, node);
}
diff --git
a/core/src/main/java/com/datastax/oss/driver/internal/core/context/DefaultDriverContext.java
b/core/src/main/java/com/datastax/oss/driver/internal/core/context/DefaultDriverContext.java
index 0d7db27df..3074bda23 100644
---
a/core/src/main/java/com/datastax/oss/driver/internal/core/context/DefaultDriverContext.java
+++
b/core/src/main/java/com/datastax/oss/driver/internal/core/context/DefaultDriverContext.java
@@ -44,6 +44,7 @@ import
com.datastax.oss.driver.api.core.session.throttling.RequestThrottler;
import com.datastax.oss.driver.api.core.specex.SpeculativeExecutionPolicy;
import com.datastax.oss.driver.api.core.ssl.SslEngineFactory;
import com.datastax.oss.driver.api.core.time.TimestampGenerator;
+import com.datastax.oss.driver.api.core.tracker.RequestIdGenerator;
import com.datastax.oss.driver.api.core.tracker.RequestTracker;
import com.datastax.oss.driver.api.core.type.codec.TypeCodec;
import com.datastax.oss.driver.api.core.type.codec.registry.CodecRegistry;
@@ -221,6 +222,7 @@ public class DefaultDriverContext implements
InternalDriverContext {
private final LazyReference<NodeStateListener> nodeStateListenerRef;
private final LazyReference<SchemaChangeListener> schemaChangeListenerRef;
private final LazyReference<RequestTracker> requestTrackerRef;
+ private final LazyReference<Optional<RequestIdGenerator>>
requestIdGeneratorRef;
private final LazyReference<Optional<AuthProvider>> authProviderRef;
private final LazyReference<List<LifecycleListener>> lifecycleListenersRef =
new LazyReference<>("lifecycleListeners", this::buildLifecycleListeners,
cycleDetector);
@@ -282,6 +284,11 @@ public class DefaultDriverContext implements
InternalDriverContext {
this.requestTrackerRef =
new LazyReference<>(
"requestTracker", () ->
buildRequestTracker(requestTrackerFromBuilder), cycleDetector);
+ this.requestIdGeneratorRef =
+ new LazyReference<>(
+ "requestIdGenerator",
+ () ->
buildRequestIdGenerator(programmaticArguments.getRequestIdGenerator()),
+ cycleDetector);
this.sslEngineFactoryRef =
new LazyReference<>(
"sslEngineFactory",
@@ -708,6 +715,17 @@ public class DefaultDriverContext implements
InternalDriverContext {
}
}
+ protected Optional<RequestIdGenerator> buildRequestIdGenerator(
+ RequestIdGenerator requestIdGenerator) {
+ return (requestIdGenerator != null)
+ ? Optional.of(requestIdGenerator)
+ : Reflection.buildFromConfig(
+ this,
+ DefaultDriverOption.REQUEST_ID_GENERATOR_CLASS,
+ RequestIdGenerator.class,
+ "com.datastax.oss.driver.internal.core.tracker");
+ }
+
protected Optional<AuthProvider> buildAuthProvider(AuthProvider
authProviderFromBuilder) {
return (authProviderFromBuilder != null)
? Optional.of(authProviderFromBuilder)
@@ -972,6 +990,12 @@ public class DefaultDriverContext implements
InternalDriverContext {
return requestTrackerRef.get();
}
+ @NonNull
+ @Override
+ public Optional<RequestIdGenerator> getRequestIdGenerator() {
+ return requestIdGeneratorRef.get();
+ }
+
@Nullable
@Override
public String getLocalDatacenter(@NonNull String profileName) {
diff --git
a/core/src/main/java/com/datastax/oss/driver/internal/core/cql/CqlRequestHandler.java
b/core/src/main/java/com/datastax/oss/driver/internal/core/cql/CqlRequestHandler.java
index 0808bdce6..6842547b1 100644
---
a/core/src/main/java/com/datastax/oss/driver/internal/core/cql/CqlRequestHandler.java
+++
b/core/src/main/java/com/datastax/oss/driver/internal/core/cql/CqlRequestHandler.java
@@ -44,6 +44,7 @@ import
com.datastax.oss.driver.api.core.servererrors.UnavailableException;
import com.datastax.oss.driver.api.core.servererrors.WriteTimeoutException;
import com.datastax.oss.driver.api.core.session.throttling.RequestThrottler;
import com.datastax.oss.driver.api.core.session.throttling.Throttled;
+import com.datastax.oss.driver.api.core.tracker.RequestIdGenerator;
import com.datastax.oss.driver.api.core.tracker.RequestTracker;
import
com.datastax.oss.driver.internal.core.adminrequest.ThrottledAdminRequestHandler;
import
com.datastax.oss.driver.internal.core.adminrequest.UnexpectedResponseException;
@@ -59,6 +60,7 @@ import
com.datastax.oss.driver.internal.core.tracker.NoopRequestTracker;
import com.datastax.oss.driver.internal.core.tracker.RequestLogger;
import com.datastax.oss.driver.internal.core.util.Loggers;
import com.datastax.oss.driver.internal.core.util.collection.SimpleQueryPlan;
+import com.datastax.oss.driver.shaded.guava.common.base.Joiner;
import com.datastax.oss.protocol.internal.Frame;
import com.datastax.oss.protocol.internal.Message;
import com.datastax.oss.protocol.internal.ProtocolConstants;
@@ -82,6 +84,7 @@ import java.time.Duration;
import java.util.AbstractMap;
import java.util.List;
import java.util.Map;
+import java.util.Optional;
import java.util.Queue;
import java.util.concurrent.CancellationException;
import java.util.concurrent.CompletableFuture;
@@ -100,7 +103,7 @@ public class CqlRequestHandler implements Throttled {
private static final long NANOTIME_NOT_MEASURED_YET = -1;
private final long startTimeNanos;
- private final String logPrefix;
+ private final String handlerLogPrefix;
private final Statement<?> initialStatement;
private final DefaultSession session;
private final CqlIdentifier keyspace;
@@ -125,6 +128,7 @@ public class CqlRequestHandler implements Throttled {
private final List<NodeResponseCallback> inFlightCallbacks;
private final RequestThrottler throttler;
private final RequestTracker requestTracker;
+ private final Optional<RequestIdGenerator> requestIdGenerator;
private final SessionMetricUpdater sessionMetricUpdater;
private final DriverExecutionProfile executionProfile;
@@ -132,15 +136,25 @@ public class CqlRequestHandler implements Throttled {
// We don't use a map because nodes can appear multiple times.
private volatile List<Map.Entry<Node, Throwable>> errors;
+ private final Joiner logPrefixJoiner = Joiner.on('|');
+ private final String sessionName;
+ private final String sessionRequestId;
+
protected CqlRequestHandler(
Statement<?> statement,
DefaultSession session,
InternalDriverContext context,
- String sessionLogPrefix) {
+ String sessionName) {
this.startTimeNanos = System.nanoTime();
- this.logPrefix = sessionLogPrefix + "|" + this.hashCode();
- LOG.trace("[{}] Creating new handler for request {}", logPrefix,
statement);
+ this.requestIdGenerator = context.getRequestIdGenerator();
+ this.sessionName = sessionName;
+ this.sessionRequestId =
+ this.requestIdGenerator
+ .map(RequestIdGenerator::getSessionRequestId)
+ .orElse(Integer.toString(this.hashCode()));
+ this.handlerLogPrefix = logPrefixJoiner.join(sessionName,
sessionRequestId);
+ LOG.trace("[{}] Creating new handler for request {}", handlerLogPrefix,
statement);
this.initialStatement = statement;
this.session = session;
@@ -155,7 +169,7 @@ public class CqlRequestHandler implements Throttled {
context.getRequestThrottler().signalCancel(this);
}
} catch (Throwable t2) {
- Loggers.warnWithException(LOG, "[{}] Uncaught exception",
logPrefix, t2);
+ Loggers.warnWithException(LOG, "[{}] Uncaught exception",
handlerLogPrefix, t2);
}
return null;
});
@@ -250,9 +264,9 @@ public class CqlRequestHandler implements Throttled {
}
Node node = retriedNode;
DriverChannel channel = null;
- if (node == null || (channel = session.getChannel(node, logPrefix)) ==
null) {
+ if (node == null || (channel = session.getChannel(node, handlerLogPrefix))
== null) {
while (!result.isDone() && (node = queryPlan.poll()) != null) {
- channel = session.getChannel(node, logPrefix);
+ channel = session.getChannel(node, handlerLogPrefix);
if (channel != null) {
break;
} else {
@@ -267,6 +281,16 @@ public class CqlRequestHandler implements Throttled {
setFinalError(statement,
AllNodesFailedException.fromErrors(this.errors), null, -1);
}
} else {
+ Statement finalStatement = statement;
+ String nodeRequestId =
+ this.requestIdGenerator
+ .map((g) -> g.getNodeRequestId(finalStatement, sessionRequestId))
+ .orElse(Integer.toString(this.hashCode()));
+ statement =
+ this.requestIdGenerator
+ .map((g) -> g.getDecoratedStatement(finalStatement,
nodeRequestId))
+ .orElse(finalStatement);
+
NodeResponseCallback nodeResponseCallback =
new NodeResponseCallback(
statement,
@@ -276,7 +300,7 @@ public class CqlRequestHandler implements Throttled {
currentExecutionIndex,
retryCount,
scheduleNextExecution,
- logPrefix);
+ logPrefixJoiner.join(this.sessionName, nodeRequestId,
currentExecutionIndex));
Message message = Conversions.toMessage(statement, executionProfile,
context);
channel
.write(message, statement.isTracing(), statement.getCustomPayload(),
nodeResponseCallback)
@@ -335,9 +359,17 @@ public class CqlRequestHandler implements Throttled {
totalLatencyNanos = completionTimeNanos - startTimeNanos;
long nodeLatencyNanos = completionTimeNanos -
callback.nodeStartTimeNanos;
requestTracker.onNodeSuccess(
- callback.statement, nodeLatencyNanos, executionProfile,
callback.node, logPrefix);
+ callback.statement,
+ nodeLatencyNanos,
+ executionProfile,
+ callback.node,
+ handlerLogPrefix);
requestTracker.onSuccess(
- callback.statement, totalLatencyNanos, executionProfile,
callback.node, logPrefix);
+ callback.statement,
+ totalLatencyNanos,
+ executionProfile,
+ callback.node,
+ handlerLogPrefix);
}
if (sessionMetricUpdater.isEnabled(
DefaultSessionMetric.CQL_REQUESTS, executionProfile.getName())) {
@@ -439,7 +471,8 @@ public class CqlRequestHandler implements Throttled {
cancelScheduledTasks();
if (!(requestTracker instanceof NoopRequestTracker)) {
long latencyNanos = System.nanoTime() - startTimeNanos;
- requestTracker.onError(statement, error, latencyNanos,
executionProfile, node, logPrefix);
+ requestTracker.onError(
+ statement, error, latencyNanos, executionProfile, node,
handlerLogPrefix);
}
if (error instanceof DriverTimeoutException) {
throttler.signalTimeout(this);
@@ -489,7 +522,7 @@ public class CqlRequestHandler implements Throttled {
this.execution = execution;
this.retryCount = retryCount;
this.scheduleNextExecution = scheduleNextExecution;
- this.logPrefix = logPrefix + "|" + execution;
+ this.logPrefix = logPrefix;
}
// this gets invoked once the write completes.
@@ -567,7 +600,7 @@ public class CqlRequestHandler implements Throttled {
if (!result.isDone()) {
LOG.trace(
"[{}] Starting speculative execution {}",
- CqlRequestHandler.this.logPrefix,
+ CqlRequestHandler.this.handlerLogPrefix,
index);
activeExecutionsCount.incrementAndGet();
startedSpeculativeExecutionsCount.incrementAndGet();
diff --git
a/core/src/main/java/com/datastax/oss/driver/internal/core/loadbalancing/DefaultLoadBalancingPolicy.java
b/core/src/main/java/com/datastax/oss/driver/internal/core/loadbalancing/DefaultLoadBalancingPolicy.java
index 9c31b606f..8e1c1fe50 100644
---
a/core/src/main/java/com/datastax/oss/driver/internal/core/loadbalancing/DefaultLoadBalancingPolicy.java
+++
b/core/src/main/java/com/datastax/oss/driver/internal/core/loadbalancing/DefaultLoadBalancingPolicy.java
@@ -246,7 +246,7 @@ public class DefaultLoadBalancingPolicy extends
BasicLoadBalancingPolicy impleme
long latencyNanos,
@NonNull DriverExecutionProfile executionProfile,
@NonNull Node node,
- @NonNull String logPrefix) {
+ @NonNull String nodeRequestLogPrefix) {
updateResponseTimes(node);
}
@@ -257,7 +257,7 @@ public class DefaultLoadBalancingPolicy extends
BasicLoadBalancingPolicy impleme
long latencyNanos,
@NonNull DriverExecutionProfile executionProfile,
@NonNull Node node,
- @NonNull String logPrefix) {
+ @NonNull String nodeRequestLogPrefix) {
updateResponseTimes(node);
}
diff --git
a/core/src/main/java/com/datastax/oss/driver/internal/core/tracker/MultiplexingRequestTracker.java
b/core/src/main/java/com/datastax/oss/driver/internal/core/tracker/MultiplexingRequestTracker.java
index d4d20f3eb..6fe2ba059 100644
---
a/core/src/main/java/com/datastax/oss/driver/internal/core/tracker/MultiplexingRequestTracker.java
+++
b/core/src/main/java/com/datastax/oss/driver/internal/core/tracker/MultiplexingRequestTracker.java
@@ -82,10 +82,12 @@ public class MultiplexingRequestTracker implements
RequestTracker {
long latencyNanos,
@NonNull DriverExecutionProfile executionProfile,
@NonNull Node node,
- @NonNull String logPrefix) {
+ @NonNull String sessionRequestLogPrefix) {
invokeTrackers(
- tracker -> tracker.onSuccess(request, latencyNanos, executionProfile,
node, logPrefix),
- logPrefix,
+ tracker ->
+ tracker.onSuccess(
+ request, latencyNanos, executionProfile, node,
sessionRequestLogPrefix),
+ sessionRequestLogPrefix,
"onSuccess");
}
@@ -96,10 +98,12 @@ public class MultiplexingRequestTracker implements
RequestTracker {
long latencyNanos,
@NonNull DriverExecutionProfile executionProfile,
@Nullable Node node,
- @NonNull String logPrefix) {
+ @NonNull String sessionRequestLogPrefix) {
invokeTrackers(
- tracker -> tracker.onError(request, error, latencyNanos,
executionProfile, node, logPrefix),
- logPrefix,
+ tracker ->
+ tracker.onError(
+ request, error, latencyNanos, executionProfile, node,
sessionRequestLogPrefix),
+ sessionRequestLogPrefix,
"onError");
}
@@ -109,10 +113,12 @@ public class MultiplexingRequestTracker implements
RequestTracker {
long latencyNanos,
@NonNull DriverExecutionProfile executionProfile,
@NonNull Node node,
- @NonNull String logPrefix) {
+ @NonNull String nodeRequestLogPrefix) {
invokeTrackers(
- tracker -> tracker.onNodeSuccess(request, latencyNanos,
executionProfile, node, logPrefix),
- logPrefix,
+ tracker ->
+ tracker.onNodeSuccess(
+ request, latencyNanos, executionProfile, node,
nodeRequestLogPrefix),
+ nodeRequestLogPrefix,
"onNodeSuccess");
}
@@ -123,11 +129,12 @@ public class MultiplexingRequestTracker implements
RequestTracker {
long latencyNanos,
@NonNull DriverExecutionProfile executionProfile,
@NonNull Node node,
- @NonNull String logPrefix) {
+ @NonNull String nodeRequestLogPrefix) {
invokeTrackers(
tracker ->
- tracker.onNodeError(request, error, latencyNanos,
executionProfile, node, logPrefix),
- logPrefix,
+ tracker.onNodeError(
+ request, error, latencyNanos, executionProfile, node,
nodeRequestLogPrefix),
+ nodeRequestLogPrefix,
"onNodeError");
}
diff --git
a/core/src/main/java/com/datastax/oss/driver/internal/core/tracker/NoopRequestTracker.java
b/core/src/main/java/com/datastax/oss/driver/internal/core/tracker/NoopRequestTracker.java
index 09ac27e5e..3821c6ace 100644
---
a/core/src/main/java/com/datastax/oss/driver/internal/core/tracker/NoopRequestTracker.java
+++
b/core/src/main/java/com/datastax/oss/driver/internal/core/tracker/NoopRequestTracker.java
@@ -42,7 +42,7 @@ public class NoopRequestTracker implements RequestTracker {
long latencyNanos,
@NonNull DriverExecutionProfile executionProfile,
@NonNull Node node,
- @NonNull String requestPrefix) {
+ @NonNull String sessionRequestLogPrefix) {
// nothing to do
}
@@ -53,7 +53,7 @@ public class NoopRequestTracker implements RequestTracker {
long latencyNanos,
@NonNull DriverExecutionProfile executionProfile,
Node node,
- @NonNull String requestPrefix) {
+ @NonNull String sessionRequestLogPrefix) {
// nothing to do
}
@@ -64,7 +64,7 @@ public class NoopRequestTracker implements RequestTracker {
long latencyNanos,
@NonNull DriverExecutionProfile executionProfile,
@NonNull Node node,
- @NonNull String requestPrefix) {
+ @NonNull String nodeRequestLogPrefix) {
// nothing to do
}
@@ -74,7 +74,7 @@ public class NoopRequestTracker implements RequestTracker {
long latencyNanos,
@NonNull DriverExecutionProfile executionProfile,
@NonNull Node node,
- @NonNull String requestPrefix) {
+ @NonNull String nodeRequestLogPrefix) {
// nothing to do
}
diff --git
a/core/src/main/java/com/datastax/oss/driver/internal/core/tracker/RequestLogger.java
b/core/src/main/java/com/datastax/oss/driver/internal/core/tracker/RequestLogger.java
index 235ef051b..f242ff89c 100644
---
a/core/src/main/java/com/datastax/oss/driver/internal/core/tracker/RequestLogger.java
+++
b/core/src/main/java/com/datastax/oss/driver/internal/core/tracker/RequestLogger.java
@@ -86,7 +86,7 @@ public class RequestLogger implements RequestTracker {
long latencyNanos,
@NonNull DriverExecutionProfile executionProfile,
@NonNull Node node,
- @NonNull String logPrefix) {
+ @NonNull String sessionRequestLogPrefix) {
boolean successEnabled =
executionProfile.getBoolean(DefaultDriverOption.REQUEST_LOGGER_SUCCESS_ENABLED,
false);
@@ -129,7 +129,7 @@ public class RequestLogger implements RequestTracker {
showValues,
maxValues,
maxValueLength,
- logPrefix);
+ sessionRequestLogPrefix);
}
@Override
@@ -139,7 +139,7 @@ public class RequestLogger implements RequestTracker {
long latencyNanos,
@NonNull DriverExecutionProfile executionProfile,
Node node,
- @NonNull String logPrefix) {
+ @NonNull String sessionRequestLogPrefix) {
if
(!executionProfile.getBoolean(DefaultDriverOption.REQUEST_LOGGER_ERROR_ENABLED,
false)) {
return;
@@ -173,7 +173,7 @@ public class RequestLogger implements RequestTracker {
maxValues,
maxValueLength,
showStackTraces,
- logPrefix);
+ sessionRequestLogPrefix);
}
@Override
@@ -183,7 +183,7 @@ public class RequestLogger implements RequestTracker {
long latencyNanos,
@NonNull DriverExecutionProfile executionProfile,
@NonNull Node node,
- @NonNull String logPrefix) {
+ @NonNull String nodeRequestLogPrefix) {
// Nothing to do
}
@@ -193,7 +193,7 @@ public class RequestLogger implements RequestTracker {
long latencyNanos,
@NonNull DriverExecutionProfile executionProfile,
@NonNull Node node,
- @NonNull String logPrefix) {
+ @NonNull String nodeRequestLogPrefix) {
// Nothing to do
}
diff --git
a/core/src/main/java/com/datastax/oss/driver/internal/core/tracker/UuidRequestIdGenerator.java
b/core/src/main/java/com/datastax/oss/driver/internal/core/tracker/UuidRequestIdGenerator.java
new file mode 100644
index 000000000..cc07d6717
--- /dev/null
+++
b/core/src/main/java/com/datastax/oss/driver/internal/core/tracker/UuidRequestIdGenerator.java
@@ -0,0 +1,43 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package com.datastax.oss.driver.internal.core.tracker;
+
+import com.datastax.oss.driver.api.core.context.DriverContext;
+import com.datastax.oss.driver.api.core.session.Request;
+import com.datastax.oss.driver.api.core.tracker.RequestIdGenerator;
+import com.datastax.oss.driver.api.core.uuid.Uuids;
+import edu.umd.cs.findbugs.annotations.NonNull;
+
+public class UuidRequestIdGenerator implements RequestIdGenerator {
+ public UuidRequestIdGenerator(DriverContext context) {}
+
+ /** Generates a random v4 UUID. */
+ @Override
+ public String getSessionRequestId() {
+ return Uuids.random().toString();
+ }
+
+ /**
+ * {session-request-id}-{random-uuid} All node requests for a session
request will have the same
+ * session request id
+ */
+ @Override
+ public String getNodeRequestId(@NonNull Request statement, @NonNull String
parentId) {
+ return parentId + "-" + Uuids.random();
+ }
+}
diff --git
a/core/src/main/java/com/datastax/oss/driver/internal/core/tracker/W3CContextRequestIdGenerator.java
b/core/src/main/java/com/datastax/oss/driver/internal/core/tracker/W3CContextRequestIdGenerator.java
new file mode 100644
index 000000000..fe15b93bc
--- /dev/null
+++
b/core/src/main/java/com/datastax/oss/driver/internal/core/tracker/W3CContextRequestIdGenerator.java
@@ -0,0 +1,67 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package com.datastax.oss.driver.internal.core.tracker;
+
+import com.datastax.oss.driver.api.core.context.DriverContext;
+import com.datastax.oss.driver.api.core.session.Request;
+import com.datastax.oss.driver.api.core.tracker.RequestIdGenerator;
+import com.datastax.oss.driver.shaded.guava.common.io.BaseEncoding;
+import edu.umd.cs.findbugs.annotations.NonNull;
+import java.security.SecureRandom;
+import java.util.Random;
+
+public class W3CContextRequestIdGenerator implements RequestIdGenerator {
+
+ private final Random random = new SecureRandom();
+ private final BaseEncoding baseEncoding = BaseEncoding.base16().lowerCase();
+ private final String payloadKey;
+
+ public W3CContextRequestIdGenerator(DriverContext context) {
+ payloadKey = RequestIdGenerator.super.getCustomPayloadKey();
+ }
+
+ public W3CContextRequestIdGenerator(String payloadKey) {
+ this.payloadKey = payloadKey;
+ }
+
+ /** Random 16 bytes, e.g. "4bf92f3577b34da6a3ce929d0e0e4736" */
+ @Override
+ public String getSessionRequestId() {
+ byte[] bytes = new byte[16];
+ random.nextBytes(bytes);
+ return baseEncoding.encode(bytes);
+ }
+
+ /**
+ * Following the format of W3C "traceparent" spec,
+ * https://www.w3.org/TR/trace-context/#traceparent-header-field-values e.g.
+ * "00-4bf92f3577b34da6a3ce929d0e0e4736-a3ce929d0e0e4736-01" All node
requests in the same session
+ * request share the same "trace-id" field value
+ */
+ @Override
+ public String getNodeRequestId(@NonNull Request statement, @NonNull String
parentId) {
+ byte[] bytes = new byte[8];
+ random.nextBytes(bytes);
+ return String.format("00-%s-%s-00", parentId, baseEncoding.encode(bytes));
+ }
+
+ @Override
+ public String getCustomPayloadKey() {
+ return this.payloadKey;
+ }
+}
diff --git a/core/src/main/resources/reference.conf
b/core/src/main/resources/reference.conf
index 3c6851a48..741b1d976 100644
--- a/core/src/main/resources/reference.conf
+++ b/core/src/main/resources/reference.conf
@@ -918,6 +918,13 @@ datastax-java-driver {
}
}
+ advanced.request-id {
+ generator {
+ # The component that generates a unique identifier for each CQL request,
and possibly write the id to the custom payload .
+ // class = W3CContextRequestIdGenerator
+ }
+ }
+
# A session-wide component that controls the rate at which requests are
executed.
#
# Implementations vary, but throttlers generally track a metric that
represents the level of
diff --git
a/core/src/test/java/com/datastax/oss/driver/internal/core/cql/RequestHandlerTestHarness.java
b/core/src/test/java/com/datastax/oss/driver/internal/core/cql/RequestHandlerTestHarness.java
index 9d86302aa..6ecd61119 100644
---
a/core/src/test/java/com/datastax/oss/driver/internal/core/cql/RequestHandlerTestHarness.java
+++
b/core/src/test/java/com/datastax/oss/driver/internal/core/cql/RequestHandlerTestHarness.java
@@ -61,6 +61,7 @@ import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
+import java.util.Optional;
import java.util.Queue;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ConcurrentHashMap;
@@ -168,6 +169,8 @@ public class RequestHandlerTestHarness implements
AutoCloseable {
when(context.getRequestThrottler()).thenReturn(new
PassThroughRequestThrottler(context));
when(context.getRequestTracker()).thenReturn(new
NoopRequestTracker(context));
+
+ when(context.getRequestIdGenerator()).thenReturn(Optional.empty());
}
public DefaultSession getSession() {
diff --git
a/core/src/test/java/com/datastax/oss/driver/internal/core/tracker/RequestIdGeneratorTest.java
b/core/src/test/java/com/datastax/oss/driver/internal/core/tracker/RequestIdGeneratorTest.java
new file mode 100644
index 000000000..fb1883e12
--- /dev/null
+++
b/core/src/test/java/com/datastax/oss/driver/internal/core/tracker/RequestIdGeneratorTest.java
@@ -0,0 +1,80 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package com.datastax.oss.driver.internal.core.tracker;
+
+import static org.assertj.core.api.Assertions.assertThat;
+
+import com.datastax.oss.driver.api.core.cql.Statement;
+import com.datastax.oss.driver.api.core.tracker.RequestIdGenerator;
+import com.datastax.oss.driver.internal.core.context.InternalDriverContext;
+import org.apache.commons.lang3.RandomStringUtils;
+import org.junit.Test;
+import org.junit.runner.RunWith;
+import org.mockito.Mock;
+import org.mockito.junit.MockitoJUnitRunner;
+
+@RunWith(MockitoJUnitRunner.Strict.class)
+public class RequestIdGeneratorTest {
+ @Mock private InternalDriverContext context;
+ @Mock private Statement<?> statement;
+
+ @Test
+ public void uuid_generator_should_generate() {
+ // given
+ UuidRequestIdGenerator generator = new UuidRequestIdGenerator(context);
+ // when
+ String parentId = generator.getSessionRequestId();
+ String requestId = generator.getNodeRequestId(statement, parentId);
+ // then
+ // e.g. "550e8400-e29b-41d4-a716-446655440000", which is 36 characters long
+ assertThat(parentId.length()).isEqualTo(36);
+ // e.g.
"550e8400-e29b-41d4-a716-446655440000-550e8400-e29b-41d4-a716-446655440000",
which is 73
+ // characters long
+ assertThat(requestId.length()).isEqualTo(73);
+ }
+
+ @Test
+ public void w3c_generator_should_generate() {
+ // given
+ W3CContextRequestIdGenerator generator = new
W3CContextRequestIdGenerator(context);
+ // when
+ String parentId = generator.getSessionRequestId();
+ String requestId = generator.getNodeRequestId(statement, parentId);
+ // then
+ // e.g. "4bf92f3577b34da6a3ce929d0e0e4736", which is 32 characters long
+ assertThat(parentId.length()).isEqualTo(32);
+ // According to W3C "traceparent" spec,
+ // https://www.w3.org/TR/trace-context/#traceparent-header-field-values
+ // e.g. "00-4bf92f3577b34da6a3ce929d0e0e4736-a3ce929d0e0e4736-01", which
55 characters long
+ assertThat(requestId.length()).isEqualTo(55);
+ }
+
+ @Test
+ public void w3c_generator_default_payloadkey() {
+ W3CContextRequestIdGenerator w3cGenerator = new
W3CContextRequestIdGenerator(context);
+ assertThat(w3cGenerator.getCustomPayloadKey())
+ .isEqualTo(RequestIdGenerator.DEFAULT_PAYLOAD_KEY);
+ }
+
+ @Test
+ public void w3c_generator_provided_payloadkey() {
+ String someString = RandomStringUtils.random(12);
+ W3CContextRequestIdGenerator w3cGenerator = new
W3CContextRequestIdGenerator(someString);
+ assertThat(w3cGenerator.getCustomPayloadKey()).isEqualTo(someString);
+ }
+}
diff --git
a/integration-tests/src/test/java/com/datastax/oss/driver/core/tracker/RequestIdGeneratorIT.java
b/integration-tests/src/test/java/com/datastax/oss/driver/core/tracker/RequestIdGeneratorIT.java
new file mode 100644
index 000000000..2848a8fb6
--- /dev/null
+++
b/integration-tests/src/test/java/com/datastax/oss/driver/core/tracker/RequestIdGeneratorIT.java
@@ -0,0 +1,125 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package com.datastax.oss.driver.core.tracker;
+
+import static org.assertj.core.api.Assertions.assertThat;
+
+import com.datastax.oss.driver.api.core.CqlSession;
+import com.datastax.oss.driver.api.core.config.DefaultDriverOption;
+import com.datastax.oss.driver.api.core.config.DriverConfigLoader;
+import com.datastax.oss.driver.api.core.cql.ResultSet;
+import com.datastax.oss.driver.api.core.cql.Statement;
+import com.datastax.oss.driver.api.core.session.Request;
+import com.datastax.oss.driver.api.core.tracker.RequestIdGenerator;
+import com.datastax.oss.driver.api.testinfra.ccm.CcmRule;
+import com.datastax.oss.driver.api.testinfra.session.SessionUtils;
+import com.datastax.oss.driver.categories.ParallelizableTests;
+import
com.datastax.oss.protocol.internal.util.collection.NullAllowingImmutableMap;
+import edu.umd.cs.findbugs.annotations.NonNull;
+import java.nio.ByteBuffer;
+import java.nio.charset.StandardCharsets;
+import java.util.Map;
+import org.junit.Rule;
+import org.junit.Test;
+import org.junit.experimental.categories.Category;
+import org.junit.rules.RuleChain;
+import org.junit.rules.TestRule;
+
+@Category(ParallelizableTests.class)
+public class RequestIdGeneratorIT {
+ private CcmRule ccmRule = CcmRule.getInstance();
+
+ @Rule public TestRule chain = RuleChain.outerRule(ccmRule);
+
+ @Test
+ public void should_write_uuid_to_custom_payload_with_key() {
+ DriverConfigLoader loader =
+ SessionUtils.configLoaderBuilder()
+ .withString(DefaultDriverOption.REQUEST_ID_GENERATOR_CLASS,
"UuidRequestIdGenerator")
+ .build();
+ try (CqlSession session = SessionUtils.newSession(ccmRule, loader)) {
+ String query = "SELECT * FROM system.local";
+ ResultSet rs = session.execute(query);
+ ByteBuffer id =
rs.getExecutionInfo().getRequest().getCustomPayload().get("request-id");
+ assertThat(id.remaining()).isEqualTo(73);
+ }
+ }
+
+ @Test
+ public void should_write_default_request_id_to_custom_payload_with_key() {
+ DriverConfigLoader loader =
+ SessionUtils.configLoaderBuilder()
+ .withString(
+ DefaultDriverOption.REQUEST_ID_GENERATOR_CLASS,
"W3CContextRequestIdGenerator")
+ .build();
+ try (CqlSession session = SessionUtils.newSession(ccmRule, loader)) {
+ String query = "SELECT * FROM system.local";
+ ResultSet rs = session.execute(query);
+ ByteBuffer id =
rs.getExecutionInfo().getRequest().getCustomPayload().get("request-id");
+ assertThat(id.remaining()).isEqualTo(55);
+ }
+ }
+
+ @Test
+ public void should_use_customized_request_id_generator() {
+ RequestIdGenerator myRequestIdGenerator =
+ new RequestIdGenerator() {
+ @Override
+ public String getSessionRequestId() {
+ return "123";
+ }
+
+ @Override
+ public String getNodeRequestId(@NonNull Request statement, @NonNull
String parentId) {
+ return "456";
+ }
+
+ @Override
+ public Statement<?> getDecoratedStatement(
+ @NonNull Statement<?> statement, @NonNull String requestId) {
+ Map<String, ByteBuffer> customPayload =
+ NullAllowingImmutableMap.<String, ByteBuffer>builder()
+ .putAll(statement.getCustomPayload())
+ .put("trace_key",
ByteBuffer.wrap(requestId.getBytes(StandardCharsets.UTF_8)))
+ .build();
+ return statement.setCustomPayload(customPayload);
+ }
+ };
+ try (CqlSession session =
+ (CqlSession)
+ SessionUtils.baseBuilder()
+ .addContactEndPoints(ccmRule.getContactPoints())
+ .withRequestIdGenerator(myRequestIdGenerator)
+ .build()) {
+ String query = "SELECT * FROM system.local";
+ ResultSet rs = session.execute(query);
+ ByteBuffer id =
rs.getExecutionInfo().getRequest().getCustomPayload().get("trace_key");
+
assertThat(id).isEqualTo(ByteBuffer.wrap("456".getBytes(StandardCharsets.UTF_8)));
+ }
+ }
+
+ @Test
+ public void should_not_write_id_to_custom_payload_when_key_is_not_set() {
+ DriverConfigLoader loader = SessionUtils.configLoaderBuilder().build();
+ try (CqlSession session = SessionUtils.newSession(ccmRule, loader)) {
+ String query = "SELECT * FROM system.local";
+ ResultSet rs = session.execute(query);
+
assertThat(rs.getExecutionInfo().getRequest().getCustomPayload().get("trace_key")).isNull();
+ }
+ }
+}
diff --git
a/integration-tests/src/test/java/com/datastax/oss/driver/core/tracker/RequestNodeLoggerExample.java
b/integration-tests/src/test/java/com/datastax/oss/driver/core/tracker/RequestNodeLoggerExample.java
index eae983396..8eb2fb80a 100644
---
a/integration-tests/src/test/java/com/datastax/oss/driver/core/tracker/RequestNodeLoggerExample.java
+++
b/integration-tests/src/test/java/com/datastax/oss/driver/core/tracker/RequestNodeLoggerExample.java
@@ -39,7 +39,7 @@ public class RequestNodeLoggerExample extends RequestLogger {
long latencyNanos,
@NonNull DriverExecutionProfile executionProfile,
@NonNull Node node,
- @NonNull String logPrefix) {
+ @NonNull String nodeRequestLogPrefix) {
if
(!executionProfile.getBoolean(DefaultDriverOption.REQUEST_LOGGER_ERROR_ENABLED))
{
return;
}
@@ -66,7 +66,7 @@ public class RequestNodeLoggerExample extends RequestLogger {
maxValues,
maxValueLength,
showStackTraces,
- logPrefix);
+ nodeRequestLogPrefix);
}
@Override
@@ -75,7 +75,7 @@ public class RequestNodeLoggerExample extends RequestLogger {
long latencyNanos,
@NonNull DriverExecutionProfile executionProfile,
@NonNull Node node,
- @NonNull String logPrefix) {
+ @NonNull String nodeRequestLogPrefix) {
boolean successEnabled =
executionProfile.getBoolean(DefaultDriverOption.REQUEST_LOGGER_SUCCESS_ENABLED);
boolean slowEnabled =
@@ -114,6 +114,6 @@ public class RequestNodeLoggerExample extends RequestLogger
{
showValues,
maxValues,
maxValueLength,
- logPrefix);
+ nodeRequestLogPrefix);
}
}
diff --git a/manual/core/request_id/README.md b/manual/core/request_id/README.md
new file mode 100644
index 000000000..a766a4419
--- /dev/null
+++ b/manual/core/request_id/README.md
@@ -0,0 +1,48 @@
+<!--
+Licensed to the Apache Software Foundation (ASF) under one
+or more contributor license agreements. See the NOTICE file
+distributed with this work for additional information
+regarding copyright ownership. The ASF licenses this file
+to you under the Apache License, Version 2.0 (the
+"License"); you may not use this file except in compliance
+with the License. You may obtain a copy of the License at
+
+ http://www.apache.org/licenses/LICENSE-2.0
+
+Unless required by applicable law or agreed to in writing,
+software distributed under the License is distributed on an
+"AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+KIND, either express or implied. See the License for the
+specific language governing permissions and limitations
+under the License.
+-->
+
+## Request Id
+
+### Quick overview
+
+Users can inject an identifier for each individual CQL request, and such ID
can be written in to the [custom
payload](https://github.com/apache/cassandra/blob/trunk/doc/native_protocol_v5.spec)
to
+correlate a request across the driver and the Apache Cassandra server.
+
+A request ID generator needs to generate both:
+- Session request ID: an identifier for an entire session.execute() call
+- Node request ID: an identifier for the execution of a CQL statement against
a particular node. There can be one or more node requests for a single session
request, due to retries or speculative executions.
+
+Usage:
+* Inject ID generator: set the desired `RequestIdGenerator` in
`advanced.request-id.generator.class`.
+* Add ID to custom payload: the default behavior of a `RequestIdGenerator` is
to add the request ID into the custom payload with the key `request-id`.
Override `RequestIdGenerator.getDecoratedStatement` to customize the behavior.
+
+### Request Id Generator Configuration
+
+Request ID generator can be declared in the [configuration](../configuration/)
as follows:
+
+```
+datastax-java-driver.advanced.request-id.generator {
+ class = com.example.app.MyGenerator
+}
+```
+
+To register your own request ID generator, specify the name of the class
+that implements `RequestIdGenerator`.
+
+The generated ID will be added to the log message of `CqlRequestHandler`, and
propagated to other classes, e.g. the request trackers.
\ No newline at end of file
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]