This is an automated email from the ASF dual-hosted git repository. dockerzhang pushed a commit to branch master in repository https://gitbox.apache.org/repos/asf/inlong.git
The following commit(s) were added to refs/heads/master by this push: new fd09a6d16 [INLONG-5992][Sort] Fix runtimeContext not initialized in JDBC and ES when reload metric status (#5994) fd09a6d16 is described below commit fd09a6d16eec068cf0b1d4b3b1708459badd2d43 Author: Oneal65 <liush...@foxmail.com> AuthorDate: Fri Sep 23 11:52:20 2022 +0800 [INLONG-5992][Sort] Fix runtimeContext not initialized in JDBC and ES when reload metric status (#5994) --- .../inlong/sort/base/metric/SinkMetricData.java | 54 ++++++++++++++++------ .../sort/elasticsearch/ElasticsearchSinkBase.java | 1 + .../elasticsearch/ElasticsearchSinkFunction.java | 4 ++ .../table/RowElasticsearchSinkFunction.java | 5 ++ .../jdbc/internal/GenericJdbcSinkFunction.java | 3 +- 5 files changed, 52 insertions(+), 15 deletions(-) diff --git a/inlong-sort/sort-connectors/base/src/main/java/org/apache/inlong/sort/base/metric/SinkMetricData.java b/inlong-sort/sort-connectors/base/src/main/java/org/apache/inlong/sort/base/metric/SinkMetricData.java index 34f759a83..45e33c3c2 100644 --- a/inlong-sort/sort-connectors/base/src/main/java/org/apache/inlong/sort/base/metric/SinkMetricData.java +++ b/inlong-sort/sort-connectors/base/src/main/java/org/apache/inlong/sort/base/metric/SinkMetricData.java @@ -24,6 +24,7 @@ import org.apache.flink.metrics.MetricGroup; import org.apache.flink.metrics.SimpleCounter; import org.apache.inlong.audit.AuditImp; import org.apache.inlong.sort.base.Constants; +import org.apache.inlong.sort.base.metric.MetricOption.RegisteredMetric; import java.nio.charset.StandardCharsets; import java.util.Map; @@ -53,14 +54,16 @@ public class SinkMetricData implements MetricData { private Counter dirtyBytes; private Meter numRecordsOutPerSecond; private Meter numBytesOutPerSecond; + private RegisteredMetric registeredMetric; public SinkMetricData(MetricOption option, MetricGroup metricGroup) { this.metricGroup = metricGroup; this.labels = option.getLabels(); + this.registeredMetric = option.getRegisteredMetric(); ThreadSafeCounter recordsOutCounter = new ThreadSafeCounter(); ThreadSafeCounter bytesOutCounter = new ThreadSafeCounter(); - switch (option.getRegisteredMetric()) { + switch (registeredMetric) { case DIRTY: registerMetricsForDirtyBytes(new ThreadSafeCounter()); registerMetricsForDirtyRecords(new ThreadSafeCounter()); @@ -291,18 +294,41 @@ public class SinkMetricData implements MetricData { @Override public String toString() { - return "SinkMetricData{" - + "metricGroup=" + metricGroup - + ", labels=" + labels - + ", auditImp=" + auditImp - + ", numRecordsOut=" + numRecordsOut.getCount() - + ", numBytesOut=" + numBytesOut.getCount() - + ", numRecordsOutForMeter=" + numRecordsOutForMeter.getCount() - + ", numBytesOutForMeter=" + numBytesOutForMeter.getCount() - + ", dirtyRecords=" + dirtyRecords.getCount() - + ", dirtyBytes=" + dirtyBytes.getCount() - + ", numRecordsOutPerSecond=" + numRecordsOutPerSecond.getRate() - + ", numBytesOutPerSecond=" + numBytesOutPerSecond.getRate() - + '}'; + switch (registeredMetric) { + case DIRTY: + return "SinkMetricData{" + + "metricGroup=" + metricGroup + + ", labels=" + labels + + ", auditImp=" + auditImp + + ", dirtyRecords=" + dirtyRecords.getCount() + + ", dirtyBytes=" + dirtyBytes.getCount() + + '}'; + case NORMAL: + return "SinkMetricData{" + + "metricGroup=" + metricGroup + + ", labels=" + labels + + ", auditImp=" + auditImp + + ", numRecordsOut=" + numRecordsOut.getCount() + + ", numBytesOut=" + numBytesOut.getCount() + + ", numRecordsOutForMeter=" + numRecordsOutForMeter.getCount() + + ", numBytesOutForMeter=" + numBytesOutForMeter.getCount() + + ", numRecordsOutPerSecond=" + numRecordsOutPerSecond.getRate() + + ", numBytesOutPerSecond=" + numBytesOutPerSecond.getRate() + + '}'; + default: + return "SinkMetricData{" + + "metricGroup=" + metricGroup + + ", labels=" + labels + + ", auditImp=" + auditImp + + ", numRecordsOut=" + numRecordsOut.getCount() + + ", numBytesOut=" + numBytesOut.getCount() + + ", numRecordsOutForMeter=" + numRecordsOutForMeter.getCount() + + ", numBytesOutForMeter=" + numBytesOutForMeter.getCount() + + ", dirtyRecords=" + dirtyRecords.getCount() + + ", dirtyBytes=" + dirtyBytes.getCount() + + ", numRecordsOutPerSecond=" + numRecordsOutPerSecond.getRate() + + ", numBytesOutPerSecond=" + numBytesOutPerSecond.getRate() + + '}'; + } } } diff --git a/inlong-sort/sort-connectors/elasticsearch-base/src/main/java/org/apache/inlong/sort/elasticsearch/ElasticsearchSinkBase.java b/inlong-sort/sort-connectors/elasticsearch-base/src/main/java/org/apache/inlong/sort/elasticsearch/ElasticsearchSinkBase.java index dff8b9335..7761e9ebd 100644 --- a/inlong-sort/sort-connectors/elasticsearch-base/src/main/java/org/apache/inlong/sort/elasticsearch/ElasticsearchSinkBase.java +++ b/inlong-sort/sort-connectors/elasticsearch-base/src/main/java/org/apache/inlong/sort/elasticsearch/ElasticsearchSinkBase.java @@ -293,6 +293,7 @@ public abstract class ElasticsearchSinkBase<T, C extends AutoCloseable> extends @Override public void initializeState(FunctionInitializationContext context) throws Exception { // no initialization needed + elasticsearchSinkFunction.setRuntimeContext(getRuntimeContext()); elasticsearchSinkFunction.initializeState(context); } diff --git a/inlong-sort/sort-connectors/elasticsearch-base/src/main/java/org/apache/inlong/sort/elasticsearch/ElasticsearchSinkFunction.java b/inlong-sort/sort-connectors/elasticsearch-base/src/main/java/org/apache/inlong/sort/elasticsearch/ElasticsearchSinkFunction.java index e1c019918..d68c0d6e9 100644 --- a/inlong-sort/sort-connectors/elasticsearch-base/src/main/java/org/apache/inlong/sort/elasticsearch/ElasticsearchSinkFunction.java +++ b/inlong-sort/sort-connectors/elasticsearch-base/src/main/java/org/apache/inlong/sort/elasticsearch/ElasticsearchSinkFunction.java @@ -69,4 +69,8 @@ public interface ElasticsearchSinkFunction<T> extends Serializable, Function { default void snapshotState(FunctionSnapshotContext context) throws Exception { } + + default void setRuntimeContext(RuntimeContext ctx) { + + } } diff --git a/inlong-sort/sort-connectors/elasticsearch-base/src/main/java/org/apache/inlong/sort/elasticsearch/table/RowElasticsearchSinkFunction.java b/inlong-sort/sort-connectors/elasticsearch-base/src/main/java/org/apache/inlong/sort/elasticsearch/table/RowElasticsearchSinkFunction.java index a5abc4690..3e68f3e43 100644 --- a/inlong-sort/sort-connectors/elasticsearch-base/src/main/java/org/apache/inlong/sort/elasticsearch/table/RowElasticsearchSinkFunction.java +++ b/inlong-sort/sort-connectors/elasticsearch-base/src/main/java/org/apache/inlong/sort/elasticsearch/table/RowElasticsearchSinkFunction.java @@ -117,6 +117,11 @@ public class RowElasticsearchSinkFunction implements ElasticsearchSinkFunction<R } } + @Override + public void setRuntimeContext(RuntimeContext ctx) { + this.runtimeContext = ctx; + } + @Override public void initializeState(FunctionInitializationContext context) throws Exception { if (this.inlongMetric != null) { diff --git a/inlong-sort/sort-connectors/jdbc/src/main/java/org/apache/inlong/sort/jdbc/internal/GenericJdbcSinkFunction.java b/inlong-sort/sort-connectors/jdbc/src/main/java/org/apache/inlong/sort/jdbc/internal/GenericJdbcSinkFunction.java index 0afd5fb24..744fb8dc9 100644 --- a/inlong-sort/sort-connectors/jdbc/src/main/java/org/apache/inlong/sort/jdbc/internal/GenericJdbcSinkFunction.java +++ b/inlong-sort/sort-connectors/jdbc/src/main/java/org/apache/inlong/sort/jdbc/internal/GenericJdbcSinkFunction.java @@ -60,7 +60,8 @@ public class GenericJdbcSinkFunction<T> extends RichSinkFunction<T> @Override public void initializeState(FunctionInitializationContext context) throws Exception { - outputFormat.initializeState(context); + outputFormat.setRuntimeContext(getRuntimeContext()); + outputFormat.initializeState(context); } @Override