This is an automated email from the ASF dual-hosted git repository.
dockerzhang pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/inlong.git
The following commit(s) were added to refs/heads/master by this push:
new e7878063ff [INLONG-10897][SDK] Transform support DATEDIFF function
(#10925)
e7878063ff is described below
commit e7878063ff0d90fd2f1a1992e738c19256123387
Author: Zkplo <[email protected]>
AuthorDate: Mon Sep 2 09:43:14 2024 +0800
[INLONG-10897][SDK] Transform support DATEDIFF function (#10925)
Co-authored-by: ZKpLo <[email protected]>
---
.../process/function/DateDiffFunction.java | 89 ++++++++++++++++++++++
.../TestTransformTemporalFunctionsProcessor.java | 48 ++++++++++++
2 files changed, 137 insertions(+)
diff --git
a/inlong-sdk/transform-sdk/src/main/java/org/apache/inlong/sdk/transform/process/function/DateDiffFunction.java
b/inlong-sdk/transform-sdk/src/main/java/org/apache/inlong/sdk/transform/process/function/DateDiffFunction.java
new file mode 100644
index 0000000000..77b2c93dfc
--- /dev/null
+++
b/inlong-sdk/transform-sdk/src/main/java/org/apache/inlong/sdk/transform/process/function/DateDiffFunction.java
@@ -0,0 +1,89 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.inlong.sdk.transform.process.function;
+
+import org.apache.inlong.sdk.transform.decode.SourceData;
+import org.apache.inlong.sdk.transform.process.Context;
+import org.apache.inlong.sdk.transform.process.operator.OperatorTools;
+import org.apache.inlong.sdk.transform.process.parser.ValueParser;
+
+import net.sf.jsqlparser.expression.Expression;
+import net.sf.jsqlparser.expression.Function;
+
+import java.time.LocalDate;
+import java.time.LocalDateTime;
+import java.time.format.DateTimeFormatter;
+import java.time.temporal.ChronoUnit;
+import java.util.List;
+
+/**
+ * DateDiffFunction
+ * description: DATEDIFF(d1, d2)
+ * - returns null if either argument evaluates to null or to the empty string
+ * - returns null if either argument cannot be parsed as "yyyy-MM-dd" or "yyyy-MM-dd HH:mm:ss"
+ * - otherwise returns the signed number of days d1 - d2: positive when d1 is the later date,
+ *   negative when d1 is the earlier one. Any time-of-day part is dropped before subtracting,
+ *   so only the calendar dates are compared.
+ */
+@TransformFunction(names = {"datediff", "date_diff"})
+public class DateDiffFunction implements ValueParser {
+
+    private static final DateTimeFormatter DEFAULT_FORMAT_DATE_TIME =
+            DateTimeFormatter.ofPattern("yyyy-MM-dd HH:mm:ss");
+    private static final DateTimeFormatter DEFAULT_FORMAT_DATE = DateTimeFormatter.ofPattern("yyyy-MM-dd");
+
+    private final ValueParser leftDateParser;
+    private final ValueParser rightDateParser;
+
+    /**
+     * Builds the value parsers for the two arguments of the call.
+     *
+     * @param expr the function call expression; its first parameter is used as d1 and its
+     *        second parameter as d2
+     */
+    public DateDiffFunction(Function expr) {
+        List<Expression> args = expr.getParameters().getExpressions();
+        this.leftDateParser = OperatorTools.buildParser(args.get(0));
+        this.rightDateParser = OperatorTools.buildParser(args.get(1));
+    }
+
+    /**
+     * Evaluates both arguments for the given row and computes d1 - d2 in days.
+     *
+     * @return the day difference as a Long, or null when an argument is null, empty,
+     *         or not in one of the supported formats
+     */
+    @Override
+    public Object parse(SourceData sourceData, int rowIndex, Context context) {
+        // Both arguments are always evaluated before any of the null/empty checks.
+        Object d1Value = leftDateParser.parse(sourceData, rowIndex, context);
+        Object d2Value = rightDateParser.parse(sourceData, rowIndex, context);
+        if (d1Value == null) {
+            return null;
+        }
+        if (d2Value == null) {
+            return null;
+        }
+
+        String d1Text = OperatorTools.parseString(d1Value);
+        String d2Text = OperatorTools.parseString(d2Value);
+        if (d1Text.isEmpty()) {
+            return null;
+        }
+        if (d2Text.isEmpty()) {
+            return null;
+        }
+
+        try {
+            LocalDate d1 = getLocalDate(d1Text);
+            LocalDate d2 = getLocalDate(d2Text);
+            // Days counted from d2 up to d1, i.e. d1 - d2.
+            return d2.until(d1, ChronoUnit.DAYS);
+        } catch (Exception e) {
+            // Unparseable input is reported as null rather than as a failure.
+            return null;
+        }
+    }
+
+    /**
+     * Converts a date or date-time string to its calendar date.
+     *
+     * @param dateString text in "yyyy-MM-dd" form, or in "yyyy-MM-dd HH:mm:ss" form when it
+     *        contains a space
+     * @return the date part of the parsed value
+     */
+    public LocalDate getLocalDate(String dateString) {
+        boolean hasTimePart = dateString.contains(" ");
+        if (!hasTimePart) {
+            return LocalDate.parse(dateString, DEFAULT_FORMAT_DATE);
+        }
+        return LocalDateTime.parse(dateString, DEFAULT_FORMAT_DATE_TIME).toLocalDate();
+    }
+}
diff --git
a/inlong-sdk/transform-sdk/src/test/java/org/apache/inlong/sdk/transform/process/TestTransformTemporalFunctionsProcessor.java
b/inlong-sdk/transform-sdk/src/test/java/org/apache/inlong/sdk/transform/process/TestTransformTemporalFunctionsProcessor.java
index 354b8b7f18..f2c73ee3d9 100644
---
a/inlong-sdk/transform-sdk/src/test/java/org/apache/inlong/sdk/transform/process/TestTransformTemporalFunctionsProcessor.java
+++
b/inlong-sdk/transform-sdk/src/test/java/org/apache/inlong/sdk/transform/process/TestTransformTemporalFunctionsProcessor.java
@@ -379,6 +379,54 @@ public class TestTransformTemporalFunctionsProcessor {
Assert.assertEquals(output4.get(0), "result=1970-01-01 00:00:00.0");
}
+ @Test
+ public void testDateDiffFunction() throws Exception {
+     // Each row is {csv input, expected kv output} for datediff(string1,string2).
+     String[][] twoColumnCases = {
+             // case1: datediff('1970-01-01','1970-01-02')
+             {"1970-01-01|1970-01-02", "result=-1"},
+             // case2: datediff('1970-01-02','1970-01-01')
+             {"1970-01-02|1970-01-01", "result=1"},
+             // case3: datediff('2018-12-10 12:30:00', '2018-12-09 13:30:00')
+             {"2018-12-10 12:30:00|2018-12-09 13:30:00", "result=1"},
+             // case4: datediff('2018-12-10 12:30:00', '')
+             {"2018-12-10 12:30:00|", "result=null"},
+             // case5: datediff('2018-12', '2018-12-12')
+             {"2018-12|2018-12-12", "result=null"},
+     };
+
+     TransformProcessor<String, String> twoColumnProcessor = TransformProcessor.create(
+             new TransformConfig("select datediff(string1,string2) from source"),
+             SourceDecoderFactory.createCsvDecoder(csvSource),
+             SinkEncoderFactory.createKvEncoder(kvSink));
+     for (String[] testCase : twoColumnCases) {
+         List<String> rows = twoColumnProcessor.transform(testCase[0], new HashMap<>());
+         Assert.assertEquals(1, rows.size());
+         Assert.assertEquals(testCase[1], rows.get(0));
+     }
+
+     // case6: datediff('1970-01-01',null)
+     // NOTE(review): xxd is presumably not a field of csvSource, so the second argument
+     // evaluates to null - confirm against the csvSource definition.
+     TransformProcessor<String, String> unknownFieldProcessor = TransformProcessor.create(
+             new TransformConfig("select datediff(string1,xxd) from source"),
+             SourceDecoderFactory.createCsvDecoder(csvSource),
+             SinkEncoderFactory.createKvEncoder(kvSink));
+     List<String> unknownFieldRows = unknownFieldProcessor.transform("1970-01-01|1970-01-02", new HashMap<>());
+     Assert.assertEquals(1, unknownFieldRows.size());
+     Assert.assertEquals("result=null", unknownFieldRows.get(0));
+ }
+
@Test
public void testLocalTimeFunction() throws Exception {
String transformSql1 = "select localtime() from source";