This is an automated email from the ASF dual-hosted git repository. dockerzhang pushed a commit to branch master in repository https://gitbox.apache.org/repos/asf/inlong.git
The following commit(s) were added to refs/heads/master by this push: new e7878063ff [INLONG-10897][SDK] Transform support DATEDIFF function (#10925) e7878063ff is described below commit e7878063ff0d90fd2f1a1992e738c19256123387 Author: Zkplo <87751516+zk...@users.noreply.github.com> AuthorDate: Mon Sep 2 09:43:14 2024 +0800 [INLONG-10897][SDK] Transform support DATEDIFF function (#10925) Co-authored-by: ZKpLo <14148880+zk...@user.noreply.gitee.com> --- .../process/function/DateDiffFunction.java | 89 ++++++++++++++++++++++ .../TestTransformTemporalFunctionsProcessor.java | 48 ++++++++++++ 2 files changed, 137 insertions(+) diff --git a/inlong-sdk/transform-sdk/src/main/java/org/apache/inlong/sdk/transform/process/function/DateDiffFunction.java b/inlong-sdk/transform-sdk/src/main/java/org/apache/inlong/sdk/transform/process/function/DateDiffFunction.java new file mode 100644 index 0000000000..77b2c93dfc --- /dev/null +++ b/inlong-sdk/transform-sdk/src/main/java/org/apache/inlong/sdk/transform/process/function/DateDiffFunction.java @@ -0,0 +1,89 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.inlong.sdk.transform.process.function; + +import org.apache.inlong.sdk.transform.decode.SourceData; +import org.apache.inlong.sdk.transform.process.Context; +import org.apache.inlong.sdk.transform.process.operator.OperatorTools; +import org.apache.inlong.sdk.transform.process.parser.ValueParser; + +import net.sf.jsqlparser.expression.Expression; +import net.sf.jsqlparser.expression.Function; + +import java.time.LocalDate; +import java.time.LocalDateTime; +import java.time.format.DateTimeFormatter; +import java.time.temporal.ChronoUnit; +import java.util.List; + +/** + * DateDiffFunction + * description: DATEDIFF(d1, d2) + * - return null if one of the two parameters is null or "" + * - return null if one of the two parameters has an incorrect date format + * - return the number of days between the dates d1->d2. + */ +@TransformFunction(names = {"datediff", "date_diff"}) +public class DateDiffFunction implements ValueParser { + + private final ValueParser leftDateParser; + private final ValueParser rightDateParser; + private static final DateTimeFormatter DEFAULT_FORMAT_DATE_TIME = + DateTimeFormatter.ofPattern("yyyy-MM-dd HH:mm:ss"); + private static final DateTimeFormatter DEFAULT_FORMAT_DATE = DateTimeFormatter.ofPattern("yyyy-MM-dd"); + + public DateDiffFunction(Function expr) { + List<Expression> expressions = expr.getParameters().getExpressions(); + leftDateParser = OperatorTools.buildParser(expressions.get(0)); + rightDateParser = OperatorTools.buildParser(expressions.get(1)); + } + + @Override + public Object parse(SourceData sourceData, int rowIndex, Context context) { + Object leftDateObj = leftDateParser.parse(sourceData, rowIndex, context); + Object rightDateObj = rightDateParser.parse(sourceData, rowIndex, context); + if (leftDateObj == null || rightDateObj == null) { + return null; + } + String leftDate = OperatorTools.parseString(leftDateObj); + String rightDate = OperatorTools.parseString(rightDateObj); + if (leftDate.isEmpty() || rightDate.isEmpty()) { + return null; + } + try { + LocalDate left = getLocalDate(leftDate); + LocalDate right = getLocalDate(rightDate); + return ChronoUnit.DAYS.between(right, left); + } catch (Exception e) { + return null; + } + } + + public LocalDate getLocalDate(String dateString) { + DateTimeFormatter formatter = null; + LocalDate dateTime = null; + if (dateString.indexOf(' ') != -1) { + formatter = DEFAULT_FORMAT_DATE_TIME; + dateTime = LocalDateTime.parse(dateString, formatter).toLocalDate(); + } else { + formatter = DEFAULT_FORMAT_DATE; + dateTime = LocalDate.parse(dateString, formatter); + } + return dateTime; + } +} diff --git a/inlong-sdk/transform-sdk/src/test/java/org/apache/inlong/sdk/transform/process/TestTransformTemporalFunctionsProcessor.java b/inlong-sdk/transform-sdk/src/test/java/org/apache/inlong/sdk/transform/process/TestTransformTemporalFunctionsProcessor.java index 354b8b7f18..f2c73ee3d9 100644 --- a/inlong-sdk/transform-sdk/src/test/java/org/apache/inlong/sdk/transform/process/TestTransformTemporalFunctionsProcessor.java +++ b/inlong-sdk/transform-sdk/src/test/java/org/apache/inlong/sdk/transform/process/TestTransformTemporalFunctionsProcessor.java @@ -379,6 +379,54 @@ public class TestTransformTemporalFunctionsProcessor { Assert.assertEquals(output4.get(0), "result=1970-01-01 00:00:00.0"); } + @Test + public void testDateDiffFunction() throws Exception { + String transformSql = null; + TransformConfig config = null; + TransformProcessor<String, String> processor = null; + List<String> output = null; + + transformSql = "select datediff(string1,string2) from source"; + config = new TransformConfig(transformSql); + processor = TransformProcessor + .create(config, SourceDecoderFactory.createCsvDecoder(csvSource), + SinkEncoderFactory.createKvEncoder(kvSink)); + // case1: datediff('1970-01-01','1970-01-02') + output = processor.transform("1970-01-01|1970-01-02", new HashMap<>()); + Assert.assertEquals(1, output.size()); + Assert.assertEquals("result=-1", output.get(0)); + + // case2: datediff('1970-01-02','1970-01-01') + output = processor.transform("1970-01-02|1970-01-01", new HashMap<>()); + Assert.assertEquals(1, output.size()); + Assert.assertEquals("result=1", output.get(0)); + + // case3: datediff('2018-12-10 12:30:00', '2018-12-09 13:30:00') + output = processor.transform("2018-12-10 12:30:00|2018-12-09 13:30:00", new HashMap<>()); + Assert.assertEquals(1, output.size()); + Assert.assertEquals("result=1", output.get(0)); + + // case4: datediff('2018-12-10 12:30:00', '') + output = processor.transform("2018-12-10 12:30:00|", new HashMap<>()); + Assert.assertEquals(1, output.size()); + Assert.assertEquals("result=null", output.get(0)); + + // case5: datediff('2018-12', '2018-12-12') + output = processor.transform("2018-12|2018-12-12", new HashMap<>()); + Assert.assertEquals(1, output.size()); + Assert.assertEquals("result=null", output.get(0)); + + // case6: datediff('1970-01-01',null) + transformSql = "select datediff(string1,xxd) from source"; + config = new TransformConfig(transformSql); + processor = TransformProcessor + .create(config, SourceDecoderFactory.createCsvDecoder(csvSource), + SinkEncoderFactory.createKvEncoder(kvSink)); + output = processor.transform("1970-01-01|1970-01-02", new HashMap<>()); + Assert.assertEquals(1, output.size()); + Assert.assertEquals("result=null", output.get(0)); + } + @Test public void testLocalTimeFunction() throws Exception { String transformSql1 = "select localtime() from source";