[GitHub] [inlong] fuweng11 commented on a diff in pull request #8824: [INLONG-8823][Manager] Supporting data flow to Pulsar
fuweng11 commented on code in PR #8824: URL: https://github.com/apache/inlong/pull/8824#discussion_r1311601466 ## inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/sink/pulsar/PulsarSinkOperator.java: ## @@ -0,0 +1,118 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.inlong.manager.service.sink.pulsar; + +import org.apache.inlong.manager.common.consts.SinkType; +import org.apache.inlong.manager.common.consts.SortStandAloneConfig.PulsarParamsConfig; +import org.apache.inlong.manager.common.enums.ErrorCodeEnum; +import org.apache.inlong.manager.common.exceptions.BusinessException; +import org.apache.inlong.manager.common.util.CommonBeanUtils; +import org.apache.inlong.manager.dao.entity.InlongStreamEntity; +import org.apache.inlong.manager.dao.entity.StreamSinkEntity; +import org.apache.inlong.manager.dao.mapper.InlongStreamEntityMapper; +import org.apache.inlong.manager.pojo.sink.SinkField; +import org.apache.inlong.manager.pojo.sink.SinkRequest; +import org.apache.inlong.manager.pojo.sink.StreamSink; +import org.apache.inlong.manager.pojo.sink.pulsar.PulsarSink; +import org.apache.inlong.manager.pojo.sink.pulsar.PulsarSinkDTO; +import org.apache.inlong.manager.pojo.sink.pulsar.PulsarSinkRequest; +import org.apache.inlong.manager.service.sink.AbstractSinkOperator; + +import com.fasterxml.jackson.core.JsonProcessingException; +import com.fasterxml.jackson.databind.ObjectMapper; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; +import org.springframework.beans.factory.annotation.Autowired; +import org.springframework.stereotype.Service; + +import java.util.List; +import java.util.Map; + +/** + * Pulsar sink operator + */ +@Service +public class PulsarSinkOperator extends AbstractSinkOperator { + +private static final Logger LOGGER = LoggerFactory.getLogger(PulsarSinkOperator.class); + +@Autowired +private ObjectMapper objectMapper; +@Autowired +InlongStreamEntityMapper inlongStreamEntityMapper; + +@Override +public Boolean accept(String sinkType) { +return SinkType.KAFKA.equals(sinkType); Review Comment: ` SinkType.PULSAR.equals(sinkType);` ## inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/sink/pulsar/PulsarSinkOperator.java: ## @@ -0,0 +1,118 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.inlong.manager.service.sink.pulsar; + +import org.apache.inlong.manager.common.consts.SinkType; +import org.apache.inlong.manager.common.consts.SortStandAloneConfig.PulsarParamsConfig; +import org.apache.inlong.manager.common.enums.ErrorCodeEnum; +import org.apache.inlong.manager.common.exceptions.BusinessException; +import org.apache.inlong.manager.common.util.CommonBeanUtils; +import org.apache.inlong.manager.dao.entity.InlongStreamEntity; +import org.apache.inlong.manager.dao.entity.StreamSinkEntity; +import org.apache.inlong.manager.dao.mapper.InlongStreamEntityMapper; +import org.apache.inlong.manager.pojo.sink.SinkField; +import org.apache.inlong.manager.pojo.sink.SinkRequest; +import org.apache.inlong.manager.pojo.sink.StreamSink; +import org.apache.inlong.manager.pojo.sink.pulsar.PulsarSink; +import org.apache.inlong.manager.pojo.sink.pulsar.PulsarSinkDTO; +import org.apache.inlong.manager.pojo.sink.pulsar.PulsarSinkRequest; +import org.apache.inlong.manager.service.sink.AbstractSinkOperator; + +import com.fasterxml.ja
[GitHub] [inlong] vernedeng commented on a diff in pull request #8824: [INLONG-8823][Manager] Supporting data flow to Pulsar
vernedeng commented on code in PR #8824: URL: https://github.com/apache/inlong/pull/8824#discussion_r1312409310 ## inlong-manager/manager-pojo/src/main/java/org/apache/inlong/manager/pojo/sink/pulsar/PulsarSink.java: ## @@ -0,0 +1,63 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.inlong.manager.pojo.sink.pulsar; + +import org.apache.inlong.manager.common.consts.SinkType; +import org.apache.inlong.manager.common.util.CommonBeanUtils; +import org.apache.inlong.manager.common.util.JsonTypeDefine; +import org.apache.inlong.manager.pojo.sink.SinkRequest; +import org.apache.inlong.manager.pojo.sink.StreamSink; + +import io.swagger.annotations.ApiModel; +import io.swagger.annotations.ApiModelProperty; +import lombok.AllArgsConstructor; +import lombok.Data; +import lombok.EqualsAndHashCode; +import lombok.ToString; +import lombok.experimental.SuperBuilder; + +/** + * Kafka sink info + */ +@Data +@SuperBuilder +@AllArgsConstructor +@ToString(callSuper = true) +@EqualsAndHashCode(callSuper = true) +@ApiModel(value = "pulsar sink info") +@JsonTypeDefine(value = SinkType.PULSAR) +public class PulsarSink extends StreamSink { + +@ApiModelProperty("pulsar service url") +private String serviceUrl; Review Comment: ServiceUrl is the params of Pulsar data_node ## inlong-manager/manager-common/src/main/java/org/apache/inlong/manager/common/consts/SortStandAloneConfig.java: ## @@ -0,0 +1,31 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.inlong.manager.common.consts; + +/** + * Constants of sort standalone config. + */ +public interface SortStandAloneConfig { + +interface PulsarParamsConfig { Review Comment: There are two types of params, id params and sink params. Please specify which params of this config is. ## inlong-manager/manager-pojo/src/main/java/org/apache/inlong/manager/pojo/sink/pulsar/PulsarSinkDTO.java: ## @@ -0,0 +1,69 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.inlong.manager.pojo.sink.pulsar; + +import org.apache.inlong.manager.common.enums.ErrorCodeEnum; +import org.apache.inlong.manager.common.exceptions.BusinessException; +import org.apache.inlong.manager.common.util.CommonBeanUtils; +import org.apache.inlong.manager.common.util.JsonUtils; + +import io.swagger.annotations.ApiModelProperty; +import lombok.AllArgsConstructor; +import lombok.Builder; +import lombok.Data; +import lombok.NoArgsConstructor; +import org.apache.commons.lang3.StringUtils; + +import javax.validation.constraints.NotNull; + +/** + * Pulsar sink info + */ +@Data +@Builder +@NoArgsConstructor +@AllArgsConstructor +public class PulsarS
[GitHub] [inlong] vernedeng commented on pull request #8818: [INLONG-8643][Sort] Support Iceberg source
vernedeng commented on PR #8818: URL: https://github.com/apache/inlong/pull/8818#issuecomment-1702105634 > @vernedeng please add license for copyed files. Flxed -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: commits-unsubscr...@inlong.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
[GitHub] [inlong] EMsnap merged pull request #8818: [INLONG-8643][Sort] Support Iceberg source
EMsnap merged PR #8818: URL: https://github.com/apache/inlong/pull/8818 -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: commits-unsubscr...@inlong.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
[inlong] branch master updated: [INLONG-8643][Sort] Support Iceberg source (#8818)
This is an automated email from the ASF dual-hosted git repository. zirui pushed a commit to branch master in repository https://gitbox.apache.org/repos/asf/inlong.git The following commit(s) were added to refs/heads/master by this push: new 5d7fc81e67 [INLONG-8643][Sort] Support Iceberg source (#8818) 5d7fc81e67 is described below commit 5d7fc81e6761bba8289b8f862af92eacc487771a Author: vernedeng AuthorDate: Fri Sep 1 14:31:40 2023 +0800 [INLONG-8643][Sort] Support Iceberg source (#8818) --- .../resource/sort/DefaultSortConfigOperator.java | 1 - .../sort/protocol/constant/IcebergConstant.java| 13 + .../inlong/sort/protocol/node/ExtractNode.java | 2 + .../org/apache/inlong/sort/protocol/node/Node.java | 2 + .../IcebergExtracNode.java}| 107 +-- .../sort/protocol/node/load/IcebergLoadNode.java | 12 +- .../parser/Iceberg2StarRocksSqlParserTest.java | 133 inlong-sort/sort-flink/sort-flink-v1.13/pom.xml| 3 +- inlong-sort/sort-flink/sort-flink-v1.15/pom.xml| 5 - .../sort-connectors/iceberg/pom.xml| 139 .../apache/inlong/sort/iceberg/FlinkCatalog.java | 812 + .../inlong/sort/iceberg/FlinkCatalogFactory.java | 216 ++ .../sort/iceberg/FlinkDynamicTableFactory.java | 205 ++ .../sort/iceberg/FlinkEnvironmentContext.java | 35 + .../org.apache.flink.table.factories.Factory | 18 + .../org.apache.flink.table.factories.TableFactory | 16 + .../sort-flink-v1.15/sort-connectors/pom.xml | 1 + licenses/inlong-sort-connectors/LICENSE| 8 + pom.xml| 4 +- 19 files changed, 1666 insertions(+), 66 deletions(-) diff --git a/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/resource/sort/DefaultSortConfigOperator.java b/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/resource/sort/DefaultSortConfigOperator.java index 97683f1ac3..37429cf2a5 100644 --- a/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/resource/sort/DefaultSortConfigOperator.java +++ b/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/resource/sort/DefaultSortConfigOperator.java @@ -253,5 +253,4 @@ public class DefaultSortConfigOperator implements SortConfigOperator { groupInfo.getExtList().removeIf(ext -> extInfo.getKeyName().equals(ext.getKeyName())); groupInfo.getExtList().add(extInfo); } - } diff --git a/inlong-sort/sort-common/src/main/java/org/apache/inlong/sort/protocol/constant/IcebergConstant.java b/inlong-sort/sort-common/src/main/java/org/apache/inlong/sort/protocol/constant/IcebergConstant.java index 70ea0d5158..676f7f4435 100644 --- a/inlong-sort/sort-common/src/main/java/org/apache/inlong/sort/protocol/constant/IcebergConstant.java +++ b/inlong-sort/sort-common/src/main/java/org/apache/inlong/sort/protocol/constant/IcebergConstant.java @@ -22,6 +22,19 @@ package org.apache.inlong.sort.protocol.constant; */ public class IcebergConstant { +public static final String DEFAULT_CATALOG_NAME = "ICEBERG_HIVE"; +public static final String CONNECTOR_KEY = "connector"; +public static final String CONNECTOR = "iceberg-inlong"; +public static final String DATABASE_KEY = "catalog-database"; +public static final String DEFAULT_DATABASE_KEY = "default-database"; +public static final String TABLE_KEY = "catalog-table"; +public static final String CATALOG_TYPE_KEY = "catalog-type"; +public static final String CATALOG_NAME_KEY = "catalog-name"; +public static final String URI_KEY = "uri"; +public static final String WAREHOUSE_KEY = "warehouse"; +public static final String START_SNAPSHOT_ID = "start-snapshot-id"; +public static final String STREAMING = "streaming"; + /** * Iceberg supported catalog type */ diff --git a/inlong-sort/sort-common/src/main/java/org/apache/inlong/sort/protocol/node/ExtractNode.java b/inlong-sort/sort-common/src/main/java/org/apache/inlong/sort/protocol/node/ExtractNode.java index 3c27b6a407..543f8cf3be 100644 --- a/inlong-sort/sort-common/src/main/java/org/apache/inlong/sort/protocol/node/ExtractNode.java +++ b/inlong-sort/sort-common/src/main/java/org/apache/inlong/sort/protocol/node/ExtractNode.java @@ -21,6 +21,7 @@ import org.apache.inlong.sort.protocol.FieldInfo; import org.apache.inlong.sort.protocol.node.extract.DorisExtractNode; import org.apache.inlong.sort.protocol.node.extract.FileSystemExtractNode; import org.apache.inlong.sort.protocol.node.extract.HudiExtractNode; +import org.apache.inlong.sort.protocol.node.extract.IcebergExtracNode; import org.apache.inlong.sort.protocol.node.extract.KafkaExtractNode; import org.apache.inlong.sort.protocol.node.extract.MongoExtractNode; import org.apache.inlong.sort.protocol.node.extract.MySqlExtractNode; @@ -64,6 +65,7 @@ import ja
[GitHub] [inlong] castorqin commented on a diff in pull request #8824: [INLONG-8823][Manager] Supporting data flow to Pulsar
castorqin commented on code in PR #8824: URL: https://github.com/apache/inlong/pull/8824#discussion_r1312631991 ## inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/sink/pulsar/PulsarSinkOperator.java: ## @@ -0,0 +1,118 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.inlong.manager.service.sink.pulsar; + +import org.apache.inlong.manager.common.consts.SinkType; +import org.apache.inlong.manager.common.consts.SortStandAloneConfig.PulsarParamsConfig; +import org.apache.inlong.manager.common.enums.ErrorCodeEnum; +import org.apache.inlong.manager.common.exceptions.BusinessException; +import org.apache.inlong.manager.common.util.CommonBeanUtils; +import org.apache.inlong.manager.dao.entity.InlongStreamEntity; +import org.apache.inlong.manager.dao.entity.StreamSinkEntity; +import org.apache.inlong.manager.dao.mapper.InlongStreamEntityMapper; +import org.apache.inlong.manager.pojo.sink.SinkField; +import org.apache.inlong.manager.pojo.sink.SinkRequest; +import org.apache.inlong.manager.pojo.sink.StreamSink; +import org.apache.inlong.manager.pojo.sink.pulsar.PulsarSink; +import org.apache.inlong.manager.pojo.sink.pulsar.PulsarSinkDTO; +import org.apache.inlong.manager.pojo.sink.pulsar.PulsarSinkRequest; +import org.apache.inlong.manager.service.sink.AbstractSinkOperator; + +import com.fasterxml.jackson.core.JsonProcessingException; +import com.fasterxml.jackson.databind.ObjectMapper; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; +import org.springframework.beans.factory.annotation.Autowired; +import org.springframework.stereotype.Service; + +import java.util.List; +import java.util.Map; + +/** + * Pulsar sink operator + */ +@Service +public class PulsarSinkOperator extends AbstractSinkOperator { + +private static final Logger LOGGER = LoggerFactory.getLogger(PulsarSinkOperator.class); + +@Autowired +private ObjectMapper objectMapper; +@Autowired +InlongStreamEntityMapper inlongStreamEntityMapper; + +@Override +public Boolean accept(String sinkType) { +return SinkType.KAFKA.equals(sinkType); Review Comment: done -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: commits-unsubscr...@inlong.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org