This is an automated email from the ASF dual-hosted git repository. dockerzhang pushed a commit to branch master in repository https://gitbox.apache.org/repos/asf/inlong.git
The following commit(s) were added to refs/heads/master by this push: new 179143610 [INLONG-6886][Sort] Add dirty message for doris sink (#6887) 179143610 is described below commit 179143610e6fd49f3813151d680b9c513e9e82ae Author: yunqingmoswu <44659300+yunqingmo...@users.noreply.github.com> AuthorDate: Thu Dec 15 15:49:46 2022 +0800 [INLONG-6886][Sort] Add dirty message for doris sink (#6887) --- .../sort/doris/table/DorisDynamicSchemaOutputFormat.java | 11 ++++++++++- 1 file changed, 10 insertions(+), 1 deletion(-) diff --git a/inlong-sort/sort-connectors/doris/src/main/java/org/apache/inlong/sort/doris/table/DorisDynamicSchemaOutputFormat.java b/inlong-sort/sort-connectors/doris/src/main/java/org/apache/inlong/sort/doris/table/DorisDynamicSchemaOutputFormat.java index fd8d8eacd..e5aeec53a 100644 --- a/inlong-sort/sort-connectors/doris/src/main/java/org/apache/inlong/sort/doris/table/DorisDynamicSchemaOutputFormat.java +++ b/inlong-sort/sort-connectors/doris/src/main/java/org/apache/inlong/sort/doris/table/DorisDynamicSchemaOutputFormat.java @@ -268,6 +268,13 @@ public class DorisDynamicSchemaOutputFormat<T> extends RichOutputFormat<T> { if (metricOption != null) { metricData = new SinkMetricData(metricOption, getRuntimeContext().getMetricGroup()); } + if (dirtySink != null) { + try { + dirtySink.open(new Configuration()); + } catch (Exception e) { + throw new IOException(e); + } + } if (executionOptions.getBatchIntervalMs() != 0 && executionOptions.getBatchSize() != 1) { this.scheduler = new ScheduledThreadPoolExecutor(1, new ExecutorThreadFactory("doris-streamload-output-format")); @@ -358,12 +365,13 @@ public class DorisDynamicSchemaOutputFormat<T> extends RichOutputFormat<T> { } if (row instanceof RowData) { RowData rowData = (RowData) row; - JsonNode rootNode = null; + JsonNode rootNode; try { rootNode = jsonDynamicSchemaFormat.deserialize(rowData.getBinary(0)); } catch (Exception e) { LOG.error(String.format("deserialize error, raw data: %s", new String(rowData.getBinary(0))), e); handleDirtyData(new String(rowData.getBinary(0)), DirtyType.DESERIALIZE_ERROR, e); + return; } boolean isDDL = jsonDynamicSchemaFormat.extractDDLFlag(rootNode); if (isDDL) { @@ -478,6 +486,7 @@ public class DorisDynamicSchemaOutputFormat<T> extends RichOutputFormat<T> { .setDirtyType(dirtyType) .setLabels(dirtyOptions.getLabels()) .setLogTag(dirtyOptions.getLogTag()) + .setDirtyMessage(e.getMessage()) .setIdentifier(dirtyOptions.getIdentifier()); dirtySink.invoke(builder.build()); } catch (Exception ex) {