Yizhou-Yang commented on code in PR #7148:
URL: https://github.com/apache/inlong/pull/7148#discussion_r1092681823


##########
inlong-sort/sort-connectors/base/src/main/java/org/apache/inlong/sort/base/dirty/DirtySinkHelper.java:
##########
@@ -110,22 +114,59 @@ public void invokeMultiple(T dirtyData, DirtyType dirtyType, Throwable e,
             }
             throw ex;
         }
+
+        JsonDynamicSchemaFormat jsonDynamicSchemaFormat =
+                (JsonDynamicSchemaFormat) DynamicSchemaFormatFactory.getFormat(sinkMultipleFormat);
+        final String SEPARATOR = "%#%#%#";
+        JsonNode rootNode = null;
+        List<String> actualIdentifier = new ArrayList<>();
+
+        try {
+            // for rowdata where identifier is not the first element, append identifier and SEPARATOR before it.
+            if (dirtyData instanceof RowData) {
+                rootNode = jsonDynamicSchemaFormat.deserialize(((RowData) dirtyData).getBinary(0));
+            } else if (dirtyData instanceof JsonNode) {
+                rootNode = (JsonNode) dirtyData;
+            } else if (dirtyData instanceof String) {
+                // parse and remove the added identifier for string cases
+                String rawIdentifier = ((String) dirtyData).split(SEPARATOR)[0];

Review Comment:
   For RowData in the multiple-sink case, the dirty sink needs to know the
   identifier. So I pass in "identifier + DIRTY_SEPARATOR + rowData", and this
   method parses the identifier back out for callers that do not otherwise
   supply one. Every multiple-sink scenario lacks the identifier, so it is
   prepended on every call that passes a String to this method. The identifier
   here only contains db/table, so it is easier to process.
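
   To make the convention concrete, here is a minimal, standalone sketch of
   the prefix-and-parse round trip. It is not the PR code: the class name,
   both helper names, and the sample identifier are hypothetical, and only
   the separator value "%#%#%#" comes from the diff above. It uses
   Pattern.quote and a split limit of 2 as a defensive choice, so the
   separator is never read as a regex and the payload is never cut apart.

```java
import java.util.regex.Pattern;

// Hypothetical illustration only; not part of DirtySinkHelper.
final class DirtyIdentifierSketch {

    // Value taken from the diff; the constant name follows the review comment.
    static final String DIRTY_SEPARATOR = "%#%#%#";

    // Hypothetical helper: prepend the identifier so the dirty sink can recover it.
    static String tagWithIdentifier(String identifier, String payload) {
        return identifier + DIRTY_SEPARATOR + payload;
    }

    // Hypothetical helper: split a tagged string into {identifier, payload}.
    static String[] splitTagged(String tagged) {
        // Quote the separator so it is matched literally, and limit the split
        // to 2 parts so a separator occurring inside the payload is preserved.
        String[] parts = tagged.split(Pattern.quote(DIRTY_SEPARATOR), 2);
        if (parts.length != 2) {
            throw new IllegalArgumentException("missing identifier prefix: " + tagged);
        }
        return parts;
    }

    public static void main(String[] args) {
        // "inventory.orders" is an assumed db/table identifier for illustration.
        String tagged = tagWithIdentifier("inventory.orders", "{\"id\":1}");
        String[] parts = splitTagged(tagged);
        System.out.println("identifier = " + parts[0]);
        System.out.println("payload    = " + parts[1]);
    }
}
```

   Throwing on a missing prefix is one possible choice; a real caller might
   prefer to fall back to a default identifier instead.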



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.