This is an automated email from the ASF dual-hosted git repository. pacinogong pushed a commit to branch master in repository https://gitbox.apache.org/repos/asf/inlong.git
The following commit(s) were added to refs/heads/master by this push: new e305ae869 [INLONG-7073][Sort] Support table level metrics for Apache Iceberg connector (#7118) e305ae869 is described below commit e305ae8692832f592fde722c9143f0a13580ac8c Author: thesumery <107393625+thesum...@users.noreply.github.com> AuthorDate: Wed Jan 4 17:01:31 2023 +0800 [INLONG-7073][Sort] Support table level metrics for Apache Iceberg connector (#7118) --- .../sort/base/metric/sub/SinkTableMetricData.java | 13 ++++ .../apache/inlong/sort/iceberg/sink/FlinkSink.java | 2 +- .../sink/multiple/DynamicSchemaHandleOperator.java | 84 ++++++++++++++++++++-- .../sink/multiple/IcebergMultipleStreamWriter.java | 60 +++------------- .../sink/multiple/IcebergSingleStreamWriter.java | 10 ++- 5 files changed, 110 insertions(+), 59 deletions(-) diff --git a/inlong-sort/sort-connectors/base/src/main/java/org/apache/inlong/sort/base/metric/sub/SinkTableMetricData.java b/inlong-sort/sort-connectors/base/src/main/java/org/apache/inlong/sort/base/metric/sub/SinkTableMetricData.java index a5690a5b5..842584ba7 100644 --- a/inlong-sort/sort-connectors/base/src/main/java/org/apache/inlong/sort/base/metric/sub/SinkTableMetricData.java +++ b/inlong-sort/sort-connectors/base/src/main/java/org/apache/inlong/sort/base/metric/sub/SinkTableMetricData.java @@ -256,6 +256,19 @@ public class SinkTableMetricData extends SinkMetricData implements SinkSubMetric subSinkMetricData.invokeDirty(rowCount, rowSize); } + /** + * output dirty metrics with estimate + * + * @param database the database name of record + * @param schema the schema name of record + * @param table the table name of record + * @param data the dirty data + */ + public void outputDirtyMetricsWithEstimate(String database, String schema, String table, Object data) { + long size = data == null ? 0L : data.toString().getBytes(StandardCharsets.UTF_8).length; + outputDirtyMetrics(database, schema, table, 1, size); + } + public void outputDirtyMetricsWithEstimate(Object data) { long size = data.toString().getBytes(StandardCharsets.UTF_8).length; invokeDirty(1, size); diff --git a/inlong-sort/sort-connectors/iceberg/src/main/java/org/apache/inlong/sort/iceberg/sink/FlinkSink.java b/inlong-sort/sort-connectors/iceberg/src/main/java/org/apache/inlong/sort/iceberg/sink/FlinkSink.java index a5f070522..b0ed02abe 100644 --- a/inlong-sort/sort-connectors/iceberg/src/main/java/org/apache/inlong/sort/iceberg/sink/FlinkSink.java +++ b/inlong-sort/sort-connectors/iceberg/src/main/java/org/apache/inlong/sort/iceberg/sink/FlinkSink.java @@ -548,7 +548,7 @@ public class FlinkSink { int parallelism = writeParallelism == null ? input.getParallelism() : writeParallelism; DynamicSchemaHandleOperator routeOperator = new DynamicSchemaHandleOperator( - catalogLoader, multipleSinkOption, dirtyOptions, dirtySink); + catalogLoader, multipleSinkOption, dirtyOptions, dirtySink, inlongMetric, auditHostAndPorts); SingleOutputStreamOperator<RecordWithSchema> routeStream = input .transform(operatorName(ICEBERG_WHOLE_DATABASE_MIGRATION_NAME), TypeInformation.of(RecordWithSchema.class), diff --git a/inlong-sort/sort-connectors/iceberg/src/main/java/org/apache/inlong/sort/iceberg/sink/multiple/DynamicSchemaHandleOperator.java b/inlong-sort/sort-connectors/iceberg/src/main/java/org/apache/inlong/sort/iceberg/sink/multiple/DynamicSchemaHandleOperator.java index 37bb6f944..53fef8dd1 100644 --- a/inlong-sort/sort-connectors/iceberg/src/main/java/org/apache/inlong/sort/iceberg/sink/multiple/DynamicSchemaHandleOperator.java +++ b/inlong-sort/sort-connectors/iceberg/src/main/java/org/apache/inlong/sort/iceberg/sink/multiple/DynamicSchemaHandleOperator.java @@ -17,6 +17,12 @@ package org.apache.inlong.sort.iceberg.sink.multiple; +import org.apache.flink.api.common.state.ListState; +import org.apache.flink.api.common.state.ListStateDescriptor; +import org.apache.flink.api.common.typeinfo.TypeHint; +import org.apache.flink.api.common.typeinfo.TypeInformation; +import org.apache.flink.runtime.state.StateInitializationContext; +import org.apache.flink.runtime.state.StateSnapshotContext; import org.apache.flink.shaded.jackson2.com.fasterxml.jackson.databind.JsonNode; import org.apache.flink.streaming.api.operators.AbstractStreamOperator; import org.apache.flink.streaming.api.operators.OneInputStreamOperator; @@ -43,9 +49,14 @@ import org.apache.inlong.sort.base.dirty.DirtyType; import org.apache.inlong.sort.base.dirty.sink.DirtySink; import org.apache.inlong.sort.base.format.AbstractDynamicSchemaFormat; import org.apache.inlong.sort.base.format.DynamicSchemaFormatFactory; +import org.apache.inlong.sort.base.metric.MetricOption; +import org.apache.inlong.sort.base.metric.MetricOption.RegisteredMetric; +import org.apache.inlong.sort.base.metric.MetricState; +import org.apache.inlong.sort.base.metric.sub.SinkTableMetricData; import org.apache.inlong.sort.base.sink.MultipleSinkOption; import org.apache.inlong.sort.base.sink.TableChange; import org.apache.inlong.sort.base.sink.TableChange.AddColumn; +import org.apache.inlong.sort.base.util.MetricStateUtils; import org.slf4j.Logger; import org.slf4j.LoggerFactory; @@ -61,6 +72,12 @@ import java.util.Map; import java.util.Queue; import java.util.Set; +import static org.apache.inlong.sort.base.Constants.DIRTY_BYTES_OUT; +import static org.apache.inlong.sort.base.Constants.DIRTY_RECORDS_OUT; +import static org.apache.inlong.sort.base.Constants.INLONG_METRIC_STATE_NAME; +import static org.apache.inlong.sort.base.Constants.NUM_BYTES_OUT; +import static org.apache.inlong.sort.base.Constants.NUM_RECORDS_OUT; + public class DynamicSchemaHandleOperator extends AbstractStreamOperator<RecordWithSchema> implements OneInputStreamOperator<RowData, RecordWithSchema>, @@ -90,13 +107,22 @@ public class DynamicSchemaHandleOperator extends AbstractStreamOperator<RecordWi private final DirtyOptions dirtyOptions; private @Nullable final DirtySink<Object> dirtySink; + // metric + private final String inlongMetric; + private final String auditHostAndPorts; + private @Nullable transient SinkTableMetricData metricData; + private transient ListState<MetricState> metricStateListState; + private transient MetricState metricState; + public DynamicSchemaHandleOperator(CatalogLoader catalogLoader, MultipleSinkOption multipleSinkOption, DirtyOptions dirtyOptions, - @Nullable DirtySink<Object> dirtySink) { + @Nullable DirtySink<Object> dirtySink, String inlongMetric, String auditHostAndPorts) { this.catalogLoader = catalogLoader; this.multipleSinkOption = multipleSinkOption; this.dirtyOptions = dirtyOptions; this.dirtySink = dirtySink; + this.inlongMetric = inlongMetric; + this.auditHostAndPorts = auditHostAndPorts; } @SuppressWarnings("unchecked") @@ -117,6 +143,20 @@ public class DynamicSchemaHandleOperator extends AbstractStreamOperator<RecordWi this.recordQueues = new HashMap<>(); this.schemaCache = new HashMap<>(); this.blacklist = new HashSet<>(); + + // Initialize metric + MetricOption metricOption = MetricOption.builder() + .withInlongLabels(inlongMetric) + .withInlongAudit(auditHostAndPorts) + .withInitRecords(metricState != null ? metricState.getMetricValue(NUM_RECORDS_OUT) : 0L) + .withInitBytes(metricState != null ? metricState.getMetricValue(NUM_BYTES_OUT) : 0L) + .withInitDirtyRecords(metricState != null ? metricState.getMetricValue(DIRTY_RECORDS_OUT) : 0L) + .withInitDirtyBytes(metricState != null ? metricState.getMetricValue(DIRTY_BYTES_OUT) : 0L) + .withRegisterMetric(RegisteredMetric.ALL) + .build(); + if (metricOption != null) { + metricData = new SinkTableMetricData(metricOption, getRuntimeContext().getMetricGroup()); + } } @Override @@ -136,14 +176,15 @@ public class DynamicSchemaHandleOperator extends AbstractStreamOperator<RecordWi LOGGER.error(String.format("Deserialize error, raw data: %s", new String(element.getValue().getBinary(0))), e); handleDirtyData(new String(element.getValue().getBinary(0)), - null, DirtyType.DESERIALIZE_ERROR, e); + null, DirtyType.DESERIALIZE_ERROR, e, TableIdentifier.of("unknow", "unknow")); } TableIdentifier tableId = null; try { tableId = parseId(jsonNode); } catch (Exception e) { LOGGER.error(String.format("Table identifier parse error, raw data: %s", jsonNode), e); - handleDirtyData(jsonNode, jsonNode, DirtyType.TABLE_IDENTIFIER_PARSE_ERROR, e); + handleDirtyData(jsonNode, jsonNode, DirtyType.TABLE_IDENTIFIER_PARSE_ERROR, + e, TableIdentifier.of("unknow", "unknow")); } if (blacklist.contains(tableId)) { return; @@ -156,7 +197,11 @@ public class DynamicSchemaHandleOperator extends AbstractStreamOperator<RecordWi } } - private void handleDirtyData(Object dirtyData, JsonNode rootNode, DirtyType dirtyType, Exception e) { + private void handleDirtyData(Object dirtyData, + JsonNode rootNode, + DirtyType dirtyType, + Exception e, + TableIdentifier tableId) { if (!dirtyOptions.ignoreDirty()) { RuntimeException ex; if (e instanceof RuntimeException) { @@ -182,6 +227,10 @@ public class DynamicSchemaHandleOperator extends AbstractStreamOperator<RecordWi .setIdentifier(dirtyOptions.getIdentifier()); } dirtySink.invoke(builder.build()); + if (metricData != null) { + metricData.outputDirtyMetricsWithEstimate( + tableId.namespace().toString(), null, tableId.name(), dirtyData); + } } catch (Exception ex) { if (!dirtyOptions.ignoreSideOutputErrors()) { throw new RuntimeException(ex); @@ -198,6 +247,29 @@ public class DynamicSchemaHandleOperator extends AbstractStreamOperator<RecordWi processingTimeService.getCurrentProcessingTime() + HELPER_DEBUG_INTERVEL, this); } + @Override + public void snapshotState(StateSnapshotContext context) throws Exception { + if (metricData != null && metricStateListState != null) { + MetricStateUtils.snapshotMetricStateForSinkMetricData(metricStateListState, metricData, + getRuntimeContext().getIndexOfThisSubtask()); + } + } + + @Override + public void initializeState(StateInitializationContext context) throws Exception { + // init metric state + if (this.inlongMetric != null) { + this.metricStateListState = context.getOperatorStateStore().getUnionListState( + new ListStateDescriptor<>( + String.format(INLONG_METRIC_STATE_NAME), TypeInformation.of(new TypeHint<MetricState>() { + }))); + } + if (context.isRestored()) { + metricState = MetricStateUtils.restoreMetricState(metricStateListState, + getRuntimeContext().getIndexOfThisSubtask(), getRuntimeContext().getNumberOfParallelSubtasks()); + } + } + private void execDDL(JsonNode jsonNode, TableIdentifier tableId) { // todo:parse ddl sql } @@ -242,7 +314,7 @@ public class DynamicSchemaHandleOperator extends AbstractStreamOperator<RecordWi LOG.warn("Ignore table {} schema change, old: {} new: {}.", tableId, dataSchema, latestSchema, e); blacklist.add(tableId); - handleDirtyData(jsonNode, jsonNode, DirtyType.EXTRACT_ROWDATA_ERROR, e); + handleDirtyData(jsonNode, jsonNode, DirtyType.EXTRACT_ROWDATA_ERROR, e, tableId); } return Collections.emptyList(); }); @@ -329,7 +401,7 @@ public class DynamicSchemaHandleOperator extends AbstractStreamOperator<RecordWi tableId, pkListStr); } catch (Exception e) { - handleDirtyData(data, data, DirtyType.EXTRACT_SCHEMA_ERROR, e); + handleDirtyData(data, data, DirtyType.EXTRACT_SCHEMA_ERROR, e, tableId); } return null; } diff --git a/inlong-sort/sort-connectors/iceberg/src/main/java/org/apache/inlong/sort/iceberg/sink/multiple/IcebergMultipleStreamWriter.java b/inlong-sort/sort-connectors/iceberg/src/main/java/org/apache/inlong/sort/iceberg/sink/multiple/IcebergMultipleStreamWriter.java index 1abd5b97a..3022ec6c0 100644 --- a/inlong-sort/sort-connectors/iceberg/src/main/java/org/apache/inlong/sort/iceberg/sink/multiple/IcebergMultipleStreamWriter.java +++ b/inlong-sort/sort-connectors/iceberg/src/main/java/org/apache/inlong/sort/iceberg/sink/multiple/IcebergMultipleStreamWriter.java @@ -17,10 +17,6 @@ package org.apache.inlong.sort.iceberg.sink.multiple; -import org.apache.flink.api.common.state.ListState; -import org.apache.flink.api.common.state.ListStateDescriptor; -import org.apache.flink.api.common.typeinfo.TypeHint; -import org.apache.flink.api.common.typeinfo.TypeInformation; import org.apache.flink.configuration.Configuration; import org.apache.flink.runtime.state.FunctionInitializationContext; import org.apache.flink.runtime.state.FunctionSnapshotContext; @@ -38,13 +34,9 @@ import org.apache.iceberg.flink.FlinkSchemaUtil; import org.apache.iceberg.flink.sink.TaskWriterFactory; import org.apache.iceberg.types.Types.NestedField; import org.apache.iceberg.util.PropertyUtil; +import org.apache.inlong.sort.base.Constants; import org.apache.inlong.sort.base.dirty.DirtyOptions; import org.apache.inlong.sort.base.dirty.sink.DirtySink; -import org.apache.inlong.sort.base.metric.MetricOption; -import org.apache.inlong.sort.base.metric.MetricOption.RegisteredMetric; -import org.apache.inlong.sort.base.metric.MetricState; -import org.apache.inlong.sort.base.metric.SinkMetricData; -import org.apache.inlong.sort.base.util.MetricStateUtils; import org.apache.inlong.sort.base.sink.MultipleSinkOption; import org.apache.inlong.sort.iceberg.sink.RowDataTaskWriterFactory; import org.slf4j.Logger; @@ -65,9 +57,7 @@ import static org.apache.iceberg.TableProperties.UPSERT_ENABLED; import static org.apache.iceberg.TableProperties.UPSERT_ENABLED_DEFAULT; import static org.apache.iceberg.TableProperties.WRITE_TARGET_FILE_SIZE_BYTES; import static org.apache.iceberg.TableProperties.WRITE_TARGET_FILE_SIZE_BYTES_DEFAULT; -import static org.apache.inlong.sort.base.Constants.INLONG_METRIC_STATE_NAME; -import static org.apache.inlong.sort.base.Constants.NUM_BYTES_OUT; -import static org.apache.inlong.sort.base.Constants.NUM_RECORDS_OUT; +import static org.apache.inlong.sort.base.Constants.DELIMITER; /** * Iceberg writer that can distinguish different sink tables and route and distribute data into different @@ -93,10 +83,6 @@ public class IcebergMultipleStreamWriter extends IcebergProcessFunction<RecordWi // metric private final String inlongMetric; private final String auditHostAndPorts; - @Nullable - private transient SinkMetricData metricData; - private transient ListState<MetricState> metricStateListState; - private transient MetricState metricState; private final DirtyOptions dirtyOptions; private @Nullable final DirtySink<Object> dirtySink; @@ -123,18 +109,6 @@ public class IcebergMultipleStreamWriter extends IcebergProcessFunction<RecordWi this.multipleWriters = new HashMap<>(); this.multipleTables = new HashMap<>(); this.multipleSchemas = new HashMap<>(); - - // Initialize metric - MetricOption metricOption = MetricOption.builder() - .withInlongLabels(inlongMetric) - .withInlongAudit(auditHostAndPorts) - .withInitRecords(metricState != null ? metricState.getMetricValue(NUM_RECORDS_OUT) : 0L) - .withInitBytes(metricState != null ? metricState.getMetricValue(NUM_BYTES_OUT) : 0L) - .withRegisterMetric(RegisteredMetric.ALL) - .build(); - if (metricOption != null) { - metricData = new SinkMetricData(metricOption, getRuntimeContext().getMetricGroup()); - } } @Override @@ -202,9 +176,14 @@ public class IcebergMultipleStreamWriter extends IcebergProcessFunction<RecordWi appendMode); if (multipleWriters.get(tableId) == null) { + StringBuilder subWriterInlongMetric = new StringBuilder(inlongMetric); + subWriterInlongMetric.append(DELIMITER) + .append(Constants.DATABASE_NAME).append("=").append(tableId.namespace().toString()) + .append(DELIMITER) + .append(Constants.TABLE_NAME).append("=").append(tableId.name()); IcebergSingleStreamWriter<RowData> writer = new IcebergSingleStreamWriter<>( - tableId.toString(), taskWriterFactory, null, - null, flinkRowType, dirtyOptions, dirtySink); + tableId.toString(), taskWriterFactory, subWriterInlongMetric.toString(), + auditHostAndPorts, flinkRowType, dirtyOptions, dirtySink); writer.setup(getRuntimeContext(), new CallbackCollector<>( writeResult -> collector.collect(new MultipleWriteResult(tableId, writeResult))), @@ -223,9 +202,6 @@ public class IcebergMultipleStreamWriter extends IcebergProcessFunction<RecordWi if (multipleWriters.get(tableId) != null) { for (RowData data : recordWithSchema.getData()) { multipleWriters.get(tableId).processElement(data); - if (metricData != null) { - metricData.invokeWithEstimate(data); - } } } else { LOG.error("Unregistered table schema for {}.", recordWithSchema.getTableId()); @@ -244,29 +220,11 @@ public class IcebergMultipleStreamWriter extends IcebergProcessFunction<RecordWi for (Entry<TableIdentifier, IcebergSingleStreamWriter<RowData>> entry : multipleWriters.entrySet()) { entry.getValue().snapshotState(context); } - - // metric - if (metricData != null && metricStateListState != null) { - MetricStateUtils.snapshotMetricStateForSinkMetricData(metricStateListState, metricData, - getRuntimeContext().getIndexOfThisSubtask()); - } } @Override public void initializeState(FunctionInitializationContext context) throws Exception { this.functionInitializationContext = context; - - // init metric state - if (this.inlongMetric != null) { - this.metricStateListState = context.getOperatorStateStore().getUnionListState( - new ListStateDescriptor<>( - INLONG_METRIC_STATE_NAME, TypeInformation.of(new TypeHint<MetricState>() { - }))); - } - if (context.isRestored()) { - metricState = MetricStateUtils.restoreMetricState(metricStateListState, - getRuntimeContext().getIndexOfThisSubtask(), getRuntimeContext().getNumberOfParallelSubtasks()); - } } private boolean isSchemaUpdate(RecordWithSchema recordWithSchema) { diff --git a/inlong-sort/sort-connectors/iceberg/src/main/java/org/apache/inlong/sort/iceberg/sink/multiple/IcebergSingleStreamWriter.java b/inlong-sort/sort-connectors/iceberg/src/main/java/org/apache/inlong/sort/iceberg/sink/multiple/IcebergSingleStreamWriter.java index c8bfbeb08..dc30c5b21 100644 --- a/inlong-sort/sort-connectors/iceberg/src/main/java/org/apache/inlong/sort/iceberg/sink/multiple/IcebergSingleStreamWriter.java +++ b/inlong-sort/sort-connectors/iceberg/src/main/java/org/apache/inlong/sort/iceberg/sink/multiple/IcebergSingleStreamWriter.java @@ -44,6 +44,8 @@ import org.slf4j.LoggerFactory; import javax.annotation.Nullable; import java.io.IOException; +import static org.apache.inlong.sort.base.Constants.DIRTY_BYTES_OUT; +import static org.apache.inlong.sort.base.Constants.DIRTY_RECORDS_OUT; import static org.apache.inlong.sort.base.Constants.INLONG_METRIC_STATE_NAME; import static org.apache.inlong.sort.base.Constants.NUM_BYTES_OUT; import static org.apache.inlong.sort.base.Constants.NUM_RECORDS_OUT; @@ -105,6 +107,8 @@ public class IcebergSingleStreamWriter<T> extends IcebergProcessFunction<T, Writ .withInlongAudit(auditHostAndPorts) .withInitRecords(metricState != null ? metricState.getMetricValue(NUM_RECORDS_OUT) : 0L) .withInitBytes(metricState != null ? metricState.getMetricValue(NUM_BYTES_OUT) : 0L) + .withInitDirtyRecords(metricState != null ? metricState.getMetricValue(DIRTY_RECORDS_OUT) : 0L) + .withInitDirtyBytes(metricState != null ? metricState.getMetricValue(DIRTY_BYTES_OUT) : 0L) .withRegisterMetric(RegisteredMetric.ALL) .build(); if (metricOption != null) { @@ -138,6 +142,9 @@ public class IcebergSingleStreamWriter<T> extends IcebergProcessFunction<T, Writ .setRowType(flinkRowType) .setDirtyMessage(e.getMessage()); dirtySink.invoke(builder.build()); + if (metricData != null) { + metricData.invokeDirtyWithEstimate(value); + } } catch (Exception ex) { if (!dirtyOptions.ignoreSideOutputErrors()) { throw new RuntimeException(ex); @@ -157,7 +164,8 @@ public class IcebergSingleStreamWriter<T> extends IcebergProcessFunction<T, Writ if (this.inlongMetric != null) { this.metricStateListState = context.getOperatorStateStore().getUnionListState( new ListStateDescriptor<>( - INLONG_METRIC_STATE_NAME, TypeInformation.of(new TypeHint<MetricState>() { + String.format("Iceberg(%s)-" + INLONG_METRIC_STATE_NAME, fullTableName), + TypeInformation.of(new TypeHint<MetricState>() { }))); } if (context.isRestored()) {