This is an automated email from the ASF dual-hosted git repository. dockerzhang pushed a commit to branch master in repository https://gitbox.apache.org/repos/asf/inlong.git
The following commit(s) were added to refs/heads/master by this push: new 77338950f [INLONG-7242][Manager] Support register and manage the resource of Redis sink (#7243) 77338950f is described below commit 77338950f861d947086da231f876f191a6fbb6e1 Author: feat <featzh...@outlook.com> AuthorDate: Thu Feb 2 16:00:27 2023 +0800 [INLONG-7242][Manager] Support register and manage the resource of Redis sink (#7243) --- .../inlong/manager/common/consts/DataNodeType.java | 1 + .../inlong/manager/common/consts/SinkType.java | 1 + .../inlong/manager/common/enums/ErrorCodeEnum.java | 3 +- .../inlong/manager/common/util/Preconditions.java | 13 ++ .../plugin/flink/enums/ConnectorJarType.java | 1 + .../manager/pojo/node/redis/RedisDataNodeDTO.java | 63 ++++++++ .../manager/pojo/node/redis/RedisDataNodeInfo.java | 47 ++++++ .../pojo/node/redis/RedisDataNodeRequest.java} | 30 ++-- .../manager/pojo/sink/redis/RedisClusterMode.java} | 32 +++-- .../manager/pojo/sink/redis/RedisDataType.java} | 34 +++-- .../pojo/sink/redis/RedisSchemaMapMode.java} | 19 +-- .../inlong/manager/pojo/sink/redis/RedisSink.java | 124 ++++++++++++++++ .../manager/pojo/sink/redis/RedisSinkDTO.java | 129 +++++++++++++++++ .../manager/pojo/sink/redis/RedisSinkRequest.java | 108 ++++++++++++++ .../manager/pojo/sort/util/LoadNodeUtils.java | 118 +++++++++++++++ .../service/node/redis/RedisDataNodeOperator.java | 84 +++++++++++ .../resource/sink/redis/RedisResourceOperator.java | 48 +++++++ .../service/sink/redis/RedisSinkOperator.java | 158 +++++++++++++++++++++ .../manager/service/sink/RedisSinkServiceTest.java | 95 +++++++++++++ 19 files changed, 1061 insertions(+), 47 deletions(-) diff --git a/inlong-manager/manager-common/src/main/java/org/apache/inlong/manager/common/consts/DataNodeType.java b/inlong-manager/manager-common/src/main/java/org/apache/inlong/manager/common/consts/DataNodeType.java index a32adac37..757561d5b 100644 --- a/inlong-manager/manager-common/src/main/java/org/apache/inlong/manager/common/consts/DataNodeType.java +++ b/inlong-manager/manager-common/src/main/java/org/apache/inlong/manager/common/consts/DataNodeType.java @@ -30,5 +30,6 @@ public class DataNodeType { public static final String ELASTICSEARCH = "ELASTICSEARCH"; public static final String MYSQL = "MYSQL"; public static final String STARROCKS = "STARROCKS"; + public static final String REDIS = "REDIS"; } diff --git a/inlong-manager/manager-common/src/main/java/org/apache/inlong/manager/common/consts/SinkType.java b/inlong-manager/manager-common/src/main/java/org/apache/inlong/manager/common/consts/SinkType.java index a2da09886..c7e293ed6 100644 --- a/inlong-manager/manager-common/src/main/java/org/apache/inlong/manager/common/consts/SinkType.java +++ b/inlong-manager/manager-common/src/main/java/org/apache/inlong/manager/common/consts/SinkType.java @@ -38,4 +38,5 @@ public class SinkType { public static final String TDSQLPOSTGRESQL = "TDSQLPOSTGRESQL"; public static final String DORIS = "DORIS"; public static final String STARROCKS = "STARROCKS"; + public static final String REDIS = "REDIS"; } diff --git a/inlong-manager/manager-common/src/main/java/org/apache/inlong/manager/common/enums/ErrorCodeEnum.java b/inlong-manager/manager-common/src/main/java/org/apache/inlong/manager/common/enums/ErrorCodeEnum.java index 9c532dddc..d690e3f5d 100644 --- a/inlong-manager/manager-common/src/main/java/org/apache/inlong/manager/common/enums/ErrorCodeEnum.java +++ b/inlong-manager/manager-common/src/main/java/org/apache/inlong/manager/common/enums/ErrorCodeEnum.java @@ -40,6 +40,8 @@ public enum ErrorCodeEnum { RECORD_NOT_FOUND(109, "The record does not exist"), USER_IS_NOT_MANAGER(110, "%s is not the manager, please contact %s"), RECORD_IN_USED(111, "The record is in use"), + IP_EMPTY(112, "The IP is is empty"), + PORT_EMPTY(113, "The PORT is is empty"), GROUP_NOT_FOUND(1001, "Inlong group does not exist/no operation authority"), GROUP_DUPLICATE(1002, "Inlong group already exists"), @@ -143,7 +145,6 @@ public enum ErrorCodeEnum { CONSUME_PERMISSION_DENIED(3005, "No permission to access this inlong consume"), AUDIT_ID_TYPE_NOT_SUPPORTED(4001, "Audit id type '%s' not supported"), - ; private final int code; diff --git a/inlong-manager/manager-common/src/main/java/org/apache/inlong/manager/common/util/Preconditions.java b/inlong-manager/manager-common/src/main/java/org/apache/inlong/manager/common/util/Preconditions.java index b0326b18c..ce2ae4485 100644 --- a/inlong-manager/manager-common/src/main/java/org/apache/inlong/manager/common/util/Preconditions.java +++ b/inlong-manager/manager-common/src/main/java/org/apache/inlong/manager/common/util/Preconditions.java @@ -90,6 +90,14 @@ public class Preconditions { expectTrue(collection != null && !collection.isEmpty(), errMsg); } + public static void expectNotEmpty(String[] collection, String errMsg) { + expectTrue(collection != null && collection.length != 0, errMsg); + } + + public static void expectNotEmpty(String[] collection, Supplier<String> errMsg) { + expectTrue(collection != null && collection.length != 0, errMsg); + } + public static void expectNotEmpty(Map<?, ?> map, String errMsg) { expectTrue(map != null && !map.isEmpty(), errMsg); } @@ -127,6 +135,11 @@ public class Preconditions { throw new IllegalArgumentException(errMsg); } } + public static void expectNotBlank(String obj, String errMsg) { + if (StringUtils.isBlank(obj)) { + throw new IllegalArgumentException(errMsg); + } + } public static void expectNotBlank(String obj, ErrorCodeEnum errorCodeEnum) { if (StringUtils.isBlank(obj)) { diff --git a/inlong-manager/manager-plugins/src/main/java/org/apache/inlong/manager/plugin/flink/enums/ConnectorJarType.java b/inlong-manager/manager-plugins/src/main/java/org/apache/inlong/manager/plugin/flink/enums/ConnectorJarType.java index de2259f67..52a581a50 100644 --- a/inlong-manager/manager-plugins/src/main/java/org/apache/inlong/manager/plugin/flink/enums/ConnectorJarType.java +++ b/inlong-manager/manager-plugins/src/main/java/org/apache/inlong/manager/plugin/flink/enums/ConnectorJarType.java @@ -72,6 +72,7 @@ public enum ConnectorJarType { HUDI_SINK("hudiLoad", "hudi"), HDFS_SINK("fileSystemLoad", ""), + REDIS_SINK("redisLoad", "redis"), ; diff --git a/inlong-manager/manager-pojo/src/main/java/org/apache/inlong/manager/pojo/node/redis/RedisDataNodeDTO.java b/inlong-manager/manager-pojo/src/main/java/org/apache/inlong/manager/pojo/node/redis/RedisDataNodeDTO.java new file mode 100644 index 000000000..1c6b64cd8 --- /dev/null +++ b/inlong-manager/manager-pojo/src/main/java/org/apache/inlong/manager/pojo/node/redis/RedisDataNodeDTO.java @@ -0,0 +1,63 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.inlong.manager.pojo.node.redis; + +import io.swagger.annotations.ApiModel; +import lombok.Builder; +import lombok.Data; +import lombok.NoArgsConstructor; +import org.apache.inlong.manager.common.enums.ErrorCodeEnum; +import org.apache.inlong.manager.common.exceptions.BusinessException; +import org.apache.inlong.manager.common.util.JsonUtils; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import javax.validation.constraints.NotNull; + +/** + * Redis data node info + */ +@Data +@Builder +@NoArgsConstructor +@ApiModel("Redis data node info") +public class RedisDataNodeDTO { + + private static final Logger LOGGER = LoggerFactory.getLogger(RedisDataNodeDTO.class); + + /** + * Get the dto instance from the request + */ + public static RedisDataNodeDTO getFromRequest(RedisDataNodeRequest request) throws Exception { + return RedisDataNodeDTO.builder() + .build(); + } + + /** + * Get the dto instance from the JSON string. + */ + public static RedisDataNodeDTO getFromJson(@NotNull String extParams) { + try { + return JsonUtils.parseObject(extParams, RedisDataNodeDTO.class); + } catch (Exception e) { + throw new BusinessException(ErrorCodeEnum.GROUP_INFO_INCORRECT, + String.format("Failed to parse extParams for Redis node: %s", e.getMessage())); + } + } + +} diff --git a/inlong-manager/manager-pojo/src/main/java/org/apache/inlong/manager/pojo/node/redis/RedisDataNodeInfo.java b/inlong-manager/manager-pojo/src/main/java/org/apache/inlong/manager/pojo/node/redis/RedisDataNodeInfo.java new file mode 100644 index 000000000..3772f67d7 --- /dev/null +++ b/inlong-manager/manager-pojo/src/main/java/org/apache/inlong/manager/pojo/node/redis/RedisDataNodeInfo.java @@ -0,0 +1,47 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.inlong.manager.pojo.node.redis; + +import io.swagger.annotations.ApiModel; +import lombok.Data; +import lombok.EqualsAndHashCode; +import lombok.ToString; +import org.apache.inlong.manager.common.consts.DataNodeType; +import org.apache.inlong.manager.common.util.CommonBeanUtils; +import org.apache.inlong.manager.common.util.JsonTypeDefine; +import org.apache.inlong.manager.pojo.node.DataNodeInfo; + +/** + * Redis data node info + */ +@Data +@ToString(callSuper = true) +@EqualsAndHashCode(callSuper = true) +@JsonTypeDefine(value = DataNodeType.REDIS) +@ApiModel("Redis data node info") +public class RedisDataNodeInfo extends DataNodeInfo { + + public RedisDataNodeInfo() { + this.setType(DataNodeType.REDIS); + } + + @Override + public RedisDataNodeRequest genRequest() { + return CommonBeanUtils.copyProperties(this, RedisDataNodeRequest::new); + } +} diff --git a/inlong-manager/manager-common/src/main/java/org/apache/inlong/manager/common/consts/DataNodeType.java b/inlong-manager/manager-pojo/src/main/java/org/apache/inlong/manager/pojo/node/redis/RedisDataNodeRequest.java similarity index 54% copy from inlong-manager/manager-common/src/main/java/org/apache/inlong/manager/common/consts/DataNodeType.java copy to inlong-manager/manager-pojo/src/main/java/org/apache/inlong/manager/pojo/node/redis/RedisDataNodeRequest.java index a32adac37..5d782c6f0 100644 --- a/inlong-manager/manager-common/src/main/java/org/apache/inlong/manager/common/consts/DataNodeType.java +++ b/inlong-manager/manager-pojo/src/main/java/org/apache/inlong/manager/pojo/node/redis/RedisDataNodeRequest.java @@ -15,20 +15,28 @@ * limitations under the License. */ -package org.apache.inlong.manager.common.consts; +package org.apache.inlong.manager.pojo.node.redis; + +import io.swagger.annotations.ApiModel; +import lombok.Data; +import lombok.EqualsAndHashCode; +import lombok.ToString; +import org.apache.inlong.manager.common.consts.DataNodeType; +import org.apache.inlong.manager.common.util.JsonTypeDefine; +import org.apache.inlong.manager.pojo.node.DataNodeRequest; /** - * Constants of data node. + * Redis data node request */ -public class DataNodeType { +@Data +@ToString(callSuper = true) +@EqualsAndHashCode(callSuper = true) +@JsonTypeDefine(value = DataNodeType.REDIS) +@ApiModel("Redis data node request") +public class RedisDataNodeRequest extends DataNodeRequest { - public static final String HIVE = "HIVE"; - public static final String KAFKA = "KAFKA"; - public static final String ICEBERG = "ICEBERG"; - public static final String HUDI = "HUDI"; - public static final String CLICKHOUSE = "CLICKHOUSE"; - public static final String ELASTICSEARCH = "ELASTICSEARCH"; - public static final String MYSQL = "MYSQL"; - public static final String STARROCKS = "STARROCKS"; + public RedisDataNodeRequest() { + this.setType(DataNodeType.REDIS); + } } diff --git a/inlong-manager/manager-common/src/main/java/org/apache/inlong/manager/common/consts/DataNodeType.java b/inlong-manager/manager-pojo/src/main/java/org/apache/inlong/manager/pojo/sink/redis/RedisClusterMode.java similarity index 57% copy from inlong-manager/manager-common/src/main/java/org/apache/inlong/manager/common/consts/DataNodeType.java copy to inlong-manager/manager-pojo/src/main/java/org/apache/inlong/manager/pojo/sink/redis/RedisClusterMode.java index a32adac37..f9d4cd413 100644 --- a/inlong-manager/manager-common/src/main/java/org/apache/inlong/manager/common/consts/DataNodeType.java +++ b/inlong-manager/manager-pojo/src/main/java/org/apache/inlong/manager/pojo/sink/redis/RedisClusterMode.java @@ -15,20 +15,30 @@ * limitations under the License. */ -package org.apache.inlong.manager.common.consts; +package org.apache.inlong.manager.pojo.sink.redis; /** - * Constants of data node. + * The cluster mode of Redis. */ -public class DataNodeType { +public enum RedisClusterMode { - public static final String HIVE = "HIVE"; - public static final String KAFKA = "KAFKA"; - public static final String ICEBERG = "ICEBERG"; - public static final String HUDI = "HUDI"; - public static final String CLICKHOUSE = "CLICKHOUSE"; - public static final String ELASTICSEARCH = "ELASTICSEARCH"; - public static final String MYSQL = "MYSQL"; - public static final String STARROCKS = "STARROCKS"; + STANDALONE("standalone"), + CLUSTER("cluster"), + SENTINEL("sentinel"), + ; + private final String key; + + private RedisClusterMode(String key) { + this.key = key; + } + + public static RedisClusterMode of(String key) { + for (RedisClusterMode redisClusterMode : RedisClusterMode.values()) { + if (key != null && redisClusterMode.key.equals(key.toLowerCase())) { + return redisClusterMode; + } + } + return null; + } } diff --git a/inlong-manager/manager-common/src/main/java/org/apache/inlong/manager/common/consts/DataNodeType.java b/inlong-manager/manager-pojo/src/main/java/org/apache/inlong/manager/pojo/sink/redis/RedisDataType.java similarity index 53% copy from inlong-manager/manager-common/src/main/java/org/apache/inlong/manager/common/consts/DataNodeType.java copy to inlong-manager/manager-pojo/src/main/java/org/apache/inlong/manager/pojo/sink/redis/RedisDataType.java index a32adac37..f74c92b56 100644 --- a/inlong-manager/manager-common/src/main/java/org/apache/inlong/manager/common/consts/DataNodeType.java +++ b/inlong-manager/manager-pojo/src/main/java/org/apache/inlong/manager/pojo/sink/redis/RedisDataType.java @@ -15,20 +15,32 @@ * limitations under the License. */ -package org.apache.inlong.manager.common.consts; +package org.apache.inlong.manager.pojo.sink.redis; + +import java.util.Arrays; +import java.util.HashSet; /** - * Constants of data node. + * The data type of Redis. */ -public class DataNodeType { +public enum RedisDataType { + + HASH( + RedisSchemaMapMode.DYNAMIC, + RedisSchemaMapMode.STATIC_KV_PAIR, + RedisSchemaMapMode.STATIC_PREFIX_MATCH), + BITMAP( + RedisSchemaMapMode.DYNAMIC), + PLAIN( + RedisSchemaMapMode.STATIC_PREFIX_MATCH); + + private final HashSet<RedisSchemaMapMode> mapModes; - public static final String HIVE = "HIVE"; - public static final String KAFKA = "KAFKA"; - public static final String ICEBERG = "ICEBERG"; - public static final String HUDI = "HUDI"; - public static final String CLICKHOUSE = "CLICKHOUSE"; - public static final String ELASTICSEARCH = "ELASTICSEARCH"; - public static final String MYSQL = "MYSQL"; - public static final String STARROCKS = "STARROCKS"; + private RedisDataType(RedisSchemaMapMode... modes) { + this.mapModes = new HashSet<>(Arrays.asList(modes)); + } + public HashSet<RedisSchemaMapMode> getMapModes() { + return mapModes; + } } diff --git a/inlong-manager/manager-common/src/main/java/org/apache/inlong/manager/common/consts/DataNodeType.java b/inlong-manager/manager-pojo/src/main/java/org/apache/inlong/manager/pojo/sink/redis/RedisSchemaMapMode.java similarity index 60% copy from inlong-manager/manager-common/src/main/java/org/apache/inlong/manager/common/consts/DataNodeType.java copy to inlong-manager/manager-pojo/src/main/java/org/apache/inlong/manager/pojo/sink/redis/RedisSchemaMapMode.java index a32adac37..b449fd4f7 100644 --- a/inlong-manager/manager-common/src/main/java/org/apache/inlong/manager/common/consts/DataNodeType.java +++ b/inlong-manager/manager-pojo/src/main/java/org/apache/inlong/manager/pojo/sink/redis/RedisSchemaMapMode.java @@ -15,20 +15,13 @@ * limitations under the License. */ -package org.apache.inlong.manager.common.consts; +package org.apache.inlong.manager.pojo.sink.redis; /** - * Constants of data node. + * The mapMode between SQL column and Redis data-type. */ -public class DataNodeType { - - public static final String HIVE = "HIVE"; - public static final String KAFKA = "KAFKA"; - public static final String ICEBERG = "ICEBERG"; - public static final String HUDI = "HUDI"; - public static final String CLICKHOUSE = "CLICKHOUSE"; - public static final String ELASTICSEARCH = "ELASTICSEARCH"; - public static final String MYSQL = "MYSQL"; - public static final String STARROCKS = "STARROCKS"; - +public enum RedisSchemaMapMode { + DYNAMIC, + STATIC_PREFIX_MATCH, + STATIC_KV_PAIR; } diff --git a/inlong-manager/manager-pojo/src/main/java/org/apache/inlong/manager/pojo/sink/redis/RedisSink.java b/inlong-manager/manager-pojo/src/main/java/org/apache/inlong/manager/pojo/sink/redis/RedisSink.java new file mode 100644 index 000000000..ff19f870f --- /dev/null +++ b/inlong-manager/manager-pojo/src/main/java/org/apache/inlong/manager/pojo/sink/redis/RedisSink.java @@ -0,0 +1,124 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.inlong.manager.pojo.sink.redis; + +import io.swagger.annotations.ApiModel; +import io.swagger.annotations.ApiModelProperty; +import lombok.AllArgsConstructor; +import lombok.Data; +import lombok.EqualsAndHashCode; +import lombok.ToString; +import lombok.experimental.SuperBuilder; +import org.apache.inlong.manager.common.consts.SinkType; +import org.apache.inlong.manager.common.util.CommonBeanUtils; +import org.apache.inlong.manager.common.util.JsonTypeDefine; +import org.apache.inlong.manager.pojo.sink.SinkRequest; +import org.apache.inlong.manager.pojo.sink.StreamSink; + +/** + * Redis sink info + */ +@Data +@SuperBuilder +@AllArgsConstructor +@ToString(callSuper = true) +@EqualsAndHashCode(callSuper = true) +@ApiModel(value = "Redis sink info") +@JsonTypeDefine(value = SinkType.REDIS) +public class RedisSink extends StreamSink { + + @ApiModelProperty("Redis cluster mode") + private String clusterMode; + + @ApiModelProperty("Redis database id") + private Integer database; + + @ApiModelProperty("Redis data type") + private String dataType; + + @ApiModelProperty("Redis schema mapping mode") + private String schemaMapMode; + + @ApiModelProperty("Password for Redis accessing") + private String password; + + @ApiModelProperty("Database name") + private String databaseName; + + @ApiModelProperty("Expire time of Redis row") + private Integer ttl; + + @ApiModelProperty("The timeout of Redis client") + private Integer timeout; + + @ApiModelProperty("The socket timeout of redis client") + private Integer soTimeout; + + @ApiModelProperty("The max total of sink client") + private Integer maxTotal; + + @ApiModelProperty("The max idle of sink client") + private Integer maxIdle; + + @ApiModelProperty("The min idle of sink client") + private Integer minIdle; + + @ApiModelProperty("The max retry time") + private Integer maxRetries; + + @ApiModelProperty("The host of Redis server") + private String host; + + @ApiModelProperty("The port of Redis server") + private Integer port; + + @ApiModelProperty("The master name of Redis sentinel cluster") + private String sentinelMasterName; + + @ApiModelProperty("The sentinels info of Redis sentinel cluster") + private String sentinelsInfo; + + /** + * The address of redis server, format eg: 127.0.0.1:8080,127.0.0.2:8081 . + * If server is not cluster mode, server address format eg: 127.0.0.1:8080 . + */ + @ApiModelProperty("The cluster nodes of Redis cluster") + private String clusterNodes; + + @ApiModelProperty("The DataEncoding of Redis STATIC_PREFIX_MATCH data-type") + private String formatDataEncoding; + + @ApiModelProperty("The DataType of Redis STATIC_PREFIX_MATCH data-type") + private String formatDataType; + + @ApiModelProperty("Whether ignore parse error of Redis STATIC_PREFIX_MATCH data-type") + private Boolean formatIgnoreParseError; + + @ApiModelProperty("The data separator of Redis STATIC_PREFIX_MATCH data-type") + private String formatDataSeparator; + + public RedisSink() { + this.setSinkType(SinkType.REDIS); + } + + @Override + public SinkRequest genSinkRequest() { + return CommonBeanUtils.copyProperties(this, RedisSinkRequest::new); + } + +} diff --git a/inlong-manager/manager-pojo/src/main/java/org/apache/inlong/manager/pojo/sink/redis/RedisSinkDTO.java b/inlong-manager/manager-pojo/src/main/java/org/apache/inlong/manager/pojo/sink/redis/RedisSinkDTO.java new file mode 100644 index 000000000..22888179d --- /dev/null +++ b/inlong-manager/manager-pojo/src/main/java/org/apache/inlong/manager/pojo/sink/redis/RedisSinkDTO.java @@ -0,0 +1,129 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.inlong.manager.pojo.sink.redis; + +import io.swagger.annotations.ApiModelProperty; +import lombok.AllArgsConstructor; +import lombok.Builder; +import lombok.Data; +import lombok.NoArgsConstructor; +import org.apache.inlong.manager.common.enums.ErrorCodeEnum; +import org.apache.inlong.manager.common.exceptions.BusinessException; +import org.apache.inlong.manager.common.util.CommonBeanUtils; +import org.apache.inlong.manager.common.util.JsonUtils; + +import javax.validation.constraints.NotNull; +import java.util.Map; + +/** + * Sink info of Redis + */ +@Data +@Builder +@NoArgsConstructor +@AllArgsConstructor +public class RedisSinkDTO { + + @ApiModelProperty("Redis cluster mode") + private String clusterMode; + + @ApiModelProperty("Redis database id") + private Integer database; + + @ApiModelProperty("Redis data type") + private String dataType; + + @ApiModelProperty("Redis schema mapping mode") + private String schemaMapMode; + + @ApiModelProperty("Password for Redis accessing") + private String password; + + @ApiModelProperty("Database name") + private String databaseName; + + @ApiModelProperty("Expire time of Redis row") + private Integer ttl; + + @ApiModelProperty("The timeout of Redis client") + private Integer timeout; + + @ApiModelProperty("The socket timeout of redis client") + private Integer soTimeout; + + @ApiModelProperty("The max total of sink client") + private Integer maxTotal; + + @ApiModelProperty("The max idle of sink client") + private Integer maxIdle; + + @ApiModelProperty("The min idle of sink client") + private Integer minIdle; + + @ApiModelProperty("The max retry time") + private Integer maxRetries; + + @ApiModelProperty("The host of Redis server") + private String host; + + @ApiModelProperty("The port of Redis server") + private Integer port; + + @ApiModelProperty("The master name of Redis sentinel cluster") + private String sentinelMasterName; + + private String sentinelsInfo; + + /** + * The address of redis server, format eg: 127.0.0.1:8080,127.0.0.2:8081 . + * If server is not cluster mode, server address format eg: 127.0.0.1:8080 . + */ + @ApiModelProperty("The cluster nodes of Redis cluster") + private String clusterNodes; + + @ApiModelProperty("The DataEncoding of Redis STATIC_PREFIX_MATCH data-type") + private String formatDataEncoding; + + @ApiModelProperty("The DataType of Redis STATIC_PREFIX_MATCH data-type") + private String formatDataType; + + @ApiModelProperty("Whether ignore parse error of Redis STATIC_PREFIX_MATCH data-type") + private Boolean formatIgnoreParseError; + + @ApiModelProperty("The data separator of Redis STATIC_PREFIX_MATCH data-type") + private String formatDataSeparator; + @ApiModelProperty("Properties for Redis") + private Map<String, Object> properties; + + /** + * Get the dto instance from the request + */ + public static RedisSinkDTO getFromRequest(RedisSinkRequest request) throws Exception { + return CommonBeanUtils.copyProperties(request, RedisSinkDTO::new); + } + + public static RedisSinkDTO getFromJson(@NotNull String extParams) { + try { + return JsonUtils.parseObject(extParams, RedisSinkDTO.class); + } catch (Exception e) { + throw new BusinessException(ErrorCodeEnum.SINK_INFO_INCORRECT, + String.format("parse extParams of Redis SinkDTO failure: %s", e.getMessage())); + } + } + +} diff --git a/inlong-manager/manager-pojo/src/main/java/org/apache/inlong/manager/pojo/sink/redis/RedisSinkRequest.java b/inlong-manager/manager-pojo/src/main/java/org/apache/inlong/manager/pojo/sink/redis/RedisSinkRequest.java new file mode 100644 index 000000000..0ea2e84c9 --- /dev/null +++ b/inlong-manager/manager-pojo/src/main/java/org/apache/inlong/manager/pojo/sink/redis/RedisSinkRequest.java @@ -0,0 +1,108 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.inlong.manager.pojo.sink.redis; + +import io.swagger.annotations.ApiModel; +import io.swagger.annotations.ApiModelProperty; +import lombok.Data; +import lombok.EqualsAndHashCode; +import lombok.ToString; +import org.apache.inlong.manager.common.consts.SinkType; +import org.apache.inlong.manager.common.util.JsonTypeDefine; +import org.apache.inlong.manager.pojo.sink.SinkRequest; + +/** + * Redis sink request. + */ +@Data +@ToString(callSuper = true) +@EqualsAndHashCode(callSuper = true) +@ApiModel(value = "Redis sink request") +@JsonTypeDefine(value = SinkType.REDIS) +public class RedisSinkRequest extends SinkRequest { + + @ApiModelProperty("Redis cluster mode") + private String clusterMode; + + @ApiModelProperty("Redis database id") + private Integer database; + + @ApiModelProperty("Redis data type") + private String dataType; + + @ApiModelProperty("Redis schema mapping mode") + private String schemaMapMode; + + @ApiModelProperty("Password for Redis accessing") + private String password; + + @ApiModelProperty("Database name") + private String databaseName; + + @ApiModelProperty("Expire time of Redis row") + private Integer ttl; + + @ApiModelProperty("The timeout of Redis client") + private Integer timeout; + + @ApiModelProperty("The socket timeout of redis client") + private Integer soTimeout; + + @ApiModelProperty("The max total of sink client") + private Integer maxTotal; + + @ApiModelProperty("The max idle of sink client") + private Integer maxIdle; + + @ApiModelProperty("The min idle of sink client") + private Integer minIdle; + + @ApiModelProperty("The max retry time") + private Integer maxRetries; + + @ApiModelProperty("The host of Redis server") + private String host; + + @ApiModelProperty("The port of Redis server") + private Integer port; + + @ApiModelProperty("The master name of Redis sentinel cluster") + private String sentinelMasterName; + + @ApiModelProperty("The sentinels info of Redis sentinel cluster") + private String sentinelsInfo; + + /** + * The address of redis server, format eg: 127.0.0.1:8080,127.0.0.2:8081 . + * If server is not cluster mode, server address format eg: 127.0.0.1:8080 . + */ + @ApiModelProperty("The cluster nodes of Redis cluster") + private String clusterNodes; + + @ApiModelProperty("The DataEncoding of Redis STATIC_PREFIX_MATCH data-type") + private String formatDataEncoding; + + @ApiModelProperty("The DataType of Redis STATIC_PREFIX_MATCH data-type") + private String formatDataType; + + @ApiModelProperty("Whether ignore parse error of Redis STATIC_PREFIX_MATCH data-type") + private Boolean formatIgnoreParseError; + + @ApiModelProperty("The data separator of Redis STATIC_PREFIX_MATCH data-type") + private String formatDataSeparator; +} diff --git a/inlong-manager/manager-pojo/src/main/java/org/apache/inlong/manager/pojo/sort/util/LoadNodeUtils.java b/inlong-manager/manager-pojo/src/main/java/org/apache/inlong/manager/pojo/sort/util/LoadNodeUtils.java index 1eb3e1398..2c49597c6 100644 --- a/inlong-manager/manager-pojo/src/main/java/org/apache/inlong/manager/pojo/sort/util/LoadNodeUtils.java +++ b/inlong-manager/manager-pojo/src/main/java/org/apache/inlong/manager/pojo/sort/util/LoadNodeUtils.java @@ -41,6 +41,7 @@ import org.apache.inlong.manager.pojo.sink.kafka.KafkaSink; import org.apache.inlong.manager.pojo.sink.mysql.MySQLSink; import org.apache.inlong.manager.pojo.sink.oracle.OracleSink; import org.apache.inlong.manager.pojo.sink.postgresql.PostgreSQLSink; +import org.apache.inlong.manager.pojo.sink.redis.RedisSink; import org.apache.inlong.manager.pojo.sink.sqlserver.SQLServerSink; import org.apache.inlong.manager.pojo.sink.starrocks.StarRocksSink; import org.apache.inlong.manager.pojo.sink.tdsqlpostgresql.TDSQLPostgreSQLSink; @@ -55,6 +56,7 @@ import org.apache.inlong.sort.protocol.node.format.CanalJsonFormat; import org.apache.inlong.sort.protocol.node.format.CsvFormat; import org.apache.inlong.sort.protocol.node.format.DebeziumJsonFormat; import org.apache.inlong.sort.protocol.node.format.Format; +import org.apache.inlong.sort.protocol.node.format.InLongMsgFormat; import org.apache.inlong.sort.protocol.node.format.JsonFormat; import org.apache.inlong.sort.protocol.node.format.RawFormat; import org.apache.inlong.sort.protocol.node.load.ClickHouseLoadNode; @@ -70,6 +72,7 @@ import org.apache.inlong.sort.protocol.node.load.KafkaLoadNode; import org.apache.inlong.sort.protocol.node.load.MySqlLoadNode; import org.apache.inlong.sort.protocol.node.load.OracleLoadNode; import org.apache.inlong.sort.protocol.node.load.PostgresLoadNode; +import org.apache.inlong.sort.protocol.node.load.RedisLoadNode; import org.apache.inlong.sort.protocol.node.load.SqlServerLoadNode; import org.apache.inlong.sort.protocol.node.load.StarRocksLoadNode; import org.apache.inlong.sort.protocol.node.load.TDSQLPostgresLoadNode; @@ -145,6 +148,8 @@ public class LoadNodeUtils { return createLoadNode((DorisSink) streamSink, fieldInfos, fieldRelations, properties); case SinkType.STARROCKS: return createLoadNode((StarRocksSink) streamSink, fieldInfos, fieldRelations, properties); + case SinkType.REDIS: + return createLoadNode((RedisSink) streamSink, fieldInfos, fieldRelations, properties); default: throw new BusinessException(String.format("Unsupported sinkType=%s to create load node", sinkType)); } @@ -380,6 +385,64 @@ public class LoadNodeUtils { starRocksSink.getTablePattern()); } + private static LoadNode createLoadNode( + RedisSink redisSink, + List<FieldInfo> fieldInfos, + List<FieldRelation> fieldRelations, + Map<String, String> properties) { + String clusterMode = redisSink.getClusterMode(); + String dataType = redisSink.getDataType(); + String schemaMapMode = redisSink.getSchemaMapMode(); + String host = redisSink.getHost(); + Integer port = redisSink.getPort(); + String clusterNodes = redisSink.getClusterNodes(); + String sentinelMasterName = redisSink.getSentinelMasterName(); + String sentinelsInfo = redisSink.getSentinelsInfo(); + Integer database = redisSink.getDatabase(); + String password = redisSink.getPassword(); + Integer ttl = redisSink.getTtl(); + Integer timeout = redisSink.getTimeout(); + Integer soTimeout = redisSink.getSoTimeout(); + Integer maxTotal = redisSink.getMaxTotal(); + Integer maxIdle = redisSink.getMaxIdle(); + Integer minIdle = redisSink.getMinIdle(); + Integer maxRetries = redisSink.getMaxRetries(); + + Format format = parsingFormat( + redisSink.getFormatDataType(), + false, + redisSink.getFormatDataSeparator(), + false); + + return new RedisLoadNode( + redisSink.getSinkName(), + redisSink.getSinkName(), + fieldInfos, + fieldRelations, + null, + null, + null, + properties, + clusterMode, + dataType, + schemaMapMode, + host, + port, + clusterNodes, + sentinelMasterName, + sentinelsInfo, + database, + password, + ttl, + format, + timeout, + soTimeout, + maxTotal, + maxIdle, + minIdle, + maxRetries); + } + /** * Create load node of Iceberg. */ @@ -651,4 +714,59 @@ public class LoadNodeUtils { } } + /** + * Parse format + * + * @param formatName data serialization, support: csv, json, canal, avro, etc + * @param wrapWithInlongMsg whether wrap content with {@link InLongMsgFormat} + * @param separatorStr the separator of data content + * @param ignoreParseErrors whether ignore deserialization error data + * @return the format for serialized content + */ + private static Format parsingFormat( + String formatName, + boolean wrapWithInlongMsg, + String separatorStr, + boolean ignoreParseErrors) { + Format format; + DataTypeEnum dataType = DataTypeEnum.forType(formatName); + switch (dataType) { + case CSV: + if (StringUtils.isNumeric(separatorStr)) { + char dataSeparator = (char) Integer.parseInt(separatorStr); + separatorStr = Character.toString(dataSeparator); + } + CsvFormat csvFormat = new CsvFormat(separatorStr); + csvFormat.setIgnoreParseErrors(ignoreParseErrors); + format = csvFormat; + break; + case AVRO: + format = new AvroFormat(); + break; + case JSON: + JsonFormat jsonFormat = new JsonFormat(); + jsonFormat.setIgnoreParseErrors(ignoreParseErrors); + format = jsonFormat; + break; + case CANAL: + format = new CanalJsonFormat(); + break; + case DEBEZIUM_JSON: + DebeziumJsonFormat debeziumJsonFormat = new DebeziumJsonFormat(); + debeziumJsonFormat.setIgnoreParseErrors(ignoreParseErrors); + format = debeziumJsonFormat; + break; + case RAW: + format = new RawFormat(); + break; + default: + throw new IllegalArgumentException(String.format("Unsupported dataType=%s", dataType)); + } + if (wrapWithInlongMsg) { + Format innerFormat = format; + format = new InLongMsgFormat(innerFormat, false); + } + return format; + } + } diff --git a/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/node/redis/RedisDataNodeOperator.java b/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/node/redis/RedisDataNodeOperator.java new file mode 100644 index 000000000..94930b4cd --- /dev/null +++ b/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/node/redis/RedisDataNodeOperator.java @@ -0,0 +1,84 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.inlong.manager.service.node.redis; + +import com.fasterxml.jackson.databind.ObjectMapper; +import org.apache.commons.lang3.StringUtils; +import org.apache.inlong.manager.common.consts.DataNodeType; +import org.apache.inlong.manager.common.enums.ErrorCodeEnum; +import org.apache.inlong.manager.common.exceptions.BusinessException; +import org.apache.inlong.manager.common.util.CommonBeanUtils; +import org.apache.inlong.manager.dao.entity.DataNodeEntity; +import org.apache.inlong.manager.pojo.node.DataNodeInfo; +import org.apache.inlong.manager.pojo.node.DataNodeRequest; +import org.apache.inlong.manager.pojo.node.redis.RedisDataNodeDTO; +import org.apache.inlong.manager.pojo.node.redis.RedisDataNodeInfo; +import org.apache.inlong.manager.pojo.node.redis.RedisDataNodeRequest; +import org.apache.inlong.manager.service.node.AbstractDataNodeOperator; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; +import org.springframework.beans.factory.annotation.Autowired; +import org.springframework.stereotype.Service; + +@Service +public class RedisDataNodeOperator extends AbstractDataNodeOperator { + + private static final Logger LOGGER = LoggerFactory.getLogger(RedisDataNodeOperator.class); + + @Autowired + private ObjectMapper objectMapper; + + @Override + public Boolean accept(String dataNodeType) { + return getDataNodeType().equals(dataNodeType); + } + + @Override + public String getDataNodeType() { + return DataNodeType.REDIS; + } + + @Override + public DataNodeInfo getFromEntity(DataNodeEntity entity) { + if (entity == null) { + throw new BusinessException(ErrorCodeEnum.DATA_NODE_NOT_FOUND); + } + + RedisDataNodeInfo redisDataNodeInfo = new RedisDataNodeInfo(); + CommonBeanUtils.copyProperties(entity, redisDataNodeInfo); + if (StringUtils.isNotBlank(entity.getExtParams())) { + RedisDataNodeDTO dto = RedisDataNodeDTO.getFromJson(entity.getExtParams()); + CommonBeanUtils.copyProperties(dto, redisDataNodeInfo); + } + return redisDataNodeInfo; + } + + @Override + protected void setTargetEntity(DataNodeRequest request, DataNodeEntity targetEntity) { + RedisDataNodeRequest redisDataNodeRequest = (RedisDataNodeRequest) request; + CommonBeanUtils.copyProperties(redisDataNodeRequest, targetEntity, true); + try { + RedisDataNodeDTO dto = RedisDataNodeDTO.getFromRequest(redisDataNodeRequest); + targetEntity.setExtParams(objectMapper.writeValueAsString(dto)); + } catch (Exception e) { + throw new BusinessException(ErrorCodeEnum.SOURCE_INFO_INCORRECT, + String.format("Failed to build extParams for Redis node: %s", e.getMessage())); + } + } + +} diff --git a/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/resource/sink/redis/RedisResourceOperator.java b/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/resource/sink/redis/RedisResourceOperator.java new file mode 100644 index 000000000..264d8bf1d --- /dev/null +++ b/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/resource/sink/redis/RedisResourceOperator.java @@ -0,0 +1,48 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.inlong.manager.service.resource.sink.redis; + +import org.apache.inlong.manager.common.consts.SinkType; +import org.apache.inlong.manager.pojo.sink.SinkInfo; +import org.apache.inlong.manager.service.resource.sink.SinkResourceOperator; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; +import org.springframework.stereotype.Service; + +/** + * Redis resource operator + */ +@Service +public class RedisResourceOperator implements SinkResourceOperator { + + private static final Logger LOGGER = LoggerFactory.getLogger(RedisResourceOperator.class); + + @Override + public Boolean accept(String sinkType) { + return SinkType.REDIS.equals(sinkType); + } + + /** + * Create Redis table according to the sink config + */ + @Override + public void createSinkResource(SinkInfo sinkInfo) { + LOGGER.info("It is not need to create redis table!"); + } + +} diff --git a/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/sink/redis/RedisSinkOperator.java b/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/sink/redis/RedisSinkOperator.java new file mode 100644 index 000000000..884fc64fd --- /dev/null +++ b/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/sink/redis/RedisSinkOperator.java @@ -0,0 +1,158 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.inlong.manager.service.sink.redis; + +import static org.apache.inlong.manager.common.enums.ErrorCodeEnum.IP_EMPTY; +import static org.apache.inlong.manager.common.enums.ErrorCodeEnum.PORT_EMPTY; +import static org.apache.inlong.manager.common.enums.ErrorCodeEnum.SINK_SAVE_FAILED; +import static org.apache.inlong.manager.common.enums.ErrorCodeEnum.SINK_TYPE_NOT_SUPPORT; +import static org.apache.inlong.manager.common.util.Preconditions.expectNotBlank; +import static org.apache.inlong.manager.common.util.Preconditions.expectNotEmpty; +import static org.apache.inlong.manager.common.util.Preconditions.expectNotNull; +import static org.apache.inlong.manager.common.util.Preconditions.expectTrue; + +import com.fasterxml.jackson.databind.ObjectMapper; +import org.apache.inlong.manager.common.consts.SinkType; +import org.apache.inlong.manager.common.enums.ErrorCodeEnum; +import org.apache.inlong.manager.common.exceptions.BusinessException; +import org.apache.inlong.manager.common.util.CommonBeanUtils; +import org.apache.inlong.manager.dao.entity.StreamSinkEntity; +import org.apache.inlong.manager.pojo.sink.SinkField; +import org.apache.inlong.manager.pojo.sink.SinkRequest; +import org.apache.inlong.manager.pojo.sink.StreamSink; +import org.apache.inlong.manager.pojo.sink.redis.RedisClusterMode; +import org.apache.inlong.manager.pojo.sink.redis.RedisDataType; +import org.apache.inlong.manager.pojo.sink.redis.RedisSchemaMapMode; +import org.apache.inlong.manager.pojo.sink.redis.RedisSink; +import org.apache.inlong.manager.pojo.sink.redis.RedisSinkDTO; +import org.apache.inlong.manager.pojo.sink.redis.RedisSinkRequest; +import org.apache.inlong.manager.service.sink.AbstractSinkOperator; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; +import org.springframework.beans.factory.annotation.Autowired; +import org.springframework.stereotype.Service; + +import java.util.Arrays; +import java.util.List; + +/** + * Redis sink operator, such as save or update redis field, etc. + */ +@Service +public class RedisSinkOperator extends AbstractSinkOperator { + + private static final Logger LOGGER = LoggerFactory.getLogger(RedisSinkOperator.class); + private static final int PORT_MAX_VALUE = 65535; + + @Autowired + private ObjectMapper objectMapper; + + @Override + public Boolean accept(String sinkType) { + return SinkType.REDIS.equals(sinkType); + } + + @Override + protected String getSinkType() { + return SinkType.REDIS; + } + + @Override + protected void setTargetEntity(SinkRequest request, StreamSinkEntity targetEntity) { + + if (!this.getSinkType().equals(request.getSinkType())) { + throw new BusinessException(ErrorCodeEnum.SINK_SAVE_FAILED, + SINK_TYPE_NOT_SUPPORT.getMessage() + ": " + getSinkType()); + } + + RedisSinkRequest sinkRequest = (RedisSinkRequest) request; + + String clusterMode = sinkRequest.getClusterMode(); + RedisClusterMode redisClusterMode = RedisClusterMode.of(clusterMode); + + expectNotNull(redisClusterMode, + "Redis ClusterMode must in one of " + Arrays.toString(RedisClusterMode.values()) + " !"); + + switch (redisClusterMode) { + case CLUSTER: + String clusterNodes = sinkRequest.getClusterNodes(); + checkClusterNodes(clusterNodes); + break; + case SENTINEL: + String sentinelMasterName = sinkRequest.getSentinelMasterName(); + expectNotEmpty(sentinelMasterName, "Redis MasterName of Sentinel cluster must not null!"); + String sentinelsInfo = sinkRequest.getSentinelsInfo(); + expectNotEmpty(sentinelsInfo, "Redis sentinelsInfo of Sentinel cluster must not null!"); + break; + case STANDALONE: + String host = sinkRequest.getHost(); + Integer port = sinkRequest.getPort(); + + expectNotEmpty(host, "Redis server host must not null!"); + expectTrue( + port != null && port > 1 && port < PORT_MAX_VALUE, + "The port of the redis server must be greater than 0 and less than 65535!"); + break; + } + RedisDataType dataType = RedisDataType.valueOf(sinkRequest.getDataType()); + expectNotNull(dataType, "Redis DataType must not null"); + + RedisSchemaMapMode mapMode = RedisSchemaMapMode.valueOf(sinkRequest.getSchemaMapMode()); + expectTrue(dataType.getMapModes().contains(mapMode), + "Redis schemaMapMode '" + mapMode + "' is not supported in '" + dataType + "'"); + + try { + RedisSinkDTO dto = RedisSinkDTO.getFromRequest(sinkRequest); + targetEntity.setExtParams(objectMapper.writeValueAsString(dto)); + } catch (Exception e) { + throw new BusinessException(SINK_SAVE_FAILED, + String.format("serialize extParams of Redis SinkDTO failure: %s", e.getMessage())); + } + } + + private void checkClusterNodes(String clusterNodes) { + expectNotBlank(clusterNodes, "the nodes of Redis cluster must not null"); + String[] nodeArray = clusterNodes.split(","); + expectNotEmpty(nodeArray, "the nodes of Redis cluster must not null"); + + for (String node : nodeArray) { + expectNotBlank(node, "Redis server host must not null!"); + String[] ipPort = node.split(":"); + expectTrue(ipPort.length == 2, "The ip and port of Redis server must be in form: ip:port"); + expectNotBlank(ipPort[0], IP_EMPTY); + expectNotBlank(ipPort[1], PORT_EMPTY); + } + } + + @Override + public StreamSink getFromEntity(StreamSinkEntity entity) { + RedisSink sink = new RedisSink(); + if (entity == null) { + return sink; + } + + RedisSinkDTO dto = RedisSinkDTO.getFromJson(entity.getExtParams()); + + CommonBeanUtils.copyProperties(entity, sink, true); + CommonBeanUtils.copyProperties(dto, sink, true); + List<SinkField> sinkFields = super.getSinkFields(entity.getId()); + sink.setSinkFieldList(sinkFields); + return sink; + } + +} diff --git a/inlong-manager/manager-service/src/test/java/org/apache/inlong/manager/service/sink/RedisSinkServiceTest.java b/inlong-manager/manager-service/src/test/java/org/apache/inlong/manager/service/sink/RedisSinkServiceTest.java new file mode 100644 index 000000000..9f16c7370 --- /dev/null +++ b/inlong-manager/manager-service/src/test/java/org/apache/inlong/manager/service/sink/RedisSinkServiceTest.java @@ -0,0 +1,95 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.inlong.manager.service.sink; + +import org.apache.inlong.manager.common.consts.SinkType; +import org.apache.inlong.manager.pojo.sink.SinkRequest; +import org.apache.inlong.manager.pojo.sink.StreamSink; +import org.apache.inlong.manager.pojo.sink.redis.RedisClusterMode; +import org.apache.inlong.manager.pojo.sink.redis.RedisSink; +import org.apache.inlong.manager.pojo.sink.redis.RedisSinkRequest; +import org.apache.inlong.manager.service.ServiceBaseTest; +import org.apache.inlong.manager.service.core.impl.InlongStreamServiceTest; +import org.junit.jupiter.api.Assertions; +import org.junit.jupiter.api.Test; +import org.springframework.beans.factory.annotation.Autowired; + +/** + * Redis stream sink service test. + */ +public class RedisSinkServiceTest extends ServiceBaseTest { + + private final String globalGroupId = "b_group1"; + private final String globalStreamId = "stream1_hudi"; + private final String globalOperator = "admin"; + + @Autowired + private StreamSinkService sinkService; + @Autowired + private InlongStreamServiceTest streamServiceTest; + + /** + * Save sink info. + */ + public Integer saveSink(String sinkName) { + streamServiceTest.saveInlongStream(globalGroupId, globalStreamId, globalOperator); + RedisSinkRequest sinkRequest = new RedisSinkRequest(); + sinkRequest.setInlongGroupId(globalGroupId); + sinkRequest.setInlongStreamId(globalStreamId); + sinkRequest.setSinkType(SinkType.REDIS); + sinkRequest.setClusterMode(RedisClusterMode.STANDALONE.name()); + sinkRequest.setHost("demo-host"); + sinkRequest.setPort(6300); + sinkRequest.setDataType("HASH"); + sinkRequest.setSchemaMapMode("DYNAMIC"); + sinkRequest.setSinkName(sinkName); + sinkRequest.setId((int) (Math.random() * 100000 + 1)); + return sinkService.save(sinkRequest, globalOperator); + } + + @Test + public void testSaveAndDelete() { + Integer id = this.saveSink("default1"); + Assertions.assertNotNull(id); + boolean result = sinkService.delete(id, false, globalOperator); + Assertions.assertTrue(result); + } + + @Test + public void testListByIdentifier() { + Integer id = this.saveSink("default2"); + StreamSink sink = sinkService.get(id); + Assertions.assertEquals(globalGroupId, sink.getInlongGroupId()); + sinkService.delete(id, false, globalOperator); + } + + @Test + public void testGetAndUpdate() { + Integer sinkId = this.saveSink("default3"); + StreamSink streamSink = sinkService.get(sinkId); + Assertions.assertEquals(globalGroupId, streamSink.getInlongGroupId()); + + RedisSink sink = (RedisSink) streamSink; + SinkRequest request = sink.genSinkRequest(); + boolean result = sinkService.update(request, globalOperator); + Assertions.assertTrue(result); + + sinkService.delete(sinkId, false, globalOperator); + } + +}