This is an automated email from the ASF dual-hosted git repository.

pacinogong pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/inlong.git


The following commit(s) were added to refs/heads/master by this push:
     new f8d3989ab [INLONG-7075][Sort] Add table level metric and dirty data backup for PostgreSQL (#7088)
f8d3989ab is described below

commit f8d3989ab29256dc34f5b5bca92c2a2979443c96
Author: kuansix <490305...@qq.com>
AuthorDate: Fri Dec 30 19:23:25 2022 +0800

    [INLONG-7075][Sort] Add table level metric and dirty data backup for PostgreSQL (#7088)
---
 .../apache/inlong/sort/base/dirty/DirtyType.java   |  4 +
 .../internal/JdbcMultiBatchingOutputFormat.java    | 98 ++++++++++++++--------
 .../jdbc/table/JdbcDynamicOutputFormatBuilder.java |  5 +-
 3 files changed, 69 insertions(+), 38 deletions(-)
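
For readers skimming the patch, a minimal sketch of how the new pieces fit together. The methods outputMetrics, outputDirtyMetrics, and invoke, and the constant RETRY_LOAD_ERROR, are what the diffs below add or use; the class TableFlushSketch and its reportFlush signature are invented here purely for illustration.

    import org.apache.inlong.sort.base.dirty.DirtySinkHelper;
    import org.apache.inlong.sort.base.dirty.DirtyType;
    import org.apache.inlong.sort.base.metric.sub.SinkTableMetricData;

    // Hypothetical glue, for illustration only; the real wiring lives in
    // JdbcMultiBatchingOutputFormat.attemptFlush() below.
    class TableFlushSketch {

        // On success, report per-table counters; on a failed retry, count
        // the rows as dirty and back the record up through the dirty sink.
        static void reportFlush(String db, String schema, String table,
                long rows, long bytes, Exception failure, Object record,
                SinkTableMetricData metricData, DirtySinkHelper<Object> dirtyHelper) {
            if (failure == null) {
                metricData.outputMetrics(db, schema, table, rows, bytes);
            } else {
                metricData.outputDirtyMetrics(db, schema, table, rows, bytes);
                dirtyHelper.invoke(record, DirtyType.RETRY_LOAD_ERROR, failure);
            }
        }
    }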

diff --git a/inlong-sort/sort-connectors/base/src/main/java/org/apache/inlong/sort/base/dirty/DirtyType.java b/inlong-sort/sort-connectors/base/src/main/java/org/apache/inlong/sort/base/dirty/DirtyType.java
index 89789872a..d6bf10e61 100644
--- a/inlong-sort/sort-connectors/base/src/main/java/org/apache/inlong/sort/base/dirty/DirtyType.java
+++ b/inlong-sort/sort-connectors/base/src/main/java/org/apache/inlong/sort/base/dirty/DirtyType.java
@@ -68,6 +68,10 @@ public enum DirtyType {
      * Batch load error
      */
     BATCH_LOAD_ERROR("BatchLoadError"),
+    /**
+     * Retry load error
+     */
+    RETRY_LOAD_ERROR("RetryLoadError"),
     /**
      * Unsupported data type
      */
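
The new constant follows the existing DirtyType pattern: each value wraps a human-readable label. A self-contained sketch of that pattern (the enum name and the format() accessor are illustrative, not copied from the InLong source):

    // Sketch of the DirtyType shape; only the two constants relevant to
    // this commit are shown.
    enum DirtyTypeSketch {

        BATCH_LOAD_ERROR("BatchLoadError"),
        RETRY_LOAD_ERROR("RetryLoadError");

        private final String format;

        DirtyTypeSketch(String format) {
            this.format = format;
        }

        String format() {
            return format;
        }
    }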
diff --git a/inlong-sort/sort-connectors/jdbc/src/main/java/org/apache/inlong/sort/jdbc/internal/JdbcMultiBatchingOutputFormat.java b/inlong-sort/sort-connectors/jdbc/src/main/java/org/apache/inlong/sort/jdbc/internal/JdbcMultiBatchingOutputFormat.java
index 1176ca6f4..dd4be4d5c 100644
--- a/inlong-sort/sort-connectors/jdbc/src/main/java/org/apache/inlong/sort/jdbc/internal/JdbcMultiBatchingOutputFormat.java
+++ b/inlong-sort/sort-connectors/jdbc/src/main/java/org/apache/inlong/sort/jdbc/internal/JdbcMultiBatchingOutputFormat.java
@@ -42,11 +42,13 @@ import org.apache.flink.table.runtime.typeutils.InternalTypeInfo;
 import org.apache.flink.table.types.logical.LogicalType;
 import org.apache.flink.table.types.logical.RowType;
 import org.apache.flink.types.RowKind;
+import org.apache.inlong.sort.base.dirty.DirtySinkHelper;
+import org.apache.inlong.sort.base.dirty.DirtyType;
 import org.apache.inlong.sort.base.format.DynamicSchemaFormatFactory;
 import org.apache.inlong.sort.base.format.JsonDynamicSchemaFormat;
 import org.apache.inlong.sort.base.metric.MetricOption;
 import org.apache.inlong.sort.base.metric.MetricState;
-import org.apache.inlong.sort.base.metric.SinkMetricData;
+import org.apache.inlong.sort.base.metric.sub.SinkTableMetricData;
 import org.apache.inlong.sort.base.sink.SchemaUpdateExceptionPolicy;
 import org.apache.inlong.sort.base.util.MetricStateUtils;
 import org.apache.inlong.sort.jdbc.table.AbstractJdbcDialect;
@@ -82,6 +84,8 @@ import java.util.regex.Matcher;
 import java.util.regex.Pattern;
 
 import static org.apache.flink.util.Preconditions.checkNotNull;
+import static org.apache.inlong.sort.base.Constants.DIRTY_BYTES_OUT;
+import static org.apache.inlong.sort.base.Constants.DIRTY_RECORDS_OUT;
 import static org.apache.inlong.sort.base.Constants.NUM_RECORDS_OUT;
 import static org.apache.inlong.sort.base.Constants.NUM_BYTES_OUT;
 import static org.apache.inlong.sort.base.Constants.INLONG_METRIC_STATE_NAME;
@@ -113,7 +117,7 @@ public class JdbcMultiBatchingOutputFormat<In, JdbcIn, JdbcExec extends JdbcBatc
     private transient Map<String, List<String>> pkNameMap = new HashMap<>();
     private transient Map<String, List<GenericRowData>> recordsMap = new HashMap<>();
     private transient Map<String, Exception> tableExceptionMap = new HashMap<>();
-    private transient Boolean isIgnoreTableException;
+    private transient Boolean stopWritingWhenTableException;
 
     private transient ListState<MetricState> metricStateListState;
     private final String sinkMultipleFormat;
@@ -121,10 +125,9 @@ public class JdbcMultiBatchingOutputFormat<In, JdbcIn, JdbcExec extends JdbcBatc
     private final String tablePattern;
     private final String schemaPattern;
     private transient MetricState metricState;
-    private SinkMetricData sinkMetricData;
-    private Long dataSize = 0L;
-    private Long rowSize = 0L;
+    private SinkTableMetricData sinkMetricData;
     private final SchemaUpdateExceptionPolicy schemaUpdateExceptionPolicy;
+    private final DirtySinkHelper<Object> dirtySinkHelper;
 
     private static final DateTimeFormatter SQL_TIMESTAMP_WITH_LOCAL_TIMEZONE_FORMAT;
     private static final DateTimeFormatter SQL_TIME_FORMAT;
@@ -149,7 +152,8 @@ public class JdbcMultiBatchingOutputFormat<In, JdbcIn, JdbcExec extends JdbcBatc
             String schemaPattern,
             String inlongMetric,
             String auditHostAndPorts,
-            SchemaUpdateExceptionPolicy schemaUpdateExceptionPolicy) {
+            SchemaUpdateExceptionPolicy schemaUpdateExceptionPolicy,
+            DirtySinkHelper<Object> dirtySinkHelper) {
         super(connectionProvider);
         this.executionOptions = checkNotNull(executionOptions);
         this.dmlOptions = dmlOptions;
@@ -162,6 +166,7 @@ public class JdbcMultiBatchingOutputFormat<In, JdbcIn, JdbcExec extends JdbcBatc
         this.inlongMetric = inlongMetric;
         this.auditHostAndPorts = auditHostAndPorts;
         this.schemaUpdateExceptionPolicy = schemaUpdateExceptionPolicy;
+        this.dirtySinkHelper = dirtySinkHelper;
     }
 
     /**
@@ -177,10 +182,13 @@ public class JdbcMultiBatchingOutputFormat<In, JdbcIn, JdbcExec extends JdbcBatc
                 .withInlongAudit(auditHostAndPorts)
                 .withInitRecords(metricState != null ? metricState.getMetricValue(NUM_RECORDS_OUT) : 0L)
                 .withInitBytes(metricState != null ? metricState.getMetricValue(NUM_BYTES_OUT) : 0L)
+                .withInitDirtyRecords(metricState != null ? metricState.getMetricValue(DIRTY_RECORDS_OUT) : 0L)
+                .withInitDirtyBytes(metricState != null ? metricState.getMetricValue(DIRTY_BYTES_OUT) : 0L)
                 .withRegisterMetric(MetricOption.RegisteredMetric.ALL)
                 .build();
         if (metricOption != null) {
-            sinkMetricData = new SinkMetricData(metricOption, runtimeContext.getMetricGroup());
+            sinkMetricData = new SinkTableMetricData(metricOption, runtimeContext.getMetricGroup());
+            sinkMetricData.registerSubMetricsGroup(metricState);
         }
         jdbcExecMap = new HashMap<>();
         connectionExecProviderMap = new HashMap<>();
@@ -188,8 +196,9 @@ public class JdbcMultiBatchingOutputFormat<In, JdbcIn, JdbcExec extends JdbcBatc
         rowTypeMap = new HashMap<>();
         recordsMap = new HashMap<>();
         tableExceptionMap = new HashMap<>();
-        isIgnoreTableException = schemaUpdateExceptionPolicy.equals(SchemaUpdateExceptionPolicy.ALERT_WITH_IGNORE)
-                || schemaUpdateExceptionPolicy.equals(SchemaUpdateExceptionPolicy.STOP_PARTIAL);
+        stopWritingWhenTableException =
+                schemaUpdateExceptionPolicy.equals(SchemaUpdateExceptionPolicy.ALERT_WITH_IGNORE)
+                        || schemaUpdateExceptionPolicy.equals(SchemaUpdateExceptionPolicy.STOP_PARTIAL);
         if (executionOptions.getBatchIntervalMs() != 0 && executionOptions.getBatchSize() != 1) {
             this.scheduler =
                     Executors.newScheduledThreadPool(
@@ -201,15 +210,8 @@ public class JdbcMultiBatchingOutputFormat<In, JdbcIn, JdbcExec extends JdbcBatc
                                     if (!closed) {
                                         try {
                                             flush();
-                                            if (sinkMetricData != null) {
-                                                sinkMetricData.invoke(rowSize, dataSize);
-                                            }
-                                            resetStateAfterFlush();
                                         } catch (Exception e) {
-                                            if (sinkMetricData != null) {
-                                                sinkMetricData.invokeDirty(rowSize, dataSize);
-                                            }
-                                            resetStateAfterFlush();
+                                            LOG.info("Synchronized flush get Exception:", e);
                                         }
                                     }
                                 }
@@ -346,8 +348,6 @@ public class JdbcMultiBatchingOutputFormat<In, JdbcIn, JdbcExec extends JdbcBatc
                 LOG.info("Cal tableIdentifier get Exception:", e);
                 return;
             }
-            rowSize++;
-            dataSize = dataSize + rootNode.toString().getBytes(StandardCharsets.UTF_8).length;
 
             GenericRowData record = null;
             try {
@@ -380,16 +380,8 @@ public class JdbcMultiBatchingOutputFormat<In, JdbcIn, JdbcExec extends JdbcBatc
                 if (executionOptions.getBatchSize() > 0
                         && batchCount >= executionOptions.getBatchSize()) {
                     flush();
-                    if (sinkMetricData != null) {
-                        sinkMetricData.invoke(rowSize, dataSize);
-                    }
-                    resetStateAfterFlush();
                 }
             } catch (Exception e) {
-                if (sinkMetricData != null) {
-                    sinkMetricData.invokeDirty(rowSize, dataSize);
-                }
-                resetStateAfterFlush();
                 throw new IOException("Writing records to JDBC failed.", e);
             }
         }
@@ -465,11 +457,6 @@ public class JdbcMultiBatchingOutputFormat<In, JdbcIn, JdbcExec extends JdbcBatc
         return record;
     }
 
-    private void resetStateAfterFlush() {
-        dataSize = 0L;
-        rowSize = 0L;
-    }
-
     @Override
     public void snapshotState(FunctionSnapshotContext context) throws Exception {
         if (sinkMetricData != null && metricStateListState != null) {
@@ -509,9 +496,9 @@ public class JdbcMultiBatchingOutputFormat<In, JdbcIn, JdbcExec extends JdbcBatc
     protected void attemptFlush() throws IOException {
         for (Map.Entry<String, List<GenericRowData>> entry : recordsMap.entrySet()) {
             String tableIdentifier = entry.getKey();
-            boolean isIgnoreTableIdentifierException = isIgnoreTableException
+            boolean stopTableIdentifierWhenException = stopWritingWhenTableException
                     && (null != tableExceptionMap.get(tableIdentifier));
-            if (isIgnoreTableIdentifierException) {
+            if (stopTableIdentifierWhenException) {
                 continue;
             }
             List<GenericRowData> tableIdRecordList = entry.getValue();
@@ -523,11 +510,15 @@ public class JdbcMultiBatchingOutputFormat<In, JdbcIn, JdbcExec extends JdbcBatc
             Exception tableException = null;
             try {
                 jdbcStatementExecutor = getOrCreateStatementExecutor(tableIdentifier);
+                Long totalDataSize = 0L;
                 for (GenericRowData record : tableIdRecordList) {
+                    totalDataSize = totalDataSize + record.toString().getBytes(StandardCharsets.UTF_8).length;
                     jdbcStatementExecutor.addToBatch((JdbcIn) record);
                 }
                 jdbcStatementExecutor.executeBatch();
                 flushFlag = true;
+                outputMetrics(tableIdentifier, Long.valueOf(tableIdRecordList.size()),
+                        totalDataSize, false);
             } catch (Exception e) {
                 tableException = e;
                 LOG.warn("Flush all data for tableIdentifier:{} get err:", 
tableIdentifier, e);
@@ -549,10 +540,13 @@ public class JdbcMultiBatchingOutputFormat<In, JdbcIn, JdbcExec extends JdbcBatc
                             jdbcStatementExecutor = getOrCreateStatementExecutor(tableIdentifier);
                             jdbcStatementExecutor.addToBatch((JdbcIn) record);
                             jdbcStatementExecutor.executeBatch();
+                            Long totalDataSize =
+                                    Long.valueOf(record.toString().getBytes(StandardCharsets.UTF_8).length);
+                            outputMetrics(tableIdentifier, 1L, totalDataSize, false);
                             flushFlag = true;
                             break;
                         } catch (Exception e) {
-                            LOG.error("Flush one record tableIdentifier:{} ,retryTimes:{} get err:",
+                            LOG.warn("Flush one record tableIdentifier:{} ,retryTimes:{} get err:",
                                     tableIdentifier, retryTimes, e);
                             getAndSetPkFromErrMsg(e.getMessage(), tableIdentifier);
                             tableException = e;
@@ -569,11 +563,16 @@ public class JdbcMultiBatchingOutputFormat<In, JdbcIn, JdbcExec extends JdbcBatc
                     if (!flushFlag && null != tableException) {
                         LOG.info("Put tableIdentifier:{} exception:{}",
                                 tableIdentifier, tableException.getMessage());
+                        outputMetrics(tableIdentifier, Long.valueOf(tableIdRecordList.size()),
+                                1L, true);
+                        if (!schemaUpdateExceptionPolicy.equals(SchemaUpdateExceptionPolicy.THROW_WITH_STOP)) {
+                            dirtySinkHelper.invoke(record, DirtyType.RETRY_LOAD_ERROR, tableException);
+                        }
                         tableExceptionMap.put(tableIdentifier, tableException);
-                        if (isIgnoreTableException) {
+                        if (stopWritingWhenTableException) {
                             LOG.info("Stop write table:{} because occur 
exception",
                                     tableIdentifier);
-                            continue;
+                            break;
                         }
                     }
                 }
@@ -582,6 +581,31 @@ public class JdbcMultiBatchingOutputFormat<In, JdbcIn, JdbcExec extends JdbcBatc
         }
     }
 
+    /**
+     * Output metrics with estimate for pg or other type jdbc connectors.
+     * tableIdentifier maybe: ${dbName}.${tbName} or ${dbName}.${schemaName}.${tbName}
+     */
+    private void outputMetrics(String tableIdentifier, Long rowSize, Long dataSize, boolean dirtyFlag) {
+        String[] fieldArray = tableIdentifier.split("\\.");
+        if (fieldArray.length == 3) {
+            if (dirtyFlag) {
+                sinkMetricData.outputDirtyMetrics(fieldArray[0], fieldArray[1], fieldArray[2],
+                        rowSize, dataSize);
+            } else {
+                sinkMetricData.outputMetrics(fieldArray[0], fieldArray[1], fieldArray[2],
+                        rowSize, dataSize);
+            }
+        } else if (fieldArray.length == 2) {
+            if (dirtyFlag) {
+                sinkMetricData.outputDirtyMetrics(fieldArray[0], null, fieldArray[1],
+                        rowSize, dataSize);
+            } else {
+                sinkMetricData.outputMetrics(fieldArray[0], null, fieldArray[1],
+                        rowSize, dataSize);
+            }
+        }
+    }
+
     /**
      * Executes prepared statement and closes all resources of this instance.
      */
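
The new outputMetrics helper above dispatches on the shape of tableIdentifier, as its javadoc notes. A standalone sketch of the split it performs; the identifiers are made-up examples, not taken from any real job:

    public class TableIdentifierSketch {
        public static void main(String[] args) {
            String[] twoPart = "mydb.orders".split("\\.");          // database + table
            String[] threePart = "mydb.public.orders".split("\\."); // database + schema + table
            System.out.println(twoPart.length);   // 2 -> outputMetrics(db, null, table, ...)
            System.out.println(threePart.length); // 3 -> outputMetrics(db, schema, table, ...)
            // Any other shape falls through the helper without reporting.
        }
    }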
diff --git a/inlong-sort/sort-connectors/jdbc/src/main/java/org/apache/inlong/sort/jdbc/table/JdbcDynamicOutputFormatBuilder.java b/inlong-sort/sort-connectors/jdbc/src/main/java/org/apache/inlong/sort/jdbc/table/JdbcDynamicOutputFormatBuilder.java
index 531864cd4..ab519a03c 100644
--- a/inlong-sort/sort-connectors/jdbc/src/main/java/org/apache/inlong/sort/jdbc/table/JdbcDynamicOutputFormatBuilder.java
+++ b/inlong-sort/sort-connectors/jdbc/src/main/java/org/apache/inlong/sort/jdbc/table/JdbcDynamicOutputFormatBuilder.java
@@ -37,6 +37,7 @@ import org.apache.flink.table.data.RowData;
 import org.apache.flink.table.types.DataType;
 import org.apache.flink.table.types.logical.LogicalType;
 import org.apache.flink.table.types.logical.RowType;
+import org.apache.inlong.sort.base.dirty.DirtySinkHelper;
 import org.apache.inlong.sort.base.sink.SchemaUpdateExceptionPolicy;
 import org.apache.inlong.sort.base.dirty.DirtyOptions;
 import org.apache.inlong.sort.base.dirty.sink.DirtySink;
@@ -336,6 +337,7 @@ public class JdbcDynamicOutputFormatBuilder implements Serializable {
         checkNotNull(jdbcOptions, "jdbc options can not be null");
         checkNotNull(dmlOptions, "jdbc dml options can not be null");
         checkNotNull(executionOptions, "jdbc execution options can not be null");
+        final DirtySinkHelper<Object> dirtySinkHelper = new DirtySinkHelper<>(dirtyOptions, dirtySink);
         return new JdbcMultiBatchingOutputFormat<>(
                 new SimpleJdbcConnectionProvider(jdbcOptions),
                 executionOptions,
@@ -348,6 +350,7 @@ public class JdbcDynamicOutputFormatBuilder implements Serializable {
                 schemaPattern,
                 inlongMetric,
                 auditHostAndPorts,
-                schemaUpdateExceptionPolicy);
+                schemaUpdateExceptionPolicy,
+                dirtySinkHelper);
     }
 }
\ No newline at end of file
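
One design note on the metrics introduced above: the per-table byte counts on the success path are estimated from each row's string form rather than measured on the wire, so the NUM_BYTES_OUT and DIRTY_BYTES_OUT figures are approximations. A standalone sketch of that estimate, with invented row strings standing in for Flink's GenericRowData:

    import java.nio.charset.StandardCharsets;
    import java.util.Arrays;
    import java.util.List;

    public class ByteEstimateSketch {

        // Measure each row's string form in UTF-8, mirroring the estimate
        // in attemptFlush() above.
        static long estimateBytes(List<?> records) {
            long total = 0L;
            for (Object record : records) {
                total += record.toString().getBytes(StandardCharsets.UTF_8).length;
            }
            return total;
        }

        public static void main(String[] args) {
            System.out.println(estimateBytes(Arrays.asList("+I(1,foo)", "+I(2,bar)")));
        }
    }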
