This is an automated email from the ASF dual-hosted git repository.

dockerzhang pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/inlong.git


The following commit(s) were added to refs/heads/master by this push:
     new 179143610 [INLONG-6886][Sort] Add dirty message for doris sink (#6887)
179143610 is described below

commit 179143610e6fd49f3813151d680b9c513e9e82ae
Author: yunqingmoswu <44659300+yunqingmo...@users.noreply.github.com>
AuthorDate: Thu Dec 15 15:49:46 2022 +0800

    [INLONG-6886][Sort] Add dirty message for doris sink (#6887)
---
 .../sort/doris/table/DorisDynamicSchemaOutputFormat.java      | 11 ++++++++++-
 1 file changed, 10 insertions(+), 1 deletion(-)

diff --git 
a/inlong-sort/sort-connectors/doris/src/main/java/org/apache/inlong/sort/doris/table/DorisDynamicSchemaOutputFormat.java
 
b/inlong-sort/sort-connectors/doris/src/main/java/org/apache/inlong/sort/doris/table/DorisDynamicSchemaOutputFormat.java
index fd8d8eacd..e5aeec53a 100644
--- 
a/inlong-sort/sort-connectors/doris/src/main/java/org/apache/inlong/sort/doris/table/DorisDynamicSchemaOutputFormat.java
+++ 
b/inlong-sort/sort-connectors/doris/src/main/java/org/apache/inlong/sort/doris/table/DorisDynamicSchemaOutputFormat.java
@@ -268,6 +268,13 @@ public class DorisDynamicSchemaOutputFormat<T> extends 
RichOutputFormat<T> {
         if (metricOption != null) {
             metricData = new SinkMetricData(metricOption, 
getRuntimeContext().getMetricGroup());
         }
+        if (dirtySink != null) {
+            try {
+                dirtySink.open(new Configuration());
+            } catch (Exception e) {
+                throw new IOException(e);
+            }
+        }
         if (executionOptions.getBatchIntervalMs() != 0 && 
executionOptions.getBatchSize() != 1) {
             this.scheduler = new ScheduledThreadPoolExecutor(1,
                     new 
ExecutorThreadFactory("doris-streamload-output-format"));
@@ -358,12 +365,13 @@ public class DorisDynamicSchemaOutputFormat<T> extends 
RichOutputFormat<T> {
         }
         if (row instanceof RowData) {
             RowData rowData = (RowData) row;
-            JsonNode rootNode = null;
+            JsonNode rootNode;
             try {
                 rootNode = 
jsonDynamicSchemaFormat.deserialize(rowData.getBinary(0));
             } catch (Exception e) {
                 LOG.error(String.format("deserialize error, raw data: %s", new 
String(rowData.getBinary(0))), e);
                 handleDirtyData(new String(rowData.getBinary(0)), 
DirtyType.DESERIALIZE_ERROR, e);
+                return;
             }
             boolean isDDL = jsonDynamicSchemaFormat.extractDDLFlag(rootNode);
             if (isDDL) {
@@ -478,6 +486,7 @@ public class DorisDynamicSchemaOutputFormat<T> extends 
RichOutputFormat<T> {
                         .setDirtyType(dirtyType)
                         .setLabels(dirtyOptions.getLabels())
                         .setLogTag(dirtyOptions.getLogTag())
+                        .setDirtyMessage(e.getMessage())
                         .setIdentifier(dirtyOptions.getIdentifier());
                 dirtySink.invoke(builder.build());
             } catch (Exception ex) {

Reply via email to