Yizhou-Yang commented on code in PR #7148:
URL: https://github.com/apache/inlong/pull/7148#discussion_r1092681823
##########
inlong-sort/sort-connectors/base/src/main/java/org/apache/inlong/sort/base/dirty/DirtySinkHelper.java:
##########
@@ -110,22 +114,59 @@ public void invokeMultiple(T dirtyData, DirtyType dirtyType, Throwable e,
             }
             throw ex;
         }
+
+        JsonDynamicSchemaFormat jsonDynamicSchemaFormat =
+                (JsonDynamicSchemaFormat) DynamicSchemaFormatFactory.getFormat(sinkMultipleFormat);
+        final String SEPARATOR = "%#%#%#";
+        JsonNode rootNode = null;
+        List<String> actualIdentifier = new ArrayList<>();
+
+        try {
+            // for rowdata where identifier is not the first element, append identifier and SEPARATOR before it.
+            if (dirtyData instanceof RowData) {
+                rootNode = jsonDynamicSchemaFormat.deserialize(((RowData) dirtyData).getBinary(0));
+            } else if (dirtyData instanceof JsonNode) {
+                rootNode = (JsonNode) dirtyData;
+            } else if (dirtyData instanceof String) {
+                // parse and remove the added identifier for string cases
+                String rawIdentifier = ((String) dirtyData).split(SEPARATOR)[0];

Review Comment:
   For RowData in the multiple-sink case, the dirty sink needs to know the identifier, so I pass in "identifier + DIRTY_SEPARATOR + rowData" and parse it here for the callers that lack the identifier. Every multiple-sink call site lacks the identifier, so it is prepended on every String call to this method. This identifier only has db/table, so it is easier to process.
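A minimal sketch of the "identifier + DIRTY_SEPARATOR + rowData" convention described in this comment, not the code in the PR. The class name `DirtyIdentifierSketch` and the helpers `wrap` and `unwrap` are hypothetical names introduced for illustration; only the separator value `%#%#%#` comes from the diff. The sketch uses `indexOf` rather than `split(...)[0]`, which is a swapped-in choice: it matches the separator literally and keeps the payload intact even if the payload itself happens to contain the separator.

```java
// Hypothetical helper, for illustration only; not part of DirtySinkHelper.
final class DirtyIdentifierSketch {

    // Separator value taken from the diff above.
    static final String DIRTY_SEPARATOR = "%#%#%#";

    private DirtyIdentifierSketch() {
    }

    // Caller side: prepend the identifier so the dirty sink can recover it.
    static String wrap(String identifier, String payload) {
        return identifier + DIRTY_SEPARATOR + payload;
    }

    // Dirty-sink side: returns {identifier, payload}.
    // If no separator is present, the identifier is empty and the
    // input is returned unchanged as the payload.
    static String[] unwrap(String wrapped) {
        int idx = wrapped.indexOf(DIRTY_SEPARATOR);
        if (idx < 0) {
            return new String[] {"", wrapped};
        }
        String identifier = wrapped.substring(0, idx);
        String payload = wrapped.substring(idx + DIRTY_SEPARATOR.length());
        return new String[] {identifier, payload};
    }
}
```

Under these assumptions, a caller would pass `DirtyIdentifierSketch.wrap(identifier, rowDataAsString)` as the dirty data, and the String branch would call `unwrap` once to get both the identifier and the original payload.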