yunqingmoswu commented on code in PR #7148:
URL: https://github.com/apache/inlong/pull/7148#discussion_r1091499008


##########
inlong-sort/sort-connectors/base/src/main/java/org/apache/inlong/sort/base/dirty/DirtySinkHelper.java:
##########
@@ -110,22 +114,59 @@ public void invokeMultiple(T dirtyData, DirtyType dirtyType, Throwable e,
             }
             throw ex;
         }
+
+        JsonDynamicSchemaFormat jsonDynamicSchemaFormat =
+                (JsonDynamicSchemaFormat) DynamicSchemaFormatFactory.getFormat(sinkMultipleFormat);
+        final String SEPARATOR = "%#%#%#";
+        JsonNode rootNode = null;
+        List<String> actualIdentifier = new ArrayList<>();
+
+        try {
+            // for rowdata where identifier is not the first element, append identifier and SEPARATOR before it.
+            if (dirtyData instanceof RowData) {
+                rootNode = jsonDynamicSchemaFormat.deserialize(((RowData) dirtyData).getBinary(0));
+            } else if (dirtyData instanceof JsonNode) {
+                rootNode = (JsonNode) dirtyData;
+            } else if (dirtyData instanceof String) {
+                // parse and remove the added identifier for string cases
+                String rawIdentifier = ((String) dirtyData).split(SEPARATOR)[0];

Review Comment:
   why do it like this?
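   For context, a self-contained sketch of the separator convention this branch relies on: a "database.table" identifier is prepended to the dirty string and later recovered by splitting on SEPARATOR. Only SEPARATOR comes from the diff; the class name and sample values below are made up for illustration.

   public class SeparatorIdentifierSketch {

       // Mirrors the SEPARATOR constant in the hunk above.
       private static final String SEPARATOR = "%#%#%#";

       public static void main(String[] args) {
           // Hypothetical writer side: prefix the dirty record with its identifier.
           String tagged = "test_db.test_table" + SEPARATOR + "{\"id\":1}";

           // Reader side, as in the diff: split once to recover the identifier,
           // then take the remainder as the original payload.
           String rawIdentifier = tagged.split(SEPARATOR)[0];
           String payload = tagged.substring(rawIdentifier.length() + SEPARATOR.length());

           System.out.println(rawIdentifier); // test_db.test_table
           System.out.println(payload);       // {"id":1}
       }
   }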



##########
inlong-sort/sort-connectors/base/src/main/java/org/apache/inlong/sort/base/dirty/sink/s3/S3DirtySink.java:
##########
@@ -130,8 +131,16 @@ public synchronized void invoke(DirtyData<T> dirtyData) throws Exception {
     }
 
     private boolean valid() {
-        return (s3Options.getBatchSize() > 0 && size >= s3Options.getBatchSize())
-                || batchBytes >= s3Options.getMaxBatchBytes();
+        // stash dirty data for at least a minute to avoid flushing too fast

Review Comment:
   In the current archiving logic, archiving can already be triggered by time interval or by throughput, so the extra handling here is redundant and inelegant.
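   A self-contained sketch of combining the two triggers the comment mentions (time interval and throughput) in one flush condition, which avoids special-casing a minimum stash time. Only the idea of a batch-size limit, a max-bytes limit, and the size/batchBytes counters comes from the diff; every name and value below is illustrative.

   public class FlushPolicySketch {

       private long size;
       private long batchBytes;
       private long lastFlushMillis = System.currentTimeMillis();

       // Hypothetical configuration values standing in for the s3Options getters.
       private final int batchSize = 1000;
       private final long maxBatchBytes = 10L * 1024 * 1024;
       private final long flushIntervalMillis = 60_000L;

       boolean shouldFlush() {
           boolean bySize = batchSize > 0 && size >= batchSize;
           boolean byBytes = batchBytes >= maxBatchBytes;
           boolean byTime = System.currentTimeMillis() - lastFlushMillis >= flushIntervalMillis;
           return bySize || byBytes || byTime;
       }

       void add(int recordBytes) {
           size++;
           batchBytes += recordBytes;
           if (shouldFlush()) {
               // A real sink would write the batch out here before resetting.
               size = 0;
               batchBytes = 0;
               lastFlushMillis = System.currentTimeMillis();
           }
       }
   }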



##########
inlong-sort/sort-connectors/doris/src/main/java/org/apache/inlong/sort/doris/table/DorisDynamicSchemaOutputFormat.java:
##########
@@ -512,27 +524,58 @@ private void handleDirtyData(Object dirtyData, DirtyType dirtyType, Exception e)
                 LOG.warn("Dirty sink failed", ex);
             }
         }
+
         metricData.invokeDirty(1, dirtyData.toString().getBytes(StandardCharsets.UTF_8).length);
     }
 
     private void handleMultipleDirtyData(Object dirtyData, DirtyType dirtyType, Exception e) {
-        JsonNode rootNode;
+        JsonNode rootNode = null;
+        String database = null;
+        String table = null;
         try {
-            rootNode = jsonDynamicSchemaFormat.deserialize(((RowData) dirtyData).getBinary(0));
+            if (dirtyData instanceof RowData) {
+                rootNode = jsonDynamicSchemaFormat.deserialize(((RowData) dirtyData).getBinary(0));
+            } else if (dirtyData instanceof JsonNode) {
+                rootNode = (JsonNode) dirtyData;
+            } else if (dirtyData instanceof String) {
+                // parse and remove the added identifier for string cases
+                String[] arr = ((String) dirtyData).split("\\.");

Review Comment:
   why do it like this?
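   For context, a minimal sketch of the string branch under discussion: the identifier is recovered by splitting the dirty string on '.', which also shows why the convention is fragile when a name contains a dot. The sample value is made up; only the split("\\.") pattern comes from the diff.

   public class DotIdentifierSketch {

       public static void main(String[] args) {
           // Hypothetical dirty record tagged with a "database.table" prefix.
           String dirtyData = "test_db.test_table.{\"id\":1}";

           // As in the hunk: split on '.' and take the first two parts as
           // database and table. This breaks if either name contains a dot.
           String[] arr = dirtyData.split("\\.");
           String database = arr[0];
           String table = arr[1];

           System.out.println(database + " / " + table); // test_db / test_table
       }
   }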



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: commits-unsubscr...@inlong.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org
