This is an automated email from the ASF dual-hosted git repository. dockerzhang pushed a commit to branch master in repository https://gitbox.apache.org/repos/asf/inlong.git
The following commit(s) were added to refs/heads/master by this push: new 6572cf9f8 [INLONG-7245][Sort] Support metric and audit in sort-connect-redis (#7309) 6572cf9f8 is described below commit 6572cf9f8b0913b062a80fa14caa68d0d0716652 Author: feat <featzh...@outlook.com> AuthorDate: Thu Feb 2 16:55:04 2023 +0800 [INLONG-7245][Sort] Support metric and audit in sort-connect-redis (#7309) --- .../sort/redis/sink/AbstractRedisSinkFunction.java | 69 ++++++++++++++++++---- .../sort/redis/sink/RedisBitmapSinkFunction.java | 8 ++- .../sort/redis/sink/RedisDynamicTableSink.java | 27 +++++++-- .../sort/redis/sink/RedisHashSinkFunction.java | 9 ++- .../sort/redis/sink/RedisPlainSinkFunction.java | 8 ++- .../inlong/sort/redis/sink/RedisSinkFunction.java | 8 ++- .../sort/redis/table/RedisDynamicTableFactory.java | 4 +- 7 files changed, 106 insertions(+), 27 deletions(-) diff --git a/inlong-sort/sort-connectors/redis/src/main/java/org/apache/inlong/sort/redis/sink/AbstractRedisSinkFunction.java b/inlong-sort/sort-connectors/redis/src/main/java/org/apache/inlong/sort/redis/sink/AbstractRedisSinkFunction.java index 2829198ea..97ae9b9f2 100644 --- a/inlong-sort/sort-connectors/redis/src/main/java/org/apache/inlong/sort/redis/sink/AbstractRedisSinkFunction.java +++ b/inlong-sort/sort-connectors/redis/src/main/java/org/apache/inlong/sort/redis/sink/AbstractRedisSinkFunction.java @@ -18,17 +18,22 @@ package org.apache.inlong.sort.redis.sink; import static org.apache.flink.util.Preconditions.checkNotNull; +import static org.apache.inlong.sort.base.Constants.DIRTY_BYTES_OUT; +import static org.apache.inlong.sort.base.Constants.DIRTY_RECORDS_OUT; +import static org.apache.inlong.sort.base.Constants.INLONG_METRIC_STATE_NAME; +import static org.apache.inlong.sort.base.Constants.NUM_BYTES_OUT; +import static org.apache.inlong.sort.base.Constants.NUM_RECORDS_OUT; import java.time.Duration; import java.util.ArrayList; import java.util.List; import java.util.Optional; -import java.util.concurrent.ScheduledExecutorService; import javax.annotation.concurrent.GuardedBy; import org.apache.commons.lang3.time.StopWatch; import org.apache.flink.api.common.serialization.SerializationSchema; import org.apache.flink.api.common.state.ListState; import org.apache.flink.api.common.state.ListStateDescriptor; +import org.apache.flink.api.common.typeinfo.TypeHint; import org.apache.flink.api.common.typeinfo.TypeInformation; import org.apache.flink.configuration.Configuration; import org.apache.flink.runtime.state.FunctionInitializationContext; @@ -37,6 +42,10 @@ import org.apache.flink.streaming.api.checkpoint.CheckpointedFunction; import org.apache.flink.streaming.api.functions.sink.RichSinkFunction; import org.apache.flink.streaming.connectors.redis.common.config.FlinkJedisConfigBase; import org.apache.flink.table.data.RowData; +import org.apache.inlong.sort.base.metric.MetricOption; +import org.apache.inlong.sort.base.metric.MetricState; +import org.apache.inlong.sort.base.metric.SinkMetricData; +import org.apache.inlong.sort.base.util.MetricStateUtils; import org.apache.inlong.sort.redis.common.container.InlongRedisCommandsContainer; import org.apache.inlong.sort.redis.common.container.RedisCommandsContainerBuilder; import org.apache.inlong.sort.redis.common.schema.StateEncoder; @@ -90,8 +99,6 @@ public abstract class AbstractRedisSinkFunction<OUT> private final List<OUT> rows; - private transient ScheduledExecutorService executorService; - /** * The container for all available Redis commands. */ @@ -104,6 +111,12 @@ public abstract class AbstractRedisSinkFunction<OUT> protected transient StopWatch stopWatch; protected StateEncoder<OUT> stateEncoder; + private final String auditHostAndPorts; + + private final String inLongMetric; + private transient MetricState metricState; + private transient ListState<MetricState> metricStateListState; + private SinkMetricData sinkMetricData; public AbstractRedisSinkFunction( TypeInformation<OUT> outputType, @@ -112,7 +125,9 @@ public abstract class AbstractRedisSinkFunction<OUT> long batchSize, Duration flushInterval, Duration configuration, - FlinkJedisConfigBase flinkJedisConfigBase) { + FlinkJedisConfigBase flinkJedisConfigBase, + String inLongMetric, + String auditHostAndPorts) { checkNotNull(configuration, "The configuration must not be null."); this.stateEncoder = stateEncoder; @@ -124,6 +139,8 @@ public abstract class AbstractRedisSinkFunction<OUT> this.forceFlush = false; this.rows = new ArrayList<>(); this.flinkJedisConfigBase = flinkJedisConfigBase; + this.inLongMetric = inLongMetric; + this.auditHostAndPorts = auditHostAndPorts; } @Override @@ -152,11 +169,34 @@ public abstract class AbstractRedisSinkFunction<OUT> outputFlusher = Optional.of(new OutputFlusher(threadName, flushIntervalInMillis)); outputFlusher.get().start(); } - + MetricOption metricOption = MetricOption.builder() + .withInlongLabels(inLongMetric) + .withInlongAudit(auditHostAndPorts) + .withInitRecords(metricState != null ? metricState.getMetricValue(NUM_RECORDS_OUT) : 0L) + .withInitBytes(metricState != null ? metricState.getMetricValue(NUM_BYTES_OUT) : 0L) + .withInitDirtyRecords(metricState != null ? metricState.getMetricValue(DIRTY_RECORDS_OUT) : 0L) + .withInitDirtyBytes(metricState != null ? metricState.getMetricValue(DIRTY_BYTES_OUT) : 0L) + .withRegisterMetric(MetricOption.RegisteredMetric.ALL) + .build(); + if (metricOption != null) { + sinkMetricData = new SinkMetricData(metricOption, getRuntimeContext().getMetricGroup()); + } } @Override public void initializeState(FunctionInitializationContext context) throws Exception { + if (this.inLongMetric != null) { + this.metricStateListState = context.getOperatorStateStore().getUnionListState( + new ListStateDescriptor<>( + INLONG_METRIC_STATE_NAME, TypeInformation.of(new TypeHint<MetricState>() { + }))); + } + + if (context.isRestored()) { + metricState = MetricStateUtils.restoreMetricState(metricStateListState, + getRuntimeContext().getIndexOfThisSubtask(), getRuntimeContext().getNumberOfParallelSubtasks()); + } + final ListStateDescriptor<OUT> stateDescriptor = new ListStateDescriptor<>( "rowState", outputType); this.listState = context.getOperatorStateStore().getListState(stateDescriptor); @@ -174,6 +214,10 @@ public abstract class AbstractRedisSinkFunction<OUT> listState.clear(); listState.addAll(rows); } + if (sinkMetricData != null && metricStateListState != null) { + MetricStateUtils.snapshotMetricStateForSinkMetricData(metricStateListState, sinkMetricData, + getRuntimeContext().getIndexOfThisSubtask()); + } LOG.info("redis end snapshotState, id: {}", functionSnapshotContext.getCheckpointId()); } @@ -189,6 +233,7 @@ public abstract class AbstractRedisSinkFunction<OUT> public void invoke(RowData in, Context context) { List<OUT> redisOutputs = serialize(in); + sendMetrics(in.toString().getBytes()); synchronized (lock) { rows.addAll(redisOutputs); if (forceFlush || rows.size() >= batchSize) { @@ -202,14 +247,6 @@ public abstract class AbstractRedisSinkFunction<OUT> public void close() throws Exception { closeClient(); - if (executorService != null) { - try { - executorService.shutdown(); - } catch (Throwable t) { - LOG.warn("Could not properly shut down ScheduledExecutorService.", t); - } - } - super.close(); LOG.info("Closed redis sink."); @@ -289,4 +326,10 @@ public abstract class AbstractRedisSinkFunction<OUT> } } } + + protected void sendMetrics(byte[] document) { + if (sinkMetricData != null) { + sinkMetricData.invoke(1, document.length); + } + } } diff --git a/inlong-sort/sort-connectors/redis/src/main/java/org/apache/inlong/sort/redis/sink/RedisBitmapSinkFunction.java b/inlong-sort/sort-connectors/redis/src/main/java/org/apache/inlong/sort/redis/sink/RedisBitmapSinkFunction.java index 4e59aa252..d68ff3d28 100644 --- a/inlong-sort/sort-connectors/redis/src/main/java/org/apache/inlong/sort/redis/sink/RedisBitmapSinkFunction.java +++ b/inlong-sort/sort-connectors/redis/src/main/java/org/apache/inlong/sort/redis/sink/RedisBitmapSinkFunction.java @@ -45,7 +45,9 @@ public class RedisBitmapSinkFunction long batchSize, Duration flushInterval, Duration configuration, - FlinkJedisConfigBase flinkJedisConfigBase) { + FlinkJedisConfigBase flinkJedisConfigBase, + String inlongMetric, + String auditHostAndPorts) { super(TypeInformation.of(new TypeHint<Tuple4<Boolean, String, Long, Boolean>>() { }), serializationSchema, @@ -53,7 +55,9 @@ public class RedisBitmapSinkFunction batchSize, flushInterval, configuration, - flinkJedisConfigBase); + flinkJedisConfigBase, + inlongMetric, + auditHostAndPorts); LOG.info("Creating RedisBitmapStaticKvPairSinkFunction ..."); } diff --git a/inlong-sort/sort-connectors/redis/src/main/java/org/apache/inlong/sort/redis/sink/RedisDynamicTableSink.java b/inlong-sort/sort-connectors/redis/src/main/java/org/apache/inlong/sort/redis/sink/RedisDynamicTableSink.java index df16979f2..2e96e3c1d 100644 --- a/inlong-sort/sort-connectors/redis/src/main/java/org/apache/inlong/sort/redis/sink/RedisDynamicTableSink.java +++ b/inlong-sort/sort-connectors/redis/src/main/java/org/apache/inlong/sort/redis/sink/RedisDynamicTableSink.java @@ -59,13 +59,18 @@ public class RedisDynamicTableSink implements DynamicTableSink { private final ReadableConfig config; private final Map<String, String> properties; + private final String inlongMetric; + private final String auditHostAndPorts; + public RedisDynamicTableSink( EncodingFormat<SerializationSchema<RowData>> format, ResolvedSchema resolvedSchema, RedisDataType dataType, SchemaMappingMode schemaMappingMode, ReadableConfig config, - Map<String, String> properties) { + Map<String, String> properties, + String inlongMetric, + String auditHostAndPorts) { this.format = format; this.resolvedSchema = resolvedSchema; this.dataType = dataType; @@ -73,6 +78,9 @@ public class RedisDynamicTableSink implements DynamicTableSink { this.config = config; this.properties = properties; + this.inlongMetric = inlongMetric; + this.auditHostAndPorts = auditHostAndPorts; + flinkJedisConfigBase = RedisHandlerServices .findRedisHandler(InlongJedisConfigHandler.class, properties) .createFlinkJedisConfig(config); @@ -117,7 +125,9 @@ public class RedisDynamicTableSink implements DynamicTableSink { batchSize, flushInterval, expireTime, - flinkJedisConfigBase); + flinkJedisConfigBase, + inlongMetric, + auditHostAndPorts); break; case PLAIN: redisSinkFunction = new RedisPlainSinkFunction( @@ -126,7 +136,9 @@ public class RedisDynamicTableSink implements DynamicTableSink { batchSize, flushInterval, expireTime, - flinkJedisConfigBase); + flinkJedisConfigBase, + inlongMetric, + auditHostAndPorts); break; case BITMAP: redisSinkFunction = new RedisBitmapSinkFunction( @@ -135,7 +147,10 @@ public class RedisDynamicTableSink implements DynamicTableSink { batchSize, flushInterval, expireTime, - flinkJedisConfigBase); + flinkJedisConfigBase, + + inlongMetric, + auditHostAndPorts); break; default: throw new UnsupportedOperationException(); @@ -178,7 +193,9 @@ public class RedisDynamicTableSink implements DynamicTableSink { dataType, mappingMode, config, - properties); + properties, + inlongMetric, + auditHostAndPorts); } @Override diff --git a/inlong-sort/sort-connectors/redis/src/main/java/org/apache/inlong/sort/redis/sink/RedisHashSinkFunction.java b/inlong-sort/sort-connectors/redis/src/main/java/org/apache/inlong/sort/redis/sink/RedisHashSinkFunction.java index 7a2c25a9c..0874ac2f5 100644 --- a/inlong-sort/sort-connectors/redis/src/main/java/org/apache/inlong/sort/redis/sink/RedisHashSinkFunction.java +++ b/inlong-sort/sort-connectors/redis/src/main/java/org/apache/inlong/sort/redis/sink/RedisHashSinkFunction.java @@ -45,7 +45,9 @@ public class RedisHashSinkFunction long batchSize, Duration flushInterval, Duration expireTime, - FlinkJedisConfigBase flinkJedisConfigBase) { + FlinkJedisConfigBase flinkJedisConfigBase, + String inlongMetric, + String auditHostAndPorts) { super(TypeInformation.of(new TypeHint<Tuple4<Boolean, String, String, String>>() { }), serializationSchema, @@ -53,7 +55,10 @@ public class RedisHashSinkFunction batchSize, flushInterval, expireTime, - flinkJedisConfigBase); + flinkJedisConfigBase, + + inlongMetric, + auditHostAndPorts); LOG.info("Creating RedisHashSinkFunction ..."); } diff --git a/inlong-sort/sort-connectors/redis/src/main/java/org/apache/inlong/sort/redis/sink/RedisPlainSinkFunction.java b/inlong-sort/sort-connectors/redis/src/main/java/org/apache/inlong/sort/redis/sink/RedisPlainSinkFunction.java index e8fc817bd..6163b866f 100644 --- a/inlong-sort/sort-connectors/redis/src/main/java/org/apache/inlong/sort/redis/sink/RedisPlainSinkFunction.java +++ b/inlong-sort/sort-connectors/redis/src/main/java/org/apache/inlong/sort/redis/sink/RedisPlainSinkFunction.java @@ -45,7 +45,9 @@ public class RedisPlainSinkFunction long batchSize, Duration flushInterval, Duration configuration, - FlinkJedisConfigBase flinkJedisConfigBase) { + FlinkJedisConfigBase flinkJedisConfigBase, + String inlongMetric, + String auditHostAndPorts) { super(TypeInformation.of(new TypeHint<Tuple3<Boolean, String, String>>() { }), serializationSchema, @@ -53,7 +55,9 @@ public class RedisPlainSinkFunction batchSize, flushInterval, configuration, - flinkJedisConfigBase); + flinkJedisConfigBase, + inlongMetric, + auditHostAndPorts); checkNotNull(serializationSchema, "The serialization schema must not be null."); ensureSerializable(serializationSchema); } diff --git a/inlong-sort/sort-connectors/redis/src/main/java/org/apache/inlong/sort/redis/sink/RedisSinkFunction.java b/inlong-sort/sort-connectors/redis/src/main/java/org/apache/inlong/sort/redis/sink/RedisSinkFunction.java index 232f81486..0f5f3e489 100644 --- a/inlong-sort/sort-connectors/redis/src/main/java/org/apache/inlong/sort/redis/sink/RedisSinkFunction.java +++ b/inlong-sort/sort-connectors/redis/src/main/java/org/apache/inlong/sort/redis/sink/RedisSinkFunction.java @@ -45,7 +45,9 @@ public class RedisSinkFunction long batchSize, Duration flushInterval, Duration configuration, - FlinkJedisConfigBase flinkJedisConfigBase) { + FlinkJedisConfigBase flinkJedisConfigBase, + String inlongMetric, + String auditHostAndPorts) { super(TypeInformation.of(new TypeHint<Tuple3<Boolean, String, String>>() { }), serializationSchema, @@ -53,7 +55,9 @@ public class RedisSinkFunction batchSize, flushInterval, configuration, - flinkJedisConfigBase); + flinkJedisConfigBase, + inlongMetric, + auditHostAndPorts); checkNotNull(serializationSchema, "The serialization schema must not be null."); ensureSerializable(serializationSchema); } diff --git a/inlong-sort/sort-connectors/redis/src/main/java/org/apache/inlong/sort/redis/table/RedisDynamicTableFactory.java b/inlong-sort/sort-connectors/redis/src/main/java/org/apache/inlong/sort/redis/table/RedisDynamicTableFactory.java index 21657c69e..8edf979e9 100644 --- a/inlong-sort/sort-connectors/redis/src/main/java/org/apache/inlong/sort/redis/table/RedisDynamicTableFactory.java +++ b/inlong-sort/sort-connectors/redis/src/main/java/org/apache/inlong/sort/redis/table/RedisDynamicTableFactory.java @@ -128,7 +128,9 @@ public class RedisDynamicTableFactory implements DynamicTableSourceFactory, Dyna dataType, schemaMappingMode, config, - properties); + properties, + inlongMetric, + auditHostAndPorts); } private RedisLookupOptions getJdbcLookupOptions(ReadableConfig readableConfig) {