This is an automated email from the ASF dual-hosted git repository.

wakefu pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/inlong.git

The following commit(s) were added to refs/heads/master by this push:
     new 91a0cb3aa4 [INLONG-11508][Manager] Add APIs to dirty data query (#11509)
91a0cb3aa4 is described below

commit 91a0cb3aa42fc855f3bd8c91cf0dc12ab54c5e02
Author: fuweng11 <76141879+fuwen...@users.noreply.github.com>
AuthorDate: Tue Nov 19 18:32:25 2024 +0800

    [INLONG-11508][Manager] Add APIs to dirty data query (#11509)

    * [INLONG-11508][Manager] Add APIs to dirty data query
---
 .../manager/dao/entity/DirtyQueryLogEntity.java}   |  27 ++-
 .../dao/mapper/DirtyQueryLogEntityMapper.java}     |  25 ++-
 .../mappers/DirtyQueryLogEntityMapper.xml          |  93 +++++++++++
 .../inlong/manager/pojo/sink/BaseStreamSink.java   |  17 ++
 ...treamSink.java => DirtyDataDetailResponse.java} |  46 +++--
 .../{BaseStreamSink.java => DirtyDataRequest.java} |  38 +++--
 .../manager/pojo/sink/DirtyDataResponse.java}      |  21 ++-
 .../pojo/sink/DirtyDataTrendDetailResponse.java}   |  24 ++-
 ...eStreamSink.java => DirtyDataTrendRequest.java} |  35 ++--
 .../inlong/manager/pojo/sink/SinkRequest.java      |   3 +
 .../inlong/manager/pojo/sink/StreamSink.java       |   3 +
 .../service/dirtyData/DirtyQueryLogService.java}   |  36 ++--
 .../dirtyData/impl/DirtyQueryLogServiceImpl.java   | 185 +++++++++++++++++++++
 .../main/resources/h2/apache_inlong_manager.sql    |  18 ++
 .../manager-web/sql/apache_inlong_manager.sql      |  19 +++
 inlong-manager/manager-web/sql/changes-2.1.0.sql   |  16 ++
 .../web/controller/StreamSinkController.java       |  38 +++++
 .../src/main/resources/application-dev.properties  |   7 +-
 .../src/main/resources/application-prod.properties |   7 +-
 .../src/main/resources/application-test.properties |   7 +-
 20 files changed, 577 insertions(+), 88 deletions(-)

diff --git a/inlong-manager/manager-web/sql/changes-2.1.0.sql b/inlong-manager/manager-dao/src/main/java/org/apache/inlong/manager/dao/entity/DirtyQueryLogEntity.java
similarity index 60%
copy from inlong-manager/manager-web/sql/changes-2.1.0.sql
copy to
inlong-manager/manager-dao/src/main/java/org/apache/inlong/manager/dao/entity/DirtyQueryLogEntity.java index 0d4c984778..3554284f34 100644 --- a/inlong-manager/manager-web/sql/changes-2.1.0.sql +++ b/inlong-manager/manager-dao/src/main/java/org/apache/inlong/manager/dao/entity/DirtyQueryLogEntity.java @@ -15,13 +15,26 @@ * limitations under the License. */ --- This is the SQL change file from version 1.4.0 to the current version 1.5.0. --- When upgrading to version 1.5.0, please execute those SQLs in the DB (such as MySQL) used by the Manager module. +package org.apache.inlong.manager.dao.entity; -SET NAMES utf8mb4; -SET FOREIGN_KEY_CHECKS = 0; +import lombok.Data; -USE `apache_inlong_manager`; +import java.io.Serializable; +import java.util.Date; -ALTER TABLE `schedule_config` - ADD COLUMN `schedule_engine` varchar(64) NOT NULL DEFAULT 'Quartz' COMMENT 'Schedule engine, support Quartz, Airflow and DolphinScheduler'; +@Data +public class DirtyQueryLogEntity implements Serializable { + + private static final long serialVersionUID = 1L; + + private Integer id; + private String md5; + private String requestParams; + private String taskId; + private Integer isDeleted; + private String creator; + private String modifier; + private Date createTime; + private Date modifyTime; + private Integer version; +} \ No newline at end of file diff --git a/inlong-manager/manager-web/sql/changes-2.1.0.sql b/inlong-manager/manager-dao/src/main/java/org/apache/inlong/manager/dao/mapper/DirtyQueryLogEntityMapper.java similarity index 59% copy from inlong-manager/manager-web/sql/changes-2.1.0.sql copy to inlong-manager/manager-dao/src/main/java/org/apache/inlong/manager/dao/mapper/DirtyQueryLogEntityMapper.java index 0d4c984778..08db57441a 100644 --- a/inlong-manager/manager-web/sql/changes-2.1.0.sql +++ b/inlong-manager/manager-dao/src/main/java/org/apache/inlong/manager/dao/mapper/DirtyQueryLogEntityMapper.java @@ -15,13 +15,24 @@ * limitations under the License. 
*/ --- This is the SQL change file from version 1.4.0 to the current version 1.5.0. --- When upgrading to version 1.5.0, please execute those SQLs in the DB (such as MySQL) used by the Manager module. +package org.apache.inlong.manager.dao.mapper; -SET NAMES utf8mb4; -SET FOREIGN_KEY_CHECKS = 0; +import org.apache.inlong.manager.dao.entity.DirtyQueryLogEntity; -USE `apache_inlong_manager`; +import org.apache.ibatis.annotations.Param; +import org.springframework.stereotype.Repository; -ALTER TABLE `schedule_config` - ADD COLUMN `schedule_engine` varchar(64) NOT NULL DEFAULT 'Quartz' COMMENT 'Schedule engine, support Quartz, Airflow and DolphinScheduler'; +@Repository +public interface DirtyQueryLogEntityMapper { + + int updateByIdSelective(DirtyQueryLogEntity record); + + int insert(DirtyQueryLogEntity record); + + DirtyQueryLogEntity selectByPrimaryKey(Integer id); + + DirtyQueryLogEntity selectByMd5(String md5); + + void updateToTimeout(@Param("beforeMinutes") Integer beforeMinutes); + +} \ No newline at end of file diff --git a/inlong-manager/manager-dao/src/main/resources/mappers/DirtyQueryLogEntityMapper.xml b/inlong-manager/manager-dao/src/main/resources/mappers/DirtyQueryLogEntityMapper.xml new file mode 100644 index 0000000000..0fb1ce8e5f --- /dev/null +++ b/inlong-manager/manager-dao/src/main/resources/mappers/DirtyQueryLogEntityMapper.xml @@ -0,0 +1,93 @@ +<?xml version="1.0" encoding="UTF-8"?> +<!-- + Licensed to the Apache Software Foundation (ASF) under one + or more contributor license agreements. See the NOTICE file + distributed with this work for additional information + regarding copyright ownership. The ASF licenses this file + to you under the Apache License, Version 2.0 (the + "License"); you may not use this file except in compliance + with the License. 
You may obtain a copy of the License at + + http://www.apache.org/licenses/LICENSE-2.0 + + Unless required by applicable law or agreed to in writing, + software distributed under the License is distributed on an + "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + KIND, either express or implied. See the License for the + specific language governing permissions and limitations + under the License. +--> + +<!DOCTYPE mapper PUBLIC "-//mybatis.org//DTD Mapper 3.0//EN" "http://mybatis.org/dtd/mybatis-3-mapper.dtd"> +<mapper namespace="org.apache.inlong.manager.dao.mapper.DirtyQueryLogEntityMapper"> + <resultMap id="BaseResultMap" type="org.apache.inlong.manager.dao.entity.DirtyQueryLogEntity"> + <id column="id" jdbcType="INTEGER" property="id" /> + <result column="md5" jdbcType="VARCHAR" property="md5" /> + <result column="request_params" jdbcType="VARCHAR" property="requestParams" /> + <result column="task_id" jdbcType="VARCHAR" property="taskId" /> + <result column="is_deleted" jdbcType="INTEGER" property="isDeleted" /> + <result column="creator" jdbcType="VARCHAR" property="creator" /> + <result column="modifier" jdbcType="VARCHAR" property="modifier" /> + <result column="create_time" jdbcType="TIMESTAMP" property="createTime" /> + <result column="modify_time" jdbcType="TIMESTAMP" property="modifyTime" /> + <result column="version" jdbcType="INTEGER" property="version" /> + </resultMap> + + <sql id="Base_Column_List"> + id, md5, request_params, task_id, is_deleted, creator, modifier, create_time, modify_time, version + </sql> + <select id="selectByPrimaryKey" parameterType="java.lang.Integer" resultMap="BaseResultMap"> + select + <include refid="Base_Column_List" /> + from dirty_query_log + where is_deleted = 0 + and id = #{id,jdbcType=INTEGER} + </select> + <select id="selectByMd5" parameterType="java.lang.String" resultMap="BaseResultMap"> + select + <include refid="Base_Column_List" /> + from dirty_query_log + where is_deleted = 0 + and md5 = 
#{md5,jdbcType=VARCHAR} + limit 1 + </select> + <insert id="insert" parameterType="org.apache.inlong.manager.dao.entity.DirtyQueryLogEntity"> + insert into dirty_query_log (id, md5, request_params, + task_id, creator, modifier) + values (#{id,jdbcType=INTEGER}, #{md5,jdbcType=VARCHAR}, #{requestParams,jdbcType=VARCHAR}, + #{taskId,jdbcType=VARCHAR}, #{creator,jdbcType=VARCHAR}, #{modifier,jdbcType=VARCHAR}) + </insert> + <update id="updateByIdSelective" parameterType="org.apache.inlong.manager.dao.entity.DirtyQueryLogEntity"> + update dirty_query_log + <set> + <if test="md5 != null"> + md5 = #{md5,jdbcType=VARCHAR}, + </if> + <if test="requestParams != null"> + request_params = #{requestParams,jdbcType=VARCHAR}, + </if> + <if test="taskId != null"> + task_id = #{taskId,jdbcType=VARCHAR}, + </if> + <if test="isDeleted != null"> + is_deleted = #{isDeleted,jdbcType=INTEGER}, + </if> + <if test="modifier != null"> + modifier = #{modifier,jdbcType=VARCHAR}, + </if> + version = #{version,jdbcType=INTEGER} + 1 + </set> + where id = #{id,jdbcType=INTEGER} + and version = #{version,jdbcType=INTEGER} + </update> + <update id="updateToTimeout"> + update dirty_query_log + <set> + is_deleted = id + </set> + <where> + is_deleted = 0 + and create_time <= DATE_ADD(NOW(), INTERVAL -#{beforeMinutes, jdbcType=INTEGER} MINUTE) + </where> + </update> +</mapper> \ No newline at end of file diff --git a/inlong-manager/manager-pojo/src/main/java/org/apache/inlong/manager/pojo/sink/BaseStreamSink.java b/inlong-manager/manager-pojo/src/main/java/org/apache/inlong/manager/pojo/sink/BaseStreamSink.java index d8df7f4a28..6ad6c9b523 100644 --- a/inlong-manager/manager-pojo/src/main/java/org/apache/inlong/manager/pojo/sink/BaseStreamSink.java +++ b/inlong-manager/manager-pojo/src/main/java/org/apache/inlong/manager/pojo/sink/BaseStreamSink.java @@ -17,12 +17,18 @@ package org.apache.inlong.manager.pojo.sink; +import org.apache.inlong.manager.common.enums.ErrorCodeEnum; +import 
org.apache.inlong.manager.common.exceptions.BusinessException; +import org.apache.inlong.manager.common.util.JsonUtils; + import io.swagger.annotations.ApiModel; import io.swagger.annotations.ApiModelProperty; import lombok.AllArgsConstructor; import lombok.Data; import lombok.NoArgsConstructor; +import javax.validation.constraints.NotNull; + /** * The base parameter class of StreamSink, support user extend their own business params. */ @@ -32,6 +38,9 @@ import lombok.NoArgsConstructor; @ApiModel("Base info of stream sink") public class BaseStreamSink { + @ApiModelProperty("Enable data archiving") + private Boolean enableDataArchiving; + @ApiModelProperty("Transform sql") private String transformSql; @@ -40,4 +49,12 @@ public class BaseStreamSink { @ApiModelProperty("Stop consume time, yyyy-MM-dd HH:mm:ss format") private String stopConsumeTime; + + public static BaseStreamSink getFromJson(@NotNull String extParams) { + try { + return JsonUtils.parseObject(extParams, BaseStreamSink.class); + } catch (Exception e) { + throw new BusinessException(ErrorCodeEnum.SINK_INFO_INCORRECT.getMessage() + ": " + e.getMessage()); + } + } } diff --git a/inlong-manager/manager-pojo/src/main/java/org/apache/inlong/manager/pojo/sink/BaseStreamSink.java b/inlong-manager/manager-pojo/src/main/java/org/apache/inlong/manager/pojo/sink/DirtyDataDetailResponse.java similarity index 50% copy from inlong-manager/manager-pojo/src/main/java/org/apache/inlong/manager/pojo/sink/BaseStreamSink.java copy to inlong-manager/manager-pojo/src/main/java/org/apache/inlong/manager/pojo/sink/DirtyDataDetailResponse.java index d8df7f4a28..2389217a40 100644 --- a/inlong-manager/manager-pojo/src/main/java/org/apache/inlong/manager/pojo/sink/BaseStreamSink.java +++ b/inlong-manager/manager-pojo/src/main/java/org/apache/inlong/manager/pojo/sink/DirtyDataDetailResponse.java @@ -19,25 +19,45 @@ package org.apache.inlong.manager.pojo.sink; import io.swagger.annotations.ApiModel; import 
io.swagger.annotations.ApiModelProperty; -import lombok.AllArgsConstructor; import lombok.Data; -import lombok.NoArgsConstructor; /** - * The base parameter class of StreamSink, support user extend their own business params. + * Dirty data detail info. */ @Data -@AllArgsConstructor -@NoArgsConstructor -@ApiModel("Base info of stream sink") -public class BaseStreamSink { +@ApiModel("Dirty data detail info") +public class DirtyDataDetailResponse { - @ApiModelProperty("Transform sql") - private String transformSql; + @ApiModelProperty(value = "Tdbank imp date") + private String tdbankImpDate; - @ApiModelProperty("Start consume time, yyyy-MM-dd HH:mm:ss format") - private String startConsumeTime; + @ApiModelProperty(value = "Data flow id") + private String dataFlowId; - @ApiModelProperty("Stop consume time, yyyy-MM-dd HH:mm:ss format") - private String stopConsumeTime; + @ApiModelProperty(value = "Inlong group id") + private String groupId; + + @ApiModelProperty(value = "Inlong stream id") + private String streamId; + + @ApiModelProperty(value = "Report time") + private String reportTime; + + @ApiModelProperty(value = "Data time") + private String dataTime; + + @ApiModelProperty(value = "Server type") + private String serverType; + + @ApiModelProperty(value = "Dirty type") + private String dirtyType; + + @ApiModelProperty(value = "Dirty message") + private String dirtyMessage; + + @ApiModelProperty(value = "Ext info") + private String extInfo; + + @ApiModelProperty(value = "Dirty data") + private String dirtyData; } diff --git a/inlong-manager/manager-pojo/src/main/java/org/apache/inlong/manager/pojo/sink/BaseStreamSink.java b/inlong-manager/manager-pojo/src/main/java/org/apache/inlong/manager/pojo/sink/DirtyDataRequest.java similarity index 56% copy from inlong-manager/manager-pojo/src/main/java/org/apache/inlong/manager/pojo/sink/BaseStreamSink.java copy to inlong-manager/manager-pojo/src/main/java/org/apache/inlong/manager/pojo/sink/DirtyDataRequest.java index 
d8df7f4a28..8b3fc32a6f 100644 --- a/inlong-manager/manager-pojo/src/main/java/org/apache/inlong/manager/pojo/sink/BaseStreamSink.java +++ b/inlong-manager/manager-pojo/src/main/java/org/apache/inlong/manager/pojo/sink/DirtyDataRequest.java @@ -19,25 +19,37 @@ package org.apache.inlong.manager.pojo.sink; import io.swagger.annotations.ApiModel; import io.swagger.annotations.ApiModelProperty; -import lombok.AllArgsConstructor; import lombok.Data; -import lombok.NoArgsConstructor; +import lombok.EqualsAndHashCode; + +import java.util.List; /** - * The base parameter class of StreamSink, support user extend their own business params. + * Query request for Dirty data */ @Data -@AllArgsConstructor -@NoArgsConstructor -@ApiModel("Base info of stream sink") -public class BaseStreamSink { +@EqualsAndHashCode(callSuper = false) +@ApiModel("Query request for Dirty data") +public class DirtyDataRequest { + + @ApiModelProperty(value = "Sink id list") + private List<Integer> sinkIdList; + + @ApiModelProperty(value = "Key word") + private String keyword; + + @ApiModelProperty(value = "Server type") + private String serverType; + + @ApiModelProperty(value = "Dirty type") + private String dirtyType; - @ApiModelProperty("Transform sql") - private String transformSql; + @ApiModelProperty(value = "Start time") + private String startTime; - @ApiModelProperty("Start consume time, yyyy-MM-dd HH:mm:ss format") - private String startConsumeTime; + @ApiModelProperty(value = "End time") + private String endTime; - @ApiModelProperty("Stop consume time, yyyy-MM-dd HH:mm:ss format") - private String stopConsumeTime; + @ApiModelProperty(value = "Data count") + private Integer dataCount = 10; } diff --git a/inlong-manager/manager-web/sql/changes-2.1.0.sql b/inlong-manager/manager-pojo/src/main/java/org/apache/inlong/manager/pojo/sink/DirtyDataResponse.java similarity index 64% copy from inlong-manager/manager-web/sql/changes-2.1.0.sql copy to 
inlong-manager/manager-pojo/src/main/java/org/apache/inlong/manager/pojo/sink/DirtyDataResponse.java index 0d4c984778..08203db09a 100644 --- a/inlong-manager/manager-web/sql/changes-2.1.0.sql +++ b/inlong-manager/manager-pojo/src/main/java/org/apache/inlong/manager/pojo/sink/DirtyDataResponse.java @@ -15,13 +15,20 @@ * limitations under the License. */ --- This is the SQL change file from version 1.4.0 to the current version 1.5.0. --- When upgrading to version 1.5.0, please execute those SQLs in the DB (such as MySQL) used by the Manager module. +package org.apache.inlong.manager.pojo.sink; -SET NAMES utf8mb4; -SET FOREIGN_KEY_CHECKS = 0; +import io.swagger.annotations.ApiModel; +import io.swagger.annotations.ApiModelProperty; +import lombok.Data; -USE `apache_inlong_manager`; +/** + * Dirty data info. + */ +@Data +@ApiModel("Dirty data info") +public class DirtyDataResponse { + + @ApiModelProperty(value = "Task id") + private String taskId; -ALTER TABLE `schedule_config` - ADD COLUMN `schedule_engine` varchar(64) NOT NULL DEFAULT 'Quartz' COMMENT 'Schedule engine, support Quartz, Airflow and DolphinScheduler'; +} diff --git a/inlong-manager/manager-web/sql/changes-2.1.0.sql b/inlong-manager/manager-pojo/src/main/java/org/apache/inlong/manager/pojo/sink/DirtyDataTrendDetailResponse.java similarity index 64% copy from inlong-manager/manager-web/sql/changes-2.1.0.sql copy to inlong-manager/manager-pojo/src/main/java/org/apache/inlong/manager/pojo/sink/DirtyDataTrendDetailResponse.java index 0d4c984778..cc56cd93ee 100644 --- a/inlong-manager/manager-web/sql/changes-2.1.0.sql +++ b/inlong-manager/manager-pojo/src/main/java/org/apache/inlong/manager/pojo/sink/DirtyDataTrendDetailResponse.java @@ -15,13 +15,23 @@ * limitations under the License. */ --- This is the SQL change file from version 1.4.0 to the current version 1.5.0. --- When upgrading to version 1.5.0, please execute those SQLs in the DB (such as MySQL) used by the Manager module. 
+package org.apache.inlong.manager.pojo.sink; -SET NAMES utf8mb4; -SET FOREIGN_KEY_CHECKS = 0; +import io.swagger.annotations.ApiModel; +import io.swagger.annotations.ApiModelProperty; +import lombok.Data; -USE `apache_inlong_manager`; +/** + * Dirty data detail info. + */ +@Data +@ApiModel("Dirty data trend detail info") +public class DirtyDataTrendDetailResponse { + + @ApiModelProperty(value = "Report time") + private String reportTime; + + @ApiModelProperty(value = "Data count") + private String count; -ALTER TABLE `schedule_config` - ADD COLUMN `schedule_engine` varchar(64) NOT NULL DEFAULT 'Quartz' COMMENT 'Schedule engine, support Quartz, Airflow and DolphinScheduler'; +} diff --git a/inlong-manager/manager-pojo/src/main/java/org/apache/inlong/manager/pojo/sink/BaseStreamSink.java b/inlong-manager/manager-pojo/src/main/java/org/apache/inlong/manager/pojo/sink/DirtyDataTrendRequest.java similarity index 59% copy from inlong-manager/manager-pojo/src/main/java/org/apache/inlong/manager/pojo/sink/BaseStreamSink.java copy to inlong-manager/manager-pojo/src/main/java/org/apache/inlong/manager/pojo/sink/DirtyDataTrendRequest.java index d8df7f4a28..b0e52420a2 100644 --- a/inlong-manager/manager-pojo/src/main/java/org/apache/inlong/manager/pojo/sink/BaseStreamSink.java +++ b/inlong-manager/manager-pojo/src/main/java/org/apache/inlong/manager/pojo/sink/DirtyDataTrendRequest.java @@ -19,25 +19,34 @@ package org.apache.inlong.manager.pojo.sink; import io.swagger.annotations.ApiModel; import io.swagger.annotations.ApiModelProperty; -import lombok.AllArgsConstructor; import lombok.Data; -import lombok.NoArgsConstructor; +import lombok.EqualsAndHashCode; + +import java.util.List; /** - * The base parameter class of StreamSink, support user extend their own business params. 
+ * Query request for Dirty data */ @Data -@AllArgsConstructor -@NoArgsConstructor -@ApiModel("Base info of stream sink") -public class BaseStreamSink { +@EqualsAndHashCode(callSuper = false) +@ApiModel("Query request for Dirty data") +public class DirtyDataTrendRequest { + + @ApiModelProperty(value = "Sink id list") + private List<Integer> sinkIdList; + + @ApiModelProperty(value = "Data time unit") + private String dataTimeUnit; + + @ApiModelProperty(value = "Server type") + private String serverType; - @ApiModelProperty("Transform sql") - private String transformSql; + @ApiModelProperty(value = "Dirty type") + private String dirtyType; - @ApiModelProperty("Start consume time, yyyy-MM-dd HH:mm:ss format") - private String startConsumeTime; + @ApiModelProperty(value = "Start time") + private String startTime; - @ApiModelProperty("Stop consume time, yyyy-MM-dd HH:mm:ss format") - private String stopConsumeTime; + @ApiModelProperty(value = "End time") + private String endTime; } diff --git a/inlong-manager/manager-pojo/src/main/java/org/apache/inlong/manager/pojo/sink/SinkRequest.java b/inlong-manager/manager-pojo/src/main/java/org/apache/inlong/manager/pojo/sink/SinkRequest.java index 24c544b943..782817a50d 100644 --- a/inlong-manager/manager-pojo/src/main/java/org/apache/inlong/manager/pojo/sink/SinkRequest.java +++ b/inlong-manager/manager-pojo/src/main/java/org/apache/inlong/manager/pojo/sink/SinkRequest.java @@ -100,6 +100,9 @@ public abstract class SinkRequest { @Range(min = 0, max = 1, message = "default is 1, only supports [0: disable, 1: enable]") private Integer enableCreateResource = 1; + @ApiModelProperty("Enable data archiving") + private Boolean enableDataArchiving; + @ApiModelProperty(value = "Whether to start the process after saving or updating. 
Default is false") private Boolean startProcess = false; diff --git a/inlong-manager/manager-pojo/src/main/java/org/apache/inlong/manager/pojo/sink/StreamSink.java b/inlong-manager/manager-pojo/src/main/java/org/apache/inlong/manager/pojo/sink/StreamSink.java index 85fd72a1a4..79a5af3876 100644 --- a/inlong-manager/manager-pojo/src/main/java/org/apache/inlong/manager/pojo/sink/StreamSink.java +++ b/inlong-manager/manager-pojo/src/main/java/org/apache/inlong/manager/pojo/sink/StreamSink.java @@ -90,6 +90,9 @@ public abstract class StreamSink extends StreamNode { @ApiModelProperty(value = "Whether to enable create sink resource? 0: disable, 1: enable. Default is 1", notes = "Such as enable or disable to create Hive table") private Integer enableCreateResource = 1; + @ApiModelProperty("Enable data archiving") + private Boolean enableDataArchiving; + @ApiModelProperty("Backend operation log") private String operateLog; diff --git a/inlong-manager/manager-pojo/src/main/java/org/apache/inlong/manager/pojo/sink/BaseStreamSink.java b/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/dirtyData/DirtyQueryLogService.java similarity index 50% copy from inlong-manager/manager-pojo/src/main/java/org/apache/inlong/manager/pojo/sink/BaseStreamSink.java copy to inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/dirtyData/DirtyQueryLogService.java index d8df7f4a28..3629ee985c 100644 --- a/inlong-manager/manager-pojo/src/main/java/org/apache/inlong/manager/pojo/sink/BaseStreamSink.java +++ b/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/dirtyData/DirtyQueryLogService.java @@ -15,29 +15,29 @@ * limitations under the License. 
*/ -package org.apache.inlong.manager.pojo.sink; +package org.apache.inlong.manager.service.dirtyData; -import io.swagger.annotations.ApiModel; -import io.swagger.annotations.ApiModelProperty; -import lombok.AllArgsConstructor; -import lombok.Data; -import lombok.NoArgsConstructor; +import org.apache.inlong.manager.pojo.sink.DirtyDataDetailResponse; +import org.apache.inlong.manager.pojo.sink.DirtyDataRequest; +import org.apache.inlong.manager.pojo.sink.DirtyDataResponse; +import org.apache.inlong.manager.pojo.sink.DirtyDataTrendDetailResponse; +import org.apache.inlong.manager.pojo.sink.DirtyDataTrendRequest; + +import java.util.List; /** - * The base parameter class of StreamSink, support user extend their own business params. + * Dirty query log service */ -@Data -@AllArgsConstructor -@NoArgsConstructor -@ApiModel("Base info of stream sink") -public class BaseStreamSink { +public interface DirtyQueryLogService { + + DirtyDataResponse listDirtyData(DirtyDataRequest request); + + DirtyDataResponse listDirtyDataTrend(DirtyDataTrendRequest request); + + List<DirtyDataDetailResponse> getDirtyData(String taskId); - @ApiModelProperty("Transform sql") - private String transformSql; + List<DirtyDataTrendDetailResponse> getDirtyDataTrend(String taskId); - @ApiModelProperty("Start consume time, yyyy-MM-dd HH:mm:ss format") - private String startConsumeTime; + String getSqlTaskStatus(String taskId); - @ApiModelProperty("Stop consume time, yyyy-MM-dd HH:mm:ss format") - private String stopConsumeTime; } diff --git a/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/dirtyData/impl/DirtyQueryLogServiceImpl.java b/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/dirtyData/impl/DirtyQueryLogServiceImpl.java new file mode 100644 index 0000000000..a63a320765 --- /dev/null +++ b/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/dirtyData/impl/DirtyQueryLogServiceImpl.java @@ -0,0 +1,185 @@ +/* + 
* Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.inlong.manager.service.dirtyData.impl; + +import org.apache.inlong.manager.common.exceptions.BusinessException; +import org.apache.inlong.manager.dao.entity.DirtyQueryLogEntity; +import org.apache.inlong.manager.dao.mapper.DirtyQueryLogEntityMapper; +import org.apache.inlong.manager.pojo.sink.DirtyDataDetailResponse; +import org.apache.inlong.manager.pojo.sink.DirtyDataRequest; +import org.apache.inlong.manager.pojo.sink.DirtyDataResponse; +import org.apache.inlong.manager.pojo.sink.DirtyDataTrendDetailResponse; +import org.apache.inlong.manager.pojo.sink.DirtyDataTrendRequest; +import org.apache.inlong.manager.pojo.user.LoginUserUtils; +import org.apache.inlong.manager.service.dirtyData.DirtyQueryLogService; + +import com.fasterxml.jackson.databind.ObjectMapper; +import com.google.common.util.concurrent.ThreadFactoryBuilder; +import lombok.extern.slf4j.Slf4j; +import org.apache.commons.codec.digest.DigestUtils; +import org.apache.commons.collections.CollectionUtils; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; +import org.springframework.beans.factory.annotation.Autowired; +import org.springframework.beans.factory.annotation.Value; 
+import org.springframework.stereotype.Service; + +import javax.annotation.PostConstruct; + +import java.util.ArrayList; +import java.util.List; +import java.util.concurrent.Executors; +import java.util.concurrent.ScheduledExecutorService; +import java.util.concurrent.ThreadFactory; +import java.util.concurrent.TimeUnit; + +@Service +@Slf4j +public class DirtyQueryLogServiceImpl implements DirtyQueryLogService { + + private static final Logger LOGGER = LoggerFactory.getLogger(DirtyQueryLogServiceImpl.class); + + @Autowired + private DirtyQueryLogEntityMapper dirtyQueryLogEntityMapper; + @Autowired + private ObjectMapper objectMapper; + + @Value("${dirty.log.clean.enabled:false}") + private Boolean dirtyLogCleanEnabled; + @Value("${dirty.log.clean.interval.minutes:5}") + private Integer dirtyLogCleanInterval; + @Value("${dirty.dirty.retention.minutes:10}") + private Integer retentionMinutes; + @Value("${dirty.dirty.db.table:inlong_iceberg::dirty_data_achive_iceberg}") + private String dirtyDataDbTable; + + @PostConstruct + private void startDirtyLogCleanTask() { + if (dirtyLogCleanEnabled) { + ThreadFactory factory = new ThreadFactoryBuilder() + .setNameFormat("scheduled-dirtyQueryLog-deleted-%d") + .setDaemon(true) + .build(); + ScheduledExecutorService executor = Executors.newSingleThreadScheduledExecutor(factory); + executor.scheduleWithFixedDelay(() -> { + try { + LOGGER.info("begin to clean dirty query log"); + dirtyQueryLogEntityMapper.updateToTimeout(retentionMinutes); + LOGGER.info("success to clean dirty query log successfully"); + } catch (Throwable t) { + LOGGER.error("clean dirty query log error", t); + } + }, 0, dirtyLogCleanInterval, TimeUnit.MINUTES); + LOGGER.info("clean dirty query log started successfully"); + } + } + + @Override + public DirtyDataResponse listDirtyDataTrend(DirtyDataTrendRequest request) { + if (CollectionUtils.isEmpty(request.getSinkIdList())) { + return null; + } + try { + DirtyDataResponse dirtyDataResponse = new 
DirtyDataResponse(); + String requestStr = objectMapper.writeValueAsString(request); + String md5 = DigestUtils.md5Hex(requestStr); + DirtyQueryLogEntity dirtyQueryLogEntity = dirtyQueryLogEntityMapper.selectByMd5(md5); + if (dirtyQueryLogEntity != null) { + LOGGER.info("dirty query log is exist"); + dirtyDataResponse.setTaskId(dirtyQueryLogEntity.getTaskId()); + return dirtyDataResponse; + } + DirtyQueryLogEntity dirtyQueryLog = new DirtyQueryLogEntity(); + // TODO dirtyQueryLog.setTaskId(); + dirtyQueryLog.setMd5(md5); + dirtyQueryLog.setRequestParams(requestStr); + dirtyQueryLog.setCreator(LoginUserUtils.getLoginUser().getName()); + dirtyQueryLog.setModifier(LoginUserUtils.getLoginUser().getName()); + dirtyQueryLogEntityMapper.insert(dirtyQueryLog); + + return dirtyDataResponse; + } catch (Exception e) { + throw new BusinessException("list dirty data trend failed"); + } + } + + @Override + public DirtyDataResponse listDirtyData(DirtyDataRequest request) { + if (CollectionUtils.isEmpty(request.getSinkIdList())) { + return null; + } + if (request.getDataCount() == null) { + request.setDataCount(10); + } + try { + DirtyDataResponse dirtyDataResponse = new DirtyDataResponse(); + String requestStr = objectMapper.writeValueAsString(request); + String md5 = DigestUtils.md5Hex(requestStr); + DirtyQueryLogEntity dirtyQueryLogEntity = dirtyQueryLogEntityMapper.selectByMd5(md5); + if (dirtyQueryLogEntity != null) { + LOGGER.info("dirty query log is exist"); + dirtyDataResponse.setTaskId(dirtyQueryLogEntity.getTaskId()); + return dirtyDataResponse; + } + DirtyQueryLogEntity dirtyQueryLog = new DirtyQueryLogEntity(); + // TODO dirtyQueryLog.setTaskId(); + dirtyQueryLog.setMd5(md5); + dirtyQueryLog.setRequestParams(requestStr); + dirtyQueryLog.setCreator(LoginUserUtils.getLoginUser().getName()); + dirtyQueryLog.setModifier(LoginUserUtils.getLoginUser().getName()); + dirtyQueryLogEntityMapper.insert(dirtyQueryLog); + return dirtyDataResponse; + } catch (Exception e) { + 
LOGGER.error("list dirty data failed", e); + throw new BusinessException("list dirty data failed"); + } + } + + @Override + public List<DirtyDataDetailResponse> getDirtyData(String taskId) { + try { + // TODO + return new ArrayList<>(); + } catch (Exception e) { + LOGGER.error("get dirty data failed", e); + throw new BusinessException("get dirty data failed"); + } + } + + @Override + public List<DirtyDataTrendDetailResponse> getDirtyDataTrend(String taskId) { + try { + // TODO + return new ArrayList<>(); + } catch (Exception e) { + LOGGER.error("get dirty data trend failed", e); + throw new BusinessException("get dirty data trend failed"); + } + } + + @Override + public String getSqlTaskStatus(String taskId) { + try { + // TODO + return "success"; + } catch (Exception e) { + LOGGER.error("get sql task status failed", e); + throw new BusinessException("get get sql task status failed"); + } + } +} diff --git a/inlong-manager/manager-test/src/main/resources/h2/apache_inlong_manager.sql b/inlong-manager/manager-test/src/main/resources/h2/apache_inlong_manager.sql index 1d92f14f46..6836d44d5b 100644 --- a/inlong-manager/manager-test/src/main/resources/h2/apache_inlong_manager.sql +++ b/inlong-manager/manager-test/src/main/resources/h2/apache_inlong_manager.sql @@ -1013,6 +1013,24 @@ CREATE TABLE IF NOT EXISTS `schedule_config` UNIQUE KEY `unique_group_schedule_config` (`inlong_group_id`, `is_deleted`) ) ENGINE = InnoDB DEFAULT CHARSET = utf8mb4 COMMENT = 'schedule_config'; + +-- ---------------------------- +-- Table structure for dirty_query_log +-- ---------------------------- +CREATE TABLE IF NOT EXISTS `dirty_query_log` +( + `id` int(11) NOT NULL AUTO_INCREMENT COMMENT 'Incremental primary key', + `md5` varchar(256) NOT NULL COMMENT 'Md5 for request params', + `request_params` mediumtext DEFAULT NULL COMMENT 'Request params, will be saved as JSON string', + `task_id` varchar(256) DEFAULT '' COMMENT 'Task id', + `is_deleted` int(11) DEFAULT '0' COMMENT 'Whether to 
delete, 0: not deleted, > 0: deleted',
+    `creator` varchar(64) NOT NULL COMMENT 'Creator name',
+    `modifier` varchar(64) DEFAULT NULL COMMENT 'Modifier name',
+    `create_time` timestamp NOT NULL DEFAULT CURRENT_TIMESTAMP COMMENT 'Create time',
+    `modify_time` timestamp NOT NULL DEFAULT CURRENT_TIMESTAMP ON UPDATE CURRENT_TIMESTAMP COMMENT 'Modify time',
+    `version` int(11) NOT NULL DEFAULT '1' COMMENT 'Version number, which will be incremented by 1 after modification',
+    PRIMARY KEY (`id`)
+) ENGINE = InnoDB DEFAULT CHARSET = utf8mb4 COMMENT = 'Dirty query log table';
 -- ----------------------------
diff --git a/inlong-manager/manager-web/sql/apache_inlong_manager.sql b/inlong-manager/manager-web/sql/apache_inlong_manager.sql
index 58259c9047..51a72d6039 100644
--- a/inlong-manager/manager-web/sql/apache_inlong_manager.sql
+++ b/inlong-manager/manager-web/sql/apache_inlong_manager.sql
@@ -1066,6 +1066,25 @@ CREATE TABLE IF NOT EXISTS `schedule_config`
     UNIQUE KEY `unique_group_schedule_config` (`inlong_group_id`, `is_deleted`)
 ) ENGINE = InnoDB DEFAULT CHARSET = utf8mb4 COMMENT = 'schedule_config';
+
+-- ----------------------------
+-- Table structure for dirty_query_log
+-- ----------------------------
+CREATE TABLE IF NOT EXISTS `dirty_query_log`
+(
+    `id` int(11) NOT NULL AUTO_INCREMENT COMMENT 'Incremental primary key',
+    `md5` varchar(256) NOT NULL COMMENT 'Md5 for request params',
+    `request_params` mediumtext DEFAULT NULL COMMENT 'Request params, will be saved as JSON string',
+    `task_id` varchar(256) DEFAULT '' COMMENT 'Task id',
+    `is_deleted` int(11) DEFAULT '0' COMMENT 'Whether to delete, 0: not deleted, > 0: deleted',
+    `creator` varchar(64) NOT NULL COMMENT 'Creator name',
+    `modifier` varchar(64) DEFAULT NULL COMMENT 'Modifier name',
+    `create_time` timestamp NOT NULL DEFAULT CURRENT_TIMESTAMP COMMENT 'Create time',
+    `modify_time` timestamp NOT NULL DEFAULT CURRENT_TIMESTAMP ON UPDATE CURRENT_TIMESTAMP COMMENT 'Modify time',
+    `version` int(11) NOT NULL DEFAULT '1' COMMENT 'Version number, which will be incremented by 1 after modification',
+    PRIMARY KEY (`id`)
+) ENGINE = InnoDB
+  DEFAULT CHARSET = utf8mb4 COMMENT ='Dirty query log table';
 -- ----------------------------
 SET FOREIGN_KEY_CHECKS = 1;
diff --git a/inlong-manager/manager-web/sql/changes-2.1.0.sql b/inlong-manager/manager-web/sql/changes-2.1.0.sql
index 0d4c984778..2fa48f95b4 100644
--- a/inlong-manager/manager-web/sql/changes-2.1.0.sql
+++ b/inlong-manager/manager-web/sql/changes-2.1.0.sql
@@ -25,3 +25,20 @@ USE `apache_inlong_manager`;
 ALTER TABLE `schedule_config`
     ADD COLUMN `schedule_engine` varchar(64) NOT NULL DEFAULT 'Quartz' COMMENT 'Schedule engine, support Quartz, Airflow and DolphinScheduler';
+
+-- Table structure for dirty_query_log
+CREATE TABLE IF NOT EXISTS `dirty_query_log`
+(
+    `id` int(11) NOT NULL AUTO_INCREMENT COMMENT 'Incremental primary key',
+    `md5` varchar(256) NOT NULL COMMENT 'Md5 for request params',
+    `request_params` mediumtext DEFAULT NULL COMMENT 'Request params, will be saved as JSON string',
+    `task_id` varchar(256) DEFAULT '' COMMENT 'Task id',
+    `is_deleted` int(11) DEFAULT '0' COMMENT 'Whether to delete, 0: not deleted, > 0: deleted',
+    `creator` varchar(64) NOT NULL COMMENT 'Creator name',
+    `modifier` varchar(64) DEFAULT NULL COMMENT 'Modifier name',
+    `create_time` timestamp NOT NULL DEFAULT CURRENT_TIMESTAMP COMMENT 'Create time',
+    `modify_time` timestamp NOT NULL DEFAULT CURRENT_TIMESTAMP ON UPDATE CURRENT_TIMESTAMP COMMENT 'Modify time',
+    `version` int(11) NOT NULL DEFAULT '1' COMMENT 'Version number, which will be incremented by 1 after modification',
+    PRIMARY KEY (`id`)
+) ENGINE = InnoDB
+  DEFAULT CHARSET = utf8mb4 COMMENT ='Dirty query log table';
diff --git a/inlong-manager/manager-web/src/main/java/org/apache/inlong/manager/web/controller/StreamSinkController.java b/inlong-manager/manager-web/src/main/java/org/apache/inlong/manager/web/controller/StreamSinkController.java
index b7b8dbd5d4..331e3a5355 100644
---
a/inlong-manager/manager-web/src/main/java/org/apache/inlong/manager/web/controller/StreamSinkController.java
+++ b/inlong-manager/manager-web/src/main/java/org/apache/inlong/manager/web/controller/StreamSinkController.java
@@ -25,12 +25,18 @@ import org.apache.inlong.manager.pojo.common.BatchResult;
 import org.apache.inlong.manager.pojo.common.PageResult;
 import org.apache.inlong.manager.pojo.common.Response;
 import org.apache.inlong.manager.pojo.common.UpdateResult;
+import org.apache.inlong.manager.pojo.sink.DirtyDataDetailResponse;
+import org.apache.inlong.manager.pojo.sink.DirtyDataRequest;
+import org.apache.inlong.manager.pojo.sink.DirtyDataResponse;
+import org.apache.inlong.manager.pojo.sink.DirtyDataTrendDetailResponse;
+import org.apache.inlong.manager.pojo.sink.DirtyDataTrendRequest;
 import org.apache.inlong.manager.pojo.sink.ParseFieldRequest;
 import org.apache.inlong.manager.pojo.sink.SinkField;
 import org.apache.inlong.manager.pojo.sink.SinkPageRequest;
 import org.apache.inlong.manager.pojo.sink.SinkRequest;
 import org.apache.inlong.manager.pojo.sink.StreamSink;
 import org.apache.inlong.manager.pojo.user.LoginUserUtils;
+import org.apache.inlong.manager.service.dirtyData.DirtyQueryLogService;
 import org.apache.inlong.manager.service.operationlog.OperationLog;
 import org.apache.inlong.manager.service.sink.StreamSinkService;
@@ -60,6 +66,8 @@ public class StreamSinkController {
     @Autowired
     private StreamSinkService sinkService;
+    @Autowired
+    private DirtyQueryLogService dirtyQueryLogService;
     @RequestMapping(value = "/sink/save", method = RequestMethod.POST)
     @OperationLog(operation = OperationType.CREATE, operationTarget = OperationTarget.SINK)
@@ -143,4 +151,40 @@ public class StreamSinkController {
         return Response.success(sinkService.parseFields(parseFieldRequest));
     }
 
+    // Dirty data query endpoints; each one delegates to DirtyQueryLogService.
+    @RequestMapping(value = "/sink/listDirtyData", method = RequestMethod.POST)
+    @ApiOperation(value = "List dirty data")
+    public Response<DirtyDataResponse> listDirtyData(@RequestBody DirtyDataRequest request) {
+        return Response.success(dirtyQueryLogService.listDirtyData(request));
+    }
+
+    @RequestMapping(value = "/sink/listDirtyDataTrend", method = RequestMethod.POST)
+    @ApiOperation(value = "List dirty data trend")
+    public Response<DirtyDataResponse> listDirtyDataTrend(@RequestBody DirtyDataTrendRequest request) {
+        return Response.success(dirtyQueryLogService.listDirtyDataTrend(request));
+    }
+
+    @RequestMapping(value = "/sink/getDirtyData/{taskId}", method = RequestMethod.GET)
+    @ApiOperation(value = "Get dirty data by task id")
+    @ApiImplicitParam(name = "taskId", dataTypeClass = String.class, required = true)
+    public Response<List<DirtyDataDetailResponse>> getDirtyData(@PathVariable String taskId) {
+        return Response.success(dirtyQueryLogService.getDirtyData(taskId));
+    }
+
+    @RequestMapping(value = "/sink/getDirtyDataTrend/{taskId}", method = RequestMethod.GET)
+    @ApiOperation(value = "Get dirty data trend by task id")
+    @ApiImplicitParam(name = "taskId", dataTypeClass = String.class, required = true)
+    public Response<List<DirtyDataTrendDetailResponse>> getDirtyDataTrend(@PathVariable String taskId) {
+        return Response.success(dirtyQueryLogService.getDirtyDataTrend(taskId));
+    }
+
+    // NOTE(review): the method name and the "SqlTaskStatus" path segment start with an upper-case
+    // letter, unlike the sibling endpoints; both are kept so existing callers and URLs keep working.
+    @RequestMapping(value = "/sink/SqlTaskStatus/{taskId}", method = RequestMethod.GET)
+    @ApiOperation(value = "Get SQL task status by task id")
+    @ApiImplicitParam(name = "taskId", dataTypeClass = String.class, required = true)
+    public Response<String> SqlTaskStatus(@PathVariable String taskId) {
+        return Response.success(dirtyQueryLogService.getSqlTaskStatus(taskId));
+    }
+
 }
diff --git a/inlong-manager/manager-web/src/main/resources/application-dev.properties b/inlong-manager/manager-web/src/main/resources/application-dev.properties
index 794f201bda..2bad5f801f 100644
--- a/inlong-manager/manager-web/src/main/resources/application-dev.properties
+++ b/inlong-manager/manager-web/src/main/resources/application-dev.properties
@@ -105,4 +105,11 @@ agent.install.temp.path=inlong/agent-installer-temp/
 default.module.id=1
 # schedule engine type
 # support none(no scheduler) and quartz(quartz scheduler), default is none
-inlong.schedule.engine=none
\ No newline at end of file
+inlong.schedule.engine=none
+
+# Dirty data query settings.
+# NOTE(review): the "dirty.dirty." prefix (next to "dirty.log.") and the "achive" spelling look unintended; confirm against the code that reads these keys before renaming.
+dirty.log.clean.enabled=false
+dirty.log.clean.interval.minutes=5
+dirty.dirty.retention.minutes=10
+dirty.dirty.db.table=inlong_iceberg::dirty_data_achive_iceberg
\ No newline at end of file
diff --git a/inlong-manager/manager-web/src/main/resources/application-prod.properties b/inlong-manager/manager-web/src/main/resources/application-prod.properties
index 3e8f329470..040c868bcf 100644
--- a/inlong-manager/manager-web/src/main/resources/application-prod.properties
+++ b/inlong-manager/manager-web/src/main/resources/application-prod.properties
@@ -97,4 +97,11 @@ cls.manager.endpoint=127.0.0.1
 
 # schedule engine type
 # support none(no scheduler) and quartz(quartz scheduler), default is none
-inlong.schedule.engine=none
\ No newline at end of file
+inlong.schedule.engine=none
+
+# Dirty data query settings.
+# NOTE(review): the "dirty.dirty." prefix (next to "dirty.log.") and the "achive" spelling look unintended; confirm against the code that reads these keys before renaming.
+dirty.log.clean.enabled=false
+dirty.log.clean.interval.minutes=5
+dirty.dirty.retention.minutes=10
+dirty.dirty.db.table=inlong_iceberg::dirty_data_achive_iceberg
\ No newline at end of file
diff --git a/inlong-manager/manager-web/src/main/resources/application-test.properties b/inlong-manager/manager-web/src/main/resources/application-test.properties
index 5ff929c2b8..393eef6b05 100644
--- a/inlong-manager/manager-web/src/main/resources/application-test.properties
+++ b/inlong-manager/manager-web/src/main/resources/application-test.properties
@@ -98,4 +98,11 @@ cls.manager.endpoint=127.0.0.1
 
 # schedule engine type
 # support none(no scheduler) and quartz(quartz scheduler), default is none
-inlong.schedule.engine=none
\ No newline at end of file
+inlong.schedule.engine=none
+
+# Dirty data query settings.
+# NOTE(review): the "dirty.dirty." prefix (next to "dirty.log.") and the "achive" spelling look unintended; confirm against the code that reads these keys before renaming.
+dirty.log.clean.enabled=false
+dirty.log.clean.interval.minutes=5
+dirty.dirty.retention.minutes=10
+dirty.dirty.db.table=inlong_iceberg::dirty_data_achive_iceberg
\ No newline at end of file