This is an automated email from the ASF dual-hosted git repository.

yunqing pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/inlong.git


The following commit(s) were added to refs/heads/master by this push:
     new 21bfeb155 [INLONG-6654][Sort] Supports s3 side-output for dirty data (#6655)
21bfeb155 is described below

commit 21bfeb1555f17a49c40a771f3fc52e44c048a20d
Author: yunqingmoswu <44659300+yunqingmo...@users.noreply.github.com>
AuthorDate: Tue Nov 29 15:46:47 2022 +0800

    [INLONG-6654][Sort] Supports s3 side-output for dirty data (#6655)
---
 inlong-sort/sort-connectors/base/pom.xml           |   5 +
 .../sort/base/dirty/sink/DirtySinkFactory.java     |   7 +-
 .../sort/base/dirty/sink/log/LogDirtySink.java     |   4 +-
 .../base/dirty/sink/log/LogDirtySinkFactory.java   |  10 +-
 .../sort/base/dirty/sink/s3/S3DirtySink.java       | 279 +++++++++++++++++++++
 .../base/dirty/sink/s3/S3DirtySinkFactory.java     | 148 +++++++++++
 .../inlong/sort/base/dirty/sink/s3/S3Helper.java   | 100 ++++++++
 .../inlong/sort/base/dirty/sink/s3/S3Options.java  | 241 ++++++++++++++++++
 .../org.apache.flink.table.factories.Factory       |   3 +-
 licenses/inlong-sort-connectors/LICENSE            |   2 +-
 pom.xml                                            |   7 +
 11 files changed, 795 insertions(+), 11 deletions(-)

diff --git a/inlong-sort/sort-connectors/base/pom.xml b/inlong-sort/sort-connectors/base/pom.xml
index 8e4d2701e..675190c51 100644
--- a/inlong-sort/sort-connectors/base/pom.xml
+++ b/inlong-sort/sort-connectors/base/pom.xml
@@ -42,6 +42,11 @@
             <version>${project.version}</version>
             <scope>provided</scope>
         </dependency>
+
+        <dependency>
+            <groupId>com.amazonaws</groupId>
+            <artifactId>aws-java-sdk-s3</artifactId>
+        </dependency>
     </dependencies>
 
 </project>
diff --git a/inlong-sort/sort-connectors/base/src/main/java/org/apache/inlong/sort/base/dirty/sink/DirtySinkFactory.java b/inlong-sort/sort-connectors/base/src/main/java/org/apache/inlong/sort/base/dirty/sink/DirtySinkFactory.java
index b6725ddd8..07784b443 100644
--- a/inlong-sort/sort-connectors/base/src/main/java/org/apache/inlong/sort/base/dirty/sink/DirtySinkFactory.java
+++ b/inlong-sort/sort-connectors/base/src/main/java/org/apache/inlong/sort/base/dirty/sink/DirtySinkFactory.java
@@ -17,12 +17,13 @@
 
 package org.apache.inlong.sort.base.dirty.sink;
 
-import org.apache.flink.table.factories.DynamicTableFactory;
+import org.apache.flink.table.factories.DynamicTableFactory.Context;
+import org.apache.flink.table.factories.Factory;
 
 /**
  * Dirty sink factory class; it is used to create dirty sinks
  */
-public interface DirtySinkFactory extends DynamicTableFactory {
+public interface DirtySinkFactory extends Factory {
 
     /**
      * Create dirty sink
@@ -31,6 +31,6 @@ public interface DirtySinkFactory extends DynamicTableFactory {
      * @param <T> The data mode that is handled by the dirty sink
      * @return A dirty sink
      */
-    <T> DirtySink<T> createDirtySink(DynamicTableFactory.Context context);
+    <T> DirtySink<T> createDirtySink(Context context);
 
 }
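
With this change DirtySinkFactory extends Flink's plain Factory instead of
DynamicTableFactory, so an implementation supplies only the SPI metadata plus
createDirtySink. A minimal hypothetical skeleton (the class name and the
"console" identifier below are invented for illustration):

    import java.util.HashSet;
    import java.util.Set;
    import org.apache.flink.configuration.ConfigOption;
    import org.apache.flink.table.factories.DynamicTableFactory.Context;
    import org.apache.inlong.sort.base.dirty.sink.DirtySink;
    import org.apache.inlong.sort.base.dirty.sink.DirtySinkFactory;

    public class ConsoleDirtySinkFactory implements DirtySinkFactory {

        @Override
        public <T> DirtySink<T> createDirtySink(Context context) {
            // Read options from context.getConfiguration() and build the sink here.
            throw new UnsupportedOperationException("illustrative skeleton only");
        }

        @Override
        public String factoryIdentifier() {
            return "console"; // hypothetical identifier
        }

        @Override
        public Set<ConfigOption<?>> requiredOptions() {
            return new HashSet<>();
        }

        @Override
        public Set<ConfigOption<?>> optionalOptions() {
            return new HashSet<>();
        }
    }
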
diff --git a/inlong-sort/sort-connectors/base/src/main/java/org/apache/inlong/sort/base/dirty/sink/log/LogDirtySink.java b/inlong-sort/sort-connectors/base/src/main/java/org/apache/inlong/sort/base/dirty/sink/log/LogDirtySink.java
index bf5a4f135..a57c981fe 100644
--- a/inlong-sort/sort-connectors/base/src/main/java/org/apache/inlong/sort/base/dirty/sink/log/LogDirtySink.java
+++ b/inlong-sort/sort-connectors/base/src/main/java/org/apache/inlong/sort/base/dirty/sink/log/LogDirtySink.java
@@ -46,7 +46,7 @@ public class LogDirtySink<T> implements DirtySink<T> {
 
     private static final long serialVersionUID = 1L;
 
-    private static final Logger LOG = LoggerFactory.getLogger(LogDirtySink.class);
+    private static final Logger LOGGER = LoggerFactory.getLogger(LogDirtySink.class);
 
     private final RowData.FieldGetter[] fieldGetters;
     private final String format;
@@ -85,7 +85,7 @@ public class LogDirtySink<T> implements DirtySink<T> {
             // Only support csv format when the row is neither a 'RowData' nor a 'JsonNode'
             value = FormatUtils.csvFormat(data, labelMap, fieldDelimiter);
         }
-        LOG.info("[{}] {}", dirtyData.getLogTag(), value);
+        LOGGER.info("[{}] {}", dirtyData.getLogTag(), value);
     }
 
     private String format(RowData data, Map<String, String> labels) throws JsonProcessingException {
diff --git a/inlong-sort/sort-connectors/base/src/main/java/org/apache/inlong/sort/base/dirty/sink/log/LogDirtySinkFactory.java b/inlong-sort/sort-connectors/base/src/main/java/org/apache/inlong/sort/base/dirty/sink/log/LogDirtySinkFactory.java
index 93a12f584..c3720a93d 100644
--- a/inlong-sort/sort-connectors/base/src/main/java/org/apache/inlong/sort/base/dirty/sink/log/LogDirtySinkFactory.java
+++ b/inlong-sort/sort-connectors/base/src/main/java/org/apache/inlong/sort/base/dirty/sink/log/LogDirtySinkFactory.java
@@ -18,12 +18,14 @@
 package org.apache.inlong.sort.base.dirty.sink.log;
 
 import org.apache.flink.configuration.ConfigOption;
+import org.apache.flink.table.factories.DynamicTableFactory.Context;
 import org.apache.flink.table.factories.FactoryUtil;
 import org.apache.inlong.sort.base.dirty.sink.DirtySink;
 import org.apache.inlong.sort.base.dirty.sink.DirtySinkFactory;
 
 import java.util.HashSet;
 import java.util.Set;
+import static org.apache.inlong.sort.base.Constants.DIRTY_IDENTIFIER;
 import static org.apache.inlong.sort.base.Constants.DIRTY_SIDE_OUTPUT_FIELD_DELIMITER;
 import static org.apache.inlong.sort.base.Constants.DIRTY_SIDE_OUTPUT_FORMAT;
 
@@ -36,10 +38,9 @@ public class LogDirtySinkFactory implements DirtySinkFactory {
 
     @Override
     public <T> DirtySink<T> createDirtySink(Context context) {
-        final FactoryUtil.TableFactoryHelper helper = FactoryUtil.createTableFactoryHelper(this, context);
-        FactoryUtil.validateFactoryOptions(this, helper.getOptions());
-        String format = helper.getOptions().get(DIRTY_SIDE_OUTPUT_FORMAT);
-        String fieldDelimiter = helper.getOptions().get(DIRTY_SIDE_OUTPUT_FIELD_DELIMITER);
+        FactoryUtil.validateFactoryOptions(this, context.getConfiguration());
+        String format = context.getConfiguration().get(DIRTY_SIDE_OUTPUT_FORMAT);
+        String fieldDelimiter = context.getConfiguration().get(DIRTY_SIDE_OUTPUT_FIELD_DELIMITER);
         return new LogDirtySink<>(format, fieldDelimiter,
                 context.getCatalogTable().getResolvedSchema().toPhysicalRowDataType());
     }
@@ -59,6 +60,7 @@ public class LogDirtySinkFactory implements DirtySinkFactory {
         final Set<ConfigOption<?>> options = new HashSet<>();
         options.add(DIRTY_SIDE_OUTPUT_FORMAT);
         options.add(DIRTY_SIDE_OUTPUT_FIELD_DELIMITER);
+        options.add(DIRTY_IDENTIFIER);
         return options;
     }
 }
diff --git a/inlong-sort/sort-connectors/base/src/main/java/org/apache/inlong/sort/base/dirty/sink/s3/S3DirtySink.java b/inlong-sort/sort-connectors/base/src/main/java/org/apache/inlong/sort/base/dirty/sink/s3/S3DirtySink.java
new file mode 100644
index 000000000..3cb20c7b8
--- /dev/null
+++ b/inlong-sort/sort-connectors/base/src/main/java/org/apache/inlong/sort/base/dirty/sink/s3/S3DirtySink.java
@@ -0,0 +1,279 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.inlong.sort.base.dirty.sink.s3;
+
+import com.amazonaws.auth.AWSStaticCredentialsProvider;
+import com.amazonaws.auth.BasicAWSCredentials;
+import com.amazonaws.client.builder.AwsClientBuilder;
+import com.amazonaws.services.s3.AmazonS3;
+import com.amazonaws.services.s3.AmazonS3ClientBuilder;
+import org.apache.commons.lang3.StringUtils;
+import org.apache.flink.configuration.Configuration;
+import org.apache.flink.formats.common.TimestampFormat;
+import org.apache.flink.formats.json.JsonOptions.MapNullKeyMode;
+import org.apache.flink.formats.json.RowDataToJsonConverters;
+import org.apache.flink.formats.json.RowDataToJsonConverters.RowDataToJsonConverter;
+import org.apache.flink.runtime.util.ExecutorThreadFactory;
+import org.apache.flink.shaded.jackson2.com.fasterxml.jackson.core.JsonProcessingException;
+import org.apache.flink.shaded.jackson2.com.fasterxml.jackson.databind.JsonNode;
+import org.apache.flink.table.data.RowData;
+import org.apache.flink.table.types.DataType;
+import org.apache.flink.table.types.logical.LogicalType;
+import org.apache.inlong.sort.base.dirty.DirtyData;
+import org.apache.inlong.sort.base.dirty.sink.DirtySink;
+import org.apache.inlong.sort.base.dirty.utils.FormatUtils;
+import org.apache.inlong.sort.base.util.LabelUtils;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.io.IOException;
+import java.util.ArrayList;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+import java.util.Map.Entry;
+import java.util.concurrent.ScheduledExecutorService;
+import java.util.concurrent.ScheduledFuture;
+import java.util.concurrent.ScheduledThreadPoolExecutor;
+import java.util.concurrent.TimeUnit;
+import java.util.concurrent.atomic.AtomicLong;
+import static java.nio.charset.StandardCharsets.UTF_8;
+import static org.apache.flink.table.data.RowData.createFieldGetter;
+
+/**
+ * S3 dirty sink that is used to sink dirty data to s3
+ *
+ * @param <T> The data mode that is handled by the dirty sink
+ */
+public class S3DirtySink<T> implements DirtySink<T> {
+
+    private static final long serialVersionUID = 1L;
+
+    private static final Logger LOGGER = LoggerFactory.getLogger(S3DirtySink.class);
+
+    private final Map<String, List<String>> batchMap = new HashMap<>();
+    private final S3Options s3Options;
+    private final AtomicLong readInNum = new AtomicLong(0);
+    private final AtomicLong writeOutNum = new AtomicLong(0);
+    private final AtomicLong errorNum = new AtomicLong(0);
+    private final DataType physicalRowDataType;
+    private final RowData.FieldGetter[] fieldGetters;
+    private RowDataToJsonConverter converter;
+    private long batchBytes = 0L;
+    private int size;
+    private transient volatile boolean closed = false;
+    private transient volatile boolean flushing = false;
+    private transient ScheduledExecutorService scheduler;
+    private transient ScheduledFuture<?> scheduledFuture;
+    private transient S3Helper s3Helper;
+
+    public S3DirtySink(S3Options s3Options, DataType physicalRowDataType) {
+        this.s3Options = s3Options;
+        this.physicalRowDataType = physicalRowDataType;
+        final LogicalType[] logicalTypes = physicalRowDataType.getChildren()
+                .stream().map(DataType::getLogicalType).toArray(LogicalType[]::new);
+        this.fieldGetters = new RowData.FieldGetter[logicalTypes.length];
+        for (int i = 0; i < logicalTypes.length; i++) {
+            fieldGetters[i] = createFieldGetter(logicalTypes[i], i);
+        }
+    }
+
+    @Override
+    public void open(Configuration configuration) throws Exception {
+        converter = new RowDataToJsonConverters(TimestampFormat.SQL, MapNullKeyMode.DROP, null)
+                .createConverter(physicalRowDataType.getLogicalType());
+        AmazonS3 s3Client;
+        if (s3Options.getAccessKeyId() != null && s3Options.getSecretKeyId() != null) {
+            BasicAWSCredentials awsCreds =
+                    new BasicAWSCredentials(s3Options.getAccessKeyId(), s3Options.getSecretKeyId());
+            s3Client = AmazonS3ClientBuilder.standard().withCredentials(new AWSStaticCredentialsProvider(awsCreds))
+                    .withEndpointConfiguration(new AwsClientBuilder.EndpointConfiguration(
+                            s3Options.getEndpoint(),
+                            s3Options.getRegion()))
+                    .build();
+        } else {
+            s3Client = AmazonS3ClientBuilder.standard().withEndpointConfiguration(
+                    new AwsClientBuilder.EndpointConfiguration(s3Options.getEndpoint(), s3Options.getRegion())).build();
+        }
+        s3Helper = new S3Helper(s3Client, s3Options);
+        this.scheduler = new ScheduledThreadPoolExecutor(1,
+                new ExecutorThreadFactory("s3-dirty-sink"));
+        this.scheduledFuture = this.scheduler.scheduleWithFixedDelay(() -> {
+            if (!closed && !flushing) {
+                flush();
+            }
+        }, s3Options.getBatchIntervalMs(), s3Options.getBatchIntervalMs(), TimeUnit.MILLISECONDS);
+    }
+
+    @Override
+    public synchronized void invoke(DirtyData<T> dirtyData) throws Exception {
+        try {
+            addBatch(dirtyData);
+        } catch (Exception e) {
+            if (!s3Options.ignoreSideOutputErrors()) {
+                throw new RuntimeException(String.format("Add batch to identifier:%s failed, the dirty data: %s.",
+                        dirtyData.getIdentifier(), dirtyData.toString()), e);
+            }
+            LOGGER.warn("Add batch to identifier:{} failed "
+                    + "and the dirty data will be thrown away in the future"
+                    + " because the option 'dirty.side-output.ignore-errors' is 'true'", dirtyData.getIdentifier());
+        }
+        if (valid() && !flushing) {
+            flush();
+        }
+    }
+
+    private boolean valid() {
+        return (s3Options.getBatchSize() > 0 && size >= s3Options.getBatchSize())
+                || batchBytes >= s3Options.getMaxBatchBytes();
+    }
+
+    private void addBatch(DirtyData<T> dirtyData) throws IOException {
+        readInNum.incrementAndGet();
+        String value;
+        Map<String, String> labelMap = LabelUtils.parseLabels(dirtyData.getLabels());
+        T data = dirtyData.getData();
+        if (data instanceof RowData) {
+            value = format((RowData) data, labelMap);
+        } else if (data instanceof JsonNode) {
+            value = format((JsonNode) data, labelMap);
+        } else {
+            // Only support csv format when the row is neither a 'RowData' nor a 'JsonNode'
+            value = FormatUtils.csvFormat(data, labelMap, s3Options.getFieldDelimiter());
+        }
+        if (s3Options.enableDirtyLog()) {
+            LOGGER.info("[{}] {}", dirtyData.getLogTag(), value);
+        }
+        batchBytes += value.getBytes(UTF_8).length;
+        size++;
+        batchMap.computeIfAbsent(dirtyData.getIdentifier(), k -> new ArrayList<>()).add(value);
+    }
+
+    private String format(RowData data, Map<String, String> labels) throws JsonProcessingException {
+        String value;
+        switch (s3Options.getFormat()) {
+            case "csv":
+                value = FormatUtils.csvFormat(data, fieldGetters, labels, s3Options.getFieldDelimiter());
+                break;
+            case "json":
+                value = FormatUtils.jsonFormat(data, converter, labels);
+                break;
+            default:
+                throw new UnsupportedOperationException(
+                        String.format("Unsupported format for: %s", 
s3Options.getFormat()));
+        }
+        return value;
+    }
+
+    private String format(JsonNode data, Map<String, String> labels) throws JsonProcessingException {
+        String value;
+        switch (s3Options.getFormat()) {
+            case "csv":
+                value = FormatUtils.csvFormat(data, labels, s3Options.getFieldDelimiter());
+                break;
+            case "json":
+                value = FormatUtils.jsonFormat(data, labels);
+                break;
+            default:
+                throw new UnsupportedOperationException(
+                        String.format("Unsupported format for: %s", 
s3Options.getFormat()));
+        }
+        return value;
+    }
+
+    @Override
+    public synchronized void close() throws IOException {
+        if (!closed) {
+            closed = true;
+            if (this.scheduledFuture != null) {
+                scheduledFuture.cancel(false);
+                this.scheduler.shutdown();
+            }
+            try {
+                flush();
+            } catch (Exception e) {
+                LOGGER.warn("Writing records to s3 failed.", e);
+                throw new RuntimeException("Writing records to s3 failed.", e);
+            }
+        }
+    }
+
+    /**
+     * Flush data to s3
+     */
+    public synchronized void flush() {
+        flushing = true;
+        if (!hasRecords()) {
+            flushing = false;
+            return;
+        }
+        for (Entry<String, List<String>> kvs : batchMap.entrySet()) {
+            flushSingleIdentifier(kvs.getKey(), kvs.getValue());
+        }
+        batchMap.clear();
+        batchBytes = 0;
+        size = 0;
+        flushing = false;
+        LOGGER.info("S3 dirty sink statistics: readInNum: {}, writeOutNum: {}, 
errorNum: {}",
+                readInNum.get(), writeOutNum.get(), errorNum.get());
+    }
+
+    /**
+     * Flush data of a single identifier to s3
+     *
+     * @param identifier The identifier of dirty data
+     * @param values The values of the identifier
+     */
+    private void flushSingleIdentifier(String identifier, List<String> values) {
+        if (values == null || values.isEmpty()) {
+            return;
+        }
+        String content = null;
+        try {
+            content = StringUtils.join(values, s3Options.getLineDelimiter());
+            s3Helper.upload(identifier, content);
+            LOGGER.info("Write {} records to s3 of identifier: {}", 
values.size(), identifier);
+            writeOutNum.addAndGet(values.size());
+            // Clean the data that has been loaded.
+            values.clear();
+        } catch (Exception e) {
+            errorNum.addAndGet(values.size());
+            if (!s3Options.ignoreSideOutputErrors()) {
+                throw new RuntimeException(
+                        String.format("Writing records to s3 of identifier:%s 
failed, the value: %s.",
+                                identifier, content),
+                        e);
+            }
+            LOGGER.warn("Writing records to s3 of identifier:{} failed "
+                    + "and the dirty data will be throw away in the future"
+                    + " because the option 'dirty.side-output.ignore-errors' 
is 'true'", identifier);
+        }
+    }
+
+    private boolean hasRecords() {
+        if (batchMap.isEmpty()) {
+            return false;
+        }
+        for (List<String> value : batchMap.values()) {
+            if (!value.isEmpty()) {
+                return true;
+            }
+        }
+        return false;
+    }
+}
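
For orientation, a hedged sketch of the sink lifecycle implemented above; the
s3Options and physicalRowDataType values are assumed to come from the factory,
imports are elided, and DirtyData construction is omitted because its builder
is not part of this diff:

    S3DirtySink<RowData> sink = new S3DirtySink<>(s3Options, physicalRowDataType);
    sink.open(new Configuration()); // builds the AmazonS3 client and starts the flush timer
    // sink.invoke(dirtyData);      // buffers per identifier; flushes once batch size/bytes thresholds are hit
    sink.close();                   // cancels the timer and flushes the remaining records
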
diff --git a/inlong-sort/sort-connectors/base/src/main/java/org/apache/inlong/sort/base/dirty/sink/s3/S3DirtySinkFactory.java b/inlong-sort/sort-connectors/base/src/main/java/org/apache/inlong/sort/base/dirty/sink/s3/S3DirtySinkFactory.java
new file mode 100644
index 000000000..d9ec26434
--- /dev/null
+++ b/inlong-sort/sort-connectors/base/src/main/java/org/apache/inlong/sort/base/dirty/sink/s3/S3DirtySinkFactory.java
@@ -0,0 +1,148 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.inlong.sort.base.dirty.sink.s3;
+
+import org.apache.flink.configuration.ConfigOption;
+import org.apache.flink.configuration.ConfigOptions;
+import org.apache.flink.configuration.ReadableConfig;
+import org.apache.flink.table.api.ValidationException;
+import org.apache.flink.table.factories.DynamicTableFactory.Context;
+import org.apache.flink.table.factories.FactoryUtil;
+import org.apache.inlong.sort.base.dirty.sink.DirtySink;
+import org.apache.inlong.sort.base.dirty.sink.DirtySinkFactory;
+
+import java.util.HashSet;
+import java.util.Set;
+import static org.apache.inlong.sort.base.Constants.DIRTY_IDENTIFIER;
+import static org.apache.inlong.sort.base.Constants.DIRTY_SIDE_OUTPUT_BATCH_BYTES;
+import static org.apache.inlong.sort.base.Constants.DIRTY_SIDE_OUTPUT_BATCH_INTERVAL;
+import static org.apache.inlong.sort.base.Constants.DIRTY_SIDE_OUTPUT_BATCH_SIZE;
+import static org.apache.inlong.sort.base.Constants.DIRTY_SIDE_OUTPUT_FIELD_DELIMITER;
+import static org.apache.inlong.sort.base.Constants.DIRTY_SIDE_OUTPUT_FORMAT;
+import static org.apache.inlong.sort.base.Constants.DIRTY_SIDE_OUTPUT_IGNORE_ERRORS;
+import static org.apache.inlong.sort.base.Constants.DIRTY_SIDE_OUTPUT_LINE_DELIMITER;
+import static org.apache.inlong.sort.base.Constants.DIRTY_SIDE_OUTPUT_LOG_ENABLE;
+import static org.apache.inlong.sort.base.Constants.DIRTY_SIDE_OUTPUT_RETRIES;
+
+/**
+ * S3 dirty sink factory
+ */
+public class S3DirtySinkFactory implements DirtySinkFactory {
+
+    private static final String IDENTIFIER = "s3";
+
+    private static final ConfigOption<String> DIRTY_SIDE_OUTPUT_ENDPOINT =
+            ConfigOptions.key("dirty.side-output.s3.endpoint")
+                    .stringType()
+                    .noDefaultValue()
+                    .withDescription("The endpoint of s3");
+    private static final ConfigOption<String> DIRTY_SIDE_OUTPUT_REGION =
+            ConfigOptions.key("dirty.side-output.s3.region")
+                    .stringType()
+                    .noDefaultValue()
+                    .withDescription("The region of s3");
+    private static final ConfigOption<String> DIRTY_SIDE_OUTPUT_BUCKET =
+            ConfigOptions.key("dirty.side-output.s3.bucket")
+                    .stringType()
+                    .noDefaultValue()
+                    .withDescription("The bucket of s3");
+    private static final ConfigOption<String> DIRTY_SIDE_OUTPUT_KEY =
+            ConfigOptions.key("dirty.side-output.s3.key")
+                    .stringType()
+                    .noDefaultValue()
+                    .withDescription("The key of s3");
+    private static final ConfigOption<String> DIRTY_SIDE_OUTPUT_ACCESS_KEY_ID =
+            ConfigOptions.key("dirty.side-output.s3.access-key-id")
+                    .stringType()
+                    .noDefaultValue()
+                    .withDescription("The access key of s3");
+    private static final ConfigOption<String> DIRTY_SIDE_OUTPUT_SECRET_KEY_ID =
+            ConfigOptions.key("dirty.side-output.s3.secret-key-id")
+                    .stringType()
+                    .noDefaultValue()
+                    .withDescription("The secret key of s3");
+
+    @Override
+    public <T> DirtySink<T> createDirtySink(Context context) {
+        FactoryUtil.validateFactoryOptions(this, context.getConfiguration());
+        validate(context.getConfiguration());
+        return new S3DirtySink<>(getS3Options(context.getConfiguration()),
+                context.getCatalogTable().getResolvedSchema().toPhysicalRowDataType());
+    }
+
+    private void validate(ReadableConfig config) {
+        String identifier = config.getOptional(DIRTY_IDENTIFIER).orElse(null);
+        if (identifier == null || identifier.trim().length() == 0) {
+            throw new ValidationException(
+                    "The option 'dirty.identifier' is not allowed to be 
empty.");
+        }
+    }
+
+    private S3Options getS3Options(ReadableConfig config) {
+        final S3Options.Builder builder = S3Options.builder()
+                .setEndpoint(config.getOptional(DIRTY_SIDE_OUTPUT_ENDPOINT).orElse(null))
+                .setRegion(config.getOptional(DIRTY_SIDE_OUTPUT_REGION).orElse(null))
+                .setBucket(config.getOptional(DIRTY_SIDE_OUTPUT_BUCKET).orElse(null))
+                .setKey(config.getOptional(DIRTY_SIDE_OUTPUT_KEY).orElse(null))
+                .setBatchSize(config.get(DIRTY_SIDE_OUTPUT_BATCH_SIZE))
+                .setMaxRetries(config.get(DIRTY_SIDE_OUTPUT_RETRIES))
+                .setBatchIntervalMs(config.get(DIRTY_SIDE_OUTPUT_BATCH_INTERVAL))
+                .setMaxBatchBytes(config.get(DIRTY_SIDE_OUTPUT_BATCH_BYTES))
+                .setFormat(config.get(DIRTY_SIDE_OUTPUT_FORMAT))
+                .setIgnoreSideOutputErrors(config.get(DIRTY_SIDE_OUTPUT_IGNORE_ERRORS))
+                .setEnableDirtyLog(config.get(DIRTY_SIDE_OUTPUT_LOG_ENABLE))
+                .setFieldDelimiter(config.get(DIRTY_SIDE_OUTPUT_FIELD_DELIMITER))
+                .setLineDelimiter(config.get(DIRTY_SIDE_OUTPUT_LINE_DELIMITER))
+                .setAccessKeyId(config.getOptional(DIRTY_SIDE_OUTPUT_ACCESS_KEY_ID).orElse(null))
+                .setSecretKeyId(config.getOptional(DIRTY_SIDE_OUTPUT_SECRET_KEY_ID).orElse(null));
+        return builder.build();
+    }
+
+    @Override
+    public String factoryIdentifier() {
+        return IDENTIFIER;
+    }
+
+    @Override
+    public Set<ConfigOption<?>> requiredOptions() {
+        final Set<ConfigOption<?>> options = new HashSet<>();
+        options.add(DIRTY_SIDE_OUTPUT_ENDPOINT);
+        options.add(DIRTY_SIDE_OUTPUT_REGION);
+        options.add(DIRTY_SIDE_OUTPUT_BUCKET);
+        options.add(DIRTY_SIDE_OUTPUT_KEY);
+        options.add(DIRTY_IDENTIFIER);
+        return options;
+    }
+
+    @Override
+    public Set<ConfigOption<?>> optionalOptions() {
+        final Set<ConfigOption<?>> options = new HashSet<>();
+        options.add(DIRTY_SIDE_OUTPUT_BATCH_SIZE);
+        options.add(DIRTY_SIDE_OUTPUT_RETRIES);
+        options.add(DIRTY_SIDE_OUTPUT_BATCH_INTERVAL);
+        options.add(DIRTY_SIDE_OUTPUT_BATCH_BYTES);
+        options.add(DIRTY_SIDE_OUTPUT_FORMAT);
+        options.add(DIRTY_SIDE_OUTPUT_IGNORE_ERRORS);
+        options.add(DIRTY_SIDE_OUTPUT_LOG_ENABLE);
+        options.add(DIRTY_SIDE_OUTPUT_FIELD_DELIMITER);
+        options.add(DIRTY_SIDE_OUTPUT_LINE_DELIMITER);
+        options.add(DIRTY_SIDE_OUTPUT_ACCESS_KEY_ID);
+        options.add(DIRTY_SIDE_OUTPUT_SECRET_KEY_ID);
+        return options;
+    }
+}
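
Put together, a job would configure this factory roughly as follows; every
value below is invented for illustration (Configuration is
org.apache.flink.configuration.Configuration):

    Configuration conf = new Configuration();
    conf.setString("dirty.identifier", "inlong-dirty-demo");                        // required, must be non-empty
    conf.setString("dirty.side-output.s3.endpoint", "s3.us-east-1.amazonaws.com");  // required
    conf.setString("dirty.side-output.s3.region", "us-east-1");                     // required
    conf.setString("dirty.side-output.s3.bucket", "my-dirty-bucket");               // required
    conf.setString("dirty.side-output.s3.key", "dirty");                            // required key prefix
    // Optional static credentials; when absent the client is built without explicit credentials.
    conf.setString("dirty.side-output.s3.access-key-id", "AKIAXXXXXXXX");
    conf.setString("dirty.side-output.s3.secret-key-id", "xxxxxxxx");
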
diff --git a/inlong-sort/sort-connectors/base/src/main/java/org/apache/inlong/sort/base/dirty/sink/s3/S3Helper.java b/inlong-sort/sort-connectors/base/src/main/java/org/apache/inlong/sort/base/dirty/sink/s3/S3Helper.java
new file mode 100644
index 000000000..d79b8aecd
--- /dev/null
+++ b/inlong-sort/sort-connectors/base/src/main/java/org/apache/inlong/sort/base/dirty/sink/s3/S3Helper.java
@@ -0,0 +1,100 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.inlong.sort.base.dirty.sink.s3;
+
+import com.amazonaws.services.s3.AmazonS3;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.io.IOException;
+import java.io.Serializable;
+import java.time.LocalDateTime;
+import java.time.format.DateTimeFormatter;
+import java.util.Random;
+
+/**
+ * S3 helper class that helps write to s3
+ */
+public class S3Helper implements Serializable {
+
+    private static final long serialVersionUID = 1L;
+
+    private static final Logger LOG = LoggerFactory.getLogger(S3Helper.class);
+
+    private static final DateTimeFormatter DATE_TIME_FORMAT = DateTimeFormatter.ofPattern("yyyyMMddHHmmss");
+
+    private static final int SEQUENCE_LENGTH = 4;
+    private static final String ESCAPE_PATTERN = "[\\pP\\p{Punct}\\s]";
+    private static final String FILE_NAME_SUFFIX = ".txt";
+    private final Random r = new Random();
+    private final AmazonS3 s3Client;
+    private final S3Options s3Options;
+
+    S3Helper(AmazonS3 s3Client, S3Options s3Options) {
+        this.s3Client = s3Client;
+        this.s3Options = s3Options;
+    }
+
+    /**
+     * Upload data to s3
+     *
+     * @param identifier The identifier of dirty data
+     * @param content The content that will be uploaded
+     * @throws IOException Thrown if the upload still fails after the configured retries
+     */
+    public void upload(String identifier, String content) throws IOException {
+        String path = genFileName(identifier);
+        for (int i = 0; i < s3Options.getMaxRetries(); i++) {
+            try {
+                s3Client.putObject(s3Options.getBucket(), path, content);
+                break;
+            } catch (Exception e) {
+                LOG.error("s3 dirty sink error, retry times = {}", i, e);
+                if (i >= s3Options.getMaxRetries() - 1) {
+                    throw new IOException(e);
+                }
+                try {
+                    Thread.sleep(1000L * i);
+                } catch (InterruptedException ex) {
+                    Thread.currentThread().interrupt();
+                    throw new IOException("unable to flush; interrupted while 
doing another attempt", e);
+                }
+            }
+        }
+    }
+
+    /**
+     * Generate the file name for s3
+     *
+     * @param identifier The identifier of dirty data
+     * @return File name of s3
+     */
+    private String genFileName(String identifier) {
+        return String.format("%s/%s-%s%s", s3Options.getKey(),
+                identifier.replaceAll(ESCAPE_PATTERN, ""), generateSequence(), FILE_NAME_SUFFIX);
+    }
+
+    private String generateSequence() {
+        StringBuilder sb = new StringBuilder(DATE_TIME_FORMAT.format(LocalDateTime.now()));
+        for (int i = 0; i < SEQUENCE_LENGTH; i++) {
+            sb.append(r.nextInt(10));
+        }
+        return sb.toString();
+    }
+
+}
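
A small usage sketch of the helper, assuming an AmazonS3 client and S3Options
built elsewhere (the identifier and content are invented):

    S3Helper helper = new S3Helper(s3Client, s3Options); // package-private constructor
    // Each call writes one object; the object key has the shape
    //   <key>/<identifier with punctuation stripped>-<yyyyMMddHHmmss + 4 random digits>.txt
    helper.upload("group.stream", "a,b,c\n1,2,3");
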
diff --git a/inlong-sort/sort-connectors/base/src/main/java/org/apache/inlong/sort/base/dirty/sink/s3/S3Options.java b/inlong-sort/sort-connectors/base/src/main/java/org/apache/inlong/sort/base/dirty/sink/s3/S3Options.java
new file mode 100644
index 000000000..f507b4802
--- /dev/null
+++ b/inlong-sort/sort-connectors/base/src/main/java/org/apache/inlong/sort/base/dirty/sink/s3/S3Options.java
@@ -0,0 +1,241 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.inlong.sort.base.dirty.sink.s3;
+
+import org.apache.flink.util.Preconditions;
+
+import java.io.Serializable;
+
+/**
+ * S3 options
+ */
+public class S3Options implements Serializable {
+
+    private static final long serialVersionUID = 1L;
+
+    private static final int DEFAULT_BATCH_SIZE = 100;
+    private static final int DEFAULT_MAX_RETRY_TIMES = 3;
+    private static final long DEFAULT_MAX_BATCH_BYTES = 1024 * 10L;
+    private static final long DEFAULT_INTERVAL_MILLIS = 10000L;
+    private static final String DEFAULT_FIELD_DELIMITER = ",";
+    private static final String DEFAULT_LINE_DELIMITER = "\n";
+    private static final String DEFAULT_FORMAT = "csv";
+
+    private final Integer batchSize;
+    private final Integer maxRetries;
+    private final Long batchIntervalMs;
+    private final Long maxBatchBytes;
+    private final boolean ignoreSideOutputErrors;
+    private final boolean enableDirtyLog;
+    private final String format;
+    private final String fieldDelimiter;
+    private final String lineDelimiter;
+    private final String endpoint;
+    private final String region;
+    private final String bucket;
+    private final String key;
+    private final String accessKeyId;
+    private final String secretKeyId;
+
+    private S3Options(Integer batchSize, Integer maxRetries, Long batchIntervalMs, Long maxBatchBytes,
+            String format, boolean ignoreSideOutputErrors, boolean enableDirtyLog, String fieldDelimiter,
+            String lineDelimiter, String endpoint, String region, String bucket, String key,
+            String accessKeyId, String secretKeyId) {
+        Preconditions.checkArgument(maxRetries >= 0);
+        Preconditions.checkArgument(maxBatchBytes >= 0);
+        this.batchSize = batchSize;
+        this.maxRetries = maxRetries;
+        this.batchIntervalMs = batchIntervalMs;
+        this.maxBatchBytes = maxBatchBytes;
+        this.format = format;
+        this.ignoreSideOutputErrors = ignoreSideOutputErrors;
+        this.enableDirtyLog = enableDirtyLog;
+        this.fieldDelimiter = fieldDelimiter;
+        this.lineDelimiter = lineDelimiter;
+        this.endpoint = Preconditions.checkNotNull(endpoint, "endpoint is null");
+        this.region = Preconditions.checkNotNull(region, "region is null");
+        this.bucket = Preconditions.checkNotNull(bucket, "bucket is null");
+        this.key = Preconditions.checkNotNull(key, "key is null");
+        this.accessKeyId = accessKeyId;
+        this.secretKeyId = secretKeyId;
+    }
+
+    public static Builder builder() {
+        return new Builder();
+    }
+
+    public Integer getBatchSize() {
+        return batchSize;
+    }
+
+    public Integer getMaxRetries() {
+        return maxRetries;
+    }
+
+    public Long getBatchIntervalMs() {
+        return batchIntervalMs;
+    }
+
+    public Long getMaxBatchBytes() {
+        return maxBatchBytes;
+    }
+
+    public String getFormat() {
+        return format;
+    }
+
+    public boolean ignoreSideOutputErrors() {
+        return ignoreSideOutputErrors;
+    }
+
+    public boolean enableDirtyLog() {
+        return enableDirtyLog;
+    }
+
+    public String getFieldDelimiter() {
+        return fieldDelimiter;
+    }
+
+    public String getLineDelimiter() {
+        return lineDelimiter;
+    }
+
+    public String getEndpoint() {
+        return endpoint;
+    }
+
+    public String getRegion() {
+        return region;
+    }
+
+    public String getBucket() {
+        return bucket;
+    }
+
+    public String getKey() {
+        return key;
+    }
+
+    public String getAccessKeyId() {
+        return accessKeyId;
+    }
+
+    public String getSecretKeyId() {
+        return secretKeyId;
+    }
+
+    public static class Builder {
+
+        private Integer batchSize = DEFAULT_BATCH_SIZE;
+        private Integer maxRetries = DEFAULT_MAX_RETRY_TIMES;
+        private Long batchIntervalMs = DEFAULT_INTERVAL_MILLIS;
+        private Long maxBatchBytes = DEFAULT_MAX_BATCH_BYTES;
+        private String format = DEFAULT_FORMAT;
+        private boolean ignoreSideOutputErrors;
+        private boolean enableDirtyLog;
+        private String fieldDelimiter = DEFAULT_FIELD_DELIMITER;
+        private String lineDelimiter = DEFAULT_LINE_DELIMITER;
+        private String endpoint;
+        private String region;
+        private String bucket;
+        private String key;
+        private String accessKeyId;
+        private String secretKeyId;
+
+        public Builder setBatchSize(Integer batchSize) {
+            this.batchSize = batchSize;
+            return this;
+        }
+
+        public Builder setMaxRetries(Integer maxRetries) {
+            this.maxRetries = maxRetries;
+            return this;
+        }
+
+        public Builder setBatchIntervalMs(Long batchIntervalMs) {
+            this.batchIntervalMs = batchIntervalMs;
+            return this;
+        }
+
+        public Builder setMaxBatchBytes(Long maxBatchBytes) {
+            this.maxBatchBytes = maxBatchBytes;
+            return this;
+        }
+
+        public Builder setFormat(String format) {
+            this.format = format;
+            return this;
+        }
+
+        public Builder setIgnoreSideOutputErrors(boolean ignoreSideOutputErrors) {
+            this.ignoreSideOutputErrors = ignoreSideOutputErrors;
+            return this;
+        }
+
+        public Builder setEnableDirtyLog(boolean enableDirtyLog) {
+            this.enableDirtyLog = enableDirtyLog;
+            return this;
+        }
+
+        public Builder setFieldDelimiter(String fieldDelimiter) {
+            this.fieldDelimiter = fieldDelimiter;
+            return this;
+        }
+
+        public Builder setLineDelimiter(String lineDelimiter) {
+            this.lineDelimiter = lineDelimiter;
+            return this;
+        }
+
+        public Builder setEndpoint(String endpoint) {
+            this.endpoint = endpoint;
+            return this;
+        }
+
+        public Builder setRegion(String region) {
+            this.region = region;
+            return this;
+        }
+
+        public Builder setBucket(String bucket) {
+            this.bucket = bucket;
+            return this;
+        }
+
+        public Builder setKey(String key) {
+            this.key = key;
+            return this;
+        }
+
+        public Builder setAccessKeyId(String accessKeyId) {
+            this.accessKeyId = accessKeyId;
+            return this;
+        }
+
+        public Builder setSecretKeyId(String secretKeyId) {
+            this.secretKeyId = secretKeyId;
+            return this;
+        }
+
+        public S3Options build() {
+            return new S3Options(batchSize, maxRetries, batchIntervalMs, maxBatchBytes, format,
+                    ignoreSideOutputErrors, enableDirtyLog, fieldDelimiter, lineDelimiter, endpoint,
+                    region, bucket, key, accessKeyId, secretKeyId);
+        }
+    }
+}
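
A minimal sketch of building the options directly, with illustrative values;
endpoint, region, bucket and key are mandatory and null-checked in the
constructor, while the remaining fields keep their defaults:

    S3Options s3Options = S3Options.builder()
            .setEndpoint("s3.us-east-1.amazonaws.com") // assumed endpoint
            .setRegion("us-east-1")
            .setBucket("my-dirty-bucket")
            .setKey("dirty")
            .setFormat("csv")                          // "csv" (default) or "json"
            .setIgnoreSideOutputErrors(true)
            .build();
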
diff --git a/inlong-sort/sort-connectors/base/src/main/resources/META-INF/services/org.apache.flink.table.factories.Factory b/inlong-sort/sort-connectors/base/src/main/resources/META-INF/services/org.apache.flink.table.factories.Factory
index 412dedf67..a83e4d025 100644
--- a/inlong-sort/sort-connectors/base/src/main/resources/META-INF/services/org.apache.flink.table.factories.Factory
+++ b/inlong-sort/sort-connectors/base/src/main/resources/META-INF/services/org.apache.flink.table.factories.Factory
@@ -13,4 +13,5 @@
 # See the License for the specific language governing permissions and
 # limitations under the License.
 
-org.apache.inlong.sort.base.dirty.sink.log.LogDirtySinkFactory
\ No newline at end of file
+org.apache.inlong.sort.base.dirty.sink.log.LogDirtySinkFactory
+org.apache.inlong.sort.base.dirty.sink.s3.S3DirtySinkFactory
\ No newline at end of file
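
Because the factory is registered under org.apache.flink.table.factories.Factory,
it can be looked up through Flink's standard SPI discovery; a hedged sketch:

    DirtySinkFactory factory = FactoryUtil.discoverFactory(
            Thread.currentThread().getContextClassLoader(),
            DirtySinkFactory.class,
            "s3"); // matches S3DirtySinkFactory.factoryIdentifier()
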
diff --git a/licenses/inlong-sort-connectors/LICENSE b/licenses/inlong-sort-connectors/LICENSE
index 95793e251..5a0d591b5 100644
--- a/licenses/inlong-sort-connectors/LICENSE
+++ b/licenses/inlong-sort-connectors/LICENSE
@@ -844,7 +844,7 @@ The text of each license is the standard Apache 2.0 license.
  com.tencentcloudapi:tencentcloud-sdk-java:3.1.545 - tencentcloud-sdk-java (https://github.com/TencentCloud/tencentcloud-sdk-java), (The Apache Software License, Version 2.0)
  com.qcloud:dlc-data-catalog-metastore-client:1.1.1 - dlc-data-catalog-metastore-client (https://mvnrepository.com/artifact/com.qcloud/dlc-data-catalog-metastore-client/1.1), (The Apache Software License, Version 2.0)
  org.apache.doris:flink-doris-connector-1.13_2.11:1.0.3 - Flink Connector for Apache Doris (https://github.com/apache/doris-flink-connector/tree/1.13_2.11-1.0.3), (The Apache Software License, Version 2.0)
-
+  com.amazonaws:aws-java-sdk-s3:jar:1.12.346 - AWS Java SDK for Amazon S3 (https://aws.amazon.com/sdkforjava), (The Apache Software License, Version 2.0)
 
 ========================================================================
 Apache 2.0 licenses
diff --git a/pom.xml b/pom.xml
index 6ecfe05a2..a3897cc04 100644
--- a/pom.xml
+++ b/pom.xml
@@ -148,6 +148,7 @@
         <ini4j.version>0.5.4</ini4j.version>
         <dom4j.version>2.1.3</dom4j.version>
 
+        <aws.sdk.version>1.12.346</aws.sdk.version>
         <zookeeper.version>3.6.3</zookeeper.version>
         <pulsar.version>2.8.1</pulsar.version>
         <pulsar.testcontainers.version>1.15.3</pulsar.testcontainers.version>
@@ -1073,6 +1074,12 @@
                 <version>${flink.version}</version>
             </dependency>
 
+            <dependency>
+                <groupId>com.amazonaws</groupId>
+                <artifactId>aws-java-sdk-s3</artifactId>
+                <version>${aws.sdk.version}</version>
+            </dependency>
+
             <dependency>
                 <groupId>io.streamnative.connectors</groupId>
                 
<artifactId>pulsar-flink-connector_${scala.binary.version}</artifactId>

