This is an automated email from the ASF dual-hosted git repository. dockerzhang pushed a commit to branch branch-1.5 in repository https://gitbox.apache.org/repos/asf/inlong.git
commit 659385a0bdad7b1eaf264273efe24c9731f33a5e Author: Yizhou Yang <32808678+yizhou-y...@users.noreply.github.com> AuthorDate: Tue Jan 3 14:06:22 2023 +0800 [INLONG-7061][Sort] Support table level metrics for Apache Doris connector and add dirty metrics (#7062) --- .../sort/base/dirty/sink/s3/S3DirtySink.java | 3 +- .../inlong/sort/base/dirty/sink/s3/S3Helper.java | 2 +- .../sort/base/metric/sub/SinkTableMetricData.java | 28 +++++++++++++ .../table/DorisDynamicSchemaOutputFormat.java | 48 +++++++++++++++++++++- 4 files changed, 78 insertions(+), 3 deletions(-) diff --git a/inlong-sort/sort-connectors/base/src/main/java/org/apache/inlong/sort/base/dirty/sink/s3/S3DirtySink.java b/inlong-sort/sort-connectors/base/src/main/java/org/apache/inlong/sort/base/dirty/sink/s3/S3DirtySink.java index ab8fc9464..b8f1f5f10 100644 --- a/inlong-sort/sort-connectors/base/src/main/java/org/apache/inlong/sort/base/dirty/sink/s3/S3DirtySink.java +++ b/inlong-sort/sort-connectors/base/src/main/java/org/apache/inlong/sort/base/dirty/sink/s3/S3DirtySink.java @@ -22,6 +22,7 @@ import com.amazonaws.auth.BasicAWSCredentials; import com.amazonaws.client.builder.AwsClientBuilder; import com.amazonaws.services.s3.AmazonS3; import com.amazonaws.services.s3.AmazonS3ClientBuilder; +import org.apache.commons.lang.StringEscapeUtils; import org.apache.commons.lang3.StringUtils; import org.apache.flink.configuration.Configuration; import org.apache.flink.formats.json.RowDataToJsonConverters.RowDataToJsonConverter; @@ -244,7 +245,7 @@ public class S3DirtySink<T> implements DirtySink<T> { } String content = null; try { - content = StringUtils.join(values, s3Options.getLineDelimiter()); + content = StringUtils.join(values, StringEscapeUtils.unescapeJava(s3Options.getLineDelimiter())); s3Helper.upload(identifier, content); LOGGER.info("Write {} records to s3 of identifier: {}", values.size(), identifier); writeOutNum.addAndGet(values.size()); diff --git a/inlong-sort/sort-connectors/base/src/main/java/org/apache/inlong/sort/base/dirty/sink/s3/S3Helper.java b/inlong-sort/sort-connectors/base/src/main/java/org/apache/inlong/sort/base/dirty/sink/s3/S3Helper.java index d79b8aecd..f925d76e8 100644 --- a/inlong-sort/sort-connectors/base/src/main/java/org/apache/inlong/sort/base/dirty/sink/s3/S3Helper.java +++ b/inlong-sort/sort-connectors/base/src/main/java/org/apache/inlong/sort/base/dirty/sink/s3/S3Helper.java @@ -39,7 +39,7 @@ public class S3Helper implements Serializable { private static final DateTimeFormatter DATE_TIME_FORMAT = DateTimeFormatter.ofPattern("yyyyMMddHHmmss"); private static final int SEQUENCE_LENGTH = 4; - private static final String ESCAPE_PATTERN = "[\\pP\\p{Punct}\\s]"; + private static final String ESCAPE_PATTERN = "[,,+=: ;()()。/.;]"; private static final String FILE_NAME_SUFFIX = ".txt"; private final Random r = new Random(); private final AmazonS3 s3Client; diff --git a/inlong-sort/sort-connectors/base/src/main/java/org/apache/inlong/sort/base/metric/sub/SinkTableMetricData.java b/inlong-sort/sort-connectors/base/src/main/java/org/apache/inlong/sort/base/metric/sub/SinkTableMetricData.java index cf5285b13..a5690a5b5 100644 --- a/inlong-sort/sort-connectors/base/src/main/java/org/apache/inlong/sort/base/metric/sub/SinkTableMetricData.java +++ b/inlong-sort/sort-connectors/base/src/main/java/org/apache/inlong/sort/base/metric/sub/SinkTableMetricData.java @@ -196,6 +196,34 @@ public class SinkTableMetricData extends SinkMetricData implements SinkSubMetric subSinkMetricData.invoke(rowCount, rowSize); } + /** + * output dirty metrics with estimate + * + * @param database the database name of record + * @param schema the schema name of record + * @param table the table name of record + * @param rowCount the row count of records + * @param rowSize the row size of records + */ + public void outputDirtyMetricsWithEstimate(String database, String table, long rowCount, + long rowSize) { + if (StringUtils.isBlank(database) || StringUtils.isBlank(table)) { + invokeDirty(rowCount, rowSize); + return; + } + String identify = buildSchemaIdentify(database, null, table); + SinkMetricData subSinkMetricData; + if (subSinkMetricMap.containsKey(identify)) { + subSinkMetricData = subSinkMetricMap.get(identify); + } else { + subSinkMetricData = buildSubSinkMetricData(new String[]{database, table}, this); + subSinkMetricMap.put(identify, subSinkMetricData); + } + // sink metric and sub sink metric output metrics + this.invokeDirty(rowCount, rowSize); + subSinkMetricData.invokeDirty(rowCount, rowSize); + } + public void outputMetricsWithEstimate(Object data) { long size = data.toString().getBytes(StandardCharsets.UTF_8).length; invoke(1, size); diff --git a/inlong-sort/sort-connectors/doris/src/main/java/org/apache/inlong/sort/doris/table/DorisDynamicSchemaOutputFormat.java b/inlong-sort/sort-connectors/doris/src/main/java/org/apache/inlong/sort/doris/table/DorisDynamicSchemaOutputFormat.java index 1b9bc4c48..efa04a630 100644 --- a/inlong-sort/sort-connectors/doris/src/main/java/org/apache/inlong/sort/doris/table/DorisDynamicSchemaOutputFormat.java +++ b/inlong-sort/sort-connectors/doris/src/main/java/org/apache/inlong/sort/doris/table/DorisDynamicSchemaOutputFormat.java @@ -76,6 +76,8 @@ import java.util.concurrent.TimeUnit; import java.util.concurrent.atomic.AtomicLong; import java.util.stream.Collectors; +import static org.apache.inlong.sort.base.Constants.DIRTY_BYTES_OUT; +import static org.apache.inlong.sort.base.Constants.DIRTY_RECORDS_OUT; import static org.apache.inlong.sort.base.Constants.INLONG_METRIC_STATE_NAME; import static org.apache.inlong.sort.base.Constants.NUM_BYTES_OUT; import static org.apache.inlong.sort.base.Constants.NUM_RECORDS_OUT; @@ -141,7 +143,6 @@ public class DorisDynamicSchemaOutputFormat<T> extends RichOutputFormat<T> { private transient MetricState metricState; private final String[] fieldNames; private volatile boolean jsonFormat; - private String keysType; private volatile RowData.FieldGetter[] fieldGetters; private String fieldDelimiter; private String lineDelimiter; @@ -267,6 +268,8 @@ public class DorisDynamicSchemaOutputFormat<T> extends RichOutputFormat<T> { .withInlongAudit(auditHostAndPorts) .withInitRecords(metricState != null ? metricState.getMetricValue(NUM_RECORDS_OUT) : 0L) .withInitBytes(metricState != null ? metricState.getMetricValue(NUM_BYTES_OUT) : 0L) + .withInitDirtyRecords(metricState != null ? metricState.getMetricValue(DIRTY_RECORDS_OUT) : 0L) + .withInitDirtyBytes(metricState != null ? metricState.getMetricValue(DIRTY_BYTES_OUT) : 0L) .withRegisterMetric(MetricOption.RegisteredMetric.ALL) .build(); if (metricOption != null) { @@ -486,6 +489,12 @@ public class DorisDynamicSchemaOutputFormat<T> extends RichOutputFormat<T> { } throw ex; } + + if (multipleSink) { + handleMultipleDirtyData(dirtyData, dirtyType, e); + return; + } + if (dirtySink != null) { DirtyData.Builder<Object> builder = DirtyData.builder(); try { @@ -503,6 +512,43 @@ public class DorisDynamicSchemaOutputFormat<T> extends RichOutputFormat<T> { LOG.warn("Dirty sink failed", ex); } } + metricData.invokeDirty(1, dirtyData.toString().getBytes(StandardCharsets.UTF_8).length); + } + + private void handleMultipleDirtyData(Object dirtyData, DirtyType dirtyType, Exception e) { + JsonNode rootNode; + try { + rootNode = jsonDynamicSchemaFormat.deserialize(((RowData) dirtyData).getBinary(0)); + } catch (Exception ex) { + handleDirtyData(dirtyData, DirtyType.DESERIALIZE_ERROR, e); + return; + } + + if (dirtySink != null) { + DirtyData.Builder<Object> builder = DirtyData.builder(); + try { + builder.setData(dirtyData) + .setDirtyType(dirtyType) + .setLabels(jsonDynamicSchemaFormat.parse(rootNode, dirtyOptions.getLabels())) + .setLogTag(jsonDynamicSchemaFormat.parse(rootNode, dirtyOptions.getLogTag())) + .setDirtyMessage(e.getMessage()) + .setIdentifier(jsonDynamicSchemaFormat.parse(rootNode, dirtyOptions.getIdentifier())); + dirtySink.invoke(builder.build()); + } catch (Exception ex) { + if (!dirtyOptions.ignoreSideOutputErrors()) { + throw new RuntimeException(ex); + } + LOG.warn("Dirty sink failed", ex); + } + } + try { + metricData.outputDirtyMetricsWithEstimate( + jsonDynamicSchemaFormat.parse(rootNode, databasePattern), + jsonDynamicSchemaFormat.parse(rootNode, tablePattern), 1, + ((RowData) dirtyData).getBinary(0).length); + } catch (Exception ex) { + metricData.invokeDirty(1, dirtyData.toString().getBytes(StandardCharsets.UTF_8).length); + } } private void handleColumnsChange(String tableIdentifier, JsonNode rootNode, JsonNode physicalData) {