This is an automated email from the ASF dual-hosted git repository.

dockerzhang pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/inlong.git


The following commit(s) were added to refs/heads/master by this push:
     new 6572cf9f8 [INLONG-7245][Sort] Support metric and audit in sort-connect-redis (#7309)
6572cf9f8 is described below

commit 6572cf9f8b0913b062a80fa14caa68d0d0716652
Author: feat <featzh...@outlook.com>
AuthorDate: Thu Feb 2 16:55:04 2023 +0800

    [INLONG-7245][Sort] Support metric and audit in sort-connect-redis (#7309)
---
 .../sort/redis/sink/AbstractRedisSinkFunction.java | 69 ++++++++++++++++++----
 .../sort/redis/sink/RedisBitmapSinkFunction.java   |  8 ++-
 .../sort/redis/sink/RedisDynamicTableSink.java     | 27 +++++++--
 .../sort/redis/sink/RedisHashSinkFunction.java     |  9 ++-
 .../sort/redis/sink/RedisPlainSinkFunction.java    |  8 ++-
 .../inlong/sort/redis/sink/RedisSinkFunction.java  |  8 ++-
 .../sort/redis/table/RedisDynamicTableFactory.java |  4 +-
 7 files changed, 106 insertions(+), 27 deletions(-)

diff --git a/inlong-sort/sort-connectors/redis/src/main/java/org/apache/inlong/sort/redis/sink/AbstractRedisSinkFunction.java b/inlong-sort/sort-connectors/redis/src/main/java/org/apache/inlong/sort/redis/sink/AbstractRedisSinkFunction.java
index 2829198ea..97ae9b9f2 100644
--- a/inlong-sort/sort-connectors/redis/src/main/java/org/apache/inlong/sort/redis/sink/AbstractRedisSinkFunction.java
+++ b/inlong-sort/sort-connectors/redis/src/main/java/org/apache/inlong/sort/redis/sink/AbstractRedisSinkFunction.java
@@ -18,17 +18,22 @@
 package org.apache.inlong.sort.redis.sink;
 
 import static org.apache.flink.util.Preconditions.checkNotNull;
+import static org.apache.inlong.sort.base.Constants.DIRTY_BYTES_OUT;
+import static org.apache.inlong.sort.base.Constants.DIRTY_RECORDS_OUT;
+import static org.apache.inlong.sort.base.Constants.INLONG_METRIC_STATE_NAME;
+import static org.apache.inlong.sort.base.Constants.NUM_BYTES_OUT;
+import static org.apache.inlong.sort.base.Constants.NUM_RECORDS_OUT;
 
 import java.time.Duration;
 import java.util.ArrayList;
 import java.util.List;
 import java.util.Optional;
-import java.util.concurrent.ScheduledExecutorService;
 import javax.annotation.concurrent.GuardedBy;
 import org.apache.commons.lang3.time.StopWatch;
 import org.apache.flink.api.common.serialization.SerializationSchema;
 import org.apache.flink.api.common.state.ListState;
 import org.apache.flink.api.common.state.ListStateDescriptor;
+import org.apache.flink.api.common.typeinfo.TypeHint;
 import org.apache.flink.api.common.typeinfo.TypeInformation;
 import org.apache.flink.configuration.Configuration;
 import org.apache.flink.runtime.state.FunctionInitializationContext;
@@ -37,6 +42,10 @@ import org.apache.flink.streaming.api.checkpoint.CheckpointedFunction;
 import org.apache.flink.streaming.api.functions.sink.RichSinkFunction;
 import 
org.apache.flink.streaming.connectors.redis.common.config.FlinkJedisConfigBase;
 import org.apache.flink.table.data.RowData;
+import org.apache.inlong.sort.base.metric.MetricOption;
+import org.apache.inlong.sort.base.metric.MetricState;
+import org.apache.inlong.sort.base.metric.SinkMetricData;
+import org.apache.inlong.sort.base.util.MetricStateUtils;
 import 
org.apache.inlong.sort.redis.common.container.InlongRedisCommandsContainer;
 import 
org.apache.inlong.sort.redis.common.container.RedisCommandsContainerBuilder;
 import org.apache.inlong.sort.redis.common.schema.StateEncoder;
@@ -90,8 +99,6 @@ public abstract class AbstractRedisSinkFunction<OUT>
 
     private final List<OUT> rows;
 
-    private transient ScheduledExecutorService executorService;
-
     /**
      * The container for all available Redis commands.
      */
@@ -104,6 +111,12 @@ public abstract class AbstractRedisSinkFunction<OUT>
     protected transient StopWatch stopWatch;
 
     protected StateEncoder<OUT> stateEncoder;
+    private final String auditHostAndPorts;
+
+    private final String inLongMetric;
+    private transient MetricState metricState;
+    private transient ListState<MetricState> metricStateListState;
+    private SinkMetricData sinkMetricData;
 
     public AbstractRedisSinkFunction(
             TypeInformation<OUT> outputType,
@@ -112,7 +125,9 @@ public abstract class AbstractRedisSinkFunction<OUT>
             long batchSize,
             Duration flushInterval,
             Duration configuration,
-            FlinkJedisConfigBase flinkJedisConfigBase) {
+            FlinkJedisConfigBase flinkJedisConfigBase,
+            String inLongMetric,
+            String auditHostAndPorts) {
         checkNotNull(configuration, "The configuration must not be null.");
 
         this.stateEncoder = stateEncoder;
@@ -124,6 +139,8 @@ public abstract class AbstractRedisSinkFunction<OUT>
         this.forceFlush = false;
         this.rows = new ArrayList<>();
         this.flinkJedisConfigBase = flinkJedisConfigBase;
+        this.inLongMetric = inLongMetric;
+        this.auditHostAndPorts = auditHostAndPorts;
     }
 
     @Override
@@ -152,11 +169,34 @@ public abstract class AbstractRedisSinkFunction<OUT>
             outputFlusher = Optional.of(new OutputFlusher(threadName, 
flushIntervalInMillis));
             outputFlusher.get().start();
         }
-
+        MetricOption metricOption = MetricOption.builder()
+                .withInlongLabels(inLongMetric)
+                .withInlongAudit(auditHostAndPorts)
+                .withInitRecords(metricState != null ? 
metricState.getMetricValue(NUM_RECORDS_OUT) : 0L)
+                .withInitBytes(metricState != null ? 
metricState.getMetricValue(NUM_BYTES_OUT) : 0L)
+                .withInitDirtyRecords(metricState != null ? 
metricState.getMetricValue(DIRTY_RECORDS_OUT) : 0L)
+                .withInitDirtyBytes(metricState != null ? 
metricState.getMetricValue(DIRTY_BYTES_OUT) : 0L)
+                .withRegisterMetric(MetricOption.RegisteredMetric.ALL)
+                .build();
+        if (metricOption != null) {
+            sinkMetricData = new SinkMetricData(metricOption, 
getRuntimeContext().getMetricGroup());
+        }
     }
 
     @Override
     public void initializeState(FunctionInitializationContext context) throws 
Exception {
+        if (this.inLongMetric != null) {
+            this.metricStateListState = 
context.getOperatorStateStore().getUnionListState(
+                    new ListStateDescriptor<>(
+                            INLONG_METRIC_STATE_NAME, TypeInformation.of(new 
TypeHint<MetricState>() {
+                            })));
+        }
+
+        if (context.isRestored()) {
+            metricState = 
MetricStateUtils.restoreMetricState(metricStateListState,
+                    getRuntimeContext().getIndexOfThisSubtask(), 
getRuntimeContext().getNumberOfParallelSubtasks());
+        }
+
         final ListStateDescriptor<OUT> stateDescriptor = new 
ListStateDescriptor<>(
                 "rowState", outputType);
         this.listState = 
context.getOperatorStateStore().getListState(stateDescriptor);
@@ -174,6 +214,10 @@ public abstract class AbstractRedisSinkFunction<OUT>
             listState.clear();
             listState.addAll(rows);
         }
+        if (sinkMetricData != null && metricStateListState != null) {
+            
MetricStateUtils.snapshotMetricStateForSinkMetricData(metricStateListState, 
sinkMetricData,
+                    getRuntimeContext().getIndexOfThisSubtask());
+        }
         LOG.info("redis end snapshotState, id: {}", 
functionSnapshotContext.getCheckpointId());
     }
 
@@ -189,6 +233,7 @@ public abstract class AbstractRedisSinkFunction<OUT>
     public void invoke(RowData in, Context context) {
 
         List<OUT> redisOutputs = serialize(in);
+        sendMetrics(in.toString().getBytes());
         synchronized (lock) {
             rows.addAll(redisOutputs);
             if (forceFlush || rows.size() >= batchSize) {
@@ -202,14 +247,6 @@ public abstract class AbstractRedisSinkFunction<OUT>
     public void close() throws Exception {
         closeClient();
 
-        if (executorService != null) {
-            try {
-                executorService.shutdown();
-            } catch (Throwable t) {
-                LOG.warn("Could not properly shut down 
ScheduledExecutorService.", t);
-            }
-        }
-
         super.close();
 
         LOG.info("Closed redis sink.");
@@ -289,4 +326,10 @@ public abstract class AbstractRedisSinkFunction<OUT>
             }
         }
     }
+
+    protected void sendMetrics(byte[] document) {
+        if (sinkMetricData != null) {
+            sinkMetricData.invoke(1, document.length);
+        }
+    }
 }
diff --git a/inlong-sort/sort-connectors/redis/src/main/java/org/apache/inlong/sort/redis/sink/RedisBitmapSinkFunction.java b/inlong-sort/sort-connectors/redis/src/main/java/org/apache/inlong/sort/redis/sink/RedisBitmapSinkFunction.java
index 4e59aa252..d68ff3d28 100644
--- a/inlong-sort/sort-connectors/redis/src/main/java/org/apache/inlong/sort/redis/sink/RedisBitmapSinkFunction.java
+++ b/inlong-sort/sort-connectors/redis/src/main/java/org/apache/inlong/sort/redis/sink/RedisBitmapSinkFunction.java
@@ -45,7 +45,9 @@ public class RedisBitmapSinkFunction
             long batchSize,
             Duration flushInterval,
             Duration configuration,
-            FlinkJedisConfigBase flinkJedisConfigBase) {
+            FlinkJedisConfigBase flinkJedisConfigBase,
+            String inlongMetric,
+            String auditHostAndPorts) {
         super(TypeInformation.of(new TypeHint<Tuple4<Boolean, String, Long, 
Boolean>>() {
         }),
                 serializationSchema,
@@ -53,7 +55,9 @@ public class RedisBitmapSinkFunction
                 batchSize,
                 flushInterval,
                 configuration,
-                flinkJedisConfigBase);
+                flinkJedisConfigBase,
+                inlongMetric,
+                auditHostAndPorts);
         LOG.info("Creating RedisBitmapStaticKvPairSinkFunction ...");
     }
 
diff --git a/inlong-sort/sort-connectors/redis/src/main/java/org/apache/inlong/sort/redis/sink/RedisDynamicTableSink.java b/inlong-sort/sort-connectors/redis/src/main/java/org/apache/inlong/sort/redis/sink/RedisDynamicTableSink.java
index df16979f2..2e96e3c1d 100644
--- a/inlong-sort/sort-connectors/redis/src/main/java/org/apache/inlong/sort/redis/sink/RedisDynamicTableSink.java
+++ b/inlong-sort/sort-connectors/redis/src/main/java/org/apache/inlong/sort/redis/sink/RedisDynamicTableSink.java
@@ -59,13 +59,18 @@ public class RedisDynamicTableSink implements DynamicTableSink {
     private final ReadableConfig config;
     private final Map<String, String> properties;
 
+    private final String inlongMetric;
+    private final String auditHostAndPorts;
+
     public RedisDynamicTableSink(
             EncodingFormat<SerializationSchema<RowData>> format,
             ResolvedSchema resolvedSchema,
             RedisDataType dataType,
             SchemaMappingMode schemaMappingMode,
             ReadableConfig config,
-            Map<String, String> properties) {
+            Map<String, String> properties,
+            String inlongMetric,
+            String auditHostAndPorts) {
         this.format = format;
         this.resolvedSchema = resolvedSchema;
         this.dataType = dataType;
@@ -73,6 +78,9 @@ public class RedisDynamicTableSink implements DynamicTableSink {
         this.config = config;
         this.properties = properties;
 
+        this.inlongMetric = inlongMetric;
+        this.auditHostAndPorts = auditHostAndPorts;
+
         flinkJedisConfigBase = RedisHandlerServices
                 .findRedisHandler(InlongJedisConfigHandler.class, properties)
                 .createFlinkJedisConfig(config);
@@ -117,7 +125,9 @@ public class RedisDynamicTableSink implements DynamicTableSink {
                         batchSize,
                         flushInterval,
                         expireTime,
-                        flinkJedisConfigBase);
+                        flinkJedisConfigBase,
+                        inlongMetric,
+                        auditHostAndPorts);
                 break;
             case PLAIN:
                 redisSinkFunction = new RedisPlainSinkFunction(
@@ -126,7 +136,9 @@ public class RedisDynamicTableSink implements DynamicTableSink {
                         batchSize,
                         flushInterval,
                         expireTime,
-                        flinkJedisConfigBase);
+                        flinkJedisConfigBase,
+                        inlongMetric,
+                        auditHostAndPorts);
                 break;
             case BITMAP:
                 redisSinkFunction = new RedisBitmapSinkFunction(
@@ -135,7 +147,10 @@ public class RedisDynamicTableSink implements DynamicTableSink {
                         batchSize,
                         flushInterval,
                         expireTime,
-                        flinkJedisConfigBase);
+                        flinkJedisConfigBase,
+
+                        inlongMetric,
+                        auditHostAndPorts);
                 break;
             default:
                 throw new UnsupportedOperationException();
@@ -178,7 +193,9 @@ public class RedisDynamicTableSink implements DynamicTableSink {
                 dataType,
                 mappingMode,
                 config,
-                properties);
+                properties,
+                inlongMetric,
+                auditHostAndPorts);
     }
 
     @Override
diff --git a/inlong-sort/sort-connectors/redis/src/main/java/org/apache/inlong/sort/redis/sink/RedisHashSinkFunction.java b/inlong-sort/sort-connectors/redis/src/main/java/org/apache/inlong/sort/redis/sink/RedisHashSinkFunction.java
index 7a2c25a9c..0874ac2f5 100644
--- a/inlong-sort/sort-connectors/redis/src/main/java/org/apache/inlong/sort/redis/sink/RedisHashSinkFunction.java
+++ b/inlong-sort/sort-connectors/redis/src/main/java/org/apache/inlong/sort/redis/sink/RedisHashSinkFunction.java
@@ -45,7 +45,9 @@ public class RedisHashSinkFunction
             long batchSize,
             Duration flushInterval,
             Duration expireTime,
-            FlinkJedisConfigBase flinkJedisConfigBase) {
+            FlinkJedisConfigBase flinkJedisConfigBase,
+            String inlongMetric,
+            String auditHostAndPorts) {
         super(TypeInformation.of(new TypeHint<Tuple4<Boolean, String, String, 
String>>() {
         }),
                 serializationSchema,
@@ -53,7 +55,10 @@ public class RedisHashSinkFunction
                 batchSize,
                 flushInterval,
                 expireTime,
-                flinkJedisConfigBase);
+                flinkJedisConfigBase,
+
+                inlongMetric,
+                auditHostAndPorts);
         LOG.info("Creating RedisHashSinkFunction ...");
     }
 
diff --git a/inlong-sort/sort-connectors/redis/src/main/java/org/apache/inlong/sort/redis/sink/RedisPlainSinkFunction.java b/inlong-sort/sort-connectors/redis/src/main/java/org/apache/inlong/sort/redis/sink/RedisPlainSinkFunction.java
index e8fc817bd..6163b866f 100644
--- a/inlong-sort/sort-connectors/redis/src/main/java/org/apache/inlong/sort/redis/sink/RedisPlainSinkFunction.java
+++ b/inlong-sort/sort-connectors/redis/src/main/java/org/apache/inlong/sort/redis/sink/RedisPlainSinkFunction.java
@@ -45,7 +45,9 @@ public class RedisPlainSinkFunction
             long batchSize,
             Duration flushInterval,
             Duration configuration,
-            FlinkJedisConfigBase flinkJedisConfigBase) {
+            FlinkJedisConfigBase flinkJedisConfigBase,
+            String inlongMetric,
+            String auditHostAndPorts) {
         super(TypeInformation.of(new TypeHint<Tuple3<Boolean, String, 
String>>() {
         }),
                 serializationSchema,
@@ -53,7 +55,9 @@ public class RedisPlainSinkFunction
                 batchSize,
                 flushInterval,
                 configuration,
-                flinkJedisConfigBase);
+                flinkJedisConfigBase,
+                inlongMetric,
+                auditHostAndPorts);
         checkNotNull(serializationSchema, "The serialization schema must not 
be null.");
         ensureSerializable(serializationSchema);
     }
diff --git a/inlong-sort/sort-connectors/redis/src/main/java/org/apache/inlong/sort/redis/sink/RedisSinkFunction.java b/inlong-sort/sort-connectors/redis/src/main/java/org/apache/inlong/sort/redis/sink/RedisSinkFunction.java
index 232f81486..0f5f3e489 100644
--- a/inlong-sort/sort-connectors/redis/src/main/java/org/apache/inlong/sort/redis/sink/RedisSinkFunction.java
+++ b/inlong-sort/sort-connectors/redis/src/main/java/org/apache/inlong/sort/redis/sink/RedisSinkFunction.java
@@ -45,7 +45,9 @@ public class RedisSinkFunction
             long batchSize,
             Duration flushInterval,
             Duration configuration,
-            FlinkJedisConfigBase flinkJedisConfigBase) {
+            FlinkJedisConfigBase flinkJedisConfigBase,
+            String inlongMetric,
+            String auditHostAndPorts) {
         super(TypeInformation.of(new TypeHint<Tuple3<Boolean, String, 
String>>() {
         }),
                 serializationSchema,
@@ -53,7 +55,9 @@ public class RedisSinkFunction
                 batchSize,
                 flushInterval,
                 configuration,
-                flinkJedisConfigBase);
+                flinkJedisConfigBase,
+                inlongMetric,
+                auditHostAndPorts);
         checkNotNull(serializationSchema, "The serialization schema must not 
be null.");
         ensureSerializable(serializationSchema);
     }
diff --git a/inlong-sort/sort-connectors/redis/src/main/java/org/apache/inlong/sort/redis/table/RedisDynamicTableFactory.java b/inlong-sort/sort-connectors/redis/src/main/java/org/apache/inlong/sort/redis/table/RedisDynamicTableFactory.java
index 21657c69e..8edf979e9 100644
--- a/inlong-sort/sort-connectors/redis/src/main/java/org/apache/inlong/sort/redis/table/RedisDynamicTableFactory.java
+++ b/inlong-sort/sort-connectors/redis/src/main/java/org/apache/inlong/sort/redis/table/RedisDynamicTableFactory.java
@@ -128,7 +128,9 @@ public class RedisDynamicTableFactory implements DynamicTableSourceFactory, Dyna
                 dataType,
                 schemaMappingMode,
                 config,
-                properties);
+                properties,
+                inlongMetric,
+                auditHostAndPorts);
     }
 
     private RedisLookupOptions getJdbcLookupOptions(ReadableConfig 
readableConfig) {

Reply via email to