This is an automated email from the ASF dual-hosted git repository.
corgy pushed a commit to branch dev
in repository https://gitbox.apache.org/repos/asf/seatunnel.git
The following commit(s) were added to refs/heads/dev by this push:
new a2bfe1a530 [Feature][elasticsearch-connector] Add API key
authentication support (#9610)
a2bfe1a530 is described below
commit a2bfe1a5305c9f1acda284011be5d9fe566dfb64
Author: CosmosNi <[email protected]>
AuthorDate: Wed Jul 30 10:03:42 2025 +0800
[Feature][elasticsearch-connector] Add API key authentication support
(#9610)
---
docs/en/connector-v2/sink/Elasticsearch.md | 88 ++-
docs/en/connector-v2/source/Elasticsearch.md | 88 ++-
.../elasticsearch/client/EsRestClient.java | 130 +---
.../auth/AbstractAuthenticationProvider.java | 121 ++++
.../client/auth/ApiKeyAuthProvider.java | 109 +++
.../client/auth/ApiKeyEncodedAuthProvider.java | 99 +++
.../client/auth/AuthenticationProvider.java | 63 ++
.../client/auth/AuthenticationProviderFactory.java | 86 +++
.../client/auth/BasicAuthProvider.java | 97 +++
.../elasticsearch/config/AuthTypeEnum.java | 55 ++
.../config/ElasticsearchBaseOptions.java | 27 +
.../exception/ElasticsearchConnectorErrorCode.java | 3 +
.../sink/ElasticsearchSinkFactory.java | 8 +
.../source/ElasticsearchSourceFactory.java | 8 +
.../elasticsearch/ElasticsearchAuthIT.java | 787 +++++++++++++++++++++
.../connector/elasticsearch/ElasticsearchIT.java | 22 +-
.../elasticsearch/ElasticsearchSchemaChangeIT.java | 25 +-
17 files changed, 1673 insertions(+), 143 deletions(-)
diff --git a/docs/en/connector-v2/sink/Elasticsearch.md
b/docs/en/connector-v2/sink/Elasticsearch.md
index 11825413d4..f466cddeff 100644
--- a/docs/en/connector-v2/sink/Elasticsearch.md
+++ b/docs/en/connector-v2/sink/Elasticsearch.md
@@ -30,8 +30,13 @@ Engine Supported
| index_type | string | no | |
| primary_keys | list | no | |
| key_delimiter | string | no | `_` |
+| auth_type | string | no | basic |
| username | string | no | |
| password | string | no | |
+| auth.api_key_id | string | no | - |
+| auth.api_key | string | no | - |
+| auth.api_key_encoded | string | no | - |
+
| max_retry_count | int | no | 3 |
| max_batch_size | int | no | 10 |
| tls_verify_certificate | boolean | no | true |
@@ -64,13 +69,88 @@ Primary key fields used to generate the document `_id`,
this is cdc required opt
Delimiter for composite keys ("_" by default), e.g., "$" would result in
document `_id` "KEY1$KEY2$KEY3".
-### username [string]
+## Authentication
+
+The Elasticsearch connector supports multiple authentication methods to
connect to secured Elasticsearch clusters. You can choose the appropriate
authentication method based on your Elasticsearch security configuration.
+
+### auth_type [enum]
+
+Specifies the authentication method to use. Supported values:
+- `basic` (default): HTTP Basic Authentication using username and password
+- `api_key`: Elasticsearch API Key authentication using separate ID and key
+- `api_key_encoded`: Elasticsearch API Key authentication using encoded key
+
+If not specified, defaults to `basic` for backward compatibility.
+
+### Basic Authentication
+
+Basic authentication uses HTTP Basic Authentication with username and password
credentials.
+
+#### username [string]
+
+Username for basic authentication (x-pack username).
+
+#### password [string]
+
+Password for basic authentication (x-pack password).
+
+**Example:**
+```hocon
+sink {
+ Elasticsearch {
+ hosts = ["https://localhost:9200"]
+ auth_type = "basic"
+ username = "elastic"
+ password = "your_password"
+ index = "my_index"
+ }
+}
+```
+
+### API Key Authentication
+
+API Key authentication provides a more secure way to authenticate with
Elasticsearch using API keys.
-x-pack username
+#### auth.api_key_id [string]
+
+The API key ID generated by Elasticsearch.
+
+#### auth.api_key [string]
+
+The API key secret generated by Elasticsearch.
+
+#### auth.api_key_encoded [string]
+
+Base64 encoded API key in the format `base64(id:api_key)`. This is an
alternative to specifying `auth.api_key_id` and `auth.api_key` separately.
+
+**Note:** Use either `auth.api_key_id` + `auth.api_key` (with `auth_type = "api_key"`) OR
`auth.api_key_encoded` (with `auth_type = "api_key_encoded"`), but not both.
+
+**Example with separate ID and key:**
+```hocon
+sink {
+ Elasticsearch {
+ hosts = ["https://localhost:9200"]
+ auth_type = "api_key"
+ auth.api_key_id = "your_api_key_id"
+ auth.api_key = "your_api_key_secret"
+ index = "my_index"
+ }
+}
+```
+
+**Example with encoded key:**
+```hocon
+sink {
+ Elasticsearch {
+ hosts = ["https://localhost:9200"]
+ auth_type = "api_key_encoded"
+ auth.api_key_encoded =
"eW91cl9hcGlfa2V5X2lkOnlvdXJfYXBpX2tleV9zZWNyZXQ="
+ index = "my_index"
+ }
+}
+```
-### password [string]
-x-pack password
### max_retry_count [int]
diff --git a/docs/en/connector-v2/source/Elasticsearch.md
b/docs/en/connector-v2/source/Elasticsearch.md
index dff046767b..69caf3edc6 100644
--- a/docs/en/connector-v2/source/Elasticsearch.md
+++ b/docs/en/connector-v2/source/Elasticsearch.md
@@ -24,8 +24,13 @@ support version >= 2.x and <= 8.x.
| name | type | required | default value
|
|-------------------------|---------|----------|----------------------------------------------------------------|
| hosts | array | yes | -
|
+| auth_type | string | no | basic
|
| username | string | no | -
|
| password | string | no | -
|
+| auth.api_key_id | string | no | -
|
+| auth.api_key | string | no | -
|
+| auth.api_key_encoded | string | no | -
|
+
| index | string | no | If the index list does not
exist, the index must be configured |
| index_list | array | no | used to define a multiple
table task |
| source | array | no | -
|
@@ -52,13 +57,88 @@ support version >= 2.x and <= 8.x.
Elasticsearch cluster http address, the format is `host:port`, allowing
multiple hosts to be specified. Such as `["host1:9200", "host2:9200"]`.
-### username [string]
+## Authentication
+
+The Elasticsearch connector supports multiple authentication methods to
connect to secured Elasticsearch clusters. You can choose the appropriate
authentication method based on your Elasticsearch security configuration.
+
+### auth_type [enum]
+
+Specifies the authentication method to use. Supported values:
+- `basic` (default): HTTP Basic Authentication using username and password
+- `api_key`: Elasticsearch API Key authentication using separate ID and key
+- `api_key_encoded`: Elasticsearch API Key authentication using encoded key
+
+If not specified, defaults to `basic` for backward compatibility.
+
+### Basic Authentication
+
+Basic authentication uses HTTP Basic Authentication with username and password
credentials.
+
+#### username [string]
+
+Username for basic authentication (x-pack username).
+
+#### password [string]
+
+Password for basic authentication (x-pack password).
+
+**Example:**
+```hocon
+source {
+ Elasticsearch {
+ hosts = ["https://localhost:9200"]
+ auth_type = "basic"
+ username = "elastic"
+ password = "your_password"
+ index = "my_index"
+ }
+}
+```
+
+### API Key Authentication
+
+API Key authentication provides a more secure way to authenticate with
Elasticsearch using API keys.
-x-pack username.
+#### auth.api_key_id [string]
+
+The API key ID generated by Elasticsearch.
+
+#### auth.api_key [string]
+
+The API key secret generated by Elasticsearch.
+
+#### auth.api_key_encoded [string]
+
+Base64 encoded API key in the format `base64(id:api_key)`. This is an
alternative to specifying `auth.api_key_id` and `auth.api_key` separately.
+
+**Note:** Use either `auth.api_key_id` + `auth.api_key` (with `auth_type = "api_key"`) OR
`auth.api_key_encoded` (with `auth_type = "api_key_encoded"`), but not both.
+
+**Example with separate ID and key:**
+```hocon
+source {
+ Elasticsearch {
+ hosts = ["https://localhost:9200"]
+ auth_type = "api_key"
+ auth.api_key_id = "your_api_key_id"
+ auth.api_key = "your_api_key_secret"
+ index = "my_index"
+ }
+}
+```
+
+**Example with encoded key:**
+```hocon
+source {
+ Elasticsearch {
+ hosts = ["https://localhost:9200"]
+ auth_type = "api_key_encoded"
+ auth.api_key_encoded =
"eW91cl9hcGlfa2V5X2lkOnlvdXJfYXBpX2tleV9zZWNyZXQ="
+ index = "my_index"
+ }
+}
+```
-### password [string]
-x-pack password.
### index [string]
diff --git
a/seatunnel-connectors-v2/connector-elasticsearch/src/main/java/org/apache/seatunnel/connectors/seatunnel/elasticsearch/client/EsRestClient.java
b/seatunnel-connectors-v2/connector-elasticsearch/src/main/java/org/apache/seatunnel/connectors/seatunnel/elasticsearch/client/EsRestClient.java
index 69c34faef6..cccf36e3dc 100644
---
a/seatunnel-connectors-v2/connector-elasticsearch/src/main/java/org/apache/seatunnel/connectors/seatunnel/elasticsearch/client/EsRestClient.java
+++
b/seatunnel-connectors-v2/connector-elasticsearch/src/main/java/org/apache/seatunnel/connectors/seatunnel/elasticsearch/client/EsRestClient.java
@@ -26,6 +26,8 @@ import
org.apache.seatunnel.shade.com.fasterxml.jackson.databind.node.TextNode;
import org.apache.seatunnel.api.configuration.ReadonlyConfig;
import org.apache.seatunnel.api.table.converter.BasicTypeDefine;
import org.apache.seatunnel.common.utils.JsonUtils;
+import
org.apache.seatunnel.connectors.seatunnel.elasticsearch.client.auth.AuthenticationProvider;
+import
org.apache.seatunnel.connectors.seatunnel.elasticsearch.client.auth.AuthenticationProviderFactory;
import
org.apache.seatunnel.connectors.seatunnel.elasticsearch.config.ElasticsearchBaseOptions;
import
org.apache.seatunnel.connectors.seatunnel.elasticsearch.dto.BulkResponse;
import
org.apache.seatunnel.connectors.seatunnel.elasticsearch.dto.ElasticsearchClusterInfo;
@@ -34,19 +36,11 @@ import
org.apache.seatunnel.connectors.seatunnel.elasticsearch.dto.source.PointI
import
org.apache.seatunnel.connectors.seatunnel.elasticsearch.dto.source.ScrollResult;
import
org.apache.seatunnel.connectors.seatunnel.elasticsearch.exception.ElasticsearchConnectorErrorCode;
import
org.apache.seatunnel.connectors.seatunnel.elasticsearch.exception.ElasticsearchConnectorException;
-import org.apache.seatunnel.connectors.seatunnel.elasticsearch.util.SSLUtils;
import org.apache.commons.collections4.CollectionUtils;
import org.apache.commons.lang3.StringUtils;
import org.apache.http.HttpHost;
import org.apache.http.HttpStatus;
-import org.apache.http.auth.AuthScope;
-import org.apache.http.auth.UsernamePasswordCredentials;
-import org.apache.http.client.CredentialsProvider;
-import org.apache.http.conn.ssl.NoopHostnameVerifier;
-import org.apache.http.conn.ssl.TrustAllStrategy;
-import org.apache.http.impl.client.BasicCredentialsProvider;
-import org.apache.http.ssl.SSLContexts;
import org.apache.http.util.Asserts;
import org.apache.http.util.EntityUtils;
@@ -57,8 +51,6 @@ import org.elasticsearch.client.RestClientBuilder;
import lombok.extern.slf4j.Slf4j;
-import javax.net.ssl.SSLContext;
-
import java.io.Closeable;
import java.io.IOException;
import java.util.ArrayList;
@@ -98,117 +90,33 @@ public class EsRestClient implements Closeable {
public static EsRestClient createInstance(ReadonlyConfig config) {
List<String> hosts = config.get(ElasticsearchBaseOptions.HOSTS);
- Optional<String> username =
config.getOptional(ElasticsearchBaseOptions.USERNAME);
- Optional<String> password =
config.getOptional(ElasticsearchBaseOptions.PASSWORD);
- Optional<String> keystorePath = Optional.empty();
- Optional<String> keystorePassword = Optional.empty();
- Optional<String> truststorePath = Optional.empty();
- Optional<String> truststorePassword = Optional.empty();
- boolean tlsVerifyCertificate =
config.get(ElasticsearchBaseOptions.TLS_VERIFY_CERTIFICATE);
- if (tlsVerifyCertificate) {
- keystorePath =
config.getOptional(ElasticsearchBaseOptions.TLS_KEY_STORE_PATH);
- keystorePassword =
config.getOptional(ElasticsearchBaseOptions.TLS_KEY_STORE_PASSWORD);
- truststorePath =
config.getOptional(ElasticsearchBaseOptions.TLS_TRUST_STORE_PATH);
- truststorePassword =
-
config.getOptional(ElasticsearchBaseOptions.TLS_TRUST_STORE_PASSWORD);
- }
- boolean tlsVerifyHostnames =
config.get(ElasticsearchBaseOptions.TLS_VERIFY_HOSTNAME);
- return createInstance(
- hosts,
- username,
- password,
- tlsVerifyCertificate,
- tlsVerifyHostnames,
- keystorePath,
- keystorePassword,
- truststorePath,
- truststorePassword);
- }
+ // Create basic RestClient builder
+ RestClientBuilder restClientBuilder = createRestClientBuilder(hosts);
+
+ // Configure authentication and TLS using the new authentication system
+ AuthenticationProvider authProvider =
AuthenticationProviderFactory.createProvider(config);
+ authProvider.configure(restClientBuilder, config);
- public static EsRestClient createInstance(
- List<String> hosts,
- Optional<String> username,
- Optional<String> password,
- boolean tlsVerifyCertificate,
- boolean tlsVerifyHostnames,
- Optional<String> keystorePath,
- Optional<String> keystorePassword,
- Optional<String> truststorePath,
- Optional<String> truststorePassword) {
- RestClientBuilder restClientBuilder =
- getRestClientBuilder(
- hosts,
- username,
- password,
- tlsVerifyCertificate,
- tlsVerifyHostnames,
- keystorePath,
- keystorePassword,
- truststorePath,
- truststorePassword);
return new EsRestClient(restClientBuilder.build());
}
- private static RestClientBuilder getRestClientBuilder(
- List<String> hosts,
- Optional<String> username,
- Optional<String> password,
- boolean tlsVerifyCertificate,
- boolean tlsVerifyHostnames,
- Optional<String> keystorePath,
- Optional<String> keystorePassword,
- Optional<String> truststorePath,
- Optional<String> truststorePassword) {
+ /**
+ * Create a basic RestClientBuilder with hosts and request configuration.
Authentication and TLS
+ * configuration will be handled by AuthenticationProvider.
+ */
+ private static RestClientBuilder createRestClientBuilder(List<String>
hosts) {
HttpHost[] httpHosts = new HttpHost[hosts.size()];
for (int i = 0; i < hosts.size(); i++) {
httpHosts[i] = HttpHost.create(hosts.get(i));
}
- RestClientBuilder restClientBuilder =
- RestClient.builder(httpHosts)
- .setRequestConfigCallback(
- requestConfigBuilder ->
- requestConfigBuilder
- .setConnectionRequestTimeout(
-
CONNECTION_REQUEST_TIMEOUT)
-
.setSocketTimeout(SOCKET_TIMEOUT));
-
- restClientBuilder.setHttpClientConfigCallback(
- httpClientBuilder -> {
- if (username.isPresent()) {
- CredentialsProvider credentialsProvider = new
BasicCredentialsProvider();
- credentialsProvider.setCredentials(
- AuthScope.ANY,
- new
UsernamePasswordCredentials(username.get(), password.get()));
-
httpClientBuilder.setDefaultCredentialsProvider(credentialsProvider);
- }
-
- try {
- if (tlsVerifyCertificate) {
- Optional<SSLContext> sslContext =
- SSLUtils.buildSSLContext(
- keystorePath,
- keystorePassword,
- truststorePath,
- truststorePassword);
-
sslContext.ifPresent(httpClientBuilder::setSSLContext);
- } else {
- SSLContext sslContext =
- SSLContexts.custom()
- .loadTrustMaterial(new
TrustAllStrategy())
- .build();
- httpClientBuilder.setSSLContext(sslContext);
- }
- if (!tlsVerifyHostnames) {
-
httpClientBuilder.setSSLHostnameVerifier(NoopHostnameVerifier.INSTANCE);
- }
- } catch (Exception e) {
- throw new RuntimeException(e);
- }
- return httpClientBuilder;
- });
- return restClientBuilder;
+ return RestClient.builder(httpHosts)
+ .setRequestConfigCallback(
+ requestConfigBuilder ->
+ requestConfigBuilder
+
.setConnectionRequestTimeout(CONNECTION_REQUEST_TIMEOUT)
+ .setSocketTimeout(SOCKET_TIMEOUT));
}
public BulkResponse bulk(String requestBody) {
diff --git
a/seatunnel-connectors-v2/connector-elasticsearch/src/main/java/org/apache/seatunnel/connectors/seatunnel/elasticsearch/client/auth/AbstractAuthenticationProvider.java
b/seatunnel-connectors-v2/connector-elasticsearch/src/main/java/org/apache/seatunnel/connectors/seatunnel/elasticsearch/client/auth/AbstractAuthenticationProvider.java
new file mode 100644
index 0000000000..5c2b2a924e
--- /dev/null
+++
b/seatunnel-connectors-v2/connector-elasticsearch/src/main/java/org/apache/seatunnel/connectors/seatunnel/elasticsearch/client/auth/AbstractAuthenticationProvider.java
@@ -0,0 +1,121 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.seatunnel.connectors.seatunnel.elasticsearch.client.auth;
+
+import org.apache.seatunnel.api.configuration.ReadonlyConfig;
+import
org.apache.seatunnel.connectors.seatunnel.elasticsearch.config.ElasticsearchBaseOptions;
+import org.apache.seatunnel.connectors.seatunnel.elasticsearch.util.SSLUtils;
+
+import org.apache.http.conn.ssl.NoopHostnameVerifier;
+import org.apache.http.conn.ssl.TrustAllStrategy;
+import org.apache.http.impl.nio.client.HttpAsyncClientBuilder;
+import org.apache.http.ssl.SSLContexts;
+
+import org.elasticsearch.client.RestClientBuilder;
+
+import lombok.extern.slf4j.Slf4j;
+
+import javax.net.ssl.SSLContext;
+
+import java.util.Optional;
+
+@Slf4j
+public abstract class AbstractAuthenticationProvider implements
AuthenticationProvider {
+
+ @Override
+ public final void configure(RestClientBuilder builder, ReadonlyConfig
config) {
+ builder.setHttpClientConfigCallback(
+ httpClientBuilder -> {
+ // Configure authentication first
+ configureAuthentication(httpClientBuilder, config);
+
+ // Then configure TLS
+ configureTLS(httpClientBuilder, config);
+
+ return httpClientBuilder;
+ });
+ }
+
+ /**
+ * Configure the specific authentication mechanism.
+ *
+ * <p>Subclasses should implement this method to set up their specific
authentication logic on
+ * the HttpAsyncClientBuilder.
+ *
+ * @param httpClientBuilder the HTTP client builder to configure
+ * @param config the readonly configuration containing authentication
parameters
+ */
+ protected abstract void configureAuthentication(
+ HttpAsyncClientBuilder httpClientBuilder, ReadonlyConfig config);
+
+ /**
+ * Configure TLS settings for the HTTP client.
+ *
+ * <p>This method handles SSL/TLS configuration including certificate
verification, hostname
+ * verification, and custom keystores/truststores.
+ *
+ * @param httpClientBuilder the HTTP client builder to configure
+ * @param config the readonly configuration containing TLS parameters
+ */
+ protected void configureTLS(HttpAsyncClientBuilder httpClientBuilder,
ReadonlyConfig config) {
+ boolean tlsVerifyCertificate =
config.get(ElasticsearchBaseOptions.TLS_VERIFY_CERTIFICATE);
+ boolean tlsVerifyHostnames =
config.get(ElasticsearchBaseOptions.TLS_VERIFY_HOSTNAME);
+
+ try {
+ if (tlsVerifyCertificate) {
+ Optional<String> keystorePath =
+
config.getOptional(ElasticsearchBaseOptions.TLS_KEY_STORE_PATH);
+ Optional<String> keystorePassword =
+
config.getOptional(ElasticsearchBaseOptions.TLS_KEY_STORE_PASSWORD);
+ Optional<String> truststorePath =
+
config.getOptional(ElasticsearchBaseOptions.TLS_TRUST_STORE_PATH);
+ Optional<String> truststorePassword =
+
config.getOptional(ElasticsearchBaseOptions.TLS_TRUST_STORE_PASSWORD);
+
+ Optional<SSLContext> sslContext =
+ SSLUtils.buildSSLContext(
+ keystorePath, keystorePassword,
truststorePath, truststorePassword);
+
+ if (sslContext.isPresent()) {
+ httpClientBuilder.setSSLContext(sslContext.get());
+ log.debug("Custom SSL context configured with
keystore/truststore");
+ } else {
+ log.debug("No custom SSL context configured, using
default");
+ }
+ } else {
+ // Trust all certificates (not recommended for production)
+ SSLContext sslContext =
+ SSLContexts.custom().loadTrustMaterial(new
TrustAllStrategy()).build();
+ httpClientBuilder.setSSLContext(sslContext);
+ log.warn("TLS certificate verification disabled - not
recommended for production");
+ }
+
+ if (!tlsVerifyHostnames) {
+
httpClientBuilder.setSSLHostnameVerifier(NoopHostnameVerifier.INSTANCE);
+ log.warn("TLS hostname verification disabled - not recommended
for production");
+ }
+
+ log.debug(
+ "TLS configuration completed - certificate verification:
{}, hostname verification: {}",
+ tlsVerifyCertificate,
+ tlsVerifyHostnames);
+ } catch (Exception e) {
+ throw new RuntimeException("Failed to configure TLS settings", e);
+ }
+ }
+}
diff --git
a/seatunnel-connectors-v2/connector-elasticsearch/src/main/java/org/apache/seatunnel/connectors/seatunnel/elasticsearch/client/auth/ApiKeyAuthProvider.java
b/seatunnel-connectors-v2/connector-elasticsearch/src/main/java/org/apache/seatunnel/connectors/seatunnel/elasticsearch/client/auth/ApiKeyAuthProvider.java
new file mode 100644
index 0000000000..bbd5def631
--- /dev/null
+++
b/seatunnel-connectors-v2/connector-elasticsearch/src/main/java/org/apache/seatunnel/connectors/seatunnel/elasticsearch/client/auth/ApiKeyAuthProvider.java
@@ -0,0 +1,109 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.seatunnel.connectors.seatunnel.elasticsearch.client.auth;
+
+import org.apache.seatunnel.api.configuration.ReadonlyConfig;
+import
org.apache.seatunnel.connectors.seatunnel.elasticsearch.config.ElasticsearchBaseOptions;
+
+import org.apache.http.impl.nio.client.HttpAsyncClientBuilder;
+
+import lombok.extern.slf4j.Slf4j;
+
+import java.nio.charset.StandardCharsets;
+import java.util.Base64;
+import java.util.Optional;
+
+@Slf4j
+public class ApiKeyAuthProvider extends AbstractAuthenticationProvider {
+
+ private static final String AUTH_TYPE = "api_key";
+ private static final String API_KEY_HEADER = "Authorization";
+ private static final String API_KEY_PREFIX = "ApiKey ";
+
+ @Override
+ protected void configureAuthentication(
+ HttpAsyncClientBuilder httpClientBuilder, ReadonlyConfig config) {
+ String encodedApiKey = getEncodedApiKey(config);
+
+ if (encodedApiKey != null) {
+ log.debug("Configuring API key authentication");
+
+ // Add API key header to all requests
+ httpClientBuilder.addInterceptorFirst(
+ (org.apache.http.HttpRequestInterceptor)
+ (request, context) -> {
+ request.setHeader(API_KEY_HEADER,
API_KEY_PREFIX + encodedApiKey);
+ });
+
+ log.info("API key authentication configured successfully");
+ } else {
+ log.debug(
+ "No API key credentials provided, skipping API key
authentication configuration");
+ }
+ }
+
+ @Override
+ public String getAuthType() {
+ return AUTH_TYPE;
+ }
+
+ @Override
+ public void validate(ReadonlyConfig config) {
+ Optional<String> apiKeyId =
config.getOptional(ElasticsearchBaseOptions.API_KEY_ID);
+ Optional<String> apiKey =
config.getOptional(ElasticsearchBaseOptions.API_KEY);
+ Optional<String> apiKeyEncoded =
+ config.getOptional(ElasticsearchBaseOptions.API_KEY_ENCODED);
+
+ if (!apiKeyId.isPresent() || !apiKey.isPresent()) {
+ throw new IllegalArgumentException(
+ "API key authentication with auth_type='api_key' requires
both api_key_id and api_key");
+ }
+ validateApiKeyIdAndSecret(apiKeyId.get(), apiKey.get());
+
+ log.debug("API key authentication configuration validated");
+ }
+
+ /**
+ * Build the Base64-encoded {@code id:api_key} credential from the configured ID and secret.
+ *
+ * @param config the configuration
+ * @return the Base64 encoded API key, or null if the ID or the secret is not configured
+ */
+ private String getEncodedApiKey(ReadonlyConfig config) {
+ Optional<String> apiKeyId =
config.getOptional(ElasticsearchBaseOptions.API_KEY_ID);
+ Optional<String> apiKey =
config.getOptional(ElasticsearchBaseOptions.API_KEY);
+
+ if (apiKeyId.isPresent() && apiKey.isPresent()) {
+ String credentials = apiKeyId.get() + ":" + apiKey.get();
+ return
Base64.getEncoder().encodeToString(credentials.getBytes(StandardCharsets.UTF_8));
+ }
+
+ return null;
+ }
+
+ /** Validate that neither the API key ID nor the API key secret is null or blank. */
+ private void validateApiKeyIdAndSecret(String apiKeyId, String apiKey) {
+ if (apiKeyId == null || apiKeyId.trim().isEmpty()) {
+ throw new IllegalArgumentException("API key ID cannot be null or
empty");
+ }
+
+ if (apiKey == null || apiKey.trim().isEmpty()) {
+ throw new IllegalArgumentException("API key cannot be null or
empty");
+ }
+ }
+}
diff --git
a/seatunnel-connectors-v2/connector-elasticsearch/src/main/java/org/apache/seatunnel/connectors/seatunnel/elasticsearch/client/auth/ApiKeyEncodedAuthProvider.java
b/seatunnel-connectors-v2/connector-elasticsearch/src/main/java/org/apache/seatunnel/connectors/seatunnel/elasticsearch/client/auth/ApiKeyEncodedAuthProvider.java
new file mode 100644
index 0000000000..200bc13705
--- /dev/null
+++
b/seatunnel-connectors-v2/connector-elasticsearch/src/main/java/org/apache/seatunnel/connectors/seatunnel/elasticsearch/client/auth/ApiKeyEncodedAuthProvider.java
@@ -0,0 +1,99 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.seatunnel.connectors.seatunnel.elasticsearch.client.auth;
+
+import org.apache.seatunnel.api.configuration.ReadonlyConfig;
+import
org.apache.seatunnel.connectors.seatunnel.elasticsearch.config.ElasticsearchBaseOptions;
+
+import org.apache.http.impl.nio.client.HttpAsyncClientBuilder;
+
+import lombok.extern.slf4j.Slf4j;
+
+import java.nio.charset.StandardCharsets;
+import java.util.Base64;
+import java.util.Optional;
+
+@Slf4j
+public class ApiKeyEncodedAuthProvider extends AbstractAuthenticationProvider {
+
+ private static final String AUTH_TYPE = "api_key_encoded";
+ private static final String API_KEY_HEADER = "Authorization";
+ private static final String API_KEY_PREFIX = "ApiKey ";
+
+ @Override
+ protected void configureAuthentication(
+ HttpAsyncClientBuilder httpClientBuilder, ReadonlyConfig config) {
+ Optional<String> apiKeyEncoded =
+ config.getOptional(ElasticsearchBaseOptions.API_KEY_ENCODED);
+
+ if (apiKeyEncoded.isPresent()) {
+ log.debug("Configuring encoded API key authentication");
+
+ // Add API key header to all requests
+ httpClientBuilder.addInterceptorFirst(
+ (org.apache.http.HttpRequestInterceptor)
+ (request, context) -> {
+ request.setHeader(
+ API_KEY_HEADER, API_KEY_PREFIX +
apiKeyEncoded.get());
+ });
+
+ log.info("Encoded API key authentication configured successfully");
+ } else {
+ log.debug(
+ "No encoded API key provided, skipping encoded API key
authentication configuration");
+ }
+ }
+
+ @Override
+ public String getAuthType() {
+ return AUTH_TYPE;
+ }
+
+ @Override
+ public void validate(ReadonlyConfig config) {
+ Optional<String> apiKeyEncoded =
+ config.getOptional(ElasticsearchBaseOptions.API_KEY_ENCODED);
+ if (!apiKeyEncoded.isPresent()) {
+ throw new IllegalArgumentException(
+ "API key authentication with auth_type='api_key_encoded'
requires api_key_encoded");
+ }
+ validateEncodedApiKey(apiKeyEncoded.get());
+
+ log.debug("Encoded API key authentication configuration validated");
+ }
+
+ /** Validate that the encoded API key is non-blank, valid Base64, and decodes to text containing ':'. */
+ private void validateEncodedApiKey(String apiKeyEncoded) {
+ if (apiKeyEncoded == null || apiKeyEncoded.trim().isEmpty()) {
+ throw new IllegalArgumentException("Encoded API key cannot be null
or empty");
+ }
+
+ try {
+ byte[] decoded = Base64.getDecoder().decode(apiKeyEncoded);
+ String decodedStr = new String(decoded, StandardCharsets.UTF_8);
+
+ if (!decodedStr.contains(":")) {
+ throw new IllegalArgumentException(
+ "Encoded API key must be Base64 encoded 'id:key'
format");
+ }
+ } catch (IllegalArgumentException e) {
+ throw new IllegalArgumentException(
+ "Invalid encoded API key format: " + e.getMessage(), e);
+ }
+ }
+}
diff --git
a/seatunnel-connectors-v2/connector-elasticsearch/src/main/java/org/apache/seatunnel/connectors/seatunnel/elasticsearch/client/auth/AuthenticationProvider.java
b/seatunnel-connectors-v2/connector-elasticsearch/src/main/java/org/apache/seatunnel/connectors/seatunnel/elasticsearch/client/auth/AuthenticationProvider.java
new file mode 100644
index 0000000000..968cd1f5a4
--- /dev/null
+++
b/seatunnel-connectors-v2/connector-elasticsearch/src/main/java/org/apache/seatunnel/connectors/seatunnel/elasticsearch/client/auth/AuthenticationProvider.java
@@ -0,0 +1,63 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.seatunnel.connectors.seatunnel.elasticsearch.client.auth;
+
+import org.apache.seatunnel.api.configuration.ReadonlyConfig;
+
+import org.elasticsearch.client.RestClientBuilder;
+
+public interface AuthenticationProvider {
+
+ /**
+ * Configure the Elasticsearch RestClient with authentication and TLS
settings.
+ *
+ * <p>This method is called during client initialization to set up the
appropriate
+ * authentication mechanism and TLS configuration on the
RestClientBuilder. The implementation
+ * should handle both authentication and TLS configuration to ensure they
work together
+ * properly.
+ *
+ * @param builder the RestClientBuilder to configure
+ * @param config the readonly configuration containing authentication and
TLS parameters
+ * @throws IllegalArgumentException if the configuration is invalid
+ * @throws RuntimeException if authentication or TLS setup fails
+ */
+ void configure(RestClientBuilder builder, ReadonlyConfig config);
+
+ /**
+ * Get the authentication type identifier.
+ *
+ * <p>This identifier is used to match the authentication provider with
the configured auth_type
+ * parameter. It should be a unique, lowercase string that clearly
identifies the authentication
+ * mechanism.
+ *
+ * @return the authentication type identifier (e.g., "basic", "api_key",
"api_key_encoded")
+ */
+ String getAuthType();
+
+ /**
+ * Validate the authentication configuration.
+ *
+ * <p>This method is called before authentication setup to ensure that all
required
+ * configuration parameters are present and valid. It should throw an
exception if the
+ * configuration is incomplete or invalid.
+ *
+ * @param config the readonly configuration to validate
+ * @throws IllegalArgumentException if required parameters are missing or
invalid
+ */
+ void validate(ReadonlyConfig config);
+}
diff --git
a/seatunnel-connectors-v2/connector-elasticsearch/src/main/java/org/apache/seatunnel/connectors/seatunnel/elasticsearch/client/auth/AuthenticationProviderFactory.java
b/seatunnel-connectors-v2/connector-elasticsearch/src/main/java/org/apache/seatunnel/connectors/seatunnel/elasticsearch/client/auth/AuthenticationProviderFactory.java
new file mode 100644
index 0000000000..c7eeaef8a8
--- /dev/null
+++
b/seatunnel-connectors-v2/connector-elasticsearch/src/main/java/org/apache/seatunnel/connectors/seatunnel/elasticsearch/client/auth/AuthenticationProviderFactory.java
@@ -0,0 +1,86 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.seatunnel.connectors.seatunnel.elasticsearch.client.auth;
+
+import org.apache.seatunnel.api.configuration.ReadonlyConfig;
+import org.apache.seatunnel.connectors.seatunnel.elasticsearch.config.AuthTypeEnum;
+import org.apache.seatunnel.connectors.seatunnel.elasticsearch.config.ElasticsearchBaseOptions;
+import org.apache.seatunnel.connectors.seatunnel.elasticsearch.exception.ElasticsearchConnectorException;
+
+import lombok.extern.slf4j.Slf4j;
+
+import java.util.Collections;
+import java.util.EnumMap;
+import java.util.HashMap;
+import java.util.Map;
+
+import static org.apache.seatunnel.connectors.seatunnel.elasticsearch.exception.ElasticsearchConnectorErrorCode.AUTH_CONFIG_INVALID;
+import static org.apache.seatunnel.connectors.seatunnel.elasticsearch.exception.ElasticsearchConnectorErrorCode.AUTH_SETUP_FAILED;
+import static org.apache.seatunnel.connectors.seatunnel.elasticsearch.exception.ElasticsearchConnectorErrorCode.UNSUPPORTED_AUTH_TYPE;
+
+/**
+ * Factory that turns the configured {@code auth_type} into a validated {@link
+ * AuthenticationProvider}.
+ *
+ * <p>Providers are registered per {@link AuthTypeEnum} value and instantiated through their
+ * no-argument constructor. Failures are reported with distinct error codes so that a bad
+ * configuration is not mistaken for an unsupported authentication type.
+ */
+@Slf4j
+public class AuthenticationProviderFactory {
+
+    private static final AuthTypeEnum DEFAULT_AUTH_TYPE = AuthTypeEnum.BASIC;
+
+    /** Read-only registry of the built-in providers, keyed by authentication type. */
+    private static final Map<AuthTypeEnum, Class<? extends AuthenticationProvider>>
+            PROVIDER_REGISTRY;
+
+    static {
+        // Register built-in authentication providers
+        Map<AuthTypeEnum, Class<? extends AuthenticationProvider>> registry =
+                new EnumMap<>(AuthTypeEnum.class);
+        registry.put(AuthTypeEnum.BASIC, BasicAuthProvider.class);
+        registry.put(AuthTypeEnum.API_KEY, ApiKeyAuthProvider.class);
+        registry.put(AuthTypeEnum.API_KEY_ENCODED, ApiKeyEncodedAuthProvider.class);
+        PROVIDER_REGISTRY = Collections.unmodifiableMap(registry);
+    }
+
+    /**
+     * Create an authentication provider based on the configuration.
+     *
+     * <p>This method examines the auth_type configuration parameter and creates the appropriate
+     * authentication provider. If no auth_type is specified, it defaults to basic authentication
+     * for backward compatibility. The provider's {@code validate} method is run before the
+     * provider is returned.
+     *
+     * @param config the readonly configuration containing authentication settings
+     * @return the appropriate, already validated authentication provider
+     * @throws ElasticsearchConnectorException with {@code UNSUPPORTED_AUTH_TYPE} if no provider
+     *     is registered for the auth_type, {@code AUTH_SETUP_FAILED} if the provider cannot be
+     *     instantiated, or {@code AUTH_CONFIG_INVALID} if the provider rejects the configuration
+     */
+    public static AuthenticationProvider createProvider(ReadonlyConfig config) {
+        AuthTypeEnum authType =
+                config.getOptional(ElasticsearchBaseOptions.AUTH_TYPE).orElse(DEFAULT_AUTH_TYPE);
+
+        log.debug("Creating authentication provider for type: {}", authType);
+
+        Class<? extends AuthenticationProvider> providerClass = PROVIDER_REGISTRY.get(authType);
+        if (providerClass == null) {
+            throw new ElasticsearchConnectorException(
+                    UNSUPPORTED_AUTH_TYPE,
+                    String.format(
+                            "Unsupported authentication type: %s. Supported types: %s",
+                            authType, PROVIDER_REGISTRY.keySet()));
+        }
+
+        AuthenticationProvider provider = instantiate(authType, providerClass);
+        try {
+            provider.validate(config);
+        } catch (ElasticsearchConnectorException e) {
+            // Already carries a connector error code; do not re-wrap it under another one.
+            throw e;
+        } catch (RuntimeException e) {
+            // The auth type itself is supported; it is the supplied settings that are wrong.
+            throw new ElasticsearchConnectorException(
+                    AUTH_CONFIG_INVALID,
+                    String.format(
+                            "Invalid configuration for authentication type %s: %s",
+                            authType, e.getMessage()),
+                    e);
+        }
+
+        log.info("Successfully created authentication provider: {}", authType);
+        return provider;
+    }
+
+    /** Instantiates the provider class through its no-argument constructor. */
+    private static AuthenticationProvider instantiate(
+            AuthTypeEnum authType, Class<? extends AuthenticationProvider> providerClass) {
+        try {
+            return providerClass.getDeclaredConstructor().newInstance();
+        } catch (ReflectiveOperationException e) {
+            throw new ElasticsearchConnectorException(
+                    AUTH_SETUP_FAILED,
+                    String.format(
+                            "Failed to create authentication provider for type: %s", authType),
+                    e);
+        }
+    }
+}
diff --git
a/seatunnel-connectors-v2/connector-elasticsearch/src/main/java/org/apache/seatunnel/connectors/seatunnel/elasticsearch/client/auth/BasicAuthProvider.java
b/seatunnel-connectors-v2/connector-elasticsearch/src/main/java/org/apache/seatunnel/connectors/seatunnel/elasticsearch/client/auth/BasicAuthProvider.java
new file mode 100644
index 0000000000..7e4d0ce010
--- /dev/null
+++
b/seatunnel-connectors-v2/connector-elasticsearch/src/main/java/org/apache/seatunnel/connectors/seatunnel/elasticsearch/client/auth/BasicAuthProvider.java
@@ -0,0 +1,97 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.seatunnel.connectors.seatunnel.elasticsearch.client.auth;
+
+import org.apache.seatunnel.api.configuration.ReadonlyConfig;
+import
org.apache.seatunnel.connectors.seatunnel.elasticsearch.config.ElasticsearchBaseOptions;
+
+import org.apache.http.auth.AuthScope;
+import org.apache.http.auth.UsernamePasswordCredentials;
+import org.apache.http.client.CredentialsProvider;
+import org.apache.http.impl.client.BasicCredentialsProvider;
+import org.apache.http.impl.nio.client.HttpAsyncClientBuilder;
+
+import lombok.extern.slf4j.Slf4j;
+
+import java.util.Optional;
+
+/**
+ * Authentication provider for HTTP Basic authentication.
+ *
+ * <p>Credentials are optional: when no username/password pair is configured, no credentials
+ * provider is installed on the HTTP client builder.
+ */
+@Slf4j
+public class BasicAuthProvider extends AbstractAuthenticationProvider {
+
+    private static final String AUTH_TYPE = "basic";
+
+    /**
+     * Installs a username/password credentials provider on the HTTP client builder when both
+     * values are configured; otherwise leaves the builder untouched.
+     *
+     * @param httpClientBuilder the HTTP client builder to configure
+     * @param config the readonly configuration holding the optional credentials
+     */
+    @Override
+    protected void configureAuthentication(
+            HttpAsyncClientBuilder httpClientBuilder, ReadonlyConfig config) {
+        Optional<String> user = config.getOptional(ElasticsearchBaseOptions.USERNAME);
+        Optional<String> secret = config.getOptional(ElasticsearchBaseOptions.PASSWORD);
+
+        if (!user.isPresent() || !secret.isPresent()) {
+            log.debug("No username/password provided, skipping basic authentication configuration");
+            return;
+        }
+
+        String userName = user.get();
+        log.debug("Configuring basic authentication for user: {}", userName);
+
+        CredentialsProvider credentials = new BasicCredentialsProvider();
+        UsernamePasswordCredentials pair = new UsernamePasswordCredentials(userName, secret.get());
+        credentials.setCredentials(AuthScope.ANY, pair);
+        httpClientBuilder.setDefaultCredentialsProvider(credentials);
+
+        log.info("Basic authentication configured successfully for user: {}", userName);
+    }
+
+    @Override
+    public String getAuthType() {
+        return AUTH_TYPE;
+    }
+
+    /**
+     * Checks that username and password are either both absent or both present and non-blank.
+     *
+     * @param config the readonly configuration to validate
+     * @throws IllegalArgumentException if only one of the two values is configured, or if a
+     *     configured value is blank
+     */
+    @Override
+    public void validate(ReadonlyConfig config) {
+        Optional<String> user = config.getOptional(ElasticsearchBaseOptions.USERNAME);
+        Optional<String> secret = config.getOptional(ElasticsearchBaseOptions.PASSWORD);
+        boolean hasUser = user.isPresent();
+        boolean hasSecret = secret.isPresent();
+
+        // For backward compatibility basic auth stays optional, but a half-configured
+        // username/password pair is rejected.
+        if (hasUser != hasSecret) {
+            throw new IllegalArgumentException(
+                    hasUser
+                            ? "Password is required when username is provided for basic authentication"
+                            : "Username is required when password is provided for basic authentication");
+        }
+
+        if (!hasUser) {
+            log.debug(
+                    "No basic authentication credentials provided - authentication will be skipped");
+            return;
+        }
+
+        String userName = user.get();
+        requireNotBlank(userName, "Username cannot be null or empty");
+        requireNotBlank(secret.get(), "Password cannot be null or empty");
+
+        log.debug("Basic authentication configuration validated for user: {}", userName);
+    }
+
+    /** Throws {@link IllegalArgumentException} with the given message for null/blank input. */
+    private static void requireNotBlank(String candidate, String message) {
+        if (candidate == null || candidate.trim().isEmpty()) {
+            throw new IllegalArgumentException(message);
+        }
+    }
+}
diff --git
a/seatunnel-connectors-v2/connector-elasticsearch/src/main/java/org/apache/seatunnel/connectors/seatunnel/elasticsearch/config/AuthTypeEnum.java
b/seatunnel-connectors-v2/connector-elasticsearch/src/main/java/org/apache/seatunnel/connectors/seatunnel/elasticsearch/config/AuthTypeEnum.java
new file mode 100644
index 0000000000..21b6235e5e
--- /dev/null
+++
b/seatunnel-connectors-v2/connector-elasticsearch/src/main/java/org/apache/seatunnel/connectors/seatunnel/elasticsearch/config/AuthTypeEnum.java
@@ -0,0 +1,55 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.seatunnel.connectors.seatunnel.elasticsearch.config;
+
+/** Authentication mechanisms selectable through the {@code auth_type} option. */
+public enum AuthTypeEnum {
+    /** HTTP Basic Authentication using username and password */
+    BASIC("basic"),
+
+    /** Elasticsearch API Key authentication using api_key_id and api_key */
+    API_KEY("api_key"),
+
+    /** Elasticsearch API Key authentication using encoded api_key */
+    API_KEY_ENCODED("api_key_encoded");
+
+    /** Lowercase string form of this authentication type. */
+    private final String value;
+
+    AuthTypeEnum(String value) {
+        this.value = value;
+    }
+
+    public String getValue() {
+        return value;
+    }
+
+    /**
+     * Resolve an {@code AuthTypeEnum} from its string value (exact, case-sensitive match).
+     *
+     * @param value the string value
+     * @return the constant whose {@link #getValue()} equals {@code value}
+     * @throws IllegalArgumentException if no constant matches, including for {@code null}
+     */
+    public static AuthTypeEnum fromValue(String value) {
+        AuthTypeEnum[] candidates = values();
+        for (int index = 0; index < candidates.length; index++) {
+            AuthTypeEnum candidate = candidates[index];
+            if (candidate.value.equals(value)) {
+                return candidate;
+            }
+        }
+        throw new IllegalArgumentException("Unsupported auth type: " + value);
+    }
+}
diff --git
a/seatunnel-connectors-v2/connector-elasticsearch/src/main/java/org/apache/seatunnel/connectors/seatunnel/elasticsearch/config/ElasticsearchBaseOptions.java
b/seatunnel-connectors-v2/connector-elasticsearch/src/main/java/org/apache/seatunnel/connectors/seatunnel/elasticsearch/config/ElasticsearchBaseOptions.java
index c5e688443e..1c448bd392 100644
---
a/seatunnel-connectors-v2/connector-elasticsearch/src/main/java/org/apache/seatunnel/connectors/seatunnel/elasticsearch/config/ElasticsearchBaseOptions.java
+++
b/seatunnel-connectors-v2/connector-elasticsearch/src/main/java/org/apache/seatunnel/connectors/seatunnel/elasticsearch/config/ElasticsearchBaseOptions.java
@@ -87,4 +87,31 @@ public class ElasticsearchBaseOptions implements
Serializable {
.stringType()
.noDefaultValue()
.withDescription("The key password for the trust store
specified");
+
+ // Authentication configuration options
+ /** Selects the authentication mechanism; defaults to {@link AuthTypeEnum#BASIC}. */
+ public static final Option<AuthTypeEnum> AUTH_TYPE =
+ Options.key("auth_type")
+ .enumType(AuthTypeEnum.class)
+ .defaultValue(AuthTypeEnum.BASIC)
+ .withDescription(
+ "Authentication type. Supported values: basic,
api_key, api_key_encoded");
+
+ // API Key authentication options
+ /**
+ * API key ID (key {@code auth.api_key_id}). The source and sink factories
+ * list it, together with {@link #API_KEY}, under {@code auth_type = api_key}.
+ */
+ public static final Option<String> API_KEY_ID =
+ Options.key("auth.api_key_id")
+ .stringType()
+ .noDefaultValue()
+ .withDescription("Elasticsearch API key ID for
authentication");
+
+ /** API key secret (key {@code auth.api_key}) that belongs to {@link #API_KEY_ID}. */
+ public static final Option<String> API_KEY =
+ Options.key("auth.api_key")
+ .stringType()
+ .noDefaultValue()
+ .withDescription("Elasticsearch API key secret for
authentication");
+
+ /**
+ * Base64 of {@code id:key} (key {@code auth.api_key_encoded}). The source
+ * and sink factories list it under {@code auth_type = api_key_encoded}.
+ */
+ public static final Option<String> API_KEY_ENCODED =
+ Options.key("auth.api_key_encoded")
+ .stringType()
+ .noDefaultValue()
+ .withDescription("Base64 encoded Elasticsearch API key
(id:key format)");
}
diff --git
a/seatunnel-connectors-v2/connector-elasticsearch/src/main/java/org/apache/seatunnel/connectors/seatunnel/elasticsearch/exception/ElasticsearchConnectorErrorCode.java
b/seatunnel-connectors-v2/connector-elasticsearch/src/main/java/org/apache/seatunnel/connectors/seatunnel/elasticsearch/exception/ElasticsearchConnectorErrorCode.java
index b804434b5a..0129c1d8c6 100644
---
a/seatunnel-connectors-v2/connector-elasticsearch/src/main/java/org/apache/seatunnel/connectors/seatunnel/elasticsearch/exception/ElasticsearchConnectorErrorCode.java
+++
b/seatunnel-connectors-v2/connector-elasticsearch/src/main/java/org/apache/seatunnel/connectors/seatunnel/elasticsearch/exception/ElasticsearchConnectorErrorCode.java
@@ -40,6 +40,9 @@ public enum ElasticsearchConnectorErrorCode implements
SeaTunnelErrorCode {
CREATE_PIT_FAILED("ELASTICSEARCH-15", "Create Point-in-Time failed"),
DELETE_PIT_FAILED("ELASTICSEARCH-16", "Delete Point-in-Time failed"),
SEARCH_WITH_PIT_FAILED("ELASTICSEARCH-17", "Search with Point-in-Time
failed"),
+ UNSUPPORTED_AUTH_TYPE("ELASTICSEARCH-18", "Unsupported authentication
type"),
+ AUTH_CONFIG_INVALID("ELASTICSEARCH-19", "Authentication configuration is
invalid"),
+ AUTH_SETUP_FAILED("ELASTICSEARCH-20", "Authentication setup failed"),
;
private final String code;
diff --git
a/seatunnel-connectors-v2/connector-elasticsearch/src/main/java/org/apache/seatunnel/connectors/seatunnel/elasticsearch/sink/ElasticsearchSinkFactory.java
b/seatunnel-connectors-v2/connector-elasticsearch/src/main/java/org/apache/seatunnel/connectors/seatunnel/elasticsearch/sink/ElasticsearchSinkFactory.java
index d4b27c7acd..8480ca9fb4 100644
---
a/seatunnel-connectors-v2/connector-elasticsearch/src/main/java/org/apache/seatunnel/connectors/seatunnel/elasticsearch/sink/ElasticsearchSinkFactory.java
+++
b/seatunnel-connectors-v2/connector-elasticsearch/src/main/java/org/apache/seatunnel/connectors/seatunnel/elasticsearch/sink/ElasticsearchSinkFactory.java
@@ -26,10 +26,15 @@ import org.apache.seatunnel.api.table.connector.TableSink;
import org.apache.seatunnel.api.table.factory.Factory;
import org.apache.seatunnel.api.table.factory.TableSinkFactory;
import org.apache.seatunnel.api.table.factory.TableSinkFactoryContext;
+import
org.apache.seatunnel.connectors.seatunnel.elasticsearch.config.AuthTypeEnum;
import
org.apache.seatunnel.connectors.seatunnel.elasticsearch.config.ElasticsearchSinkOptions;
import com.google.auto.service.AutoService;
+import static
org.apache.seatunnel.connectors.seatunnel.elasticsearch.config.ElasticsearchBaseOptions.API_KEY;
+import static
org.apache.seatunnel.connectors.seatunnel.elasticsearch.config.ElasticsearchBaseOptions.API_KEY_ENCODED;
+import static
org.apache.seatunnel.connectors.seatunnel.elasticsearch.config.ElasticsearchBaseOptions.API_KEY_ID;
+import static
org.apache.seatunnel.connectors.seatunnel.elasticsearch.config.ElasticsearchBaseOptions.AUTH_TYPE;
import static
org.apache.seatunnel.connectors.seatunnel.elasticsearch.config.ElasticsearchBaseOptions.HOSTS;
import static
org.apache.seatunnel.connectors.seatunnel.elasticsearch.config.ElasticsearchBaseOptions.PASSWORD;
import static
org.apache.seatunnel.connectors.seatunnel.elasticsearch.config.ElasticsearchBaseOptions.TLS_KEY_STORE_PASSWORD;
@@ -76,6 +81,9 @@ public class ElasticsearchSinkFactory implements
TableSinkFactory {
TLS_TRUST_STORE_PATH,
TLS_TRUST_STORE_PASSWORD,
SinkConnectorCommonOptions.MULTI_TABLE_SINK_REPLICA)
+ .optional(AUTH_TYPE)
+ .conditional(AUTH_TYPE, AuthTypeEnum.API_KEY, API_KEY_ID,
API_KEY)
+ .conditional(AUTH_TYPE, AuthTypeEnum.API_KEY_ENCODED,
API_KEY_ENCODED)
.build();
}
diff --git
a/seatunnel-connectors-v2/connector-elasticsearch/src/main/java/org/apache/seatunnel/connectors/seatunnel/elasticsearch/source/ElasticsearchSourceFactory.java
b/seatunnel-connectors-v2/connector-elasticsearch/src/main/java/org/apache/seatunnel/connectors/seatunnel/elasticsearch/source/ElasticsearchSourceFactory.java
index fb11f72faf..dded093dbe 100644
---
a/seatunnel-connectors-v2/connector-elasticsearch/src/main/java/org/apache/seatunnel/connectors/seatunnel/elasticsearch/source/ElasticsearchSourceFactory.java
+++
b/seatunnel-connectors-v2/connector-elasticsearch/src/main/java/org/apache/seatunnel/connectors/seatunnel/elasticsearch/source/ElasticsearchSourceFactory.java
@@ -24,11 +24,16 @@ import org.apache.seatunnel.api.table.connector.TableSource;
import org.apache.seatunnel.api.table.factory.Factory;
import org.apache.seatunnel.api.table.factory.TableSourceFactory;
import org.apache.seatunnel.api.table.factory.TableSourceFactoryContext;
+import
org.apache.seatunnel.connectors.seatunnel.elasticsearch.config.AuthTypeEnum;
import com.google.auto.service.AutoService;
import java.io.Serializable;
+import static
org.apache.seatunnel.connectors.seatunnel.elasticsearch.config.ElasticsearchBaseOptions.API_KEY;
+import static
org.apache.seatunnel.connectors.seatunnel.elasticsearch.config.ElasticsearchBaseOptions.API_KEY_ENCODED;
+import static
org.apache.seatunnel.connectors.seatunnel.elasticsearch.config.ElasticsearchBaseOptions.API_KEY_ID;
+import static
org.apache.seatunnel.connectors.seatunnel.elasticsearch.config.ElasticsearchBaseOptions.AUTH_TYPE;
import static
org.apache.seatunnel.connectors.seatunnel.elasticsearch.config.ElasticsearchBaseOptions.HOSTS;
import static
org.apache.seatunnel.connectors.seatunnel.elasticsearch.config.ElasticsearchBaseOptions.INDEX;
import static
org.apache.seatunnel.connectors.seatunnel.elasticsearch.config.ElasticsearchBaseOptions.PASSWORD;
@@ -77,6 +82,9 @@ public class ElasticsearchSourceFactory implements
TableSourceFactory {
TLS_KEY_STORE_PASSWORD,
TLS_TRUST_STORE_PATH,
TLS_TRUST_STORE_PASSWORD)
+ .optional(AUTH_TYPE)
+ .conditional(AUTH_TYPE, AuthTypeEnum.API_KEY, API_KEY_ID,
API_KEY)
+ .conditional(AUTH_TYPE, AuthTypeEnum.API_KEY_ENCODED,
API_KEY_ENCODED)
.build();
}
diff --git
a/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-elasticsearch-e2e/src/test/java/org/apache/seatunnel/e2e/connector/elasticsearch/ElasticsearchAuthIT.java
b/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-elasticsearch-e2e/src/test/java/org/apache/seatunnel/e2e/connector/elasticsearch/ElasticsearchAuthIT.java
new file mode 100644
index 0000000000..4a8bf2c822
--- /dev/null
+++
b/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-elasticsearch-e2e/src/test/java/org/apache/seatunnel/e2e/connector/elasticsearch/ElasticsearchAuthIT.java
@@ -0,0 +1,787 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.seatunnel.e2e.connector.elasticsearch;
+
+import org.apache.seatunnel.shade.com.fasterxml.jackson.databind.JsonNode;
+import org.apache.seatunnel.shade.com.fasterxml.jackson.databind.ObjectMapper;
+import org.apache.seatunnel.shade.com.google.common.collect.Lists;
+
+import org.apache.seatunnel.api.configuration.ReadonlyConfig;
+import
org.apache.seatunnel.connectors.seatunnel.elasticsearch.client.EsRestClient;
+import
org.apache.seatunnel.connectors.seatunnel.elasticsearch.client.auth.AuthenticationProvider;
+import
org.apache.seatunnel.connectors.seatunnel.elasticsearch.client.auth.AuthenticationProviderFactory;
+import
org.apache.seatunnel.connectors.seatunnel.elasticsearch.dto.BulkResponse;
+import org.apache.seatunnel.e2e.common.TestResource;
+import org.apache.seatunnel.e2e.common.TestSuiteBase;
+import org.apache.seatunnel.e2e.common.container.TestContainer;
+
+import org.apache.http.HttpResponse;
+import org.apache.http.client.methods.HttpGet;
+import org.apache.http.client.methods.HttpPost;
+import org.apache.http.conn.ssl.NoopHostnameVerifier;
+import org.apache.http.conn.ssl.TrustAllStrategy;
+import org.apache.http.entity.StringEntity;
+import org.apache.http.impl.client.CloseableHttpClient;
+import org.apache.http.impl.client.HttpClients;
+import org.apache.http.ssl.SSLContextBuilder;
+import org.apache.http.util.EntityUtils;
+
+import org.junit.jupiter.api.AfterEach;
+import org.junit.jupiter.api.Assertions;
+import org.junit.jupiter.api.BeforeEach;
+import org.junit.jupiter.api.Test;
+import org.junit.jupiter.api.TestTemplate;
+import org.testcontainers.containers.Container;
+import org.testcontainers.containers.output.Slf4jLogConsumer;
+import org.testcontainers.elasticsearch.ElasticsearchContainer;
+import org.testcontainers.lifecycle.Startables;
+import org.testcontainers.utility.DockerImageName;
+import org.testcontainers.utility.DockerLoggerFactory;
+
+import lombok.extern.slf4j.Slf4j;
+
+import java.io.IOException;
+import java.nio.charset.StandardCharsets;
+import java.security.KeyManagementException;
+import java.security.KeyStoreException;
+import java.security.NoSuchAlgorithmException;
+import java.time.Duration;
+import java.util.Base64;
+import java.util.HashMap;
+import java.util.Map;
+import java.util.concurrent.TimeUnit;
+import java.util.stream.Stream;
+
+@Slf4j
+public class ElasticsearchAuthIT extends TestSuiteBase implements TestResource
{
+
+ // Docker image of the Elasticsearch server started for every test method.
+ private static final String ELASTICSEARCH_IMAGE = "elasticsearch:8.9.0";
+ // Pause in milliseconds applied after the bulk insert of the test documents.
+ private static final long INDEX_REFRESH_DELAY = 2000L;
+
+ // Test data constants
+ private static final String TEST_INDEX = "auth_test_index";
+ // Credentials accepted by the container (password is set via withPassword in startUp()).
+ private static final String VALID_USERNAME = "elastic";
+ private static final String VALID_PASSWORD = "elasticsearch";
+ private static final String INVALID_USERNAME = "wrong_user";
+ private static final String INVALID_PASSWORD = "wrong_password";
+
+ // API Key test constants - will be set dynamically after container starts
+ // (assigned by createRealApiKeys() from the create-API-key response).
+ private String validApiKeyId;
+ private String validApiKeySecret;
+ // Base64 of "<id>:<secret>", as sent in the "Authorization: ApiKey ..." header.
+ private String validEncodedApiKey;
+ private static final String INVALID_API_KEY_ID = "invalid-key-id";
+ private static final String INVALID_API_KEY_SECRET = "invalid-key-secret";
+
+ // Per-test resources: created in startUp() and released in tearDown().
+ private ElasticsearchContainer elasticsearchContainer;
+ // Client authenticated with the valid basic credentials; used for test data setup.
+ private EsRestClient esRestClient;
+ private ObjectMapper objectMapper = new ObjectMapper();
+ // Plain HTTP client (trust-all TLS) used for readiness polling and API key creation.
+ private CloseableHttpClient httpClient;
+
+    /**
+     * Starts a password-protected Elasticsearch container, waits until it answers, creates a
+     * real API key and seeds the test index.
+     *
+     * <p>Credentials and the image tag come from the class constants so that the container,
+     * the setup client and the test cases cannot drift apart.
+     *
+     * @throws Exception if the container, the API key or the test data cannot be set up
+     */
+    @BeforeEach
+    @Override
+    public void startUp() throws Exception {
+        // Initialize HTTP client with SSL trust all strategy
+        initializeHttpClient();
+
+        // Start Elasticsearch container
+        elasticsearchContainer =
+                new ElasticsearchContainer(
+                                DockerImageName.parse(ELASTICSEARCH_IMAGE)
+                                        .asCompatibleSubstituteFor(
+                                                "docker.elastic.co/elasticsearch/elasticsearch"))
+                        .withNetwork(NETWORK)
+                        .withEnv("cluster.routing.allocation.disk.threshold_enabled", "false")
+                        .withEnv("xpack.security.authc.api_key.enabled", "true")
+                        .withNetworkAliases("elasticsearch")
+                        .withPassword(VALID_PASSWORD)
+                        .withStartupAttempts(5)
+                        .withStartupTimeout(Duration.ofMinutes(5))
+                        .withLogConsumer(
+                                new Slf4jLogConsumer(
+                                        DockerLoggerFactory.getLogger(ELASTICSEARCH_IMAGE)));
+        Startables.deepStart(Stream.of(elasticsearchContainer)).join();
+        log.info("Elasticsearch container started");
+
+        // Wait for Elasticsearch to be ready and create real API keys
+        waitForElasticsearchReady();
+        createRealApiKeys();
+
+        // Initialize ES client for test data setup with the same settings the tests use
+        ReadonlyConfig config =
+                ReadonlyConfig.fromMap(createBasicAuthConfig(VALID_USERNAME, VALID_PASSWORD));
+        esRestClient = EsRestClient.createInstance(config);
+        createTestIndex();
+        insertTestData();
+    }
+
+    /**
+     * Releases the per-test resources.
+     *
+     * <p>Each resource is released in its own {@code finally} step so that a failure while
+     * closing one of them does not leave the others (in particular the container) running.
+     *
+     * @throws Exception if closing one of the clients fails
+     */
+    @AfterEach
+    @Override
+    public void tearDown() throws Exception {
+        try {
+            if (esRestClient != null) {
+                esRestClient.close();
+            }
+        } finally {
+            try {
+                if (httpClient != null) {
+                    httpClient.close();
+                }
+            } finally {
+                if (elasticsearchContainer != null) {
+                    elasticsearchContainer.stop();
+                }
+            }
+        }
+    }
+
+    /**
+     * Builds the shared test HTTP client; it trusts every certificate and skips hostname
+     * verification, which is acceptable only for testing.
+     */
+    private void initializeHttpClient()
+            throws NoSuchAlgorithmException, KeyStoreException, KeyManagementException {
+        SSLContextBuilder trustEverything =
+                SSLContextBuilder.create().loadTrustMaterial(TrustAllStrategy.INSTANCE);
+
+        httpClient =
+                HttpClients.custom()
+                        .setSSLContext(trustEverything.build())
+                        .setSSLHostnameVerifier(NoopHostnameVerifier.INSTANCE)
+                        .build();
+
+        log.info("HTTP client initialized with SSL trust all strategy");
+    }
+
+    /**
+     * Polls the cluster health endpoint with the valid basic credentials until it answers
+     * HTTP 200.
+     *
+     * <p>Every response entity is consumed, including non-200 answers, so the pooled
+     * connection is returned to the client. Otherwise repeated "not ready" responses can
+     * exhaust the connection pool and block the next {@code execute} call.
+     *
+     * @throws InterruptedException if the thread is interrupted between attempts
+     * @throws RuntimeException if Elasticsearch is not ready after all attempts
+     */
+    private void waitForElasticsearchReady() throws IOException, InterruptedException {
+        final int maxAttempts = 30;
+        String healthUrl =
+                "https://" + elasticsearchContainer.getHttpHostAddress() + "/_cluster/health";
+        // The credentials never change between attempts, so encode them once.
+        String basicAuth =
+                Base64.getEncoder()
+                        .encodeToString(
+                                (VALID_USERNAME + ":" + VALID_PASSWORD)
+                                        .getBytes(StandardCharsets.UTF_8));
+
+        log.info("Waiting for Elasticsearch to be ready at: {}", healthUrl);
+
+        for (int attempt = 1; attempt <= maxAttempts; attempt++) {
+            HttpResponse response = null;
+            try {
+                HttpGet request = new HttpGet(healthUrl);
+                request.setHeader("Authorization", "Basic " + basicAuth);
+
+                response = httpClient.execute(request);
+                int status = response.getStatusLine().getStatusCode();
+                if (status == 200) {
+                    log.info("Elasticsearch is ready");
+                    return;
+                }
+                log.debug(
+                        "Elasticsearch not ready yet, attempt {}/{}: HTTP {}",
+                        attempt,
+                        maxAttempts,
+                        status);
+            } catch (Exception e) {
+                // Best-effort polling: connection errors are expected while the node boots.
+                log.debug(
+                        "Elasticsearch not ready yet, attempt {}/{}: {}",
+                        attempt,
+                        maxAttempts,
+                        e.getMessage());
+            } finally {
+                if (response != null) {
+                    // Release the connection back to the pool.
+                    EntityUtils.consumeQuietly(response.getEntity());
+                }
+            }
+
+            TimeUnit.SECONDS.sleep(2);
+        }
+
+        throw new RuntimeException("Elasticsearch failed to become ready within timeout");
+    }
+
+    /**
+     * Creates a real API key through the Elasticsearch security API and stores its id, secret
+     * and Base64-encoded {@code id:secret} form in the test fields, then verifies the key.
+     *
+     * <p>Only the key id is logged; the secret and the encoded key are credentials and are
+     * kept out of the log. Parsing and verification are handled separately so that a failed
+     * verification is not reported as a parse error.
+     *
+     * @throws IOException if the HTTP call or the key verification request fails
+     * @throws RuntimeException if the key cannot be created or the response is unusable
+     */
+    private void createRealApiKeys() throws IOException {
+        String elasticsearchUrl = "https://" + elasticsearchContainer.getHttpHostAddress();
+        String apiKeyUrl = elasticsearchUrl + "/_security/api_key";
+
+        log.info("Creating real API key at: {}", apiKeyUrl);
+
+        String requestBody =
+                "{\n"
+                        + " \"name\": \"seatunnel-test-api-key\",\n"
+                        + " \"role_descriptors\": {\n"
+                        + " \"seatunnel_test_role\": {\n"
+                        + " \"cluster\": [\"manage\"],\n"
+                        + " \"indices\": [\n"
+                        + " {\n"
+                        + " \"names\": [\""
+                        + TEST_INDEX
+                        + "\", \"auth_test_*\", \"test_*\", \"*_target\"],\n"
+                        + " \"privileges\": [\"all\"]\n"
+                        + " }\n"
+                        + " ]\n"
+                        + " }\n"
+                        + " },\n"
+                        + " \"metadata\": {\n"
+                        + " \"application\": \"seatunnel-test\",\n"
+                        + " \"environment\": \"integration-test\"\n"
+                        + " }\n"
+                        + "}";
+
+        HttpPost request = new HttpPost(apiKeyUrl);
+        String auth =
+                Base64.getEncoder()
+                        .encodeToString(
+                                (VALID_USERNAME + ":" + VALID_PASSWORD)
+                                        .getBytes(StandardCharsets.UTF_8));
+        request.setHeader("Authorization", "Basic " + auth);
+        request.setHeader("Content-Type", "application/json");
+        request.setEntity(new StringEntity(requestBody, StandardCharsets.UTF_8));
+
+        HttpResponse response = httpClient.execute(request);
+        String responseBody = EntityUtils.toString(response.getEntity());
+
+        if (response.getStatusLine().getStatusCode() != 200) {
+            throw new RuntimeException("Failed to create API key: " + responseBody);
+        }
+
+        // Parse response to extract API key details
+        JsonNode jsonResponse;
+        try {
+            jsonResponse = objectMapper.readTree(responseBody);
+        } catch (IOException e) {
+            throw new RuntimeException("Failed to parse API key response: " + responseBody, e);
+        }
+
+        JsonNode idNode = jsonResponse == null ? null : jsonResponse.get("id");
+        JsonNode secretNode = jsonResponse == null ? null : jsonResponse.get("api_key");
+        if (idNode == null || secretNode == null) {
+            throw new RuntimeException(
+                    "API key response is missing 'id' or 'api_key': " + responseBody);
+        }
+
+        validApiKeyId = idNode.asText();
+        validApiKeySecret = secretNode.asText();
+        validEncodedApiKey =
+                Base64.getEncoder()
+                        .encodeToString(
+                                (validApiKeyId + ":" + validApiKeySecret)
+                                        .getBytes(StandardCharsets.UTF_8));
+
+        // Never log the secret or the encoded key: both grant access to the cluster.
+        log.info("API Key created successfully - ID: {}", validApiKeyId);
+
+        // Verify the API key works
+        verifyApiKey();
+    }
+
+    /**
+     * Calls the {@code _authenticate} endpoint with the encoded API key and fails unless the
+     * server answers HTTP 200.
+     */
+    private void verifyApiKey() throws IOException {
+        String authUrl =
+                "https://"
+                        + elasticsearchContainer.getHttpHostAddress()
+                        + "/_security/_authenticate";
+
+        HttpGet authenticateRequest = new HttpGet(authUrl);
+        authenticateRequest.setHeader("Authorization", "ApiKey " + validEncodedApiKey);
+
+        HttpResponse authenticateResponse = httpClient.execute(authenticateRequest);
+        String body = EntityUtils.toString(authenticateResponse.getEntity());
+        int status = authenticateResponse.getStatusLine().getStatusCode();
+
+        if (status != 200) {
+            throw new RuntimeException("API Key verification failed: " + body);
+        }
+        log.info("API Key verification successful: {}", body);
+    }
+
+    /**
+     * Creates the test index with an integer {@code id}, a text {@code name} and a double
+     * {@code value} field, using the setup client.
+     */
+    private void createTestIndex() throws Exception {
+        String fieldDefinitions =
+                "\"id\": {\"type\": \"integer\"},"
+                        + "\"name\": {\"type\": \"text\"},"
+                        + "\"value\": {\"type\": \"double\"}";
+        String indexMapping = "{\"mappings\": {\"properties\": {" + fieldDefinitions + "}}}";
+
+        log.info("Creating test index: {}", TEST_INDEX);
+
+        try {
+            esRestClient.createIndex(TEST_INDEX, indexMapping);
+        } catch (Exception cause) {
+            log.error("Failed to create test index: {}", cause.getMessage(), cause);
+            throw new RuntimeException("Failed to create test index: " + TEST_INDEX, cause);
+        }
+        log.info("Test index '{}' created successfully", TEST_INDEX);
+    }
+
+    /**
+     * Bulk-inserts three small documents into the test index and then pauses for {@code
+     * INDEX_REFRESH_DELAY} milliseconds.
+     *
+     * <p>Each failure is logged and wrapped once. An interrupt during the pause restores
+     * the thread's interrupt flag instead of being swallowed.
+     *
+     * @throws RuntimeException if the bulk request fails, reports errors, or the wait is
+     *     interrupted
+     */
+    private void insertTestData() throws Exception {
+        final int documentCount = 3;
+        StringBuilder requestBody = new StringBuilder();
+        String indexHeader = "{\"index\":{\"_index\":\"" + TEST_INDEX + "\"}}\n";
+
+        for (int i = 1; i <= documentCount; i++) {
+            Map<String, Object> doc = new HashMap<>();
+            doc.put("id", i);
+            doc.put("name", "test_" + i);
+            doc.put("value", i * 10.5);
+
+            requestBody.append(indexHeader);
+            requestBody.append(objectMapper.writeValueAsString(doc));
+            requestBody.append("\n");
+        }
+
+        log.info("Inserting test data into index: {}", TEST_INDEX);
+
+        BulkResponse response;
+        try {
+            response = esRestClient.bulk(requestBody.toString());
+        } catch (Exception e) {
+            log.error("Failed to insert test data", e);
+            throw new RuntimeException("Failed to insert test data", e);
+        }
+
+        if (response.isErrors()) {
+            log.error("Bulk insert had errors: {}", response.getResponse());
+            throw new RuntimeException("Failed to insert test data: " + response.getResponse());
+        }
+
+        try {
+            Thread.sleep(INDEX_REFRESH_DELAY);
+        } catch (InterruptedException e) {
+            // Keep the interrupt visible to the test runner.
+            Thread.currentThread().interrupt();
+            throw new RuntimeException("Interrupted while waiting for test data refresh", e);
+        }
+        log.info("Test data inserted successfully - {} documents", documentCount);
+    }
+
+    // Helper methods for creating configurations
+
+    /**
+     * Builds the settings shared by every test configuration: the HTTPS address of the test
+     * container with certificate and hostname verification switched off.
+     */
+    private Map<String, Object> newTlsRelaxedHostConfig() {
+        Map<String, Object> config = new HashMap<>();
+        config.put(
+                "hosts",
+                Lists.newArrayList("https://" + elasticsearchContainer.getHttpHostAddress()));
+        config.put("tls_verify_certificate", false);
+        config.put("tls_verify_hostname", false);
+        return config;
+    }
+
+    /**
+     * Creates a connector configuration that uses username/password credentials and leaves
+     * {@code auth_type} unset.
+     *
+     * @param username value for the {@code username} option
+     * @param password value for the {@code password} option
+     * @return a mutable configuration map
+     */
+    private Map<String, Object> createBasicAuthConfig(String username, String password) {
+        Map<String, Object> configMap = newTlsRelaxedHostConfig();
+        configMap.put("username", username);
+        configMap.put("password", password);
+        return configMap;
+    }
+
+    /**
+     * Creates a connector configuration for {@code auth_type = api_key}.
+     *
+     * @param keyId value for {@code auth.api_key_id}
+     * @param keySecret value for {@code auth.api_key}
+     * @return a mutable configuration map
+     */
+    private Map<String, Object> createApiKeyConfig(String keyId, String keySecret) {
+        Map<String, Object> config = newTlsRelaxedHostConfig();
+        config.put("auth_type", "api_key");
+        config.put("auth.api_key_id", keyId);
+        config.put("auth.api_key", keySecret);
+        return config;
+    }
+
+    /**
+     * Creates a connector configuration for {@code auth_type = api_key_encoded}.
+     *
+     * @param encodedKey value for {@code auth.api_key_encoded}
+     * @return a mutable configuration map
+     */
+    private Map<String, Object> createApiKeyEncodedConfig(String encodedKey) {
+        Map<String, Object> config = newTlsRelaxedHostConfig();
+        config.put("auth_type", "api_key_encoded");
+        config.put("auth.api_key_encoded", encodedKey);
+        return config;
+    }
+
+ // ==================== Basic Authentication Tests ====================
+
+ /** Test successful basic authentication with valid credentials */
+ @Test
+ public void testBasicAuthenticationSuccess() throws Exception {
+ log.info("=== Testing Basic Authentication Success ===");
+
+ Map<String, Object> config = createBasicAuthConfig(VALID_USERNAME,
VALID_PASSWORD);
+ ReadonlyConfig readonlyConfig = ReadonlyConfig.fromMap(config);
+
+ // Test provider creation
+ AuthenticationProvider provider =
+ AuthenticationProviderFactory.createProvider(readonlyConfig);
+ Assertions.assertNotNull(provider, "Authentication provider should be
created");
+ Assertions.assertEquals(
+ "basic", provider.getAuthType(), "Provider should be basic
auth type");
+
+ // Test client creation and functionality
+ try (EsRestClient client =
EsRestClient.createInstance(readonlyConfig)) {
+ Assertions.assertNotNull(client, "EsRestClient should be created
successfully");
+
+ // Verify client can perform operations
+ long docCount =
client.getIndexDocsCount(TEST_INDEX).get(0).getDocsCount();
+ Assertions.assertTrue(
+ docCount > 0, "Should be able to query index with valid
credentials");
+
+ log.info("✓ Basic authentication success test passed - {}
documents found", docCount);
+ }
+ }
+
+    /**
+     * Basic authentication, failure path: provider and client creation must still succeed with
+     * the INVALID_USERNAME / INVALID_PASSWORD credentials, but the first request issued through
+     * the client must throw.
+     */
+    @Test
+    public void testBasicAuthenticationFailure() throws Exception {
+        log.info("=== Testing Basic Authentication Failure ===");
+
+        ReadonlyConfig readonlyConfig =
+                ReadonlyConfig.fromMap(createBasicAuthConfig(INVALID_USERNAME, INVALID_PASSWORD));
+
+        // Creating the provider does not contact the cluster, so it is expected to succeed
+        AuthenticationProvider authProvider =
+                AuthenticationProviderFactory.createProvider(readonlyConfig);
+        Assertions.assertNotNull(
+                authProvider,
+                "Authentication provider should be created even with invalid credentials");
+        Assertions.assertEquals(
+                "basic", authProvider.getAuthType(), "Provider should be basic auth type");
+
+        // Creating the client is expected to succeed as well
+        try (EsRestClient restClient = EsRestClient.createInstance(readonlyConfig)) {
+            Assertions.assertNotNull(restClient, "EsRestClient should be created");
+
+            // Only the actual request is expected to be rejected; any exception type is accepted
+            Exception thrown =
+                    Assertions.assertThrows(
+                            Exception.class,
+                            () -> restClient.getIndexDocsCount(TEST_INDEX),
+                            "Should throw exception when using invalid credentials");
+
+            log.info(
+                    "✓ Basic authentication failure test passed - exception: {}",
+                    thrown.getMessage());
+        }
+    }
+
+ // ==================== API Key Authentication Tests ====================
+
+ /** Test successful API key authentication with valid key */
+ @Test
+ public void testApiKeyAuthenticationSuccess() throws Exception {
+ log.info("=== Testing API Key Authentication Success ===");
+
+ Map<String, Object> config = createApiKeyConfig(validApiKeyId,
validApiKeySecret);
+ ReadonlyConfig readonlyConfig = ReadonlyConfig.fromMap(config);
+
+ // Test provider creation
+ AuthenticationProvider provider =
+ AuthenticationProviderFactory.createProvider(readonlyConfig);
+ Assertions.assertNotNull(provider, "Authentication provider should be
created");
+ Assertions.assertEquals(
+ "api_key", provider.getAuthType(), "Provider should be api_key
auth type");
+
+ // Test client creation and functionality
+ try (EsRestClient client =
EsRestClient.createInstance(readonlyConfig)) {
+ Assertions.assertNotNull(client, "EsRestClient should be created
successfully");
+
+ // Verify client can perform operations with real API key
+ long docCount =
client.getIndexDocsCount(TEST_INDEX).get(0).getDocsCount();
+ Assertions.assertTrue(docCount > 0, "Should be able to query index
with valid API key");
+
+ log.info("✓ API key authentication success test passed - {}
documents found", docCount);
+ }
+ }
+
+    /**
+     * API key authentication, failure path: provider and client creation must still succeed
+     * with the INVALID_API_KEY_ID / INVALID_API_KEY_SECRET pair, but the first request issued
+     * through the client must throw.
+     */
+    @Test
+    public void testApiKeyAuthenticationFailure() throws Exception {
+        log.info("=== Testing API Key Authentication Failure ===");
+
+        ReadonlyConfig readonlyConfig =
+                ReadonlyConfig.fromMap(
+                        createApiKeyConfig(INVALID_API_KEY_ID, INVALID_API_KEY_SECRET));
+
+        // Creating the provider is expected to succeed even though the key is invalid
+        AuthenticationProvider authProvider =
+                AuthenticationProviderFactory.createProvider(readonlyConfig);
+        Assertions.assertNotNull(authProvider, "Authentication provider should be created");
+        Assertions.assertEquals(
+                "api_key", authProvider.getAuthType(), "Provider should be api_key auth type");
+
+        // Creating the client is expected to succeed as well
+        try (EsRestClient restClient = EsRestClient.createInstance(readonlyConfig)) {
+            Assertions.assertNotNull(restClient, "EsRestClient should be created");
+
+            // Only the actual request is expected to be rejected; any exception type is accepted
+            Exception thrown =
+                    Assertions.assertThrows(
+                            Exception.class,
+                            () -> restClient.getIndexDocsCount(TEST_INDEX),
+                            "Should throw exception when using invalid API key");
+
+            log.info(
+                    "✓ API key authentication failure test passed - exception: {}",
+                    thrown.getMessage());
+        }
+    }
+
+ /** Test API key authentication with encoded format */
+ @Test
+ public void testApiKeyEncodedAuthentication() throws Exception {
+ log.info("=== Testing API Key Encoded Authentication ===");
+
+ Map<String, Object> config =
createApiKeyEncodedConfig(validEncodedApiKey);
+ ReadonlyConfig readonlyConfig = ReadonlyConfig.fromMap(config);
+
+ // Test provider creation
+ AuthenticationProvider provider =
+ AuthenticationProviderFactory.createProvider(readonlyConfig);
+ Assertions.assertNotNull(provider, "Authentication provider should be
created");
+ Assertions.assertEquals(
+ "api_key_encoded",
+ provider.getAuthType(),
+ "Provider should be api_key_encoded auth type");
+
+ // Test client creation and functionality
+ try (EsRestClient client =
EsRestClient.createInstance(readonlyConfig)) {
+ Assertions.assertNotNull(client, "EsRestClient should be created
successfully");
+
+ // Verify client can perform operations with encoded API key
+ long docCount =
client.getIndexDocsCount(TEST_INDEX).get(0).getDocsCount();
+ Assertions.assertTrue(
+ docCount > 0, "Should be able to query index with valid
encoded API key");
+
+ log.info("✓ API key encoded authentication test passed - {}
documents found", docCount);
+ }
+ }
+
+    /**
+     * E2E test: API Key authentication source and sink.
+     *
+     * <p>Seeds the source index, runs a job whose source and sink both authenticate with the
+     * key id/secret pair, and checks that the target index received documents.
+     */
+    @TestTemplate
+    public void testE2EApiKeyAuthSourceAndSink(TestContainer container) throws Exception {
+        log.info("=== E2E Test: API Key Authentication Source and Sink ===");
+
+        // Setup test data
+        setupAuthTestData();
+
+        long targetCount =
+                executeJobWithTempConfig(
+                        container,
+                        createApiKeyConfigContent(),
+                        "elasticsearch_auth_apikey_temp.conf",
+                        "auth_test_apikey_target");
+        log.info("✓ API Key auth E2E test completed - {} documents processed", targetCount);
+        Assertions.assertTrue(
+                targetCount > 0, "Should have processed documents with API key auth");
+    }
+
+    /**
+     * E2E test: API Key Encoded authentication source and sink.
+     *
+     * <p>Seeds the source index, runs a job whose source and sink both authenticate with the
+     * encoded API key, and checks that the target index received documents.
+     */
+    @TestTemplate
+    public void testE2EApiKeyEncodedAuthSourceAndSink(TestContainer container) throws Exception {
+        log.info("=== E2E Test: API Key Encoded Authentication Source and Sink ===");
+
+        // Setup test data
+        setupAuthTestData();
+
+        long targetCount =
+                executeJobWithTempConfig(
+                        container,
+                        createApiKeyEncodedConfigContent(),
+                        "elasticsearch_auth_apikey_encoded_temp.conf",
+                        "auth_test_apikey_encoded_target");
+        log.info(
+                "✓ API Key Encoded auth E2E test completed - {} documents processed",
+                targetCount);
+        Assertions.assertTrue(
+                targetCount > 0, "Should have processed documents with encoded API key auth");
+    }
+
+    /**
+     * Writes {@code configContent} to a temporary job file under
+     * {@code src/test/resources/elasticsearch}, runs it in the given container, asserts a zero
+     * exit code and returns the document count of {@code targetIndex}.
+     *
+     * <p>The file is generated at runtime because the API key values are only known once the
+     * test has created them. It is written as UTF-8 explicitly, rather than with the platform
+     * default charset, and removed again in a {@code finally} block.
+     *
+     * @param container engine container that executes the job
+     * @param configContent complete job configuration text
+     * @param configFileName file name (no directory) of the temporary job file
+     * @param targetIndex index written by the job's sink
+     * @return the document count reported for {@code targetIndex} after the job finished
+     * @throws Exception if the file cannot be written, the job fails, or the count lookup fails
+     */
+    private long executeJobWithTempConfig(
+            TestContainer container,
+            String configContent,
+            String configFileName,
+            String targetIndex)
+            throws Exception {
+        java.nio.file.Path resourcesDir =
+                java.nio.file.Paths.get("src/test/resources/elasticsearch");
+        // Fails loudly if the directory cannot be created, instead of ignoring mkdirs()' result
+        java.nio.file.Files.createDirectories(resourcesDir);
+
+        java.nio.file.Path tempConfigFile = resourcesDir.resolve(configFileName);
+        java.nio.file.Files.write(
+                tempConfigFile, configContent.getBytes(java.nio.charset.StandardCharsets.UTF_8));
+
+        try {
+            // Execute the SeaTunnel job using the resource-relative path of the temporary file
+            Container.ExecResult execResult =
+                    container.executeJob("/elasticsearch/" + configFileName);
+            Assertions.assertEquals(
+                    0, execResult.getExitCode(), "Job should complete successfully");
+
+            // Wait for index refresh
+            Thread.sleep(2000);
+
+            return esRestClient.getIndexDocsCount(targetIndex).get(0).getDocsCount();
+        } finally {
+            // Best-effort cleanup: a failed delete is logged but must not mask the test outcome
+            try {
+                java.nio.file.Files.deleteIfExists(tempConfigFile);
+            } catch (java.io.IOException e) {
+                log.warn("Failed to delete temporary config file {}", tempConfigFile, e);
+            }
+        }
+    }
+
+    /**
+     * Renders the batch job configuration used by the API key E2E test. Source and sink both
+     * use auth_type "api_key"; the four placeholders are filled with validApiKeyId and
+     * validApiKeySecret, once for the source and once for the sink.
+     */
+    private String createApiKeyConfigContent() {
+        String envSection =
+                "env {\n"
+                        + " parallelism = 1\n"
+                        + " job.mode = \"BATCH\"\n"
+                        + "}\n"
+                        + "\n";
+
+        String sourceSection =
+                "source {\n"
+                        + " Elasticsearch {\n"
+                        + " hosts = [\"https://elasticsearch:9200\"]\n"
+                        + " auth_type = \"api_key\"\n"
+                        + " auth.api_key_id = \"%s\"\n"
+                        + " auth.api_key = \"%s\"\n"
+                        + " tls_verify_certificate = false\n"
+                        + " tls_verify_hostname = false\n"
+                        + "\n"
+                        + " index = \"auth_test_index\"\n"
+                        + " query = {\"match_all\": {}}\n"
+                        + " schema = {\n"
+                        + " fields {\n"
+                        + " id = int\n"
+                        + " name = string\n"
+                        + " category = string\n"
+                        + " price = double\n"
+                        + " timestamp = timestamp\n"
+                        + " }\n"
+                        + " }\n"
+                        + " }\n"
+                        + "}\n"
+                        + "\n";
+
+        String sinkSection =
+                "sink {\n"
+                        + " Elasticsearch {\n"
+                        + " hosts = [\"https://elasticsearch:9200\"]\n"
+                        + " auth_type = \"api_key\"\n"
+                        + " auth.api_key_id = \"%s\"\n"
+                        + " auth.api_key = \"%s\"\n"
+                        + " tls_verify_certificate = false\n"
+                        + " tls_verify_hostname = false\n"
+                        + "\n"
+                        + " index = \"auth_test_apikey_target\"\n"
+                        + " schema_save_mode = \"CREATE_SCHEMA_WHEN_NOT_EXIST\"\n"
+                        + " data_save_mode = \"APPEND_DATA\"\n"
+                        + " }\n"
+                        + "}\n";
+
+        String template = envSection + sourceSection + sinkSection;
+        return String.format(
+                template, validApiKeyId, validApiKeySecret, validApiKeyId, validApiKeySecret);
+    }
+
+    /**
+     * Renders the batch job configuration used by the encoded API key E2E test. Source and sink
+     * both use auth_type "api_key_encoded"; the two placeholders are filled with
+     * validEncodedApiKey.
+     */
+    private String createApiKeyEncodedConfigContent() {
+        String envSection =
+                "env {\n"
+                        + " parallelism = 1\n"
+                        + " job.mode = \"BATCH\"\n"
+                        + "}\n"
+                        + "\n";
+
+        String sourceSection =
+                "source {\n"
+                        + " Elasticsearch {\n"
+                        + " hosts = [\"https://elasticsearch:9200\"]\n"
+                        + " auth_type = \"api_key_encoded\"\n"
+                        + " auth.api_key_encoded = \"%s\"\n"
+                        + " tls_verify_certificate = false\n"
+                        + " tls_verify_hostname = false\n"
+                        + "\n"
+                        + " index = \"auth_test_index\"\n"
+                        + " query = {\"match_all\": {}}\n"
+                        + " schema = {\n"
+                        + " fields {\n"
+                        + " id = int\n"
+                        + " name = string\n"
+                        + " category = string\n"
+                        + " price = double\n"
+                        + " timestamp = timestamp\n"
+                        + " }\n"
+                        + " }\n"
+                        + " }\n"
+                        + "}\n"
+                        + "\n";
+
+        String sinkSection =
+                "sink {\n"
+                        + " Elasticsearch {\n"
+                        + " hosts = [\"https://elasticsearch:9200\"]\n"
+                        + " auth_type = \"api_key_encoded\"\n"
+                        + " auth.api_key_encoded = \"%s\"\n"
+                        + " tls_verify_certificate = false\n"
+                        + " tls_verify_hostname = false\n"
+                        + "\n"
+                        + " index = \"auth_test_apikey_encoded_target\"\n"
+                        + " schema_save_mode = \"CREATE_SCHEMA_WHEN_NOT_EXIST\"\n"
+                        + " data_save_mode = \"APPEND_DATA\"\n"
+                        + " }\n"
+                        + "}\n";
+
+        String template = envSection + sourceSection + sinkSection;
+        return String.format(template, validEncodedApiKey, validEncodedApiKey);
+    }
+
+    /**
+     * Sets up the source index ("auth_test_index") used by the E2E authentication tests.
+     *
+     * <p>Creates the index with an explicit mapping (best effort: a failure is only logged,
+     * since the index may already exist from an earlier invocation) and bulk-indexes ten
+     * product documents. Every document is sent with a deterministic {@code _id}, so calling
+     * this method repeatedly overwrites the same ten documents instead of appending duplicates
+     * on each test-template invocation.
+     *
+     * @throws Exception if serializing the documents, the bulk request, or the final document
+     *     count lookup fails
+     */
+    private void setupAuthTestData() throws Exception {
+        String testIndex = "auth_test_index";
+
+        // Create index mapping
+        String mapping =
+                "{"
+                        + "\"mappings\": {"
+                        + "\"properties\": {"
+                        + "\"id\": {\"type\": \"integer\"},"
+                        + "\"name\": {\"type\": \"text\"},"
+                        + "\"category\": {\"type\": \"keyword\"},"
+                        + "\"price\": {\"type\": \"double\"},"
+                        + "\"timestamp\": {\"type\": \"date\"}"
+                        + "}"
+                        + "}"
+                        + "}";
+
+        try {
+            esRestClient.createIndex(testIndex, mapping);
+            log.info("Created test index: {}", testIndex);
+        } catch (Exception e) {
+            // Deliberately tolerated: the index may be left over from a previous invocation
+            log.warn("Index might already exist: {}", e.getMessage());
+        }
+
+        // Insert test data
+        StringBuilder requestBody = new StringBuilder();
+
+        String[] categories = {"electronics", "books", "clothing", "home", "sports"};
+        for (int i = 1; i <= 10; i++) {
+            Map<String, Object> doc = new HashMap<>();
+            doc.put("id", i);
+            doc.put("name", "Auth Test Product " + i);
+            doc.put("category", categories[i % categories.length]);
+            doc.put("price", 15.99 + (i * 3.5)); // Prices from 19.49 to 50.99
+            doc.put("timestamp", "2024-01-" + String.format("%02d", i) + "T10:00:00Z");
+
+            // Explicit _id keeps the fixture idempotent across repeated setup calls
+            requestBody
+                    .append("{\"index\":{\"_index\":\"")
+                    .append(testIndex)
+                    .append("\",\"_id\":\"auth-test-")
+                    .append(i)
+                    .append("\"}}\n");
+            requestBody.append(objectMapper.writeValueAsString(doc));
+            requestBody.append("\n");
+        }
+
+        BulkResponse response = esRestClient.bulk(requestBody.toString());
+        if (response.isErrors()) {
+            log.warn("Some documents might already exist: {}", response.getResponse());
+        }
+
+        // Wait for index refresh
+        Thread.sleep(2000);
+
+        long docCount = esRestClient.getIndexDocsCount(testIndex).get(0).getDocsCount();
+        log.info("Test data setup completed - {} documents in source index", docCount);
+    }
+}
diff --git
a/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-elasticsearch-e2e/src/test/java/org/apache/seatunnel/e2e/connector/elasticsearch/ElasticsearchIT.java
b/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-elasticsearch-e2e/src/test/java/org/apache/seatunnel/e2e/connector/elasticsearch/ElasticsearchIT.java
index 1831496e5b..2479be67b0 100644
---
a/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-elasticsearch-e2e/src/test/java/org/apache/seatunnel/e2e/connector/elasticsearch/ElasticsearchIT.java
+++
b/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-elasticsearch-e2e/src/test/java/org/apache/seatunnel/e2e/connector/elasticsearch/ElasticsearchIT.java
@@ -76,7 +76,6 @@ import java.util.HashSet;
import java.util.List;
import java.util.Map;
import java.util.Objects;
-import java.util.Optional;
import java.util.Set;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.TimeUnit;
@@ -118,17 +117,16 @@ public class ElasticsearchIT extends TestSuiteBase
implements TestResource {
DockerLoggerFactory.getLogger("elasticsearch:8.9.0")));
Startables.deepStart(Stream.of(container)).join();
log.info("Elasticsearch container started");
- esRestClient =
- EsRestClient.createInstance(
- Lists.newArrayList("https://" +
container.getHttpHostAddress()),
- Optional.of("elastic"),
- Optional.of("elasticsearch"),
- false,
- false,
- Optional.empty(),
- Optional.empty(),
- Optional.empty(),
- Optional.empty());
+ // Create configuration for EsRestClient
+ Map<String, Object> configMap = new HashMap<>();
+ configMap.put("hosts", Lists.newArrayList("https://" +
container.getHttpHostAddress()));
+ configMap.put("username", "elastic");
+ configMap.put("password", "elasticsearch");
+ configMap.put("tls_verify_certificate", false);
+ configMap.put("tls_verify_hostname", false);
+
+ ReadonlyConfig config = ReadonlyConfig.fromMap(configMap);
+ esRestClient = EsRestClient.createInstance(config);
testDataset1 = generateTestDataSet1();
testDataset2 = generateTestDataSet2();
createIndexForResourceNull("st_index");
diff --git
a/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-elasticsearch-e2e/src/test/java/org/apache/seatunnel/e2e/connector/elasticsearch/ElasticsearchSchemaChangeIT.java
b/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-elasticsearch-e2e/src/test/java/org/apache/seatunnel/e2e/connector/elasticsearch/ElasticsearchSchemaChangeIT.java
index d09954fcc1..c4608361dd 100644
---
a/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-elasticsearch-e2e/src/test/java/org/apache/seatunnel/e2e/connector/elasticsearch/ElasticsearchSchemaChangeIT.java
+++
b/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-elasticsearch-e2e/src/test/java/org/apache/seatunnel/e2e/connector/elasticsearch/ElasticsearchSchemaChangeIT.java
@@ -21,6 +21,7 @@ import
org.apache.seatunnel.shade.com.fasterxml.jackson.databind.JsonNode;
import
org.apache.seatunnel.shade.com.fasterxml.jackson.databind.node.ObjectNode;
import org.apache.seatunnel.shade.com.google.common.collect.Lists;
+import org.apache.seatunnel.api.configuration.ReadonlyConfig;
import org.apache.seatunnel.common.utils.JsonUtils;
import
org.apache.seatunnel.connectors.seatunnel.cdc.mysql.testutils.MySqlContainer;
import
org.apache.seatunnel.connectors.seatunnel.cdc.mysql.testutils.MySqlVersion;
@@ -49,8 +50,9 @@ import org.testcontainers.utility.DockerLoggerFactory;
import lombok.extern.slf4j.Slf4j;
import java.time.Duration;
+import java.util.HashMap;
+import java.util.Map;
import java.util.Objects;
-import java.util.Optional;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.TimeUnit;
import java.util.stream.Stream;
@@ -97,17 +99,16 @@ public class ElasticsearchSchemaChangeIT extends
TestSuiteBase implements TestRe
DockerLoggerFactory.getLogger("elasticsearch:8.9.0")));
Startables.deepStart(Stream.of(container)).join();
log.info("Elasticsearch container started");
- esRestClient =
- EsRestClient.createInstance(
- Lists.newArrayList("https://" +
container.getHttpHostAddress()),
- Optional.of("elastic"),
- Optional.of("elasticsearch"),
- false,
- false,
- Optional.empty(),
- Optional.empty(),
- Optional.empty(),
- Optional.empty());
+ // Create configuration for EsRestClient
+ Map<String, Object> configMap = new HashMap<>();
+ configMap.put("hosts", Lists.newArrayList("https://" +
container.getHttpHostAddress()));
+ configMap.put("username", "elastic");
+ configMap.put("password", "elasticsearch");
+ configMap.put("tls_verify_certificate", false);
+ configMap.put("tls_verify_hostname", false);
+
+ ReadonlyConfig config = ReadonlyConfig.fromMap(configMap);
+ esRestClient = EsRestClient.createInstance(config);
Startables.deepStart(Stream.of(MYSQL_CONTAINER)).join();
shopDatabase.createAndInitialize();