This is an automated email from the ASF dual-hosted git repository.

dockerzhang pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/inlong.git


The following commit(s) were added to refs/heads/master by this push:
     new 8d6bd8c08 [INLONG-6797][Sort] Supports dirty data side-output for filesystem sink (#6799)
8d6bd8c08 is described below

commit 8d6bd8c0825c7f1f571419bb05c68f982965cd59
Author: yunqingmoswu <44659300+yunqingmo...@users.noreply.github.com>
AuthorDate: Fri Dec 9 14:11:43 2022 +0800

    [INLONG-6797][Sort] Supports dirty data side-output for filesystem sink (#6799)
---
 .../sort/filesystem/FileSystemTableFactory.java    | 12 ++++-
 .../sort/filesystem/FileSystemTableSink.java       | 27 +++++++---
 .../filesystem/stream/AbstractStreamingWriter.java | 58 ++++++++++++++++++----
 .../filesystem/stream/StreamingFileWriter.java     |  9 +++-
 .../sort/filesystem/stream/StreamingSink.java      | 17 +++++--
 .../stream/compact/CompactFileWriter.java          | 10 +++-
 .../inlong/sort/parser/FlinkSqlParserTest.java     | 11 +++-
 7 files changed, 115 insertions(+), 29 deletions(-)

diff --git a/inlong-sort/sort-connectors/filesystem/src/main/java/org/apache/inlong/sort/filesystem/FileSystemTableFactory.java b/inlong-sort/sort-connectors/filesystem/src/main/java/org/apache/inlong/sort/filesystem/FileSystemTableFactory.java
index 58c585ebc..f27d55ec7 100644
--- a/inlong-sort/sort-connectors/filesystem/src/main/java/org/apache/inlong/sort/filesystem/FileSystemTableFactory.java
+++ b/inlong-sort/sort-connectors/filesystem/src/main/java/org/apache/inlong/sort/filesystem/FileSystemTableFactory.java
@@ -39,6 +39,9 @@ import 
org.apache.flink.table.factories.SerializationFormatFactory;
 import org.apache.flink.table.factories.TableFactory;
 import org.apache.flink.table.filesystem.FileSystemOptions;
 import org.apache.flink.table.filesystem.FileSystemTableSource;
+import org.apache.inlong.sort.base.dirty.DirtyOptions;
+import org.apache.inlong.sort.base.dirty.sink.DirtySink;
+import org.apache.inlong.sort.base.dirty.utils.DirtySinkFactoryUtils;
 
 import java.util.HashSet;
 import java.util.LinkedList;
@@ -48,6 +51,7 @@ import java.util.Set;
 import java.util.stream.Collectors;
 
 import static java.time.ZoneId.SHORT_IDS;
+import static org.apache.inlong.sort.base.Constants.DIRTY_PREFIX;
 import static org.apache.inlong.sort.base.Constants.IGNORE_ALL_CHANGELOG;
 import static org.apache.inlong.sort.base.Constants.INLONG_AUDIT;
 import static org.apache.inlong.sort.base.Constants.INLONG_METRIC;
@@ -86,13 +90,17 @@ public class FileSystemTableFactory implements 
DynamicTableSourceFactory, Dynami
     public DynamicTableSink createDynamicTableSink(Context context) {
         FactoryUtil.TableFactoryHelper helper = 
FactoryUtil.createTableFactoryHelper(this, context);
         validate(helper);
+        final DirtyOptions dirtyOptions = 
DirtyOptions.fromConfig(helper.getOptions());
+        final DirtySink<Object> dirtySink = 
DirtySinkFactoryUtils.createDirtySink(context, dirtyOptions);
         return new FileSystemTableSink(
                 context,
                 discoverDecodingFormat(context, BulkReaderFormatFactory.class),
                 discoverDecodingFormat(context, 
DeserializationFormatFactory.class),
                 discoverFormatFactory(context),
                 discoverEncodingFormat(context, BulkWriterFormatFactory.class),
-                discoverEncodingFormat(context, 
SerializationFormatFactory.class));
+                discoverEncodingFormat(context, 
SerializationFormatFactory.class),
+                dirtyOptions,
+                dirtySink);
     }
 
     @Override
@@ -132,7 +140,7 @@ public class FileSystemTableFactory implements 
DynamicTableSourceFactory, Dynami
     private void validate(FactoryUtil.TableFactoryHelper helper) {
         // Except format options, some formats like parquet and orc can not 
list all supported
         // options.
-        helper.validateExcept(helper.getOptions().get(FactoryUtil.FORMAT) + 
".");
+        helper.validateExcept(helper.getOptions().get(FactoryUtil.FORMAT) + 
".", DIRTY_PREFIX);
 
         // validate time zone of watermark
         String watermarkTimeZone =
diff --git a/inlong-sort/sort-connectors/filesystem/src/main/java/org/apache/inlong/sort/filesystem/FileSystemTableSink.java b/inlong-sort/sort-connectors/filesystem/src/main/java/org/apache/inlong/sort/filesystem/FileSystemTableSink.java
index 32f35b0e1..6f344098a 100644
--- a/inlong-sort/sort-connectors/filesystem/src/main/java/org/apache/inlong/sort/filesystem/FileSystemTableSink.java
+++ b/inlong-sort/sort-connectors/filesystem/src/main/java/org/apache/inlong/sort/filesystem/FileSystemTableSink.java
@@ -77,6 +77,8 @@ import org.apache.flink.table.types.DataType;
 import org.apache.flink.table.utils.PartitionPathUtils;
 import org.apache.flink.types.RowKind;
 import org.apache.flink.util.Preconditions;
+import org.apache.inlong.sort.base.dirty.DirtyOptions;
+import org.apache.inlong.sort.base.dirty.sink.DirtySink;
 import org.apache.inlong.sort.filesystem.stream.StreamingSink;
 
 import javax.annotation.Nullable;
@@ -129,10 +131,12 @@ public class FileSystemTableSink extends 
AbstractFileSystemTable
     private LinkedHashMap<String, String> staticPartitions = new 
LinkedHashMap<>();
 
     @Nullable
-    private Integer configuredParallelism;
+    private final Integer configuredParallelism;
 
-    private String inlongMetric;
-    private String inlongAudit;
+    private final String inlongMetric;
+    private final String inlongAudit;
+    private final DirtyOptions dirtyOptions;
+    private @Nullable final DirtySink<Object> dirtySink;
 
     FileSystemTableSink(
             DynamicTableFactory.Context context,
@@ -140,7 +144,9 @@ public class FileSystemTableSink extends 
AbstractFileSystemTable
             @Nullable DecodingFormat<DeserializationSchema<RowData>> 
deserializationFormat,
             @Nullable FileSystemFormatFactory formatFactory,
             @Nullable EncodingFormat<BulkWriter.Factory<RowData>> 
bulkWriterFormat,
-            @Nullable EncodingFormat<SerializationSchema<RowData>> 
serializationFormat) {
+            @Nullable EncodingFormat<SerializationSchema<RowData>> 
serializationFormat,
+            DirtyOptions dirtyOptions,
+            @Nullable DirtySink<Object> dirtySink) {
         super(context);
         this.bulkReaderFormat = bulkReaderFormat;
         this.deserializationFormat = deserializationFormat;
@@ -159,6 +165,8 @@ public class FileSystemTableSink extends 
AbstractFileSystemTable
         this.configuredParallelism = 
tableOptions.get(FileSystemOptions.SINK_PARALLELISM);
         this.inlongMetric = tableOptions.get(INLONG_METRIC);
         this.inlongAudit = tableOptions.get(INLONG_AUDIT);
+        this.dirtyOptions = dirtyOptions;
+        this.dirtySink = dirtySink;
     }
 
     @Override
@@ -283,11 +291,14 @@ public class FileSystemTableSink extends 
AbstractFileSystemTable
                             compactionSize,
                             parallelism,
                             inlongMetric,
-                            inlongAudit);
+                            inlongAudit,
+                            dirtyOptions,
+                            dirtySink);
         } else {
             writerStream =
                     StreamingSink.writer(
-                            dataStream, bucketCheckInterval, bucketsBuilder, 
parallelism, inlongMetric, inlongAudit);
+                            dataStream, bucketCheckInterval, bucketsBuilder, 
parallelism,
+                            inlongMetric, inlongAudit, dirtyOptions, 
dirtySink);
         }
 
         return StreamingSink.sink(
@@ -543,7 +554,9 @@ public class FileSystemTableSink extends 
AbstractFileSystemTable
                         deserializationFormat,
                         formatFactory,
                         bulkWriterFormat,
-                        serializationFormat);
+                        serializationFormat,
+                        dirtyOptions,
+                        dirtySink);
         sink.overwrite = overwrite;
         sink.dynamicGrouping = dynamicGrouping;
         sink.staticPartitions = staticPartitions;
diff --git a/inlong-sort/sort-connectors/filesystem/src/main/java/org/apache/inlong/sort/filesystem/stream/AbstractStreamingWriter.java b/inlong-sort/sort-connectors/filesystem/src/main/java/org/apache/inlong/sort/filesystem/stream/AbstractStreamingWriter.java
index e5e74e62a..cc85113b8 100644
--- a/inlong-sort/sort-connectors/filesystem/src/main/java/org/apache/inlong/sort/filesystem/stream/AbstractStreamingWriter.java
+++ b/inlong-sort/sort-connectors/filesystem/src/main/java/org/apache/inlong/sort/filesystem/stream/AbstractStreamingWriter.java
@@ -22,6 +22,7 @@ import org.apache.flink.api.common.state.ListState;
 import org.apache.flink.api.common.state.ListStateDescriptor;
 import org.apache.flink.api.common.typeinfo.TypeHint;
 import org.apache.flink.api.common.typeinfo.TypeInformation;
+import org.apache.flink.configuration.Configuration;
 import org.apache.flink.core.fs.Path;
 import org.apache.flink.runtime.state.StateInitializationContext;
 import org.apache.flink.runtime.state.StateSnapshotContext;
@@ -36,12 +37,20 @@ import 
org.apache.flink.streaming.api.operators.ChainingStrategy;
 import org.apache.flink.streaming.api.operators.OneInputStreamOperator;
 import org.apache.flink.streaming.api.watermark.Watermark;
 import org.apache.flink.streaming.runtime.streamrecord.StreamRecord;
+import org.apache.inlong.sort.base.dirty.DirtyData;
+import org.apache.inlong.sort.base.dirty.DirtyOptions;
+import org.apache.inlong.sort.base.dirty.DirtyType;
+import org.apache.inlong.sort.base.dirty.sink.DirtySink;
 import org.apache.inlong.sort.base.metric.MetricOption;
 import org.apache.inlong.sort.base.metric.MetricOption.RegisteredMetric;
 import org.apache.inlong.sort.base.metric.MetricState;
 import org.apache.inlong.sort.base.metric.SinkMetricData;
 import org.apache.inlong.sort.base.util.MetricStateUtils;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
 
+import javax.annotation.Nullable;
+import java.io.IOException;
 import java.nio.charset.StandardCharsets;
 
 import static org.apache.inlong.sort.base.Constants.DIRTY_BYTES_OUT;
@@ -61,6 +70,8 @@ public abstract class AbstractStreamingWriter<IN, OUT> 
extends AbstractStreamOpe
 
     private static final long serialVersionUID = 1L;
 
+    private static final Logger LOGGER = 
LoggerFactory.getLogger(AbstractStreamingWriter.class);
+
     // ------------------------ configuration fields --------------------------
 
     private final long bucketCheckInterval;
@@ -69,6 +80,8 @@ public abstract class AbstractStreamingWriter<IN, OUT> 
extends AbstractStreamOpe
 
     private final String inlongMetric;
     private final String inlongAudit;
+    private final DirtyOptions dirtyOptions;
+    private @Nullable final DirtySink<Object> dirtySink;
 
     private transient ListState<MetricState> metricStateListState;
     private transient MetricState metricState;
@@ -88,11 +101,15 @@ public abstract class AbstractStreamingWriter<IN, OUT> 
extends AbstractStreamOpe
     public AbstractStreamingWriter(
             long bucketCheckInterval,
             StreamingFileSink.BucketsBuilder<IN, String, ? extends 
StreamingFileSink.BucketsBuilder<IN, String, ?>> bucketsBuilder,
-            String inlongMetric, String inlongAudit) {
+            String inlongMetric, String inlongAudit,
+            DirtyOptions dirtyOptions,
+            @Nullable DirtySink<Object> dirtySink) {
         this.bucketCheckInterval = bucketCheckInterval;
         this.bucketsBuilder = bucketsBuilder;
         this.inlongMetric = inlongMetric;
         this.inlongAudit = inlongAudit;
+        this.dirtyOptions = dirtyOptions;
+        this.dirtySink = dirtySink;
         setChainingStrategy(ChainingStrategy.ALWAYS);
     }
 
@@ -130,6 +147,9 @@ public abstract class AbstractStreamingWriter<IN, OUT> 
extends AbstractStreamOpe
         if (metricOption != null) {
             sinkMetricData = new SinkMetricData(metricOption, 
getRuntimeContext().getMetricGroup());
         }
+        if (dirtySink != null) {
+            dirtySink.open(new Configuration());
+        }
     }
 
     /**
@@ -141,14 +161,11 @@ public abstract class AbstractStreamingWriter<IN, OUT> 
extends AbstractStreamOpe
             if (sinkMetricData != null) {
                 sinkMetricData.invoke(rowSize, dataSize);
             }
-        } catch (Exception e) {
-            if (sinkMetricData != null) {
-                sinkMetricData.invokeDirty(rowSize, dataSize);
-            }
-            LOG.error("fileSystem sink commitUpToCheckpoint.", e);
-        } finally {
             rowSize = 0L;
             dataSize = 0L;
+        } catch (Exception e) {
+            LOG.error("fileSystem sink commitUpToCheckpoint.", e);
+            throw e;
         }
     }
 
@@ -221,11 +238,34 @@ public abstract class AbstractStreamingWriter<IN, OUT> 
extends AbstractStreamOpe
                     currentWatermark);
             rowSize = rowSize + 1;
             dataSize = dataSize + 
element.getValue().toString().getBytes(StandardCharsets.UTF_8).length;
+        } catch (IOException e) {
+            throw new RuntimeException(e);
         } catch (Exception e) {
+            LOG.error("StreamingWriter write failed", e);
+            if (!dirtyOptions.ignoreDirty()) {
+                throw new RuntimeException(e);
+            }
             if (sinkMetricData != null) {
-                sinkMetricData.invokeDirty(1L, 
element.getValue().toString().getBytes(StandardCharsets.UTF_8).length);
+                sinkMetricData.invokeDirty(1L,
+                        
element.getValue().toString().getBytes(StandardCharsets.UTF_8).length);
+            }
+            if (dirtySink != null) {
+                DirtyData.Builder<Object> builder = DirtyData.builder();
+                try {
+                    builder.setData(element.getValue())
+                            .setDirtyType(DirtyType.UNDEFINED)
+                            .setLabels(dirtyOptions.getLabels())
+                            .setLogTag(dirtyOptions.getLogTag())
+                            .setDirtyMessage(e.getMessage())
+                            .setIdentifier(dirtyOptions.getIdentifier());
+                    dirtySink.invoke(builder.build());
+                } catch (Exception ex) {
+                    if (!dirtyOptions.ignoreSideOutputErrors()) {
+                        throw new RuntimeException(ex);
+                    }
+                    LOGGER.warn("Dirty sink failed", ex);
+                }
             }
-            LOG.error("fileSystem sink processElement.", e);
         }
     }
 
diff --git a/inlong-sort/sort-connectors/filesystem/src/main/java/org/apache/inlong/sort/filesystem/stream/StreamingFileWriter.java b/inlong-sort/sort-connectors/filesystem/src/main/java/org/apache/inlong/sort/filesystem/stream/StreamingFileWriter.java
index 1fb2e3742..97168a43c 100644
--- a/inlong-sort/sort-connectors/filesystem/src/main/java/org/apache/inlong/sort/filesystem/stream/StreamingFileWriter.java
+++ b/inlong-sort/sort-connectors/filesystem/src/main/java/org/apache/inlong/sort/filesystem/stream/StreamingFileWriter.java
@@ -24,7 +24,10 @@ import org.apache.flink.runtime.state.StateSnapshotContext;
 import 
org.apache.flink.streaming.api.functions.sink.filesystem.StreamingFileSink;
 import org.apache.flink.streaming.runtime.streamrecord.StreamRecord;
 import org.apache.flink.table.filesystem.stream.PartitionCommitInfo;
+import org.apache.inlong.sort.base.dirty.DirtyOptions;
+import org.apache.inlong.sort.base.dirty.sink.DirtySink;
 
+import javax.annotation.Nullable;
 import java.util.ArrayList;
 import java.util.HashSet;
 import java.util.NavigableMap;
@@ -45,8 +48,10 @@ public class StreamingFileWriter<IN> extends 
AbstractStreamingWriter<IN, Partiti
     public StreamingFileWriter(
             long bucketCheckInterval,
             StreamingFileSink.BucketsBuilder<IN, String, ? extends 
StreamingFileSink.BucketsBuilder<IN, String, ?>> bucketsBuilder,
-            String inlongMetric, String inlongAudit) {
-        super(bucketCheckInterval, bucketsBuilder, inlongMetric, inlongAudit);
+            String inlongMetric, String inlongAudit,
+            DirtyOptions dirtyOptions,
+            @Nullable DirtySink<Object> dirtySink) {
+        super(bucketCheckInterval, bucketsBuilder, inlongMetric, inlongAudit, 
dirtyOptions, dirtySink);
     }
 
     @Override
diff --git a/inlong-sort/sort-connectors/filesystem/src/main/java/org/apache/inlong/sort/filesystem/stream/StreamingSink.java b/inlong-sort/sort-connectors/filesystem/src/main/java/org/apache/inlong/sort/filesystem/stream/StreamingSink.java
index 5408e34be..2a26ed4cd 100644
--- a/inlong-sort/sort-connectors/filesystem/src/main/java/org/apache/inlong/sort/filesystem/stream/StreamingSink.java
+++ b/inlong-sort/sort-connectors/filesystem/src/main/java/org/apache/inlong/sort/filesystem/stream/StreamingSink.java
@@ -42,8 +42,11 @@ import 
org.apache.flink.table.filesystem.stream.compact.CompactOperator;
 import org.apache.flink.table.filesystem.stream.compact.CompactReader;
 import org.apache.flink.table.filesystem.stream.compact.CompactWriter;
 import org.apache.flink.util.function.SupplierWithException;
+import org.apache.inlong.sort.base.dirty.DirtyOptions;
+import org.apache.inlong.sort.base.dirty.sink.DirtySink;
 import org.apache.inlong.sort.filesystem.stream.compact.CompactFileWriter;
 
+import javax.annotation.Nullable;
 import java.io.IOException;
 import java.io.Serializable;
 import java.util.List;
@@ -66,9 +69,12 @@ public class StreamingSink {
             DataStream<T> inputStream,
             long bucketCheckInterval,
             StreamingFileSink.BucketsBuilder<T, String, ? extends 
StreamingFileSink.BucketsBuilder<T, String, ?>> bucketsBuilder,
-            int parallelism, String inlongMetric, String inlongAudit) {
+            int parallelism, String inlongMetric, String inlongAudit,
+            DirtyOptions dirtyOptions,
+            @Nullable DirtySink<Object> dirtySink) {
         StreamingFileWriter<T> fileWriter =
-                new StreamingFileWriter<>(bucketCheckInterval, bucketsBuilder, 
inlongMetric, inlongAudit);
+                new StreamingFileWriter<>(bucketCheckInterval, bucketsBuilder,
+                        inlongMetric, inlongAudit, dirtyOptions, dirtySink);
         return inputStream
                 .transform(
                         StreamingFileWriter.class.getSimpleName(),
@@ -89,9 +95,11 @@ public class StreamingSink {
             Path path,
             CompactReader.Factory<T> readFactory,
             long targetFileSize,
-            int parallelism, String inlongMetric, String inlongAudit) {
+            int parallelism, String inlongMetric, String inlongAudit,
+            DirtyOptions dirtyOptions,
+            @Nullable DirtySink<Object> dirtySink) {
         CompactFileWriter<T> writer = new 
CompactFileWriter<>(bucketCheckInterval, bucketsBuilder, inlongMetric,
-                inlongAudit);
+                inlongAudit, dirtyOptions, dirtySink);
 
         SupplierWithException<FileSystem, IOException> fsSupplier =
                 (SupplierWithException<FileSystem, IOException> & 
Serializable) () -> fsFactory.create(path.toUri());
@@ -118,7 +126,6 @@ public class StreamingSink {
 
         CompactOperator<T> compacter =
                 new CompactOperator<>(fsSupplier, readFactory, writerFactory);
-
         return coordinatorOp
                 .broadcast()
                 .transform(
diff --git a/inlong-sort/sort-connectors/filesystem/src/main/java/org/apache/inlong/sort/filesystem/stream/compact/CompactFileWriter.java b/inlong-sort/sort-connectors/filesystem/src/main/java/org/apache/inlong/sort/filesystem/stream/compact/CompactFileWriter.java
index 731aca18f..385894bc5 100644
--- a/inlong-sort/sort-connectors/filesystem/src/main/java/org/apache/inlong/sort/filesystem/stream/compact/CompactFileWriter.java
+++ b/inlong-sort/sort-connectors/filesystem/src/main/java/org/apache/inlong/sort/filesystem/stream/compact/CompactFileWriter.java
@@ -24,8 +24,12 @@ import 
org.apache.flink.streaming.runtime.streamrecord.StreamRecord;
 import 
org.apache.flink.table.filesystem.stream.compact.CompactMessages.CoordinatorInput;
 import 
org.apache.flink.table.filesystem.stream.compact.CompactMessages.EndCheckpoint;
 import 
org.apache.flink.table.filesystem.stream.compact.CompactMessages.InputFile;
+import org.apache.inlong.sort.base.dirty.DirtyOptions;
+import org.apache.inlong.sort.base.dirty.sink.DirtySink;
 import org.apache.inlong.sort.filesystem.stream.AbstractStreamingWriter;
 
+import javax.annotation.Nullable;
+
 /**
  * Writer for emitting {@link InputFile} and {@link EndCheckpoint} to 
downstream.
  */
@@ -38,8 +42,10 @@ public class CompactFileWriter<T>
     public CompactFileWriter(
             long bucketCheckInterval,
             StreamingFileSink.BucketsBuilder<T, String, ? extends 
StreamingFileSink.BucketsBuilder<T, String, ?>> bucketsBuilder,
-            String inlongMetric, String inlongAudit) {
-        super(bucketCheckInterval, bucketsBuilder, inlongMetric, inlongAudit);
+            String inlongMetric, String inlongAudit,
+            DirtyOptions dirtyOptions,
+            @Nullable DirtySink<Object> dirtySink) {
+        super(bucketCheckInterval, bucketsBuilder, inlongMetric, inlongAudit, 
dirtyOptions, dirtySink);
     }
 
     @Override
diff --git a/inlong-sort/sort-core/src/test/java/org/apache/inlong/sort/parser/FlinkSqlParserTest.java b/inlong-sort/sort-core/src/test/java/org/apache/inlong/sort/parser/FlinkSqlParserTest.java
index c4c71791b..f1010dfd5 100644
--- a/inlong-sort/sort-core/src/test/java/org/apache/inlong/sort/parser/FlinkSqlParserTest.java
+++ b/inlong-sort/sort-core/src/test/java/org/apache/inlong/sort/parser/FlinkSqlParserTest.java
@@ -162,9 +162,16 @@ public class FlinkSqlParserTest extends AbstractTestBase {
                                 new FieldInfo("age", new IntFormatInfo())),
                         new FieldRelation(new FieldInfo("ts", new 
TimestampFormatInfo()),
                                 new FieldInfo("ts", new 
TimestampFormatInfo())));
+        Map<String, String> properties = new LinkedHashMap<>();
+        properties.put("dirty.side-output.connector", "log");
+        properties.put("dirty.ignore", "true");
+        properties.put("dirty.side-output.enable", "true");
+        properties.put("dirty.side-output.format", "csv");
+        properties.put("dirty.side-output.labels",
+                
"SYSTEM_TIME=${SYSTEM_TIME}&DIRTY_TYPE=${DIRTY_TYPE}&database=inlong&table=student");
         return new FileSystemLoadNode(id, "hdfs_output", fields, relations,
-                null, "hdfs://localhost:9000/file", "json",
-                1, null, null, null);
+                null, "hdfs://localhost:9000/inlong/student", "json",
+                1, properties, null, null);
     }
 
     /**

Reply via email to