[GitHub] [inlong] fuweng11 commented on a diff in pull request #8824: [INLONG-8823][Manager] Supporting data flow to Pulsar

2023-08-31 Thread via GitHub


fuweng11 commented on code in PR #8824:
URL: https://github.com/apache/inlong/pull/8824#discussion_r1311601466


##
inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/sink/pulsar/PulsarSinkOperator.java:
##
@@ -0,0 +1,118 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.inlong.manager.service.sink.pulsar;
+
+import org.apache.inlong.manager.common.consts.SinkType;
+import 
org.apache.inlong.manager.common.consts.SortStandAloneConfig.PulsarParamsConfig;
+import org.apache.inlong.manager.common.enums.ErrorCodeEnum;
+import org.apache.inlong.manager.common.exceptions.BusinessException;
+import org.apache.inlong.manager.common.util.CommonBeanUtils;
+import org.apache.inlong.manager.dao.entity.InlongStreamEntity;
+import org.apache.inlong.manager.dao.entity.StreamSinkEntity;
+import org.apache.inlong.manager.dao.mapper.InlongStreamEntityMapper;
+import org.apache.inlong.manager.pojo.sink.SinkField;
+import org.apache.inlong.manager.pojo.sink.SinkRequest;
+import org.apache.inlong.manager.pojo.sink.StreamSink;
+import org.apache.inlong.manager.pojo.sink.pulsar.PulsarSink;
+import org.apache.inlong.manager.pojo.sink.pulsar.PulsarSinkDTO;
+import org.apache.inlong.manager.pojo.sink.pulsar.PulsarSinkRequest;
+import org.apache.inlong.manager.service.sink.AbstractSinkOperator;
+
+import com.fasterxml.jackson.core.JsonProcessingException;
+import com.fasterxml.jackson.databind.ObjectMapper;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+import org.springframework.beans.factory.annotation.Autowired;
+import org.springframework.stereotype.Service;
+
+import java.util.List;
+import java.util.Map;
+
+/**
+ * Pulsar sink operator
+ */
+@Service
+public class PulsarSinkOperator extends AbstractSinkOperator {
+
+private static final Logger LOGGER = 
LoggerFactory.getLogger(PulsarSinkOperator.class);
+
+@Autowired
+private ObjectMapper objectMapper;
+@Autowired
+InlongStreamEntityMapper inlongStreamEntityMapper;
+
+@Override
+public Boolean accept(String sinkType) {
+return SinkType.KAFKA.equals(sinkType);

Review Comment:
   ` SinkType.PULSAR.equals(sinkType);`



##
inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/sink/pulsar/PulsarSinkOperator.java:
##
@@ -0,0 +1,118 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.inlong.manager.service.sink.pulsar;
+
+import org.apache.inlong.manager.common.consts.SinkType;
+import 
org.apache.inlong.manager.common.consts.SortStandAloneConfig.PulsarParamsConfig;
+import org.apache.inlong.manager.common.enums.ErrorCodeEnum;
+import org.apache.inlong.manager.common.exceptions.BusinessException;
+import org.apache.inlong.manager.common.util.CommonBeanUtils;
+import org.apache.inlong.manager.dao.entity.InlongStreamEntity;
+import org.apache.inlong.manager.dao.entity.StreamSinkEntity;
+import org.apache.inlong.manager.dao.mapper.InlongStreamEntityMapper;
+import org.apache.inlong.manager.pojo.sink.SinkField;
+import org.apache.inlong.manager.pojo.sink.SinkRequest;
+import org.apache.inlong.manager.pojo.sink.StreamSink;
+import org.apache.inlong.manager.pojo.sink.pulsar.PulsarSink;
+import org.apache.inlong.manager.pojo.sink.pulsar.PulsarSinkDTO;
+import org.apache.inlong.manager.pojo.sink.pulsar.PulsarSinkRequest;
+import org.apache.inlong.manager.service.sink.AbstractSinkOperator;
+
+import com.fasterxml.ja

[GitHub] [inlong] vernedeng commented on a diff in pull request #8824: [INLONG-8823][Manager] Supporting data flow to Pulsar

2023-08-31 Thread via GitHub


vernedeng commented on code in PR #8824:
URL: https://github.com/apache/inlong/pull/8824#discussion_r1312409310


##
inlong-manager/manager-pojo/src/main/java/org/apache/inlong/manager/pojo/sink/pulsar/PulsarSink.java:
##
@@ -0,0 +1,63 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.inlong.manager.pojo.sink.pulsar;
+
+import org.apache.inlong.manager.common.consts.SinkType;
+import org.apache.inlong.manager.common.util.CommonBeanUtils;
+import org.apache.inlong.manager.common.util.JsonTypeDefine;
+import org.apache.inlong.manager.pojo.sink.SinkRequest;
+import org.apache.inlong.manager.pojo.sink.StreamSink;
+
+import io.swagger.annotations.ApiModel;
+import io.swagger.annotations.ApiModelProperty;
+import lombok.AllArgsConstructor;
+import lombok.Data;
+import lombok.EqualsAndHashCode;
+import lombok.ToString;
+import lombok.experimental.SuperBuilder;
+
+/**
+ * Kafka sink info
+ */
+@Data
+@SuperBuilder
+@AllArgsConstructor
+@ToString(callSuper = true)
+@EqualsAndHashCode(callSuper = true)
+@ApiModel(value = "pulsar sink info")
+@JsonTypeDefine(value = SinkType.PULSAR)
+public class PulsarSink extends StreamSink {
+
+@ApiModelProperty("pulsar service url")
+private String serviceUrl;

Review Comment:
   ServiceUrl is the params of Pulsar data_node



##
inlong-manager/manager-common/src/main/java/org/apache/inlong/manager/common/consts/SortStandAloneConfig.java:
##
@@ -0,0 +1,31 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.inlong.manager.common.consts;
+
+/**
+ * Constants of sort standalone config.
+ */
+public interface SortStandAloneConfig {
+
+interface PulsarParamsConfig {

Review Comment:
   There are two types of params, id params and sink params. Please specify 
which params of this config is.



##
inlong-manager/manager-pojo/src/main/java/org/apache/inlong/manager/pojo/sink/pulsar/PulsarSinkDTO.java:
##
@@ -0,0 +1,69 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.inlong.manager.pojo.sink.pulsar;
+
+import org.apache.inlong.manager.common.enums.ErrorCodeEnum;
+import org.apache.inlong.manager.common.exceptions.BusinessException;
+import org.apache.inlong.manager.common.util.CommonBeanUtils;
+import org.apache.inlong.manager.common.util.JsonUtils;
+
+import io.swagger.annotations.ApiModelProperty;
+import lombok.AllArgsConstructor;
+import lombok.Builder;
+import lombok.Data;
+import lombok.NoArgsConstructor;
+import org.apache.commons.lang3.StringUtils;
+
+import javax.validation.constraints.NotNull;
+
+/**
+ * Pulsar sink info
+ */
+@Data
+@Builder
+@NoArgsConstructor
+@AllArgsConstructor
+public class PulsarS

[GitHub] [inlong] vernedeng commented on pull request #8818: [INLONG-8643][Sort] Support Iceberg source

2023-08-31 Thread via GitHub


vernedeng commented on PR #8818:
URL: https://github.com/apache/inlong/pull/8818#issuecomment-1702105634

   > @vernedeng please add license for copyed files.
   
   Flxed


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: commits-unsubscr...@inlong.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



[GitHub] [inlong] EMsnap merged pull request #8818: [INLONG-8643][Sort] Support Iceberg source

2023-08-31 Thread via GitHub


EMsnap merged PR #8818:
URL: https://github.com/apache/inlong/pull/8818


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: commits-unsubscr...@inlong.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



[inlong] branch master updated: [INLONG-8643][Sort] Support Iceberg source (#8818)

2023-08-31 Thread zirui
This is an automated email from the ASF dual-hosted git repository.

zirui pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/inlong.git


The following commit(s) were added to refs/heads/master by this push:
 new 5d7fc81e67 [INLONG-8643][Sort] Support Iceberg source (#8818)
5d7fc81e67 is described below

commit 5d7fc81e6761bba8289b8f862af92eacc487771a
Author: vernedeng 
AuthorDate: Fri Sep 1 14:31:40 2023 +0800

[INLONG-8643][Sort] Support Iceberg source (#8818)
---
 .../resource/sort/DefaultSortConfigOperator.java   |   1 -
 .../sort/protocol/constant/IcebergConstant.java|  13 +
 .../inlong/sort/protocol/node/ExtractNode.java |   2 +
 .../org/apache/inlong/sort/protocol/node/Node.java |   2 +
 .../IcebergExtracNode.java}| 107 +--
 .../sort/protocol/node/load/IcebergLoadNode.java   |  12 +-
 .../parser/Iceberg2StarRocksSqlParserTest.java | 133 
 inlong-sort/sort-flink/sort-flink-v1.13/pom.xml|   3 +-
 inlong-sort/sort-flink/sort-flink-v1.15/pom.xml|   5 -
 .../sort-connectors/iceberg/pom.xml| 139 
 .../apache/inlong/sort/iceberg/FlinkCatalog.java   | 812 +
 .../inlong/sort/iceberg/FlinkCatalogFactory.java   | 216 ++
 .../sort/iceberg/FlinkDynamicTableFactory.java | 205 ++
 .../sort/iceberg/FlinkEnvironmentContext.java  |  35 +
 .../org.apache.flink.table.factories.Factory   |  18 +
 .../org.apache.flink.table.factories.TableFactory  |  16 +
 .../sort-flink-v1.15/sort-connectors/pom.xml   |   1 +
 licenses/inlong-sort-connectors/LICENSE|   8 +
 pom.xml|   4 +-
 19 files changed, 1666 insertions(+), 66 deletions(-)

diff --git 
a/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/resource/sort/DefaultSortConfigOperator.java
 
b/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/resource/sort/DefaultSortConfigOperator.java
index 97683f1ac3..37429cf2a5 100644
--- 
a/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/resource/sort/DefaultSortConfigOperator.java
+++ 
b/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/resource/sort/DefaultSortConfigOperator.java
@@ -253,5 +253,4 @@ public class DefaultSortConfigOperator implements 
SortConfigOperator {
 groupInfo.getExtList().removeIf(ext -> 
extInfo.getKeyName().equals(ext.getKeyName()));
 groupInfo.getExtList().add(extInfo);
 }
-
 }
diff --git 
a/inlong-sort/sort-common/src/main/java/org/apache/inlong/sort/protocol/constant/IcebergConstant.java
 
b/inlong-sort/sort-common/src/main/java/org/apache/inlong/sort/protocol/constant/IcebergConstant.java
index 70ea0d5158..676f7f4435 100644
--- 
a/inlong-sort/sort-common/src/main/java/org/apache/inlong/sort/protocol/constant/IcebergConstant.java
+++ 
b/inlong-sort/sort-common/src/main/java/org/apache/inlong/sort/protocol/constant/IcebergConstant.java
@@ -22,6 +22,19 @@ package org.apache.inlong.sort.protocol.constant;
  */
 public class IcebergConstant {
 
+public static final String DEFAULT_CATALOG_NAME = "ICEBERG_HIVE";
+public static final String CONNECTOR_KEY = "connector";
+public static final String CONNECTOR = "iceberg-inlong";
+public static final String DATABASE_KEY = "catalog-database";
+public static final String DEFAULT_DATABASE_KEY = "default-database";
+public static final String TABLE_KEY = "catalog-table";
+public static final String CATALOG_TYPE_KEY = "catalog-type";
+public static final String CATALOG_NAME_KEY = "catalog-name";
+public static final String URI_KEY = "uri";
+public static final String WAREHOUSE_KEY = "warehouse";
+public static final String START_SNAPSHOT_ID = "start-snapshot-id";
+public static final String STREAMING = "streaming";
+
 /**
  * Iceberg supported catalog type
  */
diff --git 
a/inlong-sort/sort-common/src/main/java/org/apache/inlong/sort/protocol/node/ExtractNode.java
 
b/inlong-sort/sort-common/src/main/java/org/apache/inlong/sort/protocol/node/ExtractNode.java
index 3c27b6a407..543f8cf3be 100644
--- 
a/inlong-sort/sort-common/src/main/java/org/apache/inlong/sort/protocol/node/ExtractNode.java
+++ 
b/inlong-sort/sort-common/src/main/java/org/apache/inlong/sort/protocol/node/ExtractNode.java
@@ -21,6 +21,7 @@ import org.apache.inlong.sort.protocol.FieldInfo;
 import org.apache.inlong.sort.protocol.node.extract.DorisExtractNode;
 import org.apache.inlong.sort.protocol.node.extract.FileSystemExtractNode;
 import org.apache.inlong.sort.protocol.node.extract.HudiExtractNode;
+import org.apache.inlong.sort.protocol.node.extract.IcebergExtracNode;
 import org.apache.inlong.sort.protocol.node.extract.KafkaExtractNode;
 import org.apache.inlong.sort.protocol.node.extract.MongoExtractNode;
 import org.apache.inlong.sort.protocol.node.extract.MySqlExtractNode;
@@ -64,6 +65,7 @@ import ja

[GitHub] [inlong] castorqin commented on a diff in pull request #8824: [INLONG-8823][Manager] Supporting data flow to Pulsar

2023-08-31 Thread via GitHub


castorqin commented on code in PR #8824:
URL: https://github.com/apache/inlong/pull/8824#discussion_r1312631991


##
inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/sink/pulsar/PulsarSinkOperator.java:
##
@@ -0,0 +1,118 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.inlong.manager.service.sink.pulsar;
+
+import org.apache.inlong.manager.common.consts.SinkType;
+import 
org.apache.inlong.manager.common.consts.SortStandAloneConfig.PulsarParamsConfig;
+import org.apache.inlong.manager.common.enums.ErrorCodeEnum;
+import org.apache.inlong.manager.common.exceptions.BusinessException;
+import org.apache.inlong.manager.common.util.CommonBeanUtils;
+import org.apache.inlong.manager.dao.entity.InlongStreamEntity;
+import org.apache.inlong.manager.dao.entity.StreamSinkEntity;
+import org.apache.inlong.manager.dao.mapper.InlongStreamEntityMapper;
+import org.apache.inlong.manager.pojo.sink.SinkField;
+import org.apache.inlong.manager.pojo.sink.SinkRequest;
+import org.apache.inlong.manager.pojo.sink.StreamSink;
+import org.apache.inlong.manager.pojo.sink.pulsar.PulsarSink;
+import org.apache.inlong.manager.pojo.sink.pulsar.PulsarSinkDTO;
+import org.apache.inlong.manager.pojo.sink.pulsar.PulsarSinkRequest;
+import org.apache.inlong.manager.service.sink.AbstractSinkOperator;
+
+import com.fasterxml.jackson.core.JsonProcessingException;
+import com.fasterxml.jackson.databind.ObjectMapper;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+import org.springframework.beans.factory.annotation.Autowired;
+import org.springframework.stereotype.Service;
+
+import java.util.List;
+import java.util.Map;
+
+/**
+ * Pulsar sink operator
+ */
+@Service
+public class PulsarSinkOperator extends AbstractSinkOperator {
+
+private static final Logger LOGGER = 
LoggerFactory.getLogger(PulsarSinkOperator.class);
+
+@Autowired
+private ObjectMapper objectMapper;
+@Autowired
+InlongStreamEntityMapper inlongStreamEntityMapper;
+
+@Override
+public Boolean accept(String sinkType) {
+return SinkType.KAFKA.equals(sinkType);

Review Comment:
   done



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: commits-unsubscr...@inlong.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org