This is an automated email from the ASF dual-hosted git repository. dockerzhang pushed a commit to branch master in repository https://gitbox.apache.org/repos/asf/inlong.git
The following commit(s) were added to refs/heads/master by this push: new 9bd422ffe9 [INLONG-10144][Sort] Redis connectors support audit ID (#10146) 9bd422ffe9 is described below commit 9bd422ffe96e17e69f3ddc03fe3709754638febe Author: XiaoYou201 <58425449+xiaoyou...@users.noreply.github.com> AuthorDate: Wed May 8 18:36:58 2024 +0800 [INLONG-10144][Sort] Redis connectors support audit ID (#10146) Co-authored-by: vinnerzhang <vinnerzh...@tencent.com> --- .../sort/redis/source/RedisDynamicTableSource.java | 22 ++++++++++++++++++---- .../redis/source/RedisRowDataLookupFunction.java | 18 +++++++++++++++++- .../sort/redis/table/RedisDynamicTableFactory.java | 7 ++++++- 3 files changed, 41 insertions(+), 6 deletions(-) diff --git a/inlong-sort/sort-flink/sort-flink-v1.15/sort-connectors/redis/src/main/java/org/apache/inlong/sort/redis/source/RedisDynamicTableSource.java b/inlong-sort/sort-flink/sort-flink-v1.15/sort-connectors/redis/src/main/java/org/apache/inlong/sort/redis/source/RedisDynamicTableSource.java index 7f28fccdb5..277045eac2 100644 --- a/inlong-sort/sort-flink/sort-flink-v1.15/sort-connectors/redis/src/main/java/org/apache/inlong/sort/redis/source/RedisDynamicTableSource.java +++ b/inlong-sort/sort-flink/sort-flink-v1.15/sort-connectors/redis/src/main/java/org/apache/inlong/sort/redis/source/RedisDynamicTableSource.java @@ -37,7 +37,11 @@ import org.apache.flink.util.Preconditions; import java.util.Map; -import static org.apache.flink.table.types.logical.LogicalTypeRoot.*; +import static org.apache.flink.table.types.logical.LogicalTypeRoot.BIGINT; +import static org.apache.flink.table.types.logical.LogicalTypeRoot.DOUBLE; +import static org.apache.flink.table.types.logical.LogicalTypeRoot.VARCHAR; + +//import static org.apache.flink.table.types.logical.LogicalTypeRoot.*; /** * Redis dynamic table source @@ -52,8 +56,13 @@ public class RedisDynamicTableSource implements LookupTableSource { private final RedisLookupOptions redisLookupOptions; private final Map<String, String> properties; + private final String inlongMetric; + private final String auditHostAndPorts; + private final String auditKeys; + public RedisDynamicTableSource(Map<String, String> properties, ResolvedSchema tableSchema, - ReadableConfig config, RedisLookupOptions redisLookupOptions) { + ReadableConfig config, RedisLookupOptions redisLookupOptions, String inlongMetric, String auditHostAndPorts, + String auditKeys) { this.properties = properties; Preconditions.checkNotNull(properties, "properties should not be null"); this.tableSchema = tableSchema; @@ -73,11 +82,15 @@ public class RedisDynamicTableSource implements LookupTableSource { flinkJedisConfigBase = RedisHandlerServices .findRedisHandler(InlongJedisConfigHandler.class, properties).createFlinkJedisConfig(config); this.redisLookupOptions = redisLookupOptions; + this.inlongMetric = inlongMetric; + this.auditHostAndPorts = auditHostAndPorts; + this.auditKeys = auditKeys; } @Override public DynamicTableSource copy() { - return new RedisDynamicTableSource(properties, tableSchema, config, redisLookupOptions); + return new RedisDynamicTableSource(properties, tableSchema, config, redisLookupOptions, inlongMetric, + auditHostAndPorts, auditKeys); } @Override @@ -88,6 +101,7 @@ public class RedisDynamicTableSource implements LookupTableSource { @Override public LookupRuntimeProvider getLookupRuntimeProvider(LookupContext context) { return TableFunctionProvider.of(new RedisRowDataLookupFunction( - redisMapper.getCommandDescription(), flinkJedisConfigBase, this.redisLookupOptions)); + redisMapper.getCommandDescription(), flinkJedisConfigBase, this.redisLookupOptions, inlongMetric, + auditHostAndPorts, auditKeys)); } } diff --git a/inlong-sort/sort-flink/sort-flink-v1.15/sort-connectors/redis/src/main/java/org/apache/inlong/sort/redis/source/RedisRowDataLookupFunction.java b/inlong-sort/sort-flink/sort-flink-v1.15/sort-connectors/redis/src/main/java/org/apache/inlong/sort/redis/source/RedisRowDataLookupFunction.java index 95c9acd41f..7186d5a603 100644 --- a/inlong-sort/sort-flink/sort-flink-v1.15/sort-connectors/redis/src/main/java/org/apache/inlong/sort/redis/source/RedisRowDataLookupFunction.java +++ b/inlong-sort/sort-flink/sort-flink-v1.15/sort-connectors/redis/src/main/java/org/apache/inlong/sort/redis/source/RedisRowDataLookupFunction.java @@ -17,6 +17,8 @@ package org.apache.inlong.sort.redis.source; +import org.apache.inlong.sort.base.metric.MetricOption; +import org.apache.inlong.sort.base.metric.SourceMetricData; import org.apache.inlong.sort.redis.common.config.RedisLookupOptions; import org.apache.inlong.sort.redis.common.container.InlongRedisCommandsContainer; import org.apache.inlong.sort.redis.common.container.RedisCommandsContainerBuilder; @@ -55,14 +57,25 @@ public class RedisRowDataLookupFunction extends TableFunction<RowData> { private transient Cache<RowData, RowData> cache; private InlongRedisCommandsContainer redisCommandsContainer; + private SourceMetricData sourceMetricData; + RedisRowDataLookupFunction(RedisCommandDescription redisCommandDescription, - FlinkJedisConfigBase flinkJedisConfigBase, RedisLookupOptions redisLookupOptions) { + FlinkJedisConfigBase flinkJedisConfigBase, RedisLookupOptions redisLookupOptions, String inlongMetric, + String auditHostAndPorts, String auditKeys) { this.flinkJedisConfigBase = flinkJedisConfigBase; this.redisCommand = redisCommandDescription.getCommand(); this.additionalKey = redisCommandDescription.getAdditionalKey(); this.cacheMaxSize = redisLookupOptions.getCacheMaxSize(); this.cacheExpireMs = redisLookupOptions.getCacheExpireMs(); this.maxRetryTimes = redisLookupOptions.getMaxRetryTimes(); + MetricOption metricOption = MetricOption.builder() + .withInlongLabels(inlongMetric) + .withAuditAddress(auditHostAndPorts) + .withAuditKeys(auditKeys) + .build(); + if (metricOption != null) { + sourceMetricData = new SourceMetricData(metricOption); + } } /** @@ -107,6 +120,9 @@ public class RedisRowDataLookupFunction extends TableFunction<RowData> { throw new UnsupportedOperationException( String.format("Unsupported for redisCommand: %s", redisCommand)); } + if (sourceMetricData != null) { + sourceMetricData.outputMetricsWithEstimate(rowData); + } if (cache == null) { collect(rowData); } else { diff --git a/inlong-sort/sort-flink/sort-flink-v1.15/sort-connectors/redis/src/main/java/org/apache/inlong/sort/redis/table/RedisDynamicTableFactory.java b/inlong-sort/sort-flink/sort-flink-v1.15/sort-connectors/redis/src/main/java/org/apache/inlong/sort/redis/table/RedisDynamicTableFactory.java index ef6c778a98..54f907ece9 100644 --- a/inlong-sort/sort-flink/sort-flink-v1.15/sort-connectors/redis/src/main/java/org/apache/inlong/sort/redis/table/RedisDynamicTableFactory.java +++ b/inlong-sort/sort-flink/sort-flink-v1.15/sort-connectors/redis/src/main/java/org/apache/inlong/sort/redis/table/RedisDynamicTableFactory.java @@ -95,10 +95,15 @@ public class RedisDynamicTableFactory implements DynamicTableSourceFactory, Dyna public DynamicTableSource createDynamicTableSource(Context context) { FactoryUtil.TableFactoryHelper helper = FactoryUtil.createTableFactoryHelper(this, context); ReadableConfig config = helper.getOptions(); + helper.validate(); validateConfigOptions(config, SUPPORT_SOURCE_COMMANDS); + String inlongMetric = config.getOptional(INLONG_METRIC).orElse(null); + String auditHostAndPorts = config.get(INLONG_AUDIT); + String auditKeys = config.get(AUDIT_KEYS); return new RedisDynamicTableSource(context.getCatalogTable().getOptions(), - context.getCatalogTable().getResolvedSchema(), config, getJdbcLookupOptions(config)); + context.getCatalogTable().getResolvedSchema(), config, getJdbcLookupOptions(config), inlongMetric, + auditHostAndPorts, auditKeys); } @Override