yihua commented on code in PR #17951: URL: https://github.com/apache/hudi/pull/17951#discussion_r2912893657
########## hudi-azure/src/main/java/org/apache/hudi/azure/transaction/lock/AzureStorageLockClient.java: ########## @@ -0,0 +1,441 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.hudi.azure.transaction.lock; + +import org.apache.hudi.client.transaction.lock.StorageLockClient; +import org.apache.hudi.client.transaction.lock.models.LockGetResult; +import org.apache.hudi.client.transaction.lock.models.LockUpsertResult; +import org.apache.hudi.client.transaction.lock.models.StorageLockData; +import org.apache.hudi.client.transaction.lock.models.StorageLockFile; +import org.apache.hudi.common.util.Functions; +import org.apache.hudi.common.util.Option; +import org.apache.hudi.common.util.VisibleForTesting; +import org.apache.hudi.common.util.collection.Pair; +import org.apache.hudi.exception.HoodieLockException; +import org.apache.hudi.config.AzureStorageLockConfig; +import org.apache.hudi.config.StorageBasedLockConfig; +import org.apache.hudi.common.config.TypedProperties; + +import org.apache.hudi.azure.credentials.AzureCredentialFactory; + +import com.azure.core.credential.AzureSasCredential; +import com.azure.core.exception.HttpResponseException; +import 
com.azure.core.http.policy.ExponentialBackoffOptions; +import com.azure.core.http.policy.RetryOptions; +import com.azure.core.http.rest.Response; +import com.azure.core.util.BinaryData; +import com.azure.core.util.Context; +import com.azure.core.util.HttpClientOptions; +import com.azure.storage.blob.BlobClient; +import com.azure.storage.blob.BlobContainerClient; +import com.azure.storage.blob.BlobServiceClient; +import com.azure.storage.blob.BlobServiceClientBuilder; +import com.azure.storage.blob.BlobUrlParts; +import com.azure.storage.blob.models.BlobErrorCode; +import com.azure.storage.blob.models.BlobRequestConditions; +import com.azure.storage.blob.models.BlobStorageException; +import com.azure.storage.blob.models.BlockBlobItem; +import com.azure.storage.blob.options.BlobParallelUploadOptions; +import lombok.AllArgsConstructor; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import javax.annotation.concurrent.ThreadSafe; + +import java.net.URI; +import java.net.URISyntaxException; +import java.time.Duration; +import java.util.Properties; +import java.util.concurrent.ConcurrentHashMap; +import java.util.concurrent.ConcurrentMap; + +import static java.nio.charset.StandardCharsets.UTF_8; +import static org.apache.hudi.common.util.ConfigUtils.getLongWithAltKeys; + +/** + * Azure Storage implementation of {@link StorageLockClient} using Azure Blob conditional requests. 
+ * + * <p>Supports the following URI schemes: + * <ul> + * <li>ADLS Gen2: {@code abfs://} and {@code abfss://}</li> + * <li>Azure Blob Storage: {@code wasb://} and {@code wasbs://}</li> + * </ul> + * + * + * <ul> + * <li>Create: conditional write with If-None-Match: *</li> + * <li>Update/Renew/Expire: conditional write with If-Match: <etag></li> + * </ul> + * + * <p>Expected lock URI formats: + * <ul> + * <li>{@code abfs://<container>@<account>.dfs.core.windows.net/<path>}</li> + * <li>{@code abfss://<container>@<account>.dfs.core.windows.net/<path>}</li> + * <li>{@code wasb://<container>@<account>.blob.core.windows.net/<path>}</li> + * <li>{@code wasbs://<container>@<account>.blob.core.windows.net/<path>}</li> + * </ul> + * + * <p>Authentication precedence (via {@link Properties}): + * <ol> + * <li>{@link AzureStorageLockConfig#AZURE_CONNECTION_STRING} — connection string (includes shared key)</li> + * <li>{@link AzureStorageLockConfig#AZURE_SAS_TOKEN} — shared access signature</li> + * <li>{@link AzureStorageLockConfig#AZURE_MANAGED_IDENTITY_CLIENT_ID} — user-assigned managed identity + * via {@code ManagedIdentityCredential}</li> + * <li>{@link AzureStorageLockConfig#AZURE_CLIENT_TENANT_ID} + + * {@link AzureStorageLockConfig#AZURE_CLIENT_ID} + + * {@link AzureStorageLockConfig#AZURE_CLIENT_SECRET} — service principal + * via {@code ClientSecretCredential}</li> + * <li>{@code DefaultAzureCredential} — probing chain; see {@link org.apache.hudi.azure.credentials.AzureCredentialFactory}</li> + * </ol> + */ +@ThreadSafe +public class AzureStorageLockClient implements StorageLockClient { + + private static final int PRECONDITION_FAILURE_ERROR_CODE = 412; + private static final int NOT_FOUND_ERROR_CODE = 404; + private static final int CONFLICT_ERROR_CODE = 409; + private static final int RATE_LIMIT_ERROR_CODE = 429; + private static final int INTERNAL_SERVER_ERROR_CODE_MIN = 500; + + public static final String AZURE_CONNECTION_STRING = 
AzureStorageLockConfig.AZURE_CONNECTION_STRING.key(); + public static final String AZURE_SAS_TOKEN = AzureStorageLockConfig.AZURE_SAS_TOKEN.key(); + + private final Logger logger; + private final BlobServiceClient blobServiceClient; + private final Functions.Function1<AzureLocation, BlobServiceClient> blobServiceClientSupplier; + private final ConcurrentMap<String, BlobServiceClient> secondaryBlobServiceClients; + private final BlobClient lockBlobClient; + private final Properties clientProperties; + private final String ownerId; + private final String lockFileUri; + private final String lockBlobEndpoint; + + /** + * Constructor used by reflection by {@link org.apache.hudi.client.transaction.lock.StorageBasedLockProvider}. + * + * @param ownerId lock owner id + * @param lockFileUri lock file URI (abfs/abfss/wasb/wasbs) + * @param props properties used to customize/authenticate the Azure client + */ + public AzureStorageLockClient(String ownerId, String lockFileUri, Properties props) { + this(ownerId, lockFileUri, props, createDefaultBlobServiceClient(), LoggerFactory.getLogger(AzureStorageLockClient.class)); + } + + @VisibleForTesting + AzureStorageLockClient( + String ownerId, + String lockFileUri, + Properties props, + Functions.Function1<AzureLocation, BlobServiceClient> blobServiceClientSupplier, + Logger logger) { + this.ownerId = ownerId; + this.lockFileUri = lockFileUri; + this.logger = logger; + this.clientProperties = props; + this.blobServiceClientSupplier = blobServiceClientSupplier; + this.secondaryBlobServiceClients = new ConcurrentHashMap<>(); + + AzureLocation location = parseAzureLocation(lockFileUri).withProperties(props); + this.lockBlobEndpoint = location.blobEndpoint; + this.blobServiceClient = blobServiceClientSupplier.apply(location); + BlobContainerClient containerClient = blobServiceClient.getBlobContainerClient(location.container); + this.lockBlobClient = containerClient.getBlobClient(location.blobPath); + } + + private static 
Functions.Function1<AzureLocation, BlobServiceClient> createDefaultBlobServiceClient() { + return (location) -> { + Properties props = location.props; + BlobServiceClientBuilder builder = new BlobServiceClientBuilder(); + configureAzureClientOptions(builder, props); + + // 1. Connection string (includes shared-key auth). + String connectionString = props == null ? null : props.getProperty(AZURE_CONNECTION_STRING); + if (connectionString != null && !connectionString.trim().isEmpty()) { + return builder.connectionString(connectionString).buildClient(); + } + + builder.endpoint(location.blobEndpoint); + + // 2. SAS token. + String sasToken = props == null ? null : props.getProperty(AZURE_SAS_TOKEN); + if (sasToken != null && !sasToken.trim().isEmpty()) { + String cleaned = sasToken.startsWith("?") ? sasToken.substring(1) : sasToken; + return builder.credential(new AzureSasCredential(cleaned)).buildClient(); + } + + // 3. TokenCredential — MI, service principal, or DefaultAzureCredential fallback. + return builder.credential(AzureCredentialFactory.getAzureCredential(props)).buildClient(); + }; + } + + private static void configureAzureClientOptions(BlobServiceClientBuilder builder, Properties props) { + // Set Azure SDK timeouts based on lock validity to avoid long-hanging calls. + TypedProperties typedProps = new TypedProperties(); + if (props != null) { + typedProps.putAll(props); + } + long validityTimeoutSecs; + try { + validityTimeoutSecs = getLongWithAltKeys(typedProps, StorageBasedLockConfig.VALIDITY_TIMEOUT_SECONDS); + } catch (NumberFormatException e) { + validityTimeoutSecs = StorageBasedLockConfig.VALIDITY_TIMEOUT_SECONDS.defaultValue(); + } + long azureCallTimeoutSecs = Math.max(1, validityTimeoutSecs / 5); + + // Disable automatic SDK retries; Hudi manages retries at the lock-provider level. 
+ ExponentialBackoffOptions exponentialOptions = new ExponentialBackoffOptions().setMaxRetries(0); + RetryOptions retryOptions = new RetryOptions(exponentialOptions); + + HttpClientOptions clientOptions = new HttpClientOptions() + .setResponseTimeout(Duration.ofSeconds(azureCallTimeoutSecs)) + .setReadTimeout(Duration.ofSeconds(azureCallTimeoutSecs)); + + builder.retryOptions(retryOptions).clientOptions(clientOptions); + } + + @Override + public Pair<LockUpsertResult, Option<StorageLockFile>> tryUpsertLockFile( + StorageLockData newLockData, + Option<StorageLockFile> previousLockFile) { + String expectedEtag = previousLockFile.isPresent() ? previousLockFile.get().getVersionId() : null; + try { + StorageLockFile updated = createOrUpdateLockFileInternal(newLockData, expectedEtag); + return Pair.of(LockUpsertResult.SUCCESS, Option.of(updated)); + } catch (BlobStorageException e) { + return Pair.of(handleUpsertBlobStorageException(e), Option.empty()); + } catch (HttpResponseException e) { + logger.error("OwnerId: {}, Unexpected Azure SDK error while writing lock file: {}", + ownerId, lockFileUri, e); + if (!previousLockFile.isPresent()) { + // For create, fail fast since this indicates a larger issue. + throw e; + } + return Pair.of(LockUpsertResult.UNKNOWN_ERROR, Option.empty()); + } catch (Exception e) { + logger.error("OwnerId: {}, Unexpected error while writing lock file: {}", ownerId, lockFileUri, e); + return Pair.of(LockUpsertResult.UNKNOWN_ERROR, Option.empty()); + } + } Review Comment: If the response headers are missing or don't contain `ETag`, `eTag` will be null and get silently passed into the `StorageLockFile`. A subsequent `tryUpsertLockFile` with that file would then treat it as a create (`If-None-Match: *`) instead of an update (`If-Match: <etag>`), which would fail with a precondition error and prevent the rightful owner from renewing the lock. 
Could you add a null/empty check here similar to what `createOrUpdateLockFileInternal` does (throwing `HoodieLockException`)? ########## hudi-azure/src/main/java/org/apache/hudi/azure/credentials/AzureCredentialFactory.java: ########## @@ -0,0 +1,90 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.hudi.azure.credentials; + +import org.apache.hudi.config.AzureStorageLockConfig; + +import com.azure.core.credential.TokenCredential; +import com.azure.identity.ClientSecretCredentialBuilder; +import com.azure.identity.DefaultAzureCredentialBuilder; +import com.azure.identity.ManagedIdentityCredentialBuilder; + +import java.util.Properties; + +/** + * Factory for resolving an Azure {@link TokenCredential} from Hudi properties. 
+ * + * <p>Credential precedence: + * <ol> + * <li>User-assigned managed identity ({@link AzureStorageLockConfig#AZURE_MANAGED_IDENTITY_CLIENT_ID}) + * — uses {@code ManagedIdentityCredential}</li> + * <li>Service principal ({@link AzureStorageLockConfig#AZURE_CLIENT_TENANT_ID} + + * {@link AzureStorageLockConfig#AZURE_CLIENT_ID} + + * {@link AzureStorageLockConfig#AZURE_CLIENT_SECRET}) + * — uses {@code ClientSecretCredential}</li> + * <li>{@code DefaultAzureCredential} — (system-assigned MI, + * workload identity, env-var SP, Azure CLI, etc.); suitable for dev and environments + * where auth is controlled externally</li> + * </ol> + * + * <p>Note: connection string and SAS token auth are not {@link TokenCredential}-based and are + * handled directly by the caller before this factory is consulted. + */ +public class AzureCredentialFactory { + + // Shared instance so the token cache and IMDS probe are reused across all clients + // that fall through to the default chain. + private static final TokenCredential DEFAULT_AZURE_CREDENTIAL = + new DefaultAzureCredentialBuilder().build(); + Review Comment: This eagerly builds a `DefaultAzureCredential` at class-load time, which can trigger IMDS endpoint probes and network calls. In non-Azure environments (local dev, on-prem Spark), this could cause unexpected hangs or timeouts when the class is first loaded — even if the user configured a connection string or SAS token. Have you considered lazy initialization (e.g., a holder pattern or `synchronized` lazy init)? ########## hudi-azure/src/main/java/org/apache/hudi/azure/transaction/lock/ADLSStorageLockClient.java: ########## @@ -0,0 +1,447 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. 
The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.hudi.azure.transaction.lock; + +import org.apache.hudi.client.transaction.lock.StorageLockClient; +import org.apache.hudi.client.transaction.lock.models.LockGetResult; +import org.apache.hudi.client.transaction.lock.models.LockUpsertResult; +import org.apache.hudi.client.transaction.lock.models.StorageLockData; +import org.apache.hudi.client.transaction.lock.models.StorageLockFile; +import org.apache.hudi.common.util.Functions; +import org.apache.hudi.common.util.Option; +import org.apache.hudi.common.util.VisibleForTesting; +import org.apache.hudi.common.util.collection.Pair; +import org.apache.hudi.exception.HoodieLockException; +import org.apache.hudi.config.StorageBasedLockConfig; + +import com.azure.core.exception.HttpResponseException; +import com.azure.core.http.policy.ExponentialBackoffOptions; +import com.azure.core.http.policy.RetryOptions; +import com.azure.core.http.rest.Response; +import com.azure.core.credential.AzureSasCredential; +import com.azure.core.util.BinaryData; +import com.azure.core.util.Context; +import com.azure.core.util.HttpClientOptions; +import com.azure.identity.DefaultAzureCredentialBuilder; +import com.azure.storage.blob.BlobClient; +import com.azure.storage.blob.BlobContainerClient; +import com.azure.storage.blob.BlobServiceClient; +import com.azure.storage.blob.BlobServiceClientBuilder; +import 
com.azure.storage.blob.models.BlobErrorCode; +import com.azure.storage.blob.models.BlobProperties; +import com.azure.storage.blob.models.BlobRequestConditions; +import com.azure.storage.blob.models.BlobStorageException; +import com.azure.storage.blob.models.BlockBlobItem; +import com.azure.storage.blob.options.BlobParallelUploadOptions; +import lombok.extern.slf4j.Slf4j; +import org.slf4j.Logger; + +import javax.annotation.Nonnull; +import javax.annotation.concurrent.ThreadSafe; + +import java.io.IOException; +import java.io.InputStream; +import java.io.UncheckedIOException; +import java.net.URI; +import java.net.URISyntaxException; +import java.time.Duration; +import java.util.Properties; + +import static java.nio.charset.StandardCharsets.UTF_8; + +/** + * Azure Data Lake Storage (ADLS) implementation of {@link StorageLockClient} using Azure Blob conditional requests. + * + * <p>Supports the following URI schemes: + * <ul> + * <li>ADLS Gen2: {@code abfs://} and {@code abfss://}</li> + * <li>Azure Blob Storage: {@code wasb://} and {@code wasbs://}</li> + * </ul> + * + * + * <ul> + * <li>Create: conditional write with If-None-Match: *</li> + * <li>Update/Renew/Expire: conditional write with If-Match: <etag></li> + * </ul> + * + * <p>Expected lock URI formats: + * <ul> + * <li>{@code abfs://<container>@<account>.dfs.core.windows.net/<path>}</li> + * <li>{@code abfss://<container>@<account>.dfs.core.windows.net/<path>}</li> + * <li>{@code wasb://<container>@<account>.blob.core.windows.net/<path>}</li> + * <li>{@code wasbs://<container>@<account>.blob.core.windows.net/<path>}</li> + * </ul> + * + * <p>Authentication precedence (via {@link Properties}): + * <ul> + * <li>{@code hoodie.write.lock.azure.connection.string}</li> + * <li>{@code hoodie.write.lock.azure.sas.token}</li> + * <li>DefaultAzureCredential</li> + * </ul> + */ +@Slf4j +@ThreadSafe +public class ADLSStorageLockClient implements StorageLockClient { + + private static final int 
PRECONDITION_FAILURE_ERROR_CODE = 412; + private static final int NOT_FOUND_ERROR_CODE = 404; + private static final int CONFLICT_ERROR_CODE = 409; + private static final int RATE_LIMIT_ERROR_CODE = 429; + private static final int INTERNAL_SERVER_ERROR_CODE_MIN = 500; + + public static final String AZURE_CONNECTION_STRING = "hoodie.write.lock.azure.connection.string"; + public static final String AZURE_SAS_TOKEN = "hoodie.write.lock.azure.sas.token"; + + private final Logger logger; + private final BlobServiceClient blobServiceClient; + private final BlobClient lockBlobClient; + private final Properties clientProperties; + private final String ownerId; + private final String lockFileUri; + + /** + * Constructor used by reflection by {@link org.apache.hudi.client.transaction.lock.StorageBasedLockProvider}. + * + * @param ownerId lock owner id + * @param lockFileUri lock file URI (abfs/abfss/wasb/wasbs) + * @param props properties used to customize/authenticate the Azure client + */ + public ADLSStorageLockClient(String ownerId, String lockFileUri, Properties props) { + this(ownerId, lockFileUri, props, createDefaultBlobServiceClient(), log); + } + + @VisibleForTesting + ADLSStorageLockClient( + String ownerId, + String lockFileUri, + Properties props, + Functions.Function1<AzureLocation, BlobServiceClient> blobServiceClientSupplier, + Logger logger) { + this.ownerId = ownerId; + this.lockFileUri = lockFileUri; + this.logger = logger; + this.clientProperties = props; + + AzureLocation location = parseAzureLocation(lockFileUri); + this.blobServiceClient = blobServiceClientSupplier.apply(location.withProperties(props)); + BlobContainerClient containerClient = blobServiceClient.getBlobContainerClient(location.container); + this.lockBlobClient = containerClient.getBlobClient(location.blobPath); + } + + private static Functions.Function1<AzureLocation, BlobServiceClient> createDefaultBlobServiceClient() { + return (location) -> { + Properties props = location.props; + 
BlobServiceClientBuilder builder = new BlobServiceClientBuilder(); + configureAzureClientOptions(builder, props); + + String connectionString = props == null ? null : props.getProperty(AZURE_CONNECTION_STRING); + if (connectionString != null && !connectionString.trim().isEmpty()) { + return builder.connectionString(connectionString).buildClient(); + } + + builder.endpoint(location.blobEndpoint); + String sasToken = props == null ? null : props.getProperty(AZURE_SAS_TOKEN); + if (sasToken != null && !sasToken.trim().isEmpty()) { + String cleaned = sasToken.startsWith("?") ? sasToken.substring(1) : sasToken; + return builder.credential(new AzureSasCredential(cleaned)).buildClient(); + } + + return builder.credential(new DefaultAzureCredentialBuilder().build()).buildClient(); + }; + } + + private static void configureAzureClientOptions(BlobServiceClientBuilder builder, Properties props) { + // Set Azure SDK timeouts based on lock validity to avoid long-hanging calls. + long validityTimeoutSecs = getLongProperty(props, StorageBasedLockConfig.VALIDITY_TIMEOUT_SECONDS); + long azureCallTimeoutSecs = Math.max(1, validityTimeoutSecs / 5); + + // Disable automatic SDK retries; Hudi manages retries at the lock-provider level. 
+ ExponentialBackoffOptions exponentialOptions = new ExponentialBackoffOptions().setMaxRetries(0); + RetryOptions retryOptions = new RetryOptions(exponentialOptions); + + HttpClientOptions clientOptions = new HttpClientOptions() + .setResponseTimeout(Duration.ofSeconds(azureCallTimeoutSecs)) + .setReadTimeout(Duration.ofSeconds(azureCallTimeoutSecs)); + + builder.retryOptions(retryOptions).clientOptions(clientOptions); + } + + private static long getLongProperty(Properties props, org.apache.hudi.common.config.ConfigProperty<Long> prop) { + if (props == null) { + return prop.defaultValue(); + } + Object value = props.get(prop.key()); + if (value == null) { + return prop.defaultValue(); + } + try { + return Long.parseLong(String.valueOf(value)); + } catch (NumberFormatException e) { + return prop.defaultValue(); + } + } + + @Override + public Pair<LockUpsertResult, Option<StorageLockFile>> tryUpsertLockFile( + StorageLockData newLockData, + Option<StorageLockFile> previousLockFile) { + String expectedEtag = previousLockFile.isPresent() ? previousLockFile.get().getVersionId() : null; + try { + StorageLockFile updated = createOrUpdateLockFileInternal(newLockData, expectedEtag); + return Pair.of(LockUpsertResult.SUCCESS, Option.of(updated)); + } catch (BlobStorageException e) { + return Pair.of(handleUpsertBlobStorageException(e), Option.empty()); + } catch (HttpResponseException e) { + logger.error("OwnerId: {}, Unexpected Azure SDK error while writing lock file: {}", + ownerId, lockFileUri, e); + if (!previousLockFile.isPresent()) { + // For create, fail fast since this indicates a larger issue. 
+ throw e; + } + return Pair.of(LockUpsertResult.UNKNOWN_ERROR, Option.empty()); + } catch (Exception e) { + logger.error("OwnerId: {}, Unexpected error while writing lock file: {}", ownerId, lockFileUri, e); + return Pair.of(LockUpsertResult.UNKNOWN_ERROR, Option.empty()); + } + } + + @Override + public Pair<LockGetResult, Option<StorageLockFile>> readCurrentLockFile() { + try { + BlobProperties props = lockBlobClient.getProperties(); + String eTag = props.getETag(); + try { + return getLockFileFromBlob(lockBlobClient, eTag); + } catch (BlobStorageException e) { + // Blob can change/disappear after properties call; treat 404 during stream read as UNKNOWN_ERROR. + return Pair.of(handleGetStorageException(e, true), Option.empty()); + } + } catch (BlobStorageException e) { + return Pair.of(handleGetStorageException(e, false), Option.empty()); + } + } + + private LockGetResult handleGetStorageException(BlobStorageException e, boolean ignore404) { Review Comment: The fallback `lockBlobClient.getProperties().getETag()` introduces a TOCTOU race — another process could update the blob between the upload response and this getProperties call, returning a stale or wrong ETag. The newer `AzureStorageLockClient` avoids this by reading from response headers. Since this class is going away, just flagging for awareness. ########## hudi-azure/src/main/java/org/apache/hudi/config/AzureStorageLockConfig.java: ########## @@ -0,0 +1,86 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. 
You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.hudi.config; + +import org.apache.hudi.common.config.ConfigClassProperty; +import org.apache.hudi.common.config.ConfigGroups; +import org.apache.hudi.common.config.ConfigProperty; +import org.apache.hudi.common.config.HoodieConfig; +import org.apache.hudi.common.config.LockConfiguration; + +/** + * Hoodie Configs for Azure based storage locks. + */ +@ConfigClassProperty(name = "Azure based Locks Configurations", + groupName = ConfigGroups.Names.WRITE_CLIENT, + subGroupName = ConfigGroups.SubGroupNames.LOCK, + description = "Configs that control Azure Blob/ADLS based locking mechanisms " + + "required for concurrency control between writers to a Hudi table.") +public class AzureStorageLockConfig extends HoodieConfig { + + private static final String AZURE_BASED_LOCK_PROPERTY_PREFIX = LockConfiguration.LOCK_PREFIX + "azure."; + + public static final ConfigProperty<String> AZURE_CONNECTION_STRING = ConfigProperty + .key(AZURE_BASED_LOCK_PROPERTY_PREFIX + "connection.string") + .noDefaultValue() + .markAdvanced() Review Comment: Let's add `.sinceVersion("1.2.0")` to all these new configs ########## hudi-azure/src/main/java/org/apache/hudi/azure/transaction/lock/AzureStorageLockClient.java: ########## @@ -0,0 +1,441 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. 
The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.hudi.azure.transaction.lock; + +import org.apache.hudi.client.transaction.lock.StorageLockClient; +import org.apache.hudi.client.transaction.lock.models.LockGetResult; +import org.apache.hudi.client.transaction.lock.models.LockUpsertResult; +import org.apache.hudi.client.transaction.lock.models.StorageLockData; +import org.apache.hudi.client.transaction.lock.models.StorageLockFile; +import org.apache.hudi.common.util.Functions; +import org.apache.hudi.common.util.Option; +import org.apache.hudi.common.util.VisibleForTesting; +import org.apache.hudi.common.util.collection.Pair; +import org.apache.hudi.exception.HoodieLockException; +import org.apache.hudi.config.AzureStorageLockConfig; +import org.apache.hudi.config.StorageBasedLockConfig; +import org.apache.hudi.common.config.TypedProperties; + +import org.apache.hudi.azure.credentials.AzureCredentialFactory; + +import com.azure.core.credential.AzureSasCredential; +import com.azure.core.exception.HttpResponseException; +import com.azure.core.http.policy.ExponentialBackoffOptions; +import com.azure.core.http.policy.RetryOptions; +import com.azure.core.http.rest.Response; +import com.azure.core.util.BinaryData; +import com.azure.core.util.Context; +import com.azure.core.util.HttpClientOptions; +import com.azure.storage.blob.BlobClient; +import com.azure.storage.blob.BlobContainerClient; +import 
com.azure.storage.blob.BlobServiceClient; +import com.azure.storage.blob.BlobServiceClientBuilder; +import com.azure.storage.blob.BlobUrlParts; +import com.azure.storage.blob.models.BlobErrorCode; +import com.azure.storage.blob.models.BlobRequestConditions; +import com.azure.storage.blob.models.BlobStorageException; +import com.azure.storage.blob.models.BlockBlobItem; +import com.azure.storage.blob.options.BlobParallelUploadOptions; +import lombok.AllArgsConstructor; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import javax.annotation.concurrent.ThreadSafe; + +import java.net.URI; +import java.net.URISyntaxException; +import java.time.Duration; +import java.util.Properties; +import java.util.concurrent.ConcurrentHashMap; +import java.util.concurrent.ConcurrentMap; + +import static java.nio.charset.StandardCharsets.UTF_8; +import static org.apache.hudi.common.util.ConfigUtils.getLongWithAltKeys; + +/** + * Azure Storage implementation of {@link StorageLockClient} using Azure Blob conditional requests. 
+ * + * <p>Supports the following URI schemes: + * <ul> + * <li>ADLS Gen2: {@code abfs://} and {@code abfss://}</li> + * <li>Azure Blob Storage: {@code wasb://} and {@code wasbs://}</li> + * </ul> + * + * + * <ul> + * <li>Create: conditional write with If-None-Match: *</li> + * <li>Update/Renew/Expire: conditional write with If-Match: <etag></li> + * </ul> + * + * <p>Expected lock URI formats: + * <ul> + * <li>{@code abfs://<container>@<account>.dfs.core.windows.net/<path>}</li> + * <li>{@code abfss://<container>@<account>.dfs.core.windows.net/<path>}</li> + * <li>{@code wasb://<container>@<account>.blob.core.windows.net/<path>}</li> + * <li>{@code wasbs://<container>@<account>.blob.core.windows.net/<path>}</li> + * </ul> + * + * <p>Authentication precedence (via {@link Properties}): + * <ol> + * <li>{@link AzureStorageLockConfig#AZURE_CONNECTION_STRING} — connection string (includes shared key)</li> + * <li>{@link AzureStorageLockConfig#AZURE_SAS_TOKEN} — shared access signature</li> + * <li>{@link AzureStorageLockConfig#AZURE_MANAGED_IDENTITY_CLIENT_ID} — user-assigned managed identity + * via {@code ManagedIdentityCredential}</li> + * <li>{@link AzureStorageLockConfig#AZURE_CLIENT_TENANT_ID} + + * {@link AzureStorageLockConfig#AZURE_CLIENT_ID} + + * {@link AzureStorageLockConfig#AZURE_CLIENT_SECRET} — service principal + * via {@code ClientSecretCredential}</li> + * <li>{@code DefaultAzureCredential} — probing chain; see {@link org.apache.hudi.azure.credentials.AzureCredentialFactory}</li> + * </ol> + */ +@ThreadSafe +public class AzureStorageLockClient implements StorageLockClient { + + private static final int PRECONDITION_FAILURE_ERROR_CODE = 412; + private static final int NOT_FOUND_ERROR_CODE = 404; + private static final int CONFLICT_ERROR_CODE = 409; + private static final int RATE_LIMIT_ERROR_CODE = 429; + private static final int INTERNAL_SERVER_ERROR_CODE_MIN = 500; + + public static final String AZURE_CONNECTION_STRING = 
AzureStorageLockConfig.AZURE_CONNECTION_STRING.key(); + public static final String AZURE_SAS_TOKEN = AzureStorageLockConfig.AZURE_SAS_TOKEN.key(); + + private final Logger logger; + private final BlobServiceClient blobServiceClient; + private final Functions.Function1<AzureLocation, BlobServiceClient> blobServiceClientSupplier; + private final ConcurrentMap<String, BlobServiceClient> secondaryBlobServiceClients; + private final BlobClient lockBlobClient; + private final Properties clientProperties; + private final String ownerId; + private final String lockFileUri; + private final String lockBlobEndpoint; + + /** + * Constructor used by reflection by {@link org.apache.hudi.client.transaction.lock.StorageBasedLockProvider}. + * + * @param ownerId lock owner id + * @param lockFileUri lock file URI (abfs/abfss/wasb/wasbs) + * @param props properties used to customize/authenticate the Azure client + */ + public AzureStorageLockClient(String ownerId, String lockFileUri, Properties props) { + this(ownerId, lockFileUri, props, createDefaultBlobServiceClient(), LoggerFactory.getLogger(AzureStorageLockClient.class)); + } + + @VisibleForTesting + AzureStorageLockClient( + String ownerId, + String lockFileUri, + Properties props, + Functions.Function1<AzureLocation, BlobServiceClient> blobServiceClientSupplier, + Logger logger) { + this.ownerId = ownerId; + this.lockFileUri = lockFileUri; + this.logger = logger; + this.clientProperties = props; + this.blobServiceClientSupplier = blobServiceClientSupplier; + this.secondaryBlobServiceClients = new ConcurrentHashMap<>(); + + AzureLocation location = parseAzureLocation(lockFileUri).withProperties(props); + this.lockBlobEndpoint = location.blobEndpoint; + this.blobServiceClient = blobServiceClientSupplier.apply(location); + BlobContainerClient containerClient = blobServiceClient.getBlobContainerClient(location.container); + this.lockBlobClient = containerClient.getBlobClient(location.blobPath); + } + + private static 
Functions.Function1<AzureLocation, BlobServiceClient> createDefaultBlobServiceClient() { + return (location) -> { + Properties props = location.props; + BlobServiceClientBuilder builder = new BlobServiceClientBuilder(); + configureAzureClientOptions(builder, props); + + // 1. Connection string (includes shared-key auth). + String connectionString = props == null ? null : props.getProperty(AZURE_CONNECTION_STRING); Review Comment: Use `ConfigUtils.getStringWithAltKeys` and no need to create `AZURE_CONNECTION_STRING` ########## pom.xml: ########## @@ -112,7 +114,7 @@ <confluent.version>5.5.0</confluent.version> <glassfish.version>2.17</glassfish.version> <glassfish.el.version>3.0.1-b12</glassfish.el.version> - <parquet.version>1.10.1</parquet.version> + <parquet.version>1.13.1</parquet.version> Review Comment: +1 @chrevanthreddy let's revert this ########## hudi-azure/src/main/java/org/apache/hudi/azure/transaction/lock/AzureStorageLockClient.java: ########## @@ -0,0 +1,430 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.hudi.azure.transaction.lock; + +import org.apache.hudi.client.transaction.lock.StorageLockClient; +import org.apache.hudi.client.transaction.lock.models.LockGetResult; +import org.apache.hudi.client.transaction.lock.models.LockUpsertResult; +import org.apache.hudi.client.transaction.lock.models.StorageLockData; +import org.apache.hudi.client.transaction.lock.models.StorageLockFile; +import org.apache.hudi.common.util.Functions; +import org.apache.hudi.common.util.Option; +import org.apache.hudi.common.util.VisibleForTesting; +import org.apache.hudi.common.util.collection.Pair; +import org.apache.hudi.exception.HoodieLockException; +import org.apache.hudi.config.AzureStorageLockConfig; +import org.apache.hudi.config.StorageBasedLockConfig; +import org.apache.hudi.common.config.TypedProperties; + +import com.azure.core.credential.AzureSasCredential; +import com.azure.core.exception.HttpResponseException; +import com.azure.core.http.policy.ExponentialBackoffOptions; +import com.azure.core.http.policy.RetryOptions; +import com.azure.core.http.rest.Response; +import com.azure.core.util.BinaryData; +import com.azure.core.util.Context; +import com.azure.core.util.HttpClientOptions; +import com.azure.identity.DefaultAzureCredentialBuilder; +import com.azure.storage.blob.BlobClient; +import com.azure.storage.blob.BlobContainerClient; +import com.azure.storage.blob.BlobServiceClient; +import com.azure.storage.blob.BlobServiceClientBuilder; +import com.azure.storage.blob.BlobUrlParts; +import com.azure.storage.blob.models.BlobErrorCode; +import com.azure.storage.blob.models.BlobRequestConditions; +import com.azure.storage.blob.models.BlobStorageException; +import com.azure.storage.blob.models.BlockBlobItem; +import com.azure.storage.blob.options.BlobParallelUploadOptions; +import lombok.AllArgsConstructor; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import javax.annotation.concurrent.ThreadSafe; + +import java.net.URI; +import 
java.net.URISyntaxException; +import java.time.Duration; +import java.util.Properties; +import java.util.concurrent.ConcurrentHashMap; +import java.util.concurrent.ConcurrentMap; + +import static java.nio.charset.StandardCharsets.UTF_8; +import static org.apache.hudi.common.util.ConfigUtils.getLongWithAltKeys; + +/** + * Azure Storage implementation of {@link StorageLockClient} using Azure Blob conditional requests. + * + * <p>Supports the following URI schemes: + * <ul> + * <li>ADLS Gen2: {@code abfs://} and {@code abfss://}</li> + * <li>Azure Blob Storage: {@code wasb://} and {@code wasbs://}</li> + * </ul> + * + * + * <ul> + * <li>Create: conditional write with If-None-Match: *</li> + * <li>Update/Renew/Expire: conditional write with If-Match: <etag></li> + * </ul> + * + * <p>Expected lock URI formats: + * <ul> + * <li>{@code abfs://<container>@<account>.dfs.core.windows.net/<path>}</li> + * <li>{@code abfss://<container>@<account>.dfs.core.windows.net/<path>}</li> + * <li>{@code wasb://<container>@<account>.blob.core.windows.net/<path>}</li> + * <li>{@code wasbs://<container>@<account>.blob.core.windows.net/<path>}</li> + * </ul> + * + * <p>Authentication precedence (via {@link Properties}): + * <ul> + * <li>{@link AzureStorageLockConfig#AZURE_CONNECTION_STRING}</li> + * <li>{@link AzureStorageLockConfig#AZURE_SAS_TOKEN}</li> + * <li>DefaultAzureCredential</li> + * </ul> + */ +@ThreadSafe +public class AzureStorageLockClient implements StorageLockClient { + + private static final int PRECONDITION_FAILURE_ERROR_CODE = 412; + private static final int NOT_FOUND_ERROR_CODE = 404; + private static final int CONFLICT_ERROR_CODE = 409; + private static final int RATE_LIMIT_ERROR_CODE = 429; + private static final int INTERNAL_SERVER_ERROR_CODE_MIN = 500; + + public static final String AZURE_CONNECTION_STRING = AzureStorageLockConfig.AZURE_CONNECTION_STRING.key(); + public static final String AZURE_SAS_TOKEN = AzureStorageLockConfig.AZURE_SAS_TOKEN.key(); + + private 
final Logger logger; + private final BlobServiceClient blobServiceClient; + private final Functions.Function1<AzureLocation, BlobServiceClient> blobServiceClientSupplier; + private final ConcurrentMap<String, BlobServiceClient> secondaryBlobServiceClients; + private final BlobClient lockBlobClient; + private final Properties clientProperties; + private final String ownerId; + private final String lockFileUri; + private final String lockBlobEndpoint; + + /** + * Constructor used by reflection by {@link org.apache.hudi.client.transaction.lock.StorageBasedLockProvider}. + * + * @param ownerId lock owner id + * @param lockFileUri lock file URI (abfs/abfss/wasb/wasbs) + * @param props properties used to customize/authenticate the Azure client + */ + public AzureStorageLockClient(String ownerId, String lockFileUri, Properties props) { + this(ownerId, lockFileUri, props, createDefaultBlobServiceClient(), LoggerFactory.getLogger(AzureStorageLockClient.class)); + } + + @VisibleForTesting + AzureStorageLockClient( + String ownerId, + String lockFileUri, + Properties props, + Functions.Function1<AzureLocation, BlobServiceClient> blobServiceClientSupplier, + Logger logger) { + this.ownerId = ownerId; + this.lockFileUri = lockFileUri; + this.logger = logger; + this.clientProperties = props; + this.blobServiceClientSupplier = blobServiceClientSupplier; + this.secondaryBlobServiceClients = new ConcurrentHashMap<>(); + + AzureLocation location = parseAzureLocation(lockFileUri).withProperties(props); + this.lockBlobEndpoint = location.blobEndpoint; + this.blobServiceClient = blobServiceClientSupplier.apply(location); + BlobContainerClient containerClient = blobServiceClient.getBlobContainerClient(location.container); + this.lockBlobClient = containerClient.getBlobClient(location.blobPath); + } + + private static Functions.Function1<AzureLocation, BlobServiceClient> createDefaultBlobServiceClient() { + return (location) -> { + Properties props = location.props; + 
BlobServiceClientBuilder builder = new BlobServiceClientBuilder(); + configureAzureClientOptions(builder, props); + + String connectionString = props == null ? null : props.getProperty(AZURE_CONNECTION_STRING); + if (connectionString != null && !connectionString.trim().isEmpty()) { + return builder.connectionString(connectionString).buildClient(); + } + + builder.endpoint(location.blobEndpoint); + String sasToken = props == null ? null : props.getProperty(AZURE_SAS_TOKEN); + if (sasToken != null && !sasToken.trim().isEmpty()) { + String cleaned = sasToken.startsWith("?") ? sasToken.substring(1) : sasToken; + return builder.credential(new AzureSasCredential(cleaned)).buildClient(); + } + + return builder.credential(new DefaultAzureCredentialBuilder().build()).buildClient(); + }; + } + + private static void configureAzureClientOptions(BlobServiceClientBuilder builder, Properties props) { + // Set Azure SDK timeouts based on lock validity to avoid long-hanging calls. + TypedProperties typedProps = new TypedProperties(); + if (props != null) { + typedProps.putAll(props); + } + long validityTimeoutSecs; + try { + validityTimeoutSecs = getLongWithAltKeys(typedProps, StorageBasedLockConfig.VALIDITY_TIMEOUT_SECONDS); + } catch (NumberFormatException e) { + validityTimeoutSecs = StorageBasedLockConfig.VALIDITY_TIMEOUT_SECONDS.defaultValue(); + } + long azureCallTimeoutSecs = Math.max(1, validityTimeoutSecs / 5); + + // Disable automatic SDK retries; Hudi manages retries at the lock-provider level. 
+ ExponentialBackoffOptions exponentialOptions = new ExponentialBackoffOptions().setMaxRetries(0); + RetryOptions retryOptions = new RetryOptions(exponentialOptions); + + HttpClientOptions clientOptions = new HttpClientOptions() + .setResponseTimeout(Duration.ofSeconds(azureCallTimeoutSecs)) + .setReadTimeout(Duration.ofSeconds(azureCallTimeoutSecs)); + + builder.retryOptions(retryOptions).clientOptions(clientOptions); + } + + @Override + public Pair<LockUpsertResult, Option<StorageLockFile>> tryUpsertLockFile( + StorageLockData newLockData, + Option<StorageLockFile> previousLockFile) { + String expectedEtag = previousLockFile.isPresent() ? previousLockFile.get().getVersionId() : null; + try { + StorageLockFile updated = createOrUpdateLockFileInternal(newLockData, expectedEtag); + return Pair.of(LockUpsertResult.SUCCESS, Option.of(updated)); + } catch (BlobStorageException e) { + return Pair.of(handleUpsertBlobStorageException(e), Option.empty()); + } catch (HttpResponseException e) { + logger.error("OwnerId: {}, Unexpected Azure SDK error while writing lock file: {}", + ownerId, lockFileUri, e); + if (!previousLockFile.isPresent()) { + // For create, fail fast since this indicates a larger issue. + throw e; + } + return Pair.of(LockUpsertResult.UNKNOWN_ERROR, Option.empty()); + } catch (Exception e) { + logger.error("OwnerId: {}, Unexpected error while writing lock file: {}", ownerId, lockFileUri, e); + return Pair.of(LockUpsertResult.UNKNOWN_ERROR, Option.empty()); + } + } + + @Override + public Pair<LockGetResult, Option<StorageLockFile>> readCurrentLockFile() { + try { + Response<BinaryData> response = lockBlobClient.downloadContentWithResponse(null, null, null, Context.NONE); + String eTag = response.getHeaders() != null ? 
response.getHeaders().getValue("ETag") : null; + StorageLockFile lockFile = StorageLockFile.createFromStream(response.getValue().toStream(), eTag); + return Pair.of(LockGetResult.SUCCESS, Option.of(lockFile)); + } catch (BlobStorageException e) { + return Pair.of(handleGetStorageException(e), Option.empty()); + } + } + + private LockGetResult handleGetStorageException(BlobStorageException e) { + int code = e.getStatusCode(); + if (code == NOT_FOUND_ERROR_CODE || e.getErrorCode() == BlobErrorCode.BLOB_NOT_FOUND) { + logger.info("OwnerId: {}, Object not found in the path: {}", ownerId, lockFileUri); + return LockGetResult.NOT_EXISTS; + } else if (code == RATE_LIMIT_ERROR_CODE) { + logger.warn("OwnerId: {}, Rate limit exceeded for lock file: {}", ownerId, lockFileUri); + } else if (code >= INTERNAL_SERVER_ERROR_CODE_MIN) { + logger.warn("OwnerId: {}, Azure returned internal server error code for lock file: {}", ownerId, lockFileUri, e); + } else { + throw e; + } + return LockGetResult.UNKNOWN_ERROR; + } + + private StorageLockFile createOrUpdateLockFileInternal(StorageLockData lockData, String expectedEtag) { + byte[] bytes = StorageLockFile.toByteArray(lockData); + BlobRequestConditions conditions = new BlobRequestConditions(); + if (expectedEtag == null) { + conditions.setIfNoneMatch("*"); + } else { + conditions.setIfMatch(expectedEtag); + } + + BlobParallelUploadOptions options = new BlobParallelUploadOptions(BinaryData.fromBytes(bytes)) + .setRequestConditions(conditions); + Response<BlockBlobItem> response = lockBlobClient.uploadWithResponse(options, null, Context.NONE); + String newEtag = response.getHeaders() != null ? 
response.getHeaders().getValue("ETag") : null; + if (newEtag == null && response.getValue() != null) { + newEtag = response.getValue().getETag(); + } + if (newEtag == null || newEtag.isEmpty()) { + throw new HoodieLockException("Missing ETag in Azure upload response for lock file: " + lockFileUri); + } + return new StorageLockFile(lockData, newEtag); + } + + private LockUpsertResult handleUpsertBlobStorageException(BlobStorageException e) { + int code = e.getStatusCode(); + if (code == PRECONDITION_FAILURE_ERROR_CODE || e.getErrorCode() == BlobErrorCode.CONDITION_NOT_MET) { + logger.info("OwnerId: {}, Unable to write new lock file. Another process has modified this lockfile {} already.", + ownerId, lockFileUri); + return LockUpsertResult.ACQUIRED_BY_OTHERS; + } else if (code == CONFLICT_ERROR_CODE) { + logger.info("OwnerId: {}, Retriable conditional request conflict error: {}", ownerId, lockFileUri); + } else if (code == RATE_LIMIT_ERROR_CODE) { + logger.warn("OwnerId: {}, Rate limit exceeded for lock file: {}", ownerId, lockFileUri); + } else if (code >= INTERNAL_SERVER_ERROR_CODE_MIN) { + logger.warn("OwnerId: {}, Azure returned internal server error code for lock file: {}", ownerId, lockFileUri, e); + } else { + logger.warn("OwnerId: {}, Error writing lock file: {}", ownerId, lockFileUri, e); Review Comment: The upper layer will handle the `UNKNOWN_ERROR` including any other transient errors as a whole so I think this is OK. ########## hudi-azure/src/main/java/org/apache/hudi/azure/transaction/lock/AzureStorageLockClient.java: ########## @@ -0,0 +1,430 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. 
You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.hudi.azure.transaction.lock; + +import org.apache.hudi.client.transaction.lock.StorageLockClient; +import org.apache.hudi.client.transaction.lock.models.LockGetResult; +import org.apache.hudi.client.transaction.lock.models.LockUpsertResult; +import org.apache.hudi.client.transaction.lock.models.StorageLockData; +import org.apache.hudi.client.transaction.lock.models.StorageLockFile; +import org.apache.hudi.common.util.Functions; +import org.apache.hudi.common.util.Option; +import org.apache.hudi.common.util.VisibleForTesting; +import org.apache.hudi.common.util.collection.Pair; +import org.apache.hudi.exception.HoodieLockException; +import org.apache.hudi.config.AzureStorageLockConfig; +import org.apache.hudi.config.StorageBasedLockConfig; +import org.apache.hudi.common.config.TypedProperties; + +import com.azure.core.credential.AzureSasCredential; +import com.azure.core.exception.HttpResponseException; +import com.azure.core.http.policy.ExponentialBackoffOptions; +import com.azure.core.http.policy.RetryOptions; +import com.azure.core.http.rest.Response; +import com.azure.core.util.BinaryData; +import com.azure.core.util.Context; +import com.azure.core.util.HttpClientOptions; +import com.azure.identity.DefaultAzureCredentialBuilder; +import com.azure.storage.blob.BlobClient; +import com.azure.storage.blob.BlobContainerClient; +import com.azure.storage.blob.BlobServiceClient; +import com.azure.storage.blob.BlobServiceClientBuilder; +import com.azure.storage.blob.BlobUrlParts; +import com.azure.storage.blob.models.BlobErrorCode; 
+import com.azure.storage.blob.models.BlobRequestConditions; +import com.azure.storage.blob.models.BlobStorageException; +import com.azure.storage.blob.models.BlockBlobItem; +import com.azure.storage.blob.options.BlobParallelUploadOptions; +import lombok.AllArgsConstructor; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import javax.annotation.concurrent.ThreadSafe; + +import java.net.URI; +import java.net.URISyntaxException; +import java.time.Duration; +import java.util.Properties; +import java.util.concurrent.ConcurrentHashMap; +import java.util.concurrent.ConcurrentMap; + +import static java.nio.charset.StandardCharsets.UTF_8; +import static org.apache.hudi.common.util.ConfigUtils.getLongWithAltKeys; + +/** + * Azure Storage implementation of {@link StorageLockClient} using Azure Blob conditional requests. + * + * <p>Supports the following URI schemes: + * <ul> + * <li>ADLS Gen2: {@code abfs://} and {@code abfss://}</li> + * <li>Azure Blob Storage: {@code wasb://} and {@code wasbs://}</li> + * </ul> + * + * + * <ul> + * <li>Create: conditional write with If-None-Match: *</li> + * <li>Update/Renew/Expire: conditional write with If-Match: <etag></li> + * </ul> + * + * <p>Expected lock URI formats: + * <ul> + * <li>{@code abfs://<container>@<account>.dfs.core.windows.net/<path>}</li> + * <li>{@code abfss://<container>@<account>.dfs.core.windows.net/<path>}</li> + * <li>{@code wasb://<container>@<account>.blob.core.windows.net/<path>}</li> + * <li>{@code wasbs://<container>@<account>.blob.core.windows.net/<path>}</li> + * </ul> + * + * <p>Authentication precedence (via {@link Properties}): + * <ul> + * <li>{@link AzureStorageLockConfig#AZURE_CONNECTION_STRING}</li> + * <li>{@link AzureStorageLockConfig#AZURE_SAS_TOKEN}</li> + * <li>DefaultAzureCredential</li> + * </ul> + */ +@ThreadSafe +public class AzureStorageLockClient implements StorageLockClient { + + private static final int PRECONDITION_FAILURE_ERROR_CODE = 412; + private static final int 
NOT_FOUND_ERROR_CODE = 404; + private static final int CONFLICT_ERROR_CODE = 409; + private static final int RATE_LIMIT_ERROR_CODE = 429; + private static final int INTERNAL_SERVER_ERROR_CODE_MIN = 500; + + public static final String AZURE_CONNECTION_STRING = AzureStorageLockConfig.AZURE_CONNECTION_STRING.key(); + public static final String AZURE_SAS_TOKEN = AzureStorageLockConfig.AZURE_SAS_TOKEN.key(); + + private final Logger logger; + private final BlobServiceClient blobServiceClient; + private final Functions.Function1<AzureLocation, BlobServiceClient> blobServiceClientSupplier; + private final ConcurrentMap<String, BlobServiceClient> secondaryBlobServiceClients; + private final BlobClient lockBlobClient; + private final Properties clientProperties; + private final String ownerId; + private final String lockFileUri; + private final String lockBlobEndpoint; + + /** + * Constructor used by reflection by {@link org.apache.hudi.client.transaction.lock.StorageBasedLockProvider}. + * + * @param ownerId lock owner id + * @param lockFileUri lock file URI (abfs/abfss/wasb/wasbs) + * @param props properties used to customize/authenticate the Azure client + */ + public AzureStorageLockClient(String ownerId, String lockFileUri, Properties props) { + this(ownerId, lockFileUri, props, createDefaultBlobServiceClient(), LoggerFactory.getLogger(AzureStorageLockClient.class)); + } + + @VisibleForTesting + AzureStorageLockClient( + String ownerId, + String lockFileUri, + Properties props, + Functions.Function1<AzureLocation, BlobServiceClient> blobServiceClientSupplier, + Logger logger) { + this.ownerId = ownerId; + this.lockFileUri = lockFileUri; + this.logger = logger; + this.clientProperties = props; + this.blobServiceClientSupplier = blobServiceClientSupplier; + this.secondaryBlobServiceClients = new ConcurrentHashMap<>(); + + AzureLocation location = parseAzureLocation(lockFileUri).withProperties(props); + this.lockBlobEndpoint = location.blobEndpoint; + 
this.blobServiceClient = blobServiceClientSupplier.apply(location); + BlobContainerClient containerClient = blobServiceClient.getBlobContainerClient(location.container); + this.lockBlobClient = containerClient.getBlobClient(location.blobPath); + } + + private static Functions.Function1<AzureLocation, BlobServiceClient> createDefaultBlobServiceClient() { + return (location) -> { + Properties props = location.props; + BlobServiceClientBuilder builder = new BlobServiceClientBuilder(); + configureAzureClientOptions(builder, props); + + String connectionString = props == null ? null : props.getProperty(AZURE_CONNECTION_STRING); + if (connectionString != null && !connectionString.trim().isEmpty()) { + return builder.connectionString(connectionString).buildClient(); + } + + builder.endpoint(location.blobEndpoint); + String sasToken = props == null ? null : props.getProperty(AZURE_SAS_TOKEN); + if (sasToken != null && !sasToken.trim().isEmpty()) { + String cleaned = sasToken.startsWith("?") ? sasToken.substring(1) : sasToken; + return builder.credential(new AzureSasCredential(cleaned)).buildClient(); + } + + return builder.credential(new DefaultAzureCredentialBuilder().build()).buildClient(); + }; + } + + private static void configureAzureClientOptions(BlobServiceClientBuilder builder, Properties props) { + // Set Azure SDK timeouts based on lock validity to avoid long-hanging calls. + TypedProperties typedProps = new TypedProperties(); + if (props != null) { + typedProps.putAll(props); + } + long validityTimeoutSecs; + try { + validityTimeoutSecs = getLongWithAltKeys(typedProps, StorageBasedLockConfig.VALIDITY_TIMEOUT_SECONDS); + } catch (NumberFormatException e) { + validityTimeoutSecs = StorageBasedLockConfig.VALIDITY_TIMEOUT_SECONDS.defaultValue(); + } + long azureCallTimeoutSecs = Math.max(1, validityTimeoutSecs / 5); + + // Disable automatic SDK retries; Hudi manages retries at the lock-provider level. 
+ ExponentialBackoffOptions exponentialOptions = new ExponentialBackoffOptions().setMaxRetries(0); + RetryOptions retryOptions = new RetryOptions(exponentialOptions); + + HttpClientOptions clientOptions = new HttpClientOptions() + .setResponseTimeout(Duration.ofSeconds(azureCallTimeoutSecs)) + .setReadTimeout(Duration.ofSeconds(azureCallTimeoutSecs)); + + builder.retryOptions(retryOptions).clientOptions(clientOptions); + } + + @Override + public Pair<LockUpsertResult, Option<StorageLockFile>> tryUpsertLockFile( + StorageLockData newLockData, + Option<StorageLockFile> previousLockFile) { + String expectedEtag = previousLockFile.isPresent() ? previousLockFile.get().getVersionId() : null; + try { + StorageLockFile updated = createOrUpdateLockFileInternal(newLockData, expectedEtag); + return Pair.of(LockUpsertResult.SUCCESS, Option.of(updated)); + } catch (BlobStorageException e) { + return Pair.of(handleUpsertBlobStorageException(e), Option.empty()); + } catch (HttpResponseException e) { + logger.error("OwnerId: {}, Unexpected Azure SDK error while writing lock file: {}", + ownerId, lockFileUri, e); + if (!previousLockFile.isPresent()) { + // For create, fail fast since this indicates a larger issue. + throw e; + } + return Pair.of(LockUpsertResult.UNKNOWN_ERROR, Option.empty()); + } catch (Exception e) { + logger.error("OwnerId: {}, Unexpected error while writing lock file: {}", ownerId, lockFileUri, e); + return Pair.of(LockUpsertResult.UNKNOWN_ERROR, Option.empty()); + } + } + + @Override + public Pair<LockGetResult, Option<StorageLockFile>> readCurrentLockFile() { + try { + Response<BinaryData> response = lockBlobClient.downloadContentWithResponse(null, null, null, Context.NONE); + String eTag = response.getHeaders() != null ? response.getHeaders().getValue("ETag") : null; Review Comment: We should throw an exception if the `ETag` is not returned. 
########## hudi-azure/src/main/java/org/apache/hudi/azure/transaction/lock/AzureStorageLockClient.java: ########## @@ -0,0 +1,441 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.hudi.azure.transaction.lock; + +import org.apache.hudi.client.transaction.lock.StorageLockClient; +import org.apache.hudi.client.transaction.lock.models.LockGetResult; +import org.apache.hudi.client.transaction.lock.models.LockUpsertResult; +import org.apache.hudi.client.transaction.lock.models.StorageLockData; +import org.apache.hudi.client.transaction.lock.models.StorageLockFile; +import org.apache.hudi.common.util.Functions; +import org.apache.hudi.common.util.Option; +import org.apache.hudi.common.util.VisibleForTesting; +import org.apache.hudi.common.util.collection.Pair; +import org.apache.hudi.exception.HoodieLockException; +import org.apache.hudi.config.AzureStorageLockConfig; +import org.apache.hudi.config.StorageBasedLockConfig; +import org.apache.hudi.common.config.TypedProperties; + +import org.apache.hudi.azure.credentials.AzureCredentialFactory; + +import com.azure.core.credential.AzureSasCredential; +import com.azure.core.exception.HttpResponseException; +import 
com.azure.core.http.policy.ExponentialBackoffOptions; +import com.azure.core.http.policy.RetryOptions; +import com.azure.core.http.rest.Response; +import com.azure.core.util.BinaryData; +import com.azure.core.util.Context; +import com.azure.core.util.HttpClientOptions; +import com.azure.storage.blob.BlobClient; +import com.azure.storage.blob.BlobContainerClient; +import com.azure.storage.blob.BlobServiceClient; +import com.azure.storage.blob.BlobServiceClientBuilder; +import com.azure.storage.blob.BlobUrlParts; +import com.azure.storage.blob.models.BlobErrorCode; +import com.azure.storage.blob.models.BlobRequestConditions; +import com.azure.storage.blob.models.BlobStorageException; +import com.azure.storage.blob.models.BlockBlobItem; +import com.azure.storage.blob.options.BlobParallelUploadOptions; +import lombok.AllArgsConstructor; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import javax.annotation.concurrent.ThreadSafe; + +import java.net.URI; +import java.net.URISyntaxException; +import java.time.Duration; +import java.util.Properties; +import java.util.concurrent.ConcurrentHashMap; +import java.util.concurrent.ConcurrentMap; + +import static java.nio.charset.StandardCharsets.UTF_8; +import static org.apache.hudi.common.util.ConfigUtils.getLongWithAltKeys; + +/** + * Azure Storage implementation of {@link StorageLockClient} using Azure Blob conditional requests. 
+ * + * <p>Supports the following URI schemes: + * <ul> + * <li>ADLS Gen2: {@code abfs://} and {@code abfss://}</li> + * <li>Azure Blob Storage: {@code wasb://} and {@code wasbs://}</li> + * </ul> + * + * + * <ul> + * <li>Create: conditional write with If-None-Match: *</li> + * <li>Update/Renew/Expire: conditional write with If-Match: <etag></li> + * </ul> + * + * <p>Expected lock URI formats: + * <ul> + * <li>{@code abfs://<container>@<account>.dfs.core.windows.net/<path>}</li> + * <li>{@code abfss://<container>@<account>.dfs.core.windows.net/<path>}</li> + * <li>{@code wasb://<container>@<account>.blob.core.windows.net/<path>}</li> + * <li>{@code wasbs://<container>@<account>.blob.core.windows.net/<path>}</li> + * </ul> + * + * <p>Authentication precedence (via {@link Properties}): + * <ol> + * <li>{@link AzureStorageLockConfig#AZURE_CONNECTION_STRING} — connection string (includes shared key)</li> + * <li>{@link AzureStorageLockConfig#AZURE_SAS_TOKEN} — shared access signature</li> + * <li>{@link AzureStorageLockConfig#AZURE_MANAGED_IDENTITY_CLIENT_ID} — user-assigned managed identity + * via {@code ManagedIdentityCredential}</li> + * <li>{@link AzureStorageLockConfig#AZURE_CLIENT_TENANT_ID} + + * {@link AzureStorageLockConfig#AZURE_CLIENT_ID} + + * {@link AzureStorageLockConfig#AZURE_CLIENT_SECRET} — service principal + * via {@code ClientSecretCredential}</li> + * <li>{@code DefaultAzureCredential} — probing chain; see {@link org.apache.hudi.azure.credentials.AzureCredentialFactory}</li> + * </ol> + */ +@ThreadSafe +public class AzureStorageLockClient implements StorageLockClient { + + private static final int PRECONDITION_FAILURE_ERROR_CODE = 412; + private static final int NOT_FOUND_ERROR_CODE = 404; + private static final int CONFLICT_ERROR_CODE = 409; + private static final int RATE_LIMIT_ERROR_CODE = 429; + private static final int INTERNAL_SERVER_ERROR_CODE_MIN = 500; + + public static final String AZURE_CONNECTION_STRING = 
AzureStorageLockConfig.AZURE_CONNECTION_STRING.key(); + public static final String AZURE_SAS_TOKEN = AzureStorageLockConfig.AZURE_SAS_TOKEN.key(); + + private final Logger logger; + private final BlobServiceClient blobServiceClient; + private final Functions.Function1<AzureLocation, BlobServiceClient> blobServiceClientSupplier; + private final ConcurrentMap<String, BlobServiceClient> secondaryBlobServiceClients; + private final BlobClient lockBlobClient; + private final Properties clientProperties; + private final String ownerId; + private final String lockFileUri; + private final String lockBlobEndpoint; + + /** + * Constructor used by reflection by {@link org.apache.hudi.client.transaction.lock.StorageBasedLockProvider}. + * + * @param ownerId lock owner id + * @param lockFileUri lock file URI (abfs/abfss/wasb/wasbs) + * @param props properties used to customize/authenticate the Azure client + */ + public AzureStorageLockClient(String ownerId, String lockFileUri, Properties props) { + this(ownerId, lockFileUri, props, createDefaultBlobServiceClient(), LoggerFactory.getLogger(AzureStorageLockClient.class)); + } + + @VisibleForTesting + AzureStorageLockClient( + String ownerId, + String lockFileUri, + Properties props, + Functions.Function1<AzureLocation, BlobServiceClient> blobServiceClientSupplier, + Logger logger) { + this.ownerId = ownerId; + this.lockFileUri = lockFileUri; + this.logger = logger; + this.clientProperties = props; + this.blobServiceClientSupplier = blobServiceClientSupplier; + this.secondaryBlobServiceClients = new ConcurrentHashMap<>(); + + AzureLocation location = parseAzureLocation(lockFileUri).withProperties(props); + this.lockBlobEndpoint = location.blobEndpoint; + this.blobServiceClient = blobServiceClientSupplier.apply(location); + BlobContainerClient containerClient = blobServiceClient.getBlobContainerClient(location.container); + this.lockBlobClient = containerClient.getBlobClient(location.blobPath); + } + + private static 
Functions.Function1<AzureLocation, BlobServiceClient> createDefaultBlobServiceClient() { + return (location) -> { + Properties props = location.props; + BlobServiceClientBuilder builder = new BlobServiceClientBuilder(); + configureAzureClientOptions(builder, props); + + // 1. Connection string (includes shared-key auth). + String connectionString = props == null ? null : props.getProperty(AZURE_CONNECTION_STRING); + if (connectionString != null && !connectionString.trim().isEmpty()) { + return builder.connectionString(connectionString).buildClient(); + } + + builder.endpoint(location.blobEndpoint); + + // 2. SAS token. + String sasToken = props == null ? null : props.getProperty(AZURE_SAS_TOKEN); + if (sasToken != null && !sasToken.trim().isEmpty()) { + String cleaned = sasToken.startsWith("?") ? sasToken.substring(1) : sasToken; + return builder.credential(new AzureSasCredential(cleaned)).buildClient(); + } + + // 3. TokenCredential — MI, service principal, or DefaultAzureCredential fallback. + return builder.credential(AzureCredentialFactory.getAzureCredential(props)).buildClient(); + }; + } + + private static void configureAzureClientOptions(BlobServiceClientBuilder builder, Properties props) { + // Set Azure SDK timeouts based on lock validity to avoid long-hanging calls. + TypedProperties typedProps = new TypedProperties(); + if (props != null) { + typedProps.putAll(props); + } + long validityTimeoutSecs; + try { + validityTimeoutSecs = getLongWithAltKeys(typedProps, StorageBasedLockConfig.VALIDITY_TIMEOUT_SECONDS); + } catch (NumberFormatException e) { + validityTimeoutSecs = StorageBasedLockConfig.VALIDITY_TIMEOUT_SECONDS.defaultValue(); + } + long azureCallTimeoutSecs = Math.max(1, validityTimeoutSecs / 5); + + // Disable automatic SDK retries; Hudi manages retries at the lock-provider level. 
+ ExponentialBackoffOptions exponentialOptions = new ExponentialBackoffOptions().setMaxRetries(0); + RetryOptions retryOptions = new RetryOptions(exponentialOptions); + + HttpClientOptions clientOptions = new HttpClientOptions() + .setResponseTimeout(Duration.ofSeconds(azureCallTimeoutSecs)) + .setReadTimeout(Duration.ofSeconds(azureCallTimeoutSecs)); + + builder.retryOptions(retryOptions).clientOptions(clientOptions); + } + + @Override + public Pair<LockUpsertResult, Option<StorageLockFile>> tryUpsertLockFile( + StorageLockData newLockData, + Option<StorageLockFile> previousLockFile) { + String expectedEtag = previousLockFile.isPresent() ? previousLockFile.get().getVersionId() : null; + try { + StorageLockFile updated = createOrUpdateLockFileInternal(newLockData, expectedEtag); + return Pair.of(LockUpsertResult.SUCCESS, Option.of(updated)); + } catch (BlobStorageException e) { + return Pair.of(handleUpsertBlobStorageException(e), Option.empty()); + } catch (HttpResponseException e) { + logger.error("OwnerId: {}, Unexpected Azure SDK error while writing lock file: {}", + ownerId, lockFileUri, e); + if (!previousLockFile.isPresent()) { + // For create, fail fast since this indicates a larger issue. + throw e; + } + return Pair.of(LockUpsertResult.UNKNOWN_ERROR, Option.empty()); + } catch (Exception e) { + logger.error("OwnerId: {}, Unexpected error while writing lock file: {}", ownerId, lockFileUri, e); + return Pair.of(LockUpsertResult.UNKNOWN_ERROR, Option.empty()); + } + } + + @Override + public Pair<LockGetResult, Option<StorageLockFile>> readCurrentLockFile() { + try { + Response<BinaryData> response = lockBlobClient.downloadContentWithResponse(null, null, null, Context.NONE); + String eTag = response.getHeaders() != null ? 
response.getHeaders().getValue("ETag") : null; + StorageLockFile lockFile = StorageLockFile.createFromStream(response.getValue().toStream(), eTag); + return Pair.of(LockGetResult.SUCCESS, Option.of(lockFile)); + } catch (BlobStorageException e) { + return Pair.of(handleGetStorageException(e), Option.empty()); + } + } + + private LockGetResult handleGetStorageException(BlobStorageException e) { + int code = e.getStatusCode(); + if (code == NOT_FOUND_ERROR_CODE || e.getErrorCode() == BlobErrorCode.BLOB_NOT_FOUND) { + logger.info("OwnerId: {}, Object not found in the path: {}", ownerId, lockFileUri); + return LockGetResult.NOT_EXISTS; + } else if (code == RATE_LIMIT_ERROR_CODE) { + logger.warn("OwnerId: {}, Rate limit exceeded for lock file: {}", ownerId, lockFileUri); + } else if (code >= INTERNAL_SERVER_ERROR_CODE_MIN) { + logger.warn("OwnerId: {}, Azure returned internal server error code for lock file: {}", ownerId, lockFileUri, e); + } else { + throw e; + } + return LockGetResult.UNKNOWN_ERROR; + } + + private StorageLockFile createOrUpdateLockFileInternal(StorageLockData lockData, String expectedEtag) { + byte[] bytes = StorageLockFile.toByteArray(lockData); + BlobRequestConditions conditions = new BlobRequestConditions(); + if (expectedEtag == null) { + conditions.setIfNoneMatch("*"); + } else { + conditions.setIfMatch(expectedEtag); + } + + BlobParallelUploadOptions options = new BlobParallelUploadOptions(BinaryData.fromBytes(bytes)) + .setRequestConditions(conditions); + Response<BlockBlobItem> response = lockBlobClient.uploadWithResponse(options, null, Context.NONE); + String newEtag = response.getHeaders() != null ? response.getHeaders().getValue("ETag") : null; + if (newEtag == null && response.getValue() != null) { + newEtag = response.getValue().getETag(); Review Comment: AFAIK, `response.getValue().getETag()` returns the unquoted form (`0x8D..`) whereas `response.getHeaders().getValue("ETag")` returns the quoted form (`"0x8D..."`). 
This creates an inconsistency in the stored eTag and causes issues for conditional writes when passing the eTag to `conditions.setIfMatch`. Could you double check the eTag form of both when running on Azure storage and make a fix accordingly? ########## hudi-azure/src/main/java/org/apache/hudi/azure/credentials/AzureCredentialFactory.java: ########## @@ -0,0 +1,90 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.hudi.azure.credentials; + +import org.apache.hudi.config.AzureStorageLockConfig; + +import com.azure.core.credential.TokenCredential; +import com.azure.identity.ClientSecretCredentialBuilder; +import com.azure.identity.DefaultAzureCredentialBuilder; +import com.azure.identity.ManagedIdentityCredentialBuilder; + +import java.util.Properties; + +/** + * Factory for resolving an Azure {@link TokenCredential} from Hudi properties. 
+ * + * <p>Credential precedence: + * <ol> + * <li>User-assigned managed identity ({@link AzureStorageLockConfig#AZURE_MANAGED_IDENTITY_CLIENT_ID}) + * — uses {@code ManagedIdentityCredential}</li> + * <li>Service principal ({@link AzureStorageLockConfig#AZURE_CLIENT_TENANT_ID} + + * {@link AzureStorageLockConfig#AZURE_CLIENT_ID} + + * {@link AzureStorageLockConfig#AZURE_CLIENT_SECRET}) + * — uses {@code ClientSecretCredential}</li> + * <li>{@code DefaultAzureCredential} — (system-assigned MI, + * workload identity, env-var SP, Azure CLI, etc.); suitable for dev and environments + * where auth is controlled externally</li> + * </ol> + * + * <p>Note: connection string and SAS token auth are not {@link TokenCredential}-based and are + * handled directly by the caller before this factory is consulted. + */ +public class AzureCredentialFactory { + + // Shared instance so the token cache and IMDS probe are reused across all clients + // that fall through to the default chain. + private static final TokenCredential DEFAULT_AZURE_CREDENTIAL = + new DefaultAzureCredentialBuilder().build(); + Review Comment: Similar to #17552, please avoid eager static initialization here so that an initialization failure does not cause class loading issues. -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: [email protected] For queries about this service, please contact Infrastructure at: [email protected]
