This is an automated email from the ASF dual-hosted git repository.

vernedeng pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/inlong.git


The following commit(s) were added to refs/heads/master by this push:
     new ae9990cdc2 [INLONG-8988][Manager] Supports multiple wrap types for 
message body (#8989)
ae9990cdc2 is described below

commit ae9990cdc2fbad73662d0496e9c2c5cd928acc8e
Author: fuweng11 <76141879+fuwen...@users.noreply.github.com>
AuthorDate: Tue Sep 26 18:34:03 2023 +0800

    [INLONG-8988][Manager] Supports multiple wrap types for message body (#8989)
    
    * [INLONG-8988][Manager] Supports multiple wrap types for message body
---
 inlong-manager/manager-dao/pom.xml                 |  8 +++++
 .../manager/dao/entity/InlongStreamEntity.java     |  1 +
 .../resources/mappers/InlongStreamEntityMapper.xml | 40 ++++++++++++++--------
 .../pojo/sort/node/base/ExtractNodeProvider.java   |  5 +--
 .../pojo/sort/node/provider/KafkaProvider.java     |  2 +-
 .../pojo/sort/node/provider/PulsarProvider.java    |  2 +-
 .../manager/pojo/source/kafka/KafkaSource.java     |  5 +--
 .../manager/pojo/source/pulsar/PulsarSource.java   |  5 +--
 .../manager/pojo/stream/InlongStreamBriefInfo.java |  4 +--
 .../manager/pojo/stream/InlongStreamExtParam.java  |  3 --
 .../manager/pojo/stream/InlongStreamInfo.java      |  4 +--
 .../manager/pojo/stream/InlongStreamRequest.java   |  5 +--
 .../service/source/kafka/KafkaSourceOperator.java  |  2 +-
 .../source/pulsar/PulsarSourceOperator.java        |  2 +-
 .../main/resources/h2/apache_inlong_manager.sql    |  1 +
 .../manager-web/sql/apache_inlong_manager.sql      |  1 +
 inlong-manager/manager-web/sql/changes-1.10.0.sql  | 31 +++++++++++++++++
 17 files changed, 88 insertions(+), 33 deletions(-)

diff --git a/inlong-manager/manager-dao/pom.xml 
b/inlong-manager/manager-dao/pom.xml
index 18e4dd06d4..7848b6dd26 100644
--- a/inlong-manager/manager-dao/pom.xml
+++ b/inlong-manager/manager-dao/pom.xml
@@ -65,6 +65,14 @@
                     <groupId>org.springframework.boot</groupId>
                     <artifactId>spring-boot-autoconfigure</artifactId>
                 </exclusion>
+                <exclusion>
+                    <groupId>com.sun</groupId>
+                    <artifactId>tools</artifactId>
+                </exclusion>
+                <exclusion>
+                    <groupId>com.sun</groupId>
+                    <artifactId>jconsole</artifactId>
+                </exclusion>
             </exclusions>
         </dependency>
         <dependency>
diff --git 
a/inlong-manager/manager-dao/src/main/java/org/apache/inlong/manager/dao/entity/InlongStreamEntity.java
 
b/inlong-manager/manager-dao/src/main/java/org/apache/inlong/manager/dao/entity/InlongStreamEntity.java
index 0cad8a5083..182eec90cd 100644
--- 
a/inlong-manager/manager-dao/src/main/java/org/apache/inlong/manager/dao/entity/InlongStreamEntity.java
+++ 
b/inlong-manager/manager-dao/src/main/java/org/apache/inlong/manager/dao/entity/InlongStreamEntity.java
@@ -37,6 +37,7 @@ public class InlongStreamEntity implements Serializable {
     private String mqResource;
 
     private String dataType;
+    private String wrapType;
     private String dataEncoding;
     private String dataSeparator;
     private String dataEscapeChar;
diff --git 
a/inlong-manager/manager-dao/src/main/resources/mappers/InlongStreamEntityMapper.xml
 
b/inlong-manager/manager-dao/src/main/resources/mappers/InlongStreamEntityMapper.xml
index 6f1f0aef14..9a693c1236 100644
--- 
a/inlong-manager/manager-dao/src/main/resources/mappers/InlongStreamEntityMapper.xml
+++ 
b/inlong-manager/manager-dao/src/main/resources/mappers/InlongStreamEntityMapper.xml
@@ -29,6 +29,7 @@
         <result column="mq_resource" jdbcType="VARCHAR" property="mqResource"/>
 
         <result column="data_type" jdbcType="VARCHAR" property="dataType"/>
+        <result column="wrap_type" jdbcType="VARCHAR" property="wrapType"/>
         <result column="data_encoding" jdbcType="VARCHAR" 
property="dataEncoding"/>
         <result column="data_separator" jdbcType="VARCHAR" 
property="dataSeparator"/>
         <result column="data_escape_char" jdbcType="VARCHAR" 
property="dataEscapeChar"/>
@@ -52,7 +53,7 @@
     </resultMap>
     <sql id="Base_Column_List">
         id, inlong_group_id, inlong_stream_id, name, description, mq_resource,
-        data_type, data_encoding, data_separator, data_escape_char, sync_send,
+        data_type, wrap_type, data_encoding, data_separator, data_escape_char, 
sync_send,
         daily_records, daily_storage, peak_records, max_length, 
storage_period, ext_params,
         status, previous_status, is_deleted, creator, modifier, create_time, 
modify_time, version
     </sql>
@@ -61,18 +62,20 @@
             
parameterType="org.apache.inlong.manager.dao.entity.InlongStreamEntity">
         insert into inlong_stream (id, inlong_group_id, inlong_stream_id,
                                    name, description, mq_resource,
-                                   data_type, data_encoding, data_separator,
-                                   data_escape_char, sync_send, daily_records,
-                                   daily_storage, peak_records, max_length,
-                                   storage_period, ext_params, status,
-                                   previous_status, creator, modifier)
+                                   data_type, wrap_type, data_encoding,
+                                   data_separator, data_escape_char, sync_send,
+                                   daily_records, daily_storage, peak_records,
+                                   max_length,storage_period, ext_params,
+                                   status, previous_status, creator,
+                                   modifier)
         values (#{id,jdbcType=INTEGER}, #{inlongGroupId,jdbcType=VARCHAR}, 
#{inlongStreamId,jdbcType=VARCHAR},
                 #{name,jdbcType=VARCHAR}, #{description,jdbcType=VARCHAR}, 
#{mqResource,jdbcType=VARCHAR},
-                #{dataType,jdbcType=VARCHAR}, 
#{dataEncoding,jdbcType=VARCHAR}, #{dataSeparator,jdbcType=VARCHAR},
-                #{dataEscapeChar,jdbcType=VARCHAR}, 
#{syncSend,jdbcType=INTEGER}, #{dailyRecords,jdbcType=INTEGER},
-                #{dailyStorage,jdbcType=INTEGER}, 
#{peakRecords,jdbcType=INTEGER}, #{maxLength,jdbcType=INTEGER},
-                #{storagePeriod,jdbcType=INTEGER}, 
#{extParams,jdbcType=LONGVARCHAR}, #{status,jdbcType=INTEGER},
-                #{previousStatus,jdbcType=INTEGER}, 
#{creator,jdbcType=VARCHAR}, #{modifier,jdbcType=VARCHAR})
+                #{dataType,jdbcType=VARCHAR}, #{wrapType,jdbcType=VARCHAR}, 
#{dataEncoding,jdbcType=VARCHAR},
+                #{dataSeparator,jdbcType=VARCHAR}, 
#{dataEscapeChar,jdbcType=VARCHAR}, #{syncSend,jdbcType=INTEGER},
+                #{dailyRecords,jdbcType=INTEGER}, 
#{dailyStorage,jdbcType=INTEGER}, #{peakRecords,jdbcType=INTEGER},
+                #{maxLength,jdbcType=INTEGER}, 
#{storagePeriod,jdbcType=INTEGER}, #{extParams,jdbcType=LONGVARCHAR},
+                #{status,jdbcType=INTEGER}, 
#{previousStatus,jdbcType=INTEGER}, #{creator,jdbcType=VARCHAR},
+                #{modifier,jdbcType=VARCHAR})
     </insert>
     <insert id="insertSelective" useGeneratedKeys="true" keyProperty="id"
             
parameterType="org.apache.inlong.manager.dao.entity.InlongStreamEntity">
@@ -99,6 +102,9 @@
             <if test="dataType != null">
                 data_type,
             </if>
+            <if test="wrapType != null">
+                wrap_type,
+            </if>
             <if test="dataEncoding != null">
                 data_encoding,
             </if>
@@ -164,6 +170,9 @@
             <if test="dataType != null">
                 #{dataType,jdbcType=VARCHAR},
             </if>
+            <if test="wrapType != null">
+                #{wrapType,jdbcType=VARCHAR},
+            </if>
             <if test="dataEncoding != null">
                 #{dataEncoding,jdbcType=VARCHAR},
             </if>
@@ -234,7 +243,7 @@
             
parameterType="org.apache.inlong.manager.pojo.stream.InlongStreamPageRequest">
         select
         distinct stream.id, stream.inlong_group_id, stream.inlong_stream_id, 
stream.name,
-        stream.description, stream.mq_resource, stream.data_type, 
stream.data_encoding,
+        stream.description, stream.mq_resource, stream.data_type, 
stream.wrap_type, stream.data_encoding,
         stream.data_separator, stream.data_escape_char, stream.sync_send, 
stream.daily_records,
         stream.daily_storage, stream.peak_records, stream.max_length, 
stream.storage_period,
         stream.status, stream.creator, stream.modifier, stream.create_time, 
stream.modify_time, stream.version
@@ -286,8 +295,7 @@
         and is_deleted = 0
     </select>
     <select id="selectAllStreams" 
resultType="org.apache.inlong.manager.pojo.sort.standalone.SortSourceStreamInfo">
-        select
-               inlong_group_id,
+        select inlong_group_id,
                inlong_stream_id,
                mq_resource,
                ext_params
@@ -311,6 +319,7 @@
             description      = #{description, jdbcType=VARCHAR},
             mq_resource      = #{mqResource, jdbcType=VARCHAR},
             data_type        = #{dataType, jdbcType=VARCHAR},
+            wrap_type        = #{wrapType, jdbcType=VARCHAR},
             data_encoding    = #{dataEncoding, jdbcType=VARCHAR},
             data_separator   = #{dataSeparator, jdbcType=VARCHAR},
             data_escape_char = #{dataEscapeChar, jdbcType=VARCHAR},
@@ -351,6 +360,9 @@
             <if test="dataType != null">
                 data_type = #{dataType, jdbcType=VARCHAR},
             </if>
+            <if test="wrapType != null">
+                wrap_type = #{wrapType, jdbcType=VARCHAR},
+            </if>
             <if test="dataEncoding != null">
                 data_encoding = #{dataEncoding, jdbcType=VARCHAR},
             </if>
diff --git 
a/inlong-manager/manager-pojo/src/main/java/org/apache/inlong/manager/pojo/sort/node/base/ExtractNodeProvider.java
 
b/inlong-manager/manager-pojo/src/main/java/org/apache/inlong/manager/pojo/sort/node/base/ExtractNodeProvider.java
index a8622a7be8..42b36b7591 100644
--- 
a/inlong-manager/manager-pojo/src/main/java/org/apache/inlong/manager/pojo/sort/node/base/ExtractNodeProvider.java
+++ 
b/inlong-manager/manager-pojo/src/main/java/org/apache/inlong/manager/pojo/sort/node/base/ExtractNodeProvider.java
@@ -18,6 +18,7 @@
 package org.apache.inlong.manager.pojo.sort.node.base;
 
 import org.apache.inlong.common.enums.DataTypeEnum;
+import org.apache.inlong.common.enums.MessageWrapType;
 import 
org.apache.inlong.manager.common.fieldtype.strategy.FieldTypeMappingStrategy;
 import org.apache.inlong.manager.pojo.sort.util.FieldInfoUtils;
 import org.apache.inlong.manager.pojo.stream.StreamField;
@@ -92,7 +93,7 @@ public interface ExtractNodeProvider extends NodeProvider {
      */
     default Format parsingFormat(
             String serializationType,
-            boolean wrapWithInlongMsg,
+            String wrapType,
             String separatorStr,
             boolean ignoreParseErrors) {
         Format format;
@@ -129,7 +130,7 @@ public interface ExtractNodeProvider extends NodeProvider {
             default:
                 throw new IllegalArgumentException(String.format("Unsupported 
dataType=%s", dataType));
         }
-        if (wrapWithInlongMsg) {
+        if (Objects.equals(wrapType, MessageWrapType.INLONG_MSG_V0.getName())) 
{
             Format innerFormat = format;
             format = new InLongMsgFormat(innerFormat, false);
         }
diff --git 
a/inlong-manager/manager-pojo/src/main/java/org/apache/inlong/manager/pojo/sort/node/provider/KafkaProvider.java
 
b/inlong-manager/manager-pojo/src/main/java/org/apache/inlong/manager/pojo/sort/node/provider/KafkaProvider.java
index e8315660b6..01649bea41 100644
--- 
a/inlong-manager/manager-pojo/src/main/java/org/apache/inlong/manager/pojo/sort/node/provider/KafkaProvider.java
+++ 
b/inlong-manager/manager-pojo/src/main/java/org/apache/inlong/manager/pojo/sort/node/provider/KafkaProvider.java
@@ -64,7 +64,7 @@ public class KafkaProvider implements ExtractNodeProvider, 
LoadNodeProvider {
 
         Format format = parsingFormat(
                 kafkaSource.getSerializationType(),
-                kafkaSource.isWrapWithInlongMsg(),
+                kafkaSource.getWrapType(),
                 kafkaSource.getDataSeparator(),
                 kafkaSource.isIgnoreParseErrors());
 
diff --git 
a/inlong-manager/manager-pojo/src/main/java/org/apache/inlong/manager/pojo/sort/node/provider/PulsarProvider.java
 
b/inlong-manager/manager-pojo/src/main/java/org/apache/inlong/manager/pojo/sort/node/provider/PulsarProvider.java
index da20e8b203..b0bcd0c1c8 100644
--- 
a/inlong-manager/manager-pojo/src/main/java/org/apache/inlong/manager/pojo/sort/node/provider/PulsarProvider.java
+++ 
b/inlong-manager/manager-pojo/src/main/java/org/apache/inlong/manager/pojo/sort/node/provider/PulsarProvider.java
@@ -52,7 +52,7 @@ public class PulsarProvider implements ExtractNodeProvider {
                 pulsarSource.getPulsarTenant() + "/" + 
pulsarSource.getNamespace() + "/" + pulsarSource.getTopic();
 
         Format format = parsingFormat(pulsarSource.getSerializationType(),
-                pulsarSource.isWrapWithInlongMsg(),
+                pulsarSource.getWrapType(),
                 pulsarSource.getDataSeparator(),
                 pulsarSource.isIgnoreParseError());
 
diff --git 
a/inlong-manager/manager-pojo/src/main/java/org/apache/inlong/manager/pojo/source/kafka/KafkaSource.java
 
b/inlong-manager/manager-pojo/src/main/java/org/apache/inlong/manager/pojo/source/kafka/KafkaSource.java
index c7ffdbdbfc..0ccb0b45aa 100644
--- 
a/inlong-manager/manager-pojo/src/main/java/org/apache/inlong/manager/pojo/source/kafka/KafkaSource.java
+++ 
b/inlong-manager/manager-pojo/src/main/java/org/apache/inlong/manager/pojo/source/kafka/KafkaSource.java
@@ -17,6 +17,7 @@
 
 package org.apache.inlong.manager.pojo.source.kafka;
 
+import org.apache.inlong.common.enums.MessageWrapType;
 import org.apache.inlong.manager.common.consts.SourceType;
 import org.apache.inlong.manager.common.util.CommonBeanUtils;
 import org.apache.inlong.manager.common.util.JsonTypeDefine;
@@ -92,8 +93,8 @@ public class KafkaSource extends StreamSource {
     @ApiModelProperty(value = "Data field escape symbol")
     private String dataEscapeChar;
 
-    @ApiModelProperty("Whether wrap content with InlongMsg")
-    private boolean wrapWithInlongMsg = true;
+    @ApiModelProperty(value = "The message body wrap  wrap type, including: 
RAW, INLONG_MSG_V0, INLONG_MSG_V1, etc")
+    private String wrapType = MessageWrapType.INLONG_MSG_V0.getName();
 
     public KafkaSource() {
         this.setSourceType(SourceType.KAFKA);
diff --git 
a/inlong-manager/manager-pojo/src/main/java/org/apache/inlong/manager/pojo/source/pulsar/PulsarSource.java
 
b/inlong-manager/manager-pojo/src/main/java/org/apache/inlong/manager/pojo/source/pulsar/PulsarSource.java
index 25d23f2e16..cdab4d59cf 100644
--- 
a/inlong-manager/manager-pojo/src/main/java/org/apache/inlong/manager/pojo/source/pulsar/PulsarSource.java
+++ 
b/inlong-manager/manager-pojo/src/main/java/org/apache/inlong/manager/pojo/source/pulsar/PulsarSource.java
@@ -17,6 +17,7 @@
 
 package org.apache.inlong.manager.pojo.source.pulsar;
 
+import org.apache.inlong.common.enums.MessageWrapType;
 import org.apache.inlong.manager.common.consts.SourceType;
 import org.apache.inlong.manager.common.util.CommonBeanUtils;
 import org.apache.inlong.manager.common.util.JsonTypeDefine;
@@ -83,9 +84,9 @@ public class PulsarSource extends StreamSource {
     @Builder.Default
     private boolean isInlongComponent = false;
 
-    @ApiModelProperty("Whether wrap content with InlongMsg")
+    @ApiModelProperty(value = "The message body wrap  wrap type, including: 
RAW, INLONG_MSG_V0, INLONG_MSG_V1, etc")
     @Builder.Default
-    private boolean wrapWithInlongMsg = true;
+    private String wrapType = MessageWrapType.INLONG_MSG_V0.getName();
 
     public PulsarSource() {
         this.setSourceType(SourceType.PULSAR);
diff --git 
a/inlong-manager/manager-pojo/src/main/java/org/apache/inlong/manager/pojo/stream/InlongStreamBriefInfo.java
 
b/inlong-manager/manager-pojo/src/main/java/org/apache/inlong/manager/pojo/stream/InlongStreamBriefInfo.java
index 28dadc2b53..90197f9d32 100644
--- 
a/inlong-manager/manager-pojo/src/main/java/org/apache/inlong/manager/pojo/stream/InlongStreamBriefInfo.java
+++ 
b/inlong-manager/manager-pojo/src/main/java/org/apache/inlong/manager/pojo/stream/InlongStreamBriefInfo.java
@@ -82,8 +82,8 @@ public class InlongStreamBriefInfo {
     @ApiModelProperty(value = "Data storage period, unit: day")
     private Integer storagePeriod;
 
-    @ApiModelProperty(value = "Whether the message body wrapped with 
InlongMsg")
-    private Boolean wrapWithInlongMsg;
+    @ApiModelProperty(value = "The message body  wrap type, including: RAW, 
INLONG_MSG_V0, INLONG_MSG_V1, etc")
+    private String wrapType;
 
     @ApiModelProperty(value = "Whether to ignore the parse errors of field 
value")
     private Boolean ignoreParseError;
diff --git 
a/inlong-manager/manager-pojo/src/main/java/org/apache/inlong/manager/pojo/stream/InlongStreamExtParam.java
 
b/inlong-manager/manager-pojo/src/main/java/org/apache/inlong/manager/pojo/stream/InlongStreamExtParam.java
index 0809b704cb..1e80e0bd7f 100644
--- 
a/inlong-manager/manager-pojo/src/main/java/org/apache/inlong/manager/pojo/stream/InlongStreamExtParam.java
+++ 
b/inlong-manager/manager-pojo/src/main/java/org/apache/inlong/manager/pojo/stream/InlongStreamExtParam.java
@@ -45,9 +45,6 @@ public class InlongStreamExtParam implements Serializable {
     @ApiModelProperty(value = "Whether to ignore the parse errors of field 
value")
     private boolean ignoreParseError;
 
-    @ApiModelProperty(value = "Whether the message body wrapped with 
InlongMsg")
-    private boolean wrapWithInlongMsg;
-
     /**
      * Pack extended attributes into ExtParams
      *
diff --git 
a/inlong-manager/manager-pojo/src/main/java/org/apache/inlong/manager/pojo/stream/InlongStreamInfo.java
 
b/inlong-manager/manager-pojo/src/main/java/org/apache/inlong/manager/pojo/stream/InlongStreamInfo.java
index c2b501b901..1a579d4960 100644
--- 
a/inlong-manager/manager-pojo/src/main/java/org/apache/inlong/manager/pojo/stream/InlongStreamInfo.java
+++ 
b/inlong-manager/manager-pojo/src/main/java/org/apache/inlong/manager/pojo/stream/InlongStreamInfo.java
@@ -130,8 +130,8 @@ public class InlongStreamInfo extends BaseInlongStream {
     @ApiModelProperty(value = "Version number")
     private Integer version;
 
-    @ApiModelProperty(value = "Whether the message body wrapped with 
InlongMsg")
-    private Boolean wrapWithInlongMsg = true;
+    @ApiModelProperty(value = "The message body wrap type, including: RAW, 
INLONG_MSG_V0, INLONG_MSG_V1, etc")
+    private String wrapType;
 
     @ApiModelProperty(value = "Whether to ignore the parse errors of field 
value")
     private Boolean ignoreParseError = true;
diff --git 
a/inlong-manager/manager-pojo/src/main/java/org/apache/inlong/manager/pojo/stream/InlongStreamRequest.java
 
b/inlong-manager/manager-pojo/src/main/java/org/apache/inlong/manager/pojo/stream/InlongStreamRequest.java
index 65de69d1db..e2944329f4 100644
--- 
a/inlong-manager/manager-pojo/src/main/java/org/apache/inlong/manager/pojo/stream/InlongStreamRequest.java
+++ 
b/inlong-manager/manager-pojo/src/main/java/org/apache/inlong/manager/pojo/stream/InlongStreamRequest.java
@@ -121,6 +121,7 @@ public class InlongStreamRequest extends BaseInlongStream {
     @ApiModelProperty(value = "Whether to ignore the parse errors of field 
value")
     private boolean ignoreParseError = true;
 
-    @ApiModelProperty(value = "Whether the message body wrapped with 
InlongMsg")
-    private boolean wrapWithInlongMsg = true;
+    @ApiModelProperty(value = "The message body  wrap type, including: RAW, 
INLONG_MSG_V0, INLONG_MSG_V1, PB, etc")
+    private String wrapType;
+
 }
diff --git 
a/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/source/kafka/KafkaSourceOperator.java
 
b/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/source/kafka/KafkaSourceOperator.java
index 0037170e79..6e1db90b37 100644
--- 
a/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/source/kafka/KafkaSourceOperator.java
+++ 
b/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/source/kafka/KafkaSourceOperator.java
@@ -152,7 +152,7 @@ public class KafkaSourceOperator extends 
AbstractSourceOperator {
                 }
             }
 
-            
kafkaSource.setWrapWithInlongMsg(streamInfo.getWrapWithInlongMsg());
+            kafkaSource.setWrapType(streamInfo.getWrapType());
 
             kafkaSource.setAutoOffsetReset(KafkaOffset.EARLIEST.getName());
             kafkaSource.setFieldList(streamInfo.getFieldList());
diff --git 
a/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/source/pulsar/PulsarSourceOperator.java
 
b/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/source/pulsar/PulsarSourceOperator.java
index 6aa998962a..989031184f 100644
--- 
a/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/source/pulsar/PulsarSourceOperator.java
+++ 
b/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/source/pulsar/PulsarSourceOperator.java
@@ -142,7 +142,7 @@ public class PulsarSourceOperator extends 
AbstractSourceOperator {
                 String serializationType = 
DataTypeEnum.forType(streamInfo.getDataType()).getType();
                 pulsarSource.setSerializationType(serializationType);
             }
-            
pulsarSource.setWrapWithInlongMsg(streamInfo.getWrapWithInlongMsg());
+            pulsarSource.setWrapType(streamInfo.getWrapType());
             pulsarSource.setIgnoreParseError(streamInfo.getIgnoreParseError());
 
             // set the token info
diff --git 
a/inlong-manager/manager-test/src/main/resources/h2/apache_inlong_manager.sql 
b/inlong-manager/manager-test/src/main/resources/h2/apache_inlong_manager.sql
index 63ea3df4dd..2e02f2cc80 100644
--- 
a/inlong-manager/manager-test/src/main/resources/h2/apache_inlong_manager.sql
+++ 
b/inlong-manager/manager-test/src/main/resources/h2/apache_inlong_manager.sql
@@ -233,6 +233,7 @@ CREATE TABLE IF NOT EXISTS `inlong_stream`
     `description`      varchar(256)          DEFAULT '' COMMENT 'Description 
of inlong stream',
     `mq_resource`      varchar(128)          DEFAULT NULL COMMENT 'MQ 
resource, in one stream, corresponding to the filter ID of TubeMQ, 
corresponding to the topic of Pulsar',
     `data_type`        varchar(20)           DEFAULT NULL COMMENT 'Data type, 
including: CSV, KEY-VALUE, JSON, AVRO, etc.',
+    `wrap_type`        varchar(256)          DEFAULT 'INLONG_MSG_V0' COMMENT 
'The message body wrap type, including: RAW, INLONG_MSG_V0, INLONG_MSG_V1, etc',
     `data_encoding`    varchar(8)            DEFAULT 'UTF-8' COMMENT 'Data 
encoding format, including: UTF-8, GBK, etc.',
     `data_separator`   varchar(8)            DEFAULT NULL COMMENT 'The source 
data field separator',
     `data_escape_char` varchar(8)            DEFAULT NULL COMMENT 'Source data 
field escape character, the default is NULL (NULL), stored as 1 character',
diff --git a/inlong-manager/manager-web/sql/apache_inlong_manager.sql 
b/inlong-manager/manager-web/sql/apache_inlong_manager.sql
index d96121ab14..1054722f2d 100644
--- a/inlong-manager/manager-web/sql/apache_inlong_manager.sql
+++ b/inlong-manager/manager-web/sql/apache_inlong_manager.sql
@@ -247,6 +247,7 @@ CREATE TABLE IF NOT EXISTS `inlong_stream`
     `description`      varchar(256)          DEFAULT '' COMMENT 'Description 
of inlong stream',
     `mq_resource`      varchar(128)          DEFAULT NULL COMMENT 'MQ 
resource, in one stream, corresponding to the filter ID of TubeMQ, 
corresponding to the topic of Pulsar',
     `data_type`        varchar(20)           DEFAULT NULL COMMENT 'Data type, 
including: CSV, KEY-VALUE, JSON, AVRO, etc.',
+    `wrap_type`        varchar(256)          DEFAULT 'INLONG_MSG_V0' COMMENT 
'The message body wrap type, including: RAW, INLONG_MSG_V0, INLONG_MSG_V1, etc',
     `data_encoding`    varchar(8)            DEFAULT 'UTF-8' COMMENT 'Data 
encoding format, including: UTF-8, GBK, etc.',
     `data_separator`   varchar(8)            DEFAULT NULL COMMENT 'The source 
data field separator',
     `data_escape_char` varchar(8)            DEFAULT NULL COMMENT 'Source data 
field escape character, the default is NULL (NULL), stored as 1 character',
diff --git a/inlong-manager/manager-web/sql/changes-1.10.0.sql 
b/inlong-manager/manager-web/sql/changes-1.10.0.sql
new file mode 100644
index 0000000000..1893852050
--- /dev/null
+++ b/inlong-manager/manager-web/sql/changes-1.10.0.sql
@@ -0,0 +1,31 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+-- This is the SQL change file from version 1.9.0 to the current version 
1.10.0.
+-- When upgrading to version 1.10.0, please execute these SQL statements in the DB (such 
as MySQL) used by the Manager module.
+
+SET NAMES utf8mb4;
+SET FOREIGN_KEY_CHECKS = 0;
+
+USE `apache_inlong_manager`;
+
+ALTER TABLE `inlong_stream`
+    ADD COLUMN `wrap_type` varchar(256) DEFAULT 'INLONG_MSG_V0' COMMENT 'The 
message body wrap type, including: RAW, INLONG_MSG_V0, INLONG_MSG_V1, etc';
+
+
+
+

Reply via email to