This is an automated email from the ASF dual-hosted git repository.
corgy pushed a commit to branch dev
in repository https://gitbox.apache.org/repos/asf/seatunnel.git
The following commit(s) were added to refs/heads/dev by this push:
new 04ee8aca04 [Feature][http-Sink] Implementing http batch writes (#9292)
04ee8aca04 is described below
commit 04ee8aca04192c35cdcc548f46309ba9cd0f5bf9
Author: ocean-zhc <[email protected]>
AuthorDate: Tue May 20 09:38:11 2025 +0800
[Feature][http-Sink] Implementing http batch writes (#9292)
---
docs/en/connector-v2/sink/Http.md | 18 ++
docs/zh/connector-v2/sink/Http.md | 18 ++
.../seatunnel/http/config/HttpParameter.java | 3 +
.../seatunnel/http/config/HttpSinkOptions.java | 25 ++-
.../connectors/seatunnel/http/sink/HttpSink.java | 10 +
.../seatunnel/http/sink/HttpSinkFactory.java | 3 +
.../seatunnel/http/sink/HttpSinkWriter.java | 101 ++++++++++-
.../http/sink/HttpSinkBatchWriterTest.java | 201 +++++++++++++++++++++
8 files changed, 376 insertions(+), 3 deletions(-)
diff --git a/docs/en/connector-v2/sink/Http.md
b/docs/en/connector-v2/sink/Http.md
index 6cd7f7a884..658102d7f5 100644
--- a/docs/en/connector-v2/sink/Http.md
+++ b/docs/en/connector-v2/sink/Http.md
@@ -44,6 +44,9 @@ They can be downloaded via install-plugin.sh or from the
Maven central repositor
| retry_backoff_max_ms | Int | No | 10000 | The maximum
retry-backoff times(millis) if request http failed
|
| connect_timeout_ms | Int | No | 12000 | Connection
timeout setting, default 12s.
|
| socket_timeout_ms | Int | No | 60000 | Socket timeout
setting, default 60s.
|
+| array_mode | Boolean| No | false | Send data as a
JSON array when true, or as a single JSON object when false (default)
|
+| batch_size | Int | No | 1 | The batch size
of records to send in one HTTP request. Only works when array_mode is true.
|
+| request_interval_ms | Int | No | 0 | The interval
milliseconds between two HTTP requests, to avoid sending requests too
frequently. |
| common-options | | No | - | Sink plugin
common parameters, please refer to [Sink Common
Options](../sink-common-options.md) for details |
## Example
@@ -59,6 +62,21 @@ Http {
}
```
+### With Batch Processing
+
+```hocon
+Http {
+ url = "http://localhost/test/webhook"
+ headers {
+ token = "9e32e859ef044462a257e1fc76730066"
+ Content-Type = "application/json"
+ }
+ array_mode = true
+ batch_size = 50
+ request_interval_ms = 500
+}
+```
+
### Multiple table
#### example1
diff --git a/docs/zh/connector-v2/sink/Http.md
b/docs/zh/connector-v2/sink/Http.md
index 1e9e970c55..b7d850639b 100644
--- a/docs/zh/connector-v2/sink/Http.md
+++ b/docs/zh/connector-v2/sink/Http.md
@@ -42,6 +42,9 @@ import ChangeLog from '../changelog/connector-http.md';
| retry_backoff_max_ms | Int | 否 | 10000 | http请求失败,最大重试回退时间(毫秒)
|
| connect_timeout_ms | Int | 否 | 12000 | 连接超时设置,默认12s
|
| socket_timeout_ms | Int | 否 | 60000 | 套接字超时设置,默认为60s
|
+| array_mode | Boolean| 否 | false |
为true时将数据作为JSON数组发送,为false时作为单个JSON对象发送(默认) |
+| batch_size | Int | 否 | 1 |
在一个HTTP请求中发送的记录批量大小。仅在array_mode为true时有效 |
+| request_interval_ms | Int | 否 | 0 |
两次HTTP请求之间的间隔毫秒数,以避免请求过于频繁 |
| common-options | | 否 | - | Sink插件常用参数,请参考
[Sink常用选项 ](../sink-common-options.md) 了解详情 |
## 示例
@@ -57,6 +60,21 @@ Http {
}
```
+### 带批处理的示例
+
+```hocon
+Http {
+ url = "http://localhost/test/webhook"
+ headers {
+ token = "9e32e859ef044462a257e1fc76730066"
+ Content-Type = "application/json"
+ }
+ array_mode = true
+ batch_size = 50
+ request_interval_ms = 500
+}
+```
+
## 变更日志
<ChangeLog />
diff --git
a/seatunnel-connectors-v2/connector-http/connector-http-base/src/main/java/org/apache/seatunnel/connectors/seatunnel/http/config/HttpParameter.java
b/seatunnel-connectors-v2/connector-http/connector-http-base/src/main/java/org/apache/seatunnel/connectors/seatunnel/http/config/HttpParameter.java
index 9e4fcadb6d..272f202329 100644
---
a/seatunnel-connectors-v2/connector-http/connector-http-base/src/main/java/org/apache/seatunnel/connectors/seatunnel/http/config/HttpParameter.java
+++
b/seatunnel-connectors-v2/connector-http/connector-http-base/src/main/java/org/apache/seatunnel/connectors/seatunnel/http/config/HttpParameter.java
@@ -42,6 +42,9 @@ public class HttpParameter implements Serializable {
protected boolean enableMultilines;
protected int connectTimeoutMs;
protected int socketTimeoutMs;
+ protected boolean arrayMode = false;
+ protected int batchSize = 1;
+ protected int requestIntervalMs = 0;
public void buildWithConfig(ReadonlyConfig pluginConfig) {
// set url
diff --git
a/seatunnel-connectors-v2/connector-http/connector-http-base/src/main/java/org/apache/seatunnel/connectors/seatunnel/http/config/HttpSinkOptions.java
b/seatunnel-connectors-v2/connector-http/connector-http-base/src/main/java/org/apache/seatunnel/connectors/seatunnel/http/config/HttpSinkOptions.java
index 58328a9e98..1b4677efc2 100644
---
a/seatunnel-connectors-v2/connector-http/connector-http-base/src/main/java/org/apache/seatunnel/connectors/seatunnel/http/config/HttpSinkOptions.java
+++
b/seatunnel-connectors-v2/connector-http/connector-http-base/src/main/java/org/apache/seatunnel/connectors/seatunnel/http/config/HttpSinkOptions.java
@@ -17,4 +17,27 @@
package org.apache.seatunnel.connectors.seatunnel.http.config;
-public class HttpSinkOptions extends HttpCommonOptions {}
+import org.apache.seatunnel.api.configuration.Option;
+import org.apache.seatunnel.api.configuration.Options;
+
+public class HttpSinkOptions extends HttpCommonOptions {
+ public static final Option<Boolean> ARRAY_MODE =
+ Options.key("array_mode")
+ .booleanType()
+ .defaultValue(false)
+ .withDescription(
+ "Send data as a JSON array when true, or as a
single JSON object when false (default)");
+
+ public static final Option<Integer> BATCH_SIZE =
+ Options.key("batch_size")
+ .intType()
+ .defaultValue(1)
+ .withDescription(
+ "The batch size of records to send in one HTTP
request. Only works when array_mode is true");
+
+ public static final Option<Integer> REQUEST_INTERVAL_MS =
+ Options.key("request_interval_ms")
+ .intType()
+ .defaultValue(0)
+ .withDescription("The interval milliseconds between two
HTTP requests");
+}
diff --git
a/seatunnel-connectors-v2/connector-http/connector-http-base/src/main/java/org/apache/seatunnel/connectors/seatunnel/http/sink/HttpSink.java
b/seatunnel-connectors-v2/connector-http/connector-http-base/src/main/java/org/apache/seatunnel/connectors/seatunnel/http/sink/HttpSink.java
index 7274a464bf..c19c2b18c7 100644
---
a/seatunnel-connectors-v2/connector-http/connector-http-base/src/main/java/org/apache/seatunnel/connectors/seatunnel/http/sink/HttpSink.java
+++
b/seatunnel-connectors-v2/connector-http/connector-http-base/src/main/java/org/apache/seatunnel/connectors/seatunnel/http/sink/HttpSink.java
@@ -47,6 +47,16 @@ public class HttpSink extends
AbstractSimpleSink<SeaTunnelRow, Void>
if (pluginConfig.getOptional(HttpSinkOptions.PARAMS).isPresent()) {
httpParameter.setHeaders(pluginConfig.get(HttpSinkOptions.PARAMS));
}
+ if (pluginConfig.getOptional(HttpSinkOptions.ARRAY_MODE).isPresent()) {
+
httpParameter.setArrayMode(pluginConfig.get(HttpSinkOptions.ARRAY_MODE));
+ }
+ if (pluginConfig.getOptional(HttpSinkOptions.BATCH_SIZE).isPresent()) {
+
httpParameter.setBatchSize(pluginConfig.get(HttpSinkOptions.BATCH_SIZE));
+ }
+ if
(pluginConfig.getOptional(HttpSinkOptions.REQUEST_INTERVAL_MS).isPresent()) {
+ httpParameter.setRequestIntervalMs(
+ pluginConfig.get(HttpSinkOptions.REQUEST_INTERVAL_MS));
+ }
this.catalogTable = catalogTable;
this.seaTunnelRowType = catalogTable.getSeaTunnelRowType();
}
diff --git
a/seatunnel-connectors-v2/connector-http/connector-http-base/src/main/java/org/apache/seatunnel/connectors/seatunnel/http/sink/HttpSinkFactory.java
b/seatunnel-connectors-v2/connector-http/connector-http-base/src/main/java/org/apache/seatunnel/connectors/seatunnel/http/sink/HttpSinkFactory.java
index 0f057e9f04..b0a192893b 100644
---
a/seatunnel-connectors-v2/connector-http/connector-http-base/src/main/java/org/apache/seatunnel/connectors/seatunnel/http/sink/HttpSinkFactory.java
+++
b/seatunnel-connectors-v2/connector-http/connector-http-base/src/main/java/org/apache/seatunnel/connectors/seatunnel/http/sink/HttpSinkFactory.java
@@ -48,6 +48,9 @@ public class HttpSinkFactory implements TableSinkFactory {
.optional(HttpSinkOptions.RETRY)
.optional(HttpSinkOptions.RETRY_BACKOFF_MULTIPLIER_MS)
.optional(HttpSinkOptions.RETRY_BACKOFF_MAX_MS)
+ .optional(HttpSinkOptions.ARRAY_MODE)
+ .optional(HttpSinkOptions.BATCH_SIZE)
+ .optional(HttpSinkOptions.REQUEST_INTERVAL_MS)
.optional(SinkConnectorCommonOptions.MULTI_TABLE_SINK_REPLICA)
.build();
}
diff --git
a/seatunnel-connectors-v2/connector-http/connector-http-base/src/main/java/org/apache/seatunnel/connectors/seatunnel/http/sink/HttpSinkWriter.java
b/seatunnel-connectors-v2/connector-http/connector-http-base/src/main/java/org/apache/seatunnel/connectors/seatunnel/http/sink/HttpSinkWriter.java
index 0333b8f37a..cde7eabe36 100644
---
a/seatunnel-connectors-v2/connector-http/connector-http-base/src/main/java/org/apache/seatunnel/connectors/seatunnel/http/sink/HttpSinkWriter.java
+++
b/seatunnel-connectors-v2/connector-http/connector-http-base/src/main/java/org/apache/seatunnel/connectors/seatunnel/http/sink/HttpSinkWriter.java
@@ -17,6 +17,10 @@
package org.apache.seatunnel.connectors.seatunnel.http.sink;
+import org.apache.seatunnel.shade.com.fasterxml.jackson.databind.ObjectMapper;
+import
org.apache.seatunnel.shade.com.fasterxml.jackson.databind.node.ArrayNode;
+import
org.apache.seatunnel.shade.com.google.common.annotations.VisibleForTesting;
+
import org.apache.seatunnel.api.serialization.SerializationSchema;
import org.apache.seatunnel.api.sink.SupportMultiTableSinkWriter;
import org.apache.seatunnel.api.table.type.SeaTunnelRow;
@@ -30,7 +34,10 @@ import
org.apache.seatunnel.format.json.JsonSerializationSchema;
import lombok.extern.slf4j.Slf4j;
import java.io.IOException;
+import java.util.ArrayList;
+import java.util.List;
import java.util.Objects;
+import java.util.Optional;
@Slf4j
public class HttpSinkWriter extends AbstractSinkWriter<SeaTunnelRow, Void>
@@ -40,6 +47,13 @@ public class HttpSinkWriter extends
AbstractSinkWriter<SeaTunnelRow, Void>
protected final HttpParameter httpParameter;
protected final SerializationSchema serializationSchema;
+ // Batch related fields
+ private final boolean arrayMode;
+ private final int batchSize;
+ private final int requestIntervalMs;
+ private final List<SeaTunnelRow> batchBuffer;
+ private long lastRequestTime;
+
public HttpSinkWriter(SeaTunnelRowType seaTunnelRowType, HttpParameter
httpParameter) {
this(seaTunnelRowType, httpParameter, new
JsonSerializationSchema(seaTunnelRowType));
}
@@ -48,18 +62,81 @@ public class HttpSinkWriter extends
AbstractSinkWriter<SeaTunnelRow, Void>
SeaTunnelRowType seaTunnelRowType,
HttpParameter httpParameter,
SerializationSchema serializationSchema) {
+ this(
+ seaTunnelRowType,
+ httpParameter,
+ serializationSchema,
+ httpParameter.isArrayMode(),
+ httpParameter.getBatchSize(),
+ httpParameter.getRequestIntervalMs());
+ }
+
+ public HttpSinkWriter(
+ SeaTunnelRowType seaTunnelRowType,
+ HttpParameter httpParameter,
+ SerializationSchema serializationSchema,
+ boolean arrayMode,
+ int batchSize,
+ int requestIntervalMs) {
this.seaTunnelRowType = seaTunnelRowType;
this.httpParameter = httpParameter;
- this.httpClient = new HttpClientProvider(httpParameter);
+ this.httpClient = createHttpClient(httpParameter);
this.serializationSchema = serializationSchema;
+ this.arrayMode = arrayMode;
+ this.batchSize = batchSize;
+ this.requestIntervalMs = requestIntervalMs;
+ this.batchBuffer = new ArrayList<>(batchSize);
+ this.lastRequestTime = System.currentTimeMillis();
}
@Override
public void write(SeaTunnelRow element) throws IOException {
+ if (!arrayMode) {
+ writeSingleRecord(element);
+ } else {
+ batchBuffer.add(element);
+ if (batchBuffer.size() >= batchSize) {
+ flush();
+ }
+ }
+ }
+
+ private void writeSingleRecord(SeaTunnelRow element) throws IOException {
byte[] serialize = serializationSchema.serialize(element);
String body = new String(serialize);
+ doHttpRequest(body);
+ }
+
+ private void flush() throws IOException {
+ if (batchBuffer.isEmpty()) {
+ return;
+ }
+ long currentTime = System.currentTimeMillis();
+ long timeSinceLastRequest = currentTime - lastRequestTime;
+ if (requestIntervalMs > 0 && timeSinceLastRequest < requestIntervalMs)
{
+ try {
+ Thread.sleep(requestIntervalMs - timeSinceLastRequest);
+ } catch (InterruptedException e) {
+ throw new RuntimeException(e);
+ }
+ }
+
+ // Array mode: serialize batch data as JSON
+ ObjectMapper mapper = new ObjectMapper();
+ ArrayNode arrayNode = mapper.createArrayNode();
+ for (SeaTunnelRow row : batchBuffer) {
+ byte[] serialize = serializationSchema.serialize(row);
+ arrayNode.add(new String(serialize));
+ }
+ String body = mapper.writeValueAsString(arrayNode);
+ doHttpRequest(body);
+
+ batchBuffer.clear();
+ lastRequestTime = System.currentTimeMillis();
+ }
+
+ private void doHttpRequest(String body) {
try {
- // only support post web hook
HttpResponse response =
httpClient.doPost(httpParameter.getUrl(),
httpParameter.getHeaders(), body);
if (HttpResponse.STATUS_OK == response.getCode()) {
@@ -76,8 +153,28 @@ public class HttpSinkWriter extends
AbstractSinkWriter<SeaTunnelRow, Void>
@Override
public void close() throws IOException {
+ if (arrayMode) {
+ flush();
+ }
if (Objects.nonNull(httpClient)) {
httpClient.close();
}
}
+
+ @Override
+ public Optional<Void> prepareCommit() {
+ if (arrayMode) {
+ try {
+ flush();
+ } catch (IOException e) {
+ throw new RuntimeException("Failed to flush data in
prepareCommit", e);
+ }
+ }
+ return Optional.empty();
+ }
+
+ @VisibleForTesting
+ protected HttpClientProvider createHttpClient(HttpParameter httpParameter)
{
+ return new HttpClientProvider(httpParameter);
+ }
}
diff --git
a/seatunnel-connectors-v2/connector-http/connector-http-base/src/test/java/org/apache/seatunnel/connectors/seatunnel/http/sink/HttpSinkBatchWriterTest.java
b/seatunnel-connectors-v2/connector-http/connector-http-base/src/test/java/org/apache/seatunnel/connectors/seatunnel/http/sink/HttpSinkBatchWriterTest.java
new file mode 100644
index 0000000000..ee8769f595
--- /dev/null
+++
b/seatunnel-connectors-v2/connector-http/connector-http-base/src/test/java/org/apache/seatunnel/connectors/seatunnel/http/sink/HttpSinkBatchWriterTest.java
@@ -0,0 +1,201 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.seatunnel.connectors.seatunnel.http.sink;
+
+import org.apache.seatunnel.api.table.type.BasicType;
+import org.apache.seatunnel.api.table.type.SeaTunnelDataType;
+import org.apache.seatunnel.api.table.type.SeaTunnelRow;
+import org.apache.seatunnel.api.table.type.SeaTunnelRowType;
+import
org.apache.seatunnel.connectors.seatunnel.http.client.HttpClientProvider;
+import org.apache.seatunnel.connectors.seatunnel.http.client.HttpResponse;
+import org.apache.seatunnel.connectors.seatunnel.http.config.HttpParameter;
+
+import org.junit.jupiter.api.BeforeEach;
+import org.junit.jupiter.api.Test;
+import org.junit.jupiter.api.extension.ExtendWith;
+import org.mockito.ArgumentCaptor;
+import org.mockito.Captor;
+import org.mockito.Mock;
+import org.mockito.Mockito;
+import org.mockito.junit.jupiter.MockitoExtension;
+import org.mockito.junit.jupiter.MockitoSettings;
+import org.mockito.quality.Strictness;
+
+import java.util.HashMap;
+import java.util.Map;
+
+import static org.junit.jupiter.api.Assertions.assertEquals;
+import static org.junit.jupiter.api.Assertions.assertFalse;
+import static org.junit.jupiter.api.Assertions.assertTrue;
+import static org.mockito.ArgumentMatchers.any;
+import static org.mockito.ArgumentMatchers.anyString;
+import static org.mockito.ArgumentMatchers.eq;
+import static org.mockito.Mockito.times;
+import static org.mockito.Mockito.verify;
+import static org.mockito.Mockito.when;
+
+@ExtendWith(MockitoExtension.class)
+@MockitoSettings(strictness = Strictness.LENIENT)
+public class HttpSinkBatchWriterTest {
+
+ private static final String TEST_URL = "http://example.com/test";
+ private static final int BATCH_SIZE = 3;
+ private static final int REQUEST_INTERVAL_MS = 0;
+
+ @Mock private HttpClientProvider httpClientProvider;
+
+ @Captor private ArgumentCaptor<String> requestBodyCaptor;
+
+ private HttpParameter httpParameter;
+ private SeaTunnelRowType rowType;
+ private TestableHttpSinkWriter sinkWriter;
+
+ @BeforeEach
+ public void setUp() throws Exception {
+ // Setting HTTP Parameters
+ httpParameter = new HttpParameter();
+ httpParameter.setUrl(TEST_URL);
+ Map<String, String> headers = new HashMap<>();
+ headers.put("Content-Type", "application/json");
+ httpParameter.setHeaders(headers);
+
+ // Simulate HTTP response
+ HttpResponse mockResponse = Mockito.mock(HttpResponse.class);
+ when(mockResponse.getCode()).thenReturn(HttpResponse.STATUS_OK);
+ when(httpClientProvider.doPost(anyString(), any(),
anyString())).thenReturn(mockResponse);
+
+ // Creating Row Types
+ String[] fieldNames = new String[] {"id", "name", "age"};
+ SeaTunnelDataType<?>[] dataTypes =
+ new SeaTunnelDataType<?>[] {
+ BasicType.INT_TYPE, BasicType.STRING_TYPE,
BasicType.INT_TYPE
+ };
+ rowType = new SeaTunnelRowType(fieldNames, dataTypes);
+ }
+
+ @Test
+ public void testDefaultParameterValues() throws Exception {
+ // No parameters are set, use default values
+ // default:arrayMode = false, batchSize = 1, requestIntervalMs = 0
+ HttpParameter defaultHttpParameter = new HttpParameter();
+ defaultHttpParameter.setUrl(TEST_URL);
+ Map<String, String> headers = new HashMap<>();
+ headers.put("Content-Type", "application/json");
+ defaultHttpParameter.setHeaders(headers);
+
+ // Verify the default parameter value
+ assertFalse(defaultHttpParameter.isArrayMode());
+ assertEquals(1, defaultHttpParameter.getBatchSize());
+ assertEquals(0, defaultHttpParameter.getRequestIntervalMs());
+
+ sinkWriter = new TestableHttpSinkWriter(rowType, defaultHttpParameter);
+
+ // Write 3 records
+ for (int i = 0; i < 3; i++) {
+ SeaTunnelRow row = createTestRow(i + 1, "user" + (i + 1), 20 + i);
+ sinkWriter.write(row);
+ }
+
+ // In the default object mode, there should be 3 HTTP requests, each
record is sent
+ // separately
+ verify(httpClientProvider, times(3))
+ .doPost(eq(TEST_URL), any(), requestBodyCaptor.capture());
+
+ // Verify request format (single object)
+ for (String requestBody : requestBodyCaptor.getAllValues()) {
+ assertTrue(requestBody.startsWith("{"));
+ assertTrue(requestBody.endsWith("}"));
+ }
+ }
+
+ @Test
+ public void testObjectModeIgnoresBatchSize() throws Exception {
+ // Use object mode (default) to ignore batch size
+ httpParameter.setArrayMode(false);
+ httpParameter.setBatchSize(BATCH_SIZE);
+ httpParameter.setRequestIntervalMs(REQUEST_INTERVAL_MS);
+ sinkWriter = new TestableHttpSinkWriter(rowType, httpParameter);
+
+ // Write 3 records (equal to batch size)
+ for (int i = 0; i < BATCH_SIZE; i++) {
+ SeaTunnelRow row = createTestRow(i + 1, "user" + (i + 1), 20 + i);
+ sinkWriter.write(row);
+ }
+
+ // In object mode, there should be 3 HTTP requests, each record sent
separately
+ verify(httpClientProvider, times(3))
+ .doPost(eq(TEST_URL), any(), requestBodyCaptor.capture());
+
+ // Validation request format (single object)
+ for (String requestBody : requestBodyCaptor.getAllValues()) {
+ assertTrue(requestBody.startsWith("{"));
+ assertTrue(requestBody.endsWith("}"));
+ }
+ }
+
+ @Test
+ public void testArrayModeWithBatch() throws Exception {
+ // Use array mode to turn on batch processing
+ httpParameter.setArrayMode(true);
+ httpParameter.setBatchSize(BATCH_SIZE);
+ httpParameter.setRequestIntervalMs(REQUEST_INTERVAL_MS);
+ sinkWriter = new TestableHttpSinkWriter(rowType, httpParameter);
+
+ // Write 5 records (over batch size)
+ for (int i = 0; i < 5; i++) {
+ SeaTunnelRow row = createTestRow(i + 1, "user" + (i + 1), 20 + i);
+ sinkWriter.write(row);
+ }
+
+ // There should only be 1 HTTP request (the first batch of 3), the
remaining 2 have not yet
+ // met the batch size
+ verify(httpClientProvider, times(1))
+ .doPost(eq(TEST_URL), any(), requestBodyCaptor.capture());
+
+ // Validation request format (array)
+ String requestBody = requestBodyCaptor.getValue();
+ assertTrue(requestBody.startsWith("["));
+ assertTrue(requestBody.endsWith("]"));
+
+ // Close SinkWriter, should send another request (for the remaining 2
records)
+ sinkWriter.close();
+ verify(httpClientProvider, times(2))
+ .doPost(eq(TEST_URL), any(), requestBodyCaptor.capture());
+
+ // Validating the content of the second request
+ requestBody = requestBodyCaptor.getValue();
+ assertTrue(requestBody.startsWith("["));
+ assertTrue(requestBody.endsWith("]"));
+ }
+
+ private SeaTunnelRow createTestRow(int id, String name, int age) {
+ return new SeaTunnelRow(new Object[] {id, name, age});
+ }
+
+ private class TestableHttpSinkWriter extends HttpSinkWriter {
+ public TestableHttpSinkWriter(
+ SeaTunnelRowType seaTunnelRowType, HttpParameter
httpParameter) {
+ super(seaTunnelRowType, httpParameter);
+ }
+
+ @Override
+ protected HttpClientProvider createHttpClient(HttpParameter
httpParameter) {
+ return httpClientProvider;
+ }
+ }
+}