This is an automated email from the ASF dual-hosted git repository. yunqing pushed a commit to branch master in repository https://gitbox.apache.org/repos/asf/inlong.git
The following commit(s) were added to refs/heads/master by this push: new b7c6f49d7 [INLONG-6575][Sort] Add dirty data metric for Filesystem (#6726) b7c6f49d7 is described below commit b7c6f49d7fd1cd6c86419f991bb74426ea04252f Author: Xin Gong <genzhedangd...@gmail.com> AuthorDate: Tue Dec 6 11:51:31 2022 +0800 [INLONG-6575][Sort] Add dirty data metric for Filesystem (#6726) --- .../filesystem/stream/AbstractStreamingWriter.java | 46 +++++++++++++++++----- 1 file changed, 36 insertions(+), 10 deletions(-) diff --git a/inlong-sort/sort-connectors/filesystem/src/main/java/org/apache/inlong/sort/filesystem/stream/AbstractStreamingWriter.java b/inlong-sort/sort-connectors/filesystem/src/main/java/org/apache/inlong/sort/filesystem/stream/AbstractStreamingWriter.java index 568b639f1..e5e74e62a 100644 --- a/inlong-sort/sort-connectors/filesystem/src/main/java/org/apache/inlong/sort/filesystem/stream/AbstractStreamingWriter.java +++ b/inlong-sort/sort-connectors/filesystem/src/main/java/org/apache/inlong/sort/filesystem/stream/AbstractStreamingWriter.java @@ -42,6 +42,10 @@ import org.apache.inlong.sort.base.metric.MetricState; import org.apache.inlong.sort.base.metric.SinkMetricData; import org.apache.inlong.sort.base.util.MetricStateUtils; +import java.nio.charset.StandardCharsets; + +import static org.apache.inlong.sort.base.Constants.DIRTY_BYTES_OUT; +import static org.apache.inlong.sort.base.Constants.DIRTY_RECORDS_OUT; import static org.apache.inlong.sort.base.Constants.INLONG_METRIC_STATE_NAME; import static org.apache.inlong.sort.base.Constants.NUM_BYTES_OUT; import static org.apache.inlong.sort.base.Constants.NUM_RECORDS_OUT; @@ -78,7 +82,8 @@ public abstract class AbstractStreamingWriter<IN, OUT> extends AbstractStreamOpe private transient long currentWatermark; - private SinkMetricData metricData; + private Long dataSize = 0L; + private Long rowSize = 0L; public AbstractStreamingWriter( long bucketCheckInterval, @@ -118,6 +123,8 @@ public abstract class AbstractStreamingWriter<IN, OUT> extends AbstractStreamOpe .withInlongAudit(inlongAudit) .withInitRecords(metricState != null ? metricState.getMetricValue(NUM_RECORDS_OUT) : 0L) .withInitBytes(metricState != null ? metricState.getMetricValue(NUM_BYTES_OUT) : 0L) + .withInitDirtyRecords(metricState != null ? metricState.getMetricValue(DIRTY_RECORDS_OUT) : 0L) + .withInitDirtyBytes(metricState != null ? metricState.getMetricValue(DIRTY_BYTES_OUT) : 0L) .withRegisterMetric(RegisteredMetric.ALL) .build(); if (metricOption != null) { @@ -129,7 +136,20 @@ public abstract class AbstractStreamingWriter<IN, OUT> extends AbstractStreamOpe * Commit up to this checkpoint id. */ protected void commitUpToCheckpoint(long checkpointId) throws Exception { - helper.commitUpToCheckpoint(checkpointId); + try { + helper.commitUpToCheckpoint(checkpointId); + if (sinkMetricData != null) { + sinkMetricData.invoke(rowSize, dataSize); + } + } catch (Exception e) { + if (sinkMetricData != null) { + sinkMetricData.invokeDirty(rowSize, dataSize); + } + LOG.error("fileSystem sink commitUpToCheckpoint.", e); + } finally { + rowSize = 0L; + dataSize = 0L; + } } @Override @@ -192,14 +212,20 @@ public abstract class AbstractStreamingWriter<IN, OUT> extends AbstractStreamOpe } @Override - public void processElement(StreamRecord<IN> element) throws Exception { - helper.onElement( - element.getValue(), - getProcessingTimeService().getCurrentProcessingTime(), - element.hasTimestamp() ? element.getTimestamp() : null, - currentWatermark); - if (metricData != null) { - metricData.invokeWithEstimate(element.getValue()); + public void processElement(StreamRecord<IN> element) { + try { + helper.onElement( + element.getValue(), + getProcessingTimeService().getCurrentProcessingTime(), + element.hasTimestamp() ? element.getTimestamp() : null, + currentWatermark); + rowSize = rowSize + 1; + dataSize = dataSize + element.getValue().toString().getBytes(StandardCharsets.UTF_8).length; + } catch (Exception e) { + if (sinkMetricData != null) { + sinkMetricData.invokeDirty(1L, element.getValue().toString().getBytes(StandardCharsets.UTF_8).length); + } + LOG.error("fileSystem sink processElement.", e); } }