This is an automated email from the ASF dual-hosted git repository.

dockerzhang pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/inlong.git


The following commit(s) were added to refs/heads/master by this push:
     new d4df5eda31 [INLONG-11081][SDK] Transform SQL supports INTERVAL parse 
(#11086)
d4df5eda31 is described below

commit d4df5eda31db2431da8afe391bc2f5b7ccc4418e
Author: Zkplo <87751516+zk...@users.noreply.github.com>
AuthorDate: Fri Sep 20 09:57:29 2024 +0800

    [INLONG-11081][SDK] Transform SQL supports INTERVAL parse (#11086)
    
    Co-authored-by: ZKpLo <14148880+zk...@user.noreply.gitee.com>
---
 .../transform/process/parser/AdditionParser.java   |  39 ++++-
 .../transform/process/parser/IntervalParser.java   | 154 ++++++++++++++++++
 .../process/parser/SubtractionParser.java          |  39 ++++-
 .../sdk/transform/process/utils/DateUtil.java      | 177 +++++++++++++++++++++
 .../process/parser/AbstractParserTestBase.java     |   5 +
 .../process/parser/TestAdditionParser.java         | 125 +++++++++++++++
 .../process/parser/TestSubtractionParser.java      | 125 +++++++++++++++
 7 files changed, 650 insertions(+), 14 deletions(-)

diff --git 
a/inlong-sdk/transform-sdk/src/main/java/org/apache/inlong/sdk/transform/process/parser/AdditionParser.java
 
b/inlong-sdk/transform-sdk/src/main/java/org/apache/inlong/sdk/transform/process/parser/AdditionParser.java
index 13536be30d..eea6ce7e17 100644
--- 
a/inlong-sdk/transform-sdk/src/main/java/org/apache/inlong/sdk/transform/process/parser/AdditionParser.java
+++ 
b/inlong-sdk/transform-sdk/src/main/java/org/apache/inlong/sdk/transform/process/parser/AdditionParser.java
@@ -20,14 +20,17 @@ package org.apache.inlong.sdk.transform.process.parser;
 import org.apache.inlong.sdk.transform.decode.SourceData;
 import org.apache.inlong.sdk.transform.process.Context;
 import org.apache.inlong.sdk.transform.process.operator.OperatorTools;
+import org.apache.inlong.sdk.transform.process.utils.DateUtil;
 
 import net.sf.jsqlparser.expression.operators.arithmetic.Addition;
+import org.apache.commons.lang3.tuple.Pair;
 
 import java.math.BigDecimal;
+import java.time.temporal.ChronoField;
+import java.util.Map;
 
 /**
  * AdditionParser
- * 
  */
 @TransformParser(values = Addition.class)
 public class AdditionParser implements ValueParser {
@@ -41,16 +44,38 @@ public class AdditionParser implements ValueParser {
         this.right = OperatorTools.buildParser(expr.getRightExpression());
     }
 
-    /**
-     * parse
-     * @param sourceData
-     * @param rowIndex
-     * @return
-     */
     @Override
     public Object parse(SourceData sourceData, int rowIndex, Context context) {
+        if (this.left instanceof IntervalParser && this.right instanceof 
IntervalParser) {
+            return null;
+        } else if (this.left instanceof IntervalParser || this.right 
instanceof IntervalParser) {
+            IntervalParser intervalParser = null;
+            ValueParser dateParser = null;
+            if (this.left instanceof IntervalParser) {
+                intervalParser = (IntervalParser) this.left;
+                dateParser = this.right;
+            } else {
+                intervalParser = (IntervalParser) this.right;
+                dateParser = this.left;
+            }
+            Object intervalPairObj = intervalParser.parse(sourceData, 
rowIndex, context);
+            Object dateObj = dateParser.parse(sourceData, rowIndex, context);
+            if (intervalPairObj == null || dateObj == null) {
+                return null;
+            }
+            return DateUtil.dateAdd(OperatorTools.parseString(dateObj),
+                    (Pair<Integer, Map<ChronoField, Long>>) intervalPairObj, 
1);
+        } else {
+            return numericalOperation(sourceData, rowIndex, context);
+        }
+    }
+
+    private BigDecimal numericalOperation(SourceData sourceData, int rowIndex, 
Context context) {
         Object leftObj = this.left.parse(sourceData, rowIndex, context);
         Object rightObj = this.right.parse(sourceData, rowIndex, context);
+        if (leftObj == null || rightObj == null) {
+            return null;
+        }
         BigDecimal leftValue = OperatorTools.parseBigDecimal(leftObj);
         BigDecimal rightValue = OperatorTools.parseBigDecimal(rightObj);
         return leftValue.add(rightValue);
diff --git 
a/inlong-sdk/transform-sdk/src/main/java/org/apache/inlong/sdk/transform/process/parser/IntervalParser.java
 
b/inlong-sdk/transform-sdk/src/main/java/org/apache/inlong/sdk/transform/process/parser/IntervalParser.java
new file mode 100644
index 0000000000..7266dcd63b
--- /dev/null
+++ 
b/inlong-sdk/transform-sdk/src/main/java/org/apache/inlong/sdk/transform/process/parser/IntervalParser.java
@@ -0,0 +1,154 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.inlong.sdk.transform.process.parser;
+
+import org.apache.inlong.sdk.transform.decode.SourceData;
+import org.apache.inlong.sdk.transform.process.Context;
+import org.apache.inlong.sdk.transform.process.operator.OperatorTools;
+
+import lombok.extern.slf4j.Slf4j;
+import net.sf.jsqlparser.expression.IntervalExpression;
+import org.apache.commons.lang3.tuple.Pair;
+
+import java.time.format.DateTimeFormatter;
+import java.time.temporal.ChronoField;
+import java.time.temporal.TemporalAccessor;
+import java.util.Arrays;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+import java.util.concurrent.ConcurrentHashMap;
+
+/**
+ * IntervalParser  <-> INTERVAL expr unit ->  
Pair(factor,Map(ChronoField,Count)):
+ * <p>
+ * `factor`:
+ * <p>
+ *    1) `expr` may be a string starting with '-', which negates the interval 
(i.e. it means subtraction).
+ *       So the positive or negative sign of `factor` indicates whether `expr` 
starts with a '-' or not.
+ * <p>
+ *    2) For units like WEEK and QUARTER, it is not easy to parse,
+ *       so WEEK -> ( unit=DAY, abs(factor)=7 ); QUARTER -> ( unit=MONTH, 
abs(factor)=3 ).
+ * <p>
+ * `Map(ChronoField,Count)`:
+ * <p>
+ *    IntervalParser will automatically match the corresponding 
DateTimeFormatter based on the input `expr`,
+ *    Based on DateTimeFormatter, IntervalParser will parse the incoming units 
and store them in a Map.
+ * <p>
+ *
+ *  In addition, the interval value `expr` can be supplied in one of two 
ways:
+ *      1) interval rowName year -> expression
+ *      2) interval 2 year       -> fixed parameter
+ */
+@Slf4j
+@TransformParser(values = IntervalExpression.class)
+public class IntervalParser implements ValueParser {
+
+    private final String intervalType;
+    private final ValueParser dateParser;
+    private final String parameter;
+
+    private static final List<ChronoField> CHRONO_FIELD_LIST = 
Arrays.asList(ChronoField.YEAR,
+            ChronoField.MONTH_OF_YEAR,
+            ChronoField.DAY_OF_MONTH, ChronoField.HOUR_OF_DAY, 
ChronoField.MINUTE_OF_HOUR, ChronoField.SECOND_OF_MINUTE,
+            ChronoField.MICRO_OF_SECOND);
+    private static final Map<String, DateTimeFormatter> DT_FORMATTER_MAP = new 
ConcurrentHashMap<>();
+
+    static {
+        DT_FORMATTER_MAP.put("SECOND_MICROSECOND", 
DateTimeFormatter.ofPattern("s.SSSSSS"));
+        DT_FORMATTER_MAP.put("MINUTE_MICROSECOND", 
DateTimeFormatter.ofPattern("m:s.SSSSSS"));
+        DT_FORMATTER_MAP.put("MINUTE_SECOND", 
DateTimeFormatter.ofPattern("m:s"));
+        DT_FORMATTER_MAP.put("HOUR_MICROSECOND", 
DateTimeFormatter.ofPattern("H:m:s.SSSSSS"));
+        DT_FORMATTER_MAP.put("HOUR_SECOND", 
DateTimeFormatter.ofPattern("H:m:s"));
+        DT_FORMATTER_MAP.put("HOUR_MINUTE", 
DateTimeFormatter.ofPattern("H:m"));
+        DT_FORMATTER_MAP.put("DAY_MICROSECOND", DateTimeFormatter.ofPattern("d 
H:m:s.SSSSSS"));
+        DT_FORMATTER_MAP.put("DAY_SECOND", DateTimeFormatter.ofPattern("d 
H:m:s"));
+        DT_FORMATTER_MAP.put("DAY_MINUTE", DateTimeFormatter.ofPattern("d 
H:m"));
+        DT_FORMATTER_MAP.put("DAY_HOUR", DateTimeFormatter.ofPattern("d H"));
+        DT_FORMATTER_MAP.put("YEAR_MONTH", DateTimeFormatter.ofPattern("y-M"));
+
+        DT_FORMATTER_MAP.put("MICROSECOND", 
DateTimeFormatter.ofPattern("SSSSSS"));
+        DT_FORMATTER_MAP.put("SECOND", DateTimeFormatter.ofPattern("s"));
+        DT_FORMATTER_MAP.put("MINUTE", DateTimeFormatter.ofPattern("m"));
+        DT_FORMATTER_MAP.put("HOUR", DateTimeFormatter.ofPattern("H"));
+        DT_FORMATTER_MAP.put("DAY", DateTimeFormatter.ofPattern("d"));
+        DT_FORMATTER_MAP.put("MONTH", DateTimeFormatter.ofPattern("M"));
+        DT_FORMATTER_MAP.put("YEAR", DateTimeFormatter.ofPattern("y"));
+    }
+
+    public IntervalParser(IntervalExpression expr) {
+        intervalType = expr.getIntervalType().toUpperCase();
+        dateParser = OperatorTools.buildParser(expr.getExpression());
+        if (dateParser == null) {
+            parameter = expr.getParameter();
+        } else {
+            parameter = null;
+        }
+    }
+
+    @Override
+    public Object parse(SourceData sourceData, int rowIndex, Context context) {
+        DateTimeFormatter dateTimeFormatter = 
DT_FORMATTER_MAP.get(intervalType);
+
+        String dataStr = parameter;
+        if (dateParser != null) {
+            Object dateObj = dateParser.parse(sourceData, rowIndex, context);
+            if (dateObj == null) {
+                return null;
+            }
+            dataStr = OperatorTools.parseString(dateObj);
+        }
+
+        int factor = 1;
+        if (dateTimeFormatter == null) {
+            if ("WEEK".equals(intervalType)) {
+                dateTimeFormatter = DT_FORMATTER_MAP.get("DAY");
+                factor = 7;
+            } else if ("QUARTER".equals(intervalType)) {
+                dateTimeFormatter = DT_FORMATTER_MAP.get("MONTH");
+                factor = 3;
+            } else {
+                return null;
+            }
+        }
+
+        try {
+            factor = dataStr.charAt(0) == '-' ? -factor : factor;
+            if (factor < 0) {
+                dataStr = dataStr.substring(1);
+            }
+            TemporalAccessor temporalAccessor = 
dateTimeFormatter.parse(dataStr);
+            HashMap<ChronoField, Long> map = new HashMap<>();
+            for (ChronoField field : CHRONO_FIELD_LIST) {
+                try {
+                    long num = temporalAccessor.getLong(field);
+                    if (num == 0) {
+                        continue;
+                    }
+                    map.put(field, temporalAccessor.getLong(field));
+                } catch (Exception ignored) {
+
+                }
+            }
+            return Pair.of(factor, map);
+        } catch (Exception e) {
+            log.error("Interval parse error", e);
+            return null;
+        }
+    }
+}
diff --git 
a/inlong-sdk/transform-sdk/src/main/java/org/apache/inlong/sdk/transform/process/parser/SubtractionParser.java
 
b/inlong-sdk/transform-sdk/src/main/java/org/apache/inlong/sdk/transform/process/parser/SubtractionParser.java
index 140a1dc995..cf32f1694c 100644
--- 
a/inlong-sdk/transform-sdk/src/main/java/org/apache/inlong/sdk/transform/process/parser/SubtractionParser.java
+++ 
b/inlong-sdk/transform-sdk/src/main/java/org/apache/inlong/sdk/transform/process/parser/SubtractionParser.java
@@ -20,14 +20,17 @@ package org.apache.inlong.sdk.transform.process.parser;
 import org.apache.inlong.sdk.transform.decode.SourceData;
 import org.apache.inlong.sdk.transform.process.Context;
 import org.apache.inlong.sdk.transform.process.operator.OperatorTools;
+import org.apache.inlong.sdk.transform.process.utils.DateUtil;
 
 import net.sf.jsqlparser.expression.operators.arithmetic.Subtraction;
+import org.apache.commons.lang3.tuple.Pair;
 
 import java.math.BigDecimal;
+import java.time.temporal.ChronoField;
+import java.util.Map;
 
 /**
  * SubtractionParser
- * 
  */
 @TransformParser(values = Subtraction.class)
 public class SubtractionParser implements ValueParser {
@@ -41,16 +44,38 @@ public class SubtractionParser implements ValueParser {
         this.right = OperatorTools.buildParser(expr.getRightExpression());
     }
 
-    /**
-     * parse
-     * @param sourceData
-     * @param rowIndex
-     * @return
-     */
     @Override
     public Object parse(SourceData sourceData, int rowIndex, Context context) {
+        if (this.left instanceof IntervalParser && this.right instanceof 
IntervalParser) {
+            return null;
+        } else if (this.left instanceof IntervalParser || this.right 
instanceof IntervalParser) {
+            IntervalParser intervalParser = null;
+            ValueParser dateParser = null;
+            if (this.left instanceof IntervalParser) {
+                intervalParser = (IntervalParser) this.left;
+                dateParser = this.right;
+            } else {
+                intervalParser = (IntervalParser) this.right;
+                dateParser = this.left;
+            }
+            Object intervalPairObj = intervalParser.parse(sourceData, 
rowIndex, context);
+            Object dateObj = dateParser.parse(sourceData, rowIndex, context);
+            if (intervalPairObj == null || dateObj == null) {
+                return null;
+            }
+            return DateUtil.dateAdd(OperatorTools.parseString(dateObj),
+                    (Pair<Integer, Map<ChronoField, Long>>) intervalPairObj, 
-1);
+        } else {
+            return numericalOperation(sourceData, rowIndex, context);
+        }
+    }
+
+    private BigDecimal numericalOperation(SourceData sourceData, int rowIndex, 
Context context) {
         Object leftObj = this.left.parse(sourceData, rowIndex, context);
         Object rightObj = this.right.parse(sourceData, rowIndex, context);
+        if (leftObj == null || rightObj == null) {
+            return null;
+        }
         BigDecimal leftValue = OperatorTools.parseBigDecimal(leftObj);
         BigDecimal rightValue = OperatorTools.parseBigDecimal(rightObj);
         return leftValue.subtract(rightValue);
diff --git 
a/inlong-sdk/transform-sdk/src/main/java/org/apache/inlong/sdk/transform/process/utils/DateUtil.java
 
b/inlong-sdk/transform-sdk/src/main/java/org/apache/inlong/sdk/transform/process/utils/DateUtil.java
new file mode 100644
index 0000000000..998f838cf2
--- /dev/null
+++ 
b/inlong-sdk/transform-sdk/src/main/java/org/apache/inlong/sdk/transform/process/utils/DateUtil.java
@@ -0,0 +1,177 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.inlong.sdk.transform.process.utils;
+
+import org.apache.commons.lang3.tuple.Pair;
+
+import java.time.LocalDate;
+import java.time.LocalDateTime;
+import java.time.LocalTime;
+import java.time.format.DateTimeFormatter;
+import java.time.temporal.ChronoField;
+import java.util.Arrays;
+import java.util.List;
+import java.util.Map;
+
+public class DateUtil {
+
+    // Need to follow this order
+    private static final List<DateTimeFormatter> DATE_TIME_FORMATTER_LIST = 
Arrays.asList(
+            DateTimeFormatter.ofPattern("yyyy-MM-dd HH:mm:ss.SSSSSS"),
+            DateTimeFormatter.ofPattern("yyyy-MM-dd HH:mm:ss"),
+            DateTimeFormatter.ofPattern("yyyy-MM-dd"));
+    private static final List<DateTimeFormatter> TIME_FORMATTER_LIST = 
Arrays.asList(
+            DateTimeFormatter.ofPattern("HH:mm:ss.SSSSSS"), 
DateTimeFormatter.ofPattern("HH:mm:ss"));
+
+    /**
+     * Time calculation
+     *
+     * @param dateStr      Time parameter string
+     * @param intervalPair Interval parsing results
+     * @param sign         Positive means addition, negative means subtraction; 
zero makes the method return null
+     * @return Calculation result string
+     */
+    public static String dateAdd(String dateStr, Pair<Integer, 
Map<ChronoField, Long>> intervalPair, int sign) {
+
+        if (sign < 0) {
+            sign = -1;
+        } else if (sign > 0) {
+            sign = 1;
+        } else {
+            return null;
+        }
+
+        Object dateParserObj = null;
+        for (DateTimeFormatter dateTimeFormatter : DATE_TIME_FORMATTER_LIST) {
+            try {
+                dateParserObj = LocalDateTime.parse(dateStr, 
dateTimeFormatter);
+            } catch (Exception e) {
+                try {
+                    dateParserObj = LocalDate.parse(dateStr, 
dateTimeFormatter).atStartOfDay();
+                } catch (Exception ignored) {
+
+                }
+            }
+            if (dateParserObj != null) {
+                return addDateTime(intervalPair, sign, (LocalDateTime) 
dateParserObj, dateStr);
+            }
+        }
+
+        for (DateTimeFormatter dateTimeFormatter : TIME_FORMATTER_LIST) {
+            try {
+                dateParserObj = LocalTime.parse(dateStr, dateTimeFormatter);
+            } catch (Exception ignored) {
+
+            }
+            if (dateParserObj != null) {
+                return addTime(intervalPair, sign, (LocalTime) dateParserObj, 
dateStr);
+            }
+        }
+
+        return null;
+    }
+
+    private static String addDateTime(Pair<Integer, Map<ChronoField, Long>> 
intervalPair, int sign,
+            LocalDateTime dateTime, String dataStr) {
+        int factor = intervalPair.getKey();
+        Map<ChronoField, Long> valueMap = intervalPair.getValue();
+
+        boolean hasTime = dataStr.indexOf(' ') != -1;
+        boolean hasMicroSecond = dataStr.indexOf('.') != -1;
+
+        for (ChronoField field : valueMap.keySet()) {
+            long amount = valueMap.get(field) * factor * sign;
+            switch (field) {
+                case MICRO_OF_SECOND:
+                    hasTime = true;
+                    hasMicroSecond = true;
+                    dateTime = dateTime.plusNanos(amount * 1000L);
+                    break;
+                case SECOND_OF_MINUTE:
+                    hasTime = true;
+                    dateTime = dateTime.plusSeconds(amount);
+                    break;
+                case MINUTE_OF_HOUR:
+                    hasTime = true;
+                    dateTime = dateTime.plusMinutes(amount);
+                    break;
+                case HOUR_OF_DAY:
+                    hasTime = true;
+                    dateTime = dateTime.plusHours(amount);
+                    break;
+                case DAY_OF_MONTH:
+                    dateTime = dateTime.plusDays(amount);
+                    break;
+                case MONTH_OF_YEAR:
+                    dateTime = dateTime.plusMonths(amount);
+                    break;
+                case YEAR:
+                    dateTime = dateTime.plusYears(amount);
+                    break;
+                default:
+                    return null;
+            }
+        }
+
+        String result = dateTime.toLocalDate().toString();
+        if (hasTime) {
+            if (hasMicroSecond) {
+                result += " " + 
dateTime.toLocalTime().format(TIME_FORMATTER_LIST.get(0));
+            } else {
+                result += " " + 
dateTime.toLocalTime().format(TIME_FORMATTER_LIST.get(1));
+            }
+        }
+        return result;
+    }
+
+    private static String addTime(Pair<Integer, Map<ChronoField, Long>> 
intervalPair, int sign, LocalTime time,
+            String dataStr) {
+        int factor = intervalPair.getKey();
+        Map<ChronoField, Long> valueMap = intervalPair.getValue();
+
+        boolean hasMicroSecond = dataStr.indexOf('.') != -1;
+
+        for (ChronoField field : valueMap.keySet()) {
+            long amount = valueMap.get(field) * factor * sign;
+            switch (field) {
+                case MICRO_OF_SECOND:
+                    hasMicroSecond = true;
+                    time = time.plusNanos(amount * 1000L);
+                    break;
+                case SECOND_OF_MINUTE:
+                    time = time.plusSeconds(amount);
+                    break;
+                case MINUTE_OF_HOUR:
+                    time = time.plusMinutes(amount);
+                    break;
+                case HOUR_OF_DAY:
+                    time = time.plusHours(amount);
+                    break;
+                default:
+                    return null;
+            }
+        }
+
+        if (hasMicroSecond) {
+            return time.format(TIME_FORMATTER_LIST.get(0));
+        } else {
+            return time.format(TIME_FORMATTER_LIST.get(1));
+        }
+    }
+
+}
diff --git 
a/inlong-sdk/transform-sdk/src/test/java/org/apache/inlong/sdk/transform/process/parser/AbstractParserTestBase.java
 
b/inlong-sdk/transform-sdk/src/test/java/org/apache/inlong/sdk/transform/process/parser/AbstractParserTestBase.java
index 2534bff096..4fcb436575 100644
--- 
a/inlong-sdk/transform-sdk/src/test/java/org/apache/inlong/sdk/transform/process/parser/AbstractParserTestBase.java
+++ 
b/inlong-sdk/transform-sdk/src/test/java/org/apache/inlong/sdk/transform/process/parser/AbstractParserTestBase.java
@@ -41,6 +41,11 @@ public abstract class AbstractParserTestBase {
             field.setName("numeric" + i);
             srcFields.add(field);
         }
+        for (int i = 1; i < 5; i++) {
+            FieldInfo field = new FieldInfo();
+            field.setName("string" + i);
+            srcFields.add(field);
+        }
         FieldInfo field = new FieldInfo();
         field.setName("result");
         dstFields.add(field);
diff --git 
a/inlong-sdk/transform-sdk/src/test/java/org/apache/inlong/sdk/transform/process/parser/TestAdditionParser.java
 
b/inlong-sdk/transform-sdk/src/test/java/org/apache/inlong/sdk/transform/process/parser/TestAdditionParser.java
new file mode 100644
index 0000000000..fb27cd4e34
--- /dev/null
+++ 
b/inlong-sdk/transform-sdk/src/test/java/org/apache/inlong/sdk/transform/process/parser/TestAdditionParser.java
@@ -0,0 +1,125 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.inlong.sdk.transform.process.parser;
+
+import org.apache.inlong.sdk.transform.decode.SourceDecoderFactory;
+import org.apache.inlong.sdk.transform.encode.SinkEncoderFactory;
+import org.apache.inlong.sdk.transform.pojo.TransformConfig;
+import org.apache.inlong.sdk.transform.process.TransformProcessor;
+
+import org.junit.Assert;
+import org.junit.Test;
+
+import java.util.HashMap;
+import java.util.List;
+
+public class TestAdditionParser extends AbstractParserTestBase {
+
+    @Test
+    public void testAdditionParser() throws Exception {
+        String transformSql = null;
+        TransformConfig config = null;
+        TransformProcessor<String, String> processor = null;
+        List<String> output = null;
+
+        transformSql = "select string1 + INTERVAL string2 SECOND_MICROSECOND 
from source";
+        config = new TransformConfig(transformSql);
+        processor = TransformProcessor
+                .create(config, 
SourceDecoderFactory.createCsvDecoder(csvSource),
+                        SinkEncoderFactory.createKvEncoder(kvSink));
+        // case1: '1992-12-31 23:59:59' + INTERVAL 1.999999 SECOND_MICROSECOND
+        output = processor.transform("||||1992-12-31 23:59:59|1.999999", new 
HashMap<>());
+        Assert.assertEquals(1, output.size());
+        Assert.assertEquals("result=1993-01-01 00:00:00.999999", 
output.get(0));
+        // case1: '1992-12-31 23:59:59' + INTERVAL -1.999999 SECOND_MICROSECOND
+        output = processor.transform("||||1992-12-31 23:59:59|-1.999999", new 
HashMap<>());
+        Assert.assertEquals(1, output.size());
+        Assert.assertEquals("result=1992-12-31 23:59:57.000001", 
output.get(0));
+
+        // case2: '1992-12-31' + INTERVAL 1.999999 SECOND_MICROSECOND
+        output = processor.transform("||||1992-12-31|1.999999", new 
HashMap<>());
+        Assert.assertEquals(1, output.size());
+        Assert.assertEquals("result=1992-12-31 00:00:01.999999", 
output.get(0));
+        // case2: '1992-12-31' + INTERVAL -1.999999 SECOND_MICROSECOND
+        output = processor.transform("||||1992-12-31|-1.999999", new 
HashMap<>());
+        Assert.assertEquals(1, output.size());
+        Assert.assertEquals("result=1992-12-30 23:59:58.000001", 
output.get(0));
+
+        transformSql = "select string1 + INTERVAL string2 YEAR from source";
+        config = new TransformConfig(transformSql);
+        processor = TransformProcessor
+                .create(config, 
SourceDecoderFactory.createCsvDecoder(csvSource),
+                        SinkEncoderFactory.createKvEncoder(kvSink));
+        // case3: '1992-12-31 23:59:59' + INTERVAL 1 YEAR
+        output = processor.transform("||||1992-12-31 23:59:59|1", new 
HashMap<>());
+        Assert.assertEquals(1, output.size());
+        Assert.assertEquals("result=1993-12-31 23:59:59", output.get(0));
+        // case3: '1992-12-31 23:59:59' + INTERVAL -1 YEAR
+        output = processor.transform("||||1992-12-31 23:59:59|-1", new 
HashMap<>());
+        Assert.assertEquals(1, output.size());
+        Assert.assertEquals("result=1991-12-31 23:59:59", output.get(0));
+
+        // case4: '23:59:59' + INTERVAL 1 YEAR
+        output = processor.transform("||||23:59:59|1", new HashMap<>());
+        Assert.assertEquals(1, output.size());
+        Assert.assertEquals("result=null", output.get(0));
+        // case4: '23:59:59' + INTERVAL -1 YEAR
+        output = processor.transform("||||23:59:59|-1", new HashMap<>());
+        Assert.assertEquals(1, output.size());
+        Assert.assertEquals("result=null", output.get(0));
+
+        transformSql = "select string1 + INTERVAL string2 WEEK from source";
+        config = new TransformConfig(transformSql);
+        processor = TransformProcessor
+                .create(config, 
SourceDecoderFactory.createCsvDecoder(csvSource),
+                        SinkEncoderFactory.createKvEncoder(kvSink));
+        // case5: '1992-12-31 23:59:59' + INTERVAL 13 WEEK
+        output = processor.transform("||||1992-12-31 23:59:59|13", new 
HashMap<>());
+        Assert.assertEquals(1, output.size());
+        Assert.assertEquals("result=1993-04-01 23:59:59", output.get(0));
+        // case5: '1992-12-31 23:59:59' + INTERVAL -13 WEEK
+        output = processor.transform("||||1992-12-31 23:59:59|-13", new 
HashMap<>());
+        Assert.assertEquals(1, output.size());
+        Assert.assertEquals("result=1992-10-01 23:59:59", output.get(0));
+
+        transformSql = "select string1 + INTERVAL string2 QUARTER from source";
+        config = new TransformConfig(transformSql);
+        processor = TransformProcessor
+                .create(config, 
SourceDecoderFactory.createCsvDecoder(csvSource),
+                        SinkEncoderFactory.createKvEncoder(kvSink));
+        // case6: '1992-12-31 23:59:59' + INTERVAL 13 QUARTER
+        output = processor.transform("||||1992-12-31 23:59:59|13", new 
HashMap<>());
+        Assert.assertEquals(1, output.size());
+        Assert.assertEquals("result=1996-03-31 23:59:59", output.get(0));
+        // case6: '1992-12-31 23:59:59' + INTERVAL -13 QUARTER
+        output = processor.transform("||||1992-12-31 23:59:59|-13", new 
HashMap<>());
+        Assert.assertEquals(1, output.size());
+        Assert.assertEquals("result=1989-09-30 23:59:59", output.get(0));
+
+        transformSql = "select string1 + INTERVAL xxd QUARTER from source";
+        config = new TransformConfig(transformSql);
+        processor = TransformProcessor
+                .create(config, 
SourceDecoderFactory.createCsvDecoder(csvSource),
+                        SinkEncoderFactory.createKvEncoder(kvSink));
+        // case7: '1992-12-31 23:59:59' + INTERVAL null QUARTER
+        output = processor.transform("||||1992-12-31 23:59:59|13", new 
HashMap<>());
+        Assert.assertEquals(1, output.size());
+        Assert.assertEquals("result=null", output.get(0));
+
+    }
+}
diff --git 
a/inlong-sdk/transform-sdk/src/test/java/org/apache/inlong/sdk/transform/process/parser/TestSubtractionParser.java
 
b/inlong-sdk/transform-sdk/src/test/java/org/apache/inlong/sdk/transform/process/parser/TestSubtractionParser.java
new file mode 100644
index 0000000000..b7bd4cfb8a
--- /dev/null
+++ 
b/inlong-sdk/transform-sdk/src/test/java/org/apache/inlong/sdk/transform/process/parser/TestSubtractionParser.java
@@ -0,0 +1,125 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.inlong.sdk.transform.process.parser;
+
+import org.apache.inlong.sdk.transform.decode.SourceDecoderFactory;
+import org.apache.inlong.sdk.transform.encode.SinkEncoderFactory;
+import org.apache.inlong.sdk.transform.pojo.TransformConfig;
+import org.apache.inlong.sdk.transform.process.TransformProcessor;
+
+import org.junit.Assert;
+import org.junit.Test;
+
+import java.util.HashMap;
+import java.util.List;
+
+public class TestSubtractionParser extends AbstractParserTestBase {
+    // Checks <temporal string> - INTERVAL <amount> <unit> in transform SQL, run through a CSV-decoder / KV-encoder TransformProcessor; csvSource and kvSink are not declared in this class (presumably inherited from AbstractParserTestBase).
+    @Test
+    public void testSubtractionParser() throws Exception {
+        String transformSql = null;
+        TransformConfig config = null;
+        TransformProcessor<String, String> processor = null;
+        List<String> output = null;
+
+        transformSql = "select string1 - INTERVAL string2 SECOND_MICROSECOND 
from source";
+        config = new TransformConfig(transformSql);
+        processor = TransformProcessor
+                .create(config, 
SourceDecoderFactory.createCsvDecoder(csvSource),
+                        SinkEncoderFactory.createKvEncoder(kvSink));
+        // case1 (datetime input, positive amount): '1992-12-31 23:59:59' - INTERVAL 1.999999 SECOND_MICROSECOND
+        output = processor.transform("||||1992-12-31 23:59:59|1.999999", new 
HashMap<>());
+        Assert.assertEquals(1, output.size());
+        Assert.assertEquals("result=1992-12-31 23:59:57.000001", 
output.get(0));
+        // case1 (datetime input, negative amount, so the result moves forward past midnight): '1992-12-31 23:59:59' - INTERVAL -1.999999 SECOND_MICROSECOND
+        output = processor.transform("||||1992-12-31 23:59:59|-1.999999", new 
HashMap<>());
+        Assert.assertEquals(1, output.size());
+        Assert.assertEquals("result=1993-01-01 00:00:00.999999", 
output.get(0));
+
+        // case2 (date-only input, positive amount; expected output carries a time part): '1992-12-31' - INTERVAL 1.999999 SECOND_MICROSECOND
+        output = processor.transform("||||1992-12-31|1.999999", new 
HashMap<>());
+        Assert.assertEquals(1, output.size());
+        Assert.assertEquals("result=1992-12-30 23:59:58.000001", 
output.get(0));
+        // case2 (date-only input, negative amount): '1992-12-31' - INTERVAL -1.999999 SECOND_MICROSECOND
+        output = processor.transform("||||1992-12-31|-1.999999", new 
HashMap<>());
+        Assert.assertEquals(1, output.size());
+        Assert.assertEquals("result=1992-12-31 00:00:01.999999", 
output.get(0));
+
+        transformSql = "select string1 - INTERVAL string2 YEAR from source";
+        config = new TransformConfig(transformSql);
+        processor = TransformProcessor
+                .create(config, 
SourceDecoderFactory.createCsvDecoder(csvSource),
+                        SinkEncoderFactory.createKvEncoder(kvSink));
+        // case3 (datetime input, positive amount): '1992-12-31 23:59:59' - INTERVAL 1 YEAR
+        output = processor.transform("||||1992-12-31 23:59:59|1", new 
HashMap<>());
+        Assert.assertEquals(1, output.size());
+        Assert.assertEquals("result=1991-12-31 23:59:59", output.get(0));
+        // case3 (datetime input, negative amount): '1992-12-31 23:59:59' - INTERVAL -1 YEAR
+        output = processor.transform("||||1992-12-31 23:59:59|-1", new 
HashMap<>());
+        Assert.assertEquals(1, output.size());
+        Assert.assertEquals("result=1993-12-31 23:59:59", output.get(0));
+
+        // case4 (time-only input with a YEAR interval; the expected result is null): '23:59:59' - INTERVAL 1 YEAR
+        output = processor.transform("||||23:59:59|1", new HashMap<>());
+        Assert.assertEquals(1, output.size());
+        Assert.assertEquals("result=null", output.get(0));
+        // case4 (time-only input, negative amount; the expected result is still null): '23:59:59' - INTERVAL -1 YEAR
+        output = processor.transform("||||23:59:59|-1", new HashMap<>());
+        Assert.assertEquals(1, output.size());
+        Assert.assertEquals("result=null", output.get(0));
+
+        transformSql = "select string1 - INTERVAL string2 WEEK from source";
+        config = new TransformConfig(transformSql);
+        processor = TransformProcessor
+                .create(config, 
SourceDecoderFactory.createCsvDecoder(csvSource),
+                        SinkEncoderFactory.createKvEncoder(kvSink));
+        // case5 (13 weeks = 91 days back): '1992-12-31 23:59:59' - INTERVAL 13 WEEK
+        output = processor.transform("||||1992-12-31 23:59:59|13", new 
HashMap<>());
+        Assert.assertEquals(1, output.size());
+        Assert.assertEquals("result=1992-10-01 23:59:59", output.get(0));
+        // case5 (negative amount, 91 days forward): '1992-12-31 23:59:59' - INTERVAL -13 WEEK
+        output = processor.transform("||||1992-12-31 23:59:59|-13", new 
HashMap<>());
+        Assert.assertEquals(1, output.size());
+        Assert.assertEquals("result=1993-04-01 23:59:59", output.get(0));
+
+        transformSql = "select string1 - INTERVAL string2 QUARTER from source";
+        config = new TransformConfig(transformSql);
+        processor = TransformProcessor
+                .create(config, 
SourceDecoderFactory.createCsvDecoder(csvSource),
+                        SinkEncoderFactory.createKvEncoder(kvSink));
+        // case6 (13 quarters = 39 months back; expected day is the 30th because September has 30 days): '1992-12-31 23:59:59' - INTERVAL 13 QUARTER
+        output = processor.transform("||||1992-12-31 23:59:59|13", new 
HashMap<>());
+        Assert.assertEquals(1, output.size());
+        Assert.assertEquals("result=1989-09-30 23:59:59", output.get(0));
+        // case6 (negative amount, 39 months forward): '1992-12-31 23:59:59' - INTERVAL -13 QUARTER
+        output = processor.transform("||||1992-12-31 23:59:59|-13", new 
HashMap<>());
+        Assert.assertEquals(1, output.size());
+        Assert.assertEquals("result=1996-03-31 23:59:59", output.get(0));
+
+        transformSql = "select string1 - INTERVAL xxd QUARTER from source";
+        config = new TransformConfig(transformSql);
+        processor = TransformProcessor
+                .create(config, 
SourceDecoderFactory.createCsvDecoder(csvSource),
+                        SinkEncoderFactory.createKvEncoder(kvSink));
+        // case7 (interval amount is the identifier xxd instead of string2; presumably xxd is not a source field and resolves to null - TODO confirm; the expected result is null): '1992-12-31 23:59:59' - INTERVAL null QUARTER
+        output = processor.transform("||||1992-12-31 23:59:59|13", new 
HashMap<>());
+        Assert.assertEquals(1, output.size());
+        Assert.assertEquals("result=null", output.get(0));
+
+    }
+}


Reply via email to