This is an automated email from the ASF dual-hosted git repository. dockerzhang pushed a commit to branch master in repository https://gitbox.apache.org/repos/asf/inlong.git
The following commit(s) were added to refs/heads/master by this push: new 8d6bd8c08 [INLONG-6797][Sort] Supports dirty data side-output for filesystem sink (#6799) 8d6bd8c08 is described below commit 8d6bd8c0825c7f1f571419bb05c68f982965cd59 Author: yunqingmoswu <44659300+yunqingmo...@users.noreply.github.com> AuthorDate: Fri Dec 9 14:11:43 2022 +0800 [INLONG-6797][Sort] Supports dirty data side-output for filesystem sink (#6799) --- .../sort/filesystem/FileSystemTableFactory.java | 12 ++++- .../sort/filesystem/FileSystemTableSink.java | 27 +++++++--- .../filesystem/stream/AbstractStreamingWriter.java | 58 ++++++++++++++++++---- .../filesystem/stream/StreamingFileWriter.java | 9 +++- .../sort/filesystem/stream/StreamingSink.java | 17 +++++-- .../stream/compact/CompactFileWriter.java | 10 +++- .../inlong/sort/parser/FlinkSqlParserTest.java | 11 +++- 7 files changed, 115 insertions(+), 29 deletions(-) diff --git a/inlong-sort/sort-connectors/filesystem/src/main/java/org/apache/inlong/sort/filesystem/FileSystemTableFactory.java b/inlong-sort/sort-connectors/filesystem/src/main/java/org/apache/inlong/sort/filesystem/FileSystemTableFactory.java index 58c585ebc..f27d55ec7 100644 --- a/inlong-sort/sort-connectors/filesystem/src/main/java/org/apache/inlong/sort/filesystem/FileSystemTableFactory.java +++ b/inlong-sort/sort-connectors/filesystem/src/main/java/org/apache/inlong/sort/filesystem/FileSystemTableFactory.java @@ -39,6 +39,9 @@ import org.apache.flink.table.factories.SerializationFormatFactory; import org.apache.flink.table.factories.TableFactory; import org.apache.flink.table.filesystem.FileSystemOptions; import org.apache.flink.table.filesystem.FileSystemTableSource; +import org.apache.inlong.sort.base.dirty.DirtyOptions; +import org.apache.inlong.sort.base.dirty.sink.DirtySink; +import org.apache.inlong.sort.base.dirty.utils.DirtySinkFactoryUtils; import java.util.HashSet; import java.util.LinkedList; @@ -48,6 +51,7 @@ import java.util.Set; import java.util.stream.Collectors; import static java.time.ZoneId.SHORT_IDS; +import static org.apache.inlong.sort.base.Constants.DIRTY_PREFIX; import static org.apache.inlong.sort.base.Constants.IGNORE_ALL_CHANGELOG; import static org.apache.inlong.sort.base.Constants.INLONG_AUDIT; import static org.apache.inlong.sort.base.Constants.INLONG_METRIC; @@ -86,13 +90,17 @@ public class FileSystemTableFactory implements DynamicTableSourceFactory, Dynami public DynamicTableSink createDynamicTableSink(Context context) { FactoryUtil.TableFactoryHelper helper = FactoryUtil.createTableFactoryHelper(this, context); validate(helper); + final DirtyOptions dirtyOptions = DirtyOptions.fromConfig(helper.getOptions()); + final DirtySink<Object> dirtySink = DirtySinkFactoryUtils.createDirtySink(context, dirtyOptions); return new FileSystemTableSink( context, discoverDecodingFormat(context, BulkReaderFormatFactory.class), discoverDecodingFormat(context, DeserializationFormatFactory.class), discoverFormatFactory(context), discoverEncodingFormat(context, BulkWriterFormatFactory.class), - discoverEncodingFormat(context, SerializationFormatFactory.class)); + discoverEncodingFormat(context, SerializationFormatFactory.class), + dirtyOptions, + dirtySink); } @Override @@ -132,7 +140,7 @@ public class FileSystemTableFactory implements DynamicTableSourceFactory, Dynami private void validate(FactoryUtil.TableFactoryHelper helper) { // Except format options, some formats like parquet and orc can not list all supported // options. - helper.validateExcept(helper.getOptions().get(FactoryUtil.FORMAT) + "."); + helper.validateExcept(helper.getOptions().get(FactoryUtil.FORMAT) + ".", DIRTY_PREFIX); // validate time zone of watermark String watermarkTimeZone = diff --git a/inlong-sort/sort-connectors/filesystem/src/main/java/org/apache/inlong/sort/filesystem/FileSystemTableSink.java b/inlong-sort/sort-connectors/filesystem/src/main/java/org/apache/inlong/sort/filesystem/FileSystemTableSink.java index 32f35b0e1..6f344098a 100644 --- a/inlong-sort/sort-connectors/filesystem/src/main/java/org/apache/inlong/sort/filesystem/FileSystemTableSink.java +++ b/inlong-sort/sort-connectors/filesystem/src/main/java/org/apache/inlong/sort/filesystem/FileSystemTableSink.java @@ -77,6 +77,8 @@ import org.apache.flink.table.types.DataType; import org.apache.flink.table.utils.PartitionPathUtils; import org.apache.flink.types.RowKind; import org.apache.flink.util.Preconditions; +import org.apache.inlong.sort.base.dirty.DirtyOptions; +import org.apache.inlong.sort.base.dirty.sink.DirtySink; import org.apache.inlong.sort.filesystem.stream.StreamingSink; import javax.annotation.Nullable; @@ -129,10 +131,12 @@ public class FileSystemTableSink extends AbstractFileSystemTable private LinkedHashMap<String, String> staticPartitions = new LinkedHashMap<>(); @Nullable - private Integer configuredParallelism; + private final Integer configuredParallelism; - private String inlongMetric; - private String inlongAudit; + private final String inlongMetric; + private final String inlongAudit; + private final DirtyOptions dirtyOptions; + private @Nullable final DirtySink<Object> dirtySink; FileSystemTableSink( DynamicTableFactory.Context context, @@ -140,7 +144,9 @@ public class FileSystemTableSink extends AbstractFileSystemTable @Nullable DecodingFormat<DeserializationSchema<RowData>> deserializationFormat, @Nullable FileSystemFormatFactory formatFactory, @Nullable EncodingFormat<BulkWriter.Factory<RowData>> bulkWriterFormat, - @Nullable EncodingFormat<SerializationSchema<RowData>> serializationFormat) { + @Nullable EncodingFormat<SerializationSchema<RowData>> serializationFormat, + DirtyOptions dirtyOptions, + @Nullable DirtySink<Object> dirtySink) { super(context); this.bulkReaderFormat = bulkReaderFormat; this.deserializationFormat = deserializationFormat; @@ -159,6 +165,8 @@ public class FileSystemTableSink extends AbstractFileSystemTable this.configuredParallelism = tableOptions.get(FileSystemOptions.SINK_PARALLELISM); this.inlongMetric = tableOptions.get(INLONG_METRIC); this.inlongAudit = tableOptions.get(INLONG_AUDIT); + this.dirtyOptions = dirtyOptions; + this.dirtySink = dirtySink; } @Override @@ -283,11 +291,14 @@ public class FileSystemTableSink extends AbstractFileSystemTable compactionSize, parallelism, inlongMetric, - inlongAudit); + inlongAudit, + dirtyOptions, + dirtySink); } else { writerStream = StreamingSink.writer( - dataStream, bucketCheckInterval, bucketsBuilder, parallelism, inlongMetric, inlongAudit); + dataStream, bucketCheckInterval, bucketsBuilder, parallelism, + inlongMetric, inlongAudit, dirtyOptions, dirtySink); } return StreamingSink.sink( @@ -543,7 +554,9 @@ public class FileSystemTableSink extends AbstractFileSystemTable deserializationFormat, formatFactory, bulkWriterFormat, - serializationFormat); + serializationFormat, + dirtyOptions, + dirtySink); sink.overwrite = overwrite; sink.dynamicGrouping = dynamicGrouping; sink.staticPartitions = staticPartitions; diff --git a/inlong-sort/sort-connectors/filesystem/src/main/java/org/apache/inlong/sort/filesystem/stream/AbstractStreamingWriter.java b/inlong-sort/sort-connectors/filesystem/src/main/java/org/apache/inlong/sort/filesystem/stream/AbstractStreamingWriter.java index e5e74e62a..cc85113b8 100644 --- a/inlong-sort/sort-connectors/filesystem/src/main/java/org/apache/inlong/sort/filesystem/stream/AbstractStreamingWriter.java +++ b/inlong-sort/sort-connectors/filesystem/src/main/java/org/apache/inlong/sort/filesystem/stream/AbstractStreamingWriter.java @@ -22,6 +22,7 @@ import org.apache.flink.api.common.state.ListState; import org.apache.flink.api.common.state.ListStateDescriptor; import org.apache.flink.api.common.typeinfo.TypeHint; import org.apache.flink.api.common.typeinfo.TypeInformation; +import org.apache.flink.configuration.Configuration; import org.apache.flink.core.fs.Path; import org.apache.flink.runtime.state.StateInitializationContext; import org.apache.flink.runtime.state.StateSnapshotContext; @@ -36,12 +37,20 @@ import org.apache.flink.streaming.api.operators.ChainingStrategy; import org.apache.flink.streaming.api.operators.OneInputStreamOperator; import org.apache.flink.streaming.api.watermark.Watermark; import org.apache.flink.streaming.runtime.streamrecord.StreamRecord; +import org.apache.inlong.sort.base.dirty.DirtyData; +import org.apache.inlong.sort.base.dirty.DirtyOptions; +import org.apache.inlong.sort.base.dirty.DirtyType; +import org.apache.inlong.sort.base.dirty.sink.DirtySink; import org.apache.inlong.sort.base.metric.MetricOption; import org.apache.inlong.sort.base.metric.MetricOption.RegisteredMetric; import org.apache.inlong.sort.base.metric.MetricState; import org.apache.inlong.sort.base.metric.SinkMetricData; import org.apache.inlong.sort.base.util.MetricStateUtils; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; +import javax.annotation.Nullable; +import java.io.IOException; import java.nio.charset.StandardCharsets; import static org.apache.inlong.sort.base.Constants.DIRTY_BYTES_OUT; @@ -61,6 +70,8 @@ public abstract class AbstractStreamingWriter<IN, OUT> extends AbstractStreamOpe private static final long serialVersionUID = 1L; + private static final Logger LOGGER = LoggerFactory.getLogger(AbstractStreamingWriter.class); + // ------------------------ configuration fields -------------------------- private final long bucketCheckInterval; @@ -69,6 +80,8 @@ public abstract class AbstractStreamingWriter<IN, OUT> extends AbstractStreamOpe private final String inlongMetric; private final String inlongAudit; + private final DirtyOptions dirtyOptions; + private @Nullable final DirtySink<Object> dirtySink; private transient ListState<MetricState> metricStateListState; private transient MetricState metricState; @@ -88,11 +101,15 @@ public abstract class AbstractStreamingWriter<IN, OUT> extends AbstractStreamOpe public AbstractStreamingWriter( long bucketCheckInterval, StreamingFileSink.BucketsBuilder<IN, String, ? extends StreamingFileSink.BucketsBuilder<IN, String, ?>> bucketsBuilder, - String inlongMetric, String inlongAudit) { + String inlongMetric, String inlongAudit, + DirtyOptions dirtyOptions, + @Nullable DirtySink<Object> dirtySink) { this.bucketCheckInterval = bucketCheckInterval; this.bucketsBuilder = bucketsBuilder; this.inlongMetric = inlongMetric; this.inlongAudit = inlongAudit; + this.dirtyOptions = dirtyOptions; + this.dirtySink = dirtySink; setChainingStrategy(ChainingStrategy.ALWAYS); } @@ -130,6 +147,9 @@ public abstract class AbstractStreamingWriter<IN, OUT> extends AbstractStreamOpe if (metricOption != null) { sinkMetricData = new SinkMetricData(metricOption, getRuntimeContext().getMetricGroup()); } + if (dirtySink != null) { + dirtySink.open(new Configuration()); + } } /** @@ -141,14 +161,11 @@ public abstract class AbstractStreamingWriter<IN, OUT> extends AbstractStreamOpe if (sinkMetricData != null) { sinkMetricData.invoke(rowSize, dataSize); } - } catch (Exception e) { - if (sinkMetricData != null) { - sinkMetricData.invokeDirty(rowSize, dataSize); - } - LOG.error("fileSystem sink commitUpToCheckpoint.", e); - } finally { rowSize = 0L; dataSize = 0L; + } catch (Exception e) { + LOG.error("fileSystem sink commitUpToCheckpoint.", e); + throw e; } } @@ -221,11 +238,34 @@ public abstract class AbstractStreamingWriter<IN, OUT> extends AbstractStreamOpe currentWatermark); rowSize = rowSize + 1; dataSize = dataSize + element.getValue().toString().getBytes(StandardCharsets.UTF_8).length; + } catch (IOException e) { + throw new RuntimeException(e); } catch (Exception e) { + LOG.error("StreamingWriter write failed", e); + if (!dirtyOptions.ignoreDirty()) { + throw new RuntimeException(e); + } if (sinkMetricData != null) { - sinkMetricData.invokeDirty(1L, element.getValue().toString().getBytes(StandardCharsets.UTF_8).length); + sinkMetricData.invokeDirty(1L, + element.getValue().toString().getBytes(StandardCharsets.UTF_8).length); + } + if (dirtySink != null) { + DirtyData.Builder<Object> builder = DirtyData.builder(); + try { + builder.setData(element.getValue()) + .setDirtyType(DirtyType.UNDEFINED) + .setLabels(dirtyOptions.getLabels()) + .setLogTag(dirtyOptions.getLogTag()) + .setDirtyMessage(e.getMessage()) + .setIdentifier(dirtyOptions.getIdentifier()); + dirtySink.invoke(builder.build()); + } catch (Exception ex) { + if (!dirtyOptions.ignoreSideOutputErrors()) { + throw new RuntimeException(ex); + } + LOGGER.warn("Dirty sink failed", ex); + } } - LOG.error("fileSystem sink processElement.", e); } } diff --git a/inlong-sort/sort-connectors/filesystem/src/main/java/org/apache/inlong/sort/filesystem/stream/StreamingFileWriter.java b/inlong-sort/sort-connectors/filesystem/src/main/java/org/apache/inlong/sort/filesystem/stream/StreamingFileWriter.java index 1fb2e3742..97168a43c 100644 --- a/inlong-sort/sort-connectors/filesystem/src/main/java/org/apache/inlong/sort/filesystem/stream/StreamingFileWriter.java +++ b/inlong-sort/sort-connectors/filesystem/src/main/java/org/apache/inlong/sort/filesystem/stream/StreamingFileWriter.java @@ -24,7 +24,10 @@ import org.apache.flink.runtime.state.StateSnapshotContext; import org.apache.flink.streaming.api.functions.sink.filesystem.StreamingFileSink; import org.apache.flink.streaming.runtime.streamrecord.StreamRecord; import org.apache.flink.table.filesystem.stream.PartitionCommitInfo; +import org.apache.inlong.sort.base.dirty.DirtyOptions; +import org.apache.inlong.sort.base.dirty.sink.DirtySink; +import javax.annotation.Nullable; import java.util.ArrayList; import java.util.HashSet; import java.util.NavigableMap; @@ -45,8 +48,10 @@ public class StreamingFileWriter<IN> extends AbstractStreamingWriter<IN, Partiti public StreamingFileWriter( long bucketCheckInterval, StreamingFileSink.BucketsBuilder<IN, String, ? extends StreamingFileSink.BucketsBuilder<IN, String, ?>> bucketsBuilder, - String inlongMetric, String inlongAudit) { - super(bucketCheckInterval, bucketsBuilder, inlongMetric, inlongAudit); + String inlongMetric, String inlongAudit, + DirtyOptions dirtyOptions, + @Nullable DirtySink<Object> dirtySink) { + super(bucketCheckInterval, bucketsBuilder, inlongMetric, inlongAudit, dirtyOptions, dirtySink); } @Override diff --git a/inlong-sort/sort-connectors/filesystem/src/main/java/org/apache/inlong/sort/filesystem/stream/StreamingSink.java b/inlong-sort/sort-connectors/filesystem/src/main/java/org/apache/inlong/sort/filesystem/stream/StreamingSink.java index 5408e34be..2a26ed4cd 100644 --- a/inlong-sort/sort-connectors/filesystem/src/main/java/org/apache/inlong/sort/filesystem/stream/StreamingSink.java +++ b/inlong-sort/sort-connectors/filesystem/src/main/java/org/apache/inlong/sort/filesystem/stream/StreamingSink.java @@ -42,8 +42,11 @@ import org.apache.flink.table.filesystem.stream.compact.CompactOperator; import org.apache.flink.table.filesystem.stream.compact.CompactReader; import org.apache.flink.table.filesystem.stream.compact.CompactWriter; import org.apache.flink.util.function.SupplierWithException; +import org.apache.inlong.sort.base.dirty.DirtyOptions; +import org.apache.inlong.sort.base.dirty.sink.DirtySink; import org.apache.inlong.sort.filesystem.stream.compact.CompactFileWriter; +import javax.annotation.Nullable; import java.io.IOException; import java.io.Serializable; import java.util.List; @@ -66,9 +69,12 @@ public class StreamingSink { DataStream<T> inputStream, long bucketCheckInterval, StreamingFileSink.BucketsBuilder<T, String, ? extends StreamingFileSink.BucketsBuilder<T, String, ?>> bucketsBuilder, - int parallelism, String inlongMetric, String inlongAudit) { + int parallelism, String inlongMetric, String inlongAudit, + DirtyOptions dirtyOptions, + @Nullable DirtySink<Object> dirtySink) { StreamingFileWriter<T> fileWriter = - new StreamingFileWriter<>(bucketCheckInterval, bucketsBuilder, inlongMetric, inlongAudit); + new StreamingFileWriter<>(bucketCheckInterval, bucketsBuilder, + inlongMetric, inlongAudit, dirtyOptions, dirtySink); return inputStream .transform( StreamingFileWriter.class.getSimpleName(), @@ -89,9 +95,11 @@ public class StreamingSink { Path path, CompactReader.Factory<T> readFactory, long targetFileSize, - int parallelism, String inlongMetric, String inlongAudit) { + int parallelism, String inlongMetric, String inlongAudit, + DirtyOptions dirtyOptions, + @Nullable DirtySink<Object> dirtySink) { CompactFileWriter<T> writer = new CompactFileWriter<>(bucketCheckInterval, bucketsBuilder, inlongMetric, - inlongAudit); + inlongAudit, dirtyOptions, dirtySink); SupplierWithException<FileSystem, IOException> fsSupplier = (SupplierWithException<FileSystem, IOException> & Serializable) () -> fsFactory.create(path.toUri()); @@ -118,7 +126,6 @@ public class StreamingSink { CompactOperator<T> compacter = new CompactOperator<>(fsSupplier, readFactory, writerFactory); - return coordinatorOp .broadcast() .transform( diff --git a/inlong-sort/sort-connectors/filesystem/src/main/java/org/apache/inlong/sort/filesystem/stream/compact/CompactFileWriter.java b/inlong-sort/sort-connectors/filesystem/src/main/java/org/apache/inlong/sort/filesystem/stream/compact/CompactFileWriter.java index 731aca18f..385894bc5 100644 --- a/inlong-sort/sort-connectors/filesystem/src/main/java/org/apache/inlong/sort/filesystem/stream/compact/CompactFileWriter.java +++ b/inlong-sort/sort-connectors/filesystem/src/main/java/org/apache/inlong/sort/filesystem/stream/compact/CompactFileWriter.java @@ -24,8 +24,12 @@ import org.apache.flink.streaming.runtime.streamrecord.StreamRecord; import org.apache.flink.table.filesystem.stream.compact.CompactMessages.CoordinatorInput; import org.apache.flink.table.filesystem.stream.compact.CompactMessages.EndCheckpoint; import org.apache.flink.table.filesystem.stream.compact.CompactMessages.InputFile; +import org.apache.inlong.sort.base.dirty.DirtyOptions; +import org.apache.inlong.sort.base.dirty.sink.DirtySink; import org.apache.inlong.sort.filesystem.stream.AbstractStreamingWriter; +import javax.annotation.Nullable; + /** * Writer for emitting {@link InputFile} and {@link EndCheckpoint} to downstream. */ @@ -38,8 +42,10 @@ public class CompactFileWriter<T> public CompactFileWriter( long bucketCheckInterval, StreamingFileSink.BucketsBuilder<T, String, ? extends StreamingFileSink.BucketsBuilder<T, String, ?>> bucketsBuilder, - String inlongMetric, String inlongAudit) { - super(bucketCheckInterval, bucketsBuilder, inlongMetric, inlongAudit); + String inlongMetric, String inlongAudit, + DirtyOptions dirtyOptions, + @Nullable DirtySink<Object> dirtySink) { + super(bucketCheckInterval, bucketsBuilder, inlongMetric, inlongAudit, dirtyOptions, dirtySink); } @Override diff --git a/inlong-sort/sort-core/src/test/java/org/apache/inlong/sort/parser/FlinkSqlParserTest.java b/inlong-sort/sort-core/src/test/java/org/apache/inlong/sort/parser/FlinkSqlParserTest.java index c4c71791b..f1010dfd5 100644 --- a/inlong-sort/sort-core/src/test/java/org/apache/inlong/sort/parser/FlinkSqlParserTest.java +++ b/inlong-sort/sort-core/src/test/java/org/apache/inlong/sort/parser/FlinkSqlParserTest.java @@ -162,9 +162,16 @@ public class FlinkSqlParserTest extends AbstractTestBase { new FieldInfo("age", new IntFormatInfo())), new FieldRelation(new FieldInfo("ts", new TimestampFormatInfo()), new FieldInfo("ts", new TimestampFormatInfo()))); + Map<String, String> properties = new LinkedHashMap<>(); + properties.put("dirty.side-output.connector", "log"); + properties.put("dirty.ignore", "true"); + properties.put("dirty.side-output.enable", "true"); + properties.put("dirty.side-output.format", "csv"); + properties.put("dirty.side-output.labels", + "SYSTEM_TIME=${SYSTEM_TIME}&DIRTY_TYPE=${DIRTY_TYPE}&database=inlong&table=student"); return new FileSystemLoadNode(id, "hdfs_output", fields, relations, - null, "hdfs://localhost:9000/file", "json", - 1, null, null, null); + null, "hdfs://localhost:9000/inlong/student", "json", + 1, properties, null, null); } /**