This is an automated email from the ASF dual-hosted git repository.

yunqing pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/inlong.git


The following commit(s) were added to refs/heads/master by this push:
     new b7c6f49d7 [INLONG-6575][Sort] Add dirty data metric for Filesystem (#6726)
b7c6f49d7 is described below

commit b7c6f49d7fd1cd6c86419f991bb74426ea04252f
Author: Xin Gong <genzhedangd...@gmail.com>
AuthorDate: Tue Dec 6 11:51:31 2022 +0800

    [INLONG-6575][Sort] Add dirty data metric for Filesystem (#6726)
---
 .../filesystem/stream/AbstractStreamingWriter.java | 46 +++++++++++++++++-----
 1 file changed, 36 insertions(+), 10 deletions(-)

diff --git a/inlong-sort/sort-connectors/filesystem/src/main/java/org/apache/inlong/sort/filesystem/stream/AbstractStreamingWriter.java b/inlong-sort/sort-connectors/filesystem/src/main/java/org/apache/inlong/sort/filesystem/stream/AbstractStreamingWriter.java
index 568b639f1..e5e74e62a 100644
--- a/inlong-sort/sort-connectors/filesystem/src/main/java/org/apache/inlong/sort/filesystem/stream/AbstractStreamingWriter.java
+++ b/inlong-sort/sort-connectors/filesystem/src/main/java/org/apache/inlong/sort/filesystem/stream/AbstractStreamingWriter.java
@@ -42,6 +42,10 @@ import org.apache.inlong.sort.base.metric.MetricState;
 import org.apache.inlong.sort.base.metric.SinkMetricData;
 import org.apache.inlong.sort.base.util.MetricStateUtils;
 
+import java.nio.charset.StandardCharsets;
+
+import static org.apache.inlong.sort.base.Constants.DIRTY_BYTES_OUT;
+import static org.apache.inlong.sort.base.Constants.DIRTY_RECORDS_OUT;
 import static org.apache.inlong.sort.base.Constants.INLONG_METRIC_STATE_NAME;
 import static org.apache.inlong.sort.base.Constants.NUM_BYTES_OUT;
 import static org.apache.inlong.sort.base.Constants.NUM_RECORDS_OUT;
@@ -78,7 +82,8 @@ public abstract class AbstractStreamingWriter<IN, OUT> extends AbstractStreamOpe
 
     private transient long currentWatermark;
 
-    private SinkMetricData metricData;
+    private Long dataSize = 0L;
+    private Long rowSize = 0L;
 
     public AbstractStreamingWriter(
             long bucketCheckInterval,
@@ -118,6 +123,8 @@ public abstract class AbstractStreamingWriter<IN, OUT> extends AbstractStreamOpe
                 .withInlongAudit(inlongAudit)
                 .withInitRecords(metricState != null ? metricState.getMetricValue(NUM_RECORDS_OUT) : 0L)
                 .withInitBytes(metricState != null ? metricState.getMetricValue(NUM_BYTES_OUT) : 0L)
+                .withInitDirtyRecords(metricState != null ? metricState.getMetricValue(DIRTY_RECORDS_OUT) : 0L)
+                .withInitDirtyBytes(metricState != null ? metricState.getMetricValue(DIRTY_BYTES_OUT) : 0L)
                 .withRegisterMetric(RegisteredMetric.ALL)
                 .build();
         if (metricOption != null) {
@@ -129,7 +136,20 @@ public abstract class AbstractStreamingWriter<IN, OUT> extends AbstractStreamOpe
      * Commit up to this checkpoint id.
      */
     protected void commitUpToCheckpoint(long checkpointId) throws Exception {
-        helper.commitUpToCheckpoint(checkpointId);
+        try {
+            helper.commitUpToCheckpoint(checkpointId);
+            if (sinkMetricData != null) {
+                sinkMetricData.invoke(rowSize, dataSize);
+            }
+        } catch (Exception e) {
+            if (sinkMetricData != null) {
+                sinkMetricData.invokeDirty(rowSize, dataSize);
+            }
+            LOG.error("fileSystem sink commitUpToCheckpoint.", e);
+        } finally {
+            rowSize = 0L;
+            dataSize = 0L;
+        }
     }
 
     @Override
@@ -192,14 +212,20 @@ public abstract class AbstractStreamingWriter<IN, OUT> extends AbstractStreamOpe
     }
 
     @Override
-    public void processElement(StreamRecord<IN> element) throws Exception {
-        helper.onElement(
-                element.getValue(),
-                getProcessingTimeService().getCurrentProcessingTime(),
-                element.hasTimestamp() ? element.getTimestamp() : null,
-                currentWatermark);
-        if (metricData != null) {
-            metricData.invokeWithEstimate(element.getValue());
+    public void processElement(StreamRecord<IN> element) {
+        try {
+            helper.onElement(
+                    element.getValue(),
+                    getProcessingTimeService().getCurrentProcessingTime(),
+                    element.hasTimestamp() ? element.getTimestamp() : null,
+                    currentWatermark);
+            rowSize = rowSize + 1;
+            dataSize = dataSize + element.getValue().toString().getBytes(StandardCharsets.UTF_8).length;
+        } catch (Exception e) {
+            if (sinkMetricData != null) {
+                sinkMetricData.invokeDirty(1L, element.getValue().toString().getBytes(StandardCharsets.UTF_8).length);
+            }
+            LOG.error("fileSystem sink processElement.", e);
         }
     }
 

Reply via email to