This is an automated email from the ASF dual-hosted git repository. dockerzhang pushed a commit to branch master in repository https://gitbox.apache.org/repos/asf/inlong.git
The following commit(s) were added to refs/heads/master by this push: new d4df5eda31 [INLONG-11081][SDK] Transform SQL supports INTERVAL parse (#11086) d4df5eda31 is described below commit d4df5eda31db2431da8afe391bc2f5b7ccc4418e Author: Zkplo <87751516+zk...@users.noreply.github.com> AuthorDate: Fri Sep 20 09:57:29 2024 +0800 [INLONG-11081][SDK] Transform SQL supports INTERVAL parse (#11086) Co-authored-by: ZKpLo <14148880+zk...@user.noreply.gitee.com> --- .../transform/process/parser/AdditionParser.java | 39 ++++- .../transform/process/parser/IntervalParser.java | 154 ++++++++++++++++++ .../process/parser/SubtractionParser.java | 39 ++++- .../sdk/transform/process/utils/DateUtil.java | 177 +++++++++++++++++++++ .../process/parser/AbstractParserTestBase.java | 5 + .../process/parser/TestAdditionParser.java | 125 +++++++++++++++ .../process/parser/TestSubtractionParser.java | 125 +++++++++++++++ 7 files changed, 650 insertions(+), 14 deletions(-) diff --git a/inlong-sdk/transform-sdk/src/main/java/org/apache/inlong/sdk/transform/process/parser/AdditionParser.java b/inlong-sdk/transform-sdk/src/main/java/org/apache/inlong/sdk/transform/process/parser/AdditionParser.java index 13536be30d..eea6ce7e17 100644 --- a/inlong-sdk/transform-sdk/src/main/java/org/apache/inlong/sdk/transform/process/parser/AdditionParser.java +++ b/inlong-sdk/transform-sdk/src/main/java/org/apache/inlong/sdk/transform/process/parser/AdditionParser.java @@ -20,14 +20,17 @@ package org.apache.inlong.sdk.transform.process.parser; import org.apache.inlong.sdk.transform.decode.SourceData; import org.apache.inlong.sdk.transform.process.Context; import org.apache.inlong.sdk.transform.process.operator.OperatorTools; +import org.apache.inlong.sdk.transform.process.utils.DateUtil; import net.sf.jsqlparser.expression.operators.arithmetic.Addition; +import org.apache.commons.lang3.tuple.Pair; import java.math.BigDecimal; +import java.time.temporal.ChronoField; +import java.util.Map; /** * AdditionParser - * */ @TransformParser(values = Addition.class) public class AdditionParser implements ValueParser { @@ -41,16 +44,38 @@ public class AdditionParser implements ValueParser { this.right = OperatorTools.buildParser(expr.getRightExpression()); } - /** - * parse - * @param sourceData - * @param rowIndex - * @return - */ @Override public Object parse(SourceData sourceData, int rowIndex, Context context) { + if (this.left instanceof IntervalParser && this.right instanceof IntervalParser) { + return null; + } else if (this.left instanceof IntervalParser || this.right instanceof IntervalParser) { + IntervalParser intervalParser = null; + ValueParser dateParser = null; + if (this.left instanceof IntervalParser) { + intervalParser = (IntervalParser) this.left; + dateParser = this.right; + } else { + intervalParser = (IntervalParser) this.right; + dateParser = this.left; + } + Object intervalPairObj = intervalParser.parse(sourceData, rowIndex, context); + Object dateObj = dateParser.parse(sourceData, rowIndex, context); + if (intervalPairObj == null || dateObj == null) { + return null; + } + return DateUtil.dateAdd(OperatorTools.parseString(dateObj), + (Pair<Integer, Map<ChronoField, Long>>) intervalPairObj, 1); + } else { + return numericalOperation(sourceData, rowIndex, context); + } + } + + private BigDecimal numericalOperation(SourceData sourceData, int rowIndex, Context context) { Object leftObj = this.left.parse(sourceData, rowIndex, context); Object rightObj = this.right.parse(sourceData, rowIndex, context); + if (leftObj == null || rightObj == null) { + return null; + } BigDecimal leftValue = OperatorTools.parseBigDecimal(leftObj); BigDecimal rightValue = OperatorTools.parseBigDecimal(rightObj); return leftValue.add(rightValue); diff --git a/inlong-sdk/transform-sdk/src/main/java/org/apache/inlong/sdk/transform/process/parser/IntervalParser.java b/inlong-sdk/transform-sdk/src/main/java/org/apache/inlong/sdk/transform/process/parser/IntervalParser.java new file mode 100644 index 0000000000..7266dcd63b --- /dev/null +++ b/inlong-sdk/transform-sdk/src/main/java/org/apache/inlong/sdk/transform/process/parser/IntervalParser.java @@ -0,0 +1,154 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.inlong.sdk.transform.process.parser; + +import org.apache.inlong.sdk.transform.decode.SourceData; +import org.apache.inlong.sdk.transform.process.Context; +import org.apache.inlong.sdk.transform.process.operator.OperatorTools; + +import lombok.extern.slf4j.Slf4j; +import net.sf.jsqlparser.expression.IntervalExpression; +import org.apache.commons.lang3.tuple.Pair; + +import java.time.format.DateTimeFormatter; +import java.time.temporal.ChronoField; +import java.time.temporal.TemporalAccessor; +import java.util.Arrays; +import java.util.HashMap; +import java.util.List; +import java.util.Map; +import java.util.concurrent.ConcurrentHashMap; + +/** + * IntervalParser <-> INTERVAL expr unit -> Pair(factor,Map(ChronoField,Count)): + * <p> + * `factor`: + * <p> + * 1) `expr` can accept strings starting with '-', representing the meaning of subtraction. + * So the positive or negative sign of `factor` indicates whether `expr` starts with a '-' or not. + * <p> + * 2) For units like WEEK and QUARTER, it is not easy to parse, + * so WEEK -> ( unit=DAY, adb(factor)=7 ); QUARTER -> ( unit=MONTH, adb(factor)=3 ). + * <p> + * `Map(ChronoField,Count)`: + * <p> + * IntervalParser will automatically match the corresponding DateTimeFormatter based on the input `expr`, + * Based on DateTimeFormatter, IntervalParser will parse the incoming units and store them in a Map. + * <p> + * + * In addition,acceptable expression parsing and specifying parameters in two ways: + * 1) interval rowName year -> expression + * 3) interval 2 year -> fixed parameter + */ +@Slf4j +@TransformParser(values = IntervalExpression.class) +public class IntervalParser implements ValueParser { + + private final String intervalType; + private final ValueParser dateParser; + private final String parameter; + + private static final List<ChronoField> CHRONO_FIELD_LIST = Arrays.asList(ChronoField.YEAR, + ChronoField.MONTH_OF_YEAR, + ChronoField.DAY_OF_MONTH, ChronoField.HOUR_OF_DAY, ChronoField.MINUTE_OF_HOUR, ChronoField.SECOND_OF_MINUTE, + ChronoField.MICRO_OF_SECOND); + private static final Map<String, DateTimeFormatter> DT_FORMATTER_MAP = new ConcurrentHashMap<>(); + + static { + DT_FORMATTER_MAP.put("SECOND_MICROSECOND", DateTimeFormatter.ofPattern("s.SSSSSS")); + DT_FORMATTER_MAP.put("MINUTE_MICROSECOND", DateTimeFormatter.ofPattern("m:s.SSSSSS")); + DT_FORMATTER_MAP.put("MINUTE_SECOND", DateTimeFormatter.ofPattern("m:s")); + DT_FORMATTER_MAP.put("HOUR_MICROSECOND", DateTimeFormatter.ofPattern("H:m:s.SSSSSS")); + DT_FORMATTER_MAP.put("HOUR_SECOND", DateTimeFormatter.ofPattern("H:m:s")); + DT_FORMATTER_MAP.put("HOUR_MINUTE", DateTimeFormatter.ofPattern("H:m")); + DT_FORMATTER_MAP.put("DAY_MICROSECOND", DateTimeFormatter.ofPattern("d H:m:s.SSSSSS")); + DT_FORMATTER_MAP.put("DAY_SECOND", DateTimeFormatter.ofPattern("d H:m:s")); + DT_FORMATTER_MAP.put("DAY_MINUTE", DateTimeFormatter.ofPattern("d H:m")); + DT_FORMATTER_MAP.put("DAY_HOUR", DateTimeFormatter.ofPattern("d H")); + DT_FORMATTER_MAP.put("YEAR_MONTH", DateTimeFormatter.ofPattern("y-M")); + + DT_FORMATTER_MAP.put("MICROSECOND", DateTimeFormatter.ofPattern("SSSSSS")); + DT_FORMATTER_MAP.put("SECOND", DateTimeFormatter.ofPattern("s")); + DT_FORMATTER_MAP.put("MINUTE", DateTimeFormatter.ofPattern("m")); + DT_FORMATTER_MAP.put("HOUR", DateTimeFormatter.ofPattern("H")); + DT_FORMATTER_MAP.put("DAY", DateTimeFormatter.ofPattern("d")); + DT_FORMATTER_MAP.put("MONTH", DateTimeFormatter.ofPattern("M")); + DT_FORMATTER_MAP.put("YEAR", DateTimeFormatter.ofPattern("y")); + } + + public IntervalParser(IntervalExpression expr) { + intervalType = expr.getIntervalType().toUpperCase(); + dateParser = OperatorTools.buildParser(expr.getExpression()); + if (dateParser == null) { + parameter = expr.getParameter(); + } else { + parameter = null; + } + } + + @Override + public Object parse(SourceData sourceData, int rowIndex, Context context) { + DateTimeFormatter dateTimeFormatter = DT_FORMATTER_MAP.get(intervalType); + + String dataStr = parameter; + if (dateParser != null) { + Object dateObj = dateParser.parse(sourceData, rowIndex, context); + if (dateObj == null) { + return null; + } + dataStr = OperatorTools.parseString(dateObj); + } + + int factor = 1; + if (dateTimeFormatter == null) { + if ("WEEK".equals(intervalType)) { + dateTimeFormatter = DT_FORMATTER_MAP.get("DAY"); + factor = 7; + } else if ("QUARTER".equals(intervalType)) { + dateTimeFormatter = DT_FORMATTER_MAP.get("MONTH"); + factor = 3; + } else { + return null; + } + } + + try { + factor = dataStr.charAt(0) == '-' ? -factor : factor; + if (factor < 0) { + dataStr = dataStr.substring(1); + } + TemporalAccessor temporalAccessor = dateTimeFormatter.parse(dataStr); + HashMap<ChronoField, Long> map = new HashMap<>(); + for (ChronoField field : CHRONO_FIELD_LIST) { + try { + long num = temporalAccessor.getLong(field); + if (num == 0) { + continue; + } + map.put(field, temporalAccessor.getLong(field)); + } catch (Exception ignored) { + + } + } + return Pair.of(factor, map); + } catch (Exception e) { + log.error("Interval parse error", e); + return null; + } + } +} diff --git a/inlong-sdk/transform-sdk/src/main/java/org/apache/inlong/sdk/transform/process/parser/SubtractionParser.java b/inlong-sdk/transform-sdk/src/main/java/org/apache/inlong/sdk/transform/process/parser/SubtractionParser.java index 140a1dc995..cf32f1694c 100644 --- a/inlong-sdk/transform-sdk/src/main/java/org/apache/inlong/sdk/transform/process/parser/SubtractionParser.java +++ b/inlong-sdk/transform-sdk/src/main/java/org/apache/inlong/sdk/transform/process/parser/SubtractionParser.java @@ -20,14 +20,17 @@ package org.apache.inlong.sdk.transform.process.parser; import org.apache.inlong.sdk.transform.decode.SourceData; import org.apache.inlong.sdk.transform.process.Context; import org.apache.inlong.sdk.transform.process.operator.OperatorTools; +import org.apache.inlong.sdk.transform.process.utils.DateUtil; import net.sf.jsqlparser.expression.operators.arithmetic.Subtraction; +import org.apache.commons.lang3.tuple.Pair; import java.math.BigDecimal; +import java.time.temporal.ChronoField; +import java.util.Map; /** * SubtractionParser - * */ @TransformParser(values = Subtraction.class) public class SubtractionParser implements ValueParser { @@ -41,16 +44,38 @@ public class SubtractionParser implements ValueParser { this.right = OperatorTools.buildParser(expr.getRightExpression()); } - /** - * parse - * @param sourceData - * @param rowIndex - * @return - */ @Override public Object parse(SourceData sourceData, int rowIndex, Context context) { + if (this.left instanceof IntervalParser && this.right instanceof IntervalParser) { + return null; + } else if (this.left instanceof IntervalParser || this.right instanceof IntervalParser) { + IntervalParser intervalParser = null; + ValueParser dateParser = null; + if (this.left instanceof IntervalParser) { + intervalParser = (IntervalParser) this.left; + dateParser = this.right; + } else { + intervalParser = (IntervalParser) this.right; + dateParser = this.left; + } + Object intervalPairObj = intervalParser.parse(sourceData, rowIndex, context); + Object dateObj = dateParser.parse(sourceData, rowIndex, context); + if (intervalPairObj == null || dateObj == null) { + return null; + } + return DateUtil.dateAdd(OperatorTools.parseString(dateObj), + (Pair<Integer, Map<ChronoField, Long>>) intervalPairObj, -1); + } else { + return numericalOperation(sourceData, rowIndex, context); + } + } + + private BigDecimal numericalOperation(SourceData sourceData, int rowIndex, Context context) { Object leftObj = this.left.parse(sourceData, rowIndex, context); Object rightObj = this.right.parse(sourceData, rowIndex, context); + if (leftObj == null || rightObj == null) { + return null; + } BigDecimal leftValue = OperatorTools.parseBigDecimal(leftObj); BigDecimal rightValue = OperatorTools.parseBigDecimal(rightObj); return leftValue.subtract(rightValue); diff --git a/inlong-sdk/transform-sdk/src/main/java/org/apache/inlong/sdk/transform/process/utils/DateUtil.java b/inlong-sdk/transform-sdk/src/main/java/org/apache/inlong/sdk/transform/process/utils/DateUtil.java new file mode 100644 index 0000000000..998f838cf2 --- /dev/null +++ b/inlong-sdk/transform-sdk/src/main/java/org/apache/inlong/sdk/transform/process/utils/DateUtil.java @@ -0,0 +1,177 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.inlong.sdk.transform.process.utils; + +import org.apache.commons.lang3.tuple.Pair; + +import java.time.LocalDate; +import java.time.LocalDateTime; +import java.time.LocalTime; +import java.time.format.DateTimeFormatter; +import java.time.temporal.ChronoField; +import java.util.Arrays; +import java.util.List; +import java.util.Map; + +public class DateUtil { + + // Need to follow this order + private static final List<DateTimeFormatter> DATE_TIME_FORMATTER_LIST = Arrays.asList( + DateTimeFormatter.ofPattern("yyyy-MM-dd HH:mm:ss.SSSSSS"), + DateTimeFormatter.ofPattern("yyyy-MM-dd HH:mm:ss"), + DateTimeFormatter.ofPattern("yyyy-MM-dd")); + private static final List<DateTimeFormatter> TIME_FORMATTER_LIST = Arrays.asList( + DateTimeFormatter.ofPattern("HH:mm:ss.SSSSSS"), DateTimeFormatter.ofPattern("HH:mm:ss")); + + /** + * Time calculation + * + * @param dateStr Time parameter string + * @param intervalPair Interval parsing results + * @param sign If the sign is positive or negative, it indicates addition or subtraction + * @return Calculation result string + */ + public static String dateAdd(String dateStr, Pair<Integer, Map<ChronoField, Long>> intervalPair, int sign) { + + if (sign < 0) { + sign = -1; + } else if (sign > 0) { + sign = 1; + } else { + return null; + } + + Object dateParserObj = null; + for (DateTimeFormatter dateTimeFormatter : DATE_TIME_FORMATTER_LIST) { + try { + dateParserObj = LocalDateTime.parse(dateStr, dateTimeFormatter); + } catch (Exception e) { + try { + dateParserObj = LocalDate.parse(dateStr, dateTimeFormatter).atStartOfDay(); + } catch (Exception ignored) { + + } + } + if (dateParserObj != null) { + return addDateTime(intervalPair, sign, (LocalDateTime) dateParserObj, dateStr); + } + } + + for (DateTimeFormatter dateTimeFormatter : TIME_FORMATTER_LIST) { + try { + dateParserObj = LocalTime.parse(dateStr, dateTimeFormatter); + } catch (Exception ignored) { + + } + if (dateParserObj != null) { + return addTime(intervalPair, sign, (LocalTime) dateParserObj, dateStr); + } + } + + return null; + } + + private static String addDateTime(Pair<Integer, Map<ChronoField, Long>> intervalPair, int sign, + LocalDateTime dateTime, String dataStr) { + int factor = intervalPair.getKey(); + Map<ChronoField, Long> valueMap = intervalPair.getValue(); + + boolean hasTime = dataStr.indexOf(' ') != -1; + boolean hasMicroSecond = dataStr.indexOf('.') != -1; + + for (ChronoField field : valueMap.keySet()) { + long amount = valueMap.get(field) * factor * sign; + switch (field) { + case MICRO_OF_SECOND: + hasTime = true; + hasMicroSecond = true; + dateTime = dateTime.plusNanos(amount * 1000L); + break; + case SECOND_OF_MINUTE: + hasTime = true; + dateTime = dateTime.plusSeconds(amount); + break; + case MINUTE_OF_HOUR: + hasTime = true; + dateTime = dateTime.plusMinutes(amount); + break; + case HOUR_OF_DAY: + hasTime = true; + dateTime = dateTime.plusHours(amount); + break; + case DAY_OF_MONTH: + dateTime = dateTime.plusDays(amount); + break; + case MONTH_OF_YEAR: + dateTime = dateTime.plusMonths(amount); + break; + case YEAR: + dateTime = dateTime.plusYears(amount); + break; + default: + return null; + } + } + + String result = dateTime.toLocalDate().toString(); + if (hasTime) { + if (hasMicroSecond) { + result += " " + dateTime.toLocalTime().format(TIME_FORMATTER_LIST.get(0)); + } else { + result += " " + dateTime.toLocalTime().format(TIME_FORMATTER_LIST.get(1)); + } + } + return result; + } + + private static String addTime(Pair<Integer, Map<ChronoField, Long>> intervalPair, int sign, LocalTime time, + String dataStr) { + int factor = intervalPair.getKey(); + Map<ChronoField, Long> valueMap = intervalPair.getValue(); + + boolean hasMicroSecond = dataStr.indexOf('.') != -1; + + for (ChronoField field : valueMap.keySet()) { + long amount = valueMap.get(field) * factor * sign; + switch (field) { + case MICRO_OF_SECOND: + hasMicroSecond = true; + time = time.plusNanos(amount * 1000L); + break; + case SECOND_OF_MINUTE: + time = time.plusSeconds(amount); + break; + case MINUTE_OF_HOUR: + time = time.plusMinutes(amount); + break; + case HOUR_OF_DAY: + time = time.plusHours(amount); + break; + default: + return null; + } + } + + if (hasMicroSecond) { + return time.format(TIME_FORMATTER_LIST.get(0)); + } else { + return time.format(TIME_FORMATTER_LIST.get(1)); + } + } + +} diff --git a/inlong-sdk/transform-sdk/src/test/java/org/apache/inlong/sdk/transform/process/parser/AbstractParserTestBase.java b/inlong-sdk/transform-sdk/src/test/java/org/apache/inlong/sdk/transform/process/parser/AbstractParserTestBase.java index 2534bff096..4fcb436575 100644 --- a/inlong-sdk/transform-sdk/src/test/java/org/apache/inlong/sdk/transform/process/parser/AbstractParserTestBase.java +++ b/inlong-sdk/transform-sdk/src/test/java/org/apache/inlong/sdk/transform/process/parser/AbstractParserTestBase.java @@ -41,6 +41,11 @@ public abstract class AbstractParserTestBase { field.setName("numeric" + i); srcFields.add(field); } + for (int i = 1; i < 5; i++) { + FieldInfo field = new FieldInfo(); + field.setName("string" + i); + srcFields.add(field); + } FieldInfo field = new FieldInfo(); field.setName("result"); dstFields.add(field); diff --git a/inlong-sdk/transform-sdk/src/test/java/org/apache/inlong/sdk/transform/process/parser/TestAdditionParser.java b/inlong-sdk/transform-sdk/src/test/java/org/apache/inlong/sdk/transform/process/parser/TestAdditionParser.java new file mode 100644 index 0000000000..fb27cd4e34 --- /dev/null +++ b/inlong-sdk/transform-sdk/src/test/java/org/apache/inlong/sdk/transform/process/parser/TestAdditionParser.java @@ -0,0 +1,125 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.inlong.sdk.transform.process.parser; + +import org.apache.inlong.sdk.transform.decode.SourceDecoderFactory; +import org.apache.inlong.sdk.transform.encode.SinkEncoderFactory; +import org.apache.inlong.sdk.transform.pojo.TransformConfig; +import org.apache.inlong.sdk.transform.process.TransformProcessor; + +import org.junit.Assert; +import org.junit.Test; + +import java.util.HashMap; +import java.util.List; + +public class TestAdditionParser extends AbstractParserTestBase { + + @Test + public void testAdditionParser() throws Exception { + String transformSql = null; + TransformConfig config = null; + TransformProcessor<String, String> processor = null; + List<String> output = null; + + transformSql = "select string1 + INTERVAL string2 SECOND_MICROSECOND from source"; + config = new TransformConfig(transformSql); + processor = TransformProcessor + .create(config, SourceDecoderFactory.createCsvDecoder(csvSource), + SinkEncoderFactory.createKvEncoder(kvSink)); + // case1: '1992-12-31 23:59:59' + INTERVAL 1.999999 SECOND_MICROSECOND + output = processor.transform("||||1992-12-31 23:59:59|1.999999", new HashMap<>()); + Assert.assertEquals(1, output.size()); + Assert.assertEquals("result=1993-01-01 00:00:00.999999", output.get(0)); + // case1: '1992-12-31 23:59:59' + INTERVAL -1.999999 SECOND_MICROSECOND + output = processor.transform("||||1992-12-31 23:59:59|-1.999999", new HashMap<>()); + Assert.assertEquals(1, output.size()); + Assert.assertEquals("result=1992-12-31 23:59:57.000001", output.get(0)); + + // case2: '1992-12-31' + INTERVAL 1.999999 SECOND_MICROSECOND + output = processor.transform("||||1992-12-31|1.999999", new HashMap<>()); + Assert.assertEquals(1, output.size()); + Assert.assertEquals("result=1992-12-31 00:00:01.999999", output.get(0)); + // case2: '1992-12-31' + INTERVAL -1.999999 SECOND_MICROSECOND + output = processor.transform("||||1992-12-31|-1.999999", new HashMap<>()); + Assert.assertEquals(1, output.size()); + Assert.assertEquals("result=1992-12-30 23:59:58.000001", output.get(0)); + + transformSql = "select string1 + INTERVAL string2 YEAR from source"; + config = new TransformConfig(transformSql); + processor = TransformProcessor + .create(config, SourceDecoderFactory.createCsvDecoder(csvSource), + SinkEncoderFactory.createKvEncoder(kvSink)); + // case3: '1992-12-31 23:59:59' + INTERVAL 1 YEAR + output = processor.transform("||||1992-12-31 23:59:59|1", new HashMap<>()); + Assert.assertEquals(1, output.size()); + Assert.assertEquals("result=1993-12-31 23:59:59", output.get(0)); + // case3: '1992-12-31 23:59:59' + INTERVAL -1 YEAR + output = processor.transform("||||1992-12-31 23:59:59|-1", new HashMap<>()); + Assert.assertEquals(1, output.size()); + Assert.assertEquals("result=1991-12-31 23:59:59", output.get(0)); + + // case4: '23:59:59' + INTERVAL 1 YEAR + output = processor.transform("||||23:59:59|1", new HashMap<>()); + Assert.assertEquals(1, output.size()); + Assert.assertEquals("result=null", output.get(0)); + // case4: '23:59:59' + INTERVAL -1 YEAR + output = processor.transform("||||23:59:59|-1", new HashMap<>()); + Assert.assertEquals(1, output.size()); + Assert.assertEquals("result=null", output.get(0)); + + transformSql = "select string1 + INTERVAL string2 WEEK from source"; + config = new TransformConfig(transformSql); + processor = TransformProcessor + .create(config, SourceDecoderFactory.createCsvDecoder(csvSource), + SinkEncoderFactory.createKvEncoder(kvSink)); + // case5: '1992-12-31 23:59:59' + INTERVAL 13 WEEK + output = processor.transform("||||1992-12-31 23:59:59|13", new HashMap<>()); + Assert.assertEquals(1, output.size()); + Assert.assertEquals("result=1993-04-01 23:59:59", output.get(0)); + // case5: '1992-12-31 23:59:59' + INTERVAL -13 WEEK + output = processor.transform("||||1992-12-31 23:59:59|-13", new HashMap<>()); + Assert.assertEquals(1, output.size()); + Assert.assertEquals("result=1992-10-01 23:59:59", output.get(0)); + + transformSql = "select string1 + INTERVAL string2 QUARTER from source"; + config = new TransformConfig(transformSql); + processor = TransformProcessor + .create(config, SourceDecoderFactory.createCsvDecoder(csvSource), + SinkEncoderFactory.createKvEncoder(kvSink)); + // case6: '1992-12-31 23:59:59' + INTERVAL 13 QUARTER + output = processor.transform("||||1992-12-31 23:59:59|13", new HashMap<>()); + Assert.assertEquals(1, output.size()); + Assert.assertEquals("result=1996-03-31 23:59:59", output.get(0)); + // case6: '1992-12-31 23:59:59' + INTERVAL -13 QUARTER + output = processor.transform("||||1992-12-31 23:59:59|-13", new HashMap<>()); + Assert.assertEquals(1, output.size()); + Assert.assertEquals("result=1989-09-30 23:59:59", output.get(0)); + + transformSql = "select string1 + INTERVAL xxd QUARTER from source"; + config = new TransformConfig(transformSql); + processor = TransformProcessor + .create(config, SourceDecoderFactory.createCsvDecoder(csvSource), + SinkEncoderFactory.createKvEncoder(kvSink)); + // case7: '1992-12-31 23:59:59' + INTERVAL null QUARTER + output = processor.transform("||||1992-12-31 23:59:59|13", new HashMap<>()); + Assert.assertEquals(1, output.size()); + Assert.assertEquals("result=null", output.get(0)); + + } +} diff --git a/inlong-sdk/transform-sdk/src/test/java/org/apache/inlong/sdk/transform/process/parser/TestSubtractionParser.java b/inlong-sdk/transform-sdk/src/test/java/org/apache/inlong/sdk/transform/process/parser/TestSubtractionParser.java new file mode 100644 index 0000000000..b7bd4cfb8a --- /dev/null +++ b/inlong-sdk/transform-sdk/src/test/java/org/apache/inlong/sdk/transform/process/parser/TestSubtractionParser.java @@ -0,0 +1,125 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.inlong.sdk.transform.process.parser; + +import org.apache.inlong.sdk.transform.decode.SourceDecoderFactory; +import org.apache.inlong.sdk.transform.encode.SinkEncoderFactory; +import org.apache.inlong.sdk.transform.pojo.TransformConfig; +import org.apache.inlong.sdk.transform.process.TransformProcessor; + +import org.junit.Assert; +import org.junit.Test; + +import java.util.HashMap; +import java.util.List; + +public class TestSubtractionParser extends AbstractParserTestBase { + + @Test + public void testSubtractionParser() throws Exception { + String transformSql = null; + TransformConfig config = null; + TransformProcessor<String, String> processor = null; + List<String> output = null; + + transformSql = "select string1 - INTERVAL string2 SECOND_MICROSECOND from source"; + config = new TransformConfig(transformSql); + processor = TransformProcessor + .create(config, SourceDecoderFactory.createCsvDecoder(csvSource), + SinkEncoderFactory.createKvEncoder(kvSink)); + // case1: '1992-12-31 23:59:59' - INTERVAL 1.999999 SECOND_MICROSECOND + output = processor.transform("||||1992-12-31 23:59:59|1.999999", new HashMap<>()); + Assert.assertEquals(1, output.size()); + Assert.assertEquals("result=1992-12-31 23:59:57.000001", output.get(0)); + // case1: '1992-12-31 23:59:59' - INTERVAL -1.999999 SECOND_MICROSECOND + output = processor.transform("||||1992-12-31 23:59:59|-1.999999", new HashMap<>()); + Assert.assertEquals(1, output.size()); + Assert.assertEquals("result=1993-01-01 00:00:00.999999", output.get(0)); + + // case2: '1992-12-31' - INTERVAL 1.999999 SECOND_MICROSECOND + output = processor.transform("||||1992-12-31|1.999999", new HashMap<>()); + Assert.assertEquals(1, output.size()); + Assert.assertEquals("result=1992-12-30 23:59:58.000001", output.get(0)); + // case2: '1992-12-31' - INTERVAL -1.999999 SECOND_MICROSECOND + output = processor.transform("||||1992-12-31|-1.999999", new HashMap<>()); + Assert.assertEquals(1, output.size()); + Assert.assertEquals("result=1992-12-31 00:00:01.999999", output.get(0)); + + transformSql = "select string1 - INTERVAL string2 YEAR from source"; + config = new TransformConfig(transformSql); + processor = TransformProcessor + .create(config, SourceDecoderFactory.createCsvDecoder(csvSource), + SinkEncoderFactory.createKvEncoder(kvSink)); + // case3: '1992-12-31 23:59:59' - INTERVAL 1 YEAR + output = processor.transform("||||1992-12-31 23:59:59|1", new HashMap<>()); + Assert.assertEquals(1, output.size()); + Assert.assertEquals("result=1991-12-31 23:59:59", output.get(0)); + // case3: '1992-12-31 23:59:59' - INTERVAL -1 YEAR + output = processor.transform("||||1992-12-31 23:59:59|-1", new HashMap<>()); + Assert.assertEquals(1, output.size()); + Assert.assertEquals("result=1993-12-31 23:59:59", output.get(0)); + + // case4: '23:59:59' - INTERVAL 1 YEAR + output = processor.transform("||||23:59:59|1", new HashMap<>()); + Assert.assertEquals(1, output.size()); + Assert.assertEquals("result=null", output.get(0)); + // case4: '23:59:59' - INTERVAL -1 YEAR + output = processor.transform("||||23:59:59|-1", new HashMap<>()); + Assert.assertEquals(1, output.size()); + Assert.assertEquals("result=null", output.get(0)); + + transformSql = "select string1 - INTERVAL string2 WEEK from source"; + config = new TransformConfig(transformSql); + processor = TransformProcessor + .create(config, SourceDecoderFactory.createCsvDecoder(csvSource), + SinkEncoderFactory.createKvEncoder(kvSink)); + // case5: '1992-12-31 23:59:59' - INTERVAL 13 WEEK + output = processor.transform("||||1992-12-31 23:59:59|13", new HashMap<>()); + Assert.assertEquals(1, output.size()); + Assert.assertEquals("result=1992-10-01 23:59:59", output.get(0)); + // case5: '1992-12-31 23:59:59' - INTERVAL -13 WEEK + output = processor.transform("||||1992-12-31 23:59:59|-13", new HashMap<>()); + Assert.assertEquals(1, output.size()); + Assert.assertEquals("result=1993-04-01 23:59:59", output.get(0)); + + transformSql = "select string1 - INTERVAL string2 QUARTER from source"; + config = new TransformConfig(transformSql); + processor = TransformProcessor + .create(config, SourceDecoderFactory.createCsvDecoder(csvSource), + SinkEncoderFactory.createKvEncoder(kvSink)); + // case6: '1992-12-31 23:59:59' - INTERVAL 13 QUARTER + output = processor.transform("||||1992-12-31 23:59:59|13", new HashMap<>()); + Assert.assertEquals(1, output.size()); + Assert.assertEquals("result=1989-09-30 23:59:59", output.get(0)); + // case6: '1992-12-31 23:59:59' - INTERVAL -13 QUARTER + output = processor.transform("||||1992-12-31 23:59:59|-13", new HashMap<>()); + Assert.assertEquals(1, output.size()); + Assert.assertEquals("result=1996-03-31 23:59:59", output.get(0)); + + transformSql = "select string1 - INTERVAL xxd QUARTER from source"; + config = new TransformConfig(transformSql); + processor = TransformProcessor + .create(config, SourceDecoderFactory.createCsvDecoder(csvSource), + SinkEncoderFactory.createKvEncoder(kvSink)); + // case7: '1992-12-31 23:59:59' - INTERVAL null QUARTER + output = processor.transform("||||1992-12-31 23:59:59|13", new HashMap<>()); + Assert.assertEquals(1, output.size()); + Assert.assertEquals("result=null", output.get(0)); + + } +}