This is an automated email from the ASF dual-hosted git repository. dockerzhang pushed a commit to branch master in repository https://gitbox.apache.org/repos/asf/inlong.git
The following commit(s) were added to refs/heads/master by this push: new 88b3c0695 [INLONG-5863][Manager] Extend Redis extract node (#5864) 88b3c0695 is described below commit 88b3c0695b0038cb0101fb1b21b00f9b4e73d99f Author: iamsee123 <61189316+iamsee...@users.noreply.github.com> AuthorDate: Mon Sep 19 11:39:34 2022 +0800 [INLONG-5863][Manager] Extend Redis extract node (#5864) --- .../apache/inlong/common/enums/TaskTypeEnum.java | 3 +- .../inlong/manager/common/consts/SourceType.java | 2 + .../manager/pojo/sort/util/ExtractNodeUtils.java | 45 +++++++ .../manager/pojo/source/redis/RedisSource.java | 118 ++++++++++++++++ .../manager/pojo/source/redis/RedisSourceDTO.java | 149 +++++++++++++++++++++ .../pojo/source/redis/RedisSourceRequest.java | 106 +++++++++++++++ .../service/source/redis/RedisSourceOperator.java | 84 ++++++++++++ .../service/source/RedisSourceServiceTest.java | 99 ++++++++++++++ 8 files changed, 605 insertions(+), 1 deletion(-) diff --git a/inlong-common/src/main/java/org/apache/inlong/common/enums/TaskTypeEnum.java b/inlong-common/src/main/java/org/apache/inlong/common/enums/TaskTypeEnum.java index 0c94486ea..fe7f09886 100644 --- a/inlong-common/src/main/java/org/apache/inlong/common/enums/TaskTypeEnum.java +++ b/inlong-common/src/main/java/org/apache/inlong/common/enums/TaskTypeEnum.java @@ -29,7 +29,8 @@ public enum TaskTypeEnum { ORACLE(7), SQLSERVER(8), MONGODB(9), - TUBEMQ(10) + TUBEMQ(10), + REDIS(11), ; diff --git a/inlong-manager/manager-common/src/main/java/org/apache/inlong/manager/common/consts/SourceType.java b/inlong-manager/manager-common/src/main/java/org/apache/inlong/manager/common/consts/SourceType.java index 350a074d1..93ab42009 100644 --- a/inlong-manager/manager-common/src/main/java/org/apache/inlong/manager/common/consts/SourceType.java +++ b/inlong-manager/manager-common/src/main/java/org/apache/inlong/manager/common/consts/SourceType.java @@ -39,6 +39,7 @@ public class SourceType { public static final String ORACLE = "ORACLE"; public static final String SQLSERVER = "SQLSERVER"; public static final String MONGODB = "MONGODB"; + public static final String REDIS = "REDIS"; public static final Map<String, TaskTypeEnum> SOURCE_TASK_MAP = new HashMap<String, TaskTypeEnum>() { { @@ -54,6 +55,7 @@ public class SourceType { put(ORACLE, TaskTypeEnum.ORACLE); put(SQLSERVER, TaskTypeEnum.SQLSERVER); put(MONGODB, TaskTypeEnum.MONGODB); + put(REDIS,TaskTypeEnum.REDIS); } }; diff --git a/inlong-manager/manager-pojo/src/main/java/org/apache/inlong/manager/pojo/sort/util/ExtractNodeUtils.java b/inlong-manager/manager-pojo/src/main/java/org/apache/inlong/manager/pojo/sort/util/ExtractNodeUtils.java index b334eca18..d299f8ad7 100644 --- a/inlong-manager/manager-pojo/src/main/java/org/apache/inlong/manager/pojo/sort/util/ExtractNodeUtils.java +++ b/inlong-manager/manager-pojo/src/main/java/org/apache/inlong/manager/pojo/sort/util/ExtractNodeUtils.java @@ -32,13 +32,17 @@ import org.apache.inlong.manager.pojo.source.mysql.MySQLBinlogSource; import org.apache.inlong.manager.pojo.source.oracle.OracleSource; import org.apache.inlong.manager.pojo.source.postgresql.PostgreSQLSource; import org.apache.inlong.manager.pojo.source.pulsar.PulsarSource; +import org.apache.inlong.manager.pojo.source.redis.RedisSource; import org.apache.inlong.manager.pojo.source.sqlserver.SQLServerSource; import org.apache.inlong.manager.pojo.source.tubemq.TubeMQSource; import org.apache.inlong.manager.pojo.stream.StreamField; import org.apache.inlong.sort.protocol.FieldInfo; +import org.apache.inlong.sort.protocol.LookupOptions; import org.apache.inlong.sort.protocol.constant.OracleConstant.ScanStartUpMode; import org.apache.inlong.sort.protocol.enums.KafkaScanStartupMode; import org.apache.inlong.sort.protocol.enums.PulsarScanStartupMode; +import org.apache.inlong.sort.protocol.enums.RedisCommand; +import org.apache.inlong.sort.protocol.enums.RedisMode; import org.apache.inlong.sort.protocol.node.ExtractNode; import org.apache.inlong.sort.protocol.node.extract.KafkaExtractNode; import org.apache.inlong.sort.protocol.node.extract.MongoExtractNode; @@ -46,6 +50,7 @@ import org.apache.inlong.sort.protocol.node.extract.MySqlExtractNode; import org.apache.inlong.sort.protocol.node.extract.OracleExtractNode; import org.apache.inlong.sort.protocol.node.extract.PostgresExtractNode; import org.apache.inlong.sort.protocol.node.extract.PulsarExtractNode; +import org.apache.inlong.sort.protocol.node.extract.RedisExtractNode; import org.apache.inlong.sort.protocol.node.extract.SqlServerExtractNode; import org.apache.inlong.sort.protocol.node.extract.TubeMQExtractNode; import org.apache.inlong.sort.protocol.node.format.AvroFormat; @@ -97,6 +102,8 @@ public class ExtractNodeUtils { return createExtractNode((MongoDBSource) sourceInfo); case SourceType.TUBEMQ: return createExtractNode((TubeMQSource) sourceInfo); + case SourceType.REDIS: + return createExtractNode((RedisSource) sourceInfo); default: throw new IllegalArgumentException( String.format("Unsupported sourceType=%s to create extractNode", sourceType)); @@ -395,6 +402,44 @@ public class ExtractNodeUtils { ); } + /** + * Create Redis extract node + * @param source redis source info + * @return redis extract source info + */ + public static RedisExtractNode createExtractNode(RedisSource source) { + List<FieldInfo> fieldInfos = parseFieldInfos(source.getFieldList(), source.getSourceName()); + Map<String, String> properties = parseProperties(source.getProperties()); + RedisCommand command = RedisCommand.forName(source.getRedisCommand()); + RedisMode mode = RedisMode.forName(source.getRedisMode()); + LookupOptions lookupOptions = new LookupOptions(source.getLookupCacheMaxRows(),source.getLookupCacheTtl(), + source.getLookupMaxRetries(),source.getLookupAsync()); + return new RedisExtractNode( + source.getSourceName(), + source.getSourceName(), + fieldInfos, + null, + properties, + source.getPrimaryKey(), + mode, + command, + source.getClusterNodes(), + source.getMasterName(), + source.getSentinelsInfo(), + source.getHostname(), + source.getPort(), + source.getPassword(), + source.getAdditionalKey(), + source.getDatabase(), + source.getTimeout(), + source.getSoTimeout(), + source.getMaxTotal(), + source.getMaxIdle(), + source.getMinIdle(), + lookupOptions + ); + } + /** * Parse FieldInfos * diff --git a/inlong-manager/manager-pojo/src/main/java/org/apache/inlong/manager/pojo/source/redis/RedisSource.java b/inlong-manager/manager-pojo/src/main/java/org/apache/inlong/manager/pojo/source/redis/RedisSource.java new file mode 100644 index 000000000..cea322b5e --- /dev/null +++ b/inlong-manager/manager-pojo/src/main/java/org/apache/inlong/manager/pojo/source/redis/RedisSource.java @@ -0,0 +1,118 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.inlong.manager.pojo.source.redis; + +import io.swagger.annotations.ApiModel; +import io.swagger.annotations.ApiModelProperty; +import lombok.AllArgsConstructor; +import lombok.Builder; +import lombok.Data; +import lombok.EqualsAndHashCode; +import lombok.ToString; +import lombok.experimental.SuperBuilder; +import org.apache.inlong.manager.common.consts.SourceType; +import org.apache.inlong.manager.common.util.CommonBeanUtils; +import org.apache.inlong.manager.common.util.JsonTypeDefine; +import org.apache.inlong.manager.pojo.source.SourceRequest; +import org.apache.inlong.manager.pojo.source.StreamSource; + +/** + * Redis source info + */ +@Data +@SuperBuilder +@AllArgsConstructor +@ToString(callSuper = true) +@EqualsAndHashCode(callSuper = true) +@ApiModel(value = "Redis source info") +@JsonTypeDefine(value = SourceType.REDIS) +public class RedisSource extends StreamSource { + + @ApiModelProperty("Username of the redis server") + private String username; + + @ApiModelProperty("Password of the redis server") + private String password; + + @ApiModelProperty("Hostname of the redis server") + private String hostname; + + @ApiModelProperty("Port of the redis server") + @Builder.Default + private Integer port = 6379; + + @ApiModelProperty("Primary key") + private String primaryKey; + + @ApiModelProperty("Redis command, supports: hget, get, zscore, zrevrank") + private String redisCommand; + + @ApiModelProperty("Redis deploy mode, supports: standalone, cluster, sentinel") + private String redisMode; + + @ApiModelProperty("Cluster node infos only used for redis cluster deploy mode") + private String clusterNodes; + + @ApiModelProperty("Master name only used for redis sentinel deploy mode") + private String masterName; + + @ApiModelProperty("Sentinels info only used for redis sentinel deploy mode") + private String sentinelsInfo; + + @ApiModelProperty("Additional key only used for hash/Sorted-set data type") + private String additionalKey; + + @ApiModelProperty("Database number connect to redis for redis standalone/sentinel deploy modes") + private Integer database; + + @ApiModelProperty("Timeout value of connect to redis") + private Integer timeout; + + @ApiModelProperty("Timeout value of read data from redis") + private Integer soTimeout; + + @ApiModelProperty("Max connection number to redis") + private Integer maxTotal; + + @ApiModelProperty("Max free connection number") + private Integer maxIdle; + + @ApiModelProperty("Min free connection number") + private Integer minIdle; + + @ApiModelProperty("Lookup Async") + private Boolean lookupAsync; + + @ApiModelProperty("Lookup cache max rows") + private Long lookupCacheMaxRows; + + @ApiModelProperty("Lookup cache ttl") + private Long lookupCacheTtl; + + @ApiModelProperty("Lookup max retry times") + private Integer lookupMaxRetries; + + public RedisSource() { + this.setSourceType(SourceType.REDIS); + } + + @Override + public SourceRequest genSourceRequest() { + return CommonBeanUtils.copyProperties(this, RedisSourceRequest::new); + } +} diff --git a/inlong-manager/manager-pojo/src/main/java/org/apache/inlong/manager/pojo/source/redis/RedisSourceDTO.java b/inlong-manager/manager-pojo/src/main/java/org/apache/inlong/manager/pojo/source/redis/RedisSourceDTO.java new file mode 100644 index 000000000..10892affb --- /dev/null +++ b/inlong-manager/manager-pojo/src/main/java/org/apache/inlong/manager/pojo/source/redis/RedisSourceDTO.java @@ -0,0 +1,149 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.inlong.manager.pojo.source.redis; + +import com.fasterxml.jackson.databind.DeserializationFeature; +import com.fasterxml.jackson.databind.ObjectMapper; +import io.swagger.annotations.ApiModelProperty; +import lombok.AllArgsConstructor; +import lombok.Builder; +import lombok.Data; +import lombok.NoArgsConstructor; +import org.apache.inlong.manager.common.enums.ErrorCodeEnum; +import org.apache.inlong.manager.common.exceptions.BusinessException; + +import javax.validation.constraints.NotNull; +import java.util.Map; + +/** + * redis source info + */ +@Data +@Builder +@NoArgsConstructor +@AllArgsConstructor +public class RedisSourceDTO { + + private static final ObjectMapper OBJECT_MAPPER = new ObjectMapper(); + + @ApiModelProperty("Username of the redis server") + private String username; + + @ApiModelProperty("Password of the redis server") + private String password; + + @ApiModelProperty("Hostname of the redis server") + private String hostname; + + @ApiModelProperty("Port of the redis server") + private Integer port; + + @ApiModelProperty("Primary key") + private String primaryKey; + + @ApiModelProperty("Redis command, supports: hget, get, zscore, zrevrank") + private String redisCommand; + + @ApiModelProperty("Redis deploy mode, supports: standalone, cluster, sentinel") + private String redisMode; + + @ApiModelProperty("Cluster node infos only used for redis cluster deploy mode") + private String clusterNodes; + + @ApiModelProperty("Master name only used for redis sentinel deploy mode") + private String masterName; + + @ApiModelProperty("Sentinels info only used for redis sentinel deploy mode") + private String sentinelsInfo; + + @ApiModelProperty("Additional key only used for hash/Sorted-set data type") + private String additionalKey; + + @ApiModelProperty("Database number connect to redis for redis standalone/sentinel deploy modes") + private Integer database; + + @ApiModelProperty("Timeout value of connect to redis") + private Integer timeout; + + @ApiModelProperty("Timeout value of read data from redis") + private Integer soTimeout; + + @ApiModelProperty("Max connection number to redis") + private Integer maxTotal; + + @ApiModelProperty("Max free connection number") + private Integer maxIdle; + + @ApiModelProperty("Min free connection number") + private Integer minIdle; + + @ApiModelProperty("Lookup cache max rows") + private Long lookupCacheMaxRows; + + @ApiModelProperty("Lookup cache ttl") + private Long lookupCacheTtl; + + @ApiModelProperty("Lookup max retry times") + private Integer lookupMaxRetries; + + @ApiModelProperty("Lookup Async") + private Boolean lookupAsync; + + @ApiModelProperty("Properties for redis") + private Map<String, Object> properties; + + /** + * Get the dto instance from request + */ + public static RedisSourceDTO getFromRequest(RedisSourceRequest request) { + return RedisSourceDTO.builder() + .username(request.getUsername()) + .password(request.getPassword()) + .hostname(request.getHostname()) + .port(request.getPort()) + .primaryKey(request.getPrimaryKey()) + .redisCommand(request.getRedisCommand()) + .redisMode(request.getRedisMode()) + .clusterNodes(request.getClusterNodes()) + .masterName(request.getMasterName()) + .sentinelsInfo(request.getSentinelsInfo()) + .additionalKey(request.getAdditionalKey()) + .database(request.getDatabase()) + .timeout(request.getTimeout()) + .soTimeout(request.getSoTimeout()) + .maxTotal(request.getMaxTotal()) + .maxIdle(request.getMaxIdle()) + .minIdle(request.getMinIdle()) + .lookupCacheMaxRows(request.getLookupCacheMaxRows()) + .lookupCacheTtl(request.getLookupCacheTtl()) + .lookupMaxRetries(request.getLookupMaxRetries()) + .lookupAsync(request.getLookupAsync()) + .properties(request.getProperties()) + .build(); + } + + public static RedisSourceDTO getFromJson(@NotNull String extParams) { + try { + OBJECT_MAPPER.configure(DeserializationFeature.FAIL_ON_UNKNOWN_PROPERTIES, false); + return OBJECT_MAPPER.readValue(extParams, RedisSourceDTO.class); + } catch (Exception e) { + throw new BusinessException(ErrorCodeEnum.SOURCE_INFO_INCORRECT.getMessage() + ": " + e.getMessage()); + } + } + +} diff --git a/inlong-manager/manager-pojo/src/main/java/org/apache/inlong/manager/pojo/source/redis/RedisSourceRequest.java b/inlong-manager/manager-pojo/src/main/java/org/apache/inlong/manager/pojo/source/redis/RedisSourceRequest.java new file mode 100644 index 000000000..b53c75725 --- /dev/null +++ b/inlong-manager/manager-pojo/src/main/java/org/apache/inlong/manager/pojo/source/redis/RedisSourceRequest.java @@ -0,0 +1,106 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.inlong.manager.pojo.source.redis; + +import io.swagger.annotations.ApiModel; +import io.swagger.annotations.ApiModelProperty; +import lombok.Data; +import lombok.EqualsAndHashCode; +import lombok.ToString; +import org.apache.inlong.manager.common.consts.SourceType; +import org.apache.inlong.manager.common.util.JsonTypeDefine; +import org.apache.inlong.manager.pojo.source.SourceRequest; + +/** + * Redis source request + */ +@Data +@ToString(callSuper = true) +@EqualsAndHashCode(callSuper = true) +@ApiModel(value = "Redis source request") +@JsonTypeDefine(value = SourceType.REDIS) +public class RedisSourceRequest extends SourceRequest { + + @ApiModelProperty("Username of the redis server") + private String username; + + @ApiModelProperty("Password of the redis server") + private String password; + + @ApiModelProperty("Hostname of the redis server") + private String hostname; + + @ApiModelProperty("Port of the redis server") + private Integer port = 6379; + + @ApiModelProperty("Primary key") + private String primaryKey; + + @ApiModelProperty("Redis command, supports: hget, get, zscore, zrevrank") + private String redisCommand; + + @ApiModelProperty("Redis deploy mode, supports: standalone, cluster, sentinel") + private String redisMode; + + @ApiModelProperty("Cluster node infos only used for redis cluster deploy mode") + private String clusterNodes; + + @ApiModelProperty("Master name only used for redis sentinel deploy mode") + private String masterName; + + @ApiModelProperty("Sentinels info only used for redis sentinel deploy mode") + private String sentinelsInfo; + + @ApiModelProperty("Additional key only used for hash/Sorted-set data type") + private String additionalKey; + + @ApiModelProperty("Database number connect to redis for redis standalone/sentinel deploy modes") + private Integer database; + + @ApiModelProperty("Timeout value of connect to redis") + private Integer timeout; + + @ApiModelProperty("Timeout value of read data from redis") + private Integer soTimeout; + + @ApiModelProperty("Max connection number to redis") + private Integer maxTotal; + + @ApiModelProperty("Max free connection number") + private Integer maxIdle; + + @ApiModelProperty("Min free connection number") + private Integer minIdle; + + @ApiModelProperty("Lookup Async") + private Boolean lookupAsync; + + @ApiModelProperty("Lookup cache max rows") + private Long lookupCacheMaxRows; + + @ApiModelProperty("Lookup cache ttl") + private Long lookupCacheTtl; + + @ApiModelProperty("Lookup max retry times") + private Integer lookupMaxRetries; + + public RedisSourceRequest() { + this.setSourceType(SourceType.REDIS); + } + +} diff --git a/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/source/redis/RedisSourceOperator.java b/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/source/redis/RedisSourceOperator.java new file mode 100644 index 000000000..17f338ae3 --- /dev/null +++ b/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/source/redis/RedisSourceOperator.java @@ -0,0 +1,84 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.inlong.manager.service.source.redis; + +import com.fasterxml.jackson.databind.ObjectMapper; +import org.apache.inlong.manager.common.consts.SourceType; +import org.apache.inlong.manager.common.enums.ErrorCodeEnum; +import org.apache.inlong.manager.common.exceptions.BusinessException; +import org.apache.inlong.manager.common.util.CommonBeanUtils; +import org.apache.inlong.manager.dao.entity.StreamSourceEntity; +import org.apache.inlong.manager.pojo.source.SourceRequest; +import org.apache.inlong.manager.pojo.source.StreamSource; +import org.apache.inlong.manager.pojo.source.redis.RedisSource; +import org.apache.inlong.manager.pojo.source.redis.RedisSourceDTO; +import org.apache.inlong.manager.pojo.source.redis.RedisSourceRequest; +import org.apache.inlong.manager.pojo.stream.StreamField; +import org.apache.inlong.manager.service.source.AbstractSourceOperator; +import org.springframework.beans.factory.annotation.Autowired; +import org.springframework.stereotype.Service; + +import java.util.List; + +/** + * Redis stream source operator + */ +@Service +public class RedisSourceOperator extends AbstractSourceOperator { + + @Autowired + private ObjectMapper objectMapper; + + @Override + public Boolean accept(String sourceType) { + return SourceType.REDIS.equals(sourceType); + } + + @Override + protected String getSourceType() { + return SourceType.REDIS; + } + + @Override + protected void setTargetEntity(SourceRequest request, StreamSourceEntity targetEntity) { + RedisSourceRequest sourceRequest = (RedisSourceRequest) request; + CommonBeanUtils.copyProperties(sourceRequest, targetEntity, true); + try { + RedisSourceDTO dto = RedisSourceDTO.getFromRequest(sourceRequest); + targetEntity.setExtParams(objectMapper.writeValueAsString(dto)); + } catch (Exception e) { + throw new BusinessException(ErrorCodeEnum.SOURCE_INFO_INCORRECT.getMessage() + ": " + e.getMessage()); + } + } + + @Override + public StreamSource getFromEntity(StreamSourceEntity entity) { + RedisSource source = new RedisSource(); + if (entity == null) { + return source; + } + + RedisSourceDTO dto = RedisSourceDTO.getFromJson(entity.getExtParams()); + CommonBeanUtils.copyProperties(entity, source, true); + CommonBeanUtils.copyProperties(dto, source, true); + + List<StreamField> sourceFields = super.getSourceFields(entity.getId()); + source.setFieldList(sourceFields); + return source; + } +} diff --git a/inlong-manager/manager-service/src/test/java/org/apache/inlong/manager/service/source/RedisSourceServiceTest.java b/inlong-manager/manager-service/src/test/java/org/apache/inlong/manager/service/source/RedisSourceServiceTest.java new file mode 100644 index 000000000..9bf904906 --- /dev/null +++ b/inlong-manager/manager-service/src/test/java/org/apache/inlong/manager/service/source/RedisSourceServiceTest.java @@ -0,0 +1,99 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.inlong.manager.service.source; + +import org.apache.inlong.manager.common.consts.SourceType; +import org.apache.inlong.manager.common.util.CommonBeanUtils; +import org.apache.inlong.manager.pojo.source.StreamSource; +import org.apache.inlong.manager.pojo.source.redis.RedisSource; +import org.apache.inlong.manager.pojo.source.redis.RedisSourceRequest; +import org.apache.inlong.manager.service.ServiceBaseTest; +import org.apache.inlong.manager.service.core.impl.InlongStreamServiceTest; +import org.junit.jupiter.api.Assertions; +import org.junit.jupiter.api.Test; +import org.springframework.beans.factory.annotation.Autowired; + +/** + * Redis source service test + */ +public class RedisSourceServiceTest extends ServiceBaseTest { + + private static final String hostname = "127.0.0.1"; + private static final Integer port = 6379; + private static final String redisMode = "standalone"; + private static final String redisCommand = "get"; + private final String sourceName = "stream_source_service_test"; + + @Autowired + private StreamSourceService sourceService; + @Autowired + private InlongStreamServiceTest streamServiceTest; + + /** + * Save source info + */ + public Integer saveSource() { + streamServiceTest.saveInlongStream(GLOBAL_GROUP_ID, GLOBAL_STREAM_ID, GLOBAL_OPERATOR); + + RedisSourceRequest sourceInfo = new RedisSourceRequest(); + sourceInfo.setInlongGroupId(GLOBAL_GROUP_ID); + sourceInfo.setInlongStreamId(GLOBAL_STREAM_ID); + sourceInfo.setSourceName(sourceName); + sourceInfo.setSourceType(SourceType.REDIS); + sourceInfo.setHostname(hostname); + sourceInfo.setPort(port); + sourceInfo.setRedisCommand(redisCommand); + sourceInfo.setRedisMode(redisMode); + return sourceService.save(sourceInfo, GLOBAL_OPERATOR); + } + + @Test + public void testSaveAndDelete() { + Integer id = this.saveSource(); + Assertions.assertNotNull(id); + + boolean result = sourceService.delete(id, GLOBAL_OPERATOR); + Assertions.assertTrue(result); + } + + @Test + public void testListByIdentifier() { + Integer id = this.saveSource(); + StreamSource source = sourceService.get(id); + Assertions.assertEquals(GLOBAL_GROUP_ID, source.getInlongGroupId()); + + sourceService.delete(id, GLOBAL_OPERATOR); + } + + @Test + public void testGetAndUpdate() { + Integer id = this.saveSource(); + StreamSource response = sourceService.get(id); + Assertions.assertEquals(GLOBAL_GROUP_ID, response.getInlongGroupId()); + + RedisSource redisSource = (RedisSource) response; + RedisSourceRequest request = CommonBeanUtils.copyProperties(redisSource, RedisSourceRequest::new); + System.out.println(request); + boolean result = sourceService.update(request, GLOBAL_OPERATOR); + Assertions.assertTrue(result); + + sourceService.delete(id, GLOBAL_OPERATOR); + } + +}