This is an automated email from the ASF dual-hosted git repository.
dockerzhang pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/inlong.git
The following commit(s) were added to refs/heads/master by this push:
new af3437e3d [INLONG-7006][Sort] Optimize reading metric at the table level, and the metric label is passed in by a specific connector. (#7007)
af3437e3d is described below
commit af3437e3dfdb4ff81fb6807ec2e7b99e66915fcb
Author: chestnufang <[email protected]>
AuthorDate: Wed Dec 21 15:50:43 2022 +0800
[INLONG-7006][Sort] Optimize reading metric at the table level, and the metric label is passed in by a specific connector. (#7007)
Co-authored-by: chestnufang <[email protected]>
---
.../base/metric/sub/SourceTableMetricData.java | 89 +++++++++-------------
.../sort/cdc/debezium/DebeziumSourceFunction.java | 5 +-
.../source/metrics/MySqlSourceReaderMetrics.java | 5 +-
.../oracle/debezium/DebeziumSourceFunction.java | 5 +-
.../sort/cdc/postgres/DebeziumSourceFunction.java | 5 +-
5 files changed, 51 insertions(+), 58 deletions(-)
diff --git a/inlong-sort/sort-connectors/base/src/main/java/org/apache/inlong/sort/base/metric/sub/SourceTableMetricData.java b/inlong-sort/sort-connectors/base/src/main/java/org/apache/inlong/sort/base/metric/sub/SourceTableMetricData.java
index 3ca762bea..eac1224f6 100644
--- a/inlong-sort/sort-connectors/base/src/main/java/org/apache/inlong/sort/base/metric/sub/SourceTableMetricData.java
+++ b/inlong-sort/sort-connectors/base/src/main/java/org/apache/inlong/sort/base/metric/sub/SourceTableMetricData.java
@@ -24,6 +24,7 @@ import static
org.apache.inlong.sort.base.Constants.READ_PHASE;
import com.google.common.collect.Maps;
import java.nio.charset.StandardCharsets;
+import java.util.List;
import java.util.Map;
import java.util.Map.Entry;
import java.util.stream.Collectors;
@@ -55,9 +56,15 @@ public class SourceTableMetricData extends SourceMetricData
implements SourceSub
* The sub source metric data container of source metric data
*/
private final Map<String, SourceMetricData> subSourceMetricMap =
Maps.newHashMap();
+ /**
+ * The sub source metric label of source sub metric group and this must be
consistent with the schema information
+ * recorded by the specific connector.
+ */
+ private final List<String> tableMetricLabelList;
- public SourceTableMetricData(MetricOption option, MetricGroup metricGroup)
{
+ public SourceTableMetricData(MetricOption option, MetricGroup metricGroup,
List<String> tableMetricLabelList) {
super(option, metricGroup);
+ this.tableMetricLabelList = tableMetricLabelList;
}
/**
@@ -83,7 +90,7 @@ public class SourceTableMetricData extends SourceMetricData
implements SourceSub
}
Map<String, MetricState> subMetricStateMap =
metricState.getSubMetricStateMap();
for (Entry<String, MetricState> subMetricStateEntry :
subMetricStateMap.entrySet()) {
- String[] schemaInfoArray =
parseSchemaIdentify(subMetricStateEntry.getKey());
+ String[] schemaInfoArray =
subMetricStateEntry.getKey().split(Constants.SPILT_SEMICOLON);
final MetricState subMetricState = subMetricStateEntry.getValue();
SourceMetricData subSourceMetricData =
buildSubSourceMetricData(schemaInfoArray,
subMetricState, this);
@@ -121,13 +128,8 @@ public class SourceTableMetricData extends
SourceMetricData implements SourceSub
String metricGroupLabels = labels.entrySet().stream().map(entry ->
entry.getKey() + "=" + entry.getValue())
.collect(Collectors.joining(DELIMITER));
StringBuilder labelBuilder = new StringBuilder(metricGroupLabels);
- if (schemaInfoArray.length == 2) {
-
labelBuilder.append(DELIMITER).append(Constants.DATABASE_NAME).append("=").append(schemaInfoArray[0])
-
.append(DELIMITER).append(Constants.TABLE_NAME).append("=").append(schemaInfoArray[1]);
- } else if (schemaInfoArray.length == 3) {
-
labelBuilder.append(DELIMITER).append(Constants.DATABASE_NAME).append("=").append(schemaInfoArray[0])
-
.append(DELIMITER).append(Constants.SCHEMA_NAME).append("=").append(schemaInfoArray[1])
-
.append(DELIMITER).append(Constants.TABLE_NAME).append("=").append(schemaInfoArray[2]);
+ for (int i = 0; i < tableMetricLabelList.size(); i++) {
+
labelBuilder.append(DELIMITER).append(tableMetricLabelList.get(i)).append("=").append(schemaInfoArray[i]);
}
MetricOption metricOption = MetricOption.builder()
.withInitRecords(subMetricState != null ?
subMetricState.getMetricValue(NUM_RECORDS_IN) : 0L)
@@ -135,32 +137,7 @@ public class SourceTableMetricData extends
SourceMetricData implements SourceSub
.withInlongLabels(labelBuilder.toString())
.withRegisterMetric(RegisteredMetric.ALL)
.build();
- return new SourceTableMetricData(metricOption,
sourceMetricData.getMetricGroup());
- }
-
- /**
- * build record schema identify,in the form of database.schema.table or
database.table
- *
- * @param database the database name of record
- * @param schema the schema name of record
- * @param table the table name of record
- * @return the record schema identify
- */
- public String buildSchemaIdentify(String database, String schema, String
table) {
- if (schema == null) {
- return database + Constants.SEMICOLON + table;
- }
- return database + Constants.SEMICOLON + schema + Constants.SEMICOLON +
table;
- }
-
- /**
- * parse record schema identify
- *
- * @param schemaIdentify the schema identify of record
- * @return the record schema identify array,String[]{database,table}
- */
- public String[] parseSchemaIdentify(String schemaIdentify) {
- return schemaIdentify.split(Constants.SPILT_SEMICOLON);
+ return new SourceTableMetricData(metricOption,
sourceMetricData.getMetricGroup(), tableMetricLabelList);
}
/**
@@ -176,22 +153,8 @@ public class SourceTableMetricData extends
SourceMetricData implements SourceSub
outputMetricsWithEstimate(data);
return;
}
- String identify = buildSchemaIdentify(database, null, table);
- SourceMetricData subSourceMetricData;
- if (subSourceMetricMap.containsKey(identify)) {
- subSourceMetricData = subSourceMetricMap.get(identify);
- } else {
- subSourceMetricData = buildSubSourceMetricData(new
String[]{database, table}, this);
- subSourceMetricMap.put(identify, subSourceMetricData);
- }
- // source metric and sub source metric output metrics
- long rowCountSize = 1L;
- long rowDataSize =
data.toString().getBytes(StandardCharsets.UTF_8).length;
- this.outputMetrics(rowCountSize, rowDataSize);
- subSourceMetricData.outputMetrics(rowCountSize, rowDataSize);
-
- // output read phase metric
- outputReadPhaseMetrics((isSnapshotRecord) ? ReadPhase.SNAPSHOT_PHASE :
ReadPhase.INCREASE_PHASE);
+ // output sub source metric
+ outputMetricsWithEstimate(new String[]{database, table},
isSnapshotRecord, data);
}
/**
@@ -209,12 +172,28 @@ public class SourceTableMetricData extends
SourceMetricData implements SourceSub
outputMetricsWithEstimate(data);
return;
}
- String identify = buildSchemaIdentify(database, schema, table);
+ // output sub source metric
+ outputMetricsWithEstimate(new String[]{database, schema, table},
isSnapshotRecord, data);
+ }
+
+ /**
+ * output metrics with estimate
+ *
+ * @param recordSchemaInfoArray the schema info of record
+ * @param isSnapshotRecord is it snapshot record
+ * @param data the data of record
+ */
+ public void outputMetricsWithEstimate(String[] recordSchemaInfoArray,
boolean isSnapshotRecord, Object data) {
+ if (recordSchemaInfoArray == null) {
+ outputMetricsWithEstimate(data);
+ return;
+ }
+ String identify = String.join(Constants.SEMICOLON,
recordSchemaInfoArray);
SourceMetricData subSourceMetricData;
if (subSourceMetricMap.containsKey(identify)) {
subSourceMetricData = subSourceMetricMap.get(identify);
} else {
- subSourceMetricData = buildSubSourceMetricData(new
String[]{database, schema, table}, this);
+ subSourceMetricData =
buildSubSourceMetricData(recordSchemaInfoArray, this);
subSourceMetricMap.put(identify, subSourceMetricData);
}
// source metric and sub source metric output metrics
@@ -274,7 +253,9 @@ public class SourceTableMetricData extends SourceMetricData
implements SourceSub
@Override
public String toString() {
return "SourceTableMetricData{"
- + "readPhaseMetricDataMap=" + readPhaseMetricDataMap
+ + "numRecordsIn=" + getNumRecordsIn().getCount()
+ + ", numBytesIn=" + getNumBytesIn().getCount()
+ + ", readPhaseMetricDataMap=" + readPhaseMetricDataMap
+ ", subSourceMetricMap=" + subSourceMetricMap
+ '}';
}
diff --git a/inlong-sort/sort-connectors/mysql-cdc/src/main/java/org/apache/inlong/sort/cdc/debezium/DebeziumSourceFunction.java b/inlong-sort/sort-connectors/mysql-cdc/src/main/java/org/apache/inlong/sort/cdc/debezium/DebeziumSourceFunction.java
index a446456cf..d99da8389 100644
--- a/inlong-sort/sort-connectors/mysql-cdc/src/main/java/org/apache/inlong/sort/cdc/debezium/DebeziumSourceFunction.java
+++ b/inlong-sort/sort-connectors/mysql-cdc/src/main/java/org/apache/inlong/sort/cdc/debezium/DebeziumSourceFunction.java
@@ -27,6 +27,7 @@ import io.debezium.engine.DebeziumEngine;
import io.debezium.engine.spi.OffsetCommitPolicy;
import io.debezium.heartbeat.Heartbeat;
import io.debezium.relational.history.TableChanges.TableChange;
+import java.util.Arrays;
import org.apache.commons.collections.map.LinkedMap;
import org.apache.flink.annotation.PublicEvolving;
import org.apache.flink.annotation.VisibleForTesting;
@@ -50,6 +51,7 @@ import
org.apache.flink.streaming.api.functions.source.RichSourceFunction;
import org.apache.flink.util.Collector;
import org.apache.flink.util.ExceptionUtils;
import org.apache.flink.util.FlinkRuntimeException;
+import org.apache.inlong.sort.base.Constants;
import org.apache.inlong.sort.cdc.base.debezium.DebeziumDeserializationSchema;
import org.apache.inlong.sort.cdc.base.debezium.internal.DebeziumOffset;
import
org.apache.inlong.sort.cdc.base.debezium.internal.DebeziumOffsetSerializer;
@@ -454,7 +456,8 @@ public class DebeziumSourceFunction<T> extends
RichSourceFunction<T>
.withRegisterMetric(RegisteredMetric.ALL)
.build();
if (metricOption != null) {
- sourceMetricData = new SourceTableMetricData(metricOption,
metricGroup);
+ sourceMetricData = new SourceTableMetricData(metricOption,
metricGroup,
+ Arrays.asList(Constants.DATABASE_NAME,
Constants.TABLE_NAME));
if (migrateAll) {
// register sub source metric data from metric state
sourceMetricData.registerSubMetricsGroup(metricState);
diff --git a/inlong-sort/sort-connectors/mysql-cdc/src/main/java/org/apache/inlong/sort/cdc/mysql/source/metrics/MySqlSourceReaderMetrics.java b/inlong-sort/sort-connectors/mysql-cdc/src/main/java/org/apache/inlong/sort/cdc/mysql/source/metrics/MySqlSourceReaderMetrics.java
index c941eb083..2655ab7ac 100644
--- a/inlong-sort/sort-connectors/mysql-cdc/src/main/java/org/apache/inlong/sort/cdc/mysql/source/metrics/MySqlSourceReaderMetrics.java
+++ b/inlong-sort/sort-connectors/mysql-cdc/src/main/java/org/apache/inlong/sort/cdc/mysql/source/metrics/MySqlSourceReaderMetrics.java
@@ -21,10 +21,12 @@ import static
org.apache.inlong.sort.base.Constants.NUM_BYTES_IN;
import static org.apache.inlong.sort.base.Constants.NUM_RECORDS_IN;
import com.google.common.collect.ImmutableMap;
+import java.util.Arrays;
import java.util.HashMap;
import java.util.Map;
import org.apache.flink.metrics.Gauge;
import org.apache.flink.metrics.MetricGroup;
+import org.apache.inlong.sort.base.Constants;
import org.apache.inlong.sort.base.metric.MetricOption;
import org.apache.inlong.sort.base.metric.MetricState;
import org.apache.inlong.sort.base.metric.sub.SourceTableMetricData;
@@ -65,7 +67,8 @@ public class MySqlSourceReaderMetrics {
public void registerMetrics(MetricOption metricOption) {
if (metricOption != null) {
- sourceTableMetricData = new SourceTableMetricData(metricOption,
metricGroup);
+ sourceTableMetricData = new SourceTableMetricData(metricOption,
metricGroup,
+ Arrays.asList(Constants.DATABASE_NAME,
Constants.TABLE_NAME));
}
metricGroup.gauge("currentFetchEventTimeLag", (Gauge<Long>)
this::getFetchDelay);
metricGroup.gauge("currentEmitEventTimeLag", (Gauge<Long>)
this::getEmitDelay);
diff --git a/inlong-sort/sort-connectors/oracle-cdc/src/main/java/org/apache/inlong/sort/cdc/oracle/debezium/DebeziumSourceFunction.java b/inlong-sort/sort-connectors/oracle-cdc/src/main/java/org/apache/inlong/sort/cdc/oracle/debezium/DebeziumSourceFunction.java
index d114d5834..37b628d03 100644
--- a/inlong-sort/sort-connectors/oracle-cdc/src/main/java/org/apache/inlong/sort/cdc/oracle/debezium/DebeziumSourceFunction.java
+++ b/inlong-sort/sort-connectors/oracle-cdc/src/main/java/org/apache/inlong/sort/cdc/oracle/debezium/DebeziumSourceFunction.java
@@ -35,6 +35,7 @@ import
io.debezium.relational.history.TableChanges.TableChange;
import java.io.IOException;
import java.lang.reflect.Method;
import java.nio.charset.StandardCharsets;
+import java.util.Arrays;
import java.util.Collection;
import java.util.Properties;
import java.util.UUID;
@@ -67,6 +68,7 @@ import
org.apache.flink.streaming.api.functions.source.RichSourceFunction;
import org.apache.flink.util.Collector;
import org.apache.flink.util.ExceptionUtils;
import org.apache.flink.util.FlinkRuntimeException;
+import org.apache.inlong.sort.base.Constants;
import org.apache.inlong.sort.base.metric.MetricOption;
import org.apache.inlong.sort.base.metric.MetricOption.RegisteredMetric;
import org.apache.inlong.sort.base.metric.MetricState;
@@ -456,7 +458,8 @@ public class DebeziumSourceFunction<T> extends
RichSourceFunction<T>
.withRegisterMetric(RegisteredMetric.ALL)
.build();
if (metricOption != null) {
- sourceMetricData = new SourceTableMetricData(metricOption,
metricGroup);
+ sourceMetricData = new SourceTableMetricData(metricOption,
metricGroup,
+ Arrays.asList(Constants.DATABASE_NAME,
Constants.SCHEMA_NAME, Constants.TABLE_NAME));
if (sourceMultipleEnable) {
// register sub source metric data from metric state
sourceMetricData.registerSubMetricsGroup(metricState);
diff --git a/inlong-sort/sort-connectors/postgres-cdc/src/main/java/org/apache/inlong/sort/cdc/postgres/DebeziumSourceFunction.java b/inlong-sort/sort-connectors/postgres-cdc/src/main/java/org/apache/inlong/sort/cdc/postgres/DebeziumSourceFunction.java
index cc7f4c46b..ef3f744de 100644
--- a/inlong-sort/sort-connectors/postgres-cdc/src/main/java/org/apache/inlong/sort/cdc/postgres/DebeziumSourceFunction.java
+++ b/inlong-sort/sort-connectors/postgres-cdc/src/main/java/org/apache/inlong/sort/cdc/postgres/DebeziumSourceFunction.java
@@ -44,6 +44,7 @@ import
io.debezium.relational.history.TableChanges.TableChange;
import java.io.IOException;
import java.lang.reflect.Method;
import java.nio.charset.StandardCharsets;
+import java.util.Arrays;
import java.util.Collection;
import java.util.Properties;
import java.util.UUID;
@@ -76,6 +77,7 @@ import
org.apache.flink.streaming.api.functions.source.RichSourceFunction;
import org.apache.flink.util.Collector;
import org.apache.flink.util.ExceptionUtils;
import org.apache.flink.util.FlinkRuntimeException;
+import org.apache.inlong.sort.base.Constants;
import org.apache.inlong.sort.base.metric.sub.SourceTableMetricData;
import org.apache.inlong.sort.cdc.base.debezium.DebeziumDeserializationSchema;
import org.apache.inlong.sort.base.metric.MetricOption;
@@ -456,7 +458,8 @@ public class DebeziumSourceFunction<T> extends
RichSourceFunction<T>
.withRegisterMetric(RegisteredMetric.ALL)
.build();
if (metricOption != null) {
- sourceMetricData = new SourceTableMetricData(metricOption,
metricGroup);
+ sourceMetricData = new SourceTableMetricData(metricOption,
metricGroup,
+ Arrays.asList(Constants.DATABASE_NAME,
Constants.SCHEMA_NAME, Constants.TABLE_NAME));
if (migrateAll) {
// register sub source metric data from metric state
sourceMetricData.registerSubMetricsGroup(metricState);