This is an automated email from the ASF dual-hosted git repository.

corgy pushed a commit to branch dev
in repository https://gitbox.apache.org/repos/asf/seatunnel.git


The following commit(s) were added to refs/heads/dev by this push:
     new a2bfe1a530 [Feature][elasticsearch-connector] Add API key 
authentication support (#9610)
a2bfe1a530 is described below

commit a2bfe1a5305c9f1acda284011be5d9fe566dfb64
Author: CosmosNi <[email protected]>
AuthorDate: Wed Jul 30 10:03:42 2025 +0800

    [Feature][elasticsearch-connector] Add API key authentication support 
(#9610)
---
 docs/en/connector-v2/sink/Elasticsearch.md         |  88 ++-
 docs/en/connector-v2/source/Elasticsearch.md       |  88 ++-
 .../elasticsearch/client/EsRestClient.java         | 130 +---
 .../auth/AbstractAuthenticationProvider.java       | 121 ++++
 .../client/auth/ApiKeyAuthProvider.java            | 109 +++
 .../client/auth/ApiKeyEncodedAuthProvider.java     |  99 +++
 .../client/auth/AuthenticationProvider.java        |  63 ++
 .../client/auth/AuthenticationProviderFactory.java |  86 +++
 .../client/auth/BasicAuthProvider.java             |  97 +++
 .../elasticsearch/config/AuthTypeEnum.java         |  55 ++
 .../config/ElasticsearchBaseOptions.java           |  27 +
 .../exception/ElasticsearchConnectorErrorCode.java |   3 +
 .../sink/ElasticsearchSinkFactory.java             |   8 +
 .../source/ElasticsearchSourceFactory.java         |   8 +
 .../elasticsearch/ElasticsearchAuthIT.java         | 787 +++++++++++++++++++++
 .../connector/elasticsearch/ElasticsearchIT.java   |  22 +-
 .../elasticsearch/ElasticsearchSchemaChangeIT.java |  25 +-
 17 files changed, 1673 insertions(+), 143 deletions(-)

diff --git a/docs/en/connector-v2/sink/Elasticsearch.md 
b/docs/en/connector-v2/sink/Elasticsearch.md
index 11825413d4..f466cddeff 100644
--- a/docs/en/connector-v2/sink/Elasticsearch.md
+++ b/docs/en/connector-v2/sink/Elasticsearch.md
@@ -30,8 +30,13 @@ Engine Supported
 | index_type              | string  | no       |                              |
 | primary_keys            | list    | no       |                              |
 | key_delimiter           | string  | no       | `_`                          |
+| auth_type               | string  | no       | basic                        |
 | username                | string  | no       |                              |
 | password                | string  | no       |                              |
+| auth.api_key_id         | string  | no       | -                            |
+| auth.api_key            | string  | no       | -                            |
+| auth.api_key_encoded    | string  | no       | -                            |
+
 | max_retry_count         | int     | no       | 3                            |
 | max_batch_size          | int     | no       | 10                           |
 | tls_verify_certificate  | boolean | no       | true                         |
@@ -64,13 +69,88 @@ Primary key fields used to generate the document `_id`, 
this is cdc required opt
 
 Delimiter for composite keys ("_" by default), e.g., "$" would result in 
document `_id` "KEY1$KEY2$KEY3".
 
-### username [string]
+## Authentication
+
+The Elasticsearch connector supports multiple authentication methods to 
connect to secured Elasticsearch clusters. You can choose the appropriate 
authentication method based on your Elasticsearch security configuration.
+
+### auth_type [enum]
+
+Specifies the authentication method to use. Supported values:
+- `basic` (default): HTTP Basic Authentication using username and password
+- `api_key`: Elasticsearch API Key authentication using separate ID and key
+- `api_key_encoded`: Elasticsearch API Key authentication using encoded key
+
+If not specified, defaults to `basic` for backward compatibility.
+
+### Basic Authentication
+
+Basic authentication uses HTTP Basic Authentication with username and password 
credentials.
+
+#### username [string]
+
+Username for basic authentication (x-pack username).
+
+#### password [string]
+
+Password for basic authentication (x-pack password).
+
+**Example:**
+```hocon
+sink {
+    Elasticsearch {
+        hosts = ["https://localhost:9200"]
+        auth_type = "basic"
+        username = "elastic"
+        password = "your_password"
+        index = "my_index"
+    }
+}
+```
+
+### API Key Authentication
+
+API Key authentication provides a more secure way to authenticate with 
Elasticsearch using API keys.
 
-x-pack username
+#### auth.api_key_id [string]
+
+The API key ID generated by Elasticsearch.
+
+#### auth.api_key [string]
+
+The API key secret generated by Elasticsearch.
+
+#### auth.api_key_encoded [string]
+
+Base64 encoded API key in the format `base64(id:api_key)`. This is an 
alternative to specifying `auth.api_key_id` and `auth.api_key` separately.
+
+**Note:** You can use either `auth.api_key_id` + `auth.api_key` OR 
`auth.api_key_encoded`, but not both.
+
+**Example with separate ID and key:**
+```hocon
+sink {
+    Elasticsearch {
+        hosts = ["https://localhost:9200"]
+        auth_type = "api_key"
+        auth.api_key_id = "your_api_key_id"
+        auth.api_key = "your_api_key_secret"
+        index = "my_index"
+    }
+}
+```
+
+**Example with encoded key:**
+```hocon
+sink {
+    Elasticsearch {
+        hosts = ["https://localhost:9200"]
+        auth_type = "api_key_encoded"
+        auth.api_key_encoded = 
"eW91cl9hcGlfa2V5X2lkOnlvdXJfYXBpX2tleV9zZWNyZXQ="
+        index = "my_index"
+    }
+}
+```
 
-### password [string]
 
-x-pack password
 
 ### max_retry_count [int]
 
diff --git a/docs/en/connector-v2/source/Elasticsearch.md 
b/docs/en/connector-v2/source/Elasticsearch.md
index dff046767b..69caf3edc6 100644
--- a/docs/en/connector-v2/source/Elasticsearch.md
+++ b/docs/en/connector-v2/source/Elasticsearch.md
@@ -24,8 +24,13 @@ support version >= 2.x and <= 8.x.
 | name                    | type    | required | default value                 
                                 |
 
|-------------------------|---------|----------|----------------------------------------------------------------|
 | hosts                   | array   | yes      | -                             
                                 |
+| auth_type               | string  | no       | basic                         
                                 |
 | username                | string  | no       | -                             
                                 |
 | password                | string  | no       | -                             
                                 |
+| auth.api_key_id         | string  | no       | -                             
                                 |
+| auth.api_key            | string  | no       | -                             
                                 |
+| auth.api_key_encoded    | string  | no       | -                             
                                 |
+
 | index                   | string  | no       | If the index list does not 
exist, the index must be configured |
 | index_list              | array   | no       | used to define a multiple 
table task                           |
 | source                  | array   | no       | -                             
                                 |
@@ -52,13 +57,88 @@ support version >= 2.x and <= 8.x.
 
 Elasticsearch cluster http address, the format is `host:port`, allowing 
multiple hosts to be specified. Such as `["host1:9200", "host2:9200"]`.
 
-### username [string]
+## Authentication
+
+The Elasticsearch connector supports multiple authentication methods to 
connect to secured Elasticsearch clusters. You can choose the appropriate 
authentication method based on your Elasticsearch security configuration.
+
+### auth_type [enum]
+
+Specifies the authentication method to use. Supported values:
+- `basic` (default): HTTP Basic Authentication using username and password
+- `api_key`: Elasticsearch API Key authentication using separate ID and key
+- `api_key_encoded`: Elasticsearch API Key authentication using encoded key
+
+If not specified, defaults to `basic` for backward compatibility.
+
+### Basic Authentication
+
+Basic authentication uses HTTP Basic Authentication with username and password 
credentials.
+
+#### username [string]
+
+Username for basic authentication (x-pack username).
+
+#### password [string]
+
+Password for basic authentication (x-pack password).
+
+**Example:**
+```hocon
+source {
+    Elasticsearch {
+        hosts = ["https://localhost:9200"]
+        auth_type = "basic"
+        username = "elastic"
+        password = "your_password"
+        index = "my_index"
+    }
+}
+```
+
+### API Key Authentication
+
+API Key authentication provides a more secure way to authenticate with 
Elasticsearch using API keys.
 
-x-pack username.
+#### auth.api_key_id [string]
+
+The API key ID generated by Elasticsearch.
+
+#### auth.api_key [string]
+
+The API key secret generated by Elasticsearch.
+
+#### auth.api_key_encoded [string]
+
+Base64 encoded API key in the format `base64(id:api_key)`. This is an 
alternative to specifying `auth.api_key_id` and `auth.api_key` separately.
+
+**Note:** You can use either `auth.api_key_id` + `auth.api_key` OR 
`auth.api_key_encoded`, but not both.
+
+**Example with separate ID and key:**
+```hocon
+source {
+    Elasticsearch {
+        hosts = ["https://localhost:9200"]
+        auth_type = "api_key"
+        auth.api_key_id = "your_api_key_id"
+        auth.api_key = "your_api_key_secret"
+        index = "my_index"
+    }
+}
+```
+
+**Example with encoded key:**
+```hocon
+source {
+    Elasticsearch {
+        hosts = ["https://localhost:9200"]
+        auth_type = "api_key_encoded"
+        auth.api_key_encoded = 
"eW91cl9hcGlfa2V5X2lkOnlvdXJfYXBpX2tleV9zZWNyZXQ="
+        index = "my_index"
+    }
+}
+```
 
-### password [string]
 
-x-pack password.
 
 ### index [string]
 
diff --git 
a/seatunnel-connectors-v2/connector-elasticsearch/src/main/java/org/apache/seatunnel/connectors/seatunnel/elasticsearch/client/EsRestClient.java
 
b/seatunnel-connectors-v2/connector-elasticsearch/src/main/java/org/apache/seatunnel/connectors/seatunnel/elasticsearch/client/EsRestClient.java
index 69c34faef6..cccf36e3dc 100644
--- 
a/seatunnel-connectors-v2/connector-elasticsearch/src/main/java/org/apache/seatunnel/connectors/seatunnel/elasticsearch/client/EsRestClient.java
+++ 
b/seatunnel-connectors-v2/connector-elasticsearch/src/main/java/org/apache/seatunnel/connectors/seatunnel/elasticsearch/client/EsRestClient.java
@@ -26,6 +26,8 @@ import 
org.apache.seatunnel.shade.com.fasterxml.jackson.databind.node.TextNode;
 import org.apache.seatunnel.api.configuration.ReadonlyConfig;
 import org.apache.seatunnel.api.table.converter.BasicTypeDefine;
 import org.apache.seatunnel.common.utils.JsonUtils;
+import 
org.apache.seatunnel.connectors.seatunnel.elasticsearch.client.auth.AuthenticationProvider;
+import 
org.apache.seatunnel.connectors.seatunnel.elasticsearch.client.auth.AuthenticationProviderFactory;
 import 
org.apache.seatunnel.connectors.seatunnel.elasticsearch.config.ElasticsearchBaseOptions;
 import 
org.apache.seatunnel.connectors.seatunnel.elasticsearch.dto.BulkResponse;
 import 
org.apache.seatunnel.connectors.seatunnel.elasticsearch.dto.ElasticsearchClusterInfo;
@@ -34,19 +36,11 @@ import 
org.apache.seatunnel.connectors.seatunnel.elasticsearch.dto.source.PointI
 import 
org.apache.seatunnel.connectors.seatunnel.elasticsearch.dto.source.ScrollResult;
 import 
org.apache.seatunnel.connectors.seatunnel.elasticsearch.exception.ElasticsearchConnectorErrorCode;
 import 
org.apache.seatunnel.connectors.seatunnel.elasticsearch.exception.ElasticsearchConnectorException;
-import org.apache.seatunnel.connectors.seatunnel.elasticsearch.util.SSLUtils;
 
 import org.apache.commons.collections4.CollectionUtils;
 import org.apache.commons.lang3.StringUtils;
 import org.apache.http.HttpHost;
 import org.apache.http.HttpStatus;
-import org.apache.http.auth.AuthScope;
-import org.apache.http.auth.UsernamePasswordCredentials;
-import org.apache.http.client.CredentialsProvider;
-import org.apache.http.conn.ssl.NoopHostnameVerifier;
-import org.apache.http.conn.ssl.TrustAllStrategy;
-import org.apache.http.impl.client.BasicCredentialsProvider;
-import org.apache.http.ssl.SSLContexts;
 import org.apache.http.util.Asserts;
 import org.apache.http.util.EntityUtils;
 
@@ -57,8 +51,6 @@ import org.elasticsearch.client.RestClientBuilder;
 
 import lombok.extern.slf4j.Slf4j;
 
-import javax.net.ssl.SSLContext;
-
 import java.io.Closeable;
 import java.io.IOException;
 import java.util.ArrayList;
@@ -98,117 +90,33 @@ public class EsRestClient implements Closeable {
 
     public static EsRestClient createInstance(ReadonlyConfig config) {
         List<String> hosts = config.get(ElasticsearchBaseOptions.HOSTS);
-        Optional<String> username = 
config.getOptional(ElasticsearchBaseOptions.USERNAME);
-        Optional<String> password = 
config.getOptional(ElasticsearchBaseOptions.PASSWORD);
-        Optional<String> keystorePath = Optional.empty();
-        Optional<String> keystorePassword = Optional.empty();
-        Optional<String> truststorePath = Optional.empty();
-        Optional<String> truststorePassword = Optional.empty();
-        boolean tlsVerifyCertificate = 
config.get(ElasticsearchBaseOptions.TLS_VERIFY_CERTIFICATE);
-        if (tlsVerifyCertificate) {
-            keystorePath = 
config.getOptional(ElasticsearchBaseOptions.TLS_KEY_STORE_PATH);
-            keystorePassword = 
config.getOptional(ElasticsearchBaseOptions.TLS_KEY_STORE_PASSWORD);
-            truststorePath = 
config.getOptional(ElasticsearchBaseOptions.TLS_TRUST_STORE_PATH);
-            truststorePassword =
-                    
config.getOptional(ElasticsearchBaseOptions.TLS_TRUST_STORE_PASSWORD);
-        }
 
-        boolean tlsVerifyHostnames = 
config.get(ElasticsearchBaseOptions.TLS_VERIFY_HOSTNAME);
-        return createInstance(
-                hosts,
-                username,
-                password,
-                tlsVerifyCertificate,
-                tlsVerifyHostnames,
-                keystorePath,
-                keystorePassword,
-                truststorePath,
-                truststorePassword);
-    }
+        // Create basic RestClient builder
+        RestClientBuilder restClientBuilder = createRestClientBuilder(hosts);
+
+        // Configure authentication and TLS using the new authentication system
+        AuthenticationProvider authProvider = 
AuthenticationProviderFactory.createProvider(config);
+        authProvider.configure(restClientBuilder, config);
 
-    public static EsRestClient createInstance(
-            List<String> hosts,
-            Optional<String> username,
-            Optional<String> password,
-            boolean tlsVerifyCertificate,
-            boolean tlsVerifyHostnames,
-            Optional<String> keystorePath,
-            Optional<String> keystorePassword,
-            Optional<String> truststorePath,
-            Optional<String> truststorePassword) {
-        RestClientBuilder restClientBuilder =
-                getRestClientBuilder(
-                        hosts,
-                        username,
-                        password,
-                        tlsVerifyCertificate,
-                        tlsVerifyHostnames,
-                        keystorePath,
-                        keystorePassword,
-                        truststorePath,
-                        truststorePassword);
         return new EsRestClient(restClientBuilder.build());
     }
 
-    private static RestClientBuilder getRestClientBuilder(
-            List<String> hosts,
-            Optional<String> username,
-            Optional<String> password,
-            boolean tlsVerifyCertificate,
-            boolean tlsVerifyHostnames,
-            Optional<String> keystorePath,
-            Optional<String> keystorePassword,
-            Optional<String> truststorePath,
-            Optional<String> truststorePassword) {
+    /**
+     * Create a basic RestClientBuilder with hosts and request configuration. 
Authentication and TLS
+     * configuration will be handled by AuthenticationProvider.
+     */
+    private static RestClientBuilder createRestClientBuilder(List<String> 
hosts) {
         HttpHost[] httpHosts = new HttpHost[hosts.size()];
         for (int i = 0; i < hosts.size(); i++) {
             httpHosts[i] = HttpHost.create(hosts.get(i));
         }
 
-        RestClientBuilder restClientBuilder =
-                RestClient.builder(httpHosts)
-                        .setRequestConfigCallback(
-                                requestConfigBuilder ->
-                                        requestConfigBuilder
-                                                .setConnectionRequestTimeout(
-                                                        
CONNECTION_REQUEST_TIMEOUT)
-                                                
.setSocketTimeout(SOCKET_TIMEOUT));
-
-        restClientBuilder.setHttpClientConfigCallback(
-                httpClientBuilder -> {
-                    if (username.isPresent()) {
-                        CredentialsProvider credentialsProvider = new 
BasicCredentialsProvider();
-                        credentialsProvider.setCredentials(
-                                AuthScope.ANY,
-                                new 
UsernamePasswordCredentials(username.get(), password.get()));
-                        
httpClientBuilder.setDefaultCredentialsProvider(credentialsProvider);
-                    }
-
-                    try {
-                        if (tlsVerifyCertificate) {
-                            Optional<SSLContext> sslContext =
-                                    SSLUtils.buildSSLContext(
-                                            keystorePath,
-                                            keystorePassword,
-                                            truststorePath,
-                                            truststorePassword);
-                            
sslContext.ifPresent(httpClientBuilder::setSSLContext);
-                        } else {
-                            SSLContext sslContext =
-                                    SSLContexts.custom()
-                                            .loadTrustMaterial(new 
TrustAllStrategy())
-                                            .build();
-                            httpClientBuilder.setSSLContext(sslContext);
-                        }
-                        if (!tlsVerifyHostnames) {
-                            
httpClientBuilder.setSSLHostnameVerifier(NoopHostnameVerifier.INSTANCE);
-                        }
-                    } catch (Exception e) {
-                        throw new RuntimeException(e);
-                    }
-                    return httpClientBuilder;
-                });
-        return restClientBuilder;
+        return RestClient.builder(httpHosts)
+                .setRequestConfigCallback(
+                        requestConfigBuilder ->
+                                requestConfigBuilder
+                                        
.setConnectionRequestTimeout(CONNECTION_REQUEST_TIMEOUT)
+                                        .setSocketTimeout(SOCKET_TIMEOUT));
     }
 
     public BulkResponse bulk(String requestBody) {
diff --git 
a/seatunnel-connectors-v2/connector-elasticsearch/src/main/java/org/apache/seatunnel/connectors/seatunnel/elasticsearch/client/auth/AbstractAuthenticationProvider.java
 
b/seatunnel-connectors-v2/connector-elasticsearch/src/main/java/org/apache/seatunnel/connectors/seatunnel/elasticsearch/client/auth/AbstractAuthenticationProvider.java
new file mode 100644
index 0000000000..5c2b2a924e
--- /dev/null
+++ 
b/seatunnel-connectors-v2/connector-elasticsearch/src/main/java/org/apache/seatunnel/connectors/seatunnel/elasticsearch/client/auth/AbstractAuthenticationProvider.java
@@ -0,0 +1,121 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.seatunnel.connectors.seatunnel.elasticsearch.client.auth;
+
+import org.apache.seatunnel.api.configuration.ReadonlyConfig;
+import 
org.apache.seatunnel.connectors.seatunnel.elasticsearch.config.ElasticsearchBaseOptions;
+import org.apache.seatunnel.connectors.seatunnel.elasticsearch.util.SSLUtils;
+
+import org.apache.http.conn.ssl.NoopHostnameVerifier;
+import org.apache.http.conn.ssl.TrustAllStrategy;
+import org.apache.http.impl.nio.client.HttpAsyncClientBuilder;
+import org.apache.http.ssl.SSLContexts;
+
+import org.elasticsearch.client.RestClientBuilder;
+
+import lombok.extern.slf4j.Slf4j;
+
+import javax.net.ssl.SSLContext;
+
+import java.util.Optional;
+
+@Slf4j
+public abstract class AbstractAuthenticationProvider implements 
AuthenticationProvider {
+
+    @Override
+    public final void configure(RestClientBuilder builder, ReadonlyConfig 
config) {
+        builder.setHttpClientConfigCallback(
+                httpClientBuilder -> {
+                    // Configure authentication first
+                    configureAuthentication(httpClientBuilder, config);
+
+                    // Then configure TLS
+                    configureTLS(httpClientBuilder, config);
+
+                    return httpClientBuilder;
+                });
+    }
+
+    /**
+     * Configure the specific authentication mechanism.
+     *
+     * <p>Subclasses should implement this method to set up their specific 
authentication logic on
+     * the HttpAsyncClientBuilder.
+     *
+     * @param httpClientBuilder the HTTP client builder to configure
+     * @param config the readonly configuration containing authentication 
parameters
+     */
+    protected abstract void configureAuthentication(
+            HttpAsyncClientBuilder httpClientBuilder, ReadonlyConfig config);
+
+    /**
+     * Configure TLS settings for the HTTP client.
+     *
+     * <p>This method handles SSL/TLS configuration including certificate 
verification, hostname
+     * verification, and custom keystores/truststores.
+     *
+     * @param httpClientBuilder the HTTP client builder to configure
+     * @param config the readonly configuration containing TLS parameters
+     */
+    protected void configureTLS(HttpAsyncClientBuilder httpClientBuilder, 
ReadonlyConfig config) {
+        boolean tlsVerifyCertificate = 
config.get(ElasticsearchBaseOptions.TLS_VERIFY_CERTIFICATE);
+        boolean tlsVerifyHostnames = 
config.get(ElasticsearchBaseOptions.TLS_VERIFY_HOSTNAME);
+
+        try {
+            if (tlsVerifyCertificate) {
+                Optional<String> keystorePath =
+                        
config.getOptional(ElasticsearchBaseOptions.TLS_KEY_STORE_PATH);
+                Optional<String> keystorePassword =
+                        
config.getOptional(ElasticsearchBaseOptions.TLS_KEY_STORE_PASSWORD);
+                Optional<String> truststorePath =
+                        
config.getOptional(ElasticsearchBaseOptions.TLS_TRUST_STORE_PATH);
+                Optional<String> truststorePassword =
+                        
config.getOptional(ElasticsearchBaseOptions.TLS_TRUST_STORE_PASSWORD);
+
+                Optional<SSLContext> sslContext =
+                        SSLUtils.buildSSLContext(
+                                keystorePath, keystorePassword, 
truststorePath, truststorePassword);
+
+                if (sslContext.isPresent()) {
+                    httpClientBuilder.setSSLContext(sslContext.get());
+                    log.debug("Custom SSL context configured with 
keystore/truststore");
+                } else {
+                    log.debug("No custom SSL context configured, using 
default");
+                }
+            } else {
+                // Trust all certificates (not recommended for production)
+                SSLContext sslContext =
+                        SSLContexts.custom().loadTrustMaterial(new 
TrustAllStrategy()).build();
+                httpClientBuilder.setSSLContext(sslContext);
+                log.warn("TLS certificate verification disabled - not 
recommended for production");
+            }
+
+            if (!tlsVerifyHostnames) {
+                
httpClientBuilder.setSSLHostnameVerifier(NoopHostnameVerifier.INSTANCE);
+                log.warn("TLS hostname verification disabled - not recommended 
for production");
+            }
+
+            log.debug(
+                    "TLS configuration completed - certificate verification: 
{}, hostname verification: {}",
+                    tlsVerifyCertificate,
+                    tlsVerifyHostnames);
+        } catch (Exception e) {
+            throw new RuntimeException("Failed to configure TLS settings", e);
+        }
+    }
+}
diff --git 
a/seatunnel-connectors-v2/connector-elasticsearch/src/main/java/org/apache/seatunnel/connectors/seatunnel/elasticsearch/client/auth/ApiKeyAuthProvider.java
 
b/seatunnel-connectors-v2/connector-elasticsearch/src/main/java/org/apache/seatunnel/connectors/seatunnel/elasticsearch/client/auth/ApiKeyAuthProvider.java
new file mode 100644
index 0000000000..bbd5def631
--- /dev/null
+++ 
b/seatunnel-connectors-v2/connector-elasticsearch/src/main/java/org/apache/seatunnel/connectors/seatunnel/elasticsearch/client/auth/ApiKeyAuthProvider.java
@@ -0,0 +1,109 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.seatunnel.connectors.seatunnel.elasticsearch.client.auth;
+
+import org.apache.seatunnel.api.configuration.ReadonlyConfig;
+import 
org.apache.seatunnel.connectors.seatunnel.elasticsearch.config.ElasticsearchBaseOptions;
+
+import org.apache.http.impl.nio.client.HttpAsyncClientBuilder;
+
+import lombok.extern.slf4j.Slf4j;
+
+import java.nio.charset.StandardCharsets;
+import java.util.Base64;
+import java.util.Optional;
+
+@Slf4j
+public class ApiKeyAuthProvider extends AbstractAuthenticationProvider {
+
+    private static final String AUTH_TYPE = "api_key";
+    private static final String API_KEY_HEADER = "Authorization";
+    private static final String API_KEY_PREFIX = "ApiKey ";
+
+    @Override
+    protected void configureAuthentication(
+            HttpAsyncClientBuilder httpClientBuilder, ReadonlyConfig config) {
+        String encodedApiKey = getEncodedApiKey(config);
+
+        if (encodedApiKey != null) {
+            log.debug("Configuring API key authentication");
+
+            // Add API key header to all requests
+            httpClientBuilder.addInterceptorFirst(
+                    (org.apache.http.HttpRequestInterceptor)
+                            (request, context) -> {
+                                request.setHeader(API_KEY_HEADER, 
API_KEY_PREFIX + encodedApiKey);
+                            });
+
+            log.info("API key authentication configured successfully");
+        } else {
+            log.debug(
+                    "No API key credentials provided, skipping API key 
authentication configuration");
+        }
+    }
+
+    @Override
+    public String getAuthType() {
+        return AUTH_TYPE;
+    }
+
+    @Override
+    public void validate(ReadonlyConfig config) {
+        Optional<String> apiKeyId = 
config.getOptional(ElasticsearchBaseOptions.API_KEY_ID);
+        Optional<String> apiKey = 
config.getOptional(ElasticsearchBaseOptions.API_KEY);
+        Optional<String> apiKeyEncoded =
+                config.getOptional(ElasticsearchBaseOptions.API_KEY_ENCODED);
+
+        if (!apiKeyId.isPresent() || !apiKey.isPresent()) {
+            throw new IllegalArgumentException(
+                    "API key authentication with auth_type='api_key' requires 
both api_key_id and api_key");
+        }
+        validateApiKeyIdAndSecret(apiKeyId.get(), apiKey.get());
+
+        log.debug("API key authentication configuration validated");
+    }
+
+    /**
+     * Get the encoded API key from configuration.
+     *
+     * @param config the configuration
+     * @return the Base64 encoded API key, or null if not configured
+     */
+    private String getEncodedApiKey(ReadonlyConfig config) {
+        Optional<String> apiKeyId = 
config.getOptional(ElasticsearchBaseOptions.API_KEY_ID);
+        Optional<String> apiKey = 
config.getOptional(ElasticsearchBaseOptions.API_KEY);
+
+        if (apiKeyId.isPresent() && apiKey.isPresent()) {
+            String credentials = apiKeyId.get() + ":" + apiKey.get();
+            return 
Base64.getEncoder().encodeToString(credentials.getBytes(StandardCharsets.UTF_8));
+        }
+
+        return null;
+    }
+
+    /** Validate API key ID and secret. */
+    private void validateApiKeyIdAndSecret(String apiKeyId, String apiKey) {
+        if (apiKeyId == null || apiKeyId.trim().isEmpty()) {
+            throw new IllegalArgumentException("API key ID cannot be null or 
empty");
+        }
+
+        if (apiKey == null || apiKey.trim().isEmpty()) {
+            throw new IllegalArgumentException("API key cannot be null or 
empty");
+        }
+    }
+}
diff --git 
a/seatunnel-connectors-v2/connector-elasticsearch/src/main/java/org/apache/seatunnel/connectors/seatunnel/elasticsearch/client/auth/ApiKeyEncodedAuthProvider.java
 
b/seatunnel-connectors-v2/connector-elasticsearch/src/main/java/org/apache/seatunnel/connectors/seatunnel/elasticsearch/client/auth/ApiKeyEncodedAuthProvider.java
new file mode 100644
index 0000000000..200bc13705
--- /dev/null
+++ 
b/seatunnel-connectors-v2/connector-elasticsearch/src/main/java/org/apache/seatunnel/connectors/seatunnel/elasticsearch/client/auth/ApiKeyEncodedAuthProvider.java
@@ -0,0 +1,99 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.seatunnel.connectors.seatunnel.elasticsearch.client.auth;
+
+import org.apache.seatunnel.api.configuration.ReadonlyConfig;
+import 
org.apache.seatunnel.connectors.seatunnel.elasticsearch.config.ElasticsearchBaseOptions;
+
+import org.apache.http.impl.nio.client.HttpAsyncClientBuilder;
+
+import lombok.extern.slf4j.Slf4j;
+
+import java.nio.charset.StandardCharsets;
+import java.util.Base64;
+import java.util.Optional;
+
+/**
+ * Authentication provider for {@code auth_type = api_key_encoded}.
+ *
+ * <p>The user supplies an API key that is already Base64 encoded in {@code id:key} form. The
+ * value is sent unchanged in an {@code Authorization: ApiKey <value>} header on every request.
+ */
+@Slf4j
+public class ApiKeyEncodedAuthProvider extends AbstractAuthenticationProvider {
+
+    private static final String AUTH_TYPE = "api_key_encoded";
+    private static final String API_KEY_HEADER = "Authorization";
+    private static final String API_KEY_PREFIX = "ApiKey ";
+
+    /**
+     * Register a request interceptor that adds the API key header to every outgoing request.
+     *
+     * <p>Nothing is configured when {@code API_KEY_ENCODED} is absent from the configuration.
+     *
+     * @param httpClientBuilder the HTTP client builder to add the interceptor to
+     * @param config the configuration holding the encoded API key
+     */
+    @Override
+    protected void configureAuthentication(
+            HttpAsyncClientBuilder httpClientBuilder, ReadonlyConfig config) {
+        Optional<String> apiKeyEncoded =
+                config.getOptional(ElasticsearchBaseOptions.API_KEY_ENCODED);
+
+        if (!apiKeyEncoded.isPresent()) {
+            log.debug(
+                    "No encoded API key provided, skipping encoded API key authentication configuration");
+            return;
+        }
+
+        log.debug("Configuring encoded API key authentication");
+
+        // Build the header value once instead of concatenating it for every request.
+        final String authorizationValue = API_KEY_PREFIX + apiKeyEncoded.get();
+
+        // The cast selects the request-interceptor overload of addInterceptorFirst.
+        httpClientBuilder.addInterceptorFirst(
+                (org.apache.http.HttpRequestInterceptor)
+                        (request, context) ->
+                                request.setHeader(API_KEY_HEADER, authorizationValue));
+
+        log.info("Encoded API key authentication configured successfully");
+    }
+
+    @Override
+    public String getAuthType() {
+        return AUTH_TYPE;
+    }
+
+    /**
+     * Check that an encoded API key is configured and well formed.
+     *
+     * @param config the configuration to validate
+     * @throws IllegalArgumentException if {@code API_KEY_ENCODED} is missing, blank, not valid
+     *     Base64, or does not decode to a non-empty {@code id:key} pair
+     */
+    @Override
+    public void validate(ReadonlyConfig config) {
+        Optional<String> apiKeyEncoded =
+                config.getOptional(ElasticsearchBaseOptions.API_KEY_ENCODED);
+        if (!apiKeyEncoded.isPresent()) {
+            throw new IllegalArgumentException(
+                    "API key authentication with auth_type='api_key_encoded' requires api_key_encoded");
+        }
+        validateEncodedApiKey(apiKeyEncoded.get());
+
+        log.debug("Encoded API key authentication configuration validated");
+    }
+
+    /**
+     * Validate the shape of an encoded API key without ever echoing the key itself.
+     *
+     * @param apiKeyEncoded the Base64 encoded {@code id:key} value
+     * @throws IllegalArgumentException if the value is blank, is not valid Base64, or its decoded
+     *     form lacks a non-empty id before, or a non-empty key after, the first {@code ':'}
+     */
+    private void validateEncodedApiKey(String apiKeyEncoded) {
+        if (apiKeyEncoded == null || apiKeyEncoded.trim().isEmpty()) {
+            throw new IllegalArgumentException("Encoded API key cannot be null or empty");
+        }
+
+        // Only the decode step can legitimately fail here, so only it is guarded.
+        final byte[] decoded;
+        try {
+            decoded = Base64.getDecoder().decode(apiKeyEncoded);
+        } catch (IllegalArgumentException e) {
+            throw new IllegalArgumentException(
+                    "Invalid encoded API key format: " + e.getMessage(), e);
+        }
+
+        String decodedStr = new String(decoded, StandardCharsets.UTF_8);
+        int separator = decodedStr.indexOf(':');
+        // Require something on both sides of the separator, matching the id/secret checks
+        // applied to the non-encoded api_key auth type.
+        if (separator <= 0 || separator == decodedStr.length() - 1) {
+            throw new IllegalArgumentException(
+                    "Invalid encoded API key format: "
+                            + "Encoded API key must be Base64 encoded 'id:key' format");
+        }
+    }
+}
diff --git 
a/seatunnel-connectors-v2/connector-elasticsearch/src/main/java/org/apache/seatunnel/connectors/seatunnel/elasticsearch/client/auth/AuthenticationProvider.java
 
b/seatunnel-connectors-v2/connector-elasticsearch/src/main/java/org/apache/seatunnel/connectors/seatunnel/elasticsearch/client/auth/AuthenticationProvider.java
new file mode 100644
index 0000000000..968cd1f5a4
--- /dev/null
+++ 
b/seatunnel-connectors-v2/connector-elasticsearch/src/main/java/org/apache/seatunnel/connectors/seatunnel/elasticsearch/client/auth/AuthenticationProvider.java
@@ -0,0 +1,63 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.seatunnel.connectors.seatunnel.elasticsearch.client.auth;
+
+import org.apache.seatunnel.api.configuration.ReadonlyConfig;
+
+import org.elasticsearch.client.RestClientBuilder;
+
+/**
+ * Strategy for applying one authentication mechanism to the Elasticsearch REST client.
+ *
+ * <p>Each implementation is identified by the value returned from {@link #getAuthType()}, checks
+ * its own settings in {@link #validate(ReadonlyConfig)}, and applies them in {@link
+ * #configure(RestClientBuilder, ReadonlyConfig)}.
+ */
+public interface AuthenticationProvider {
+
+    /**
+     * Configure the Elasticsearch RestClient with authentication and TLS settings.
+     *
+     * <p>Implementations set up the authentication mechanism and the TLS configuration on the
+     * given builder. Both are handled in the same call so that they work together properly.
+     *
+     * @param builder the RestClientBuilder to configure
+     * @param config the readonly configuration containing authentication and TLS parameters
+     * @throws IllegalArgumentException if the configuration is invalid
+     * @throws RuntimeException if authentication or TLS setup fails
+     */
+    void configure(RestClientBuilder builder, ReadonlyConfig config);
+
+    /**
+     * Get the authentication type identifier.
+     *
+     * <p>This identifier matches the provider with the configured {@code auth_type} parameter. It
+     * should be a unique, lowercase string that clearly identifies the authentication mechanism.
+     *
+     * @return the authentication type identifier (e.g., "basic", "api_key", "api_key_encoded")
+     */
+    String getAuthType();
+
+    /**
+     * Validate the authentication configuration.
+     *
+     * <p>This method is meant to run before authentication setup to ensure that all required
+     * configuration parameters are present and valid. It should throw an exception if the
+     * configuration is incomplete or invalid.
+     *
+     * @param config the readonly configuration to validate
+     * @throws IllegalArgumentException if required parameters are missing or invalid
+     */
+    void validate(ReadonlyConfig config);
+}
diff --git 
a/seatunnel-connectors-v2/connector-elasticsearch/src/main/java/org/apache/seatunnel/connectors/seatunnel/elasticsearch/client/auth/AuthenticationProviderFactory.java
 
b/seatunnel-connectors-v2/connector-elasticsearch/src/main/java/org/apache/seatunnel/connectors/seatunnel/elasticsearch/client/auth/AuthenticationProviderFactory.java
new file mode 100644
index 0000000000..c7eeaef8a8
--- /dev/null
+++ 
b/seatunnel-connectors-v2/connector-elasticsearch/src/main/java/org/apache/seatunnel/connectors/seatunnel/elasticsearch/client/auth/AuthenticationProviderFactory.java
@@ -0,0 +1,86 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.seatunnel.connectors.seatunnel.elasticsearch.client.auth;
+
+import org.apache.seatunnel.api.configuration.ReadonlyConfig;
+import org.apache.seatunnel.connectors.seatunnel.elasticsearch.config.AuthTypeEnum;
+import org.apache.seatunnel.connectors.seatunnel.elasticsearch.config.ElasticsearchBaseOptions;
+import org.apache.seatunnel.connectors.seatunnel.elasticsearch.exception.ElasticsearchConnectorException;
+
+import lombok.extern.slf4j.Slf4j;
+
+import java.util.HashMap;
+import java.util.Map;
+import java.util.function.Supplier;
+
+import static org.apache.seatunnel.connectors.seatunnel.elasticsearch.exception.ElasticsearchConnectorErrorCode.AUTH_CONFIG_INVALID;
+import static org.apache.seatunnel.connectors.seatunnel.elasticsearch.exception.ElasticsearchConnectorErrorCode.UNSUPPORTED_AUTH_TYPE;
+
+/**
+ * Factory that selects, instantiates and validates the {@link AuthenticationProvider} matching
+ * the configured {@code auth_type}.
+ */
+@Slf4j
+public class AuthenticationProviderFactory {
+
+    /** Auth type used when {@code auth_type} is not configured. */
+    private static final AuthTypeEnum DEFAULT_AUTH_TYPE = AuthTypeEnum.BASIC;
+
+    /** Constructors of the built-in providers, keyed by the auth type they serve. */
+    private static final Map<AuthTypeEnum, Supplier<AuthenticationProvider>> PROVIDER_REGISTRY =
+            new HashMap<>();
+
+    static {
+        // Register built-in authentication providers
+        PROVIDER_REGISTRY.put(AuthTypeEnum.BASIC, BasicAuthProvider::new);
+        PROVIDER_REGISTRY.put(AuthTypeEnum.API_KEY, ApiKeyAuthProvider::new);
+        PROVIDER_REGISTRY.put(AuthTypeEnum.API_KEY_ENCODED, ApiKeyEncodedAuthProvider::new);
+    }
+
+    /**
+     * Create an authentication provider based on the configuration.
+     *
+     * <p>This method reads the auth_type configuration parameter, creates the matching provider
+     * and validates the configuration against it. If no auth_type is specified, it defaults to
+     * basic authentication for backward compatibility.
+     *
+     * @param config the readonly configuration containing authentication settings
+     * @return the validated authentication provider
+     * @throws ElasticsearchConnectorException with {@code UNSUPPORTED_AUTH_TYPE} if no provider
+     *     is registered for the auth_type, or with {@code AUTH_CONFIG_INVALID} if the provider
+     *     rejects the configuration
+     */
+    public static AuthenticationProvider createProvider(ReadonlyConfig config) {
+        AuthTypeEnum authType =
+                config.getOptional(ElasticsearchBaseOptions.AUTH_TYPE).orElse(DEFAULT_AUTH_TYPE);
+
+        log.debug("Creating authentication provider for type: {}", authType);
+
+        Supplier<AuthenticationProvider> providerSupplier = PROVIDER_REGISTRY.get(authType);
+        if (providerSupplier == null) {
+            throw new ElasticsearchConnectorException(
+                    UNSUPPORTED_AUTH_TYPE,
+                    String.format(
+                            "Unsupported authentication type: %s. Supported types: %s",
+                            authType, PROVIDER_REGISTRY.keySet()));
+        }
+
+        AuthenticationProvider provider = providerSupplier.get();
+        try {
+            provider.validate(config);
+        } catch (RuntimeException e) {
+            // The auth type itself is supported here; what failed is the user's configuration,
+            // so report it under the dedicated error code and surface the reason.
+            throw new ElasticsearchConnectorException(
+                    AUTH_CONFIG_INVALID,
+                    String.format(
+                            "Failed to create authentication provider for type: %s. "
+                                    + "Invalid configuration: %s",
+                            authType, e.getMessage()),
+                    e);
+        }
+
+        log.info("Successfully created authentication provider: {}", authType);
+        return provider;
+    }
+}
diff --git 
a/seatunnel-connectors-v2/connector-elasticsearch/src/main/java/org/apache/seatunnel/connectors/seatunnel/elasticsearch/client/auth/BasicAuthProvider.java
 
b/seatunnel-connectors-v2/connector-elasticsearch/src/main/java/org/apache/seatunnel/connectors/seatunnel/elasticsearch/client/auth/BasicAuthProvider.java
new file mode 100644
index 0000000000..7e4d0ce010
--- /dev/null
+++ 
b/seatunnel-connectors-v2/connector-elasticsearch/src/main/java/org/apache/seatunnel/connectors/seatunnel/elasticsearch/client/auth/BasicAuthProvider.java
@@ -0,0 +1,97 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.seatunnel.connectors.seatunnel.elasticsearch.client.auth;
+
+import org.apache.seatunnel.api.configuration.ReadonlyConfig;
+import 
org.apache.seatunnel.connectors.seatunnel.elasticsearch.config.ElasticsearchBaseOptions;
+
+import org.apache.http.auth.AuthScope;
+import org.apache.http.auth.UsernamePasswordCredentials;
+import org.apache.http.client.CredentialsProvider;
+import org.apache.http.impl.client.BasicCredentialsProvider;
+import org.apache.http.impl.nio.client.HttpAsyncClientBuilder;
+
+import lombok.extern.slf4j.Slf4j;
+
+import java.util.Optional;
+
+/**
+ * Authentication provider for {@code auth_type = basic}: HTTP basic authentication with a
+ * username and password. Credentials are optional; when none are configured no authentication is
+ * set up.
+ */
+@Slf4j
+public class BasicAuthProvider extends AbstractAuthenticationProvider {
+
+    private static final String AUTH_TYPE = "basic";
+
+    /**
+     * Install a credentials provider carrying the configured username and password.
+     *
+     * <p>Nothing is configured unless both {@code USERNAME} and {@code PASSWORD} are present.
+     *
+     * @param httpClientBuilder the HTTP client builder that receives the credentials provider
+     * @param config the configuration holding the username and password
+     */
+    @Override
+    protected void configureAuthentication(
+            HttpAsyncClientBuilder httpClientBuilder, ReadonlyConfig config) {
+        Optional<String> user = config.getOptional(ElasticsearchBaseOptions.USERNAME);
+        Optional<String> secret = config.getOptional(ElasticsearchBaseOptions.PASSWORD);
+
+        if (!(user.isPresent() && secret.isPresent())) {
+            log.debug("No username/password provided, skipping basic authentication configuration");
+            return;
+        }
+
+        String userName = user.get();
+        log.debug("Configuring basic authentication for user: {}", userName);
+
+        // The same credentials are offered for every host/realm the client talks to.
+        CredentialsProvider provider = new BasicCredentialsProvider();
+        UsernamePasswordCredentials credentials =
+                new UsernamePasswordCredentials(userName, secret.get());
+        provider.setCredentials(AuthScope.ANY, credentials);
+        httpClientBuilder.setDefaultCredentialsProvider(provider);
+
+        log.info("Basic authentication configured successfully for user: {}", userName);
+    }
+
+    @Override
+    public String getAuthType() {
+        return AUTH_TYPE;
+    }
+
+    /**
+     * Check the username/password pair.
+     *
+     * <p>For backward compatibility basic auth is optional: supplying neither value is valid.
+     * Supplying only one of them, or a blank value, is not.
+     *
+     * @param config the configuration to validate
+     * @throws IllegalArgumentException if only one of username/password is given, or if either
+     *     value is blank
+     */
+    @Override
+    public void validate(ReadonlyConfig config) {
+        Optional<String> user = config.getOptional(ElasticsearchBaseOptions.USERNAME);
+        Optional<String> secret = config.getOptional(ElasticsearchBaseOptions.PASSWORD);
+        boolean hasUser = user.isPresent();
+        boolean hasSecret = secret.isPresent();
+
+        // Exactly one of the two being present is a misconfiguration.
+        if (hasUser && !hasSecret) {
+            throw new IllegalArgumentException(
+                    "Password is required when username is provided for basic authentication");
+        }
+        if (hasSecret && !hasUser) {
+            throw new IllegalArgumentException(
+                    "Username is required when password is provided for basic authentication");
+        }
+
+        if (!hasUser) {
+            log.debug(
+                    "No basic authentication credentials provided - authentication will be skipped");
+            return;
+        }
+
+        String userName = user.get();
+        if (isBlank(userName)) {
+            throw new IllegalArgumentException("Username cannot be null or empty");
+        }
+        if (isBlank(secret.get())) {
+            throw new IllegalArgumentException("Password cannot be null or empty");
+        }
+
+        log.debug("Basic authentication configuration validated for user: {}", userName);
+    }
+
+    /** Tell whether a value is {@code null} or consists only of whitespace. */
+    private static boolean isBlank(String value) {
+        return value == null || value.trim().isEmpty();
+    }
+}
diff --git 
a/seatunnel-connectors-v2/connector-elasticsearch/src/main/java/org/apache/seatunnel/connectors/seatunnel/elasticsearch/config/AuthTypeEnum.java
 
b/seatunnel-connectors-v2/connector-elasticsearch/src/main/java/org/apache/seatunnel/connectors/seatunnel/elasticsearch/config/AuthTypeEnum.java
new file mode 100644
index 0000000000..21b6235e5e
--- /dev/null
+++ 
b/seatunnel-connectors-v2/connector-elasticsearch/src/main/java/org/apache/seatunnel/connectors/seatunnel/elasticsearch/config/AuthTypeEnum.java
@@ -0,0 +1,55 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.seatunnel.connectors.seatunnel.elasticsearch.config;
+
+/** Authentication mechanisms selectable through the {@code auth_type} option. */
+public enum AuthTypeEnum {
+    /** HTTP Basic Authentication using username and password */
+    BASIC("basic"),
+
+    /** Elasticsearch API Key authentication using api_key_id and api_key */
+    API_KEY("api_key"),
+
+    /** Elasticsearch API Key authentication using encoded api_key */
+    API_KEY_ENCODED("api_key_encoded");
+
+    /** Lowercase identifier of this auth type. */
+    private final String value;
+
+    AuthTypeEnum(String value) {
+        this.value = value;
+    }
+
+    /**
+     * Get the lowercase identifier of this auth type.
+     *
+     * @return the identifier, e.g. {@code "api_key"}
+     */
+    public String getValue() {
+        return value;
+    }
+
+    /**
+     * Get AuthTypeEnum from string value.
+     *
+     * <p>Matching ignores case and surrounding whitespace, so {@code "api_key"}, {@code
+     * "API_KEY"} and {@code " Api_Key "} all resolve to {@link #API_KEY}.
+     *
+     * @param value the string value
+     * @return the corresponding AuthTypeEnum
+     * @throws IllegalArgumentException if the value is {@code null} or not supported
+     */
+    public static AuthTypeEnum fromValue(String value) {
+        if (value != null) {
+            String candidate = value.trim();
+            for (AuthTypeEnum authType : values()) {
+                if (authType.value.equalsIgnoreCase(candidate)) {
+                    return authType;
+                }
+            }
+        }
+        throw new IllegalArgumentException("Unsupported auth type: " + value);
+    }
+}
diff --git 
a/seatunnel-connectors-v2/connector-elasticsearch/src/main/java/org/apache/seatunnel/connectors/seatunnel/elasticsearch/config/ElasticsearchBaseOptions.java
 
b/seatunnel-connectors-v2/connector-elasticsearch/src/main/java/org/apache/seatunnel/connectors/seatunnel/elasticsearch/config/ElasticsearchBaseOptions.java
index c5e688443e..1c448bd392 100644
--- 
a/seatunnel-connectors-v2/connector-elasticsearch/src/main/java/org/apache/seatunnel/connectors/seatunnel/elasticsearch/config/ElasticsearchBaseOptions.java
+++ 
b/seatunnel-connectors-v2/connector-elasticsearch/src/main/java/org/apache/seatunnel/connectors/seatunnel/elasticsearch/config/ElasticsearchBaseOptions.java
@@ -87,4 +87,31 @@ public class ElasticsearchBaseOptions implements 
Serializable {
                     .stringType()
                     .noDefaultValue()
                     .withDescription("The key password for the trust store 
specified");
+
+    // Authentication configuration options
+    /**
+     * Selects the authentication mechanism. Defaults to {@link AuthTypeEnum#BASIC}; the other
+     * values are {@link AuthTypeEnum#API_KEY} and {@link AuthTypeEnum#API_KEY_ENCODED}.
+     */
+    public static final Option<AuthTypeEnum> AUTH_TYPE =
+            Options.key("auth_type")
+                    .enumType(AuthTypeEnum.class)
+                    .defaultValue(AuthTypeEnum.BASIC)
+                    .withDescription(
+                            "Authentication type. Supported values: basic, 
api_key, api_key_encoded");
+
+    // API Key authentication options
+    /** ID half of an Elasticsearch API key; used together with {@link #API_KEY}. */
+    public static final Option<String> API_KEY_ID =
+            Options.key("auth.api_key_id")
+                    .stringType()
+                    .noDefaultValue()
+                    .withDescription("Elasticsearch API key ID for 
authentication");
+
+    /** Secret half of an Elasticsearch API key; used together with {@link #API_KEY_ID}. */
+    public static final Option<String> API_KEY =
+            Options.key("auth.api_key")
+                    .stringType()
+                    .noDefaultValue()
+                    .withDescription("Elasticsearch API key secret for 
authentication");
+
+    /** Complete API key, already Base64 encoded from its {@code id:key} form. */
+    public static final Option<String> API_KEY_ENCODED =
+            Options.key("auth.api_key_encoded")
+                    .stringType()
+                    .noDefaultValue()
+                    .withDescription("Base64 encoded Elasticsearch API key 
(id:key format)");
 }
diff --git 
a/seatunnel-connectors-v2/connector-elasticsearch/src/main/java/org/apache/seatunnel/connectors/seatunnel/elasticsearch/exception/ElasticsearchConnectorErrorCode.java
 
b/seatunnel-connectors-v2/connector-elasticsearch/src/main/java/org/apache/seatunnel/connectors/seatunnel/elasticsearch/exception/ElasticsearchConnectorErrorCode.java
index b804434b5a..0129c1d8c6 100644
--- 
a/seatunnel-connectors-v2/connector-elasticsearch/src/main/java/org/apache/seatunnel/connectors/seatunnel/elasticsearch/exception/ElasticsearchConnectorErrorCode.java
+++ 
b/seatunnel-connectors-v2/connector-elasticsearch/src/main/java/org/apache/seatunnel/connectors/seatunnel/elasticsearch/exception/ElasticsearchConnectorErrorCode.java
@@ -40,6 +40,9 @@ public enum ElasticsearchConnectorErrorCode implements 
SeaTunnelErrorCode {
     CREATE_PIT_FAILED("ELASTICSEARCH-15", "Create Point-in-Time failed"),
     DELETE_PIT_FAILED("ELASTICSEARCH-16", "Delete Point-in-Time failed"),
     SEARCH_WITH_PIT_FAILED("ELASTICSEARCH-17", "Search with Point-in-Time 
failed"),
+    UNSUPPORTED_AUTH_TYPE("ELASTICSEARCH-18", "Unsupported authentication 
type"),
+    AUTH_CONFIG_INVALID("ELASTICSEARCH-19", "Authentication configuration is 
invalid"),
+    AUTH_SETUP_FAILED("ELASTICSEARCH-20", "Authentication setup failed"),
     ;
 
     private final String code;
diff --git 
a/seatunnel-connectors-v2/connector-elasticsearch/src/main/java/org/apache/seatunnel/connectors/seatunnel/elasticsearch/sink/ElasticsearchSinkFactory.java
 
b/seatunnel-connectors-v2/connector-elasticsearch/src/main/java/org/apache/seatunnel/connectors/seatunnel/elasticsearch/sink/ElasticsearchSinkFactory.java
index d4b27c7acd..8480ca9fb4 100644
--- 
a/seatunnel-connectors-v2/connector-elasticsearch/src/main/java/org/apache/seatunnel/connectors/seatunnel/elasticsearch/sink/ElasticsearchSinkFactory.java
+++ 
b/seatunnel-connectors-v2/connector-elasticsearch/src/main/java/org/apache/seatunnel/connectors/seatunnel/elasticsearch/sink/ElasticsearchSinkFactory.java
@@ -26,10 +26,15 @@ import org.apache.seatunnel.api.table.connector.TableSink;
 import org.apache.seatunnel.api.table.factory.Factory;
 import org.apache.seatunnel.api.table.factory.TableSinkFactory;
 import org.apache.seatunnel.api.table.factory.TableSinkFactoryContext;
+import 
org.apache.seatunnel.connectors.seatunnel.elasticsearch.config.AuthTypeEnum;
 import 
org.apache.seatunnel.connectors.seatunnel.elasticsearch.config.ElasticsearchSinkOptions;
 
 import com.google.auto.service.AutoService;
 
+import static 
org.apache.seatunnel.connectors.seatunnel.elasticsearch.config.ElasticsearchBaseOptions.API_KEY;
+import static 
org.apache.seatunnel.connectors.seatunnel.elasticsearch.config.ElasticsearchBaseOptions.API_KEY_ENCODED;
+import static 
org.apache.seatunnel.connectors.seatunnel.elasticsearch.config.ElasticsearchBaseOptions.API_KEY_ID;
+import static 
org.apache.seatunnel.connectors.seatunnel.elasticsearch.config.ElasticsearchBaseOptions.AUTH_TYPE;
 import static 
org.apache.seatunnel.connectors.seatunnel.elasticsearch.config.ElasticsearchBaseOptions.HOSTS;
 import static 
org.apache.seatunnel.connectors.seatunnel.elasticsearch.config.ElasticsearchBaseOptions.PASSWORD;
 import static 
org.apache.seatunnel.connectors.seatunnel.elasticsearch.config.ElasticsearchBaseOptions.TLS_KEY_STORE_PASSWORD;
@@ -76,6 +81,9 @@ public class ElasticsearchSinkFactory implements 
TableSinkFactory {
                         TLS_TRUST_STORE_PATH,
                         TLS_TRUST_STORE_PASSWORD,
                         SinkConnectorCommonOptions.MULTI_TABLE_SINK_REPLICA)
+                .optional(AUTH_TYPE)
+                .conditional(AUTH_TYPE, AuthTypeEnum.API_KEY, API_KEY_ID, 
API_KEY)
+                .conditional(AUTH_TYPE, AuthTypeEnum.API_KEY_ENCODED, 
API_KEY_ENCODED)
                 .build();
     }
 
diff --git 
a/seatunnel-connectors-v2/connector-elasticsearch/src/main/java/org/apache/seatunnel/connectors/seatunnel/elasticsearch/source/ElasticsearchSourceFactory.java
 
b/seatunnel-connectors-v2/connector-elasticsearch/src/main/java/org/apache/seatunnel/connectors/seatunnel/elasticsearch/source/ElasticsearchSourceFactory.java
index fb11f72faf..dded093dbe 100644
--- 
a/seatunnel-connectors-v2/connector-elasticsearch/src/main/java/org/apache/seatunnel/connectors/seatunnel/elasticsearch/source/ElasticsearchSourceFactory.java
+++ 
b/seatunnel-connectors-v2/connector-elasticsearch/src/main/java/org/apache/seatunnel/connectors/seatunnel/elasticsearch/source/ElasticsearchSourceFactory.java
@@ -24,11 +24,16 @@ import org.apache.seatunnel.api.table.connector.TableSource;
 import org.apache.seatunnel.api.table.factory.Factory;
 import org.apache.seatunnel.api.table.factory.TableSourceFactory;
 import org.apache.seatunnel.api.table.factory.TableSourceFactoryContext;
+import 
org.apache.seatunnel.connectors.seatunnel.elasticsearch.config.AuthTypeEnum;
 
 import com.google.auto.service.AutoService;
 
 import java.io.Serializable;
 
+import static 
org.apache.seatunnel.connectors.seatunnel.elasticsearch.config.ElasticsearchBaseOptions.API_KEY;
+import static 
org.apache.seatunnel.connectors.seatunnel.elasticsearch.config.ElasticsearchBaseOptions.API_KEY_ENCODED;
+import static 
org.apache.seatunnel.connectors.seatunnel.elasticsearch.config.ElasticsearchBaseOptions.API_KEY_ID;
+import static 
org.apache.seatunnel.connectors.seatunnel.elasticsearch.config.ElasticsearchBaseOptions.AUTH_TYPE;
 import static 
org.apache.seatunnel.connectors.seatunnel.elasticsearch.config.ElasticsearchBaseOptions.HOSTS;
 import static 
org.apache.seatunnel.connectors.seatunnel.elasticsearch.config.ElasticsearchBaseOptions.INDEX;
 import static 
org.apache.seatunnel.connectors.seatunnel.elasticsearch.config.ElasticsearchBaseOptions.PASSWORD;
@@ -77,6 +82,9 @@ public class ElasticsearchSourceFactory implements 
TableSourceFactory {
                         TLS_KEY_STORE_PASSWORD,
                         TLS_TRUST_STORE_PATH,
                         TLS_TRUST_STORE_PASSWORD)
+                .optional(AUTH_TYPE)
+                .conditional(AUTH_TYPE, AuthTypeEnum.API_KEY, API_KEY_ID, 
API_KEY)
+                .conditional(AUTH_TYPE, AuthTypeEnum.API_KEY_ENCODED, 
API_KEY_ENCODED)
                 .build();
     }
 
diff --git 
a/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-elasticsearch-e2e/src/test/java/org/apache/seatunnel/e2e/connector/elasticsearch/ElasticsearchAuthIT.java
 
b/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-elasticsearch-e2e/src/test/java/org/apache/seatunnel/e2e/connector/elasticsearch/ElasticsearchAuthIT.java
new file mode 100644
index 0000000000..4a8bf2c822
--- /dev/null
+++ 
b/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-elasticsearch-e2e/src/test/java/org/apache/seatunnel/e2e/connector/elasticsearch/ElasticsearchAuthIT.java
@@ -0,0 +1,787 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.seatunnel.e2e.connector.elasticsearch;
+
+import org.apache.seatunnel.shade.com.fasterxml.jackson.databind.JsonNode;
+import org.apache.seatunnel.shade.com.fasterxml.jackson.databind.ObjectMapper;
+import org.apache.seatunnel.shade.com.google.common.collect.Lists;
+
+import org.apache.seatunnel.api.configuration.ReadonlyConfig;
+import 
org.apache.seatunnel.connectors.seatunnel.elasticsearch.client.EsRestClient;
+import 
org.apache.seatunnel.connectors.seatunnel.elasticsearch.client.auth.AuthenticationProvider;
+import 
org.apache.seatunnel.connectors.seatunnel.elasticsearch.client.auth.AuthenticationProviderFactory;
+import 
org.apache.seatunnel.connectors.seatunnel.elasticsearch.dto.BulkResponse;
+import org.apache.seatunnel.e2e.common.TestResource;
+import org.apache.seatunnel.e2e.common.TestSuiteBase;
+import org.apache.seatunnel.e2e.common.container.TestContainer;
+
+import org.apache.http.HttpResponse;
+import org.apache.http.client.methods.HttpGet;
+import org.apache.http.client.methods.HttpPost;
+import org.apache.http.conn.ssl.NoopHostnameVerifier;
+import org.apache.http.conn.ssl.TrustAllStrategy;
+import org.apache.http.entity.StringEntity;
+import org.apache.http.impl.client.CloseableHttpClient;
+import org.apache.http.impl.client.HttpClients;
+import org.apache.http.ssl.SSLContextBuilder;
+import org.apache.http.util.EntityUtils;
+
+import org.junit.jupiter.api.AfterEach;
+import org.junit.jupiter.api.Assertions;
+import org.junit.jupiter.api.BeforeEach;
+import org.junit.jupiter.api.Test;
+import org.junit.jupiter.api.TestTemplate;
+import org.testcontainers.containers.Container;
+import org.testcontainers.containers.output.Slf4jLogConsumer;
+import org.testcontainers.elasticsearch.ElasticsearchContainer;
+import org.testcontainers.lifecycle.Startables;
+import org.testcontainers.utility.DockerImageName;
+import org.testcontainers.utility.DockerLoggerFactory;
+
+import lombok.extern.slf4j.Slf4j;
+
+import java.io.IOException;
+import java.nio.charset.StandardCharsets;
+import java.security.KeyManagementException;
+import java.security.KeyStoreException;
+import java.security.NoSuchAlgorithmException;
+import java.time.Duration;
+import java.util.Base64;
+import java.util.HashMap;
+import java.util.Map;
+import java.util.concurrent.TimeUnit;
+import java.util.stream.Stream;
+
+@Slf4j
+public class ElasticsearchAuthIT extends TestSuiteBase implements TestResource 
{
+
+    private static final String ELASTICSEARCH_IMAGE = "elasticsearch:8.9.0";
+    private static final long INDEX_REFRESH_DELAY = 2000L;
+
+    // Test data constants
+    private static final String TEST_INDEX = "auth_test_index";
+    private static final String VALID_USERNAME = "elastic";
+    private static final String VALID_PASSWORD = "elasticsearch";
+    private static final String INVALID_USERNAME = "wrong_user";
+    private static final String INVALID_PASSWORD = "wrong_password";
+
+    // API Key test constants - will be set dynamically after container starts
+    private String validApiKeyId;
+    private String validApiKeySecret;
+    private String validEncodedApiKey;
+    private static final String INVALID_API_KEY_ID = "invalid-key-id";
+    private static final String INVALID_API_KEY_SECRET = "invalid-key-secret";
+
+    private ElasticsearchContainer elasticsearchContainer;
+    private EsRestClient esRestClient;
+    private ObjectMapper objectMapper = new ObjectMapper();
+    private CloseableHttpClient httpClient;
+
+    @BeforeEach
+    @Override
+    public void startUp() throws Exception {
+        // Initialize HTTP client with SSL trust all strategy
+        initializeHttpClient();
+
+        // Start Elasticsearch container
+        elasticsearchContainer =
+                new ElasticsearchContainer(
+                                DockerImageName.parse(ELASTICSEARCH_IMAGE)
+                                        .asCompatibleSubstituteFor(
+                                                
"docker.elastic.co/elasticsearch/elasticsearch"))
+                        .withNetwork(NETWORK)
+                        
.withEnv("cluster.routing.allocation.disk.threshold_enabled", "false")
+                        .withEnv("xpack.security.authc.api_key.enabled", 
"true")
+                        .withNetworkAliases("elasticsearch")
+                        .withPassword("elasticsearch")
+                        .withStartupAttempts(5)
+                        .withStartupTimeout(Duration.ofMinutes(5))
+                        .withLogConsumer(
+                                new Slf4jLogConsumer(
+                                        
DockerLoggerFactory.getLogger("elasticsearch:8.9.0")));
+        Startables.deepStart(Stream.of(elasticsearchContainer)).join();
+        log.info("Elasticsearch container started");
+
+        // Wait for Elasticsearch to be ready and create real API keys
+        waitForElasticsearchReady();
+        createRealApiKeys();
+
+        // Initialize ES client for test data setup
+        Map<String, Object> configMap = new HashMap<>();
+        configMap.put(
+                "hosts",
+                Lists.newArrayList("https://" + elasticsearchContainer.getHttpHostAddress()));
+        configMap.put("username", "elastic");
+        configMap.put("password", "elasticsearch");
+        configMap.put("tls_verify_certificate", false);
+        configMap.put("tls_verify_hostname", false);
+        ReadonlyConfig config = ReadonlyConfig.fromMap(configMap);
+        esRestClient = EsRestClient.createInstance(config);
+        createTestIndex();
+        insertTestData();
+    }
+
+    @AfterEach
+    @Override
+    public void tearDown() throws Exception {
+        if (esRestClient != null) {
+            esRestClient.close();
+        }
+        if (httpClient != null) {
+            httpClient.close();
+        }
+        if (elasticsearchContainer != null) {
+            elasticsearchContainer.stop();
+        }
+    }
+
+    /** Initialize HTTP client with SSL trust all strategy for testing */
+    private void initializeHttpClient()
+            throws NoSuchAlgorithmException, KeyStoreException, 
KeyManagementException {
+        httpClient =
+                HttpClients.custom()
+                        .setSSLContext(
+                                SSLContextBuilder.create()
+                                        
.loadTrustMaterial(TrustAllStrategy.INSTANCE)
+                                        .build())
+                        .setSSLHostnameVerifier(NoopHostnameVerifier.INSTANCE)
+                        .build();
+        log.info("HTTP client initialized with SSL trust all strategy");
+    }
+
+    /** Wait for Elasticsearch to be ready */
+    private void waitForElasticsearchReady() throws IOException, 
InterruptedException {
+        String elasticsearchUrl = "https://" + elasticsearchContainer.getHttpHostAddress();
+        String healthUrl = elasticsearchUrl + "/_cluster/health";
+
+        log.info("Waiting for Elasticsearch to be ready at: {}", healthUrl);
+
+        for (int i = 0; i < 30; i++) {
+            try {
+                HttpGet request = new HttpGet(healthUrl);
+                String auth =
+                        Base64.getEncoder()
+                                .encodeToString(
+                                        (VALID_USERNAME + ":" + VALID_PASSWORD)
+                                                
.getBytes(StandardCharsets.UTF_8));
+                request.setHeader("Authorization", "Basic " + auth);
+
+                HttpResponse response = httpClient.execute(request);
+                if (response.getStatusLine().getStatusCode() == 200) {
+                    log.info("Elasticsearch is ready");
+                    return;
+                }
+            } catch (Exception e) {
+                log.debug("Elasticsearch not ready yet, attempt {}/30: {}", i 
+ 1, e.getMessage());
+            }
+
+            TimeUnit.SECONDS.sleep(2);
+        }
+
+        throw new RuntimeException("Elasticsearch failed to become ready 
within timeout");
+    }
+
+    /** Create real API keys using Elasticsearch API */
+    private void createRealApiKeys() throws IOException {
+        String elasticsearchUrl = "https://" + elasticsearchContainer.getHttpHostAddress();
+        String apiKeyUrl = elasticsearchUrl + "/_security/api_key";
+
+        log.info("Creating real API key at: {}", apiKeyUrl);
+
+        String requestBody =
+                "{\n"
+                        + "  \"name\": \"seatunnel-test-api-key\",\n"
+                        + "  \"role_descriptors\": {\n"
+                        + "    \"seatunnel_test_role\": {\n"
+                        + "      \"cluster\": [\"manage\"],\n"
+                        + "      \"indices\": [\n"
+                        + "        {\n"
+                        + "          \"names\": [\""
+                        + TEST_INDEX
+                        + "\", \"auth_test_*\", \"test_*\", \"*_target\"],\n"
+                        + "          \"privileges\": [\"all\"]\n"
+                        + "        }\n"
+                        + "      ]\n"
+                        + "    }\n"
+                        + "  },\n"
+                        + "  \"metadata\": {\n"
+                        + "    \"application\": \"seatunnel-test\",\n"
+                        + "    \"environment\": \"integration-test\"\n"
+                        + "  }\n"
+                        + "}";
+
+        HttpPost request = new HttpPost(apiKeyUrl);
+        String auth =
+                Base64.getEncoder()
+                        .encodeToString(
+                                (VALID_USERNAME + ":" + VALID_PASSWORD)
+                                        .getBytes(StandardCharsets.UTF_8));
+        request.setHeader("Authorization", "Basic " + auth);
+        request.setHeader("Content-Type", "application/json");
+        request.setEntity(new StringEntity(requestBody, 
StandardCharsets.UTF_8));
+
+        HttpResponse response = httpClient.execute(request);
+        String responseBody = EntityUtils.toString(response.getEntity());
+
+        if (response.getStatusLine().getStatusCode() != 200) {
+            throw new RuntimeException("Failed to create API key: " + 
responseBody);
+        }
+
+        // Parse response to extract API key details
+        try {
+            JsonNode jsonResponse = objectMapper.readTree(responseBody);
+            validApiKeyId = jsonResponse.get("id").asText();
+            validApiKeySecret = jsonResponse.get("api_key").asText();
+            validEncodedApiKey =
+                    Base64.getEncoder()
+                            .encodeToString(
+                                    (validApiKeyId + ":" + validApiKeySecret)
+                                            .getBytes(StandardCharsets.UTF_8));
+
+            log.info(
+                    "API Key created successfully - ID: {}, Secret: {}, 
Encoded: {}",
+                    validApiKeyId,
+                    validApiKeySecret,
+                    validEncodedApiKey);
+
+            // Verify the API key works
+            verifyApiKey();
+
+        } catch (Exception e) {
+            throw new RuntimeException("Failed to parse API key response: " + 
responseBody, e);
+        }
+    }
+
+    /** Verify that the created API key works */
+    private void verifyApiKey() throws IOException {
+        String elasticsearchUrl = "https://" + elasticsearchContainer.getHttpHostAddress();
+        String authUrl = elasticsearchUrl + "/_security/_authenticate";
+
+        HttpGet request = new HttpGet(authUrl);
+        request.setHeader("Authorization", "ApiKey " + validEncodedApiKey);
+
+        HttpResponse response = httpClient.execute(request);
+        String responseBody = EntityUtils.toString(response.getEntity());
+
+        if (response.getStatusLine().getStatusCode() == 200) {
+            log.info("API Key verification successful: {}", responseBody);
+        } else {
+            throw new RuntimeException("API Key verification failed: " + 
responseBody);
+        }
+    }
+
+    private void createTestIndex() throws Exception {
+        String mapping =
+                "{"
+                        + "\"mappings\": {"
+                        + "\"properties\": {"
+                        + "\"id\": {\"type\": \"integer\"},"
+                        + "\"name\": {\"type\": \"text\"},"
+                        + "\"value\": {\"type\": \"double\"}"
+                        + "}"
+                        + "}"
+                        + "}";
+
+        log.info("Creating test index: {}", TEST_INDEX);
+
+        try {
+            esRestClient.createIndex(TEST_INDEX, mapping);
+            log.info("Test index '{}' created successfully", TEST_INDEX);
+        } catch (Exception e) {
+            log.error("Failed to create test index: {}", e.getMessage(), e);
+            throw new RuntimeException("Failed to create test index: " + 
TEST_INDEX, e);
+        }
+    }
+
+    private void insertTestData() throws Exception {
+        StringBuilder requestBody = new StringBuilder();
+        String indexHeader = "{\"index\":{\"_index\":\"" + TEST_INDEX + 
"\"}}\n";
+
+        for (int i = 1; i <= 3; i++) {
+            Map<String, Object> doc = new HashMap<>();
+            doc.put("id", i);
+            doc.put("name", "test_" + i);
+            doc.put("value", i * 10.5);
+
+            requestBody.append(indexHeader);
+            requestBody.append(objectMapper.writeValueAsString(doc));
+            requestBody.append("\n");
+        }
+
+        log.info("Inserting test data into index: {}", TEST_INDEX);
+
+        try {
+            BulkResponse response = esRestClient.bulk(requestBody.toString());
+            if (response.isErrors()) {
+                log.error("Bulk insert had errors: {}", 
response.getResponse());
+                throw new RuntimeException("Failed to insert test data: " + 
response.getResponse());
+            }
+
+            Thread.sleep(INDEX_REFRESH_DELAY);
+            log.info("Test data inserted successfully - {} documents", 3);
+        } catch (Exception e) {
+            log.error("Failed to insert test data", e);
+            throw new RuntimeException("Failed to insert test data", e);
+        }
+    }
+
+    // Helper methods for creating configurations
+    private Map<String, Object> createBasicAuthConfig(String username, String 
password) {
+        Map<String, Object> configMap = new HashMap<>();
+        configMap.put(
+                "hosts",
+                Lists.newArrayList("https://" + elasticsearchContainer.getHttpHostAddress()));
+        configMap.put("username", username);
+        configMap.put("password", password);
+        configMap.put("tls_verify_certificate", false);
+        configMap.put("tls_verify_hostname", false);
+
+        return configMap;
+    }
+
+    private Map<String, Object> createApiKeyConfig(String keyId, String 
keySecret) {
+        Map<String, Object> config = new HashMap<>();
+        config.put(
+                "hosts",
+                Lists.newArrayList("https://" + elasticsearchContainer.getHttpHostAddress()));
+        config.put("auth_type", "api_key");
+        config.put("auth.api_key_id", keyId);
+        config.put("auth.api_key", keySecret);
+        config.put("tls_verify_certificate", false);
+        config.put("tls_verify_hostname", false);
+        return config;
+    }
+
+    private Map<String, Object> createApiKeyEncodedConfig(String encodedKey) {
+        Map<String, Object> config = new HashMap<>();
+        config.put(
+                "hosts",
+                Lists.newArrayList("https://" + elasticsearchContainer.getHttpHostAddress()));
+        config.put("auth_type", "api_key_encoded");
+        config.put("auth.api_key_encoded", encodedKey);
+        config.put("tls_verify_certificate", false);
+        config.put("tls_verify_hostname", false);
+        return config;
+    }
+
+    // ==================== Basic Authentication Tests ====================
+
+    /** Test successful basic authentication with valid credentials */
+    @Test
+    public void testBasicAuthenticationSuccess() throws Exception {
+        log.info("=== Testing Basic Authentication Success ===");
+
+        Map<String, Object> config = createBasicAuthConfig(VALID_USERNAME, 
VALID_PASSWORD);
+        ReadonlyConfig readonlyConfig = ReadonlyConfig.fromMap(config);
+
+        // Test provider creation
+        AuthenticationProvider provider =
+                AuthenticationProviderFactory.createProvider(readonlyConfig);
+        Assertions.assertNotNull(provider, "Authentication provider should be 
created");
+        Assertions.assertEquals(
+                "basic", provider.getAuthType(), "Provider should be basic 
auth type");
+
+        // Test client creation and functionality
+        try (EsRestClient client = 
EsRestClient.createInstance(readonlyConfig)) {
+            Assertions.assertNotNull(client, "EsRestClient should be created 
successfully");
+
+            // Verify client can perform operations
+            long docCount = 
client.getIndexDocsCount(TEST_INDEX).get(0).getDocsCount();
+            Assertions.assertTrue(
+                    docCount > 0, "Should be able to query index with valid 
credentials");
+
+            log.info("✓ Basic authentication success test passed - {} 
documents found", docCount);
+        }
+    }
+
+    /** Test basic authentication failure with invalid credentials */
+    @Test
+    public void testBasicAuthenticationFailure() throws Exception {
+        log.info("=== Testing Basic Authentication Failure ===");
+
+        Map<String, Object> config = createBasicAuthConfig(INVALID_USERNAME, 
INVALID_PASSWORD);
+        ReadonlyConfig readonlyConfig = ReadonlyConfig.fromMap(config);
+
+        // Test provider creation (should succeed)
+        AuthenticationProvider provider =
+                AuthenticationProviderFactory.createProvider(readonlyConfig);
+        Assertions.assertNotNull(
+                provider,
+                "Authentication provider should be created even with invalid 
credentials");
+        Assertions.assertEquals(
+                "basic", provider.getAuthType(), "Provider should be basic 
auth type");
+
+        // Test client creation (should succeed)
+        try (EsRestClient client = 
EsRestClient.createInstance(readonlyConfig)) {
+            Assertions.assertNotNull(client, "EsRestClient should be created");
+
+            // Test operation (should fail with authentication error)
+            Exception exception =
+                    Assertions.assertThrows(
+                            Exception.class,
+                            () -> {
+                                client.getIndexDocsCount(TEST_INDEX);
+                            },
+                            "Should throw exception when using invalid 
credentials");
+
+            log.info(
+                    "✓ Basic authentication failure test passed - exception: 
{}",
+                    exception.getMessage());
+        }
+    }
+
+    // ==================== API Key Authentication Tests ====================
+
+    /** Test successful API key authentication with valid key */
+    @Test
+    public void testApiKeyAuthenticationSuccess() throws Exception {
+        log.info("=== Testing API Key Authentication Success ===");
+
+        Map<String, Object> config = createApiKeyConfig(validApiKeyId, 
validApiKeySecret);
+        ReadonlyConfig readonlyConfig = ReadonlyConfig.fromMap(config);
+
+        // Test provider creation
+        AuthenticationProvider provider =
+                AuthenticationProviderFactory.createProvider(readonlyConfig);
+        Assertions.assertNotNull(provider, "Authentication provider should be 
created");
+        Assertions.assertEquals(
+                "api_key", provider.getAuthType(), "Provider should be api_key 
auth type");
+
+        // Test client creation and functionality
+        try (EsRestClient client = 
EsRestClient.createInstance(readonlyConfig)) {
+            Assertions.assertNotNull(client, "EsRestClient should be created 
successfully");
+
+            // Verify client can perform operations with real API key
+            long docCount = 
client.getIndexDocsCount(TEST_INDEX).get(0).getDocsCount();
+            Assertions.assertTrue(docCount > 0, "Should be able to query index 
with valid API key");
+
+            log.info("✓ API key authentication success test passed - {} 
documents found", docCount);
+        }
+    }
+
+    /** Test API key authentication failure with invalid key */
+    @Test
+    public void testApiKeyAuthenticationFailure() throws Exception {
+        log.info("=== Testing API Key Authentication Failure ===");
+
+        Map<String, Object> config = createApiKeyConfig(INVALID_API_KEY_ID, 
INVALID_API_KEY_SECRET);
+        ReadonlyConfig readonlyConfig = ReadonlyConfig.fromMap(config);
+
+        // Test provider creation (should succeed)
+        AuthenticationProvider provider =
+                AuthenticationProviderFactory.createProvider(readonlyConfig);
+        Assertions.assertNotNull(provider, "Authentication provider should be 
created");
+        Assertions.assertEquals(
+                "api_key", provider.getAuthType(), "Provider should be api_key 
auth type");
+
+        // Test client creation (should succeed)
+        try (EsRestClient client = 
EsRestClient.createInstance(readonlyConfig)) {
+            Assertions.assertNotNull(client, "EsRestClient should be created");
+
+            // Test operation (should fail with authentication error)
+            Exception exception =
+                    Assertions.assertThrows(
+                            Exception.class,
+                            () -> {
+                                client.getIndexDocsCount(TEST_INDEX);
+                            },
+                            "Should throw exception when using invalid API 
key");
+
+            log.info(
+                    "✓ API key authentication failure test passed - exception: 
{}",
+                    exception.getMessage());
+        }
+    }
+
+    /** Test API key authentication with encoded format */
+    @Test
+    public void testApiKeyEncodedAuthentication() throws Exception {
+        log.info("=== Testing API Key Encoded Authentication ===");
+
+        Map<String, Object> config = 
createApiKeyEncodedConfig(validEncodedApiKey);
+        ReadonlyConfig readonlyConfig = ReadonlyConfig.fromMap(config);
+
+        // Test provider creation
+        AuthenticationProvider provider =
+                AuthenticationProviderFactory.createProvider(readonlyConfig);
+        Assertions.assertNotNull(provider, "Authentication provider should be 
created");
+        Assertions.assertEquals(
+                "api_key_encoded",
+                provider.getAuthType(),
+                "Provider should be api_key_encoded auth type");
+
+        // Test client creation and functionality
+        try (EsRestClient client = 
EsRestClient.createInstance(readonlyConfig)) {
+            Assertions.assertNotNull(client, "EsRestClient should be created 
successfully");
+
+            // Verify client can perform operations with encoded API key
+            long docCount = 
client.getIndexDocsCount(TEST_INDEX).get(0).getDocsCount();
+            Assertions.assertTrue(
+                    docCount > 0, "Should be able to query index with valid 
encoded API key");
+
+            log.info("✓ API key encoded authentication test passed - {} 
documents found", docCount);
+        }
+    }
+
+    /** E2E test: API Key authentication source and sink */
+    @TestTemplate
+    public void testE2EApiKeyAuthSourceAndSink(TestContainer container) throws 
Exception {
+        log.info("=== E2E Test: API Key Authentication Source and Sink ===");
+
+        // Setup test data
+        setupAuthTestData();
+
+        // Create temporary config file with real API key values in resources 
directory
+        String configContent = createApiKeyConfigContent();
+        java.io.File resourcesDir = new 
java.io.File("src/test/resources/elasticsearch");
+        if (!resourcesDir.exists()) {
+            resourcesDir.mkdirs();
+        }
+
+        java.io.File tempConfigFile =
+                new java.io.File(resourcesDir, 
"elasticsearch_auth_apikey_temp.conf");
+        try (java.io.FileWriter writer = new 
java.io.FileWriter(tempConfigFile)) {
+            writer.write(configContent);
+        }
+
+        try {
+            // Execute SeaTunnel job with API key auth using relative path
+            Container.ExecResult execResult =
+                    
container.executeJob("/elasticsearch/elasticsearch_auth_apikey_temp.conf");
+            Assertions.assertEquals(
+                    0, execResult.getExitCode(), "Job should complete 
successfully");
+
+            // Wait for index refresh
+            Thread.sleep(2000);
+
+            // Verify results
+            long targetCount =
+                    
esRestClient.getIndexDocsCount("auth_test_apikey_target").get(0).getDocsCount();
+            log.info("✓ API Key auth E2E test completed - {} documents 
processed", targetCount);
+            Assertions.assertTrue(
+                    targetCount > 0, "Should have processed documents with API 
key auth");
+
+        } finally {
+            // Clean up temporary file
+            if (tempConfigFile.exists()) {
+                tempConfigFile.delete();
+            }
+        }
+    }
+
+    /** E2E test: API Key Encoded authentication source and sink */
+    @TestTemplate
+    public void testE2EApiKeyEncodedAuthSourceAndSink(TestContainer container) 
throws Exception {
+        log.info("=== E2E Test: API Key Encoded Authentication Source and Sink 
===");
+
+        // Setup test data
+        setupAuthTestData();
+
+        // Create temporary config file with real encoded API key values
+        String configContent = createApiKeyEncodedConfigContent();
+        java.io.File resourcesDir = new 
java.io.File("src/test/resources/elasticsearch");
+        if (!resourcesDir.exists()) {
+            resourcesDir.mkdirs();
+        }
+
+        java.io.File tempConfigFile =
+                new java.io.File(resourcesDir, 
"elasticsearch_auth_apikey_encoded_temp.conf");
+        try (java.io.FileWriter writer = new 
java.io.FileWriter(tempConfigFile)) {
+            writer.write(configContent);
+        }
+
+        try {
+            // Execute SeaTunnel job with encoded API key auth
+            Container.ExecResult execResult =
+                    container.executeJob(
+                            
"/elasticsearch/elasticsearch_auth_apikey_encoded_temp.conf");
+            Assertions.assertEquals(
+                    0, execResult.getExitCode(), "Job should complete 
successfully");
+
+            // Wait for index refresh
+            Thread.sleep(2000);
+
+            // Verify results
+            long targetCount =
+                    esRestClient
+                            
.getIndexDocsCount("auth_test_apikey_encoded_target")
+                            .get(0)
+                            .getDocsCount();
+            log.info(
+                    "✓ API Key Encoded auth E2E test completed - {} documents 
processed",
+                    targetCount);
+            Assertions.assertTrue(
+                    targetCount > 0, "Should have processed documents with 
encoded API key auth");
+
+        } finally {
+            // Clean up temporary file
+            if (tempConfigFile.exists()) {
+                tempConfigFile.delete();
+            }
+        }
+    }
+
+    /** Create API Key configuration content with real values */
+    private String createApiKeyConfigContent() {
+        return String.format(
+                "env {\n"
+                        + "  parallelism = 1\n"
+                        + "  job.mode = \"BATCH\"\n"
+                        + "}\n"
+                        + "\n"
+                        + "source {\n"
+                        + "  Elasticsearch {\n"
+                        + "    hosts = [\"https://elasticsearch:9200\"]\n"
+                        + "    auth_type = \"api_key\"\n"
+                        + "    auth.api_key_id = \"%s\"\n"
+                        + "    auth.api_key = \"%s\"\n"
+                        + "    tls_verify_certificate = false\n"
+                        + "    tls_verify_hostname = false\n"
+                        + "\n"
+                        + "    index = \"auth_test_index\"\n"
+                        + "    query = {\"match_all\": {}}\n"
+                        + "    schema = {\n"
+                        + "      fields {\n"
+                        + "        id = int\n"
+                        + "        name = string\n"
+                        + "        category = string\n"
+                        + "        price = double\n"
+                        + "        timestamp = timestamp\n"
+                        + "      }\n"
+                        + "    }\n"
+                        + "  }\n"
+                        + "}\n"
+                        + "\n"
+                        + "sink {\n"
+                        + "  Elasticsearch {\n"
+                        + "    hosts = [\"https://elasticsearch:9200\"]\n"
+                        + "    auth_type = \"api_key\"\n"
+                        + "    auth.api_key_id = \"%s\"\n"
+                        + "    auth.api_key = \"%s\"\n"
+                        + "    tls_verify_certificate = false\n"
+                        + "    tls_verify_hostname = false\n"
+                        + "\n"
+                        + "    index = \"auth_test_apikey_target\"\n"
+                        + "    schema_save_mode = 
\"CREATE_SCHEMA_WHEN_NOT_EXIST\"\n"
+                        + "    data_save_mode = \"APPEND_DATA\"\n"
+                        + "  }\n"
+                        + "}\n",
+                validApiKeyId, validApiKeySecret, validApiKeyId, 
validApiKeySecret);
+    }
+
+    /** Create API Key Encoded configuration content with real values */
+    private String createApiKeyEncodedConfigContent() {
+        return String.format(
+                "env {\n"
+                        + "  parallelism = 1\n"
+                        + "  job.mode = \"BATCH\"\n"
+                        + "}\n"
+                        + "\n"
+                        + "source {\n"
+                        + "  Elasticsearch {\n"
+                        + "    hosts = [\"https://elasticsearch:9200\"]\n"
+                        + "    auth_type = \"api_key_encoded\"\n"
+                        + "    auth.api_key_encoded = \"%s\"\n"
+                        + "    tls_verify_certificate = false\n"
+                        + "    tls_verify_hostname = false\n"
+                        + "\n"
+                        + "    index = \"auth_test_index\"\n"
+                        + "    query = {\"match_all\": {}}\n"
+                        + "    schema = {\n"
+                        + "      fields {\n"
+                        + "        id = int\n"
+                        + "        name = string\n"
+                        + "        category = string\n"
+                        + "        price = double\n"
+                        + "        timestamp = timestamp\n"
+                        + "      }\n"
+                        + "    }\n"
+                        + "  }\n"
+                        + "}\n"
+                        + "\n"
+                        + "sink {\n"
+                        + "  Elasticsearch {\n"
+                        + "    hosts = [\"https://elasticsearch:9200\"]\n"
+                        + "    auth_type = \"api_key_encoded\"\n"
+                        + "    auth.api_key_encoded = \"%s\"\n"
+                        + "    tls_verify_certificate = false\n"
+                        + "    tls_verify_hostname = false\n"
+                        + "\n"
+                        + "    index = \"auth_test_apikey_encoded_target\"\n"
+                        + "    schema_save_mode = 
\"CREATE_SCHEMA_WHEN_NOT_EXIST\"\n"
+                        + "    data_save_mode = \"APPEND_DATA\"\n"
+                        + "  }\n"
+                        + "}\n",
+                validEncodedApiKey, validEncodedApiKey);
+    }
+
+    /** Setup test data for authentication tests */
+    private void setupAuthTestData() throws Exception {
+        String testIndex = "auth_test_index";
+
+        // Create index mapping
+        String mapping =
+                "{"
+                        + "\"mappings\": {"
+                        + "\"properties\": {"
+                        + "\"id\": {\"type\": \"integer\"},"
+                        + "\"name\": {\"type\": \"text\"},"
+                        + "\"category\": {\"type\": \"keyword\"},"
+                        + "\"price\": {\"type\": \"double\"},"
+                        + "\"timestamp\": {\"type\": \"date\"}"
+                        + "}"
+                        + "}"
+                        + "}";
+
+        try {
+            esRestClient.createIndex(testIndex, mapping);
+            log.info("Created test index: {}", testIndex);
+        } catch (Exception e) {
+            log.warn("Index might already exist: {}", e.getMessage());
+        }
+
+        // Insert test data
+        StringBuilder requestBody = new StringBuilder();
+        String indexHeader = "{\"index\":{\"_index\":\"" + testIndex + 
"\"}}\n";
+
+        String[] categories = {"electronics", "books", "clothing", "home", 
"sports"};
+        for (int i = 1; i <= 10; i++) {
+            Map<String, Object> doc = new HashMap<>();
+            doc.put("id", i);
+            doc.put("name", "Auth Test Product " + i);
+            doc.put("category", categories[i % categories.length]);
+            doc.put("price", 15.99 + (i * 3.5)); // Prices from 19.49 to 50.49
+            doc.put("timestamp", "2024-01-" + String.format("%02d", i) + 
"T10:00:00Z");
+
+            requestBody.append(indexHeader);
+            requestBody.append(objectMapper.writeValueAsString(doc));
+            requestBody.append("\n");
+        }
+
+        BulkResponse response = esRestClient.bulk(requestBody.toString());
+        if (response.isErrors()) {
+            log.warn("Some documents might already exist: {}", 
response.getResponse());
+        }
+
+        // Wait for index refresh
+        Thread.sleep(2000);
+
+        long docCount = 
esRestClient.getIndexDocsCount(testIndex).get(0).getDocsCount();
+        log.info("Test data setup completed - {} documents in source index", 
docCount);
+    }
+}
diff --git 
a/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-elasticsearch-e2e/src/test/java/org/apache/seatunnel/e2e/connector/elasticsearch/ElasticsearchIT.java
 
b/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-elasticsearch-e2e/src/test/java/org/apache/seatunnel/e2e/connector/elasticsearch/ElasticsearchIT.java
index 1831496e5b..2479be67b0 100644
--- 
a/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-elasticsearch-e2e/src/test/java/org/apache/seatunnel/e2e/connector/elasticsearch/ElasticsearchIT.java
+++ 
b/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-elasticsearch-e2e/src/test/java/org/apache/seatunnel/e2e/connector/elasticsearch/ElasticsearchIT.java
@@ -76,7 +76,6 @@ import java.util.HashSet;
 import java.util.List;
 import java.util.Map;
 import java.util.Objects;
-import java.util.Optional;
 import java.util.Set;
 import java.util.concurrent.CompletableFuture;
 import java.util.concurrent.TimeUnit;
@@ -118,17 +117,16 @@ public class ElasticsearchIT extends TestSuiteBase 
implements TestResource {
                                         
DockerLoggerFactory.getLogger("elasticsearch:8.9.0")));
         Startables.deepStart(Stream.of(container)).join();
         log.info("Elasticsearch container started");
-        esRestClient =
-                EsRestClient.createInstance(
-                        Lists.newArrayList("https://" + 
container.getHttpHostAddress()),
-                        Optional.of("elastic"),
-                        Optional.of("elasticsearch"),
-                        false,
-                        false,
-                        Optional.empty(),
-                        Optional.empty(),
-                        Optional.empty(),
-                        Optional.empty());
+        // Create configuration for EsRestClient
+        Map<String, Object> configMap = new HashMap<>();
+        configMap.put("hosts", Lists.newArrayList("https://" + 
container.getHttpHostAddress()));
+        configMap.put("username", "elastic");
+        configMap.put("password", "elasticsearch");
+        configMap.put("tls_verify_certificate", false);
+        configMap.put("tls_verify_hostname", false);
+
+        ReadonlyConfig config = ReadonlyConfig.fromMap(configMap);
+        esRestClient = EsRestClient.createInstance(config);
         testDataset1 = generateTestDataSet1();
         testDataset2 = generateTestDataSet2();
         createIndexForResourceNull("st_index");
diff --git 
a/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-elasticsearch-e2e/src/test/java/org/apache/seatunnel/e2e/connector/elasticsearch/ElasticsearchSchemaChangeIT.java
 
b/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-elasticsearch-e2e/src/test/java/org/apache/seatunnel/e2e/connector/elasticsearch/ElasticsearchSchemaChangeIT.java
index d09954fcc1..c4608361dd 100644
--- 
a/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-elasticsearch-e2e/src/test/java/org/apache/seatunnel/e2e/connector/elasticsearch/ElasticsearchSchemaChangeIT.java
+++ 
b/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-elasticsearch-e2e/src/test/java/org/apache/seatunnel/e2e/connector/elasticsearch/ElasticsearchSchemaChangeIT.java
@@ -21,6 +21,7 @@ import 
org.apache.seatunnel.shade.com.fasterxml.jackson.databind.JsonNode;
 import 
org.apache.seatunnel.shade.com.fasterxml.jackson.databind.node.ObjectNode;
 import org.apache.seatunnel.shade.com.google.common.collect.Lists;
 
+import org.apache.seatunnel.api.configuration.ReadonlyConfig;
 import org.apache.seatunnel.common.utils.JsonUtils;
 import 
org.apache.seatunnel.connectors.seatunnel.cdc.mysql.testutils.MySqlContainer;
 import 
org.apache.seatunnel.connectors.seatunnel.cdc.mysql.testutils.MySqlVersion;
@@ -49,8 +50,9 @@ import org.testcontainers.utility.DockerLoggerFactory;
 import lombok.extern.slf4j.Slf4j;
 
 import java.time.Duration;
+import java.util.HashMap;
+import java.util.Map;
 import java.util.Objects;
-import java.util.Optional;
 import java.util.concurrent.CompletableFuture;
 import java.util.concurrent.TimeUnit;
 import java.util.stream.Stream;
@@ -97,17 +99,16 @@ public class ElasticsearchSchemaChangeIT extends 
TestSuiteBase implements TestRe
                                         
DockerLoggerFactory.getLogger("elasticsearch:8.9.0")));
         Startables.deepStart(Stream.of(container)).join();
         log.info("Elasticsearch container started");
-        esRestClient =
-                EsRestClient.createInstance(
-                        Lists.newArrayList("https://" + 
container.getHttpHostAddress()),
-                        Optional.of("elastic"),
-                        Optional.of("elasticsearch"),
-                        false,
-                        false,
-                        Optional.empty(),
-                        Optional.empty(),
-                        Optional.empty(),
-                        Optional.empty());
+        // Create configuration for EsRestClient
+        Map<String, Object> configMap = new HashMap<>();
+        configMap.put("hosts", Lists.newArrayList("https://" + 
container.getHttpHostAddress()));
+        configMap.put("username", "elastic");
+        configMap.put("password", "elasticsearch");
+        configMap.put("tls_verify_certificate", false);
+        configMap.put("tls_verify_hostname", false);
+
+        ReadonlyConfig config = ReadonlyConfig.fromMap(configMap);
+        esRestClient = EsRestClient.createInstance(config);
 
         Startables.deepStart(Stream.of(MYSQL_CONTAINER)).join();
         shopDatabase.createAndInitialize();

Reply via email to