zentol commented on code in PR #3:
URL: 
https://github.com/apache/flink-connector-opensearch/pull/3#discussion_r1044352277


##########
docs/content/docs/connectors/datastream/opensearch.md:
##########
@@ -0,0 +1,328 @@
+---
+title: Opensearch
+weight: 5
+type: docs
+aliases:
+  - /dev/connectors/opensearch.html
+  - /apis/streaming/connectors/opensearch.html

Review Comment:
   you won't need these since the page didn't exist in the old structures :)



##########
docs/content/docs/connectors/table/opensearch.md:
##########
@@ -0,0 +1,286 @@
+---
+title: Opensearch
+weight: 7
+type: docs
+aliases:
+  - /dev/table/connectors/opensearch.html
+---
+<!--
+Licensed to the Apache Software Foundation (ASF) under one
+or more contributor license agreements.  See the NOTICE file
+distributed with this work for additional information
+regarding copyright ownership.  The ASF licenses this file
+to you under the Apache License, Version 2.0 (the
+"License"); you may not use this file except in compliance
+with the License.  You may obtain a copy of the License at
+
+  http://www.apache.org/licenses/LICENSE-2.0
+
+Unless required by applicable law or agreed to in writing,
+software distributed under the License is distributed on an
+"AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+KIND, either express or implied.  See the License for the
+specific language governing permissions and limitations
+under the License.
+-->
+
+# Opensearch SQL Connector
+
+{{< label "Sink: Batch" >}}
+{{< label "Sink: Streaming Append & Upsert Mode" >}}
+
+The Opensearch connector allows for writing into an index of the Opensearch 
engine. This document describes how to setup the Opensearch Connector to run 
SQL queries against Opensearch.
+
+The connector can operate in upsert mode for exchanging UPDATE/DELETE messages 
with the external system using the primary key defined on the DDL.
+
+If no primary key is defined on the DDL, the connector can only operate in 
append mode for exchanging INSERT only messages with external system.
+
+Dependencies
+------------
+
+{{< sql_download_table "opensearch" >}}
+
+The Opensearch connector is not part of the binary distribution.
+See how to link with it for cluster execution [here]({{< ref 
"docs/dev/configuration/overview" >}}).
+
+How to create an Opensearch table
+----------------
+
+The example below shows how to create an Opensearch sink table:
+
+```sql
+CREATE TABLE myUserTable (
+  user_id STRING,
+  user_name STRING,
+  uv BIGINT,
+  pv BIGINT,
+  PRIMARY KEY (user_id) NOT ENFORCED
+) WITH (
+  'connector' = 'opensearch',
+  'hosts' = 'http://localhost:9200',
+  'index' = 'users'
+);
+```
+
+Connector Options
+----------------
+
+<table class="table table-bordered">
+    <thead>
+      <tr>
+        <th class="text-left" style="width: 25%">Option</th>
+        <th class="text-center" style="width: 8%">Required</th>
+        <th class="text-center" style="width: 8%">Forwarded</th>
+        <th class="text-center" style="width: 7%">Default</th>
+        <th class="text-center" style="width: 10%">Type</th>
+        <th class="text-center" style="width: 42%">Description</th>
+      </tr>
+    </thead>
+    <tbody>
+    <tr>
+      <td><h5>connector</h5></td>
+      <td>required</td>
+      <td>no</td>
+      <td style="word-wrap: break-word;">(none)</td>
+      <td>String</td>
+      <td>Specify what connector to use, the valid value is: `opensearch`
+</td>
+    </tr>
+    <tr>
+      <td><h5>hosts</h5></td>
+      <td>required</td>
+      <td>yes</td>
+      <td style="word-wrap: break-word;">(none)</td>
+      <td>String</td>
+      <td>One or more Opensearch hosts to connect to, e.g. 
<code>'http://host_name:9092;http://host_name:9093'</code>.</td>
+    </tr>
+    <tr>
+      <td><h5>index</h5></td>
+      <td>required</td>
+      <td>yes</td>
+      <td style="word-wrap: break-word;">(none)</td>
+      <td>String</td>
+      <td>Opensearch index for every record. Can be a static index (e.g. 
<code>'myIndex'</code>) or
+       a dynamic index (e.g. <code>'index-{log_ts|yyyy-MM-dd}'</code>).
+       See the following <a href="#dynamic-index">Dynamic Index</a> section 
for more details.</td>
+    </tr>
+    <tr>
+      <td><h5>allow-insecure</h5></td>
+      <td>optional</td>
+      <td>yes</td>
+      <td style="word-wrap: break-word;">(none)</td>
+      <td>Boolean</td>
+      <td>Allow insecure connections to `HTTPS` endpoints (disable 
certificates validation).</td>
+    </tr>
+    <tr>
+      <td><h5>document-id.key-delimiter</h5></td>
+      <td>optional</td>
+      <td>yes</td>
+      <td style="word-wrap: break-word;">_</td>
+      <td>String</td>
+      <td>Delimiter for composite keys ("_" by default), e.g., "$" would 
result in IDs "KEY1$KEY2$KEY3".</td>
+    </tr>
+    <tr>
+      <td><h5>username</h5></td>
+      <td>optional</td>
+      <td>yes</td>
+      <td style="word-wrap: break-word;">(none)</td>
+      <td>String</td>
+      <td>Username used to connect to Opensearch instance. Please notice that 
Opensearch comes with pre-bundled security feature, you can disable it by 
following the <a 
href="https://opensearch.org/docs/latest/security-plugin/configuration/index/";>guidelines</a>
 on how to configure the security for your Opensearch cluster.</td>
+    </tr>
+    <tr>
+      <td><h5>password</h5></td>
+      <td>optional</td>
+      <td>yes</td>
+      <td style="word-wrap: break-word;">(none)</td>
+      <td>String</td>
+      <td>Password used to connect to Opensearch instance. If 
<code>username</code> is configured, this option must be configured with 
non-empty string as well.</td>
+    </tr>
+    <tr>
+      <td><h5>failure-handler</h5></td>

Review Comment:
   This option doesn't exist in the table connector. you opted for using the 
non-deprecated connector from the table api which doesn't support the failure 
handler-



##########
docs/content/docs/connectors/datastream/opensearch.md:
##########
@@ -0,0 +1,328 @@
+---
+title: Opensearch
+weight: 5
+type: docs
+aliases:
+  - /dev/connectors/opensearch.html
+  - /apis/streaming/connectors/opensearch.html
+---
+<!--
+Licensed to the Apache Software Foundation (ASF) under one
+or more contributor license agreements.  See the NOTICE file
+distributed with this work for additional information
+regarding copyright ownership.  The ASF licenses this file
+to you under the Apache License, Version 2.0 (the
+"License"); you may not use this file except in compliance
+with the License.  You may obtain a copy of the License at
+
+  http://www.apache.org/licenses/LICENSE-2.0
+
+Unless required by applicable law or agreed to in writing,
+software distributed under the License is distributed on an
+"AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+KIND, either express or implied.  See the License for the
+specific language governing permissions and limitations
+under the License.
+-->
+
+# Opensearch Connector
+
+This connector provides sinks that can request document actions to an
+[Opensearch](https://opensearch.org/) Index. To use this connector, add 
+the following dependency to your project:
+
+<table class="table table-bordered">
+  <thead>
+    <tr>
+      <th class="text-left">Opensearch version</th>
+      <th class="text-left">Maven Dependency</th>
+    </tr>
+  </thead>
+  <tbody>
+    <tr>
+        <td>1.x</td>
+        <td>{{< connector_artifact flink-connector-opensearch 1.0.0 >}}</td>
+    </tr>
+    <tr>
+        <td>2.x</td>
+        <td>{{< connector_artifact flink-connector-opensearch 1.0.0 >}}</td>
+    </tr>
+  </tbody>
+</table>
+
+{{< py_download_link "opensearch" >}}

Review Comment:
   ditto re python (see below)



##########
docs/content/docs/connectors/datastream/opensearch.md:
##########
@@ -0,0 +1,328 @@
+---
+title: Opensearch
+weight: 5
+type: docs
+aliases:
+  - /dev/connectors/opensearch.html
+  - /apis/streaming/connectors/opensearch.html
+---
+<!--
+Licensed to the Apache Software Foundation (ASF) under one
+or more contributor license agreements.  See the NOTICE file
+distributed with this work for additional information
+regarding copyright ownership.  The ASF licenses this file
+to you under the Apache License, Version 2.0 (the
+"License"); you may not use this file except in compliance
+with the License.  You may obtain a copy of the License at
+
+  http://www.apache.org/licenses/LICENSE-2.0
+
+Unless required by applicable law or agreed to in writing,
+software distributed under the License is distributed on an
+"AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+KIND, either express or implied.  See the License for the
+specific language governing permissions and limitations
+under the License.
+-->
+
+# Opensearch Connector
+
+This connector provides sinks that can request document actions to an
+[Opensearch](https://opensearch.org/) Index. To use this connector, add 
+the following dependency to your project:
+
+<table class="table table-bordered">
+  <thead>
+    <tr>
+      <th class="text-left">Opensearch version</th>
+      <th class="text-left">Maven Dependency</th>
+    </tr>
+  </thead>
+  <tbody>
+    <tr>
+        <td>1.x</td>
+        <td>{{< connector_artifact flink-connector-opensearch 1.0.0 >}}</td>
+    </tr>
+    <tr>
+        <td>2.x</td>
+        <td>{{< connector_artifact flink-connector-opensearch 1.0.0 >}}</td>
+    </tr>
+  </tbody>
+</table>
+
+{{< py_download_link "opensearch" >}}
+
+Note that the streaming connectors are currently not part of the binary
+distribution. See [here]({{< ref "docs/dev/configuration/overview" >}}) for 
information
+about how to package the program with the libraries for cluster execution.
+
+## Installing Opensearch
+
+Instructions for setting up an Opensearch cluster can be found
+[here](https://opensearch.org/docs/latest/opensearch/install/index/).
+
+## Opensearch Sink
+
+The example below shows how to configure and create a sink:
+
+{{< tabs "51732edd-4218-470e-adad-b1ebb4021ae4" >}}
+{{< tab "Java" >}}
+
+```java
+import org.apache.flink.api.common.functions.MapFunction;
+import org.apache.flink.connector.opensearch.sink.OpensearchSinkBuilder;
+import org.apache.flink.streaming.api.datastream.DataStream;
+
+import org.apache.http.HttpHost;
+import org.opensearch.action.index.IndexRequest;
+import org.opensearch.client.Requests;
+
+import java.util.HashMap;
+import java.util.Map;
+
+DataStream<String> input = ...;
+
+input.sinkTo(
+    new OpensearchSinkBuilder<String>()
+        .setBulkFlushMaxActions(1) // Instructs the sink to emit after every 
element, otherwise they would be buffered
+        .setHosts(new HttpHost("127.0.0.1", 9200, "http"))
+        .setEmitter(
+        (element, context, indexer) ->
+        indexer.add(createIndexRequest(element)))
+        .build());
+
+private static IndexRequest createIndexRequest(String element) {
+    Map<String, Object> json = new HashMap<>();
+    json.put("data", element);
+
+    return Requests.indexRequest()
+        .index("my-index")
+        .id(element)
+        .source(json);
+}
+```
+{{< /tab >}}
+{{< tab "Scala" >}}
+```scala
+import org.apache.flink.api.connector.sink.SinkWriter
+import org.apache.flink.connector.opensearch.sink.{OpensearchSinkBuilder, 
RequestIndexer}
+import org.apache.flink.streaming.api.datastream.DataStream
+import org.apache.http.HttpHost
+import org.opensearch.action.index.IndexRequest
+import org.opensearch.client.Requests
+
+val input: DataStream[String] = ...
+
+input.sinkTo(
+  new OpensearchSinkBuilder[String]
+    .setBulkFlushMaxActions(1) // Instructs the sink to emit after every 
element, otherwise they would be buffered
+    .setHosts(new HttpHost("127.0.0.1", 9200, "http"))
+    .setEmitter((element: String, context: SinkWriter.Context, indexer: 
RequestIndexer) => 
+    indexer.add(createIndexRequest(element)))
+    .build())
+
+def createIndexRequest(element: (String)): IndexRequest = {
+
+  val json = Map(
+    "data" -> element.asInstanceOf[AnyRef]
+  )
+
+  Requests.indexRequest.index("my-index").source(mapAsJavaMap(json))
+}
+```
+
+{{< /tab >}}
+{{< tab "Python" >}}
+Opensearch static index:
+```python
+from pyflink.datastream.connectors.opensearch import OpensearchSinkBuilder, 
OpensearchEmitter
+
+env = StreamExecutionEnvironment.get_execution_environment()
+env.add_jars(OPENSEARCH_SQL_CONNECTOR_PATH)
+
+input = ...
+
+# The set_bulk_flush_max_actions instructs the sink to emit after every 
element, otherwise they would be buffered
+os_sink = OpensearchSinkBuilder() \
+    .set_bulk_flush_max_actions(1) \
+    .set_emitter(OpensearchEmitter.static('foo', 'id')) \
+    .set_hosts(['localhost:9200']) \
+    .build()
+
+input.sink_to(os_sink).name('os sink')
+```
+
+Opensearch dynamic index:
+```python
+from pyflink.datastream.connectors.opensearch import OpensearchSinkBuilder, 
OpensearchEmitter
+
+env = StreamExecutionEnvironment.get_execution_environment()
+env.add_jars(OPENSEARCH_SQL_CONNECTOR_PATH)
+
+input = ...
+
+os_sink = OpensearchSinkBuilder() \
+    .set_emitter(OpensearchEmitter.dynamic_index('name', 'id')) \
+    .set_hosts(['localhost:9200']) \
+    .build()
+
+input.sink_to(os_sink).name('os dynamic index sink')
+```
+
+{{< /tab >}}
+{{< /tabs >}}
+
+Note that the example only demonstrates performing a single index
+request for each incoming element. Generally, the `OpensearchEmitter`
+can be used to perform requests of different types (ex.,
+`DeleteRequest`, `UpdateRequest`, etc.). 
+
+Internally, each parallel instance of the Flink Opensearch Sink uses
+a `BulkProcessor` to send action requests to the cluster.
+This will buffer elements before sending them in bulk to the cluster. The 
`BulkProcessor`
+executes bulk requests one at a time, i.e. there will be no two concurrent
+flushes of the buffered actions in progress.
+
+### Opensearch Sinks and Fault Tolerance
+
+With Flink’s checkpointing enabled, the Flink Opensearch Sink guarantees
+at-least-once delivery of action requests to Opensearch clusters. It does
+so by waiting for all pending action requests in the `BulkProcessor` at the
+time of checkpoints. This effectively assures that all requests before the
+checkpoint was triggered have been successfully acknowledged by Opensearch, 
before
+proceeding to process more records sent to the sink.
+
+More details on checkpoints and fault tolerance are in the [fault tolerance 
docs]({{< ref "docs/learn-flink/fault_tolerance" >}}).
+
+To use fault tolerant Opensearch Sinks, checkpointing of the topology needs to 
be enabled at the execution environment:
+
+{{< tabs "d00d1e93-4844-40d7-b0ec-9ec37e73145e" >}}
+{{< tab "Java" >}}
+```java
+final StreamExecutionEnvironment env = 
StreamExecutionEnvironment.getExecutionEnvironment();
+env.enableCheckpointing(5000); // checkpoint every 5000 msecs
+```
+{{< /tab >}}
+{{< tab "Scala" >}}
+```scala
+val env = StreamExecutionEnvironment.getExecutionEnvironment()
+env.enableCheckpointing(5000) // checkpoint every 5000 msecs
+```
+
+{{< /tab >}}
+{{< tab "Python" >}}
+
+```python
+env = StreamExecutionEnvironment.get_execution_environment()
+# checkpoint every 5000 msecs
+env.enable_checkpointing(5000)
+```
+
+{{< /tab >}}
+{{< /tabs >}}
+
+<p style="border-radius: 5px; padding: 5px" class="bg-info">
+<b>IMPORTANT</b>: Checkpointing is not enabled by default but the default 
delivery guarantee is `AT_LEAST_ONCE`.
+This causes the sink to buffer requests until it either finishes or the 
`BulkProcessor` flushes automatically. 
+By default, the `BulkProcessor` will flush after `1000` added actions. To 
configure the processor to flush more frequently, please refer to the <a 
href="#configuring-the-internal-bulk-processor">BulkProcessor configuration 
section</a>.
+</p>
+
+<p style="border-radius: 5px; padding: 5px" class="bg-info">
+Using `UpdateRequests` with deterministic IDs and the upsert method it is 
possible to achieve exactly-once semantics in Opensearch when `AT_LEAST_ONCE` 
delivery is configured for the connector.
+</p>
+
+### Handling Failing Opensearch Requests
+
+Opensearch action requests may fail due to a variety of reasons, including
+temporarily saturated node queue capacity or malformed documents to be indexed.
+The Flink Opensearch Sink allows the user to retry requests by specifying a 
backoff-policy.
+
+Below is an example:
+
+{{< tabs "ddb958b3-5dd5-476e-b946-ace3335628b2" >}}
+{{< tab "Java" >}}
+```java
+DataStream<String> input = ...;
+
+input.sinkTo(
+    new OpensearchSinkBuilder<String>()
+        .setHosts(new HttpHost("127.0.0.1", 9200, "http"))
+        .setEmitter(
+        (element, context, indexer) ->
+        indexer.add(createIndexRequest(element)))
+        // This enables an exponential backoff retry mechanism, with a maximum 
of 5 retries and an initial delay of 1000 milliseconds
+        .setBulkFlushBackoffStrategy(FlushBackoffType.EXPONENTIAL, 5, 1000)
+        .build());
+```
+{{< /tab >}}
+{{< tab "Scala" >}}
+```scala
+val input: DataStream[String] = ...
+
+input.sinkTo(
+  new OpensearchSinkBuilder[String]
+    .setHosts(new HttpHost("127.0.0.1", 9200, "http"))
+    .setEmitter((element: String, context: SinkWriter.Context, indexer: 
RequestIndexer) => 
+    indexer.add(createIndexRequest(element)))
+    // This enables an exponential backoff retry mechanism, with a maximum of 
5 retries and an initial delay of 1000 milliseconds
+    .setBulkFlushBackoffStrategy(FlushBackoffType.EXPONENTIAL, 5, 1000)
+    .build())
+```
+
+{{< /tab >}}
+{{< tab "Python" >}}

Review Comment:
   The python API doesn't support this connector. (Needs a compatibility layer 
that currently has be added to Flink)



##########
docs/content/docs/connectors/table/opensearch.md:
##########
@@ -0,0 +1,286 @@
+---
+title: Opensearch
+weight: 7
+type: docs
+aliases:
+  - /dev/table/connectors/opensearch.html
+---
+<!--
+Licensed to the Apache Software Foundation (ASF) under one
+or more contributor license agreements.  See the NOTICE file
+distributed with this work for additional information
+regarding copyright ownership.  The ASF licenses this file
+to you under the Apache License, Version 2.0 (the
+"License"); you may not use this file except in compliance
+with the License.  You may obtain a copy of the License at
+
+  http://www.apache.org/licenses/LICENSE-2.0
+
+Unless required by applicable law or agreed to in writing,
+software distributed under the License is distributed on an
+"AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+KIND, either express or implied.  See the License for the
+specific language governing permissions and limitations
+under the License.
+-->
+
+# Opensearch SQL Connector
+
+{{< label "Sink: Batch" >}}
+{{< label "Sink: Streaming Append & Upsert Mode" >}}
+
+The Opensearch connector allows for writing into an index of the Opensearch 
engine. This document describes how to setup the Opensearch Connector to run 
SQL queries against Opensearch.
+
+The connector can operate in upsert mode for exchanging UPDATE/DELETE messages 
with the external system using the primary key defined on the DDL.
+
+If no primary key is defined on the DDL, the connector can only operate in 
append mode for exchanging INSERT only messages with external system.
+
+Dependencies
+------------
+
+{{< sql_download_table "opensearch" >}}
+
+The Opensearch connector is not part of the binary distribution.
+See how to link with it for cluster execution [here]({{< ref 
"docs/dev/configuration/overview" >}}).
+
+How to create an Opensearch table
+----------------
+
+The example below shows how to create an Opensearch sink table:
+
+```sql
+CREATE TABLE myUserTable (
+  user_id STRING,
+  user_name STRING,
+  uv BIGINT,
+  pv BIGINT,
+  PRIMARY KEY (user_id) NOT ENFORCED
+) WITH (
+  'connector' = 'opensearch',
+  'hosts' = 'http://localhost:9200',
+  'index' = 'users'
+);
+```
+
+Connector Options

Review Comment:
   missing connection.timeout, socket.timeout, and possibly others.
   This must be in sync with `OpensearchConnectorOptions`.



##########
docs/content/docs/connectors/datastream/opensearch.md:
##########
@@ -0,0 +1,328 @@
+---
+title: Opensearch
+weight: 5
+type: docs
+aliases:
+  - /dev/connectors/opensearch.html
+  - /apis/streaming/connectors/opensearch.html
+---
+<!--
+Licensed to the Apache Software Foundation (ASF) under one
+or more contributor license agreements.  See the NOTICE file
+distributed with this work for additional information
+regarding copyright ownership.  The ASF licenses this file
+to you under the Apache License, Version 2.0 (the
+"License"); you may not use this file except in compliance
+with the License.  You may obtain a copy of the License at
+
+  http://www.apache.org/licenses/LICENSE-2.0
+
+Unless required by applicable law or agreed to in writing,
+software distributed under the License is distributed on an
+"AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+KIND, either express or implied.  See the License for the
+specific language governing permissions and limitations
+under the License.
+-->
+
+# Opensearch Connector
+
+This connector provides sinks that can request document actions to an
+[Opensearch](https://opensearch.org/) Index. To use this connector, add 
+the following dependency to your project:
+
+<table class="table table-bordered">
+  <thead>
+    <tr>
+      <th class="text-left">Opensearch version</th>
+      <th class="text-left">Maven Dependency</th>
+    </tr>
+  </thead>
+  <tbody>
+    <tr>
+        <td>1.x</td>
+        <td>{{< connector_artifact flink-connector-opensearch 1.0.0 >}}</td>
+    </tr>
+    <tr>
+        <td>2.x</td>
+        <td>{{< connector_artifact flink-connector-opensearch 1.0.0 >}}</td>
+    </tr>
+  </tbody>
+</table>
+
+{{< py_download_link "opensearch" >}}
+
+Note that the streaming connectors are currently not part of the binary
+distribution. See [here]({{< ref "docs/dev/configuration/overview" >}}) for 
information
+about how to package the program with the libraries for cluster execution.
+
+## Installing Opensearch
+
+Instructions for setting up an Opensearch cluster can be found
+[here](https://opensearch.org/docs/latest/opensearch/install/index/).
+
+## Opensearch Sink
+
+The example below shows how to configure and create a sink:
+
+{{< tabs "51732edd-4218-470e-adad-b1ebb4021ae4" >}}
+{{< tab "Java" >}}
+
+```java
+import org.apache.flink.api.common.functions.MapFunction;
+import org.apache.flink.connector.opensearch.sink.OpensearchSinkBuilder;
+import org.apache.flink.streaming.api.datastream.DataStream;
+
+import org.apache.http.HttpHost;
+import org.opensearch.action.index.IndexRequest;
+import org.opensearch.client.Requests;
+
+import java.util.HashMap;
+import java.util.Map;
+
+DataStream<String> input = ...;
+
+input.sinkTo(
+    new OpensearchSinkBuilder<String>()
+        .setBulkFlushMaxActions(1) // Instructs the sink to emit after every 
element, otherwise they would be buffered
+        .setHosts(new HttpHost("127.0.0.1", 9200, "http"))
+        .setEmitter(
+        (element, context, indexer) ->
+        indexer.add(createIndexRequest(element)))
+        .build());
+
+private static IndexRequest createIndexRequest(String element) {
+    Map<String, Object> json = new HashMap<>();
+    json.put("data", element);
+
+    return Requests.indexRequest()
+        .index("my-index")
+        .id(element)
+        .source(json);
+}
+```
+{{< /tab >}}
+{{< tab "Scala" >}}
+```scala
+import org.apache.flink.api.connector.sink.SinkWriter
+import org.apache.flink.connector.opensearch.sink.{OpensearchSinkBuilder, 
RequestIndexer}
+import org.apache.flink.streaming.api.datastream.DataStream
+import org.apache.http.HttpHost
+import org.opensearch.action.index.IndexRequest
+import org.opensearch.client.Requests
+
+val input: DataStream[String] = ...
+
+input.sinkTo(
+  new OpensearchSinkBuilder[String]
+    .setBulkFlushMaxActions(1) // Instructs the sink to emit after every 
element, otherwise they would be buffered
+    .setHosts(new HttpHost("127.0.0.1", 9200, "http"))
+    .setEmitter((element: String, context: SinkWriter.Context, indexer: 
RequestIndexer) => 
+    indexer.add(createIndexRequest(element)))
+    .build())
+
+def createIndexRequest(element: (String)): IndexRequest = {
+
+  val json = Map(
+    "data" -> element.asInstanceOf[AnyRef]
+  )
+
+  Requests.indexRequest.index("my-index").source(mapAsJavaMap(json))
+}
+```
+
+{{< /tab >}}
+{{< tab "Python" >}}
+Opensearch static index:
+```python
+from pyflink.datastream.connectors.opensearch import OpensearchSinkBuilder, 
OpensearchEmitter
+
+env = StreamExecutionEnvironment.get_execution_environment()
+env.add_jars(OPENSEARCH_SQL_CONNECTOR_PATH)
+
+input = ...
+
+# The set_bulk_flush_max_actions instructs the sink to emit after every 
element, otherwise they would be buffered
+os_sink = OpensearchSinkBuilder() \
+    .set_bulk_flush_max_actions(1) \
+    .set_emitter(OpensearchEmitter.static('foo', 'id')) \
+    .set_hosts(['localhost:9200']) \
+    .build()
+
+input.sink_to(os_sink).name('os sink')
+```
+
+Opensearch dynamic index:
+```python
+from pyflink.datastream.connectors.opensearch import OpensearchSinkBuilder, 
OpensearchEmitter
+
+env = StreamExecutionEnvironment.get_execution_environment()
+env.add_jars(OPENSEARCH_SQL_CONNECTOR_PATH)
+
+input = ...
+
+os_sink = OpensearchSinkBuilder() \
+    .set_emitter(OpensearchEmitter.dynamic_index('name', 'id')) \
+    .set_hosts(['localhost:9200']) \
+    .build()
+
+input.sink_to(os_sink).name('os dynamic index sink')
+```
+
+{{< /tab >}}
+{{< /tabs >}}
+
+Note that the example only demonstrates performing a single index
+request for each incoming element. Generally, the `OpensearchEmitter`
+can be used to perform requests of different types (ex.,
+`DeleteRequest`, `UpdateRequest`, etc.). 
+
+Internally, each parallel instance of the Flink Opensearch Sink uses
+a `BulkProcessor` to send action requests to the cluster.
+This will buffer elements before sending them in bulk to the cluster. The 
`BulkProcessor`
+executes bulk requests one at a time, i.e. there will be no two concurrent
+flushes of the buffered actions in progress.
+
+### Opensearch Sinks and Fault Tolerance
+
+With Flink’s checkpointing enabled, the Flink Opensearch Sink guarantees
+at-least-once delivery of action requests to Opensearch clusters. It does
+so by waiting for all pending action requests in the `BulkProcessor` at the
+time of checkpoints. This effectively assures that all requests before the
+checkpoint was triggered have been successfully acknowledged by Opensearch, 
before
+proceeding to process more records sent to the sink.
+
+More details on checkpoints and fault tolerance are in the [fault tolerance 
docs]({{< ref "docs/learn-flink/fault_tolerance" >}}).
+
+To use fault tolerant Opensearch Sinks, checkpointing of the topology needs to 
be enabled at the execution environment:
+
+{{< tabs "d00d1e93-4844-40d7-b0ec-9ec37e73145e" >}}

Review Comment:
   This should use a different id. Can be anything really; just make it 
different than the ES one.



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: issues-unsubscr...@flink.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org

Reply via email to