This is an automated email from the ASF dual-hosted git repository. aloyszhang pushed a commit to branch master in repository https://gitbox.apache.org/repos/asf/inlong.git
The following commit(s) were added to refs/heads/master by this push: new 3faf124148 [INLONG-10884][Manager] Support configuring HTTP type sink (#10890) 3faf124148 is described below commit 3faf124148a74127e900954a2317faac07b9b66a Author: fuweng11 <76141879+fuwen...@users.noreply.github.com> AuthorDate: Mon Aug 26 19:33:59 2024 +0800 [INLONG-10884][Manager] Support configuring HTTP type sink (#10890) --- .../inlong/manager/common/consts/DataNodeType.java | 1 + .../inlong/manager/common/consts/SinkType.java | 3 + .../inlong/manager/common/enums/ClusterType.java | 2 + .../sort/http/SortHttpClusterInfo.java} | 33 +++--- .../sort/http/SortHttpClusterRequest.java} | 31 +++-- .../manager/pojo/node/http/HttpDataNodeDTO.java | 75 ++++++++++++ .../manager/pojo/node/http/HttpDataNodeInfo.java | 63 ++++++++++ .../http/HttpDataNodeRequest.java} | 37 ++++-- .../inlong/manager/pojo/sink/BaseStreamSink.java | 3 + .../inlong/manager/pojo/sink/SinkRequest.java | 3 + .../inlong/manager/pojo/sink/StreamSink.java | 3 + .../inlong/manager/pojo/sink/http/HttpSink.java | 64 ++++++++++ .../inlong/manager/pojo/sink/http/HttpSinkDTO.java | 76 ++++++++++++ .../HttpSinkRequest.java} | 38 ++++-- .../service/cluster/SortClusterOperator.java | 5 + .../service/node/http/HttpDataNodeOperator.java | 96 +++++++++++++++ .../service/sink/http/HttpSinkOperator.java | 129 +++++++++++++++++++++ 17 files changed, 605 insertions(+), 57 deletions(-) diff --git a/inlong-manager/manager-common/src/main/java/org/apache/inlong/manager/common/consts/DataNodeType.java b/inlong-manager/manager-common/src/main/java/org/apache/inlong/manager/common/consts/DataNodeType.java index 0f1952c938..47b139f159 100644 --- a/inlong-manager/manager-common/src/main/java/org/apache/inlong/manager/common/consts/DataNodeType.java +++ b/inlong-manager/manager-common/src/main/java/org/apache/inlong/manager/common/consts/DataNodeType.java @@ -37,6 +37,7 @@ public class DataNodeType { public static final String SQLSERVER = "SQLSERVER"; public static final String MONGODB = "MONGODB"; public static final String DORIS = "DORIS"; + public static final String HTTP = "HTTP"; public static final String OCEANBASE = "OCEANBASE"; /** diff --git a/inlong-manager/manager-common/src/main/java/org/apache/inlong/manager/common/consts/SinkType.java b/inlong-manager/manager-common/src/main/java/org/apache/inlong/manager/common/consts/SinkType.java index 5d069e33df..16a1bfd3d8 100644 --- a/inlong-manager/manager-common/src/main/java/org/apache/inlong/manager/common/consts/SinkType.java +++ b/inlong-manager/manager-common/src/main/java/org/apache/inlong/manager/common/consts/SinkType.java @@ -71,6 +71,9 @@ public class SinkType extends StreamType { @SupportSortType(sortType = SortType.SORT_FLINK) public static final String TUBEMQ = "TUBEMQ"; + @SupportSortType(sortType = SortType.SORT_STANDALONE) + public static final String HTTP = "HTTP"; + @SupportSortType(sortType = SortType.SORT_FLINK) public static final String OCEANBASE = "OCEANBASE"; diff --git a/inlong-manager/manager-common/src/main/java/org/apache/inlong/manager/common/enums/ClusterType.java b/inlong-manager/manager-common/src/main/java/org/apache/inlong/manager/common/enums/ClusterType.java index d7e8ba7d4b..d16ab1d46c 100644 --- a/inlong-manager/manager-common/src/main/java/org/apache/inlong/manager/common/enums/ClusterType.java +++ b/inlong-manager/manager-common/src/main/java/org/apache/inlong/manager/common/enums/ClusterType.java @@ -33,6 +33,7 @@ public class ClusterType { public static final String DATAPROXY = "DATAPROXY"; public static final String KAFKA = "KAFKA"; + public static final String SORT_HTTP = "SORT_HTTP"; public static final String SORT_ES = "SORT_ES"; public static final String SORT_CLS = "SORT_CLS"; public static final String SORT_PULSAR = "SORT_PULSAR"; @@ -48,6 +49,7 @@ public class ClusterType { add(ClusterType.PULSAR); add(ClusterType.DATAPROXY); add(ClusterType.KAFKA); + add(ClusterType.SORT_HTTP); add(ClusterType.SORT_ES); add(ClusterType.SORT_CLS); add(ClusterType.SORT_PULSAR); diff --git a/inlong-manager/manager-pojo/src/main/java/org/apache/inlong/manager/pojo/sink/BaseStreamSink.java b/inlong-manager/manager-pojo/src/main/java/org/apache/inlong/manager/pojo/cluster/sort/http/SortHttpClusterInfo.java similarity index 58% copy from inlong-manager/manager-pojo/src/main/java/org/apache/inlong/manager/pojo/sink/BaseStreamSink.java copy to inlong-manager/manager-pojo/src/main/java/org/apache/inlong/manager/pojo/cluster/sort/http/SortHttpClusterInfo.java index 09115a9e02..b2660c2df9 100644 --- a/inlong-manager/manager-pojo/src/main/java/org/apache/inlong/manager/pojo/sink/BaseStreamSink.java +++ b/inlong-manager/manager-pojo/src/main/java/org/apache/inlong/manager/pojo/cluster/sort/http/SortHttpClusterInfo.java @@ -15,26 +15,25 @@ * limitations under the License. */ -package org.apache.inlong.manager.pojo.sink; +package org.apache.inlong.manager.pojo.cluster.sort.http; + +import org.apache.inlong.manager.common.enums.ClusterType; +import org.apache.inlong.manager.common.util.JsonTypeDefine; +import org.apache.inlong.manager.pojo.sort.BaseSortClusterInfo; import io.swagger.annotations.ApiModel; -import io.swagger.annotations.ApiModelProperty; -import lombok.AllArgsConstructor; import lombok.Data; -import lombok.NoArgsConstructor; +import lombok.EqualsAndHashCode; +import lombok.ToString; -/** - * The base parameter class of StreamSink, support user extend their own business params. - */ @Data -@AllArgsConstructor -@NoArgsConstructor -@ApiModel("Base info of stream sink") -public class BaseStreamSink { - - @ApiModelProperty("Start consume time, yyyy-MM-dd HH:mm:ss format") - private String startConsumeTime; +@ToString(callSuper = true) +@EqualsAndHashCode(callSuper = true) +@JsonTypeDefine(value = ClusterType.SORT_HTTP) +@ApiModel("Inlong cluster info for Sort http") +public class SortHttpClusterInfo extends BaseSortClusterInfo { - @ApiModelProperty("Stop consume time, yyyy-MM-dd HH:mm:ss format") - private String stopConsumeTime; -} + public SortHttpClusterInfo() { + this.setType(ClusterType.SORT_HTTP); + } +} \ No newline at end of file diff --git a/inlong-manager/manager-pojo/src/main/java/org/apache/inlong/manager/pojo/sink/BaseStreamSink.java b/inlong-manager/manager-pojo/src/main/java/org/apache/inlong/manager/pojo/cluster/sort/http/SortHttpClusterRequest.java similarity index 58% copy from inlong-manager/manager-pojo/src/main/java/org/apache/inlong/manager/pojo/sink/BaseStreamSink.java copy to inlong-manager/manager-pojo/src/main/java/org/apache/inlong/manager/pojo/cluster/sort/http/SortHttpClusterRequest.java index 09115a9e02..a7fd4d027e 100644 --- a/inlong-manager/manager-pojo/src/main/java/org/apache/inlong/manager/pojo/sink/BaseStreamSink.java +++ b/inlong-manager/manager-pojo/src/main/java/org/apache/inlong/manager/pojo/cluster/sort/http/SortHttpClusterRequest.java @@ -15,26 +15,25 @@ * limitations under the License. */ -package org.apache.inlong.manager.pojo.sink; +package org.apache.inlong.manager.pojo.cluster.sort.http; + +import org.apache.inlong.manager.common.enums.ClusterType; +import org.apache.inlong.manager.common.util.JsonTypeDefine; +import org.apache.inlong.manager.pojo.sort.BaseSortClusterRequest; import io.swagger.annotations.ApiModel; -import io.swagger.annotations.ApiModelProperty; -import lombok.AllArgsConstructor; import lombok.Data; -import lombok.NoArgsConstructor; +import lombok.EqualsAndHashCode; +import lombok.ToString; -/** - * The base parameter class of StreamSink, support user extend their own business params. - */ @Data -@AllArgsConstructor -@NoArgsConstructor -@ApiModel("Base info of stream sink") -public class BaseStreamSink { - - @ApiModelProperty("Start consume time, yyyy-MM-dd HH:mm:ss format") - private String startConsumeTime; +@ToString(callSuper = true) +@EqualsAndHashCode(callSuper = true) +@JsonTypeDefine(value = ClusterType.SORT_HTTP) +@ApiModel("Inlong cluster request for Sort http") +public class SortHttpClusterRequest extends BaseSortClusterRequest { - @ApiModelProperty("Stop consume time, yyyy-MM-dd HH:mm:ss format") - private String stopConsumeTime; + public SortHttpClusterRequest() { + this.setType(ClusterType.SORT_HTTP); + } } diff --git a/inlong-manager/manager-pojo/src/main/java/org/apache/inlong/manager/pojo/node/http/HttpDataNodeDTO.java b/inlong-manager/manager-pojo/src/main/java/org/apache/inlong/manager/pojo/node/http/HttpDataNodeDTO.java new file mode 100644 index 0000000000..52fa047606 --- /dev/null +++ b/inlong-manager/manager-pojo/src/main/java/org/apache/inlong/manager/pojo/node/http/HttpDataNodeDTO.java @@ -0,0 +1,75 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.inlong.manager.pojo.node.http; + +import org.apache.inlong.manager.common.enums.ErrorCodeEnum; +import org.apache.inlong.manager.common.exceptions.BusinessException; +import org.apache.inlong.manager.common.util.CommonBeanUtils; +import org.apache.inlong.manager.common.util.JsonUtils; + +import io.swagger.annotations.ApiModel; +import io.swagger.annotations.ApiModelProperty; +import lombok.AllArgsConstructor; +import lombok.Builder; +import lombok.Data; +import lombok.NoArgsConstructor; +import org.apache.commons.lang3.StringUtils; + +import javax.validation.constraints.NotNull; + +/** + * Http service data node info + */ +@Data +@Builder +@NoArgsConstructor +@AllArgsConstructor +@ApiModel("Http service data node info") +public class HttpDataNodeDTO { + + @ApiModelProperty("HTTP base url") + private String baseUrl; + + @ApiModelProperty("Whether to enable credential") + private Boolean enableCredential; + + @ApiModelProperty("Max connect count") + private Integer maxConnect; + /** + * Get the dto instance from the request + */ + public static HttpDataNodeDTO getFromRequest(HttpDataNodeRequest request, String extParams) { + HttpDataNodeDTO dto = StringUtils.isNotBlank(extParams) + ? HttpDataNodeDTO.getFromJson(extParams) + : new HttpDataNodeDTO(); + return CommonBeanUtils.copyProperties(request, dto, true); + } + + /** + * Get the dto instance from the JSON string. + */ + public static HttpDataNodeDTO getFromJson(@NotNull String extParams) { + try { + return JsonUtils.parseObject(extParams, HttpDataNodeDTO.class); + } catch (Exception e) { + throw new BusinessException(ErrorCodeEnum.GROUP_INFO_INCORRECT, + String.format("Failed to parse extParams for Cloud log service node: %s", e.getMessage())); + } + } + +} diff --git a/inlong-manager/manager-pojo/src/main/java/org/apache/inlong/manager/pojo/node/http/HttpDataNodeInfo.java b/inlong-manager/manager-pojo/src/main/java/org/apache/inlong/manager/pojo/node/http/HttpDataNodeInfo.java new file mode 100644 index 0000000000..b24224134c --- /dev/null +++ b/inlong-manager/manager-pojo/src/main/java/org/apache/inlong/manager/pojo/node/http/HttpDataNodeInfo.java @@ -0,0 +1,63 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.inlong.manager.pojo.node.http; + +import org.apache.inlong.manager.common.consts.DataNodeType; +import org.apache.inlong.manager.common.util.CommonBeanUtils; +import org.apache.inlong.manager.common.util.JsonTypeDefine; +import org.apache.inlong.manager.pojo.node.DataNodeInfo; +import org.apache.inlong.manager.pojo.node.DataNodeRequest; + +import io.swagger.annotations.ApiModel; +import io.swagger.annotations.ApiModelProperty; +import lombok.AllArgsConstructor; +import lombok.Data; +import lombok.EqualsAndHashCode; +import lombok.ToString; +import lombok.experimental.SuperBuilder; + +/** + * Cloud log service data node info + */ +@Data +@SuperBuilder +@AllArgsConstructor +@ToString(callSuper = true) +@EqualsAndHashCode(callSuper = true) +@JsonTypeDefine(value = DataNodeType.HTTP) +@ApiModel("HTTP data node info") +public class HttpDataNodeInfo extends DataNodeInfo { + + @ApiModelProperty("HTTP base url") + private String baseUrl; + + @ApiModelProperty("Whether to enable credential") + private Boolean enableCredential; + + @ApiModelProperty("Max connect count") + private Integer maxConnect; + + public HttpDataNodeInfo() { + setType(DataNodeType.HTTP); + } + + @Override + public DataNodeRequest genRequest() { + return CommonBeanUtils.copyProperties(this, HttpDataNodeRequest::new); + } +} diff --git a/inlong-manager/manager-pojo/src/main/java/org/apache/inlong/manager/pojo/sink/BaseStreamSink.java b/inlong-manager/manager-pojo/src/main/java/org/apache/inlong/manager/pojo/node/http/HttpDataNodeRequest.java similarity index 51% copy from inlong-manager/manager-pojo/src/main/java/org/apache/inlong/manager/pojo/sink/BaseStreamSink.java copy to inlong-manager/manager-pojo/src/main/java/org/apache/inlong/manager/pojo/node/http/HttpDataNodeRequest.java index 09115a9e02..1fba8d9518 100644 --- a/inlong-manager/manager-pojo/src/main/java/org/apache/inlong/manager/pojo/sink/BaseStreamSink.java +++ b/inlong-manager/manager-pojo/src/main/java/org/apache/inlong/manager/pojo/node/http/HttpDataNodeRequest.java @@ -15,26 +15,39 @@ * limitations under the License. */ -package org.apache.inlong.manager.pojo.sink; +package org.apache.inlong.manager.pojo.node.http; + +import org.apache.inlong.manager.common.consts.DataNodeType; +import org.apache.inlong.manager.common.util.JsonTypeDefine; +import org.apache.inlong.manager.pojo.node.DataNodeRequest; import io.swagger.annotations.ApiModel; import io.swagger.annotations.ApiModelProperty; -import lombok.AllArgsConstructor; import lombok.Data; -import lombok.NoArgsConstructor; +import lombok.EqualsAndHashCode; +import lombok.ToString; /** - * The base parameter class of StreamSink, support user extend their own business params. + * Cloud log service data node request */ @Data -@AllArgsConstructor -@NoArgsConstructor -@ApiModel("Base info of stream sink") -public class BaseStreamSink { +@ToString(callSuper = true) +@EqualsAndHashCode(callSuper = true) +@JsonTypeDefine(value = DataNodeType.HTTP) +@ApiModel("Http service data node request") +public class HttpDataNodeRequest extends DataNodeRequest { + + @ApiModelProperty("HTTP base url") + private String baseUrl; + + @ApiModelProperty("Whether to enable credential") + private Boolean enableCredential; + + @ApiModelProperty("Max connect count") + private Integer maxConnect; - @ApiModelProperty("Start consume time, yyyy-MM-dd HH:mm:ss format") - private String startConsumeTime; + public HttpDataNodeRequest() { + this.setType(DataNodeType.HTTP); + } - @ApiModelProperty("Stop consume time, yyyy-MM-dd HH:mm:ss format") - private String stopConsumeTime; } diff --git a/inlong-manager/manager-pojo/src/main/java/org/apache/inlong/manager/pojo/sink/BaseStreamSink.java b/inlong-manager/manager-pojo/src/main/java/org/apache/inlong/manager/pojo/sink/BaseStreamSink.java index 09115a9e02..d8df7f4a28 100644 --- a/inlong-manager/manager-pojo/src/main/java/org/apache/inlong/manager/pojo/sink/BaseStreamSink.java +++ b/inlong-manager/manager-pojo/src/main/java/org/apache/inlong/manager/pojo/sink/BaseStreamSink.java @@ -32,6 +32,9 @@ import lombok.NoArgsConstructor; @ApiModel("Base info of stream sink") public class BaseStreamSink { + @ApiModelProperty("Transform sql") + private String transformSql; + @ApiModelProperty("Start consume time, yyyy-MM-dd HH:mm:ss format") private String startConsumeTime; diff --git a/inlong-manager/manager-pojo/src/main/java/org/apache/inlong/manager/pojo/sink/SinkRequest.java b/inlong-manager/manager-pojo/src/main/java/org/apache/inlong/manager/pojo/sink/SinkRequest.java index 8c190a3a84..24c544b943 100644 --- a/inlong-manager/manager-pojo/src/main/java/org/apache/inlong/manager/pojo/sink/SinkRequest.java +++ b/inlong-manager/manager-pojo/src/main/java/org/apache/inlong/manager/pojo/sink/SinkRequest.java @@ -71,6 +71,9 @@ public abstract class SinkRequest { @Pattern(regexp = "^[a-zA-Z0-9_.-]{1,100}$", message = "sinkName only supports letters, numbers, '.', '-', or '_'") private String sinkName; + @ApiModelProperty("Transform sql") + private String transformSql; + @ApiModelProperty("Sink description") @Length(max = 500, message = "length must be less than or equal to 500") private String description; diff --git a/inlong-manager/manager-pojo/src/main/java/org/apache/inlong/manager/pojo/sink/StreamSink.java b/inlong-manager/manager-pojo/src/main/java/org/apache/inlong/manager/pojo/sink/StreamSink.java index 7641256de0..85fd72a1a4 100644 --- a/inlong-manager/manager-pojo/src/main/java/org/apache/inlong/manager/pojo/sink/StreamSink.java +++ b/inlong-manager/manager-pojo/src/main/java/org/apache/inlong/manager/pojo/sink/StreamSink.java @@ -68,6 +68,9 @@ public abstract class StreamSink extends StreamNode { @ApiModelProperty("Sink name, unique in one stream.") private String sinkName; + @ApiModelProperty("Transform sql") + private String transformSql; + @ApiModelProperty("Sink description") private String description; diff --git a/inlong-manager/manager-pojo/src/main/java/org/apache/inlong/manager/pojo/sink/http/HttpSink.java b/inlong-manager/manager-pojo/src/main/java/org/apache/inlong/manager/pojo/sink/http/HttpSink.java new file mode 100644 index 0000000000..fdcd7228fe --- /dev/null +++ b/inlong-manager/manager-pojo/src/main/java/org/apache/inlong/manager/pojo/sink/http/HttpSink.java @@ -0,0 +1,64 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.inlong.manager.pojo.sink.http; + +import org.apache.inlong.manager.common.consts.SinkType; +import org.apache.inlong.manager.common.util.CommonBeanUtils; +import org.apache.inlong.manager.common.util.JsonTypeDefine; +import org.apache.inlong.manager.pojo.sink.SinkRequest; +import org.apache.inlong.manager.pojo.sink.StreamSink; + +import io.swagger.annotations.ApiModel; +import io.swagger.annotations.ApiModelProperty; +import lombok.Data; +import lombok.EqualsAndHashCode; +import lombok.ToString; + +import java.util.Map; + +/** + * HTTP sink info + */ +@Data +@ToString(callSuper = true) +@EqualsAndHashCode(callSuper = true) +@ApiModel(value = "HTTP sink info") +@JsonTypeDefine(value = SinkType.HTTP) +public class HttpSink extends StreamSink { + + @ApiModelProperty("HTTP path") + private String path; + + @ApiModelProperty("HTTP method, like POST, GET") + private String method; + + @ApiModelProperty("HTTP headers") + private Map<String, String> headers; + + @ApiModelProperty("Max retry times") + private Integer maxRetryTimes; + + public HttpSink() { + this.setSinkType(SinkType.HTTP); + } + + @Override + public SinkRequest genSinkRequest() { + return CommonBeanUtils.copyProperties(this, HttpSinkRequest::new); + } +} diff --git a/inlong-manager/manager-pojo/src/main/java/org/apache/inlong/manager/pojo/sink/http/HttpSinkDTO.java b/inlong-manager/manager-pojo/src/main/java/org/apache/inlong/manager/pojo/sink/http/HttpSinkDTO.java new file mode 100644 index 0000000000..5431e07ec7 --- /dev/null +++ b/inlong-manager/manager-pojo/src/main/java/org/apache/inlong/manager/pojo/sink/http/HttpSinkDTO.java @@ -0,0 +1,76 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.inlong.manager.pojo.sink.http; + +import org.apache.inlong.manager.common.enums.ErrorCodeEnum; +import org.apache.inlong.manager.common.exceptions.BusinessException; +import org.apache.inlong.manager.common.util.CommonBeanUtils; +import org.apache.inlong.manager.common.util.JsonUtils; +import org.apache.inlong.manager.pojo.sink.BaseStreamSink; + +import io.swagger.annotations.ApiModelProperty; +import lombok.AllArgsConstructor; +import lombok.Builder; +import lombok.Data; +import lombok.NoArgsConstructor; +import org.apache.commons.lang3.StringUtils; + +import javax.validation.constraints.NotNull; + +import java.util.Map; + +/** + * Sink info of Cloud log service + */ +@Data +@Builder +@NoArgsConstructor +@AllArgsConstructor +public class HttpSinkDTO extends BaseStreamSink { + + @ApiModelProperty("HTTP path") + private String path; + + @ApiModelProperty("HTTP method, like POST, GET") + private String method; + + @ApiModelProperty("HTTP headers") + private Map<String, String> headers; + + @ApiModelProperty("Max retry times") + private Integer maxRetryTimes; + + /** + * Get the dto instance from the request + */ + public static HttpSinkDTO getFromRequest(HttpSinkRequest request, String extParams) { + HttpSinkDTO dto = StringUtils.isNotBlank(extParams) + ? HttpSinkDTO.getFromJson(extParams) + : new HttpSinkDTO(); + return CommonBeanUtils.copyProperties(request, dto, true); + } + + public static HttpSinkDTO getFromJson(@NotNull String extParams) { + try { + return JsonUtils.parseObject(extParams, HttpSinkDTO.class); + } catch (Exception e) { + throw new BusinessException(ErrorCodeEnum.SINK_INFO_INCORRECT, + String.format("parse extParams of http SinkDTO failure: %s", e.getMessage())); + } + } +} diff --git a/inlong-manager/manager-pojo/src/main/java/org/apache/inlong/manager/pojo/sink/BaseStreamSink.java b/inlong-manager/manager-pojo/src/main/java/org/apache/inlong/manager/pojo/sink/http/HttpSinkRequest.java similarity index 53% copy from inlong-manager/manager-pojo/src/main/java/org/apache/inlong/manager/pojo/sink/BaseStreamSink.java copy to inlong-manager/manager-pojo/src/main/java/org/apache/inlong/manager/pojo/sink/http/HttpSinkRequest.java index 09115a9e02..207322aa3a 100644 --- a/inlong-manager/manager-pojo/src/main/java/org/apache/inlong/manager/pojo/sink/BaseStreamSink.java +++ b/inlong-manager/manager-pojo/src/main/java/org/apache/inlong/manager/pojo/sink/http/HttpSinkRequest.java @@ -15,26 +15,40 @@ * limitations under the License. */ -package org.apache.inlong.manager.pojo.sink; +package org.apache.inlong.manager.pojo.sink.http; + +import org.apache.inlong.manager.common.consts.SinkType; +import org.apache.inlong.manager.common.util.JsonTypeDefine; +import org.apache.inlong.manager.pojo.sink.SinkRequest; import io.swagger.annotations.ApiModel; import io.swagger.annotations.ApiModelProperty; -import lombok.AllArgsConstructor; import lombok.Data; -import lombok.NoArgsConstructor; +import lombok.EqualsAndHashCode; +import lombok.ToString; + +import java.util.Map; /** - * The base parameter class of StreamSink, support user extend their own business params. + * Http sink request. */ @Data -@AllArgsConstructor -@NoArgsConstructor -@ApiModel("Base info of stream sink") -public class BaseStreamSink { +@ToString(callSuper = true) +@EqualsAndHashCode(callSuper = true) +@ApiModel(value = "Http sink request") +@JsonTypeDefine(value = SinkType.HTTP) +public class HttpSinkRequest extends SinkRequest { + + @ApiModelProperty("HTTP path") + private String path; + + @ApiModelProperty("HTTP method, like POST, GET") + private String method; + + @ApiModelProperty("HTTP headers") + private Map<String, String> headers; - @ApiModelProperty("Start consume time, yyyy-MM-dd HH:mm:ss format") - private String startConsumeTime; + @ApiModelProperty("Max retry times") + private Integer maxRetryTimes; - @ApiModelProperty("Stop consume time, yyyy-MM-dd HH:mm:ss format") - private String stopConsumeTime; } diff --git a/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/cluster/SortClusterOperator.java b/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/cluster/SortClusterOperator.java index 6ffdbafae0..b328a39b37 100644 --- a/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/cluster/SortClusterOperator.java +++ b/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/cluster/SortClusterOperator.java @@ -27,6 +27,7 @@ import org.apache.inlong.manager.pojo.cluster.ClusterInfo; import org.apache.inlong.manager.pojo.cluster.ClusterRequest; import org.apache.inlong.manager.pojo.cluster.sort.cls.SortClsClusterInfo; import org.apache.inlong.manager.pojo.cluster.sort.es.SortEsClusterInfo; +import org.apache.inlong.manager.pojo.cluster.sort.http.SortHttpClusterInfo; import org.apache.inlong.manager.pojo.cluster.sort.kafka.SortKafkaClusterInfo; import org.apache.inlong.manager.pojo.cluster.sort.pulsar.SortPulsarClusterInfo; import org.apache.inlong.manager.pojo.sort.BaseSortClusterDTO; @@ -50,6 +51,7 @@ public class SortClusterOperator extends AbstractClusterOperator { private static final Set<String> SORT_CLUSTER_SET = new HashSet<String>() { { + add(ClusterType.SORT_HTTP); add(ClusterType.SORT_CLS); add(ClusterType.SORT_PULSAR); add(ClusterType.SORT_ES); @@ -84,6 +86,9 @@ public class SortClusterOperator extends AbstractClusterOperator { ClusterInfo sortClusterInfo; switch (entity.getType()) { + case ClusterType.SORT_HTTP: + sortClusterInfo = new SortHttpClusterInfo(); + break; case ClusterType.SORT_CLS: sortClusterInfo = new SortClsClusterInfo(); break; diff --git a/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/node/http/HttpDataNodeOperator.java b/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/node/http/HttpDataNodeOperator.java new file mode 100644 index 0000000000..18482d4a04 --- /dev/null +++ b/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/node/http/HttpDataNodeOperator.java @@ -0,0 +1,96 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.inlong.manager.service.node.http; + +import org.apache.inlong.common.pojo.sort.node.HttpNodeConfig; +import org.apache.inlong.common.pojo.sort.node.NodeConfig; +import org.apache.inlong.manager.common.consts.DataNodeType; +import org.apache.inlong.manager.common.enums.ErrorCodeEnum; +import org.apache.inlong.manager.common.exceptions.BusinessException; +import org.apache.inlong.manager.common.util.CommonBeanUtils; +import org.apache.inlong.manager.dao.entity.DataNodeEntity; +import org.apache.inlong.manager.pojo.node.DataNodeInfo; +import org.apache.inlong.manager.pojo.node.DataNodeRequest; +import org.apache.inlong.manager.pojo.node.http.HttpDataNodeDTO; +import org.apache.inlong.manager.pojo.node.http.HttpDataNodeInfo; +import org.apache.inlong.manager.pojo.node.http.HttpDataNodeRequest; +import org.apache.inlong.manager.service.node.AbstractDataNodeOperator; + +import com.fasterxml.jackson.databind.ObjectMapper; +import org.apache.commons.lang3.StringUtils; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; +import org.springframework.beans.factory.annotation.Autowired; +import org.springframework.stereotype.Service; + +@Service +public class HttpDataNodeOperator extends AbstractDataNodeOperator { + + private static final Logger LOGGER = LoggerFactory.getLogger(HttpDataNodeOperator.class); + + @Autowired + private ObjectMapper objectMapper; + + @Override + protected void setTargetEntity(DataNodeRequest request, DataNodeEntity targetEntity) { + HttpDataNodeRequest httpDataNodeRequest = (HttpDataNodeRequest) request; + CommonBeanUtils.copyProperties(httpDataNodeRequest, targetEntity, true); + try { + HttpDataNodeDTO dto = HttpDataNodeDTO.getFromRequest(httpDataNodeRequest, targetEntity.getExtParams()); + targetEntity.setExtParams(objectMapper.writeValueAsString(dto)); + } catch (Exception e) { + throw new BusinessException(ErrorCodeEnum.SOURCE_INFO_INCORRECT, + String.format("Failed to build extParams for Cloud log service node: %s", e.getMessage())); + } + } + + @Override + public Boolean accept(String dataNodeType) { + return DataNodeType.HTTP.equals(dataNodeType); + } + + @Override + public String getDataNodeType() { + return DataNodeType.HTTP; + } + + @Override + public DataNodeInfo getFromEntity(DataNodeEntity entity) { + if (entity == null) { + throw new BusinessException(ErrorCodeEnum.DATA_NODE_NOT_FOUND); + } + HttpDataNodeInfo info = new HttpDataNodeInfo(); + CommonBeanUtils.copyProperties(entity, info); + if (StringUtils.isNotBlank(entity.getExtParams())) { + HttpDataNodeDTO dto = HttpDataNodeDTO.getFromJson(entity.getExtParams()); + CommonBeanUtils.copyProperties(dto, info); + } + return info; + } + + @Override + public NodeConfig getNodeConfig(DataNodeEntity dataNodeEntity) { + DataNodeInfo dataNodeInfo = this.getFromEntity(dataNodeEntity); + HttpNodeConfig httpNodeConfig = CommonBeanUtils.copyProperties(dataNodeInfo, HttpNodeConfig::new); + HttpDataNodeDTO dto = HttpDataNodeDTO.getFromJson(dataNodeEntity.getExtParams()); + CommonBeanUtils.copyProperties(dto, httpNodeConfig); + httpNodeConfig.setPassword(dataNodeEntity.getToken()); + httpNodeConfig.setNodeName(dataNodeInfo.getName()); + return httpNodeConfig; + } +} diff --git a/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/sink/http/HttpSinkOperator.java b/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/sink/http/HttpSinkOperator.java new file mode 100644 index 0000000000..030c0c787a --- /dev/null +++ b/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/sink/http/HttpSinkOperator.java @@ -0,0 +1,129 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.inlong.manager.service.sink.http; + +import org.apache.inlong.common.pojo.sort.dataflow.field.FieldConfig; +import org.apache.inlong.common.pojo.sort.dataflow.field.format.FormatInfo; +import org.apache.inlong.common.pojo.sort.dataflow.sink.HttpSinkConfig; +import org.apache.inlong.common.pojo.sort.dataflow.sink.SinkConfig; +import org.apache.inlong.manager.common.consts.DataNodeType; +import org.apache.inlong.manager.common.consts.SinkType; +import org.apache.inlong.manager.common.enums.ErrorCodeEnum; +import org.apache.inlong.manager.common.exceptions.BusinessException; +import org.apache.inlong.manager.common.util.CommonBeanUtils; +import org.apache.inlong.manager.common.util.JsonUtils; +import org.apache.inlong.manager.dao.entity.DataNodeEntity; +import org.apache.inlong.manager.dao.entity.StreamSinkEntity; +import org.apache.inlong.manager.dao.mapper.DataNodeEntityMapper; +import org.apache.inlong.manager.pojo.group.InlongGroupInfo; +import org.apache.inlong.manager.pojo.node.http.HttpDataNodeDTO; +import org.apache.inlong.manager.pojo.sink.SinkField; +import org.apache.inlong.manager.pojo.sink.SinkRequest; +import org.apache.inlong.manager.pojo.sink.StreamSink; +import org.apache.inlong.manager.pojo.sink.http.HttpSink; +import org.apache.inlong.manager.pojo.sink.http.HttpSinkDTO; +import org.apache.inlong.manager.pojo.sink.http.HttpSinkRequest; +import org.apache.inlong.manager.pojo.sort.util.FieldInfoUtils; +import org.apache.inlong.manager.pojo.stream.InlongStreamInfo; +import org.apache.inlong.manager.service.sink.AbstractSinkOperator; + +import com.fasterxml.jackson.databind.ObjectMapper; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; +import org.springframework.beans.factory.annotation.Autowired; +import org.springframework.stereotype.Service; + +import java.util.List; +import java.util.stream.Collectors; + +/** + * Http sink operator + */ +@Service +public class HttpSinkOperator extends AbstractSinkOperator { + + private static final Logger LOGGER = LoggerFactory.getLogger(HttpSinkOperator.class); + + @Autowired + private ObjectMapper objectMapper; + @Autowired + private DataNodeEntityMapper dataNodeEntityMapper; + + @Override + protected void setTargetEntity(SinkRequest request, StreamSinkEntity targetEntity) { + if (!this.getSinkType().equals(request.getSinkType())) { + throw new BusinessException(ErrorCodeEnum.SINK_TYPE_NOT_SUPPORT, + ErrorCodeEnum.SINK_TYPE_NOT_SUPPORT.getMessage() + ": " + getSinkType()); + } + HttpSinkRequest sinkRequest = (HttpSinkRequest) request; + try { + HttpSinkDTO dto = HttpSinkDTO.getFromRequest(sinkRequest, targetEntity.getExtParams()); + targetEntity.setExtParams(objectMapper.writeValueAsString(dto)); + } catch (Exception e) { + throw new BusinessException(ErrorCodeEnum.SINK_SAVE_FAILED, + String.format("serialize extParams of http SinkDTO failure: %s", e.getMessage())); + } + } + + @Override + protected String getSinkType() { + return SinkType.HTTP; + } + + @Override + public Boolean accept(String sinkType) { + return SinkType.HTTP.equals(sinkType); + } + + @Override + public StreamSink getFromEntity(StreamSinkEntity entity) { + HttpSink sink = new HttpSink(); + if (entity == null) { + return sink; + } + + HttpSinkDTO dto = HttpSinkDTO.getFromJson(entity.getExtParams()); + DataNodeEntity dataNodeEntity = dataNodeEntityMapper.selectByUniqueKey(entity.getDataNodeName(), + DataNodeType.HTTP); + HttpDataNodeDTO httpDataNodeDTO = JsonUtils.parseObject(dataNodeEntity.getExtParams(), + HttpDataNodeDTO.class); + CommonBeanUtils.copyProperties(entity, sink, true); + CommonBeanUtils.copyProperties(dto, sink, true); + CommonBeanUtils.copyProperties(httpDataNodeDTO, sink, true); + List<SinkField> sinkFields = getSinkFields(entity.getId()); + sink.setSinkFieldList(sinkFields); + return sink; + } + + @Override + public SinkConfig getSinkConfig(InlongGroupInfo groupInfo, InlongStreamInfo streamInfo, StreamSink sink) { + HttpSink httpSink = (HttpSink) sink; + HttpSinkConfig sinkConfig = CommonBeanUtils.copyProperties(httpSink, HttpSinkConfig::new); + List<FieldConfig> fields = sinkFieldMapper.selectBySinkId(sink.getId()).stream().map( + v -> { + FieldConfig fieldConfig = new FieldConfig(); + FormatInfo formatInfo = FieldInfoUtils.convertFieldFormat( + v.getFieldType().toLowerCase()); + fieldConfig.setName(v.getFieldName()); + fieldConfig.setFormatInfo(formatInfo); + return fieldConfig; + }).collect(Collectors.toList()); + sinkConfig.setFieldConfigs(fields); + return sinkConfig; + } +}