This is an automated email from the ASF dual-hosted git repository.

pacinogong pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/inlong.git
The following commit(s) were added to refs/heads/master by this push:
     new f8d3989ab  [INLONG-7075][Sort] Add table level metric and dirty data backup for PostgreSQL (#7088)
f8d3989ab is described below

commit f8d3989ab29256dc34f5b5bca92c2a2979443c96
Author: kuansix <490305...@qq.com>
AuthorDate: Fri Dec 30 19:23:25 2022 +0800

    [INLONG-7075][Sort] Add table level metric and dirty data backup for PostgreSQL (#7088)
---
 .../apache/inlong/sort/base/dirty/DirtyType.java   |  4 +
 .../internal/JdbcMultiBatchingOutputFormat.java    | 98 ++++++++++++++--------
 .../jdbc/table/JdbcDynamicOutputFormatBuilder.java |  5 +-
 3 files changed, 69 insertions(+), 38 deletions(-)

diff --git a/inlong-sort/sort-connectors/base/src/main/java/org/apache/inlong/sort/base/dirty/DirtyType.java b/inlong-sort/sort-connectors/base/src/main/java/org/apache/inlong/sort/base/dirty/DirtyType.java
index 89789872a..d6bf10e61 100644
--- a/inlong-sort/sort-connectors/base/src/main/java/org/apache/inlong/sort/base/dirty/DirtyType.java
+++ b/inlong-sort/sort-connectors/base/src/main/java/org/apache/inlong/sort/base/dirty/DirtyType.java
@@ -68,6 +68,10 @@ public enum DirtyType {
      * Batch load error
      */
     BATCH_LOAD_ERROR("BatchLoadError"),
+    /**
+     * Retry load error
+     */
+    RETRY_LOAD_ERROR("RetryLoadError"),
     /**
      * Unsupported data type
      */
diff --git a/inlong-sort/sort-connectors/jdbc/src/main/java/org/apache/inlong/sort/jdbc/internal/JdbcMultiBatchingOutputFormat.java b/inlong-sort/sort-connectors/jdbc/src/main/java/org/apache/inlong/sort/jdbc/internal/JdbcMultiBatchingOutputFormat.java
index 1176ca6f4..dd4be4d5c 100644
--- a/inlong-sort/sort-connectors/jdbc/src/main/java/org/apache/inlong/sort/jdbc/internal/JdbcMultiBatchingOutputFormat.java
+++ b/inlong-sort/sort-connectors/jdbc/src/main/java/org/apache/inlong/sort/jdbc/internal/JdbcMultiBatchingOutputFormat.java
@@ -42,11 +42,13 @@ import org.apache.flink.table.runtime.typeutils.InternalTypeInfo;
 import org.apache.flink.table.types.logical.LogicalType;
 import org.apache.flink.table.types.logical.RowType;
 import org.apache.flink.types.RowKind;
+import org.apache.inlong.sort.base.dirty.DirtySinkHelper;
+import org.apache.inlong.sort.base.dirty.DirtyType;
 import org.apache.inlong.sort.base.format.DynamicSchemaFormatFactory;
 import org.apache.inlong.sort.base.format.JsonDynamicSchemaFormat;
 import org.apache.inlong.sort.base.metric.MetricOption;
 import org.apache.inlong.sort.base.metric.MetricState;
-import org.apache.inlong.sort.base.metric.SinkMetricData;
+import org.apache.inlong.sort.base.metric.sub.SinkTableMetricData;
 import org.apache.inlong.sort.base.sink.SchemaUpdateExceptionPolicy;
 import org.apache.inlong.sort.base.util.MetricStateUtils;
 import org.apache.inlong.sort.jdbc.table.AbstractJdbcDialect;
@@ -82,6 +84,8 @@ import java.util.regex.Matcher;
 import java.util.regex.Pattern;
 
 import static org.apache.flink.util.Preconditions.checkNotNull;
+import static org.apache.inlong.sort.base.Constants.DIRTY_BYTES_OUT;
+import static org.apache.inlong.sort.base.Constants.DIRTY_RECORDS_OUT;
 import static org.apache.inlong.sort.base.Constants.NUM_RECORDS_OUT;
 import static org.apache.inlong.sort.base.Constants.NUM_BYTES_OUT;
 import static org.apache.inlong.sort.base.Constants.INLONG_METRIC_STATE_NAME;
@@ -113,7 +117,7 @@ public class JdbcMultiBatchingOutputFormat<In, JdbcIn, JdbcExec extends JdbcBatc
     private transient Map<String, List<String>> pkNameMap = new HashMap<>();
     private transient Map<String, List<GenericRowData>> recordsMap = new HashMap<>();
     private transient Map<String, Exception> tableExceptionMap = new HashMap<>();
-    private transient Boolean isIgnoreTableException;
+    private transient Boolean stopWritingWhenTableException;
     private transient ListState<MetricState> metricStateListState;
 
     private final String sinkMultipleFormat;
@@ -121,10 +125,9 @@ public class JdbcMultiBatchingOutputFormat<In, JdbcIn, JdbcExec extends JdbcBatc
     private final String tablePattern;
     private final String schemaPattern;
     private transient MetricState metricState;
-    private SinkMetricData sinkMetricData;
-    private Long dataSize = 0L;
-    private Long rowSize = 0L;
+    private SinkTableMetricData sinkMetricData;
     private final SchemaUpdateExceptionPolicy schemaUpdateExceptionPolicy;
+    private final DirtySinkHelper<Object> dirtySinkHelper;
 
     private static final DateTimeFormatter SQL_TIMESTAMP_WITH_LOCAL_TIMEZONE_FORMAT;
     private static final DateTimeFormatter SQL_TIME_FORMAT;
@@ -149,7 +152,8 @@ public class JdbcMultiBatchingOutputFormat<In, JdbcIn, JdbcExec extends JdbcBatc
             String schemaPattern,
             String inlongMetric,
             String auditHostAndPorts,
-            SchemaUpdateExceptionPolicy schemaUpdateExceptionPolicy) {
+            SchemaUpdateExceptionPolicy schemaUpdateExceptionPolicy,
+            DirtySinkHelper<Object> dirtySinkHelper) {
         super(connectionProvider);
         this.executionOptions = checkNotNull(executionOptions);
         this.dmlOptions = dmlOptions;
@@ -162,6 +166,7 @@ public class JdbcMultiBatchingOutputFormat<In, JdbcIn, JdbcExec extends JdbcBatc
         this.inlongMetric = inlongMetric;
         this.auditHostAndPorts = auditHostAndPorts;
         this.schemaUpdateExceptionPolicy = schemaUpdateExceptionPolicy;
+        this.dirtySinkHelper = dirtySinkHelper;
     }
 
     /**
@@ -177,10 +182,13 @@ public class JdbcMultiBatchingOutputFormat<In, JdbcIn, JdbcExec extends JdbcBatc
                 .withInlongAudit(auditHostAndPorts)
                 .withInitRecords(metricState != null ? metricState.getMetricValue(NUM_RECORDS_OUT) : 0L)
                 .withInitBytes(metricState != null ? metricState.getMetricValue(NUM_BYTES_OUT) : 0L)
+                .withInitDirtyRecords(metricState != null ? metricState.getMetricValue(DIRTY_RECORDS_OUT) : 0L)
+                .withInitDirtyBytes(metricState != null ? metricState.getMetricValue(DIRTY_BYTES_OUT) : 0L)
                 .withRegisterMetric(MetricOption.RegisteredMetric.ALL)
                 .build();
         if (metricOption != null) {
-            sinkMetricData = new SinkMetricData(metricOption, runtimeContext.getMetricGroup());
+            sinkMetricData = new SinkTableMetricData(metricOption, runtimeContext.getMetricGroup());
+            sinkMetricData.registerSubMetricsGroup(metricState);
         }
         jdbcExecMap = new HashMap<>();
         connectionExecProviderMap = new HashMap<>();
@@ -188,8 +196,9 @@ public class JdbcMultiBatchingOutputFormat<In, JdbcIn, JdbcExec extends JdbcBatc
         rowTypeMap = new HashMap<>();
         recordsMap = new HashMap<>();
         tableExceptionMap = new HashMap<>();
-        isIgnoreTableException = schemaUpdateExceptionPolicy.equals(SchemaUpdateExceptionPolicy.ALERT_WITH_IGNORE)
-                || schemaUpdateExceptionPolicy.equals(SchemaUpdateExceptionPolicy.STOP_PARTIAL);
+        stopWritingWhenTableException =
+                schemaUpdateExceptionPolicy.equals(SchemaUpdateExceptionPolicy.ALERT_WITH_IGNORE)
+                        || schemaUpdateExceptionPolicy.equals(SchemaUpdateExceptionPolicy.STOP_PARTIAL);
 
         if (executionOptions.getBatchIntervalMs() != 0 && executionOptions.getBatchSize() != 1) {
             this.scheduler = Executors.newScheduledThreadPool(
@@ -201,15 +210,8 @@ public class JdbcMultiBatchingOutputFormat<In, JdbcIn, JdbcExec extends JdbcBatc
                     if (!closed) {
                         try {
                             flush();
-                            if (sinkMetricData != null) {
-                                sinkMetricData.invoke(rowSize, dataSize);
-                            }
-                            resetStateAfterFlush();
                         } catch (Exception e) {
-                            if (sinkMetricData != null) {
-                                sinkMetricData.invokeDirty(rowSize, dataSize);
-                            }
-                            resetStateAfterFlush();
+                            LOG.info("Synchronized flush get Exception:", e);
                         }
                     }
                 }
@@ -346,8 +348,6 @@ public class JdbcMultiBatchingOutputFormat<In, JdbcIn, JdbcExec extends JdbcBatc
             LOG.info("Cal tableIdentifier get Exception:", e);
             return;
         }
-        rowSize++;
-        dataSize = dataSize + rootNode.toString().getBytes(StandardCharsets.UTF_8).length;
 
         GenericRowData record = null;
         try {
@@ -380,16 +380,8 @@ public class JdbcMultiBatchingOutputFormat<In, JdbcIn, JdbcExec extends JdbcBatc
             if (executionOptions.getBatchSize() > 0
                     && batchCount >= executionOptions.getBatchSize()) {
                 flush();
-                if (sinkMetricData != null) {
-                    sinkMetricData.invoke(rowSize, dataSize);
-                }
-                resetStateAfterFlush();
             }
         } catch (Exception e) {
-            if (sinkMetricData != null) {
-                sinkMetricData.invokeDirty(rowSize, dataSize);
-            }
-            resetStateAfterFlush();
             throw new IOException("Writing records to JDBC failed.", e);
         }
     }
@@ -465,11 +457,6 @@ public class JdbcMultiBatchingOutputFormat<In, JdbcIn, JdbcExec extends JdbcBatc
         return record;
     }
 
-    private void resetStateAfterFlush() {
-        dataSize = 0L;
-        rowSize = 0L;
-    }
-
     @Override
     public void snapshotState(FunctionSnapshotContext context) throws Exception {
         if (sinkMetricData != null && metricStateListState != null) {
@@ -509,9 +496,9 @@ public class JdbcMultiBatchingOutputFormat<In, JdbcIn, JdbcExec extends JdbcBatc
     protected void attemptFlush() throws IOException {
         for (Map.Entry<String, List<GenericRowData>> entry : recordsMap.entrySet()) {
             String tableIdentifier = entry.getKey();
-            boolean isIgnoreTableIdentifierException = isIgnoreTableException
+            boolean stopTableIdentifierWhenException = stopWritingWhenTableException
                     && (null != tableExceptionMap.get(tableIdentifier));
-            if (isIgnoreTableIdentifierException) {
+            if (stopTableIdentifierWhenException) {
                 continue;
             }
             List<GenericRowData> tableIdRecordList = entry.getValue();
@@ -523,11 +510,15 @@ public class JdbcMultiBatchingOutputFormat<In, JdbcIn, JdbcExec extends JdbcBatc
             Exception tableException = null;
             try {
                 jdbcStatementExecutor = getOrCreateStatementExecutor(tableIdentifier);
+                Long totalDataSize = 0L;
                 for (GenericRowData record : tableIdRecordList) {
+                    totalDataSize = totalDataSize + record.toString().getBytes(StandardCharsets.UTF_8).length;
                    jdbcStatementExecutor.addToBatch((JdbcIn) record);
                 }
                 jdbcStatementExecutor.executeBatch();
                 flushFlag = true;
+                outputMetrics(tableIdentifier, Long.valueOf(tableIdRecordList.size()),
+                        totalDataSize, false);
             } catch (Exception e) {
                 tableException = e;
                 LOG.warn("Flush all data for tableIdentifier:{} get err:", tableIdentifier, e);
@@ -549,10 +540,13 @@ public class JdbcMultiBatchingOutputFormat<In, JdbcIn, JdbcExec extends JdbcBatc
                         jdbcStatementExecutor = getOrCreateStatementExecutor(tableIdentifier);
                         jdbcStatementExecutor.addToBatch((JdbcIn) record);
                         jdbcStatementExecutor.executeBatch();
+                        Long totalDataSize =
+                                Long.valueOf(record.toString().getBytes(StandardCharsets.UTF_8).length);
+                        outputMetrics(tableIdentifier, 1L, totalDataSize, false);
                         flushFlag = true;
                         break;
                     } catch (Exception e) {
-                        LOG.error("Flush one record tableIdentifier:{} ,retryTimes:{} get err:",
+                        LOG.warn("Flush one record tableIdentifier:{} ,retryTimes:{} get err:",
                                 tableIdentifier, retryTimes, e);
                         getAndSetPkFromErrMsg(e.getMessage(), tableIdentifier);
                         tableException = e;
@@ -569,11 +563,16 @@ public class JdbcMultiBatchingOutputFormat<In, JdbcIn, JdbcExec extends JdbcBatc
                     if (!flushFlag && null != tableException) {
                         LOG.info("Put tableIdentifier:{} exception:{}",
                                 tableIdentifier, tableException.getMessage());
+                        outputMetrics(tableIdentifier, Long.valueOf(tableIdRecordList.size()),
+                                1L, true);
+                        if (!schemaUpdateExceptionPolicy.equals(SchemaUpdateExceptionPolicy.THROW_WITH_STOP)) {
+                            dirtySinkHelper.invoke(record, DirtyType.RETRY_LOAD_ERROR, tableException);
+                        }
                         tableExceptionMap.put(tableIdentifier, tableException);
-                        if (isIgnoreTableException) {
+                        if (stopWritingWhenTableException) {
                             LOG.info("Stop write table:{} because occur exception", tableIdentifier);
-                            continue;
+                            break;
                         }
                     }
                 }
@@ -582,6 +581,31 @@ public class JdbcMultiBatchingOutputFormat<In, JdbcIn, JdbcExec extends JdbcBatc
         }
     }
 
+    /**
+     * Output metrics with estimate for pg or other type jdbc connectors.
+     * tableIdentifier maybe: ${dbName}.${tbName} or ${dbName}.${schemaName}.${tbName}
+     */
+    private void outputMetrics(String tableIdentifier, Long rowSize, Long dataSize, boolean dirtyFlag) {
+        String[] fieldArray = tableIdentifier.split("\\.");
+        if (fieldArray.length == 3) {
+            if (dirtyFlag) {
+                sinkMetricData.outputDirtyMetrics(fieldArray[0], fieldArray[1], fieldArray[2],
+                        rowSize, dataSize);
+            } else {
+                sinkMetricData.outputMetrics(fieldArray[0], fieldArray[1], fieldArray[2],
+                        rowSize, dataSize);
+            }
+        } else if (fieldArray.length == 2) {
+            if (dirtyFlag) {
+                sinkMetricData.outputDirtyMetrics(fieldArray[0], null, fieldArray[1],
+                        rowSize, dataSize);
+            } else {
+                sinkMetricData.outputMetrics(fieldArray[0], null, fieldArray[1],
+                        rowSize, dataSize);
+            }
+        }
+    }
+
     /**
      * Executes prepared statement and closes all resources of this instance.
      */
diff --git a/inlong-sort/sort-connectors/jdbc/src/main/java/org/apache/inlong/sort/jdbc/table/JdbcDynamicOutputFormatBuilder.java b/inlong-sort/sort-connectors/jdbc/src/main/java/org/apache/inlong/sort/jdbc/table/JdbcDynamicOutputFormatBuilder.java
index 531864cd4..ab519a03c 100644
--- a/inlong-sort/sort-connectors/jdbc/src/main/java/org/apache/inlong/sort/jdbc/table/JdbcDynamicOutputFormatBuilder.java
+++ b/inlong-sort/sort-connectors/jdbc/src/main/java/org/apache/inlong/sort/jdbc/table/JdbcDynamicOutputFormatBuilder.java
@@ -37,6 +37,7 @@ import org.apache.flink.table.data.RowData;
 import org.apache.flink.table.types.DataType;
 import org.apache.flink.table.types.logical.LogicalType;
 import org.apache.flink.table.types.logical.RowType;
+import org.apache.inlong.sort.base.dirty.DirtySinkHelper;
 import org.apache.inlong.sort.base.sink.SchemaUpdateExceptionPolicy;
 import org.apache.inlong.sort.base.dirty.DirtyOptions;
 import org.apache.inlong.sort.base.dirty.sink.DirtySink;
@@ -336,6 +337,7 @@ public class JdbcDynamicOutputFormatBuilder implements Serializable {
         checkNotNull(jdbcOptions, "jdbc options can not be null");
         checkNotNull(dmlOptions, "jdbc dml options can not be null");
         checkNotNull(executionOptions, "jdbc execution options can not be null");
+        final DirtySinkHelper<Object> dirtySinkHelper = new DirtySinkHelper<>(dirtyOptions, dirtySink);
         return new JdbcMultiBatchingOutputFormat<>(
                 new SimpleJdbcConnectionProvider(jdbcOptions),
                 executionOptions,
@@ -348,6 +350,7 @@ public class JdbcDynamicOutputFormatBuilder implements Serializable {
                 schemaPattern,
                 inlongMetric,
                 auditHostAndPorts,
-                schemaUpdateExceptionPolicy);
+                schemaUpdateExceptionPolicy,
+                dirtySinkHelper);
     }
 }
\ No newline at end of file
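
For readers skimming the diff: the new outputMetrics helper reports table-level metrics by splitting the table identifier on dots, so a three-part identifier carries a schema level (PostgreSQL) while a two-part one does not. A minimal standalone sketch of that convention follows; the Reporter interface is a hypothetical stand-in for SinkTableMetricData and is not part of the patch.

public class TableIdentifierSplitSketch {

    // Hypothetical stand-in for SinkTableMetricData#outputMetrics / #outputDirtyMetrics.
    interface Reporter {
        void report(String database, String schema, String table, long rowCount, long byteCount);
    }

    // Mirrors the dot-splitting convention of the patch's outputMetrics helper.
    static void outputMetrics(String tableIdentifier, long rowCount, long byteCount, Reporter reporter) {
        String[] fields = tableIdentifier.split("\\.");
        if (fields.length == 3) {
            // PostgreSQL-style identifier: ${dbName}.${schemaName}.${tbName}
            reporter.report(fields[0], fields[1], fields[2], rowCount, byteCount);
        } else if (fields.length == 2) {
            // Two-part identifier: ${dbName}.${tbName}; there is no schema level, so pass null.
            reporter.report(fields[0], null, fields[1], rowCount, byteCount);
        }
        // Identifiers of any other shape are silently skipped, as in the committed helper.
    }

    public static void main(String[] args) {
        Reporter printer = (db, schema, table, rows, bytes) ->
                System.out.printf("db=%s schema=%s table=%s rows=%d bytes=%d%n",
                        db, schema, table, rows, bytes);
        outputMetrics("mydb.public.orders", 10, 2048, printer); // three-part: schema present
        outputMetrics("mydb.orders", 5, 512, printer);          // two-part: schema is null
    }
}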
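Similarly, the dirty-data backup path added to attemptFlush boils down to: once a record has exhausted its per-record retries, its table is reported dirty in the metrics and the record is handed to the dirty sink as RETRY_LOAD_ERROR, unless the policy is THROW_WITH_STOP, which fails the job instead. The sketch below illustrates only that routing decision; the trimmed-down policy enum and the BiConsumer standing in for DirtySinkHelper.invoke(record, DirtyType.RETRY_LOAD_ERROR, cause) are illustrative assumptions.

import java.util.function.BiConsumer;

public class DirtyBackupRoutingSketch {

    // Only the policies named in this diff; the real enum lives in the sort-connector base module.
    enum SchemaUpdateExceptionPolicy { THROW_WITH_STOP, ALERT_WITH_IGNORE, STOP_PARTIAL }

    // Called once a record's retry loop ends without a successful executeBatch().
    static void onRetryExhausted(Object record, Exception cause,
            SchemaUpdateExceptionPolicy policy, BiConsumer<Object, Exception> dirtySink) {
        // THROW_WITH_STOP propagates the failure rather than archiving the record,
        // so it is the one policy that skips the dirty-data backup.
        if (policy != SchemaUpdateExceptionPolicy.THROW_WITH_STOP) {
            dirtySink.accept(record, cause);
        }
    }

    public static void main(String[] args) {
        BiConsumer<Object, Exception> printer = (record, cause) ->
                System.out.println("dirty backup: " + record + " (" + cause.getMessage() + ")");
        onRetryExhausted("row-1", new RuntimeException("duplicate key value"),
                SchemaUpdateExceptionPolicy.ALERT_WITH_IGNORE, printer); // backed up
        onRetryExhausted("row-2", new RuntimeException("duplicate key value"),
                SchemaUpdateExceptionPolicy.THROW_WITH_STOP, printer);   // skipped
    }
}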