This is an automated email from the ASF dual-hosted git repository. healchow pushed a commit to branch master in repository https://gitbox.apache.org/repos/asf/inlong.git
The following commit(s) were added to refs/heads/master by this push: new e0eb999ac [INLONG-6505][Manager] Support Kafka MQ in Inlong Consume (#6506) e0eb999ac is described below commit e0eb999acbe9257510291bd6bb478a64b8d8f171 Author: haifxu <xhf1208357...@gmail.com> AuthorDate: Fri Nov 11 12:08:24 2022 +0800 [INLONG-6505][Manager] Support Kafka MQ in Inlong Consume (#6506) --- .../pojo/consume/kafka/ConsumeKafkaDTO.java | 59 +++++++++++++++ .../pojo/consume/kafka/ConsumeKafkaInfo.java | 43 +++++++++++ .../pojo/consume/kafka/ConsumeKafkaRequest.java | 38 ++++++++++ .../service/consume/ConsumeKafkaOperator.java | 88 ++++++++++++++++++++++ 4 files changed, 228 insertions(+) diff --git a/inlong-manager/manager-pojo/src/main/java/org/apache/inlong/manager/pojo/consume/kafka/ConsumeKafkaDTO.java b/inlong-manager/manager-pojo/src/main/java/org/apache/inlong/manager/pojo/consume/kafka/ConsumeKafkaDTO.java new file mode 100644 index 000000000..4058f3093 --- /dev/null +++ b/inlong-manager/manager-pojo/src/main/java/org/apache/inlong/manager/pojo/consume/kafka/ConsumeKafkaDTO.java @@ -0,0 +1,59 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.inlong.manager.pojo.consume.kafka; + +import io.swagger.annotations.ApiModel; +import lombok.AllArgsConstructor; +import lombok.Builder; +import lombok.Data; +import org.apache.inlong.manager.common.enums.ErrorCodeEnum; +import org.apache.inlong.manager.common.exceptions.BusinessException; +import org.apache.inlong.manager.common.util.CommonBeanUtils; +import org.apache.inlong.manager.common.util.JsonUtils; +import org.apache.inlong.manager.pojo.consume.BaseInlongConsume; + +import javax.validation.constraints.NotNull; + +/** + * Inlong group info of Kafka + */ +@Data +@Builder +@AllArgsConstructor +@ApiModel("Inlong group info of Kafka") +public class ConsumeKafkaDTO extends BaseInlongConsume { + + /** + * Get the dto instance from the request + */ + public static ConsumeKafkaDTO getFromRequest(ConsumeKafkaRequest request) { + return CommonBeanUtils.copyProperties(request, ConsumeKafkaDTO::new, true); + } + + /** + * Get the dto instance from the JSON string. + */ + public static ConsumeKafkaDTO getFromJson(@NotNull String extParams) { + try { + return JsonUtils.parseObject(extParams, ConsumeKafkaDTO.class); + } catch (Exception e) { + throw new BusinessException(ErrorCodeEnum.CONSUMER_INFO_INCORRECT.getMessage() + ": " + e.getMessage()); + } + } + +} diff --git a/inlong-manager/manager-pojo/src/main/java/org/apache/inlong/manager/pojo/consume/kafka/ConsumeKafkaInfo.java b/inlong-manager/manager-pojo/src/main/java/org/apache/inlong/manager/pojo/consume/kafka/ConsumeKafkaInfo.java new file mode 100644 index 000000000..1bdceef55 --- /dev/null +++ b/inlong-manager/manager-pojo/src/main/java/org/apache/inlong/manager/pojo/consume/kafka/ConsumeKafkaInfo.java @@ -0,0 +1,43 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.inlong.manager.pojo.consume.kafka; + +import io.swagger.annotations.ApiModel; +import lombok.Data; +import lombok.EqualsAndHashCode; +import lombok.ToString; +import org.apache.inlong.manager.common.consts.MQType; +import org.apache.inlong.manager.common.util.CommonBeanUtils; +import org.apache.inlong.manager.common.util.JsonTypeDefine; +import org.apache.inlong.manager.pojo.consume.InlongConsumeInfo; + +/** + * Inlong consume info of Kafka + */ +@Data +@ToString(callSuper = true) +@EqualsAndHashCode(callSuper = true) +@JsonTypeDefine(value = MQType.KAFKA) +@ApiModel("Inlong consume info of Kafka") +public class ConsumeKafkaInfo extends InlongConsumeInfo { + + @Override + public ConsumeKafkaRequest genRequest() { + return CommonBeanUtils.copyProperties(this, ConsumeKafkaRequest::new); + } +} diff --git a/inlong-manager/manager-pojo/src/main/java/org/apache/inlong/manager/pojo/consume/kafka/ConsumeKafkaRequest.java b/inlong-manager/manager-pojo/src/main/java/org/apache/inlong/manager/pojo/consume/kafka/ConsumeKafkaRequest.java new file mode 100644 index 000000000..ea0956824 --- /dev/null +++ b/inlong-manager/manager-pojo/src/main/java/org/apache/inlong/manager/pojo/consume/kafka/ConsumeKafkaRequest.java @@ -0,0 +1,38 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.inlong.manager.pojo.consume.kafka; + +import io.swagger.annotations.ApiModel; +import lombok.Data; +import lombok.EqualsAndHashCode; +import lombok.ToString; +import org.apache.inlong.manager.common.consts.MQType; +import org.apache.inlong.manager.common.util.JsonTypeDefine; +import org.apache.inlong.manager.pojo.consume.InlongConsumeRequest; + +/** + * Inlong consume request of Kafka + */ +@Data +@ToString(callSuper = true) +@EqualsAndHashCode(callSuper = true) +@ApiModel("Inlong consume request of Kafka") +@JsonTypeDefine(value = MQType.KAFKA) +public class ConsumeKafkaRequest extends InlongConsumeRequest { + +} diff --git a/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/consume/ConsumeKafkaOperator.java b/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/consume/ConsumeKafkaOperator.java new file mode 100644 index 000000000..3df4799a4 --- /dev/null +++ b/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/consume/ConsumeKafkaOperator.java @@ -0,0 +1,88 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.inlong.manager.service.consume; + +import org.apache.commons.lang3.StringUtils; +import org.apache.inlong.manager.common.consts.MQType; +import org.apache.inlong.manager.common.enums.ErrorCodeEnum; +import org.apache.inlong.manager.common.util.CommonBeanUtils; +import org.apache.inlong.manager.common.util.Preconditions; +import org.apache.inlong.manager.dao.entity.InlongConsumeEntity; +import org.apache.inlong.manager.pojo.consume.InlongConsumeInfo; +import org.apache.inlong.manager.pojo.consume.InlongConsumeRequest; +import org.apache.inlong.manager.pojo.consume.kafka.ConsumeKafkaDTO; +import org.apache.inlong.manager.pojo.consume.kafka.ConsumeKafkaInfo; +import org.apache.inlong.manager.pojo.group.InlongGroupTopicInfo; +import org.apache.inlong.manager.pojo.group.kafka.InlongKafkaTopicInfo; +import org.apache.inlong.manager.service.group.InlongGroupService; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; +import org.springframework.beans.factory.annotation.Autowired; +import org.springframework.stereotype.Service; + +/** + * Inlong consume operator for Kafka. + */ +@Service +public class ConsumeKafkaOperator extends AbstractConsumeOperator { + + private static final Logger LOGGER = LoggerFactory.getLogger(ConsumeKafkaOperator.class); + + @Autowired + private InlongGroupService groupService; + + @Override + public Boolean accept(String mqType) { + return getMQType().equals(mqType); + } + + @Override + public String getMQType() { + return MQType.KAFKA; + } + + @Override + public void checkTopicInfo(InlongConsumeRequest request) { + String groupId = request.getInlongGroupId(); + InlongGroupTopicInfo topicInfo = groupService.getTopic(groupId); + Preconditions.checkNotNull(topicInfo, "inlong group not exist: " + groupId); + + InlongKafkaTopicInfo kafkaInfo = (InlongKafkaTopicInfo) topicInfo; + String originTopic = request.getTopic(); + Preconditions.checkTrue(kafkaInfo.getTopics().contains(originTopic), + "Kafka topic not exist for " + originTopic); + } + + @Override + public InlongConsumeInfo getFromEntity(InlongConsumeEntity entity) { + Preconditions.checkNotNull(entity, ErrorCodeEnum.CONSUME_NOT_FOUND.getMessage()); + + ConsumeKafkaInfo kafkaInfo = new ConsumeKafkaInfo(); + CommonBeanUtils.copyProperties(entity, kafkaInfo); + if (StringUtils.isNotBlank(entity.getExtParams())) { + ConsumeKafkaDTO dto = ConsumeKafkaDTO.getFromJson(entity.getExtParams()); + CommonBeanUtils.copyProperties(dto, kafkaInfo); + } + return kafkaInfo; + } + + @Override + protected void setTargetEntity(InlongConsumeRequest request, InlongConsumeEntity targetEntity) { + LOGGER.info("do nothing for inlong consume with Kafka"); + } +}