This is an automated email from the ASF dual-hosted git repository.

aloyszhang pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/inlong.git


The following commit(s) were added to refs/heads/master by this push:
     new 3faf124148 [INLONG-10884][Manager] Support configuring HTTP type sink 
(#10890)
3faf124148 is described below

commit 3faf124148a74127e900954a2317faac07b9b66a
Author: fuweng11 <76141879+fuwen...@users.noreply.github.com>
AuthorDate: Mon Aug 26 19:33:59 2024 +0800

    [INLONG-10884][Manager] Support configuring HTTP type sink (#10890)
---
 .../inlong/manager/common/consts/DataNodeType.java |   1 +
 .../inlong/manager/common/consts/SinkType.java     |   3 +
 .../inlong/manager/common/enums/ClusterType.java   |   2 +
 .../sort/http/SortHttpClusterInfo.java}            |  33 +++---
 .../sort/http/SortHttpClusterRequest.java}         |  31 +++--
 .../manager/pojo/node/http/HttpDataNodeDTO.java    |  75 ++++++++++++
 .../manager/pojo/node/http/HttpDataNodeInfo.java   |  63 ++++++++++
 .../http/HttpDataNodeRequest.java}                 |  37 ++++--
 .../inlong/manager/pojo/sink/BaseStreamSink.java   |   3 +
 .../inlong/manager/pojo/sink/SinkRequest.java      |   3 +
 .../inlong/manager/pojo/sink/StreamSink.java       |   3 +
 .../inlong/manager/pojo/sink/http/HttpSink.java    |  64 ++++++++++
 .../inlong/manager/pojo/sink/http/HttpSinkDTO.java |  76 ++++++++++++
 .../HttpSinkRequest.java}                          |  38 ++++--
 .../service/cluster/SortClusterOperator.java       |   5 +
 .../service/node/http/HttpDataNodeOperator.java    |  96 +++++++++++++++
 .../service/sink/http/HttpSinkOperator.java        | 129 +++++++++++++++++++++
 17 files changed, 605 insertions(+), 57 deletions(-)

diff --git 
a/inlong-manager/manager-common/src/main/java/org/apache/inlong/manager/common/consts/DataNodeType.java
 
b/inlong-manager/manager-common/src/main/java/org/apache/inlong/manager/common/consts/DataNodeType.java
index 0f1952c938..47b139f159 100644
--- 
a/inlong-manager/manager-common/src/main/java/org/apache/inlong/manager/common/consts/DataNodeType.java
+++ 
b/inlong-manager/manager-common/src/main/java/org/apache/inlong/manager/common/consts/DataNodeType.java
@@ -37,6 +37,7 @@ public class DataNodeType {
     public static final String SQLSERVER = "SQLSERVER";
     public static final String MONGODB = "MONGODB";
     public static final String DORIS = "DORIS";
+    public static final String HTTP = "HTTP";
     public static final String OCEANBASE = "OCEANBASE";
 
     /**
diff --git 
a/inlong-manager/manager-common/src/main/java/org/apache/inlong/manager/common/consts/SinkType.java
 
b/inlong-manager/manager-common/src/main/java/org/apache/inlong/manager/common/consts/SinkType.java
index 5d069e33df..16a1bfd3d8 100644
--- 
a/inlong-manager/manager-common/src/main/java/org/apache/inlong/manager/common/consts/SinkType.java
+++ 
b/inlong-manager/manager-common/src/main/java/org/apache/inlong/manager/common/consts/SinkType.java
@@ -71,6 +71,9 @@ public class SinkType extends StreamType {
     @SupportSortType(sortType = SortType.SORT_FLINK)
     public static final String TUBEMQ = "TUBEMQ";
 
+    @SupportSortType(sortType = SortType.SORT_STANDALONE)
+    public static final String HTTP = "HTTP";
+
     @SupportSortType(sortType = SortType.SORT_FLINK)
     public static final String OCEANBASE = "OCEANBASE";
 
diff --git 
a/inlong-manager/manager-common/src/main/java/org/apache/inlong/manager/common/enums/ClusterType.java
 
b/inlong-manager/manager-common/src/main/java/org/apache/inlong/manager/common/enums/ClusterType.java
index d7e8ba7d4b..d16ab1d46c 100644
--- 
a/inlong-manager/manager-common/src/main/java/org/apache/inlong/manager/common/enums/ClusterType.java
+++ 
b/inlong-manager/manager-common/src/main/java/org/apache/inlong/manager/common/enums/ClusterType.java
@@ -33,6 +33,7 @@ public class ClusterType {
     public static final String DATAPROXY = "DATAPROXY";
     public static final String KAFKA = "KAFKA";
 
+    public static final String SORT_HTTP = "SORT_HTTP";
     public static final String SORT_ES = "SORT_ES";
     public static final String SORT_CLS = "SORT_CLS";
     public static final String SORT_PULSAR = "SORT_PULSAR";
@@ -48,6 +49,7 @@ public class ClusterType {
             add(ClusterType.PULSAR);
             add(ClusterType.DATAPROXY);
             add(ClusterType.KAFKA);
+            add(ClusterType.SORT_HTTP);
             add(ClusterType.SORT_ES);
             add(ClusterType.SORT_CLS);
             add(ClusterType.SORT_PULSAR);
diff --git 
a/inlong-manager/manager-pojo/src/main/java/org/apache/inlong/manager/pojo/sink/BaseStreamSink.java
 
b/inlong-manager/manager-pojo/src/main/java/org/apache/inlong/manager/pojo/cluster/sort/http/SortHttpClusterInfo.java
similarity index 58%
copy from 
inlong-manager/manager-pojo/src/main/java/org/apache/inlong/manager/pojo/sink/BaseStreamSink.java
copy to 
inlong-manager/manager-pojo/src/main/java/org/apache/inlong/manager/pojo/cluster/sort/http/SortHttpClusterInfo.java
index 09115a9e02..b2660c2df9 100644
--- 
a/inlong-manager/manager-pojo/src/main/java/org/apache/inlong/manager/pojo/sink/BaseStreamSink.java
+++ 
b/inlong-manager/manager-pojo/src/main/java/org/apache/inlong/manager/pojo/cluster/sort/http/SortHttpClusterInfo.java
@@ -15,26 +15,25 @@
  * limitations under the License.
  */
 
-package org.apache.inlong.manager.pojo.sink;
+package org.apache.inlong.manager.pojo.cluster.sort.http;
+
+import org.apache.inlong.manager.common.enums.ClusterType;
+import org.apache.inlong.manager.common.util.JsonTypeDefine;
+import org.apache.inlong.manager.pojo.sort.BaseSortClusterInfo;
 
 import io.swagger.annotations.ApiModel;
-import io.swagger.annotations.ApiModelProperty;
-import lombok.AllArgsConstructor;
 import lombok.Data;
-import lombok.NoArgsConstructor;
+import lombok.EqualsAndHashCode;
+import lombok.ToString;
 
-/**
- * The base parameter class of StreamSink, support user extend their own 
business params.
- */
 @Data
-@AllArgsConstructor
-@NoArgsConstructor
-@ApiModel("Base info of stream sink")
-public class BaseStreamSink {
-
-    @ApiModelProperty("Start consume time, yyyy-MM-dd HH:mm:ss format")
-    private String startConsumeTime;
+@ToString(callSuper = true)
+@EqualsAndHashCode(callSuper = true)
+@JsonTypeDefine(value = ClusterType.SORT_HTTP)
+@ApiModel("Inlong cluster info for Sort http")
+public class SortHttpClusterInfo extends BaseSortClusterInfo {
 
-    @ApiModelProperty("Stop consume time, yyyy-MM-dd HH:mm:ss format")
-    private String stopConsumeTime;
-}
+    public SortHttpClusterInfo() {
+        this.setType(ClusterType.SORT_HTTP);
+    }
+}
\ No newline at end of file
diff --git 
a/inlong-manager/manager-pojo/src/main/java/org/apache/inlong/manager/pojo/sink/BaseStreamSink.java
 
b/inlong-manager/manager-pojo/src/main/java/org/apache/inlong/manager/pojo/cluster/sort/http/SortHttpClusterRequest.java
similarity index 58%
copy from 
inlong-manager/manager-pojo/src/main/java/org/apache/inlong/manager/pojo/sink/BaseStreamSink.java
copy to 
inlong-manager/manager-pojo/src/main/java/org/apache/inlong/manager/pojo/cluster/sort/http/SortHttpClusterRequest.java
index 09115a9e02..a7fd4d027e 100644
--- 
a/inlong-manager/manager-pojo/src/main/java/org/apache/inlong/manager/pojo/sink/BaseStreamSink.java
+++ 
b/inlong-manager/manager-pojo/src/main/java/org/apache/inlong/manager/pojo/cluster/sort/http/SortHttpClusterRequest.java
@@ -15,26 +15,25 @@
  * limitations under the License.
  */
 
-package org.apache.inlong.manager.pojo.sink;
+package org.apache.inlong.manager.pojo.cluster.sort.http;
+
+import org.apache.inlong.manager.common.enums.ClusterType;
+import org.apache.inlong.manager.common.util.JsonTypeDefine;
+import org.apache.inlong.manager.pojo.sort.BaseSortClusterRequest;
 
 import io.swagger.annotations.ApiModel;
-import io.swagger.annotations.ApiModelProperty;
-import lombok.AllArgsConstructor;
 import lombok.Data;
-import lombok.NoArgsConstructor;
+import lombok.EqualsAndHashCode;
+import lombok.ToString;
 
-/**
- * The base parameter class of StreamSink, support user extend their own 
business params.
- */
 @Data
-@AllArgsConstructor
-@NoArgsConstructor
-@ApiModel("Base info of stream sink")
-public class BaseStreamSink {
-
-    @ApiModelProperty("Start consume time, yyyy-MM-dd HH:mm:ss format")
-    private String startConsumeTime;
+@ToString(callSuper = true)
+@EqualsAndHashCode(callSuper = true)
+@JsonTypeDefine(value = ClusterType.SORT_HTTP)
+@ApiModel("Inlong cluster request for Sort http")
+public class SortHttpClusterRequest extends BaseSortClusterRequest {
 
-    @ApiModelProperty("Stop consume time, yyyy-MM-dd HH:mm:ss format")
-    private String stopConsumeTime;
+    public SortHttpClusterRequest() {
+        this.setType(ClusterType.SORT_HTTP);
+    }
 }
diff --git 
a/inlong-manager/manager-pojo/src/main/java/org/apache/inlong/manager/pojo/node/http/HttpDataNodeDTO.java
 
b/inlong-manager/manager-pojo/src/main/java/org/apache/inlong/manager/pojo/node/http/HttpDataNodeDTO.java
new file mode 100644
index 0000000000..52fa047606
--- /dev/null
+++ 
b/inlong-manager/manager-pojo/src/main/java/org/apache/inlong/manager/pojo/node/http/HttpDataNodeDTO.java
@@ -0,0 +1,75 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.inlong.manager.pojo.node.http;
+
+import org.apache.inlong.manager.common.enums.ErrorCodeEnum;
+import org.apache.inlong.manager.common.exceptions.BusinessException;
+import org.apache.inlong.manager.common.util.CommonBeanUtils;
+import org.apache.inlong.manager.common.util.JsonUtils;
+
+import io.swagger.annotations.ApiModel;
+import io.swagger.annotations.ApiModelProperty;
+import lombok.AllArgsConstructor;
+import lombok.Builder;
+import lombok.Data;
+import lombok.NoArgsConstructor;
+import org.apache.commons.lang3.StringUtils;
+
+import javax.validation.constraints.NotNull;
+
+/**
+ * Http service data node info
+ */
+@Data
+@Builder
+@NoArgsConstructor
+@AllArgsConstructor
+@ApiModel("Http service data node info")
+public class HttpDataNodeDTO {
+
+    @ApiModelProperty("HTTP base url")
+    private String baseUrl;
+
+    @ApiModelProperty("Whether to enable credential")
+    private Boolean enableCredential;
+
+    @ApiModelProperty("Max connect count")
+    private Integer maxConnect;
+    /**
+     * Get the dto instance from the request
+     */
+    public static HttpDataNodeDTO getFromRequest(HttpDataNodeRequest request, 
String extParams) {
+        HttpDataNodeDTO dto = StringUtils.isNotBlank(extParams)
+                ? HttpDataNodeDTO.getFromJson(extParams)
+                : new HttpDataNodeDTO();
+        return CommonBeanUtils.copyProperties(request, dto, true);
+    }
+
+    /**
+     * Get the dto instance from the JSON string.
+     */
+    public static HttpDataNodeDTO getFromJson(@NotNull String extParams) {
+        try {
+            return JsonUtils.parseObject(extParams, HttpDataNodeDTO.class);
+        } catch (Exception e) {
+            throw new BusinessException(ErrorCodeEnum.GROUP_INFO_INCORRECT,
+                    String.format("Failed to parse extParams for Cloud log 
service node: %s", e.getMessage()));
+        }
+    }
+
+}
diff --git 
a/inlong-manager/manager-pojo/src/main/java/org/apache/inlong/manager/pojo/node/http/HttpDataNodeInfo.java
 
b/inlong-manager/manager-pojo/src/main/java/org/apache/inlong/manager/pojo/node/http/HttpDataNodeInfo.java
new file mode 100644
index 0000000000..b24224134c
--- /dev/null
+++ 
b/inlong-manager/manager-pojo/src/main/java/org/apache/inlong/manager/pojo/node/http/HttpDataNodeInfo.java
@@ -0,0 +1,63 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.inlong.manager.pojo.node.http;
+
+import org.apache.inlong.manager.common.consts.DataNodeType;
+import org.apache.inlong.manager.common.util.CommonBeanUtils;
+import org.apache.inlong.manager.common.util.JsonTypeDefine;
+import org.apache.inlong.manager.pojo.node.DataNodeInfo;
+import org.apache.inlong.manager.pojo.node.DataNodeRequest;
+
+import io.swagger.annotations.ApiModel;
+import io.swagger.annotations.ApiModelProperty;
+import lombok.AllArgsConstructor;
+import lombok.Data;
+import lombok.EqualsAndHashCode;
+import lombok.ToString;
+import lombok.experimental.SuperBuilder;
+
+/**
+ * Cloud log service data node info
+ */
+@Data
+@SuperBuilder
+@AllArgsConstructor
+@ToString(callSuper = true)
+@EqualsAndHashCode(callSuper = true)
+@JsonTypeDefine(value = DataNodeType.HTTP)
+@ApiModel("HTTP data node info")
+public class HttpDataNodeInfo extends DataNodeInfo {
+
+    @ApiModelProperty("HTTP base url")
+    private String baseUrl;
+
+    @ApiModelProperty("Whether to enable credential")
+    private Boolean enableCredential;
+
+    @ApiModelProperty("Max connect count")
+    private Integer maxConnect;
+
+    public HttpDataNodeInfo() {
+        setType(DataNodeType.HTTP);
+    }
+
+    @Override
+    public DataNodeRequest genRequest() {
+        return CommonBeanUtils.copyProperties(this, HttpDataNodeRequest::new);
+    }
+}
diff --git 
a/inlong-manager/manager-pojo/src/main/java/org/apache/inlong/manager/pojo/sink/BaseStreamSink.java
 
b/inlong-manager/manager-pojo/src/main/java/org/apache/inlong/manager/pojo/node/http/HttpDataNodeRequest.java
similarity index 51%
copy from 
inlong-manager/manager-pojo/src/main/java/org/apache/inlong/manager/pojo/sink/BaseStreamSink.java
copy to 
inlong-manager/manager-pojo/src/main/java/org/apache/inlong/manager/pojo/node/http/HttpDataNodeRequest.java
index 09115a9e02..1fba8d9518 100644
--- 
a/inlong-manager/manager-pojo/src/main/java/org/apache/inlong/manager/pojo/sink/BaseStreamSink.java
+++ 
b/inlong-manager/manager-pojo/src/main/java/org/apache/inlong/manager/pojo/node/http/HttpDataNodeRequest.java
@@ -15,26 +15,39 @@
  * limitations under the License.
  */
 
-package org.apache.inlong.manager.pojo.sink;
+package org.apache.inlong.manager.pojo.node.http;
+
+import org.apache.inlong.manager.common.consts.DataNodeType;
+import org.apache.inlong.manager.common.util.JsonTypeDefine;
+import org.apache.inlong.manager.pojo.node.DataNodeRequest;
 
 import io.swagger.annotations.ApiModel;
 import io.swagger.annotations.ApiModelProperty;
-import lombok.AllArgsConstructor;
 import lombok.Data;
-import lombok.NoArgsConstructor;
+import lombok.EqualsAndHashCode;
+import lombok.ToString;
 
 /**
- * The base parameter class of StreamSink, support user extend their own 
business params.
+ * Cloud log service data node request
  */
 @Data
-@AllArgsConstructor
-@NoArgsConstructor
-@ApiModel("Base info of stream sink")
-public class BaseStreamSink {
+@ToString(callSuper = true)
+@EqualsAndHashCode(callSuper = true)
+@JsonTypeDefine(value = DataNodeType.HTTP)
+@ApiModel("Http service data node request")
+public class HttpDataNodeRequest extends DataNodeRequest {
+
+    @ApiModelProperty("HTTP base url")
+    private String baseUrl;
+
+    @ApiModelProperty("Whether to enable credential")
+    private Boolean enableCredential;
+
+    @ApiModelProperty("Max connect count")
+    private Integer maxConnect;
 
-    @ApiModelProperty("Start consume time, yyyy-MM-dd HH:mm:ss format")
-    private String startConsumeTime;
+    public HttpDataNodeRequest() {
+        this.setType(DataNodeType.HTTP);
+    }
 
-    @ApiModelProperty("Stop consume time, yyyy-MM-dd HH:mm:ss format")
-    private String stopConsumeTime;
 }
diff --git 
a/inlong-manager/manager-pojo/src/main/java/org/apache/inlong/manager/pojo/sink/BaseStreamSink.java
 
b/inlong-manager/manager-pojo/src/main/java/org/apache/inlong/manager/pojo/sink/BaseStreamSink.java
index 09115a9e02..d8df7f4a28 100644
--- 
a/inlong-manager/manager-pojo/src/main/java/org/apache/inlong/manager/pojo/sink/BaseStreamSink.java
+++ 
b/inlong-manager/manager-pojo/src/main/java/org/apache/inlong/manager/pojo/sink/BaseStreamSink.java
@@ -32,6 +32,9 @@ import lombok.NoArgsConstructor;
 @ApiModel("Base info of stream sink")
 public class BaseStreamSink {
 
+    @ApiModelProperty("Transform sql")
+    private String transformSql;
+
     @ApiModelProperty("Start consume time, yyyy-MM-dd HH:mm:ss format")
     private String startConsumeTime;
 
diff --git 
a/inlong-manager/manager-pojo/src/main/java/org/apache/inlong/manager/pojo/sink/SinkRequest.java
 
b/inlong-manager/manager-pojo/src/main/java/org/apache/inlong/manager/pojo/sink/SinkRequest.java
index 8c190a3a84..24c544b943 100644
--- 
a/inlong-manager/manager-pojo/src/main/java/org/apache/inlong/manager/pojo/sink/SinkRequest.java
+++ 
b/inlong-manager/manager-pojo/src/main/java/org/apache/inlong/manager/pojo/sink/SinkRequest.java
@@ -71,6 +71,9 @@ public abstract class SinkRequest {
     @Pattern(regexp = "^[a-zA-Z0-9_.-]{1,100}$", message = "sinkName only 
supports letters, numbers, '.', '-', or '_'")
     private String sinkName;
 
+    @ApiModelProperty("Transform sql")
+    private String transformSql;
+
     @ApiModelProperty("Sink description")
     @Length(max = 500, message = "length must be less than or equal to 500")
     private String description;
diff --git 
a/inlong-manager/manager-pojo/src/main/java/org/apache/inlong/manager/pojo/sink/StreamSink.java
 
b/inlong-manager/manager-pojo/src/main/java/org/apache/inlong/manager/pojo/sink/StreamSink.java
index 7641256de0..85fd72a1a4 100644
--- 
a/inlong-manager/manager-pojo/src/main/java/org/apache/inlong/manager/pojo/sink/StreamSink.java
+++ 
b/inlong-manager/manager-pojo/src/main/java/org/apache/inlong/manager/pojo/sink/StreamSink.java
@@ -68,6 +68,9 @@ public abstract class StreamSink extends StreamNode {
     @ApiModelProperty("Sink name, unique in one stream.")
     private String sinkName;
 
+    @ApiModelProperty("Transform sql")
+    private String transformSql;
+
     @ApiModelProperty("Sink description")
     private String description;
 
diff --git 
a/inlong-manager/manager-pojo/src/main/java/org/apache/inlong/manager/pojo/sink/http/HttpSink.java
 
b/inlong-manager/manager-pojo/src/main/java/org/apache/inlong/manager/pojo/sink/http/HttpSink.java
new file mode 100644
index 0000000000..fdcd7228fe
--- /dev/null
+++ 
b/inlong-manager/manager-pojo/src/main/java/org/apache/inlong/manager/pojo/sink/http/HttpSink.java
@@ -0,0 +1,64 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.inlong.manager.pojo.sink.http;
+
+import org.apache.inlong.manager.common.consts.SinkType;
+import org.apache.inlong.manager.common.util.CommonBeanUtils;
+import org.apache.inlong.manager.common.util.JsonTypeDefine;
+import org.apache.inlong.manager.pojo.sink.SinkRequest;
+import org.apache.inlong.manager.pojo.sink.StreamSink;
+
+import io.swagger.annotations.ApiModel;
+import io.swagger.annotations.ApiModelProperty;
+import lombok.Data;
+import lombok.EqualsAndHashCode;
+import lombok.ToString;
+
+import java.util.Map;
+
+/**
+ * HTTP sink info
+ */
+@Data
+@ToString(callSuper = true)
+@EqualsAndHashCode(callSuper = true)
+@ApiModel(value = "HTTP sink info")
+@JsonTypeDefine(value = SinkType.HTTP)
+public class HttpSink extends StreamSink {
+
+    @ApiModelProperty("HTTP path")
+    private String path;
+
+    @ApiModelProperty("HTTP method, like POST, GET")
+    private String method;
+
+    @ApiModelProperty("HTTP headers")
+    private Map<String, String> headers;
+
+    @ApiModelProperty("Max retry times")
+    private Integer maxRetryTimes;
+
+    public HttpSink() {
+        this.setSinkType(SinkType.HTTP);
+    }
+
+    @Override
+    public SinkRequest genSinkRequest() {
+        return CommonBeanUtils.copyProperties(this, HttpSinkRequest::new);
+    }
+}
diff --git 
a/inlong-manager/manager-pojo/src/main/java/org/apache/inlong/manager/pojo/sink/http/HttpSinkDTO.java
 
b/inlong-manager/manager-pojo/src/main/java/org/apache/inlong/manager/pojo/sink/http/HttpSinkDTO.java
new file mode 100644
index 0000000000..5431e07ec7
--- /dev/null
+++ 
b/inlong-manager/manager-pojo/src/main/java/org/apache/inlong/manager/pojo/sink/http/HttpSinkDTO.java
@@ -0,0 +1,76 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.inlong.manager.pojo.sink.http;
+
+import org.apache.inlong.manager.common.enums.ErrorCodeEnum;
+import org.apache.inlong.manager.common.exceptions.BusinessException;
+import org.apache.inlong.manager.common.util.CommonBeanUtils;
+import org.apache.inlong.manager.common.util.JsonUtils;
+import org.apache.inlong.manager.pojo.sink.BaseStreamSink;
+
+import io.swagger.annotations.ApiModelProperty;
+import lombok.AllArgsConstructor;
+import lombok.Builder;
+import lombok.Data;
+import lombok.NoArgsConstructor;
+import org.apache.commons.lang3.StringUtils;
+
+import javax.validation.constraints.NotNull;
+
+import java.util.Map;
+
+/**
+ * Sink info of Cloud log service
+ */
+@Data
+@Builder
+@NoArgsConstructor
+@AllArgsConstructor
+public class HttpSinkDTO extends BaseStreamSink {
+
+    @ApiModelProperty("HTTP path")
+    private String path;
+
+    @ApiModelProperty("HTTP method, like POST, GET")
+    private String method;
+
+    @ApiModelProperty("HTTP headers")
+    private Map<String, String> headers;
+
+    @ApiModelProperty("Max retry times")
+    private Integer maxRetryTimes;
+
+    /**
+     * Get the dto instance from the request
+     */
+    public static HttpSinkDTO getFromRequest(HttpSinkRequest request, String 
extParams) {
+        HttpSinkDTO dto = StringUtils.isNotBlank(extParams)
+                ? HttpSinkDTO.getFromJson(extParams)
+                : new HttpSinkDTO();
+        return CommonBeanUtils.copyProperties(request, dto, true);
+    }
+
+    public static HttpSinkDTO getFromJson(@NotNull String extParams) {
+        try {
+            return JsonUtils.parseObject(extParams, HttpSinkDTO.class);
+        } catch (Exception e) {
+            throw new BusinessException(ErrorCodeEnum.SINK_INFO_INCORRECT,
+                    String.format("parse extParams of http SinkDTO failure: 
%s", e.getMessage()));
+        }
+    }
+}
diff --git 
a/inlong-manager/manager-pojo/src/main/java/org/apache/inlong/manager/pojo/sink/BaseStreamSink.java
 
b/inlong-manager/manager-pojo/src/main/java/org/apache/inlong/manager/pojo/sink/http/HttpSinkRequest.java
similarity index 53%
copy from 
inlong-manager/manager-pojo/src/main/java/org/apache/inlong/manager/pojo/sink/BaseStreamSink.java
copy to 
inlong-manager/manager-pojo/src/main/java/org/apache/inlong/manager/pojo/sink/http/HttpSinkRequest.java
index 09115a9e02..207322aa3a 100644
--- 
a/inlong-manager/manager-pojo/src/main/java/org/apache/inlong/manager/pojo/sink/BaseStreamSink.java
+++ 
b/inlong-manager/manager-pojo/src/main/java/org/apache/inlong/manager/pojo/sink/http/HttpSinkRequest.java
@@ -15,26 +15,40 @@
  * limitations under the License.
  */
 
-package org.apache.inlong.manager.pojo.sink;
+package org.apache.inlong.manager.pojo.sink.http;
+
+import org.apache.inlong.manager.common.consts.SinkType;
+import org.apache.inlong.manager.common.util.JsonTypeDefine;
+import org.apache.inlong.manager.pojo.sink.SinkRequest;
 
 import io.swagger.annotations.ApiModel;
 import io.swagger.annotations.ApiModelProperty;
-import lombok.AllArgsConstructor;
 import lombok.Data;
-import lombok.NoArgsConstructor;
+import lombok.EqualsAndHashCode;
+import lombok.ToString;
+
+import java.util.Map;
 
 /**
- * The base parameter class of StreamSink, support user extend their own 
business params.
+ * Http sink request.
  */
 @Data
-@AllArgsConstructor
-@NoArgsConstructor
-@ApiModel("Base info of stream sink")
-public class BaseStreamSink {
+@ToString(callSuper = true)
+@EqualsAndHashCode(callSuper = true)
+@ApiModel(value = "Http sink request")
+@JsonTypeDefine(value = SinkType.HTTP)
+public class HttpSinkRequest extends SinkRequest {
+
+    @ApiModelProperty("HTTP path")
+    private String path;
+
+    @ApiModelProperty("HTTP method, like POST, GET")
+    private String method;
+
+    @ApiModelProperty("HTTP headers")
+    private Map<String, String> headers;
 
-    @ApiModelProperty("Start consume time, yyyy-MM-dd HH:mm:ss format")
-    private String startConsumeTime;
+    @ApiModelProperty("Max retry times")
+    private Integer maxRetryTimes;
 
-    @ApiModelProperty("Stop consume time, yyyy-MM-dd HH:mm:ss format")
-    private String stopConsumeTime;
 }
diff --git 
a/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/cluster/SortClusterOperator.java
 
b/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/cluster/SortClusterOperator.java
index 6ffdbafae0..b328a39b37 100644
--- 
a/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/cluster/SortClusterOperator.java
+++ 
b/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/cluster/SortClusterOperator.java
@@ -27,6 +27,7 @@ import org.apache.inlong.manager.pojo.cluster.ClusterInfo;
 import org.apache.inlong.manager.pojo.cluster.ClusterRequest;
 import org.apache.inlong.manager.pojo.cluster.sort.cls.SortClsClusterInfo;
 import org.apache.inlong.manager.pojo.cluster.sort.es.SortEsClusterInfo;
+import org.apache.inlong.manager.pojo.cluster.sort.http.SortHttpClusterInfo;
 import org.apache.inlong.manager.pojo.cluster.sort.kafka.SortKafkaClusterInfo;
 import 
org.apache.inlong.manager.pojo.cluster.sort.pulsar.SortPulsarClusterInfo;
 import org.apache.inlong.manager.pojo.sort.BaseSortClusterDTO;
@@ -50,6 +51,7 @@ public class SortClusterOperator extends 
AbstractClusterOperator {
     private static final Set<String> SORT_CLUSTER_SET = new HashSet<String>() {
 
         {
+            add(ClusterType.SORT_HTTP);
             add(ClusterType.SORT_CLS);
             add(ClusterType.SORT_PULSAR);
             add(ClusterType.SORT_ES);
@@ -84,6 +86,9 @@ public class SortClusterOperator extends 
AbstractClusterOperator {
 
         ClusterInfo sortClusterInfo;
         switch (entity.getType()) {
+            case ClusterType.SORT_HTTP:
+                sortClusterInfo = new SortHttpClusterInfo();
+                break;
             case ClusterType.SORT_CLS:
                 sortClusterInfo = new SortClsClusterInfo();
                 break;
diff --git 
a/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/node/http/HttpDataNodeOperator.java
 
b/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/node/http/HttpDataNodeOperator.java
new file mode 100644
index 0000000000..18482d4a04
--- /dev/null
+++ 
b/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/node/http/HttpDataNodeOperator.java
@@ -0,0 +1,96 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.inlong.manager.service.node.http;
+
+import org.apache.inlong.common.pojo.sort.node.HttpNodeConfig;
+import org.apache.inlong.common.pojo.sort.node.NodeConfig;
+import org.apache.inlong.manager.common.consts.DataNodeType;
+import org.apache.inlong.manager.common.enums.ErrorCodeEnum;
+import org.apache.inlong.manager.common.exceptions.BusinessException;
+import org.apache.inlong.manager.common.util.CommonBeanUtils;
+import org.apache.inlong.manager.dao.entity.DataNodeEntity;
+import org.apache.inlong.manager.pojo.node.DataNodeInfo;
+import org.apache.inlong.manager.pojo.node.DataNodeRequest;
+import org.apache.inlong.manager.pojo.node.http.HttpDataNodeDTO;
+import org.apache.inlong.manager.pojo.node.http.HttpDataNodeInfo;
+import org.apache.inlong.manager.pojo.node.http.HttpDataNodeRequest;
+import org.apache.inlong.manager.service.node.AbstractDataNodeOperator;
+
+import com.fasterxml.jackson.databind.ObjectMapper;
+import org.apache.commons.lang3.StringUtils;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+import org.springframework.beans.factory.annotation.Autowired;
+import org.springframework.stereotype.Service;
+
+@Service
+public class HttpDataNodeOperator extends AbstractDataNodeOperator {
+
+    private static final Logger LOGGER = 
LoggerFactory.getLogger(HttpDataNodeOperator.class);
+
+    @Autowired
+    private ObjectMapper objectMapper;
+
+    @Override
+    protected void setTargetEntity(DataNodeRequest request, DataNodeEntity 
targetEntity) {
+        HttpDataNodeRequest httpDataNodeRequest = (HttpDataNodeRequest) 
request;
+        CommonBeanUtils.copyProperties(httpDataNodeRequest, targetEntity, 
true);
+        try {
+            HttpDataNodeDTO dto = 
HttpDataNodeDTO.getFromRequest(httpDataNodeRequest, 
targetEntity.getExtParams());
+            targetEntity.setExtParams(objectMapper.writeValueAsString(dto));
+        } catch (Exception e) {
+            throw new BusinessException(ErrorCodeEnum.SOURCE_INFO_INCORRECT,
+                    String.format("Failed to build extParams for Cloud log 
service node: %s", e.getMessage()));
+        }
+    }
+
+    @Override
+    public Boolean accept(String dataNodeType) {
+        return DataNodeType.HTTP.equals(dataNodeType);
+    }
+
+    @Override
+    public String getDataNodeType() {
+        return DataNodeType.HTTP;
+    }
+
+    @Override
+    public DataNodeInfo getFromEntity(DataNodeEntity entity) {
+        if (entity == null) {
+            throw new BusinessException(ErrorCodeEnum.DATA_NODE_NOT_FOUND);
+        }
+        HttpDataNodeInfo info = new HttpDataNodeInfo();
+        CommonBeanUtils.copyProperties(entity, info);
+        if (StringUtils.isNotBlank(entity.getExtParams())) {
+            HttpDataNodeDTO dto = 
HttpDataNodeDTO.getFromJson(entity.getExtParams());
+            CommonBeanUtils.copyProperties(dto, info);
+        }
+        return info;
+    }
+
+    @Override
+    public NodeConfig getNodeConfig(DataNodeEntity dataNodeEntity) {
+        DataNodeInfo dataNodeInfo = this.getFromEntity(dataNodeEntity);
+        HttpNodeConfig httpNodeConfig = 
CommonBeanUtils.copyProperties(dataNodeInfo, HttpNodeConfig::new);
+        HttpDataNodeDTO dto = 
HttpDataNodeDTO.getFromJson(dataNodeEntity.getExtParams());
+        CommonBeanUtils.copyProperties(dto, httpNodeConfig);
+        httpNodeConfig.setPassword(dataNodeEntity.getToken());
+        httpNodeConfig.setNodeName(dataNodeInfo.getName());
+        return httpNodeConfig;
+    }
+}
diff --git 
a/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/sink/http/HttpSinkOperator.java
 
b/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/sink/http/HttpSinkOperator.java
new file mode 100644
index 0000000000..030c0c787a
--- /dev/null
+++ 
b/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/sink/http/HttpSinkOperator.java
@@ -0,0 +1,129 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.inlong.manager.service.sink.http;
+
+import org.apache.inlong.common.pojo.sort.dataflow.field.FieldConfig;
+import org.apache.inlong.common.pojo.sort.dataflow.field.format.FormatInfo;
+import org.apache.inlong.common.pojo.sort.dataflow.sink.HttpSinkConfig;
+import org.apache.inlong.common.pojo.sort.dataflow.sink.SinkConfig;
+import org.apache.inlong.manager.common.consts.DataNodeType;
+import org.apache.inlong.manager.common.consts.SinkType;
+import org.apache.inlong.manager.common.enums.ErrorCodeEnum;
+import org.apache.inlong.manager.common.exceptions.BusinessException;
+import org.apache.inlong.manager.common.util.CommonBeanUtils;
+import org.apache.inlong.manager.common.util.JsonUtils;
+import org.apache.inlong.manager.dao.entity.DataNodeEntity;
+import org.apache.inlong.manager.dao.entity.StreamSinkEntity;
+import org.apache.inlong.manager.dao.mapper.DataNodeEntityMapper;
+import org.apache.inlong.manager.pojo.group.InlongGroupInfo;
+import org.apache.inlong.manager.pojo.node.http.HttpDataNodeDTO;
+import org.apache.inlong.manager.pojo.sink.SinkField;
+import org.apache.inlong.manager.pojo.sink.SinkRequest;
+import org.apache.inlong.manager.pojo.sink.StreamSink;
+import org.apache.inlong.manager.pojo.sink.http.HttpSink;
+import org.apache.inlong.manager.pojo.sink.http.HttpSinkDTO;
+import org.apache.inlong.manager.pojo.sink.http.HttpSinkRequest;
+import org.apache.inlong.manager.pojo.sort.util.FieldInfoUtils;
+import org.apache.inlong.manager.pojo.stream.InlongStreamInfo;
+import org.apache.inlong.manager.service.sink.AbstractSinkOperator;
+
+import com.fasterxml.jackson.databind.ObjectMapper;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+import org.springframework.beans.factory.annotation.Autowired;
+import org.springframework.stereotype.Service;
+
+import java.util.List;
+import java.util.stream.Collectors;
+
+/**
+ * Http sink operator
+ */
+@Service
+public class HttpSinkOperator extends AbstractSinkOperator {
+
+    private static final Logger LOGGER = 
LoggerFactory.getLogger(HttpSinkOperator.class);
+
+    @Autowired
+    private ObjectMapper objectMapper;
+    @Autowired
+    private DataNodeEntityMapper dataNodeEntityMapper;
+
+    @Override
+    protected void setTargetEntity(SinkRequest request, StreamSinkEntity 
targetEntity) {
+        if (!this.getSinkType().equals(request.getSinkType())) {
+            throw new BusinessException(ErrorCodeEnum.SINK_TYPE_NOT_SUPPORT,
+                    ErrorCodeEnum.SINK_TYPE_NOT_SUPPORT.getMessage() + ": " + 
getSinkType());
+        }
+        HttpSinkRequest sinkRequest = (HttpSinkRequest) request;
+        try {
+            HttpSinkDTO dto = HttpSinkDTO.getFromRequest(sinkRequest, 
targetEntity.getExtParams());
+            targetEntity.setExtParams(objectMapper.writeValueAsString(dto));
+        } catch (Exception e) {
+            throw new BusinessException(ErrorCodeEnum.SINK_SAVE_FAILED,
+                    String.format("serialize extParams of http SinkDTO 
failure: %s", e.getMessage()));
+        }
+    }
+
+    @Override
+    protected String getSinkType() {
+        return SinkType.HTTP;
+    }
+
+    @Override
+    public Boolean accept(String sinkType) {
+        return SinkType.HTTP.equals(sinkType);
+    }
+
+    @Override
+    public StreamSink getFromEntity(StreamSinkEntity entity) {
+        HttpSink sink = new HttpSink();
+        if (entity == null) {
+            return sink;
+        }
+
+        HttpSinkDTO dto = HttpSinkDTO.getFromJson(entity.getExtParams());
+        DataNodeEntity dataNodeEntity = 
dataNodeEntityMapper.selectByUniqueKey(entity.getDataNodeName(),
+                DataNodeType.HTTP);
+        HttpDataNodeDTO httpDataNodeDTO = 
JsonUtils.parseObject(dataNodeEntity.getExtParams(),
+                HttpDataNodeDTO.class);
+        CommonBeanUtils.copyProperties(entity, sink, true);
+        CommonBeanUtils.copyProperties(dto, sink, true);
+        CommonBeanUtils.copyProperties(httpDataNodeDTO, sink, true);
+        List<SinkField> sinkFields = getSinkFields(entity.getId());
+        sink.setSinkFieldList(sinkFields);
+        return sink;
+    }
+
+    @Override
+    public SinkConfig getSinkConfig(InlongGroupInfo groupInfo, 
InlongStreamInfo streamInfo, StreamSink sink) {
+        HttpSink httpSink = (HttpSink) sink;
+        HttpSinkConfig sinkConfig = CommonBeanUtils.copyProperties(httpSink, 
HttpSinkConfig::new);
+        List<FieldConfig> fields = 
sinkFieldMapper.selectBySinkId(sink.getId()).stream().map(
+                v -> {
+                    FieldConfig fieldConfig = new FieldConfig();
+                    FormatInfo formatInfo = FieldInfoUtils.convertFieldFormat(
+                            v.getFieldType().toLowerCase());
+                    fieldConfig.setName(v.getFieldName());
+                    fieldConfig.setFormatInfo(formatInfo);
+                    return fieldConfig;
+                }).collect(Collectors.toList());
+        sinkConfig.setFieldConfigs(fields);
+        return sinkConfig;
+    }
+}


Reply via email to