This is an automated email from the ASF dual-hosted git repository. pacinogong pushed a commit to branch master in repository https://gitbox.apache.org/repos/asf/inlong.git
The following commit(s) were added to refs/heads/master by this push: new c40021560 [INLONG-7126][Sort] Support multiple dirty sink archive helper (#7127) c40021560 is described below commit c400215603434e4fd4b6dff6104fda7b86c83a3b Author: Yizhou Yang <32808678+yizhou-y...@users.noreply.github.com> AuthorDate: Wed Jan 4 10:16:19 2023 +0800 [INLONG-7126][Sort] Support multiple dirty sink archive helper (#7127) --- .../inlong/sort/base/dirty/DirtySinkHelper.java | 43 ++++++++++++++++++++++ .../internal/JdbcMultiBatchingOutputFormat.java | 3 +- .../starrocks/manager/StarRocksSinkManager.java | 16 ++++---- .../table/sink/StarRocksDynamicSinkFunction.java | 2 +- 4 files changed, 55 insertions(+), 9 deletions(-) diff --git a/inlong-sort/sort-connectors/base/src/main/java/org/apache/inlong/sort/base/dirty/DirtySinkHelper.java b/inlong-sort/sort-connectors/base/src/main/java/org/apache/inlong/sort/base/dirty/DirtySinkHelper.java index a962b974e..e815db856 100644 --- a/inlong-sort/sort-connectors/base/src/main/java/org/apache/inlong/sort/base/dirty/DirtySinkHelper.java +++ b/inlong-sort/sort-connectors/base/src/main/java/org/apache/inlong/sort/base/dirty/DirtySinkHelper.java @@ -18,8 +18,12 @@ package org.apache.inlong.sort.base.dirty; import org.apache.flink.configuration.Configuration; +import org.apache.flink.shaded.jackson2.com.fasterxml.jackson.databind.JsonNode; +import org.apache.flink.table.data.RowData; import org.apache.flink.util.Preconditions; import org.apache.inlong.sort.base.dirty.sink.DirtySink; +import org.apache.inlong.sort.base.format.DynamicSchemaFormatFactory; +import org.apache.inlong.sort.base.format.JsonDynamicSchemaFormat; import org.slf4j.Logger; import org.slf4j.LoggerFactory; @@ -93,6 +97,45 @@ public class DirtySinkHelper<T> implements Serializable { } } + public void invokeMultiple(T dirtyData, DirtyType dirtyType, Throwable e, + String sinkMultipleFormat) { + JsonDynamicSchemaFormat jsonDynamicSchemaFormat = + (JsonDynamicSchemaFormat) DynamicSchemaFormatFactory.getFormat(sinkMultipleFormat); + if (!dirtyOptions.ignoreDirty()) { + RuntimeException ex; + if (e instanceof RuntimeException) { + ex = (RuntimeException) e; + } else { + ex = new RuntimeException(e); + } + throw ex; + } + if (dirtySink != null) { + JsonNode rootNode; + DirtyData.Builder<T> builder = DirtyData.builder(); + try { + rootNode = jsonDynamicSchemaFormat.deserialize(((RowData) dirtyData).getBinary(0)); + } catch (Exception ex) { + invoke(dirtyData, DirtyType.DESERIALIZE_ERROR, e); + return; + } + try { + builder.setData(dirtyData) + .setDirtyType(dirtyType) + .setLabels(jsonDynamicSchemaFormat.parse(rootNode, dirtyOptions.getLabels())) + .setLogTag(jsonDynamicSchemaFormat.parse(rootNode, dirtyOptions.getLogTag())) + .setDirtyMessage(e.getMessage()) + .setIdentifier(jsonDynamicSchemaFormat.parse(rootNode, dirtyOptions.getIdentifier())); + dirtySink.invoke(builder.build()); + } catch (Exception ex) { + if (!dirtyOptions.ignoreSideOutputErrors()) { + throw new RuntimeException(ex); + } + LOGGER.warn("Dirty sink failed", ex); + } + } + } + public void setDirtyOptions(DirtyOptions dirtyOptions) { this.dirtyOptions = dirtyOptions; } diff --git a/inlong-sort/sort-connectors/jdbc/src/main/java/org/apache/inlong/sort/jdbc/internal/JdbcMultiBatchingOutputFormat.java b/inlong-sort/sort-connectors/jdbc/src/main/java/org/apache/inlong/sort/jdbc/internal/JdbcMultiBatchingOutputFormat.java index dd4be4d5c..67e9adac0 100644 --- a/inlong-sort/sort-connectors/jdbc/src/main/java/org/apache/inlong/sort/jdbc/internal/JdbcMultiBatchingOutputFormat.java +++ b/inlong-sort/sort-connectors/jdbc/src/main/java/org/apache/inlong/sort/jdbc/internal/JdbcMultiBatchingOutputFormat.java @@ -566,7 +566,8 @@ public class JdbcMultiBatchingOutputFormat<In, JdbcIn, JdbcExec extends JdbcBatc outputMetrics(tableIdentifier, Long.valueOf(tableIdRecordList.size()), 1L, true); if (!schemaUpdateExceptionPolicy.equals(SchemaUpdateExceptionPolicy.THROW_WITH_STOP)) { - dirtySinkHelper.invoke(record, DirtyType.RETRY_LOAD_ERROR, tableException); + dirtySinkHelper.invokeMultiple(record, DirtyType.RETRY_LOAD_ERROR, tableException, + sinkMultipleFormat); } tableExceptionMap.put(tableIdentifier, tableException); if (stopWritingWhenTableException) { diff --git a/inlong-sort/sort-connectors/starrocks/src/main/java/org/apache/inlong/sort/starrocks/manager/StarRocksSinkManager.java b/inlong-sort/sort-connectors/starrocks/src/main/java/org/apache/inlong/sort/starrocks/manager/StarRocksSinkManager.java index d6a0f0f4e..19c762192 100644 --- a/inlong-sort/sort-connectors/starrocks/src/main/java/org/apache/inlong/sort/starrocks/manager/StarRocksSinkManager.java +++ b/inlong-sort/sort-connectors/starrocks/src/main/java/org/apache/inlong/sort/starrocks/manager/StarRocksSinkManager.java @@ -122,7 +122,8 @@ public class StarRocksSinkManager implements Serializable { private final SchemaUpdateExceptionPolicy schemaUpdatePolicy; private transient SinkTableMetricData metricData; - private final DirtySinkHelper<Object> dirtySinkHelper;; + private final DirtySinkHelper<Object> dirtySinkHelper; + private String sinkMultipleFormat; /** * If a table writing throws exception, ignore it when receiving data later again @@ -149,7 +150,6 @@ public class StarRocksSinkManager implements Serializable { this.schemaUpdatePolicy = schemaUpdatePolicy; this.dirtySinkHelper = dirtySinkHelper; - init(flinkSchema); } @@ -159,7 +159,8 @@ public class StarRocksSinkManager implements Serializable { StarRocksQueryVisitor starrocksQueryVisitor, boolean multipleSink, SchemaUpdateExceptionPolicy schemaUpdatePolicy, - DirtySinkHelper<Object> dirtySinkHelper) { + DirtySinkHelper<Object> dirtySinkHelper, + String multipleformat) { this.sinkOptions = sinkOptions; this.jdbcConnProvider = jdbcConnProvider; this.starrocksQueryVisitor = starrocksQueryVisitor; @@ -168,7 +169,7 @@ public class StarRocksSinkManager implements Serializable { this.schemaUpdatePolicy = schemaUpdatePolicy; this.dirtySinkHelper = dirtySinkHelper; - + this.sinkMultipleFormat = multipleformat; init(flinkSchema); } @@ -450,12 +451,13 @@ public class StarRocksSinkManager implements Serializable { // archive dirty data if (StarRocksSinkOptions.StreamLoadFormat.CSV.equals(sinkOptions.getStreamLoadFormat())) { for (byte[] row : flushData.getBuffer()) { - dirtySinkHelper.invoke(new String(row, StandardCharsets.UTF_8), DirtyType.BATCH_LOAD_ERROR, e); + dirtySinkHelper.invokeMultiple(new String(row, StandardCharsets.UTF_8), DirtyType.BATCH_LOAD_ERROR, e, + sinkMultipleFormat); } } else if (StarRocksSinkOptions.StreamLoadFormat.JSON.equals(sinkOptions.getStreamLoadFormat())) { for (byte[] row : flushData.getBuffer()) { - dirtySinkHelper.invoke(OBJECT_MAPPER.readTree(new String(row, StandardCharsets.UTF_8)), - DirtyType.BATCH_LOAD_ERROR, e); + dirtySinkHelper.invokeMultiple(OBJECT_MAPPER.readTree(new String(row, StandardCharsets.UTF_8)), + DirtyType.BATCH_LOAD_ERROR, e, sinkMultipleFormat); } } diff --git a/inlong-sort/sort-connectors/starrocks/src/main/java/org/apache/inlong/sort/starrocks/table/sink/StarRocksDynamicSinkFunction.java b/inlong-sort/sort-connectors/starrocks/src/main/java/org/apache/inlong/sort/starrocks/table/sink/StarRocksDynamicSinkFunction.java index 16b985595..089c86e84 100644 --- a/inlong-sort/sort-connectors/starrocks/src/main/java/org/apache/inlong/sort/starrocks/table/sink/StarRocksDynamicSinkFunction.java +++ b/inlong-sort/sort-connectors/starrocks/src/main/java/org/apache/inlong/sort/starrocks/table/sink/StarRocksDynamicSinkFunction.java @@ -124,7 +124,7 @@ public class StarRocksDynamicSinkFunction<T> extends RichSinkFunction<T> impleme StarRocksQueryVisitor starrocksQueryVisitor = new StarRocksQueryVisitor(jdbcConnProvider, sinkOptions.getDatabaseName(), sinkOptions.getTableName()); this.sinkManager = new StarRocksSinkManager(sinkOptions, schema, jdbcConnProvider, starrocksQueryVisitor, - multipleSink, schemaUpdatePolicy, dirtySinkHelper); + multipleSink, schemaUpdatePolicy, dirtySinkHelper, sinkMultipleFormat); rowTransformer.setStarRocksColumns(starrocksQueryVisitor.getFieldMapping()); rowTransformer.setTableSchema(schema);