This is an automated email from the ASF dual-hosted git repository.

dockerzhang pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/inlong.git


The following commit(s) were added to refs/heads/master by this push:
     new 9bd422ffe9 [INLONG-10144][Sort] Redis connectors support audit ID (#10146)
9bd422ffe9 is described below

commit 9bd422ffe96e17e69f3ddc03fe3709754638febe
Author: XiaoYou201 <58425449+xiaoyou...@users.noreply.github.com>
AuthorDate: Wed May 8 18:36:58 2024 +0800

    [INLONG-10144][Sort] Redis connectors support audit ID (#10146)
    
    Co-authored-by: vinnerzhang <vinnerzh...@tencent.com>
---
 .../sort/redis/source/RedisDynamicTableSource.java | 22 ++++++++++++++++++----
 .../redis/source/RedisRowDataLookupFunction.java   | 18 +++++++++++++++++-
 .../sort/redis/table/RedisDynamicTableFactory.java |  7 ++++++-
 3 files changed, 41 insertions(+), 6 deletions(-)

diff --git 
a/inlong-sort/sort-flink/sort-flink-v1.15/sort-connectors/redis/src/main/java/org/apache/inlong/sort/redis/source/RedisDynamicTableSource.java
 
b/inlong-sort/sort-flink/sort-flink-v1.15/sort-connectors/redis/src/main/java/org/apache/inlong/sort/redis/source/RedisDynamicTableSource.java
index 7f28fccdb5..277045eac2 100644
--- 
a/inlong-sort/sort-flink/sort-flink-v1.15/sort-connectors/redis/src/main/java/org/apache/inlong/sort/redis/source/RedisDynamicTableSource.java
+++ 
b/inlong-sort/sort-flink/sort-flink-v1.15/sort-connectors/redis/src/main/java/org/apache/inlong/sort/redis/source/RedisDynamicTableSource.java
@@ -37,7 +37,11 @@ import org.apache.flink.util.Preconditions;
 
 import java.util.Map;
 
-import static org.apache.flink.table.types.logical.LogicalTypeRoot.*;
+import static org.apache.flink.table.types.logical.LogicalTypeRoot.BIGINT;
+import static org.apache.flink.table.types.logical.LogicalTypeRoot.DOUBLE;
+import static org.apache.flink.table.types.logical.LogicalTypeRoot.VARCHAR;
+
+//import static org.apache.flink.table.types.logical.LogicalTypeRoot.*;
 
 /**
  * Redis dynamic table source
@@ -52,8 +56,13 @@ public class RedisDynamicTableSource implements 
LookupTableSource {
     private final RedisLookupOptions redisLookupOptions;
     private final Map<String, String> properties;
 
+    private final String inlongMetric;
+    private final String auditHostAndPorts;
+    private final String auditKeys;
+
     public RedisDynamicTableSource(Map<String, String> properties, 
ResolvedSchema tableSchema,
-            ReadableConfig config, RedisLookupOptions redisLookupOptions) {
+            ReadableConfig config, RedisLookupOptions redisLookupOptions, 
String inlongMetric, String auditHostAndPorts,
+            String auditKeys) {
         this.properties = properties;
         Preconditions.checkNotNull(properties, "properties should not be 
null");
         this.tableSchema = tableSchema;
@@ -73,11 +82,15 @@ public class RedisDynamicTableSource implements 
LookupTableSource {
         flinkJedisConfigBase = RedisHandlerServices
                 .findRedisHandler(InlongJedisConfigHandler.class, 
properties).createFlinkJedisConfig(config);
         this.redisLookupOptions = redisLookupOptions;
+        this.inlongMetric = inlongMetric;
+        this.auditHostAndPorts = auditHostAndPorts;
+        this.auditKeys = auditKeys;
     }
 
     @Override
     public DynamicTableSource copy() {
-        return new RedisDynamicTableSource(properties, tableSchema, config, 
redisLookupOptions);
+        return new RedisDynamicTableSource(properties, tableSchema, config, 
redisLookupOptions, inlongMetric,
+                auditHostAndPorts, auditKeys);
     }
 
     @Override
@@ -88,6 +101,7 @@ public class RedisDynamicTableSource implements 
LookupTableSource {
     @Override
     public LookupRuntimeProvider getLookupRuntimeProvider(LookupContext 
context) {
         return TableFunctionProvider.of(new RedisRowDataLookupFunction(
-                redisMapper.getCommandDescription(), flinkJedisConfigBase, 
this.redisLookupOptions));
+                redisMapper.getCommandDescription(), flinkJedisConfigBase, 
this.redisLookupOptions, inlongMetric,
+                auditHostAndPorts, auditKeys));
     }
 }
diff --git 
a/inlong-sort/sort-flink/sort-flink-v1.15/sort-connectors/redis/src/main/java/org/apache/inlong/sort/redis/source/RedisRowDataLookupFunction.java
 
b/inlong-sort/sort-flink/sort-flink-v1.15/sort-connectors/redis/src/main/java/org/apache/inlong/sort/redis/source/RedisRowDataLookupFunction.java
index 95c9acd41f..7186d5a603 100644
--- 
a/inlong-sort/sort-flink/sort-flink-v1.15/sort-connectors/redis/src/main/java/org/apache/inlong/sort/redis/source/RedisRowDataLookupFunction.java
+++ 
b/inlong-sort/sort-flink/sort-flink-v1.15/sort-connectors/redis/src/main/java/org/apache/inlong/sort/redis/source/RedisRowDataLookupFunction.java
@@ -17,6 +17,8 @@
 
 package org.apache.inlong.sort.redis.source;
 
+import org.apache.inlong.sort.base.metric.MetricOption;
+import org.apache.inlong.sort.base.metric.SourceMetricData;
 import org.apache.inlong.sort.redis.common.config.RedisLookupOptions;
 import 
org.apache.inlong.sort.redis.common.container.InlongRedisCommandsContainer;
 import 
org.apache.inlong.sort.redis.common.container.RedisCommandsContainerBuilder;
@@ -55,14 +57,25 @@ public class RedisRowDataLookupFunction extends 
TableFunction<RowData> {
     private transient Cache<RowData, RowData> cache;
     private InlongRedisCommandsContainer redisCommandsContainer;
 
+    private SourceMetricData sourceMetricData;
+
     RedisRowDataLookupFunction(RedisCommandDescription redisCommandDescription,
-            FlinkJedisConfigBase flinkJedisConfigBase, RedisLookupOptions 
redisLookupOptions) {
+            FlinkJedisConfigBase flinkJedisConfigBase, RedisLookupOptions 
redisLookupOptions, String inlongMetric,
+            String auditHostAndPorts, String auditKeys) {
         this.flinkJedisConfigBase = flinkJedisConfigBase;
         this.redisCommand = redisCommandDescription.getCommand();
         this.additionalKey = redisCommandDescription.getAdditionalKey();
         this.cacheMaxSize = redisLookupOptions.getCacheMaxSize();
         this.cacheExpireMs = redisLookupOptions.getCacheExpireMs();
         this.maxRetryTimes = redisLookupOptions.getMaxRetryTimes();
+        MetricOption metricOption = MetricOption.builder()
+                .withInlongLabels(inlongMetric)
+                .withAuditAddress(auditHostAndPorts)
+                .withAuditKeys(auditKeys)
+                .build();
+        if (metricOption != null) {
+            sourceMetricData = new SourceMetricData(metricOption);
+        }
     }
 
     /**
@@ -107,6 +120,9 @@ public class RedisRowDataLookupFunction extends 
TableFunction<RowData> {
                         throw new UnsupportedOperationException(
                                 String.format("Unsupported for redisCommand: 
%s", redisCommand));
                 }
+                if (sourceMetricData != null) {
+                    sourceMetricData.outputMetricsWithEstimate(rowData);
+                }
                 if (cache == null) {
                     collect(rowData);
                 } else {
diff --git 
a/inlong-sort/sort-flink/sort-flink-v1.15/sort-connectors/redis/src/main/java/org/apache/inlong/sort/redis/table/RedisDynamicTableFactory.java
 
b/inlong-sort/sort-flink/sort-flink-v1.15/sort-connectors/redis/src/main/java/org/apache/inlong/sort/redis/table/RedisDynamicTableFactory.java
index ef6c778a98..54f907ece9 100644
--- 
a/inlong-sort/sort-flink/sort-flink-v1.15/sort-connectors/redis/src/main/java/org/apache/inlong/sort/redis/table/RedisDynamicTableFactory.java
+++ 
b/inlong-sort/sort-flink/sort-flink-v1.15/sort-connectors/redis/src/main/java/org/apache/inlong/sort/redis/table/RedisDynamicTableFactory.java
@@ -95,10 +95,15 @@ public class RedisDynamicTableFactory implements 
DynamicTableSourceFactory, Dyna
     public DynamicTableSource createDynamicTableSource(Context context) {
         FactoryUtil.TableFactoryHelper helper = 
FactoryUtil.createTableFactoryHelper(this, context);
         ReadableConfig config = helper.getOptions();
+
         helper.validate();
         validateConfigOptions(config, SUPPORT_SOURCE_COMMANDS);
+        String inlongMetric = config.getOptional(INLONG_METRIC).orElse(null);
+        String auditHostAndPorts = config.get(INLONG_AUDIT);
+        String auditKeys = config.get(AUDIT_KEYS);
         return new 
RedisDynamicTableSource(context.getCatalogTable().getOptions(),
-                context.getCatalogTable().getResolvedSchema(), config, 
getJdbcLookupOptions(config));
+                context.getCatalogTable().getResolvedSchema(), config, 
getJdbcLookupOptions(config), inlongMetric,
+                auditHostAndPorts, auditKeys);
     }
 
     @Override

Reply via email to