This is an automated email from the ASF dual-hosted git repository.

zirui pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/inlong.git


The following commit(s) were added to refs/heads/master by this push:
     new 2d487362d5 [INLONG-10644][Sort] Add the elasticsearch-base module to implement elasticsearch connector 6&7 (#10647)
2d487362d5 is described below

commit 2d487362d5425e7e0d94473257827f44525bf3db
Author: XiaoYou201 <58425449+xiaoyou...@users.noreply.github.com>
AuthorDate: Thu Jul 25 14:19:28 2024 +0800

    [INLONG-10644][Sort] Add the elasticsearch-base module to implement elasticsearch connector 6&7 (#10647)
---
 .../sort-connectors/elasticsearch-base/pom.xml     | 140 ++++++
 .../elasticsearch/ActionRequestFailureHandler.java |  79 ++++
 .../elasticsearch/BufferingNoOpRequestIndexer.java |  73 +++
 .../elasticsearch/ElasticsearchApiCallBridge.java  | 118 +++++
 .../sort/elasticsearch/ElasticsearchSinkBase.java  | 515 +++++++++++++++++++++
 .../elasticsearch/ElasticsearchSinkFunction.java   |  89 ++++
 .../inlong/sort/elasticsearch/RequestIndexer.java  |  79 ++++
 .../table/AbstractTimeIndexGenerator.java          |  40 ++
 .../table/ElasticsearchConfiguration.java          | 162 +++++++
 .../table/ElasticsearchConnectorOptions.java       | 169 +++++++
 .../table/ElasticsearchValidationUtils.java        |  94 ++++
 .../sort/elasticsearch/table/IndexGenerator.java   |  39 ++
 .../elasticsearch/table/IndexGeneratorBase.java    |  51 ++
 .../elasticsearch/table/IndexGeneratorFactory.java | 319 +++++++++++++
 .../sort/elasticsearch/table/KeyExtractor.java     | 129 ++++++
 .../sort/elasticsearch/table/RequestFactory.java   |  54 +++
 .../table/RowElasticsearchSinkFunction.java        | 142 ++++++
 .../elasticsearch/table/StaticIndexGenerator.java  |  34 ++
 .../elasticsearch/util/IgnoringFailureHandler.java |  37 ++
 .../elasticsearch/util/NoOpFailureHandler.java     |  54 +++
 .../util/RetryRejectedExecutionFailureHandler.java |  56 +++
 .../sort-flink-v1.18/sort-connectors/pom.xml       |   1 +
 licenses/inlong-sort-connectors/LICENSE            |  21 +
 23 files changed, 2495 insertions(+)
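
For orientation before the diff: a user of this module implements ElasticsearchSinkFunction to turn each record into one or more requests. A minimal sketch follows (the LogSinkFunction class and index name are hypothetical examples; ElasticsearchSinkFunction, RequestIndexer, and the Elasticsearch client calls are the real APIs added or used by this patch):

    import org.apache.inlong.sort.elasticsearch.ElasticsearchSinkFunction;
    import org.apache.inlong.sort.elasticsearch.RequestIndexer;

    import org.apache.flink.api.common.functions.RuntimeContext;
    import org.elasticsearch.client.Requests;

    import java.util.HashMap;
    import java.util.Map;

    // Hypothetical example: index each incoming String as a one-field document.
    public class LogSinkFunction implements ElasticsearchSinkFunction<String> {

        @Override
        public void process(String element, RuntimeContext ctx, RequestIndexer indexer) {
            Map<String, Object> json = new HashMap<>();
            json.put("message", element);
            // one IndexRequest per record; the base sink batches them via BulkProcessor
            indexer.add(Requests.indexRequest().index("my-index").source(json));
        }
    }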

diff --git a/inlong-sort/sort-flink/sort-flink-v1.18/sort-connectors/elasticsearch-base/pom.xml b/inlong-sort/sort-flink/sort-flink-v1.18/sort-connectors/elasticsearch-base/pom.xml
new file mode 100644
index 0000000000..709eefdb06
--- /dev/null
+++ b/inlong-sort/sort-flink/sort-flink-v1.18/sort-connectors/elasticsearch-base/pom.xml
@@ -0,0 +1,140 @@
+<?xml version="1.0" encoding="UTF-8"?>
+<!--
+  ~ Licensed to the Apache Software Foundation (ASF) under one or more
+  ~ contributor license agreements. See the NOTICE file distributed with
+  ~ this work for additional information regarding copyright ownership.
+  ~ The ASF licenses this file to You under the Apache License, Version 2.0
+  ~ (the "License"); you may not use this file except in compliance with
+  ~ the License. You may obtain a copy of the License at
+  ~
+  ~ http://www.apache.org/licenses/LICENSE-2.0
+  ~
+  ~ Unless required by applicable law or agreed to in writing, software
+  ~ distributed under the License is distributed on an "AS IS" BASIS,
+  ~ WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+  ~ See the License for the specific language governing permissions and
+  ~ limitations under the License.
+  -->
+<project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
+         xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
+    <modelVersion>4.0.0</modelVersion>
+    <parent>
+        <groupId>org.apache.inlong</groupId>
+        <artifactId>sort-connectors-v1.18</artifactId>
+        <version>1.14.0-SNAPSHOT</version>
+    </parent>
+    <artifactId>sort-connector-elasticsearch-base-v1.18</artifactId>
+    <packaging>jar</packaging>
+    <name>Apache InLong - Sort-connector-elasticsearch-base</name>
+
+    <properties>
+        <inlong.root.dir>${project.parent.parent.parent.parent.parent.basedir}</inlong.root.dir>
+        <elasticsearch.version>7.10.2</elasticsearch.version>
+    </properties>
+    <dependencies>
+        <dependency>
+            <groupId>org.apache.inlong</groupId>
+            <artifactId>sort-flink-dependencies-v1.18</artifactId>
+            <version>${project.version}</version>
+            <scope>provided</scope>
+        </dependency>
+
+        <dependency>
+            <groupId>org.apache.flink</groupId>
+            <artifactId>flink-json</artifactId>
+            <version>${flink.version}</version>
+            <scope>provided</scope>
+        </dependency>
+
+        <dependency>
+            <groupId>org.elasticsearch</groupId>
+            <artifactId>elasticsearch</artifactId>
+            <version>${elasticsearch.version}</version>
+            <!--
+            FLINK-7133: Excluding all org.ow2.asm from elasticsearch dependencies because
+            1. from the POV of the client they are optional,
+            2. the version configured by default at the time of writing this comment (1.7.1) depends on asm 4.1,
+               and when it is shaded into the elasticsearch-base artifact it conflicts with newer shaded
+               versions of asm, resulting in errors at runtime when the application is executed locally,
+               e.g. from an IDE.
+            -->
+            <exclusions>
+                <exclusion>
+                    <groupId>org.ow2.asm</groupId>
+                    <artifactId>*</artifactId>
+                </exclusion>
+            </exclusions>
+        </dependency>
+
+        <dependency>
+            <groupId>org.elasticsearch.client</groupId>
+            <artifactId>elasticsearch-rest-high-level-client</artifactId>
+            <version>${elasticsearch.version}</version>
+        </dependency>
+
+        <dependency>
+            <groupId>org.apache.flink</groupId>
+            <artifactId>flink-runtime</artifactId>
+            <version>${flink.version}</version>
+            <scope>provided</scope>
+        </dependency>
+        <!--
+            Including Log4j2 dependencies for tests is required for the
+            embedded Elasticsearch nodes used in tests to run correctly.
+        -->
+
+        <dependency>
+            <groupId>org.apache.logging.log4j</groupId>
+            <artifactId>log4j-api</artifactId>
+            <scope>provided</scope>
+        </dependency>
+
+        <dependency>
+            <groupId>org.apache.logging.log4j</groupId>
+            <artifactId>log4j-core</artifactId>
+            <scope>provided</scope>
+        </dependency>
+    </dependencies>
+    <build>
+        <plugins>
+            <plugin>
+                <!-- Shade all the dependencies to avoid conflicts -->
+                <groupId>org.apache.maven.plugins</groupId>
+                <artifactId>maven-shade-plugin</artifactId>
+                <executions>
+                    <execution>
+                        <id>shade-flink</id>
+                        <goals>
+                            <goal>shade</goal>
+                        </goals>
+                        <phase>package</phase>
+                        <configuration>
+                            <promoteTransitiveDependencies>true</promoteTransitiveDependencies>
+                            <filters>
+                                <filter>
+                                    <artifact>org.apache.inlong:sort-connector-*</artifact>
+                                    <includes>
+                                        <include>org/apache/inlong/**</include>
+                                        <include>META-INF/services/org.apache.flink.table.factories.Factory</include>
+                                    </includes>
+                                </filter>
+                                <filter>
+                                    <artifact>*:*</artifact>
+                                    <excludes>
+                                        <exclude>log4j.properties</exclude>
+                                        <exclude>META-INF/*.SF</exclude>
+                                        <exclude>META-INF/*.DSA</exclude>
+                                        <exclude>META-INF/*.RSA</exclude>
+                                    </excludes>
+                                </filter>
+                            </filters>
+                            <transformers>
+                                <transformer implementation="org.apache.maven.plugins.shade.resource.ServicesResourceTransformer" />
+                                <transformer implementation="org.apache.maven.plugins.shade.resource.PluginXmlResourceTransformer" />
+                            </transformers>
+                        </configuration>
+                    </execution>
+                </executions>
+            </plugin>
+        </plugins>
+    </build>
+</project>
diff --git a/inlong-sort/sort-flink/sort-flink-v1.18/sort-connectors/elasticsearch-base/src/main/java/org/apache/inlong/sort/elasticsearch/ActionRequestFailureHandler.java b/inlong-sort/sort-flink/sort-flink-v1.18/sort-connectors/elasticsearch-base/src/main/java/org/apache/inlong/sort/elasticsearch/ActionRequestFailureHandler.java
new file mode 100644
index 0000000000..e014feb86a
--- /dev/null
+++ b/inlong-sort/sort-flink/sort-flink-v1.18/sort-connectors/elasticsearch-base/src/main/java/org/apache/inlong/sort/elasticsearch/ActionRequestFailureHandler.java
@@ -0,0 +1,79 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.inlong.sort.elasticsearch;
+
+import org.apache.flink.annotation.PublicEvolving;
+import org.elasticsearch.action.ActionRequest;
+
+import java.io.Serializable;
+
+/**
+ * An implementation of {@link ActionRequestFailureHandler} is provided by the user to define how
+ * failed {@link ActionRequest ActionRequests} should be handled, e.g. dropping them, reprocessing
+ * malformed documents, or simply requesting them to be sent to Elasticsearch again if the failure
+ * is only temporary.
+ *
+ * <p>Example:
+ *
+ * <pre>{@code
+ * private static class ExampleActionRequestFailureHandler implements ActionRequestFailureHandler {
+ *
+ *     @Override
+ *     public void onFailure(ActionRequest action, Throwable failure, int restStatusCode, RequestIndexer indexer) throws Throwable {
+ *         if (ExceptionUtils.findThrowable(failure, EsRejectedExecutionException.class).isPresent()) {
+ *             // full queue; re-add document for indexing
+ *             indexer.add(action);
+ *         } else if (ExceptionUtils.findThrowable(failure, ElasticsearchParseException.class).isPresent()) {
+ *             // malformed document; simply drop request without failing sink
+ *         } else {
+ *             // for all other failures, fail the sink;
+ *             // here the failure is simply rethrown, but users can also choose to throw custom exceptions
+ *             throw failure;
+ *         }
+ *     }
+ * }
+ * }</pre>
+ *
+ * <p>The above example will let the sink re-add requests that failed due to queue capacity
+ * saturation and drop requests with malformed documents, without failing the sink. For all other
+ * failures, the sink will fail.
+ *
+ * <p>Note: For Elasticsearch 1.x, it is not feasible to match the type of the failure because the
+ * exact type could not be retrieved through the older version Java client APIs (thus, the types
+ * will be general {@link Exception}s and only differ in the failure message). In this case, it is
+ * recommended to match on the provided REST status code.
+ */
+@PublicEvolving
+public interface ActionRequestFailureHandler extends Serializable {
+
+    /**
+     * Handle a failed {@link ActionRequest}.
+     *
+     * @param action the {@link ActionRequest} that failed due to the failure
+     * @param failure the cause of failure
+     * @param restStatusCode the REST status code of the failure (-1 if none can be retrieved)
+     * @param indexer request indexer to re-add the failed action, if intended to do so
+     * @throws Throwable if the sink should fail on this failure, the implementation should rethrow
+     *     the exception or a custom one
+     */
+    void onFailure(
+            ActionRequest action, Throwable failure, int restStatusCode, RequestIndexer indexer)
+            throws Throwable;
+}
diff --git a/inlong-sort/sort-flink/sort-flink-v1.18/sort-connectors/elasticsearch-base/src/main/java/org/apache/inlong/sort/elasticsearch/BufferingNoOpRequestIndexer.java b/inlong-sort/sort-flink/sort-flink-v1.18/sort-connectors/elasticsearch-base/src/main/java/org/apache/inlong/sort/elasticsearch/BufferingNoOpRequestIndexer.java
new file mode 100644
index 0000000000..00ceba4ff4
--- /dev/null
+++ b/inlong-sort/sort-flink/sort-flink-v1.18/sort-connectors/elasticsearch-base/src/main/java/org/apache/inlong/sort/elasticsearch/BufferingNoOpRequestIndexer.java
@@ -0,0 +1,73 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.inlong.sort.elasticsearch;
+
+import org.apache.flink.annotation.Internal;
+import org.elasticsearch.action.ActionRequest;
+import org.elasticsearch.action.delete.DeleteRequest;
+import org.elasticsearch.action.index.IndexRequest;
+import org.elasticsearch.action.update.UpdateRequest;
+
+import javax.annotation.concurrent.NotThreadSafe;
+
+import java.util.Collections;
+import java.util.concurrent.ConcurrentLinkedQueue;
+
+/**
+ * Implementation of a {@link RequestIndexer} that buffers {@link ActionRequest ActionRequests}
+ * before re-sending them to the Elasticsearch cluster upon request.
+ */
+@Internal
+@NotThreadSafe
+class BufferingNoOpRequestIndexer implements RequestIndexer {
+
+    private final ConcurrentLinkedQueue<ActionRequest> bufferedRequests;
+
+    BufferingNoOpRequestIndexer() {
+        this.bufferedRequests = new ConcurrentLinkedQueue<>();
+    }
+
+    @Override
+    public void add(DeleteRequest... deleteRequests) {
+        Collections.addAll(bufferedRequests, deleteRequests);
+    }
+
+    @Override
+    public void add(IndexRequest... indexRequests) {
+        Collections.addAll(bufferedRequests, indexRequests);
+    }
+
+    @Override
+    public void add(UpdateRequest... updateRequests) {
+        Collections.addAll(bufferedRequests, updateRequests);
+    }
+
+    void processBufferedRequests(RequestIndexer actualIndexer) {
+        for (ActionRequest request : bufferedRequests) {
+            if (request instanceof IndexRequest) {
+                actualIndexer.add((IndexRequest) request);
+            } else if (request instanceof DeleteRequest) {
+                actualIndexer.add((DeleteRequest) request);
+            } else if (request instanceof UpdateRequest) {
+                actualIndexer.add((UpdateRequest) request);
+            }
+        }
+
+        bufferedRequests.clear();
+    }
+}
diff --git a/inlong-sort/sort-flink/sort-flink-v1.18/sort-connectors/elasticsearch-base/src/main/java/org/apache/inlong/sort/elasticsearch/ElasticsearchApiCallBridge.java b/inlong-sort/sort-flink/sort-flink-v1.18/sort-connectors/elasticsearch-base/src/main/java/org/apache/inlong/sort/elasticsearch/ElasticsearchApiCallBridge.java
new file mode 100644
index 0000000000..88742cb498
--- /dev/null
+++ b/inlong-sort/sort-flink/sort-flink-v1.18/sort-connectors/elasticsearch-base/src/main/java/org/apache/inlong/sort/elasticsearch/ElasticsearchApiCallBridge.java
@@ -0,0 +1,118 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.inlong.sort.elasticsearch;
+
+import org.apache.flink.annotation.Internal;
+import org.elasticsearch.action.bulk.BulkItemResponse;
+import org.elasticsearch.action.bulk.BulkProcessor;
+
+import javax.annotation.Nullable;
+
+import java.io.IOException;
+import java.io.Serializable;
+import java.util.concurrent.atomic.AtomicLong;
+
+/**
+ * An {@link ElasticsearchApiCallBridge} is used to bridge incompatible Elasticsearch Java API calls
+ * across different versions. This includes calls to create Elasticsearch clients, handle failed
+ * item responses, etc. Any incompatible Elasticsearch Java APIs should be bridged using this
+ * interface.
+ *
+ * <p>Implementations are allowed to be stateful. For example, for Elasticsearch 1.x, since
+ * connecting via an embedded node is allowed, the call bridge will hold a reference to the created
+ * embedded node. Each instance of the sink will hold exactly one instance of the call bridge, and
+ * state cleanup is performed when the sink is closed.
+ *
+ * @param <C> The Elasticsearch client, which implements {@link AutoCloseable}.
+ */
+@Internal
+public interface ElasticsearchApiCallBridge<C extends AutoCloseable> extends Serializable {
+
+    /**
+     * Creates an Elasticsearch client implementing {@link AutoCloseable}.
+     *
+     * @return The created client.
+     */
+    C createClient();
+
+    /**
+     * Creates a {@link BulkProcessor.Builder} for creating the bulk processor.
+     *
+     * @param client the Elasticsearch client.
+     * @param listener the bulk processor listener.
+     * @return the bulk processor builder.
+     */
+    BulkProcessor.Builder createBulkProcessorBuilder(C client, BulkProcessor.Listener listener);
+
+    /**
+     * Extracts the cause of failure of a bulk item action.
+     *
+     * @param bulkItemResponse the bulk item response to extract the cause of failure from
+     * @return the extracted {@link Throwable} from the response ({@code null} if the response is
+     *     successful).
+     */
+    @Nullable
+    Throwable extractFailureCauseFromBulkItemResponse(BulkItemResponse bulkItemResponse);
+
+    /**
+     * Sets the bulk flush interval, in milliseconds, on the provided {@link BulkProcessor.Builder}.
+     * The builder will later be used to instantiate the actual {@link BulkProcessor}.
+     *
+     * @param builder the {@link BulkProcessor.Builder} to configure.
+     * @param flushIntervalMillis the flush interval in milliseconds.
+     */
+    void configureBulkProcessorFlushInterval(
+            BulkProcessor.Builder builder, long flushIntervalMillis);
+
+    /**
+     * Sets backoff-related configurations on the provided {@link BulkProcessor.Builder}. The builder
+     * will later be used to instantiate the actual {@link BulkProcessor}.
+     *
+     * @param builder the {@link BulkProcessor.Builder} to configure.
+     * @param flushBackoffPolicy user-provided backoff retry settings ({@code null} if the user
+     *     disabled backoff retries).
+     */
+    void configureBulkProcessorBackoff(
+            BulkProcessor.Builder builder,
+            @Nullable ElasticsearchSinkBase.BulkFlushBackoffPolicy flushBackoffPolicy);
+
+    /**
+     * Verifies the client connection by making a test request/ping to the Elasticsearch cluster.
+     *
+     * <p>Called by {@link ElasticsearchSinkBase#open(org.apache.flink.configuration.Configuration)}
+     * after creating the client. This makes sure the underlying client is closed if the connection
+     * is not successful, preventing a thread leak.
+     *
+     * @param client the Elasticsearch client.
+     */
+    void verifyClientConnection(C client) throws IOException;
+
+    /**
+     * Creates a {@link RequestIndexer} that works with the {@link BulkProcessor} in a
+     * binary-compatible way.
+     */
+    RequestIndexer createBulkProcessorIndexer(
+            BulkProcessor bulkProcessor,
+            boolean flushOnCheckpoint,
+            AtomicLong numPendingRequestsRef);
+
+    /** Perform any necessary state cleanup. */
+    default void cleanup() {
+        // nothing to cleanup by default
+    }
+}
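
The version-specific bridges for Elasticsearch 6 and 7 live in the sibling connector modules rather than in this file. As a rough sketch, assuming the 7.x REST high-level client (the helper class and method names below are illustrative, not part of the patch), the calls a 7.x bridge wraps look like:

    import org.apache.http.HttpHost;
    import org.elasticsearch.action.bulk.BulkProcessor;
    import org.elasticsearch.client.RequestOptions;
    import org.elasticsearch.client.RestClient;
    import org.elasticsearch.client.RestHighLevelClient;

    import java.io.IOException;
    import java.util.List;

    // Illustrative helpers only; the real bridge implements ElasticsearchApiCallBridge.
    public final class Es7BridgeCalls {

        private Es7BridgeCalls() {
        }

        // createClient(): build the 7.x high-level REST client from host addresses
        public static RestHighLevelClient createClient(List<HttpHost> hosts) {
            return new RestHighLevelClient(RestClient.builder(hosts.toArray(new HttpHost[0])));
        }

        // createBulkProcessorBuilder(): in 7.x the bulk consumer goes through bulkAsync
        public static BulkProcessor.Builder bulkProcessorBuilder(
                RestHighLevelClient client, BulkProcessor.Listener listener) {
            return BulkProcessor.builder(
                    (request, bulkListener) -> client.bulkAsync(request, RequestOptions.DEFAULT, bulkListener),
                    listener);
        }

        // verifyClientConnection(): fail fast so the caller can close the client
        public static void verifyConnection(RestHighLevelClient client) throws IOException {
            if (!client.ping(RequestOptions.DEFAULT)) {
                throw new IOException("Elasticsearch cluster is not reachable");
            }
        }
    }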
diff --git a/inlong-sort/sort-flink/sort-flink-v1.18/sort-connectors/elasticsearch-base/src/main/java/org/apache/inlong/sort/elasticsearch/ElasticsearchSinkBase.java b/inlong-sort/sort-flink/sort-flink-v1.18/sort-connectors/elasticsearch-base/src/main/java/org/apache/inlong/sort/elasticsearch/ElasticsearchSinkBase.java
new file mode 100644
index 0000000000..4375429332
--- /dev/null
+++ b/inlong-sort/sort-flink/sort-flink-v1.18/sort-connectors/elasticsearch-base/src/main/java/org/apache/inlong/sort/elasticsearch/ElasticsearchSinkBase.java
@@ -0,0 +1,515 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.inlong.sort.elasticsearch;
+
+import org.apache.flink.annotation.Internal;
+import org.apache.flink.annotation.PublicEvolving;
+import org.apache.flink.annotation.VisibleForTesting;
+import org.apache.flink.api.java.utils.ParameterTool;
+import org.apache.flink.configuration.Configuration;
+import org.apache.flink.runtime.state.FunctionInitializationContext;
+import org.apache.flink.runtime.state.FunctionSnapshotContext;
+import org.apache.flink.streaming.api.checkpoint.CheckpointedFunction;
+import org.apache.flink.streaming.api.functions.sink.RichSinkFunction;
+import org.apache.flink.util.InstantiationUtil;
+import org.elasticsearch.action.ActionRequest;
+import org.elasticsearch.action.DocWriteRequest;
+import org.elasticsearch.action.bulk.BulkItemResponse;
+import org.elasticsearch.action.bulk.BulkProcessor;
+import org.elasticsearch.action.bulk.BulkRequest;
+import org.elasticsearch.action.bulk.BulkResponse;
+import org.elasticsearch.client.Client;
+import org.elasticsearch.common.unit.ByteSizeUnit;
+import org.elasticsearch.common.unit.ByteSizeValue;
+import org.elasticsearch.rest.RestStatus;
+
+import java.io.Serializable;
+import java.util.HashMap;
+import java.util.Map;
+import java.util.concurrent.atomic.AtomicLong;
+import java.util.concurrent.atomic.AtomicReference;
+
+import static org.apache.flink.util.Preconditions.checkArgument;
+import static org.apache.flink.util.Preconditions.checkNotNull;
+
+/**
+ * Base class for all Flink Elasticsearch Sinks.
+ *
+ * <p>This class implements the common behaviour across Elasticsearch versions, such as the use of
+ * an internal {@link BulkProcessor} to buffer multiple {@link ActionRequest}s before sending the
+ * requests to the cluster, as well as passing input records to the user-provided {@link
+ * ElasticsearchSinkFunction} for processing.
+ *
+ * <p>The version-specific API calls for different Elasticsearch versions should be defined by a
+ * concrete implementation of a {@link ElasticsearchApiCallBridge}, which is provided to the
+ * constructor of this class. This call bridge is used, for example, to create an Elasticsearch
+ * {@link Client}, handle failed item responses, etc.
+ *
+ * @param <T> Type of the elements handled by this sink
+ * @param <C> Type of the Elasticsearch client, which implements {@link AutoCloseable}
+ */
+@Internal
+public abstract class ElasticsearchSinkBase<T, C extends AutoCloseable> extends RichSinkFunction<T>
+        implements CheckpointedFunction {
+
+    private static final long serialVersionUID = -1007596293618451942L;
+
+    // ------------------------------------------------------------------------
+    // Internal bulk processor configuration
+    // ------------------------------------------------------------------------
+
+    public static final String CONFIG_KEY_BULK_FLUSH_MAX_ACTIONS = "bulk.flush.max.actions";
+    public static final String CONFIG_KEY_BULK_FLUSH_MAX_SIZE_MB = "bulk.flush.max.size.mb";
+    public static final String CONFIG_KEY_BULK_FLUSH_INTERVAL_MS = "bulk.flush.interval.ms";
+    public static final String CONFIG_KEY_BULK_FLUSH_BACKOFF_ENABLE = "bulk.flush.backoff.enable";
+    public static final String CONFIG_KEY_BULK_FLUSH_BACKOFF_TYPE = "bulk.flush.backoff.type";
+    public static final String CONFIG_KEY_BULK_FLUSH_BACKOFF_RETRIES = "bulk.flush.backoff.retries";
+    public static final String CONFIG_KEY_BULK_FLUSH_BACKOFF_DELAY = "bulk.flush.backoff.delay";
+
+    /** Used to control whether the retry delay should increase exponentially or remain constant. */
+    @PublicEvolving
+    public enum FlushBackoffType {
+        CONSTANT,
+        EXPONENTIAL
+    }
+
+    /**
+     * Provides a backoff policy for bulk requests. Whenever a bulk request is rejected due to
+     * resource constraints (i.e. the client's internal thread pool is full), the backoff policy
+     * decides how long the bulk processor will wait before the operation is retried internally.
+     *
+     * <p>This is a proxy for version-specific backoff policies.
+     */
+    public static class BulkFlushBackoffPolicy implements Serializable {
+
+        private static final long serialVersionUID = -6022851996101826049L;
+
+        // the default values follow the Elasticsearch default settings for BulkProcessor
+        private FlushBackoffType backoffType = FlushBackoffType.EXPONENTIAL;
+        private int maxRetryCount = 8;
+        private long delayMillis = 50;
+
+        public FlushBackoffType getBackoffType() {
+            return backoffType;
+        }
+
+        public int getMaxRetryCount() {
+            return maxRetryCount;
+        }
+
+        public long getDelayMillis() {
+            return delayMillis;
+        }
+
+        public void setBackoffType(FlushBackoffType backoffType) {
+            this.backoffType = checkNotNull(backoffType);
+        }
+
+        public void setMaxRetryCount(int maxRetryCount) {
+            checkArgument(maxRetryCount >= 0);
+            this.maxRetryCount = maxRetryCount;
+        }
+
+        public void setDelayMillis(long delayMillis) {
+            checkArgument(delayMillis >= 0);
+            this.delayMillis = delayMillis;
+        }
+    }
+
+    private final Integer bulkProcessorFlushMaxActions;
+    private final Integer bulkProcessorFlushMaxSizeMb;
+    private final Long bulkProcessorFlushIntervalMillis;
+    private final BulkFlushBackoffPolicy bulkProcessorFlushBackoffPolicy;
+
+    // ------------------------------------------------------------------------
+    // User-facing API and configuration
+    // ------------------------------------------------------------------------
+
+    /**
+     * The function that is used to construct multiple {@link ActionRequest ActionRequests} from
+     * each incoming element.
+     */
+    private final ElasticsearchSinkFunction<T> elasticsearchSinkFunction;
+
+    /** User-provided handler for failed {@link ActionRequest ActionRequests}. */
+    private final ActionRequestFailureHandler failureHandler;
+
+    /**
+     * If true, the producer will wait until all outstanding action requests have been sent to
+     * Elasticsearch.
+     */
+    private boolean flushOnCheckpoint = true;
+
+    /**
+     * Provided to the user via the {@link ElasticsearchSinkFunction} to add {@link ActionRequest
+     * ActionRequests}.
+     */
+    private transient RequestIndexer requestIndexer;
+
+    /**
+     * Provided to the {@link ActionRequestFailureHandler} to allow users to re-index failed
+     * requests.
+     */
+    private transient BufferingNoOpRequestIndexer failureRequestIndexer;
+
+    // ------------------------------------------------------------------------
+    // Internals for the Flink Elasticsearch Sink
+    // ------------------------------------------------------------------------
+
+    /** Call bridge for the version-specific Elasticsearch APIs. */
+    private final ElasticsearchApiCallBridge<C> callBridge;
+
+    /**
+     * Number of pending action requests not yet acknowledged by Elasticsearch. This value is
+     * maintained only if {@link ElasticsearchSinkBase#flushOnCheckpoint} is {@code true}.
+     *
+     * <p>This is incremented whenever the user adds (or re-adds through the {@link
+     * ActionRequestFailureHandler}) requests to the {@link RequestIndexer}. It is decremented for
+     * each completed request of a bulk request, in {@link BulkProcessor.Listener#afterBulk(long,
+     * BulkRequest, BulkResponse)} and {@link BulkProcessor.Listener#afterBulk(long, BulkRequest,
+     * Throwable)}.
+     */
+    private final AtomicLong numPendingRequests = new AtomicLong(0);
+
+    /** Elasticsearch client created using the call bridge. */
+    private transient C client;
+
+    /** Bulk processor to buffer and send requests to Elasticsearch, created using the client. */
+    private transient BulkProcessor bulkProcessor;
+
+    /**
+     * This is set from inside the {@link BulkProcessor.Listener} if a {@link Throwable} was thrown
+     * in callbacks and the user considered it should fail the sink via the {@link
+     * ActionRequestFailureHandler#onFailure(ActionRequest, Throwable, int, RequestIndexer)} method.
+     *
+     * <p>Errors will be checked and rethrown before processing each input element, and when the
+     * sink is closed.
+     */
+    private final AtomicReference<Throwable> failureThrowable = new AtomicReference<>();
+
+    public ElasticsearchSinkBase(
+            ElasticsearchApiCallBridge<C> callBridge,
+            Map<String, String> userConfig,
+            ElasticsearchSinkFunction<T> elasticsearchSinkFunction,
+            ActionRequestFailureHandler failureHandler) {
+
+        this.callBridge = checkNotNull(callBridge);
+        this.elasticsearchSinkFunction = checkNotNull(elasticsearchSinkFunction);
+        this.failureHandler = checkNotNull(failureHandler);
+        // we eagerly check if the user-provided sink function and failure handler are serializable;
+        // otherwise, if they aren't serializable, users will merely get a non-informative error
+        // message "ElasticsearchSinkBase is not serializable"
+
+        checkArgument(
+                InstantiationUtil.isSerializable(elasticsearchSinkFunction),
+                "The implementation of the provided ElasticsearchSinkFunction is not serializable. "
+                        + "The object probably contains or references non-serializable fields.");
+
+        checkArgument(
+                InstantiationUtil.isSerializable(failureHandler),
+                "The implementation of the provided ActionRequestFailureHandler is not serializable. "
+                        + "The object probably contains or references non-serializable fields.");
+
+        // extract and remove bulk processor related configuration from the user-provided config,
+        // so that the resulting user config only contains configuration related to the
+        // Elasticsearch client.
+
+        checkNotNull(userConfig);
+
+        // copy config so we can remove entries without side-effects
+        userConfig = new HashMap<>(userConfig);
+
+        ParameterTool params = ParameterTool.fromMap(userConfig);
+
+        if (params.has(CONFIG_KEY_BULK_FLUSH_MAX_ACTIONS)) {
+            bulkProcessorFlushMaxActions = params.getInt(CONFIG_KEY_BULK_FLUSH_MAX_ACTIONS);
+            userConfig.remove(CONFIG_KEY_BULK_FLUSH_MAX_ACTIONS);
+        } else {
+            bulkProcessorFlushMaxActions = null;
+        }
+
+        if (params.has(CONFIG_KEY_BULK_FLUSH_MAX_SIZE_MB)) {
+            bulkProcessorFlushMaxSizeMb = params.getInt(CONFIG_KEY_BULK_FLUSH_MAX_SIZE_MB);
+            userConfig.remove(CONFIG_KEY_BULK_FLUSH_MAX_SIZE_MB);
+        } else {
+            bulkProcessorFlushMaxSizeMb = null;
+        }
+
+        if (params.has(CONFIG_KEY_BULK_FLUSH_INTERVAL_MS)) {
+            bulkProcessorFlushIntervalMillis = params.getLong(CONFIG_KEY_BULK_FLUSH_INTERVAL_MS);
+            userConfig.remove(CONFIG_KEY_BULK_FLUSH_INTERVAL_MS);
+        } else {
+            bulkProcessorFlushIntervalMillis = null;
+        }
+
+        boolean bulkProcessorFlushBackoffEnable =
+                params.getBoolean(CONFIG_KEY_BULK_FLUSH_BACKOFF_ENABLE, true);
+        userConfig.remove(CONFIG_KEY_BULK_FLUSH_BACKOFF_ENABLE);
+
+        if (bulkProcessorFlushBackoffEnable) {
+            this.bulkProcessorFlushBackoffPolicy = new BulkFlushBackoffPolicy();
+
+            if (params.has(CONFIG_KEY_BULK_FLUSH_BACKOFF_TYPE)) {
+                bulkProcessorFlushBackoffPolicy.setBackoffType(
+                        FlushBackoffType.valueOf(params.get(CONFIG_KEY_BULK_FLUSH_BACKOFF_TYPE)));
+                userConfig.remove(CONFIG_KEY_BULK_FLUSH_BACKOFF_TYPE);
+            }
+
+            if (params.has(CONFIG_KEY_BULK_FLUSH_BACKOFF_RETRIES)) {
+                bulkProcessorFlushBackoffPolicy.setMaxRetryCount(
+                        params.getInt(CONFIG_KEY_BULK_FLUSH_BACKOFF_RETRIES));
+                userConfig.remove(CONFIG_KEY_BULK_FLUSH_BACKOFF_RETRIES);
+            }
+
+            if (params.has(CONFIG_KEY_BULK_FLUSH_BACKOFF_DELAY)) {
+                bulkProcessorFlushBackoffPolicy.setDelayMillis(
+                        params.getLong(CONFIG_KEY_BULK_FLUSH_BACKOFF_DELAY));
+                userConfig.remove(CONFIG_KEY_BULK_FLUSH_BACKOFF_DELAY);
+            }
+
+        } else {
+            bulkProcessorFlushBackoffPolicy = null;
+        }
+    }
+
+    /**
+     * Disable flushing on checkpoint. When disabled, the sink will not wait for all pending action
+     * requests to be acknowledged by Elasticsearch on checkpoints.
+     *
+     * <p>NOTE: If flushing on checkpoint is disabled, the Flink Elasticsearch Sink does NOT provide
+     * any strong guarantees for at-least-once delivery of action requests.
+     */
+    public void disableFlushOnCheckpoint() {
+        this.flushOnCheckpoint = false;
+    }
+
+    @Override
+    public void open(Configuration parameters) throws Exception {
+        client = callBridge.createClient();
+        callBridge.verifyClientConnection(client);
+        bulkProcessor = buildBulkProcessor(new BulkProcessorListener());
+        requestIndexer =
+                callBridge.createBulkProcessorIndexer(
+                        bulkProcessor, flushOnCheckpoint, numPendingRequests);
+        failureRequestIndexer = new BufferingNoOpRequestIndexer();
+        elasticsearchSinkFunction.open(getRuntimeContext());
+    }
+
+    @Override
+    public void invoke(T value, Context context) throws Exception {
+        checkAsyncErrorsAndRequests();
+        elasticsearchSinkFunction.process(value, getRuntimeContext(), requestIndexer);
+    }
+
+    @Override
+    public void initializeState(FunctionInitializationContext context) throws Exception {
+        // no initialization needed
+    }
+
+    @Override
+    public void snapshotState(FunctionSnapshotContext context) throws Exception {
+        checkAsyncErrorsAndRequests();
+
+        if (flushOnCheckpoint) {
+            while (numPendingRequests.get() != 0) {
+                bulkProcessor.flush();
+                checkAsyncErrorsAndRequests();
+            }
+        }
+    }
+
+    @Override
+    public void close() throws Exception {
+        elasticsearchSinkFunction.close();
+        if (bulkProcessor != null) {
+            bulkProcessor.close();
+            bulkProcessor = null;
+        }
+
+        if (client != null) {
+            client.close();
+            client = null;
+        }
+
+        callBridge.cleanup();
+
+        // make sure any errors from callbacks are rethrown
+        checkErrorAndRethrow();
+    }
+
+    /**
+     * Build the {@link BulkProcessor}.
+     *
+     * <p>Note: this is exposed for testing purposes.
+     */
+    @VisibleForTesting
+    protected BulkProcessor buildBulkProcessor(BulkProcessor.Listener listener) {
+        checkNotNull(listener);
+
+        BulkProcessor.Builder bulkProcessorBuilder =
+                callBridge.createBulkProcessorBuilder(client, listener);
+
+        // This makes flush() blocking
+        bulkProcessorBuilder.setConcurrentRequests(0);
+
+        if (bulkProcessorFlushMaxActions != null) {
+            bulkProcessorBuilder.setBulkActions(bulkProcessorFlushMaxActions);
+        }
+
+        if (bulkProcessorFlushMaxSizeMb != null) {
+            configureBulkSize(bulkProcessorBuilder);
+        }
+
+        if (bulkProcessorFlushIntervalMillis != null) {
+            configureFlushInterval(bulkProcessorBuilder);
+        }
+
+        // if backoff retrying is disabled, bulkProcessorFlushBackoffPolicy will be null
+        callBridge.configureBulkProcessorBackoff(
+                bulkProcessorBuilder, bulkProcessorFlushBackoffPolicy);
+
+        return bulkProcessorBuilder.build();
+    }
+
+    private void configureBulkSize(BulkProcessor.Builder bulkProcessorBuilder) {
+        final ByteSizeUnit sizeUnit;
+        if (bulkProcessorFlushMaxSizeMb == -1) {
+            // bulk size can be disabled with -1, however the ByteSizeValue constructor accepts -1
+            // only with BYTES as the size unit
+            sizeUnit = ByteSizeUnit.BYTES;
+        } else {
+            sizeUnit = ByteSizeUnit.MB;
+        }
+        bulkProcessorBuilder.setBulkSize(new ByteSizeValue(bulkProcessorFlushMaxSizeMb, sizeUnit));
+    }
+
+    private void configureFlushInterval(BulkProcessor.Builder bulkProcessorBuilder) {
+        if (bulkProcessorFlushIntervalMillis == -1) {
+            bulkProcessorBuilder.setFlushInterval(null);
+        } else {
+            callBridge.configureBulkProcessorFlushInterval(
+                    bulkProcessorBuilder, bulkProcessorFlushIntervalMillis);
+        }
+    }
+
+    private void checkErrorAndRethrow() {
+        Throwable cause = failureThrowable.get();
+        if (cause != null) {
+            throw new RuntimeException("An error occurred in ElasticsearchSink.", cause);
+        }
+    }
+
+    private void checkAsyncErrorsAndRequests() {
+        checkErrorAndRethrow();
+        failureRequestIndexer.processBufferedRequests(requestIndexer);
+    }
+
+    private class BulkProcessorListener implements BulkProcessor.Listener {
+
+        @Override
+        public void beforeBulk(long executionId, BulkRequest request) {
+        }
+
+        @Override
+        public void afterBulk(long executionId, BulkRequest request, BulkResponse response) {
+            if (response.hasFailures()) {
+                BulkItemResponse itemResponse;
+                Throwable failure;
+                RestStatus restStatus;
+                DocWriteRequest actionRequest;
+
+                try {
+                    for (int i = 0; i < response.getItems().length; i++) {
+                        itemResponse = response.getItems()[i];
+                        failure = callBridge.extractFailureCauseFromBulkItemResponse(itemResponse);
+                        if (failure != null) {
+                            restStatus = itemResponse.getFailure().getStatus();
+                            actionRequest = request.requests().get(i);
+                            if (restStatus == null) {
+                                if (actionRequest instanceof ActionRequest) {
+                                    failureHandler.onFailure(
+                                            (ActionRequest) actionRequest,
+                                            failure,
+                                            -1,
+                                            failureRequestIndexer);
+                                } else {
+                                    throw new UnsupportedOperationException(
+                                            "The sink currently only supports 
ActionRequests");
+                                }
+                            } else {
+                                if (actionRequest instanceof ActionRequest) {
+                                    failureHandler.onFailure(
+                                            (ActionRequest) actionRequest,
+                                            failure,
+                                            restStatus.getStatus(),
+                                            failureRequestIndexer);
+                                } else {
+                                    throw new UnsupportedOperationException(
+                                            "The sink currently only supports 
ActionRequests");
+                                }
+                            }
+                        }
+                    }
+                } catch (Throwable t) {
+                    // fail the sink and skip the rest of the items
+                    // if the failure handler decides to throw an exception
+                    failureThrowable.compareAndSet(null, t);
+                }
+            }
+
+            if (flushOnCheckpoint) {
+                numPendingRequests.getAndAdd(-request.numberOfActions());
+            }
+        }
+
+        @Override
+        public void afterBulk(long executionId, BulkRequest request, Throwable failure) {
+            try {
+                for (DocWriteRequest writeRequest : request.requests()) {
+                    if (writeRequest instanceof ActionRequest) {
+                        failureHandler.onFailure(
+                                (ActionRequest) writeRequest, failure, -1, failureRequestIndexer);
+                    } else {
+                        throw new UnsupportedOperationException(
+                                "The sink currently only supports 
ActionRequests");
+                    }
+                }
+            } catch (Throwable t) {
+                // fail the sink and skip the rest of the items
+                // if the failure handler decides to throw an exception
+                failureThrowable.compareAndSet(null, t);
+            }
+
+            if (flushOnCheckpoint) {
+                numPendingRequests.getAndAdd(-request.numberOfActions());
+            }
+        }
+    }
+
+    @VisibleForTesting
+    long getNumPendingRequests() {
+        if (flushOnCheckpoint) {
+            return numPendingRequests.get();
+        } else {
+            throw new UnsupportedOperationException(
+                    "The number of pending requests is not maintained when 
flushing on checkpoint is disabled.");
+        }
+    }
+}
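
The constructor above consumes the bulk-flush keys from the user config and forwards the remainder to the client. A small sketch of building such a map (the values are illustrative, not mandated by the patch):

    import org.apache.inlong.sort.elasticsearch.ElasticsearchSinkBase;

    import java.util.HashMap;
    import java.util.Map;

    public final class BulkFlushConfigExample {

        // The bulk.flush.* entries are consumed (and removed) by the ElasticsearchSinkBase
        // constructor; anything left over is passed through to the Elasticsearch client.
        public static Map<String, String> bulkFlushConfig() {
            Map<String, String> userConfig = new HashMap<>();
            userConfig.put(ElasticsearchSinkBase.CONFIG_KEY_BULK_FLUSH_MAX_ACTIONS, "1000");
            userConfig.put(ElasticsearchSinkBase.CONFIG_KEY_BULK_FLUSH_INTERVAL_MS, "5000");
            userConfig.put(ElasticsearchSinkBase.CONFIG_KEY_BULK_FLUSH_BACKOFF_TYPE, "EXPONENTIAL");
            userConfig.put(ElasticsearchSinkBase.CONFIG_KEY_BULK_FLUSH_BACKOFF_RETRIES, "3");
            userConfig.put(ElasticsearchSinkBase.CONFIG_KEY_BULK_FLUSH_BACKOFF_DELAY, "100");
            return userConfig;
        }
    }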
diff --git a/inlong-sort/sort-flink/sort-flink-v1.18/sort-connectors/elasticsearch-base/src/main/java/org/apache/inlong/sort/elasticsearch/ElasticsearchSinkFunction.java b/inlong-sort/sort-flink/sort-flink-v1.18/sort-connectors/elasticsearch-base/src/main/java/org/apache/inlong/sort/elasticsearch/ElasticsearchSinkFunction.java
new file mode 100644
index 0000000000..87f334693e
--- /dev/null
+++ b/inlong-sort/sort-flink/sort-flink-v1.18/sort-connectors/elasticsearch-base/src/main/java/org/apache/inlong/sort/elasticsearch/ElasticsearchSinkFunction.java
@@ -0,0 +1,89 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.inlong.sort.elasticsearch;
+
+import org.apache.flink.annotation.PublicEvolving;
+import org.apache.flink.api.common.functions.Function;
+import org.apache.flink.api.common.functions.RuntimeContext;
+import org.elasticsearch.action.ActionRequest;
+
+import java.io.Serializable;
+
+/**
+ * Creates multiple {@link ActionRequest ActionRequests} from an element in a stream.
+ *
+ * <p>This is used by sinks to prepare elements for sending them to Elasticsearch.
+ *
+ * <p>Example:
+ *
+ * <pre>{@code
+ * private static class TestElasticSearchSinkFunction
+ *         implements ElasticsearchSinkFunction<Tuple2<Integer, String>> {
+ *
+ *     public IndexRequest createIndexRequest(Tuple2<Integer, String> element) {
+ *         Map<String, Object> json = new HashMap<>();
+ *         json.put("data", element.f1);
+ *
+ *         return Requests.indexRequest()
+ *                 .index("my-index")
+ *                 .type("my-type")
+ *                 .id(element.f0.toString())
+ *                 .source(json);
+ *     }
+ *
+ *     public void process(Tuple2<Integer, String> element, RuntimeContext ctx, RequestIndexer indexer) {
+ *         indexer.add(createIndexRequest(element));
+ *     }
+ * }
+ * }</pre>
+ *
+ * @param <T> The type of the element handled by this {@code ElasticsearchSinkFunction}
+ */
+@PublicEvolving
+public interface ElasticsearchSinkFunction<T> extends Serializable, Function {
+
+    /**
+     * Initialization method for the function. It is called once before the actual working process
+     * methods, if {@link #open(RuntimeContext)} is not overridden.
+     */
+    default void open() throws Exception {
+    }
+
+    /**
+     * Initialization method for the function. It is called once before the actual working process
+     * methods.
+     */
+    default void open(RuntimeContext ctx) throws Exception {
+        open();
+    }
+
+    /** Tear-down method for the function. It is called when the sink closes. */
+    default void close() throws Exception {
+    }
+
+    /**
+     * Process the incoming element to produce multiple {@link ActionRequest ActionRequests}. The
+     * produced requests should be added to the provided {@link RequestIndexer}.
+     *
+     * @param element incoming element to process
+     * @param ctx runtime context containing information about the sink instance
+     * @param indexer request indexer that the {@code ActionRequest}s should be added to
+     */
+    void process(T element, RuntimeContext ctx, RequestIndexer indexer);
+}
diff --git a/inlong-sort/sort-flink/sort-flink-v1.18/sort-connectors/elasticsearch-base/src/main/java/org/apache/inlong/sort/elasticsearch/RequestIndexer.java b/inlong-sort/sort-flink/sort-flink-v1.18/sort-connectors/elasticsearch-base/src/main/java/org/apache/inlong/sort/elasticsearch/RequestIndexer.java
new file mode 100644
index 0000000000..ff9ab40695
--- /dev/null
+++ b/inlong-sort/sort-flink/sort-flink-v1.18/sort-connectors/elasticsearch-base/src/main/java/org/apache/inlong/sort/elasticsearch/RequestIndexer.java
@@ -0,0 +1,79 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.inlong.sort.elasticsearch;
+
+import org.apache.flink.annotation.PublicEvolving;
+import org.elasticsearch.action.ActionRequest;
+import org.elasticsearch.action.delete.DeleteRequest;
+import org.elasticsearch.action.index.IndexRequest;
+import org.elasticsearch.action.update.UpdateRequest;
+
+/**
+ * Users add multiple delete, index or update requests to a {@link RequestIndexer} to prepare them
+ * for sending to an Elasticsearch cluster.
+ */
+@PublicEvolving
+public interface RequestIndexer {
+
+    /**
+     * Add multiple {@link ActionRequest} to the indexer to prepare for sending requests to
+     * Elasticsearch.
+     *
+     * @param actionRequests The multiple {@link ActionRequest} to add.
+     * @deprecated use the {@link DeleteRequest}, {@link IndexRequest} or {@link UpdateRequest}
+     *     overloads instead
+     */
+    @Deprecated
+    default void add(ActionRequest... actionRequests) {
+        for (ActionRequest actionRequest : actionRequests) {
+            if (actionRequest instanceof IndexRequest) {
+                add((IndexRequest) actionRequest);
+            } else if (actionRequest instanceof DeleteRequest) {
+                add((DeleteRequest) actionRequest);
+            } else if (actionRequest instanceof UpdateRequest) {
+                add((UpdateRequest) actionRequest);
+            } else {
+                throw new IllegalArgumentException(
+                        "RequestIndexer only supports Index, Delete and Update 
requests");
+            }
+        }
+    }
+
+    /**
+     * Add multiple {@link DeleteRequest} to the indexer to prepare for sending requests to
+     * Elasticsearch.
+     *
+     * @param deleteRequests The multiple {@link DeleteRequest} to add.
+     */
+    void add(DeleteRequest... deleteRequests);
+
+    /**
+     * Add multiple {@link IndexRequest} to the indexer to prepare for sending requests to
+     * Elasticsearch.
+     *
+     * @param indexRequests The multiple {@link IndexRequest} to add.
+     */
+    void add(IndexRequest... indexRequests);
+
+    /**
+     * Add multiple {@link UpdateRequest} to the indexer to prepare for sending requests to
+     * Elasticsearch.
+     *
+     * @param updateRequests The multiple {@link UpdateRequest} to add.
+     */
+    void add(UpdateRequest... updateRequests);
+}
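
Beyond plain indexing, the typed overloads cover deletes and updates. A hedged sketch using the 7.x request constructors (the helper class, index name, and field values are examples, not part of the patch):

    import org.apache.inlong.sort.elasticsearch.RequestIndexer;

    import org.elasticsearch.action.delete.DeleteRequest;
    import org.elasticsearch.action.update.UpdateRequest;

    import java.util.Collections;

    public final class RequestIndexerExamples {

        // Upsert: update the document if it exists, insert it otherwise.
        public static void upsert(RequestIndexer indexer, String id) {
            UpdateRequest update = new UpdateRequest("my-index", id)
                    .doc(Collections.singletonMap("status", "active"))
                    .docAsUpsert(true);
            indexer.add(update);
        }

        // Delete a document by id.
        public static void delete(RequestIndexer indexer, String id) {
            indexer.add(new DeleteRequest("my-index", id));
        }
    }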
diff --git a/inlong-sort/sort-flink/sort-flink-v1.18/sort-connectors/elasticsearch-base/src/main/java/org/apache/inlong/sort/elasticsearch/table/AbstractTimeIndexGenerator.java b/inlong-sort/sort-flink/sort-flink-v1.18/sort-connectors/elasticsearch-base/src/main/java/org/apache/inlong/sort/elasticsearch/table/AbstractTimeIndexGenerator.java
new file mode 100644
index 0000000000..b7601f5241
--- /dev/null
+++ b/inlong-sort/sort-flink/sort-flink-v1.18/sort-connectors/elasticsearch-base/src/main/java/org/apache/inlong/sort/elasticsearch/table/AbstractTimeIndexGenerator.java
@@ -0,0 +1,40 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.inlong.sort.elasticsearch.table;
+
+import org.apache.flink.annotation.Internal;
+
+import java.time.format.DateTimeFormatter;
+
+/** Abstract base class for time-related {@link IndexGenerator}s. */
+@Internal
+abstract class AbstractTimeIndexGenerator extends IndexGeneratorBase {
+
+    private final String dateTimeFormat;
+    protected transient DateTimeFormatter dateTimeFormatter;
+
+    public AbstractTimeIndexGenerator(String index, String dateTimeFormat) {
+        super(index);
+        this.dateTimeFormat = dateTimeFormat;
+    }
+
+    @Override
+    public void open() {
+        this.dateTimeFormatter = DateTimeFormatter.ofPattern(dateTimeFormat);
+    }
+}
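
The concrete time-based generators are produced by IndexGeneratorFactory later in this patch; the pattern this abstract class encodes — serialize the pattern string, rebuild the non-serializable DateTimeFormatter in open() — looks like this in isolation (the class name and index prefix are illustrative):

    import java.io.Serializable;
    import java.time.LocalDateTime;
    import java.time.format.DateTimeFormatter;

    // Illustrative stand-alone version of the transient-formatter pattern.
    public class DailyIndexSketch implements Serializable {

        private final String dateTimeFormat = "yyyy_MM_dd"; // serializable state
        private transient DateTimeFormatter dateTimeFormatter; // rebuilt per task

        public void open() {
            dateTimeFormatter = DateTimeFormatter.ofPattern(dateTimeFormat);
        }

        public String generate(String prefix) {
            // e.g. "my-index-2024_07_25" for a daily rolling index
            return prefix + "-" + dateTimeFormatter.format(LocalDateTime.now());
        }
    }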
diff --git a/inlong-sort/sort-flink/sort-flink-v1.18/sort-connectors/elasticsearch-base/src/main/java/org/apache/inlong/sort/elasticsearch/table/ElasticsearchConfiguration.java b/inlong-sort/sort-flink/sort-flink-v1.18/sort-connectors/elasticsearch-base/src/main/java/org/apache/inlong/sort/elasticsearch/table/ElasticsearchConfiguration.java
new file mode 100644
index 0000000000..ba9716d4ac
--- /dev/null
+++ b/inlong-sort/sort-flink/sort-flink-v1.18/sort-connectors/elasticsearch-base/src/main/java/org/apache/inlong/sort/elasticsearch/table/ElasticsearchConfiguration.java
@@ -0,0 +1,162 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.inlong.sort.elasticsearch.table;
+
+import org.apache.inlong.sort.elasticsearch.ActionRequestFailureHandler;
+import org.apache.inlong.sort.elasticsearch.ElasticsearchSinkBase;
+import org.apache.inlong.sort.elasticsearch.util.IgnoringFailureHandler;
+import org.apache.inlong.sort.elasticsearch.util.NoOpFailureHandler;
+import org.apache.inlong.sort.elasticsearch.util.RetryRejectedExecutionFailureHandler;
+
+import org.apache.flink.annotation.Internal;
+import org.apache.flink.configuration.ReadableConfig;
+import org.apache.flink.table.api.ValidationException;
+import org.apache.flink.util.InstantiationUtil;
+
+import java.time.Duration;
+import java.util.Objects;
+import java.util.Optional;
+
+/** Accessor methods to Elasticsearch options. */
+@Internal
+public class ElasticsearchConfiguration {
+
+    protected final ReadableConfig config;
+    private final ClassLoader classLoader;
+
+    public ElasticsearchConfiguration(ReadableConfig config, ClassLoader classLoader) {
+        this.config = config;
+        this.classLoader = classLoader;
+    }
+
+    public ActionRequestFailureHandler getFailureHandler() {
+        final ActionRequestFailureHandler failureHandler;
+        String value = config.get(ElasticsearchConnectorOptions.FAILURE_HANDLER_OPTION);
+        switch (value.toUpperCase()) {
+            case "FAIL":
+                failureHandler = new NoOpFailureHandler();
+                break;
+            case "IGNORE":
+                failureHandler = new IgnoringFailureHandler();
+                break;
+            case "RETRY-REJECTED":
+                failureHandler = new RetryRejectedExecutionFailureHandler();
+                break;
+            default:
+                try {
+                    Class<?> failureHandlerClass = Class.forName(value, false, classLoader);
+                    failureHandler =
+                            (ActionRequestFailureHandler) InstantiationUtil.instantiate(failureHandlerClass);
+                } catch (ClassNotFoundException e) {
+                    throw new ValidationException(
+                            "Could not instantiate the failure handler class: " + value, e);
+                }
+                break;
+        }
+        return failureHandler;
+    }
+
+    public String getDocumentType() {
+        return config.get(ElasticsearchConnectorOptions.DOCUMENT_TYPE_OPTION);
+    }
+
+    public int getBulkFlushMaxActions() {
+        int maxActions = config.get(ElasticsearchConnectorOptions.BULK_FLUSH_MAX_ACTIONS_OPTION);
+        // convert 0 to -1, because the Elasticsearch client uses -1 to disable this configuration.
+        return maxActions == 0 ? -1 : maxActions;
+    }
+
+    public long getBulkFlushMaxByteSize() {
+        long maxSize =
+                config.get(ElasticsearchConnectorOptions.BULK_FLASH_MAX_SIZE_OPTION).getBytes();
+        // convert 0 to -1, because the Elasticsearch client uses -1 to disable this configuration.
+        return maxSize == 0 ? -1 : maxSize;
+    }
+
+    public long getBulkFlushInterval() {
+        long interval = config.get(ElasticsearchConnectorOptions.BULK_FLUSH_INTERVAL_OPTION).toMillis();
+        // convert 0 to -1, because the Elasticsearch client uses -1 to disable this configuration.
+        return interval == 0 ? -1 : interval;
+    }
+
+    public Optional<String> getUsername() {
+        return config.getOptional(ElasticsearchConnectorOptions.USERNAME_OPTION);
+    }
+
+    public Optional<String> getPassword() {
+        return config.getOptional(ElasticsearchConnectorOptions.PASSWORD_OPTION);
+    }
+
+    public boolean isBulkFlushBackoffEnabled() {
+        return config.get(ElasticsearchConnectorOptions.BULK_FLUSH_BACKOFF_TYPE_OPTION)
+                != ElasticsearchConnectorOptions.BackOffType.DISABLED;
+    }
+
+    public Optional<ElasticsearchSinkBase.FlushBackoffType> getBulkFlushBackoffType() {
+        switch (config.get(ElasticsearchConnectorOptions.BULK_FLUSH_BACKOFF_TYPE_OPTION)) {
+            case CONSTANT:
+                return Optional.of(ElasticsearchSinkBase.FlushBackoffType.CONSTANT);
+            case EXPONENTIAL:
+                return Optional.of(ElasticsearchSinkBase.FlushBackoffType.EXPONENTIAL);
+            default:
+                return Optional.empty();
+        }
+    }
+
+    public Optional<Integer> getBulkFlushBackoffRetries() {
+        return config.getOptional(ElasticsearchConnectorOptions.BULK_FLUSH_BACKOFF_MAX_RETRIES_OPTION);
+    }
+
+    public Optional<Long> getBulkFlushBackoffDelay() {
+        return config.getOptional(ElasticsearchConnectorOptions.BULK_FLUSH_BACKOFF_DELAY_OPTION)
+                .map(Duration::toMillis);
+    }
+
+    public boolean isDisableFlushOnCheckpoint() {
+        return !config.get(ElasticsearchConnectorOptions.FLUSH_ON_CHECKPOINT_OPTION);
+    }
+
+    public String getIndex() {
+        return config.get(ElasticsearchConnectorOptions.INDEX_OPTION);
+    }
+
+    public String getKeyDelimiter() {
+        return config.get(ElasticsearchConnectorOptions.KEY_DELIMITER_OPTION);
+    }
+
+    public Optional<String> getPathPrefix() {
+        return config.getOptional(ElasticsearchConnectorOptions.CONNECTION_PATH_PREFIX);
+    }
+
+    @Override
+    public boolean equals(Object o) {
+        if (this == o) {
+            return true;
+        }
+        if (o == null || getClass() != o.getClass()) {
+            return false;
+        }
+        ElasticsearchConfiguration that = (ElasticsearchConfiguration) o;
+        return Objects.equals(config, that.config) && Objects.equals(classLoader, that.classLoader);
+    }
+
+    @Override
+    public int hashCode() {
+        return Objects.hash(config, classLoader);
+    }
+}
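
Besides the three built-in strategies, the default branch in getFailureHandler() accepts a fully qualified class name and loads it reflectively. A hypothetical handler could look like the sketch below; it assumes the ActionRequest overload of RequestIndexer#add (as in the Flink interface this module mirrors) is available.

    // Hypothetical handler: re-queue bulk-queue rejections (HTTP 429), fail otherwise.
    public class RetryOn429FailureHandler implements ActionRequestFailureHandler {

        @Override
        public void onFailure(
                ActionRequest action, Throwable failure, int restStatusCode, RequestIndexer indexer)
                throws Throwable {
            if (restStatusCode == 429) {
                indexer.add(action);   // retry the rejected request
            } else {
                throw failure;         // any other failure fails the job
            }
        }
    }

It would be configured via 'failure-handler' = 'com.example.RetryOn429FailureHandler' (a hypothetical class name).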
diff --git a/inlong-sort/sort-flink/sort-flink-v1.18/sort-connectors/elasticsearch-base/src/main/java/org/apache/inlong/sort/elasticsearch/table/ElasticsearchConnectorOptions.java b/inlong-sort/sort-flink/sort-flink-v1.18/sort-connectors/elasticsearch-base/src/main/java/org/apache/inlong/sort/elasticsearch/table/ElasticsearchConnectorOptions.java
new file mode 100644
index 0000000000..27c8e390fd
--- /dev/null
+++ b/inlong-sort/sort-flink/sort-flink-v1.18/sort-connectors/elasticsearch-base/src/main/java/org/apache/inlong/sort/elasticsearch/table/ElasticsearchConnectorOptions.java
@@ -0,0 +1,169 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.inlong.sort.elasticsearch.table;
+
+import org.apache.inlong.sort.elasticsearch.ElasticsearchSinkBase;
+
+import org.apache.flink.annotation.PublicEvolving;
+import org.apache.flink.configuration.ConfigOption;
+import org.apache.flink.configuration.ConfigOptions;
+import org.apache.flink.configuration.MemorySize;
+import org.apache.flink.configuration.description.Description;
+
+import java.time.Duration;
+import java.util.List;
+
+import static org.apache.flink.configuration.description.TextElement.text;
+
+/**
+ * Options for the Elasticsearch connector.
+ */
+@PublicEvolving
+public class ElasticsearchConnectorOptions {
+
+    public static final ConfigOption<List<String>> HOSTS_OPTION =
+            ConfigOptions.key("hosts")
+                    .stringType()
+                    .asList()
+                    .noDefaultValue()
+                    .withDescription("Elasticsearch hosts to connect to.");
+
+    public static final ConfigOption<String> INDEX_OPTION =
+            ConfigOptions.key("index")
+                    .stringType()
+                    .noDefaultValue()
+                    .withDescription("Elasticsearch index for every record.");
+
+    public static final ConfigOption<String> DOCUMENT_TYPE_OPTION =
+            ConfigOptions.key("document-type")
+                    .stringType()
+                    .noDefaultValue()
+                    .withDescription("Elasticsearch document type.");
+
+    public static final ConfigOption<String> PASSWORD_OPTION =
+            ConfigOptions.key("password")
+                    .stringType()
+                    .noDefaultValue()
+                    .withDescription("Password used to connect to 
Elasticsearch instance.");
+
+    public static final ConfigOption<String> USERNAME_OPTION =
+            ConfigOptions.key("username")
+                    .stringType()
+                    .noDefaultValue()
+                    .withDescription("Username used to connect to 
Elasticsearch instance.");
+
+    public static final ConfigOption<String> KEY_DELIMITER_OPTION =
+            ConfigOptions.key("document-id.key-delimiter")
+                    .stringType()
+                    .defaultValue("_")
+                    .withDescription(
+                            "Delimiter for composite keys e.g., \"$\" would 
result in IDs \"KEY1$KEY2$KEY3\".");
+
+    public static final ConfigOption<String> FAILURE_HANDLER_OPTION =
+            ConfigOptions.key("failure-handler")
+                    .stringType()
+                    .defaultValue("fail")
+                    .withDescription(
+                            Description.builder()
+                                    .text(
+                                            "Failure handling strategy in case 
a request to Elasticsearch fails")
+                                    .list(
+                                            text(
+                                                    "\"fail\" (throws an 
exception if a request fails and thus causes a job failure)"),
+                                            text(
+                                                    "\"ignore\" (ignores 
failures and drops the request)"),
+                                            text(
+                                                    "\"retry-rejected\" 
(re-adds requests that have failed due to queue capacity saturation)"),
+                                            text(
+                                                    "\"class name\" for 
failure handling with a ActionRequestFailureHandler subclass"))
+                                    .build());
+
+    public static final ConfigOption<Boolean> FLUSH_ON_CHECKPOINT_OPTION =
+            ConfigOptions.key("sink.flush-on-checkpoint")
+                    .booleanType()
+                    .defaultValue(true)
+                    .withDescription("Disables flushing on checkpoint");
+
+    public static final ConfigOption<Integer> BULK_FLUSH_MAX_ACTIONS_OPTION =
+            ConfigOptions.key("sink.bulk-flush.max-actions")
+                    .intType()
+                    .defaultValue(1000)
+                    .withDescription("Maximum number of actions to buffer for 
each bulk request.");
+
+    public static final ConfigOption<MemorySize> BULK_FLASH_MAX_SIZE_OPTION =
+            ConfigOptions.key("sink.bulk-flush.max-size")
+                    .memoryType()
+                    .defaultValue(MemorySize.parse("2mb"))
+                    .withDescription("Maximum size of buffered actions per 
bulk request");
+
+    public static final ConfigOption<Duration> BULK_FLUSH_INTERVAL_OPTION =
+            ConfigOptions.key("sink.bulk-flush.interval")
+                    .durationType()
+                    .defaultValue(Duration.ofSeconds(1))
+                    .withDescription("Bulk flush interval");
+
+    public static final ConfigOption<BackOffType> 
BULK_FLUSH_BACKOFF_TYPE_OPTION =
+            ConfigOptions.key("sink.bulk-flush.backoff.strategy")
+                    .enumType(BackOffType.class)
+                    .defaultValue(BackOffType.DISABLED)
+                    .withDescription("Backoff strategy");
+
+    public static final ConfigOption<Integer> 
BULK_FLUSH_BACKOFF_MAX_RETRIES_OPTION =
+            ConfigOptions.key("sink.bulk-flush.backoff.max-retries")
+                    .intType()
+                    .noDefaultValue()
+                    .withDescription("Maximum number of retries.");
+
+    public static final ConfigOption<Duration> BULK_FLUSH_BACKOFF_DELAY_OPTION 
=
+            ConfigOptions.key("sink.bulk-flush.backoff.delay")
+                    .durationType()
+                    .noDefaultValue()
+                    .withDescription("Delay between each backoff attempt.");
+
+    public static final ConfigOption<String> CONNECTION_PATH_PREFIX =
+            ConfigOptions.key("connection.path-prefix")
+                    .stringType()
+                    .noDefaultValue()
+                    .withDescription("Prefix string to be added to every REST 
communication.");
+
+    public static final ConfigOption<String> FORMAT_OPTION =
+            ConfigOptions.key("format")
+                    .stringType()
+                    .defaultValue("json")
+                    .withDescription(
+                            "The format must produce a valid JSON document. "
+                                    + "Please refer to the documentation on 
formats for more details.");
+
+    // --------------------------------------------------------------------------------------------
+    // Enums
+    // --------------------------------------------------------------------------------------------
+
+    /**
+     * Backoff strategy. Extends {@link ElasticsearchSinkBase.FlushBackoffType} with {@code
+     * DISABLED} option.
+     */
+    public enum BackOffType {
+        DISABLED,
+        CONSTANT,
+        EXPONENTIAL
+    }
+
+    private ElasticsearchConnectorOptions() {
+    }
+}
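
A short sketch of how these options and the ElasticsearchConfiguration accessors above fit together (values illustrative):

    // Configuration implements ReadableConfig, so it can back the accessor class.
    Configuration conf = new Configuration();
    conf.setString("hosts", "http://127.0.0.1:9200");
    conf.setString("index", "myusers");
    conf.setString("sink.bulk-flush.max-actions", "0");   // 0 means "disable"

    ElasticsearchConfiguration esConf =
            new ElasticsearchConfiguration(conf, Thread.currentThread().getContextClassLoader());
    esConf.getIndex();                 // "myusers"
    esConf.getBulkFlushMaxActions();   // -1, since 0 is mapped to the client's disabled value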
diff --git a/inlong-sort/sort-flink/sort-flink-v1.18/sort-connectors/elasticsearch-base/src/main/java/org/apache/inlong/sort/elasticsearch/table/ElasticsearchValidationUtils.java b/inlong-sort/sort-flink/sort-flink-v1.18/sort-connectors/elasticsearch-base/src/main/java/org/apache/inlong/sort/elasticsearch/table/ElasticsearchValidationUtils.java
new file mode 100644
index 0000000000..0b04440a5a
--- /dev/null
+++ b/inlong-sort/sort-flink/sort-flink-v1.18/sort-connectors/elasticsearch-base/src/main/java/org/apache/inlong/sort/elasticsearch/table/ElasticsearchValidationUtils.java
@@ -0,0 +1,94 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.inlong.sort.elasticsearch.table;
+
+import org.apache.flink.annotation.Internal;
+import org.apache.flink.table.api.TableSchema;
+import org.apache.flink.table.api.ValidationException;
+import org.apache.flink.table.types.logical.DistinctType;
+import org.apache.flink.table.types.logical.LogicalType;
+import org.apache.flink.table.types.logical.LogicalTypeFamily;
+import org.apache.flink.table.types.logical.LogicalTypeRoot;
+
+import java.util.LinkedHashSet;
+import java.util.List;
+import java.util.Set;
+import java.util.stream.Collectors;
+
+/** Utility methods for validating Elasticsearch properties. */
+@Internal
+public class ElasticsearchValidationUtils {
+
+    private static final Set<LogicalTypeRoot> ILLEGAL_PRIMARY_KEY_TYPES = new 
LinkedHashSet<>();
+
+    static {
+        ILLEGAL_PRIMARY_KEY_TYPES.add(LogicalTypeRoot.ARRAY);
+        ILLEGAL_PRIMARY_KEY_TYPES.add(LogicalTypeRoot.MAP);
+        ILLEGAL_PRIMARY_KEY_TYPES.add(LogicalTypeRoot.MULTISET);
+        ILLEGAL_PRIMARY_KEY_TYPES.add(LogicalTypeRoot.STRUCTURED_TYPE);
+        ILLEGAL_PRIMARY_KEY_TYPES.add(LogicalTypeRoot.ROW);
+        ILLEGAL_PRIMARY_KEY_TYPES.add(LogicalTypeRoot.RAW);
+        ILLEGAL_PRIMARY_KEY_TYPES.add(LogicalTypeRoot.BINARY);
+        ILLEGAL_PRIMARY_KEY_TYPES.add(LogicalTypeRoot.VARBINARY);
+    }
+
+    /**
+     * Checks that the table does not have a primary key defined on illegal types. In Elasticsearch
+     * the primary key is used to calculate the Elasticsearch document id, which is a string of up
+     * to 512 bytes. It cannot have whitespaces. As of now it is calculated by concatenating the
+     * fields. Certain types do not have a good string representation to be used in this scenario.
+     * The illegal types are mostly {@link LogicalTypeFamily#COLLECTION} types and {@link
+     * LogicalTypeRoot#RAW} type.
+     */
+    public static void validatePrimaryKey(TableSchema schema) {
+        schema.getPrimaryKey()
+                .ifPresent(
+                        key -> {
+                            List<LogicalTypeRoot> illegalTypes =
+                                    key.getColumns().stream()
+                                            .map(
+                                                    fieldName -> {
+                                                        LogicalType 
logicalType =
+                                                                
schema.getFieldDataType(fieldName)
+                                                                        .get()
+                                                                        
.getLogicalType();
+                                                        if (logicalType.is(
+                                                                
LogicalTypeRoot.DISTINCT_TYPE)) {
+                                                            return 
((DistinctType) logicalType)
+                                                                    
.getSourceType()
+                                                                    
.getTypeRoot();
+                                                        } else {
+                                                            return 
logicalType.getTypeRoot();
+                                                        }
+                                                    })
+                                            
.filter(ILLEGAL_PRIMARY_KEY_TYPES::contains)
+                                            .collect(Collectors.toList());
+
+                            if (!illegalTypes.isEmpty()) {
+                                throw new ValidationException(
+                                        String.format(
+                                                "The table has a primary key 
on columns of illegal types: %s.\n"
+                                                        + " Elasticsearch sink 
does not support primary keys on columns of types: %s.",
+                                                illegalTypes, 
ILLEGAL_PRIMARY_KEY_TYPES));
+                            }
+                        });
+    }
+
+    private ElasticsearchValidationUtils() {
+    }
+}
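
For illustration, a schema that trips this check (sketch only):

    // A VARBINARY primary key has no stable string form for a document id,
    // so validatePrimaryKey rejects it with a ValidationException.
    TableSchema schema =
            TableSchema.builder()
                    .field("id", DataTypes.BYTES().notNull())   // VARBINARY
                    .field("name", DataTypes.STRING())
                    .primaryKey("id")
                    .build();
    ElasticsearchValidationUtils.validatePrimaryKey(schema);    // throws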
diff --git a/inlong-sort/sort-flink/sort-flink-v1.18/sort-connectors/elasticsearch-base/src/main/java/org/apache/inlong/sort/elasticsearch/table/IndexGenerator.java b/inlong-sort/sort-flink/sort-flink-v1.18/sort-connectors/elasticsearch-base/src/main/java/org/apache/inlong/sort/elasticsearch/table/IndexGenerator.java
new file mode 100644
index 0000000000..38ff5e1fba
--- /dev/null
+++ b/inlong-sort/sort-flink/sort-flink-v1.18/sort-connectors/elasticsearch-base/src/main/java/org/apache/inlong/sort/elasticsearch/table/IndexGenerator.java
@@ -0,0 +1,39 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.inlong.sort.elasticsearch.table;
+
+import org.apache.flink.annotation.Internal;
+import org.apache.flink.table.data.RowData;
+import org.apache.flink.types.Row;
+
+import java.io.Serializable;
+
+/** This interface is responsible for generating the index name from a given {@link Row} record. */
+@Internal
+interface IndexGenerator extends Serializable {
+
+    /**
+     * Initialize the index generator; this will be called only once before {@link
+     * #generate(RowData)} is called.
+     */
+    default void open() {
+    }
+
+    /** Generate the index name according to the given row. */
+    String generate(RowData row);
+}
diff --git a/inlong-sort/sort-flink/sort-flink-v1.18/sort-connectors/elasticsearch-base/src/main/java/org/apache/inlong/sort/elasticsearch/table/IndexGeneratorBase.java b/inlong-sort/sort-flink/sort-flink-v1.18/sort-connectors/elasticsearch-base/src/main/java/org/apache/inlong/sort/elasticsearch/table/IndexGeneratorBase.java
new file mode 100644
index 0000000000..3b43b636f2
--- /dev/null
+++ b/inlong-sort/sort-flink/sort-flink-v1.18/sort-connectors/elasticsearch-base/src/main/java/org/apache/inlong/sort/elasticsearch/table/IndexGeneratorBase.java
@@ -0,0 +1,51 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.inlong.sort.elasticsearch.table;
+
+import org.apache.flink.annotation.Internal;
+
+import java.util.Objects;
+
+/** Base class for {@link IndexGenerator}. */
+@Internal
+public abstract class IndexGeneratorBase implements IndexGenerator {
+
+    private static final long serialVersionUID = 1L;
+    protected final String index;
+
+    public IndexGeneratorBase(String index) {
+        this.index = index;
+    }
+
+    @Override
+    public boolean equals(Object o) {
+        if (this == o) {
+            return true;
+        }
+        if (!(o instanceof IndexGeneratorBase)) {
+            return false;
+        }
+        IndexGeneratorBase that = (IndexGeneratorBase) o;
+        return index.equals(that.index);
+    }
+
+    @Override
+    public int hashCode() {
+        return Objects.hash(index);
+    }
+}
diff --git a/inlong-sort/sort-flink/sort-flink-v1.18/sort-connectors/elasticsearch-base/src/main/java/org/apache/inlong/sort/elasticsearch/table/IndexGeneratorFactory.java b/inlong-sort/sort-flink/sort-flink-v1.18/sort-connectors/elasticsearch-base/src/main/java/org/apache/inlong/sort/elasticsearch/table/IndexGeneratorFactory.java
new file mode 100644
index 0000000000..7f25c8f8a5
--- /dev/null
+++ b/inlong-sort/sort-flink/sort-flink-v1.18/sort-connectors/elasticsearch-base/src/main/java/org/apache/inlong/sort/elasticsearch/table/IndexGeneratorFactory.java
@@ -0,0 +1,319 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.inlong.sort.elasticsearch.table;
+
+import org.apache.flink.annotation.Internal;
+import org.apache.flink.table.api.TableException;
+import org.apache.flink.table.api.TableSchema;
+import org.apache.flink.table.data.RowData;
+import org.apache.flink.table.data.TimestampData;
+import org.apache.flink.table.types.DataType;
+import org.apache.flink.table.types.logical.LogicalType;
+import org.apache.flink.table.types.logical.LogicalTypeRoot;
+
+import javax.annotation.Nonnull;
+
+import java.io.Serializable;
+import java.time.LocalDate;
+import java.time.LocalDateTime;
+import java.time.LocalTime;
+import java.time.ZoneId;
+import java.time.format.DateTimeFormatter;
+import java.util.ArrayList;
+import java.util.Arrays;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+import java.util.regex.Matcher;
+import java.util.regex.Pattern;
+
+/**
+ * Factory of {@link IndexGenerator}.
+ *
+ * <p>Flink supports both static index and dynamic index.
+ *
+ * <p>If you want to have a static index, this option value should be a plain string, e.g.
+ * 'myusers', all the records will be consistently written into "myusers" index.
+ *
+ * <p>If you want to have a dynamic index, you can use '{field_name}' to reference a field value in
+ * the record to dynamically generate a target index. You can also use
+ * '{field_name|date_format_string}' to convert a field value of TIMESTAMP/DATE/TIME type into the
+ * format specified by date_format_string. The date_format_string is compatible with {@link
+ * java.text.SimpleDateFormat}. For example, if the option value is 'myusers_{log_ts|yyyy-MM-dd}',
+ * then a record with log_ts field value 2020-03-27 12:25:55 will be written into
+ * "myusers_2020-03-27" index.
+ */
+@Internal
+public final class IndexGeneratorFactory {
+
+    private IndexGeneratorFactory() {
+    }
+
+    public static IndexGenerator createIndexGenerator(String index, TableSchema schema) {
+        return createIndexGenerator(index, schema, ZoneId.systemDefault());
+    }
+
+    public static IndexGenerator createIndexGenerator(
+            String index, TableSchema schema, ZoneId localTimeZoneId) {
+        final IndexHelper indexHelper = new IndexHelper();
+        if (indexHelper.checkIsDynamicIndex(index)) {
+            return createRuntimeIndexGenerator(
+                    index,
+                    schema.getFieldNames(),
+                    schema.getFieldDataTypes(),
+                    indexHelper,
+                    localTimeZoneId);
+        } else {
+            return new StaticIndexGenerator(index);
+        }
+    }
+
+    interface DynamicFormatter extends Serializable {
+
+        String format(@Nonnull Object fieldValue, DateTimeFormatter formatter);
+    }
+
+    private static IndexGenerator createRuntimeIndexGenerator(
+            String index,
+            String[] fieldNames,
+            DataType[] fieldTypes,
+            IndexHelper indexHelper,
+            ZoneId localTimeZoneId) {
+        final String dynamicIndexPatternStr = indexHelper.extractDynamicIndexPatternStr(index);
+        final String indexPrefix = index.substring(0, index.indexOf(dynamicIndexPatternStr));
+        final String indexSuffix =
+                index.substring(indexPrefix.length() + dynamicIndexPatternStr.length());
+
+        if (indexHelper.checkIsDynamicIndexWithSystemTimeFormat(index)) {
+            final String dateTimeFormat =
+                    indexHelper.extractDateFormat(
+                            index, LogicalTypeRoot.TIMESTAMP_WITH_LOCAL_TIME_ZONE);
+            return new AbstractTimeIndexGenerator(index, dateTimeFormat) {
+
+                @Override
+                public String generate(RowData row) {
+                    return indexPrefix
+                            .concat(LocalDateTime.now(localTimeZoneId).format(dateTimeFormatter))
+                            .concat(indexSuffix);
+                }
+            };
+        }
+
+        final boolean isDynamicIndexWithFormat = indexHelper.checkIsDynamicIndexWithFormat(index);
+        final int indexFieldPos =
+                indexHelper.extractIndexFieldPos(index, fieldNames, isDynamicIndexWithFormat);
+        final LogicalType indexFieldType = fieldTypes[indexFieldPos].getLogicalType();
+        final LogicalTypeRoot indexFieldLogicalTypeRoot = indexFieldType.getTypeRoot();
+
+        // validate index field type
+        indexHelper.validateIndexFieldType(indexFieldLogicalTypeRoot);
+
+        // time extract dynamic index pattern
+        final RowData.FieldGetter fieldGetter =
+                RowData.createFieldGetter(indexFieldType, indexFieldPos);
+
+        if (isDynamicIndexWithFormat) {
+            final String dateTimeFormat =
+                    indexHelper.extractDateFormat(index, indexFieldLogicalTypeRoot);
+            DynamicFormatter formatFunction =
+                    createFormatFunction(
+                            indexFieldType, indexFieldLogicalTypeRoot, localTimeZoneId);
+
+            return new AbstractTimeIndexGenerator(index, dateTimeFormat) {
+
+                @Override
+                public String generate(RowData row) {
+                    Object fieldOrNull = fieldGetter.getFieldOrNull(row);
+                    final String formattedField;
+                    // TODO we can possibly optimize it to use the nullability of the field
+                    if (fieldOrNull != null) {
+                        formattedField = formatFunction.format(fieldOrNull, dateTimeFormatter);
+                    } else {
+                        formattedField = "null";
+                    }
+                    return indexPrefix.concat(formattedField).concat(indexSuffix);
+                }
+            };
+        }
+        // general dynamic index pattern
+        return new IndexGeneratorBase(index) {
+
+            @Override
+            public String generate(RowData row) {
+                Object indexField = fieldGetter.getFieldOrNull(row);
+                return indexPrefix
+                        .concat(indexField == null ? "null" : 
indexField.toString())
+                        .concat(indexSuffix);
+            }
+        };
+    }
+
+    private static DynamicFormatter createFormatFunction(
+            LogicalType indexFieldType,
+            LogicalTypeRoot indexFieldLogicalTypeRoot,
+            ZoneId localTimeZoneId) {
+        switch (indexFieldLogicalTypeRoot) {
+            case DATE:
+                return (value, dateTimeFormatter) -> {
+                    Integer indexField = (Integer) value;
+                    return LocalDate.ofEpochDay(indexField).format(dateTimeFormatter);
+                };
+            case TIME_WITHOUT_TIME_ZONE:
+                return (value, dateTimeFormatter) -> {
+                    Integer indexField = (Integer) value;
+                    return LocalTime.ofNanoOfDay(indexField * 1_000_000L).format(dateTimeFormatter);
+                };
+            case TIMESTAMP_WITHOUT_TIME_ZONE:
+                return (value, dateTimeFormatter) -> {
+                    TimestampData indexField = (TimestampData) value;
+                    return indexField.toLocalDateTime().format(dateTimeFormatter);
+                };
+            case TIMESTAMP_WITH_TIME_ZONE:
+                throw new UnsupportedOperationException(
+                        "TIMESTAMP_WITH_TIME_ZONE is not supported yet");
+            case TIMESTAMP_WITH_LOCAL_TIME_ZONE:
+                return (value, dateTimeFormatter) -> {
+                    TimestampData indexField = (TimestampData) value;
+                    return indexField.toInstant().atZone(localTimeZoneId).format(dateTimeFormatter);
+                };
+            default:
+                throw new TableException(
+                        String.format(
+                                "Unsupported type '%s' found in Elasticsearch dynamic index field; "
+                                        + "time-related patterns only support the types DATE, TIME, and TIMESTAMP.",
+                                indexFieldType));
+        }
+    }
+
+    /**
+     * Helper class for {@link IndexGeneratorFactory}; it can be used to validate the index field
+     * type and to parse the index format from the pattern.
+     */
+    public static class IndexHelper {
+
+        private static final Pattern dynamicIndexPattern = Pattern.compile("\\{[^\\{\\}]+\\}?");
+        private static final Pattern dynamicIndexTimeExtractPattern =
+                Pattern.compile(".*\\{.+\\|.*\\}.*");
+        private static final Pattern dynamicIndexSystemTimeExtractPattern =
+                Pattern.compile(
+                        ".*\\{\\s*(now\\(\\s*\\)|NOW\\(\\s*\\)|current_timestamp|CURRENT_TIMESTAMP)\\s*\\|.*\\}.*");
+        private static final List<LogicalTypeRoot> supportedTypes = new ArrayList<>();
+        private static final Map<LogicalTypeRoot, String> defaultFormats = new HashMap<>();
+
+        static {
+            // time related types
+            supportedTypes.add(LogicalTypeRoot.DATE);
+            supportedTypes.add(LogicalTypeRoot.TIME_WITHOUT_TIME_ZONE);
+            supportedTypes.add(LogicalTypeRoot.TIMESTAMP_WITHOUT_TIME_ZONE);
+            supportedTypes.add(LogicalTypeRoot.TIMESTAMP_WITH_TIME_ZONE);
+            supportedTypes.add(LogicalTypeRoot.TIMESTAMP_WITH_LOCAL_TIME_ZONE);
+            // general types
+            supportedTypes.add(LogicalTypeRoot.VARCHAR);
+            supportedTypes.add(LogicalTypeRoot.CHAR);
+            supportedTypes.add(LogicalTypeRoot.TINYINT);
+            supportedTypes.add(LogicalTypeRoot.INTEGER);
+            supportedTypes.add(LogicalTypeRoot.BIGINT);
+        }
+
+        static {
+            defaultFormats.put(LogicalTypeRoot.DATE, "yyyy_MM_dd");
+            defaultFormats.put(LogicalTypeRoot.TIME_WITHOUT_TIME_ZONE, "HH_mm_ss");
+            defaultFormats.put(LogicalTypeRoot.TIMESTAMP_WITHOUT_TIME_ZONE, "yyyy_MM_dd_HH_mm_ss");
+            defaultFormats.put(LogicalTypeRoot.TIMESTAMP_WITH_TIME_ZONE, "yyyy_MM_dd_HH_mm_ss");
+            defaultFormats.put(
+                    LogicalTypeRoot.TIMESTAMP_WITH_LOCAL_TIME_ZONE, "yyyy_MM_dd_HH_mm_ssX");
+        }
+
+        /** Validate the index field Type. */
+        void validateIndexFieldType(LogicalTypeRoot logicalType) {
+            if (!supportedTypes.contains(logicalType)) {
+                throw new IllegalArgumentException(
+                        String.format(
+                                "Unsupported type %s of index field, " + 
"Supported types are: %s",
+                                logicalType, supportedTypes));
+            }
+        }
+
+        /** Get the default date format. */
+        String getDefaultFormat(LogicalTypeRoot logicalType) {
+            return defaultFormats.get(logicalType);
+        }
+
+        /** Check general dynamic index is enabled or not by index pattern. */
+        boolean checkIsDynamicIndex(String index) {
+            final Matcher matcher = dynamicIndexPattern.matcher(index);
+            int count = 0;
+            while (matcher.find()) {
+                count++;
+            }
+            if (count > 1) {
+                throw new TableException(
+                        String.format(
+                                "Chaining dynamic index pattern %s is not 
supported,"
+                                        + " only support single dynamic index 
pattern.",
+                                index));
+            }
+            return count == 1;
+        }
+
+        /** Check whether time-extracting dynamic index is enabled by the index pattern. */
+        boolean checkIsDynamicIndexWithFormat(String index) {
+            return dynamicIndexTimeExtractPattern.matcher(index).matches();
+        }
+
+        /** Check generate dynamic index is from system time or not. */
+        public boolean checkIsDynamicIndexWithSystemTimeFormat(String index) {
+            return dynamicIndexSystemTimeExtractPattern.matcher(index).matches();
+        }
+
+        /** Extract dynamic index pattern string from index pattern string. */
+        String extractDynamicIndexPatternStr(String index) {
+            int start = index.indexOf("{");
+            int end = index.lastIndexOf("}");
+            return index.substring(start, end + 1);
+        }
+
+        /** Extract the index field position from fieldNames and return it. */
+        int extractIndexFieldPos(
+                String index, String[] fieldNames, boolean isDynamicIndexWithFormat) {
+            List<String> fieldList = Arrays.asList(fieldNames);
+            String indexFieldName;
+            if (isDynamicIndexWithFormat) {
+                indexFieldName = index.substring(index.indexOf("{") + 1, 
index.indexOf("|"));
+            } else {
+                indexFieldName = index.substring(index.indexOf("{") + 1, 
index.indexOf("}"));
+            }
+            if (!fieldList.contains(indexFieldName)) {
+                throw new TableException(
+                        String.format(
+                                "Unknown field '%s' in index pattern '%s', 
please check the field name.",
+                                indexFieldName, index));
+            }
+            return fieldList.indexOf(indexFieldName);
+        }
+
+        /** Extract the dateTime format from the date format embedded in the index pattern string. */
+        private String extractDateFormat(String index, LogicalTypeRoot logicalType) {
+            String format = index.substring(index.indexOf("|") + 1, index.indexOf("}"));
+            if ("".equals(format)) {
+                format = getDefaultFormat(logicalType);
+            }
+            return format;
+        }
+    }
+}
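
A sketch of the dynamic-index path described in the class javadoc (same-package code assumed, since IndexGenerator is package-private):

    TableSchema schema =
            TableSchema.builder()
                    .field("log_ts", DataTypes.TIMESTAMP(3))
                    .field("msg", DataTypes.STRING())
                    .build();
    IndexGenerator gen =
            IndexGeneratorFactory.createIndexGenerator("myusers_{log_ts|yyyy-MM-dd}", schema);
    gen.open();   // compiles the "yyyy-MM-dd" formatter

    GenericRowData row =
            GenericRowData.of(
                    TimestampData.fromLocalDateTime(LocalDateTime.parse("2020-03-27T12:25:55")),
                    StringData.fromString("hello"));
    gen.generate(row);   // -> "myusers_2020-03-27"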
diff --git a/inlong-sort/sort-flink/sort-flink-v1.18/sort-connectors/elasticsearch-base/src/main/java/org/apache/inlong/sort/elasticsearch/table/KeyExtractor.java b/inlong-sort/sort-flink/sort-flink-v1.18/sort-connectors/elasticsearch-base/src/main/java/org/apache/inlong/sort/elasticsearch/table/KeyExtractor.java
new file mode 100644
index 0000000000..e4808c0e6b
--- /dev/null
+++ b/inlong-sort/sort-flink/sort-flink-v1.18/sort-connectors/elasticsearch-base/src/main/java/org/apache/inlong/sort/elasticsearch/table/KeyExtractor.java
@@ -0,0 +1,129 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.inlong.sort.elasticsearch.table;
+
+import org.apache.flink.annotation.Internal;
+import org.apache.flink.table.api.TableColumn;
+import org.apache.flink.table.api.TableSchema;
+import org.apache.flink.table.data.RowData;
+import org.apache.flink.table.types.logical.DistinctType;
+import org.apache.flink.table.types.logical.LogicalType;
+
+import java.io.Serializable;
+import java.time.Duration;
+import java.time.LocalDate;
+import java.time.LocalTime;
+import java.time.Period;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+import java.util.function.Function;
+
+/** An extractor for an Elasticsearch key from a {@link RowData}. */
+@Internal
+public class KeyExtractor implements Function<RowData, String>, Serializable {
+
+    private final FieldFormatter[] fieldFormatters;
+    private final String keyDelimiter;
+
+    private interface FieldFormatter extends Serializable {
+
+        String format(RowData rowData);
+    }
+
+    private KeyExtractor(FieldFormatter[] fieldFormatters, String keyDelimiter) {
+        this.fieldFormatters = fieldFormatters;
+        this.keyDelimiter = keyDelimiter;
+    }
+
+    @Override
+    public String apply(RowData rowData) {
+        final StringBuilder builder = new StringBuilder();
+        for (int i = 0; i < fieldFormatters.length; i++) {
+            if (i > 0) {
+                builder.append(keyDelimiter);
+            }
+            final String value = fieldFormatters[i].format(rowData);
+            builder.append(value);
+        }
+        return builder.toString();
+    }
+
+    private static class ColumnWithIndex {
+
+        public TableColumn column;
+        public int index;
+
+        public ColumnWithIndex(TableColumn column, int index) {
+            this.column = column;
+            this.index = index;
+        }
+
+        public LogicalType getType() {
+            return column.getType().getLogicalType();
+        }
+
+        public int getIndex() {
+            return index;
+        }
+    }
+
+    public static Function<RowData, String> createKeyExtractor(
+            TableSchema schema, String keyDelimiter) {
+        return schema.getPrimaryKey()
+                .map(
+                        key -> {
+                            Map<String, ColumnWithIndex> namesToColumns = new 
HashMap<>();
+                            List<TableColumn> tableColumns = 
schema.getTableColumns();
+                            for (int i = 0; i < schema.getFieldCount(); i++) {
+                                TableColumn column = tableColumns.get(i);
+                                namesToColumns.put(
+                                        column.getName(), new 
ColumnWithIndex(column, i));
+                            }
+
+                            FieldFormatter[] fieldFormatters =
+                                    key.getColumns().stream()
+                                            .map(namesToColumns::get)
+                                            .map(
+                                                    column -> toFormatter(
+                                                            column.index, 
column.getType()))
+                                            .toArray(FieldFormatter[]::new);
+
+                            return (Function<RowData, String>) new 
KeyExtractor(fieldFormatters, keyDelimiter);
+                        })
+                .orElseGet(() -> (Function<RowData, String> & Serializable) 
(row) -> null);
+    }
+
+    private static FieldFormatter toFormatter(int index, LogicalType type) {
+        switch (type.getTypeRoot()) {
+            case DATE:
+                return (row) -> LocalDate.ofEpochDay(row.getInt(index)).toString();
+            case TIME_WITHOUT_TIME_ZONE:
+                return (row) -> LocalTime.ofNanoOfDay((long) row.getInt(index) * 1_000_000L).toString();
+            case INTERVAL_YEAR_MONTH:
+                return (row) -> Period.ofDays(row.getInt(index)).toString();
+            case INTERVAL_DAY_TIME:
+                return (row) -> Duration.ofMillis(row.getLong(index)).toString();
+            case DISTINCT_TYPE:
+                return toFormatter(index, ((DistinctType) type).getSourceType());
+            default:
+                RowData.FieldGetter fieldGetter = RowData.createFieldGetter(type, index);
+                return (row) -> fieldGetter.getFieldOrNull(row).toString();
+        }
+    }
+}
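
A quick sketch of the composite-key behaviour (illustrative):

    // Primary key (a, b) with delimiter "_" yields ids like "12_ABCD"; without
    // a primary key the extractor returns null and the sink falls back to
    // index requests with auto-generated document ids.
    TableSchema schema =
            TableSchema.builder()
                    .field("a", DataTypes.BIGINT().notNull())
                    .field("b", DataTypes.STRING().notNull())
                    .primaryKey("a", "b")
                    .build();
    Function<RowData, String> keyExtractor = KeyExtractor.createKeyExtractor(schema, "_");
    keyExtractor.apply(GenericRowData.of(12L, StringData.fromString("ABCD")));   // "12_ABCD"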
diff --git a/inlong-sort/sort-flink/sort-flink-v1.18/sort-connectors/elasticsearch-base/src/main/java/org/apache/inlong/sort/elasticsearch/table/RequestFactory.java b/inlong-sort/sort-flink/sort-flink-v1.18/sort-connectors/elasticsearch-base/src/main/java/org/apache/inlong/sort/elasticsearch/table/RequestFactory.java
new file mode 100644
index 0000000000..24815296df
--- /dev/null
+++ b/inlong-sort/sort-flink/sort-flink-v1.18/sort-connectors/elasticsearch-base/src/main/java/org/apache/inlong/sort/elasticsearch/table/RequestFactory.java
@@ -0,0 +1,54 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.inlong.sort.elasticsearch.table;
+
+import org.apache.inlong.sort.elasticsearch.RequestIndexer;
+
+import org.apache.flink.annotation.Internal;
+import org.elasticsearch.action.ActionRequest;
+import org.elasticsearch.action.delete.DeleteRequest;
+import org.elasticsearch.action.index.IndexRequest;
+import org.elasticsearch.action.update.UpdateRequest;
+import org.elasticsearch.common.xcontent.XContentType;
+
+import java.io.Serializable;
+
+/** For version-agnostic creating of {@link ActionRequest}s. */
+@Internal
+public interface RequestFactory extends Serializable {
+
+    /**
+     * Creates an update request to be added to a {@link RequestIndexer}. Note: the type field has
+     * been deprecated since Elasticsearch 7.x and it does not take any effect.
+     */
+    UpdateRequest createUpdateRequest(
+            String index, String docType, String key, XContentType contentType, byte[] document);
+
+    /**
+     * Creates an index request to be added to a {@link RequestIndexer}. Note: the type field has
+     * been deprecated since Elasticsearch 7.x and it does not take any effect.
+     */
+    IndexRequest createIndexRequest(
+            String index, String docType, String key, XContentType contentType, byte[] document);
+
+    /**
+     * Creates a delete request to be added to a {@link RequestIndexer}. Note: the type field has
+     * been deprecated since Elasticsearch 7.x and it does not take any effect.
+     */
+    DeleteRequest createDeleteRequest(String index, String docType, String key);
+}
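
An Elasticsearch 7 implementation of this interface might look roughly like the sketch below (the class name is hypothetical; the concrete 6/7 modules are expected to supply their own factories):

    public class Elasticsearch7RequestFactory implements RequestFactory {

        @Override
        public UpdateRequest createUpdateRequest(
                String index, String docType, String key, XContentType contentType, byte[] document) {
            // docType is ignored on 7.x; update with upsert fallback
            return new UpdateRequest(index, key)
                    .doc(document, contentType)
                    .upsert(document, contentType);
        }

        @Override
        public IndexRequest createIndexRequest(
                String index, String docType, String key, XContentType contentType, byte[] document) {
            return new IndexRequest(index).id(key).source(document, contentType);
        }

        @Override
        public DeleteRequest createDeleteRequest(String index, String docType, String key) {
            return new DeleteRequest(index, key);
        }
    }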
diff --git a/inlong-sort/sort-flink/sort-flink-v1.18/sort-connectors/elasticsearch-base/src/main/java/org/apache/inlong/sort/elasticsearch/table/RowElasticsearchSinkFunction.java b/inlong-sort/sort-flink/sort-flink-v1.18/sort-connectors/elasticsearch-base/src/main/java/org/apache/inlong/sort/elasticsearch/table/RowElasticsearchSinkFunction.java
new file mode 100644
index 0000000000..260ed576d9
--- /dev/null
+++ b/inlong-sort/sort-flink/sort-flink-v1.18/sort-connectors/elasticsearch-base/src/main/java/org/apache/inlong/sort/elasticsearch/table/RowElasticsearchSinkFunction.java
@@ -0,0 +1,142 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.inlong.sort.elasticsearch.table;
+
+import org.apache.inlong.sort.elasticsearch.ElasticsearchSinkFunction;
+import org.apache.inlong.sort.elasticsearch.RequestIndexer;
+
+import org.apache.flink.annotation.Internal;
+import org.apache.flink.api.common.functions.RuntimeContext;
+import org.apache.flink.api.common.serialization.RuntimeContextInitializationContextAdapters;
+import org.apache.flink.api.common.serialization.SerializationSchema;
+import org.apache.flink.table.api.TableException;
+import org.apache.flink.table.data.RowData;
+import org.apache.flink.util.Preconditions;
+import org.elasticsearch.action.ActionRequest;
+import org.elasticsearch.action.delete.DeleteRequest;
+import org.elasticsearch.action.index.IndexRequest;
+import org.elasticsearch.action.update.UpdateRequest;
+import org.elasticsearch.common.xcontent.XContentType;
+
+import javax.annotation.Nullable;
+
+import java.util.Objects;
+import java.util.function.Function;
+
+/** Sink function for converting upserts into Elasticsearch {@link ActionRequest}s. */
+@Internal
+public class RowElasticsearchSinkFunction implements ElasticsearchSinkFunction<RowData> {
+
+    private static final long serialVersionUID = 1L;
+
+    private final IndexGenerator indexGenerator;
+    private final String docType;
+    private final SerializationSchema<RowData> serializationSchema;
+    private final XContentType contentType;
+    private final RequestFactory requestFactory;
+    private final Function<RowData, String> createKey;
+
+    public RowElasticsearchSinkFunction(
+            IndexGenerator indexGenerator,
+            @Nullable String docType, // this is deprecated in es 7+
+            SerializationSchema<RowData> serializationSchema,
+            XContentType contentType,
+            RequestFactory requestFactory,
+            Function<RowData, String> createKey) {
+        this.indexGenerator = Preconditions.checkNotNull(indexGenerator);
+        this.docType = docType;
+        this.serializationSchema = Preconditions.checkNotNull(serializationSchema);
+        this.contentType = Preconditions.checkNotNull(contentType);
+        this.requestFactory = Preconditions.checkNotNull(requestFactory);
+        this.createKey = Preconditions.checkNotNull(createKey);
+    }
+
+    @Override
+    public void open(RuntimeContext ctx) throws Exception {
+        serializationSchema.open(
+                RuntimeContextInitializationContextAdapters.serializationAdapter(ctx));
+        indexGenerator.open();
+    }
+
+    @Override
+    public void process(RowData element, RuntimeContext ctx, RequestIndexer indexer) {
+        switch (element.getRowKind()) {
+            case INSERT:
+            case UPDATE_AFTER:
+                processUpsert(element, indexer);
+                break;
+            case UPDATE_BEFORE:
+            case DELETE:
+                processDelete(element, indexer);
+                break;
+            default:
+                throw new TableException("Unsupported message kind: " + 
element.getRowKind());
+        }
+    }
+
+    private void processUpsert(RowData row, RequestIndexer indexer) {
+        final byte[] document = serializationSchema.serialize(row);
+        final String key = createKey.apply(row);
+        if (key != null) {
+            final UpdateRequest updateRequest =
+                    requestFactory.createUpdateRequest(
+                            indexGenerator.generate(row), docType, key, contentType, document);
+            indexer.add(updateRequest);
+        } else {
+            final IndexRequest indexRequest =
+                    requestFactory.createIndexRequest(
+                            indexGenerator.generate(row), docType, key, contentType, document);
+            indexer.add(indexRequest);
+        }
+    }
+
+    private void processDelete(RowData row, RequestIndexer indexer) {
+        final String key = createKey.apply(row);
+        final DeleteRequest deleteRequest =
+                requestFactory.createDeleteRequest(indexGenerator.generate(row), docType, key);
+        indexer.add(deleteRequest);
+    }
+
+    @Override
+    public boolean equals(Object o) {
+        if (this == o) {
+            return true;
+        }
+        if (o == null || getClass() != o.getClass()) {
+            return false;
+        }
+        RowElasticsearchSinkFunction that = (RowElasticsearchSinkFunction) o;
+        return Objects.equals(indexGenerator, that.indexGenerator)
+                && Objects.equals(docType, that.docType)
+                && Objects.equals(serializationSchema, 
that.serializationSchema)
+                && contentType == that.contentType
+                && Objects.equals(requestFactory, that.requestFactory)
+                && Objects.equals(createKey, that.createKey);
+    }
+
+    @Override
+    public int hashCode() {
+        return Objects.hash(
+                indexGenerator,
+                docType,
+                serializationSchema,
+                contentType,
+                requestFactory,
+                createKey);
+    }
+}
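
The RowKind routing above, summarized (sketch; sinkFunction, ctx and indexer are assumed to exist):

    // INSERT / UPDATE_AFTER  -> processUpsert: UpdateRequest when a key exists,
    //                           otherwise IndexRequest
    // UPDATE_BEFORE / DELETE -> processDelete: DeleteRequest
    GenericRowData upsert =
            GenericRowData.ofKind(RowKind.UPDATE_AFTER, StringData.fromString("k1"));
    GenericRowData delete =
            GenericRowData.ofKind(RowKind.DELETE, StringData.fromString("k1"));
    sinkFunction.process(upsert, ctx, indexer);   // adds an update/index request
    sinkFunction.process(delete, ctx, indexer);   // adds a delete request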
diff --git a/inlong-sort/sort-flink/sort-flink-v1.18/sort-connectors/elasticsearch-base/src/main/java/org/apache/inlong/sort/elasticsearch/table/StaticIndexGenerator.java b/inlong-sort/sort-flink/sort-flink-v1.18/sort-connectors/elasticsearch-base/src/main/java/org/apache/inlong/sort/elasticsearch/table/StaticIndexGenerator.java
new file mode 100644
index 0000000000..0f59ce0969
--- /dev/null
+++ b/inlong-sort/sort-flink/sort-flink-v1.18/sort-connectors/elasticsearch-base/src/main/java/org/apache/inlong/sort/elasticsearch/table/StaticIndexGenerator.java
@@ -0,0 +1,34 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.inlong.sort.elasticsearch.table;
+
+import org.apache.flink.annotation.Internal;
+import org.apache.flink.table.data.RowData;
+
+/** A static {@link IndexGenerator} which generates a fixed index name. */
+@Internal
+final class StaticIndexGenerator extends IndexGeneratorBase {
+
+    public StaticIndexGenerator(String index) {
+        super(index);
+    }
+
+    public String generate(RowData row) {
+        return index;
+    }
+}
diff --git a/inlong-sort/sort-flink/sort-flink-v1.18/sort-connectors/elasticsearch-base/src/main/java/org/apache/inlong/sort/elasticsearch/util/IgnoringFailureHandler.java b/inlong-sort/sort-flink/sort-flink-v1.18/sort-connectors/elasticsearch-base/src/main/java/org/apache/inlong/sort/elasticsearch/util/IgnoringFailureHandler.java
new file mode 100644
index 0000000000..cdbfa731a9
--- /dev/null
+++ b/inlong-sort/sort-flink/sort-flink-v1.18/sort-connectors/elasticsearch-base/src/main/java/org/apache/inlong/sort/elasticsearch/util/IgnoringFailureHandler.java
@@ -0,0 +1,37 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.inlong.sort.elasticsearch.util;
+
+import org.apache.inlong.sort.elasticsearch.ActionRequestFailureHandler;
+import org.apache.inlong.sort.elasticsearch.RequestIndexer;
+
+import org.apache.flink.annotation.Internal;
+import org.elasticsearch.action.ActionRequest;
+
+/** Ignores all kinds of failures and drops the affected {@link ActionRequest}. */
+@Internal
+public class IgnoringFailureHandler implements ActionRequestFailureHandler {
+
+    private static final long serialVersionUID = 1662846593501L;
+
+    @Override
+    public void onFailure(
+            ActionRequest action, Throwable failure, int restStatusCode, RequestIndexer indexer) {
+        // ignore failure
+    }
+}
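
Dropping every failure is the bluntest possible policy; a handler can instead discriminate on the REST status code it receives. A minimal sketch against the same interface (the class name and the drop-on-400 policy are illustrative, not part of this commit):

    package org.apache.inlong.sort.elasticsearch.util;

    import org.apache.inlong.sort.elasticsearch.ActionRequestFailureHandler;
    import org.apache.inlong.sort.elasticsearch.RequestIndexer;

    import org.elasticsearch.action.ActionRequest;

    // Sketch only: drop malformed documents (HTTP 400), re-queue the rest.
    public class DropMalformedFailureHandler implements ActionRequestFailureHandler {

        private static final long serialVersionUID = 1L;

        @Override
        public void onFailure(
                ActionRequest action, Throwable failure, int restStatusCode, RequestIndexer indexer)
                throws Throwable {
            if (restStatusCode == 400) {
                return; // the document can never be indexed; drop it
            }
            indexer.add(action); // possibly a transient condition: try again
        }
    }
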
diff --git a/inlong-sort/sort-flink/sort-flink-v1.18/sort-connectors/elasticsearch-base/src/main/java/org/apache/inlong/sort/elasticsearch/util/NoOpFailureHandler.java b/inlong-sort/sort-flink/sort-flink-v1.18/sort-connectors/elasticsearch-base/src/main/java/org/apache/inlong/sort/elasticsearch/util/NoOpFailureHandler.java
new file mode 100644
index 0000000000..477b2b6b6f
--- /dev/null
+++ b/inlong-sort/sort-flink/sort-flink-v1.18/sort-connectors/elasticsearch-base/src/main/java/org/apache/inlong/sort/elasticsearch/util/NoOpFailureHandler.java
@@ -0,0 +1,54 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.inlong.sort.elasticsearch.util;
+
+import org.apache.inlong.sort.elasticsearch.ActionRequestFailureHandler;
+import org.apache.inlong.sort.elasticsearch.RequestIndexer;
+
+import org.apache.flink.annotation.Internal;
+import org.elasticsearch.action.ActionRequest;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+/** An {@link ActionRequestFailureHandler} that simply fails the sink on any failure. */
+@Internal
+public class NoOpFailureHandler implements ActionRequestFailureHandler {
+
+    private static final long serialVersionUID = 737941343410827885L;
+
+    private static final Logger LOG = LoggerFactory.getLogger(NoOpFailureHandler.class);
+
+    @Override
+    public void onFailure(
+            ActionRequest action, Throwable failure, int restStatusCode, RequestIndexer indexer)
+            throws Throwable {
+        LOG.error("Failed Elasticsearch item request: {}", failure.getMessage(), failure);
+        // simply fail the sink
+        throw failure;
+    }
+
+    @Override
+    public boolean equals(Object o) {
+        return o instanceof NoOpFailureHandler;
+    }
+
+    @Override
+    public int hashCode() {
+        return NoOpFailureHandler.class.hashCode();
+    }
+}
diff --git a/inlong-sort/sort-flink/sort-flink-v1.18/sort-connectors/elasticsearch-base/src/main/java/org/apache/inlong/sort/elasticsearch/util/RetryRejectedExecutionFailureHandler.java b/inlong-sort/sort-flink/sort-flink-v1.18/sort-connectors/elasticsearch-base/src/main/java/org/apache/inlong/sort/elasticsearch/util/RetryRejectedExecutionFailureHandler.java
new file mode 100644
index 0000000000..1afb7b0ee6
--- /dev/null
+++ b/inlong-sort/sort-flink/sort-flink-v1.18/sort-connectors/elasticsearch-base/src/main/java/org/apache/inlong/sort/elasticsearch/util/RetryRejectedExecutionFailureHandler.java
@@ -0,0 +1,56 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.inlong.sort.elasticsearch.util;
+
+import org.apache.inlong.sort.elasticsearch.ActionRequestFailureHandler;
+import org.apache.inlong.sort.elasticsearch.RequestIndexer;
+
+import org.apache.flink.annotation.PublicEvolving;
+import org.apache.flink.util.ExceptionUtils;
+import org.elasticsearch.action.ActionRequest;
+import org.elasticsearch.common.util.concurrent.EsRejectedExecutionException;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+/**
+ * An {@link ActionRequestFailureHandler} that re-adds requests that failed due to temporary {@link
+ * EsRejectedExecutionException}s (which means that Elasticsearch node queues are currently full),
+ * and fails for all other failures.
+ *
+ */
+@PublicEvolving
+public class RetryRejectedExecutionFailureHandler implements ActionRequestFailureHandler {
+
+    private static final long serialVersionUID = -7423562912824511906L;
+
+    private static final Logger LOG =
+            LoggerFactory.getLogger(RetryRejectedExecutionFailureHandler.class);
+
+    @Override
+    public void onFailure(
+            ActionRequest action, Throwable failure, int restStatusCode, RequestIndexer indexer)
+            throws Throwable {
+        LOG.error("Failed Elasticsearch item request: {}", failure.getMessage(), failure);
+        if (ExceptionUtils.findThrowable(failure, EsRejectedExecutionException.class).isPresent()) {
+            indexer.add(action);
+        } else {
+            // rethrow all other failures
+            throw failure;
+        }
+    }
+}
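
Note that this handler re-adds rejected requests without any bound, so a permanently saturated cluster turns into an infinite retry loop. Bounding the retries is straightforward if that is undesirable; a sketch follows (the class name and counter policy are hypothetical, not part of this commit; the counter is not synchronized and resets whenever the handler is deserialized on recovery):

    package org.apache.inlong.sort.elasticsearch.util;

    import org.apache.inlong.sort.elasticsearch.ActionRequestFailureHandler;
    import org.apache.inlong.sort.elasticsearch.RequestIndexer;

    import org.apache.flink.util.ExceptionUtils;
    import org.elasticsearch.action.ActionRequest;
    import org.elasticsearch.common.util.concurrent.EsRejectedExecutionException;

    // Sketch only: re-queue rejected requests until a fixed budget is spent,
    // then fail the sink as NoOpFailureHandler would.
    public class BoundedRetryFailureHandler implements ActionRequestFailureHandler {

        private static final long serialVersionUID = 1L;

        private final int maxRetries;
        private transient int retries; // lifetime count for this handler instance

        public BoundedRetryFailureHandler(int maxRetries) {
            this.maxRetries = maxRetries;
        }

        @Override
        public void onFailure(
                ActionRequest action, Throwable failure, int restStatusCode, RequestIndexer indexer)
                throws Throwable {
            boolean rejected = ExceptionUtils
                    .findThrowable(failure, EsRejectedExecutionException.class)
                    .isPresent();
            if (rejected && retries++ < maxRetries) {
                indexer.add(action);
            } else {
                throw failure; // budget exhausted, or a non-rejection failure
            }
        }
    }
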
diff --git a/inlong-sort/sort-flink/sort-flink-v1.18/sort-connectors/pom.xml b/inlong-sort/sort-flink/sort-flink-v1.18/sort-connectors/pom.xml
index 72132b4de6..f21472326a 100644
--- a/inlong-sort/sort-flink/sort-flink-v1.18/sort-connectors/pom.xml
+++ b/inlong-sort/sort-flink/sort-flink-v1.18/sort-connectors/pom.xml
@@ -33,6 +33,7 @@
     <modules>
         <module>pulsar</module>
         <module>jdbc</module>
+        <module>elasticsearch-base</module>
     </modules>
 
     <properties>
diff --git a/licenses/inlong-sort-connectors/LICENSE b/licenses/inlong-sort-connectors/LICENSE
index 8d1fd1ede9..d5b47b354e 100644
--- a/licenses/inlong-sort-connectors/LICENSE
+++ b/licenses/inlong-sort-connectors/LICENSE
@@ -938,6 +938,27 @@ License : https://github.com/ververica/flink-cdc-connectors/blob/master/LICENSE
 Source  : org.apache.flink:flink-connector-jdbc-1.15.4.jar (Please note that the software have been modified.)
 License : https://github.com/apache/flink/blob/master/LICENSE
 
+       inlong-sort/sort-flink/sort-flink-v1.18/sort-connectors/elasticsearch-base/src/main/java/org/apache/inlong/sort/elasticsearch/table/ElasticsearchConfiguration.java
+       inlong-sort/sort-flink/sort-flink-v1.18/sort-connectors/elasticsearch-base/src/main/java/org/apache/inlong/sort/elasticsearch/table/ElasticsearchConnectorOptions.java
+       inlong-sort/sort-flink/sort-flink-v1.18/sort-connectors/elasticsearch-base/src/main/java/org/apache/inlong/sort/elasticsearch/table/ElasticsearchValidationUtils.java
+       inlong-sort/sort-flink/sort-flink-v1.18/sort-connectors/elasticsearch-base/src/main/java/org/apache/inlong/sort/elasticsearch/table/IndexGenerator.java
+       inlong-sort/sort-flink/sort-flink-v1.18/sort-connectors/elasticsearch-base/src/main/java/org/apache/inlong/sort/elasticsearch/table/IndexGeneratorBase.java
+       inlong-sort/sort-flink/sort-flink-v1.18/sort-connectors/elasticsearch-base/src/main/java/org/apache/inlong/sort/elasticsearch/table/IndexGeneratorFactory.java
+       inlong-sort/sort-flink/sort-flink-v1.18/sort-connectors/elasticsearch-base/src/main/java/org/apache/inlong/sort/elasticsearch/table/KeyExtractor.java
+       inlong-sort/sort-flink/sort-flink-v1.18/sort-connectors/elasticsearch-base/src/main/java/org/apache/inlong/sort/elasticsearch/table/RowElasticsearchSinkFunction.java
+       inlong-sort/sort-flink/sort-flink-v1.18/sort-connectors/elasticsearch-base/src/main/java/org/apache/inlong/sort/elasticsearch/table/StaticIndexGenerator.java
+       inlong-sort/sort-flink/sort-flink-v1.18/sort-connectors/elasticsearch-base/src/main/java/org/apache/inlong/sort/elasticsearch/util/IgnoringFailureHandler.java
+       inlong-sort/sort-flink/sort-flink-v1.18/sort-connectors/elasticsearch-base/src/main/java/org/apache/inlong/sort/elasticsearch/util/NoOpFailureHandler.java
+       inlong-sort/sort-flink/sort-flink-v1.18/sort-connectors/elasticsearch-base/src/main/java/org/apache/inlong/sort/elasticsearch/util/RetryRejectedExecutionFailureHandler.java
+       inlong-sort/sort-flink/sort-flink-v1.18/sort-connectors/elasticsearch-base/src/main/java/org/apache/inlong/sort/elasticsearch/ActionRequestFailureHandler.java
+       inlong-sort/sort-flink/sort-flink-v1.18/sort-connectors/elasticsearch-base/src/main/java/org/apache/inlong/sort/elasticsearch/BufferingNoOpRequestIndexer.java
+       inlong-sort/sort-flink/sort-flink-v1.18/sort-connectors/elasticsearch-base/src/main/java/org/apache/inlong/sort/elasticsearch/ElasticsearchApiCallBridge.java
+       inlong-sort/sort-flink/sort-flink-v1.18/sort-connectors/elasticsearch-base/src/main/java/org/apache/inlong/sort/elasticsearch/ElasticsearchSinkBase.java
+       inlong-sort/sort-flink/sort-flink-v1.18/sort-connectors/elasticsearch-base/src/main/java/org/apache/inlong/sort/elasticsearch/ElasticsearchSinkFunction.java
+       inlong-sort/sort-flink/sort-flink-v1.18/sort-connectors/elasticsearch-base/src/main/java/org/apache/inlong/sort/elasticsearch/RequestIndexer.java
+Source  : org.apache.flink:flink-connector-elasticsearch-base-3.0.1-1.17.jar (Please note that the software have been modified.)
+License : https://github.com/apache/flink/blob/master/LICENSE
+
 
 =======================================================================
 Apache InLong Subcomponents:
