yifan-c commented on code in PR #337:
URL: https://github.com/apache/cassandra-sidecar/pull/337#discussion_r3267938070
##########
client-common/src/main/java/org/apache/cassandra/sidecar/common/request/data/CreateRestoreJobRequestPayload.java:
##########
@@ -293,12 +335,43 @@ public CreateRestoreJobRequestPayload build()
|| !consistencyLevel.isLocalDcOnly
|| (localDc != null && !localDc.isEmpty()),
"Must specify a non-empty " + JOB_LOCAL_DATA_CENTER + " for consistency level: " + consistencyLevel);
- return new CreateRestoreJobRequestPayload(this);
+ // Create a defensive copy of importOptions so that future updateImportOptions calls on this builder
+ // do not mutate already-built objects. SSTableImportOptions extends HashMap and is mutated in-place
+ // by updateImportOptions; without a copy, all builds from the same builder share one mutable instance.
+ SSTableImportOptions importOptionsCopy = SSTableImportOptions.defaults();
+ importOptionsCopy.putAll(importOptions);
+ return new CreateRestoreJobRequestPayload(jobId, jobAgent, secrets, credentialType, importOptionsCopy,
+ expireAtInMillis, nameOrNull(consistencyLevel), localDc,
+ localDatacenterOnly);
Review Comment:
Thanks for adding the comment! I can see the motivation for removing the
constructor that takes the builder.
That said, can we restore the removed constructor and consolidate the copy
of import options into the main constructor (annotated with `@JsonCreator`)?
The copy behavior is wanted in that constructor too.
##########
client-common/src/main/java/org/apache/cassandra/sidecar/common/request/data/CreateRestoreJobRequestPayload.java:
##########
@@ -98,10 +125,14 @@ public CreateRestoreJobRequestPayload(@JsonProperty(JOB_ID) UUID jobId,
"Only time based UUIDs allowed for jobId");
Preconditions.checkArgument(expireAtInMillis != 0 && expireAtInMillis > System.currentTimeMillis(),
"expireAt cannot be absent or a time in past");
- Objects.requireNonNull(secrets, "secrets cannot be null");
+ Objects.requireNonNull(secrets, "secrets must be provided");
+ CredentialType effectiveType = credentialType == null ? CredentialType.STATIC : credentialType;
+ validateCredentials(secrets.readCredentials(), "readCredentials", effectiveType);
+ validateCredentials(secrets.writeCredentials(), "writeCredentials", effectiveType);
this.jobId = jobId;
this.jobAgent = jobAgent;
this.secrets = secrets;
+ this.credentialType = effectiveType;
this.importOptions = importOptions == null
? SSTableImportOptions.defaults()
: importOptions;
Review Comment:
Probably do the defensive copy here instead.
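
A minimal sketch of what consolidating the copy into the constructor could look like. `ImportOptions` and `Payload` are hypothetical stand-ins for `SSTableImportOptions` and `CreateRestoreJobRequestPayload`, and the `resetLevel` default is only an illustrative value; the real classes carry more fields and Jackson annotations.

```java
import java.util.HashMap;
import java.util.Map;

public class DefensiveCopySketch
{
    // Hypothetical stand-in for SSTableImportOptions, which the patch describes as extending HashMap
    static class ImportOptions extends HashMap<String, String>
    {
        static ImportOptions defaults()
        {
            ImportOptions options = new ImportOptions();
            options.put("resetLevel", "true"); // illustrative default, not the project's actual value
            return options;
        }
    }

    // Hypothetical stand-in for the payload; only the importOptions handling is shown
    static class Payload
    {
        private final ImportOptions importOptions;

        Payload(ImportOptions importOptions)
        {
            // Always start from defaults and copy the caller's entries on top, so the
            // payload never shares a mutable map with the builder or any other caller
            ImportOptions copy = ImportOptions.defaults();
            if (importOptions != null)
            {
                copy.putAll(importOptions);
            }
            this.importOptions = copy;
        }

        Map<String, String> importOptions()
        {
            return importOptions;
        }
    }

    public static void main(String[] args)
    {
        ImportOptions shared = ImportOptions.defaults();
        Payload first = new Payload(shared);
        shared.put("resetLevel", "false"); // later mutation by the caller
        System.out.println(first.importOptions().get("resetLevel")); // prints "true"
    }
}
```

With the copy in the constructor, both the builder path and the deserialization path get the same isolation, and `build()` can pass `importOptions` through unchanged.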
##########
integration-tests/src/integrationTest/org/apache/cassandra/sidecar/routes/CassandraVectorSchemaRouteIntegrationTest.java:
##########
Review Comment:
Why is the test included in this patch? I do not see a connection.
##########
server/src/main/java/org/apache/cassandra/sidecar/db/RestoreJob.java:
##########
@@ -122,6 +125,11 @@ private static RestoreJobSecrets decodeJobSecrets(ByteBuffer secretsBytes)
"secrets");
}
+ private static CredentialType decodeCredentialType(String value)
+ {
+ return CredentialType.valueOf(value);
Review Comment:
Please consider wrapping the call to catch `IllegalArgumentException` and
rethrow it as `DataObjectMappingException`.
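
Roughly what I have in mind, as a self-contained sketch. The nested `CredentialType` and `DataObjectMappingException` here are hypothetical stand-ins for the project's classes, and the `(message, cause)` constructor is an assumption; use whichever constructor the real exception offers.

```java
public class DecodeCredentialTypeSketch
{
    // Hypothetical stand-in for the project's CredentialType enum
    enum CredentialType { STATIC, IAM }

    // Hypothetical stand-in for the project's DataObjectMappingException
    static class DataObjectMappingException extends RuntimeException
    {
        DataObjectMappingException(String message, Throwable cause)
        {
            super(message, cause);
        }
    }

    static CredentialType decodeCredentialType(String value)
    {
        try
        {
            return CredentialType.valueOf(value);
        }
        catch (IllegalArgumentException e)
        {
            // Enum.valueOf throws IllegalArgumentException for an unknown constant name;
            // surface it as a mapping failure instead of leaking the raw exception
            throw new DataObjectMappingException("Failed to decode credential type: " + value, e);
        }
    }
}
```

Note that `Enum.valueOf` throws `NullPointerException` for a `null` name, so a null column value would still need its own handling.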
##########
docs/src/spark.adoc:
##########
Review Comment:
Nice
##########
server-common/src/main/java/org/apache/cassandra/sidecar/common/server/data/QualifiedTableName.java:
##########
Review Comment:
Are these changes (this file and some others) from
https://github.com/apache/cassandra-sidecar/pull/340? Please rebase the patch.
##########
docs/src/spark.adoc:
##########
@@ -0,0 +1,178 @@
+////
+ Licensed to the Apache Software Foundation (ASF) under one
+ or more contributor license agreements. See the NOTICE file
+ distributed with this work for additional information
+ regarding copyright ownership. The ASF licenses this file
+ to you under the Apache License, Version 2.0 (the
+ "License"); you may not use this file except in compliance
+ with the License. You may obtain a copy of the License at
+
+ http://www.apache.org/licenses/LICENSE-2.0
+
+ Unless required by applicable law or agreed to in writing,
+ software distributed under the License is distributed on an
+ "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ KIND, either express or implied. See the License for the
+ specific language governing permissions and limitations
+ under the License.
+////
+
+= S3 Restore Jobs from Spark
+
+This document describes how Spark jobs create S3 restore jobs through the sidecar client and
+explains the two supported credential modes.
+
+== Credential Modes
+
+Two modes are supported, selected by `credentialType` on the request payload.
+
+[cols="1,3"]
+|===
+| Mode | How credentials are resolved
+
+| `STATIC` (default)
+| Short-lived STS key fields are passed in the request. The sidecar uses them directly.
+Rotate them by calling `updateRestoreJob` before they expire.
+
+| `IAM`
+| No key fields are passed. The sidecar resolves credentials from the AWS default chain:
+instance profile on EC2, task role on ECS, or IRSA on EKS.
+|===
+
+[[static-flow]]
+== Static Credentials Flow
+
+Use static mode when Spark runs outside AWS, or when credentials are issued by an STS
+vending service that your job controls.
+
+....
+ Spark Job Credential Vending Service Sidecar
+ | | |
+ |--- request STS credentials ------->| |
+ |<-- accessKeyId / secretAccessKey --| |
+ | sessionToken | |
+ | |
+ |--- createRestoreJob(STATIC, keys) --------------------------------->|
+ | stores keys in DB
+ | uses StaticCredentialsProvider
+ | downloads slices from S3
+ |
+ | (credentials near expiry)
+ |--- updateRestoreJob(new keys) -------------------------------------->|
+ | replaces cached provider
+ |
+....
+
+=== Code Example
+
+[source,java]
+----
+StorageCredentials readCredentials = StorageCredentials.builder()
+ .accessKeyId("ASIA...")
+ .secretAccessKey("wJalr...")
+ .sessionToken("AQoX...")
+ .region("us-east-1")
+ .build();
+
+StorageCredentials writeCredentials = StorageCredentials.builder()
+ .accessKeyId("ASIA...")
+ .secretAccessKey("wJalr...")
+ .sessionToken("AQoX...")
+ .region("us-east-1")
+ .build();
+
+RestoreJobSecrets secrets = new RestoreJobSecrets(readCredentials, writeCredentials);
+
+long jobDeadline = System.currentTimeMillis() + TimeUnit.HOURS.toMillis(4);
+
+CreateRestoreJobRequestPayload payload =
+ CreateRestoreJobRequestPayload.builder(secrets, jobDeadline)
+ .jobAgent("Spark Bulk Analytics")
+ .credentialType(CredentialType.STATIC)
+ .consistencyLevel(ConsistencyLevel.QUORUM)
+ .build();
+
+CreateRestoreJobResponsePayload response =
+ sidecarClient.createRestoreJob("my_keyspace", "my_table", payload).get();
+
+UUID jobId = response.jobId();
+----
+
+
+[[iam-flow]]
+== IAM Instance Profile Flow
+
+Use IAM mode when the sidecar runs on AWS infrastructure with an IAM role attached. No key
+material is passed. The sidecar resolves credentials automatically from the platform.
+
+....
+ Spark Job Sidecar AWS (IMDS / ECS / EKS)
+ | | |
+ |--- createRestoreJob(IAM) -->| |
+ | stores region in DB |
+ | (no key fields stored) |
+ | | |
+ | |--- describe instance profile ----->|
+ | |<-- temporary credentials ----------|
+ | | (refreshed automatically) |
+ | | |
+ | |--- downloads slices from S3 ------>|
+ |
+....
+
+
+=== Code Example
+
+Use `RestoreJobSecrets.iamMode(region)` to build the secrets object. Only `region` is
+required; all key fields must be absent.
+
+[source,java]
+----
+RestoreJobSecrets secrets = RestoreJobSecrets.iamMode("us-east-1");
+
+long jobDeadline = System.currentTimeMillis() + TimeUnit.HOURS.toMillis(4);
+
+CreateRestoreJobRequestPayload payload =
+ CreateRestoreJobRequestPayload.builder(secrets, jobDeadline)
+ .jobAgent("Spark Bulk Analytics")
+ .credentialType(CredentialType.IAM)
+ .consistencyLevel(ConsistencyLevel.QUORUM)
+ .build();
+
+CreateRestoreJobResponsePayload response =
+ sidecarClient.createRestoreJob("my_keyspace", "my_table", payload).get();
+
+UUID jobId = response.jobId();
+----
+
+NOTE: `expireAt` (the `jobDeadline` above) is the restore job deadline — how
long the sidecar
+will attempt the restore before aborting. It is not related to credential
lifetime and must
+always be provided regardless of credential mode.
+
Review Comment:
👍
##########
server/src/test/integration/org/apache/cassandra/sidecar/db/RestoreJobDatabaseAccessorIntTest.java:
##########
@@ -95,6 +96,48 @@ void testCrudOperations()
assertThat(restoreToLocalDcOnlyJob.get().shouldRestoreToLocalDatacenterOnly).isTrue();
}
+ @CassandraIntegrationTest
+ void testIamCredentialTypeRoundTrips()
+ {
Review Comment:
We are deprecating this integration test style, since it is slow to create a
new cluster per test.
That said, nothing to address for the existing test file in this patch. I'll
get a patch to migrate the integration tests in bulk.
##########
docs/src/spark.adoc:
##########
@@ -0,0 +1,178 @@
+////
+ Licensed to the Apache Software Foundation (ASF) under one
+ or more contributor license agreements. See the NOTICE file
+ distributed with this work for additional information
+ regarding copyright ownership. The ASF licenses this file
+ to you under the Apache License, Version 2.0 (the
+ "License"); you may not use this file except in compliance
+ with the License. You may obtain a copy of the License at
+
+ http://www.apache.org/licenses/LICENSE-2.0
+
+ Unless required by applicable law or agreed to in writing,
+ software distributed under the License is distributed on an
+ "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ KIND, either express or implied. See the License for the
+ specific language governing permissions and limitations
+ under the License.
+////
+
+= S3 Restore Jobs from Spark
+
+This document describes how Spark jobs create S3 restore jobs through the sidecar client and
+explains the two supported credential modes.
+
+== Credential Modes
+
+Two modes are supported, selected by `credentialType` on the request payload.
+
+[cols="1,3"]
+|===
+| Mode | How credentials are resolved
+
+| `STATIC` (default)
+| Short-lived STS key fields are passed in the request. The sidecar uses them directly.
+Rotate them by calling `updateRestoreJob` before they expire.
+
+| `IAM`
+| No key fields are passed. The sidecar resolves credentials from the AWS default chain:
+instance profile on EC2, task role on ECS, or IRSA on EKS.
+|===
+
+[[static-flow]]
+== Static Credentials Flow
+
+Use static mode when Spark runs outside AWS, or when credentials are issued by an STS
+vending service that your job controls.
+
+....
+ Spark Job Credential Vending Service Sidecar
+ | | |
+ |--- request STS credentials ------->| |
+ |<-- accessKeyId / secretAccessKey --| |
+ | sessionToken | |
+ | |
+ |--- createRestoreJob(STATIC, keys) --------------------------------->|
+ | stores keys in DB
+ | uses StaticCredentialsProvider
+ | downloads slices from S3
+ |
+ | (credentials near expiry)
+ |--- updateRestoreJob(new keys) -------------------------------------->|
Review Comment:
nit: this line has one extra `-` compared to line #55
##########
client-common/src/main/java/org/apache/cassandra/sidecar/common/request/data/CreateRestoreJobRequestPayload.java:
##########
@@ -293,12 +335,43 @@ public CreateRestoreJobRequestPayload build()
|| !consistencyLevel.isLocalDcOnly
|| (localDc != null && !localDc.isEmpty()),
"Must specify a non-empty " + JOB_LOCAL_DATA_CENTER + " for consistency level: " + consistencyLevel);
- return new CreateRestoreJobRequestPayload(this);
+ // Create a defensive copy of importOptions so that future updateImportOptions calls on this builder
+ // do not mutate already-built objects. SSTableImportOptions extends HashMap and is mutated in-place
+ // by updateImportOptions; without a copy, all builds from the same builder share one mutable instance.
+ SSTableImportOptions importOptionsCopy = SSTableImportOptions.defaults();
+ importOptionsCopy.putAll(importOptions);
+ return new CreateRestoreJobRequestPayload(jobId, jobAgent, secrets, credentialType, importOptionsCopy,
+ expireAtInMillis, nameOrNull(consistencyLevel), localDc,
+ localDatacenterOnly);
}
}
private static String nameOrNull(ConsistencyLevel cl)
{
return cl == null ? null : cl.name();
}
+
+ /**
+ * Validates credentials against the declared {@link CredentialType}.
+ * For {@link CredentialType#STATIC}: all three key fields must be present.
+ * For {@link CredentialType#IAM}: all three key fields must be absent (only region is allowed).
+ */
+ private static void validateCredentials(StorageCredentials credentials, String fieldName, CredentialType credentialType)
+ {
+ boolean hasAccessKey = credentials.accessKeyId() != null;
+ boolean hasSecretKey = credentials.secretAccessKey() != null;
+ boolean hasSessionToken = credentials.sessionToken() != null;
+ if (credentialType == CredentialType.IAM)
+ {
Review Comment:
👍 on extracting the type check outside of the `checkArgument` for better
code clarity.
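
For reference, the rule described in the Javadoc can be sketched as below. The hunk above is cut off after the `IAM` branch opens, so this is not the patch's actual body: `Credentials` is a hypothetical stand-in for `StorageCredentials`, the error messages are made up, and plain `IllegalArgumentException` is used in place of `Preconditions.checkArgument` to keep the sketch dependency-free.

```java
public class ValidateCredentialsSketch
{
    // Hypothetical stand-in for the project's CredentialType enum
    enum CredentialType { STATIC, IAM }

    // Hypothetical stand-in for StorageCredentials, reduced to the three key fields
    static class Credentials
    {
        final String accessKeyId;
        final String secretAccessKey;
        final String sessionToken;

        Credentials(String accessKeyId, String secretAccessKey, String sessionToken)
        {
            this.accessKeyId = accessKeyId;
            this.secretAccessKey = secretAccessKey;
            this.sessionToken = sessionToken;
        }
    }

    static void validateCredentials(Credentials credentials, String fieldName, CredentialType credentialType)
    {
        boolean hasAccessKey = credentials.accessKeyId != null;
        boolean hasSecretKey = credentials.secretAccessKey != null;
        boolean hasSessionToken = credentials.sessionToken != null;
        if (credentialType == CredentialType.IAM)
        {
            // IAM: all three key fields must be absent
            if (hasAccessKey || hasSecretKey || hasSessionToken)
            {
                throw new IllegalArgumentException(fieldName + " must not carry key fields for credential type IAM");
            }
        }
        else
        {
            // STATIC: all three key fields must be present
            if (!(hasAccessKey && hasSecretKey && hasSessionToken))
            {
                throw new IllegalArgumentException(fieldName + " must carry all key fields for credential type STATIC");
            }
        }
    }
}
```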