This is an automated email from the ASF dual-hosted git repository.

dockerzhang pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/inlong.git


The following commit(s) were added to refs/heads/master by this push:
     new e7878063ff [INLONG-10897][SDK] Transform support DATEDIFF function 
(#10925)
e7878063ff is described below

commit e7878063ff0d90fd2f1a1992e738c19256123387
Author: Zkplo <87751516+zk...@users.noreply.github.com>
AuthorDate: Mon Sep 2 09:43:14 2024 +0800

    [INLONG-10897][SDK] Transform support DATEDIFF function (#10925)
    
    Co-authored-by: ZKpLo <14148880+zk...@user.noreply.gitee.com>
---
 .../process/function/DateDiffFunction.java         | 89 ++++++++++++++++++++++
 .../TestTransformTemporalFunctionsProcessor.java   | 48 ++++++++++++
 2 files changed, 137 insertions(+)

diff --git 
a/inlong-sdk/transform-sdk/src/main/java/org/apache/inlong/sdk/transform/process/function/DateDiffFunction.java
 
b/inlong-sdk/transform-sdk/src/main/java/org/apache/inlong/sdk/transform/process/function/DateDiffFunction.java
new file mode 100644
index 0000000000..77b2c93dfc
--- /dev/null
+++ 
b/inlong-sdk/transform-sdk/src/main/java/org/apache/inlong/sdk/transform/process/function/DateDiffFunction.java
@@ -0,0 +1,89 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.inlong.sdk.transform.process.function;
+
+import org.apache.inlong.sdk.transform.decode.SourceData;
+import org.apache.inlong.sdk.transform.process.Context;
+import org.apache.inlong.sdk.transform.process.operator.OperatorTools;
+import org.apache.inlong.sdk.transform.process.parser.ValueParser;
+
+import net.sf.jsqlparser.expression.Expression;
+import net.sf.jsqlparser.expression.Function;
+
+import java.time.LocalDate;
+import java.time.LocalDateTime;
+import java.time.format.DateTimeFormatter;
+import java.time.format.DateTimeParseException;
+import java.time.temporal.ChronoUnit;
+import java.util.List;
+
+/**
+ * DateDiffFunction
+ * description: DATEDIFF(d1, d2)
+ * - return null if one of the two parameters is null or ""
+ * - return null if one of the two parameters has an incorrect date format
+ * - return the number of days from d2 to d1 (d1 - d2): positive when d1 is later
+ *   than d2, negative when it is earlier; any time-of-day part is ignored
+ * Accepted formats: "yyyy-MM-dd" and "yyyy-MM-dd HH:mm:ss".
+ */
+@TransformFunction(names = {"datediff", "date_diff"})
+public class DateDiffFunction implements ValueParser {
+
+    private static final DateTimeFormatter DEFAULT_FORMAT_DATE_TIME =
+            DateTimeFormatter.ofPattern("yyyy-MM-dd HH:mm:ss");
+    private static final DateTimeFormatter DEFAULT_FORMAT_DATE = DateTimeFormatter.ofPattern("yyyy-MM-dd");
+
+    private final ValueParser leftDateParser;
+    private final ValueParser rightDateParser;
+
+    /** Builds the value parsers for the two arguments of DATEDIFF(d1, d2). */
+    public DateDiffFunction(Function expr) {
+        List<Expression> expressions = expr.getParameters().getExpressions();
+        leftDateParser = OperatorTools.buildParser(expressions.get(0));
+        rightDateParser = OperatorTools.buildParser(expressions.get(1));
+    }
+
+    /** Returns d1 - d2 in whole days as a Long, or null when either value is missing or malformed. */
+    @Override
+    public Object parse(SourceData sourceData, int rowIndex, Context context) {
+        Object leftDateObj = leftDateParser.parse(sourceData, rowIndex, context);
+        Object rightDateObj = rightDateParser.parse(sourceData, rowIndex, context);
+        if (leftDateObj == null || rightDateObj == null) {
+            return null;
+        }
+        String leftDate = OperatorTools.parseString(leftDateObj);
+        String rightDate = OperatorTools.parseString(rightDateObj);
+        if (leftDate.isEmpty() || rightDate.isEmpty()) {
+            return null;
+        }
+        try {
+            // between(start, end) is end - start, so d2 goes first to obtain d1 - d2
+            return ChronoUnit.DAYS.between(getLocalDate(rightDate), getLocalDate(leftDate));
+        } catch (DateTimeParseException ignored) {
+            return null; // documented contract: a malformed date yields null
+        }
+    }
+
+    /** Parses "yyyy-MM-dd" or "yyyy-MM-dd HH:mm:ss" (time dropped); throws DateTimeParseException otherwise. */
+    public LocalDate getLocalDate(String dateString) {
+        if (dateString.indexOf(' ') != -1) {
+            return LocalDateTime.parse(dateString, DEFAULT_FORMAT_DATE_TIME).toLocalDate();
+        }
+        return LocalDate.parse(dateString, DEFAULT_FORMAT_DATE);
+    }
+}
diff --git 
a/inlong-sdk/transform-sdk/src/test/java/org/apache/inlong/sdk/transform/process/TestTransformTemporalFunctionsProcessor.java
 
b/inlong-sdk/transform-sdk/src/test/java/org/apache/inlong/sdk/transform/process/TestTransformTemporalFunctionsProcessor.java
index 354b8b7f18..f2c73ee3d9 100644
--- 
a/inlong-sdk/transform-sdk/src/test/java/org/apache/inlong/sdk/transform/process/TestTransformTemporalFunctionsProcessor.java
+++ 
b/inlong-sdk/transform-sdk/src/test/java/org/apache/inlong/sdk/transform/process/TestTransformTemporalFunctionsProcessor.java
@@ -379,6 +379,54 @@ public class TestTransformTemporalFunctionsProcessor {
         Assert.assertEquals(output4.get(0), "result=1970-01-01 00:00:00.0");
     }
 
+    @Test
+    public void testDateDiffFunction() throws Exception {
+        String transformSql = null;
+        TransformConfig config = null;
+        TransformProcessor<String, String> processor = null;
+        List<String> output = null;
+
+        transformSql = "select datediff(string1,string2) from source";
+        config = new TransformConfig(transformSql);
+        processor = TransformProcessor
+                .create(config, 
SourceDecoderFactory.createCsvDecoder(csvSource),
+                        SinkEncoderFactory.createKvEncoder(kvSink));
+        // case1: datediff('1970-01-01','1970-01-02')
+        output = processor.transform("1970-01-01|1970-01-02", new HashMap<>());
+        Assert.assertEquals(1, output.size());
+        Assert.assertEquals("result=-1", output.get(0));
+
+        // case2: datediff('1970-01-02','1970-01-01')
+        output = processor.transform("1970-01-02|1970-01-01", new HashMap<>());
+        Assert.assertEquals(1, output.size());
+        Assert.assertEquals("result=1", output.get(0));
+
+        // case3: datediff('2018-12-10 12:30:00', '2018-12-09 13:30:00')
+        output = processor.transform("2018-12-10 12:30:00|2018-12-09 
13:30:00", new HashMap<>());
+        Assert.assertEquals(1, output.size());
+        Assert.assertEquals("result=1", output.get(0));
+
+        // case4: datediff('2018-12-10 12:30:00', '')
+        output = processor.transform("2018-12-10 12:30:00|", new HashMap<>());
+        Assert.assertEquals(1, output.size());
+        Assert.assertEquals("result=null", output.get(0));
+
+        // case5: datediff('2018-12', '2018-12-12')
+        output = processor.transform("2018-12|2018-12-12", new HashMap<>());
+        Assert.assertEquals(1, output.size());
+        Assert.assertEquals("result=null", output.get(0));
+
+        // case6: datediff('1970-01-01',null)
+        transformSql = "select datediff(string1,xxd) from source";
+        config = new TransformConfig(transformSql);
+        processor = TransformProcessor
+                .create(config, 
SourceDecoderFactory.createCsvDecoder(csvSource),
+                        SinkEncoderFactory.createKvEncoder(kvSink));
+        output = processor.transform("1970-01-01|1970-01-02", new HashMap<>());
+        Assert.assertEquals(1, output.size());
+        Assert.assertEquals("result=null", output.get(0));
+    }
+
     @Test
     public void testLocalTimeFunction() throws Exception {
         String transformSql1 = "select localtime() from source";

Reply via email to