This is an automated email from the ASF dual-hosted git repository. dockerzhang pushed a commit to branch master in repository https://gitbox.apache.org/repos/asf/inlong.git
The following commit(s) were added to refs/heads/master by this push:
     new a711a95239 [INLONG-10781][SDK] Transform SQL support temporal functions(Including from_unixtime, unix_timestamp and to_timestamp) (#10805)
a711a95239 is described below

commit a711a95239c8d6f360e6bdc7d4c4411bdf42977e
Author: yfsn666 <61183968+yfsn...@users.noreply.github.com>
AuthorDate: Mon Aug 19 10:12:22 2024 +0800

    [INLONG-10781][SDK] Transform SQL support temporal functions(Including from_unixtime, unix_timestamp and to_timestamp) (#10805)
---
 .../process/function/FromUnixTimeFunction.java     |  85 ++++++++++++++
 .../process/function/ToTimestampFunction.java      |  87 ++++++++++++++
 .../process/function/UnixTimestampFunction.java    |  79 +++++++++++++
 .../transform/process/operator/OperatorTools.java  |   6 +
 .../TestTransformTemporalFunctionsProcessor.java   | 128 +++++++++++++++++++++
 5 files changed, 385 insertions(+)

diff --git a/inlong-sdk/transform-sdk/src/main/java/org/apache/inlong/sdk/transform/process/function/FromUnixTimeFunction.java b/inlong-sdk/transform-sdk/src/main/java/org/apache/inlong/sdk/transform/process/function/FromUnixTimeFunction.java
new file mode 100644
index 0000000000..d8a423ca2d
--- /dev/null
+++ b/inlong-sdk/transform-sdk/src/main/java/org/apache/inlong/sdk/transform/process/function/FromUnixTimeFunction.java
@@ -0,0 +1,82 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.inlong.sdk.transform.process.function;
+
+import org.apache.inlong.sdk.transform.decode.SourceData;
+import org.apache.inlong.sdk.transform.process.Context;
+import org.apache.inlong.sdk.transform.process.operator.OperatorTools;
+import org.apache.inlong.sdk.transform.process.parser.ValueParser;
+
+import net.sf.jsqlparser.expression.Expression;
+import net.sf.jsqlparser.expression.Function;
+
+import java.math.BigDecimal;
+import java.time.Instant;
+import java.time.LocalDateTime;
+import java.time.ZoneId;
+import java.time.format.DateTimeFormatter;
+import java.util.List;
+import java.util.Map;
+import java.util.concurrent.ConcurrentHashMap;
+
+/**
+ * FromUnixTimeFunction
+ * description: from_unixtime(numeric[, string])--returns a representation of the numeric argument as a value in
+ * string format (default is 'yyyy-MM-dd HH:mm:ss'). numeric is an internal timestamp value representing seconds
+ * since '1970-01-01 00:00:00' UTC, such as produced by the UNIX_TIMESTAMP() function.
+ * The value is rendered in the JVM default time zone and any fractional seconds are truncated;
+ * a null numeric argument yields null.
+ */
+public class FromUnixTimeFunction implements ValueParser {
+
+    private static final String DEFAULT_FORMAT = "yyyy-MM-dd HH:mm:ss";
+    // DateTimeFormatter is immutable and thread-safe, so one instance per pattern is shared across rows
+    private static final Map<String, DateTimeFormatter> OUTPUT_FORMATTERS = new ConcurrentHashMap<>();
+
+    private final ValueParser numericParser;
+    private final ValueParser stringParser;
+
+    public FromUnixTimeFunction(Function expr) {
+        List<Expression> expressions = expr.getParameters().getExpressions();
+        numericParser = OperatorTools.buildParser(expressions.get(0));
+        // The optional second argument is the output format pattern
+        stringParser = expressions.size() == 2 ? OperatorTools.buildParser(expressions.get(1)) : null;
+    }
+
+    @Override
+    public Object parse(SourceData sourceData, int rowIndex, Context context) {
+        Object numericObj = numericParser.parse(sourceData, rowIndex, context);
+        if (numericObj == null) {
+            return null;
+        }
+        BigDecimal unixTimestamp = OperatorTools.parseBigDecimal(numericObj);
+        String formatPattern =
+                stringParser != null ? OperatorTools.parseString(stringParser.parse(sourceData, rowIndex, context))
+                        : DEFAULT_FORMAT;
+
+        // Convert the epoch seconds directly to a local date time in the JVM default zone
+        LocalDateTime localDateTime =
+                LocalDateTime.ofInstant(Instant.ofEpochSecond(unixTimestamp.longValue()), ZoneId.systemDefault());
+        return localDateTime.format(getDateTimeFormatter(formatPattern));
+    }
+
+    private DateTimeFormatter getDateTimeFormatter(String pattern) {
+        // computeIfAbsent makes lookup-or-create atomic, unlike a separate get/put pair
+        return OUTPUT_FORMATTERS.computeIfAbsent(pattern, DateTimeFormatter::ofPattern);
+    }
+}
diff --git a/inlong-sdk/transform-sdk/src/main/java/org/apache/inlong/sdk/transform/process/function/ToTimestampFunction.java
b/inlong-sdk/transform-sdk/src/main/java/org/apache/inlong/sdk/transform/process/function/ToTimestampFunction.java
new file mode 100644
index 0000000000..45f51511f1
--- /dev/null
+++ b/inlong-sdk/transform-sdk/src/main/java/org/apache/inlong/sdk/transform/process/function/ToTimestampFunction.java
@@ -0,0 +1,83 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.inlong.sdk.transform.process.function;
+
+import org.apache.inlong.sdk.transform.decode.SourceData;
+import org.apache.inlong.sdk.transform.process.Context;
+import org.apache.inlong.sdk.transform.process.operator.OperatorTools;
+import org.apache.inlong.sdk.transform.process.parser.ValueParser;
+
+import net.sf.jsqlparser.expression.Expression;
+import net.sf.jsqlparser.expression.Function;
+
+import java.time.LocalDate;
+import java.time.LocalDateTime;
+import java.time.format.DateTimeFormatter;
+import java.time.temporal.TemporalAccessor;
+import java.util.List;
+import java.util.Map;
+import java.util.concurrent.ConcurrentHashMap;
+
+/**
+ * ToTimestampFunction
+ * description:
+ * to_timestamp(string1[, string2])--converts date time string string1 in format string2
+ * (by default: yyyy-MM-dd HH:mm:ss if not specified) under the 'UTC+0' time zone to a timestamp,
+ * returned as a string in the format 'yyyy-MM-dd HH:mm:ss.S'. When string2 has no time fields the
+ * time defaults to midnight; a null string1 yields null.
+ */
+public class ToTimestampFunction implements ValueParser {
+
+    private static final String DEFAULT_FORMAT = "yyyy-MM-dd HH:mm:ss";
+    private static final DateTimeFormatter OUTPUT_FORMATTER = DateTimeFormatter.ofPattern("yyyy-MM-dd HH:mm:ss.S");
+    // DateTimeFormatter is immutable and thread-safe, so one instance per pattern is shared across rows
+    private static final Map<String, DateTimeFormatter> INPUT_FORMATTERS = new ConcurrentHashMap<>();
+
+    private final ValueParser stringParser;
+    private final ValueParser formatParser;
+
+    public ToTimestampFunction(Function expr) {
+        List<Expression> expressions = expr.getParameters().getExpressions();
+
+        // Determine the number of arguments and build parser
+        stringParser = OperatorTools.buildParser(expressions.get(0));
+        formatParser = expressions.size() == 2 ? OperatorTools.buildParser(expressions.get(1)) : null;
+    }
+
+    @Override
+    public Object parse(SourceData sourceData, int rowIndex, Context context) {
+        Object dateObj = stringParser.parse(sourceData, rowIndex, context);
+        if (dateObj == null) {
+            return null;
+        }
+        String dateString = OperatorTools.parseString(dateObj);
+        String formatPattern =
+                formatParser != null ? OperatorTools.parseString(formatParser.parse(sourceData, rowIndex, context))
+                        : DEFAULT_FORMAT;
+        DateTimeFormatter formatter = INPUT_FORMATTERS.computeIfAbsent(formatPattern, DateTimeFormatter::ofPattern);
+
+        // Parse once, preferring a full date time and falling back to a date at midnight for date-only patterns
+        TemporalAccessor parsed = formatter.parseBest(dateString, LocalDateTime::from, LocalDate::from);
+        LocalDateTime dateTime = parsed instanceof LocalDateTime
+                ? (LocalDateTime) parsed
+                : ((LocalDate) parsed).atStartOfDay();
+
+        // Input and output are both interpreted in UTC+0, so the fields are reformatted without a zone shift
+        return OUTPUT_FORMATTER.format(dateTime);
+    }
+}
diff --git a/inlong-sdk/transform-sdk/src/main/java/org/apache/inlong/sdk/transform/process/function/UnixTimestampFunction.java b/inlong-sdk/transform-sdk/src/main/java/org/apache/inlong/sdk/transform/process/function/UnixTimestampFunction.java
new file mode 100644
index 0000000000..518fb6a646
--- /dev/null
+++ b/inlong-sdk/transform-sdk/src/main/java/org/apache/inlong/sdk/transform/process/function/UnixTimestampFunction.java
@@ -0,0 +1,91 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.inlong.sdk.transform.process.function;
+
+import org.apache.inlong.sdk.transform.decode.SourceData;
+import org.apache.inlong.sdk.transform.process.Context;
+import org.apache.inlong.sdk.transform.process.operator.OperatorTools;
+import org.apache.inlong.sdk.transform.process.parser.ValueParser;
+
+import net.sf.jsqlparser.expression.Expression;
+import net.sf.jsqlparser.expression.Function;
+
+import java.time.Instant;
+import java.time.LocalDate;
+import java.time.LocalDateTime;
+import java.time.ZoneId;
+import java.time.format.DateTimeFormatter;
+import java.time.temporal.TemporalAccessor;
+import java.util.List;
+import java.util.Map;
+import java.util.concurrent.ConcurrentHashMap;
+
+/**
+ * UnixTimestampFunction
+ * description:
+ * unix_timestamp(): returns current Unix timestamp in seconds
+ * unix_timestamp(string1[, string2]): converts date time string string1 in format string2
+ * (by default: yyyy-MM-dd HH:mm:ss if not specified) to Unix timestamp (in seconds).
+ * string1 is interpreted in the JVM default time zone; when string2 has no time fields the time
+ * defaults to midnight, and a null string1 yields null.
+ */
+public class UnixTimestampFunction implements ValueParser {
+
+    private static final String DEFAULT_FORMAT = "yyyy-MM-dd HH:mm:ss";
+    // DateTimeFormatter is immutable and thread-safe, so one instance per pattern is shared across rows
+    private static final Map<String, DateTimeFormatter> INPUT_FORMATTERS = new ConcurrentHashMap<>();
+
+    private final ValueParser stringParser;
+    private final ValueParser formatParser;
+
+    public UnixTimestampFunction(Function expr) {
+        List<Expression> expressions = expr.getParameters() == null ? null : expr.getParameters().getExpressions();
+        // unix_timestamp() may be called without arguments, which leaves both parsers unset
+        boolean hasArguments = expressions != null && !expressions.isEmpty();
+        stringParser = hasArguments ? OperatorTools.buildParser(expressions.get(0)) : null;
+        formatParser =
+                hasArguments && expressions.size() == 2 ? OperatorTools.buildParser(expressions.get(1)) : null;
+    }
+
+    @Override
+    public Object parse(SourceData sourceData, int rowIndex, Context context) {
+        // If stringParser is null, return current Unix timestamp in seconds
+        if (stringParser == null) {
+            return Instant.now().getEpochSecond();
+        }
+
+        Object dateObj = stringParser.parse(sourceData, rowIndex, context);
+        if (dateObj == null) {
+            return null;
+        }
+        String dateString = OperatorTools.parseString(dateObj);
+        String formatPattern =
+                formatParser != null ? OperatorTools.parseString(formatParser.parse(sourceData, rowIndex, context))
+                        : DEFAULT_FORMAT;
+        DateTimeFormatter formatter = INPUT_FORMATTERS.computeIfAbsent(formatPattern, DateTimeFormatter::ofPattern);
+
+        // Parse once, preferring a full date time and falling back to a date at midnight for date-only patterns
+        TemporalAccessor parsed = formatter.parseBest(dateString, LocalDateTime::from, LocalDate::from);
+        LocalDateTime dateTime = parsed instanceof LocalDateTime
+                ? (LocalDateTime) parsed
+                : ((LocalDate) parsed).atStartOfDay();
+
+        // Convert LocalDateTime to Unix timestamp in seconds
+        return dateTime.atZone(ZoneId.systemDefault()).toEpochSecond();
+    }
+}
diff --git a/inlong-sdk/transform-sdk/src/main/java/org/apache/inlong/sdk/transform/process/operator/OperatorTools.java b/inlong-sdk/transform-sdk/src/main/java/org/apache/inlong/sdk/transform/process/operator/OperatorTools.java
index 8c48aedfe7..200c5b9d17 100644
--- a/inlong-sdk/transform-sdk/src/main/java/org/apache/inlong/sdk/transform/process/operator/OperatorTools.java
+++ b/inlong-sdk/transform-sdk/src/main/java/org/apache/inlong/sdk/transform/process/operator/OperatorTools.java
@@ -25,6 +25,7 @@ import org.apache.inlong.sdk.transform.process.function.DateExtractFunction.Date
 import org.apache.inlong.sdk.transform.process.function.DateFormatFunction;
 import org.apache.inlong.sdk.transform.process.function.ExpFunction;
 import org.apache.inlong.sdk.transform.process.function.FloorFunction;
+import org.apache.inlong.sdk.transform.process.function.FromUnixTimeFunction;
 import org.apache.inlong.sdk.transform.process.function.LnFunction;
 import org.apache.inlong.sdk.transform.process.function.LocateFunction;
 import org.apache.inlong.sdk.transform.process.function.Log10Function;
@@ -38,6 +39,8 @@ import org.apache.inlong.sdk.transform.process.function.SqrtFunction;
 import org.apache.inlong.sdk.transform.process.function.SubstringFunction;
 import
org.apache.inlong.sdk.transform.process.function.TimestampExtractFunction;
 import org.apache.inlong.sdk.transform.process.function.ToDateFunction;
+import org.apache.inlong.sdk.transform.process.function.ToTimestampFunction;
+import org.apache.inlong.sdk.transform.process.function.UnixTimestampFunction;
 import org.apache.inlong.sdk.transform.process.parser.AdditionParser;
 import org.apache.inlong.sdk.transform.process.parser.ColumnParser;
 import org.apache.inlong.sdk.transform.process.parser.DateParser;
@@ -124,6 +127,9 @@ public class OperatorTools {
         functionMap.put("second",
                 func -> new TimestampExtractFunction(TimestampExtractFunction.TimestampExtractFunctionType.SECOND,
                         func));
+        functionMap.put("from_unixtime", FromUnixTimeFunction::new);
+        functionMap.put("unix_timestamp", UnixTimestampFunction::new);
+        functionMap.put("to_timestamp", ToTimestampFunction::new);
     }
 
     public static ExpressionOperator buildOperator(Expression expr) {
diff --git a/inlong-sdk/transform-sdk/src/test/java/org/apache/inlong/sdk/transform/process/TestTransformTemporalFunctionsProcessor.java b/inlong-sdk/transform-sdk/src/test/java/org/apache/inlong/sdk/transform/process/TestTransformTemporalFunctionsProcessor.java
index 7760428ed4..7a40411bc6 100644
--- a/inlong-sdk/transform-sdk/src/test/java/org/apache/inlong/sdk/transform/process/TestTransformTemporalFunctionsProcessor.java
+++ b/inlong-sdk/transform-sdk/src/test/java/org/apache/inlong/sdk/transform/process/TestTransformTemporalFunctionsProcessor.java
@@ -234,4 +234,147 @@ public class TestTransformTemporalFunctionsProcessor {
         Assert.assertEquals(1, output3.size());
         Assert.assertEquals(output3.get(0), "result=34");
     }
+
+    @Test
+    public void testFromUnixTimeFunction() throws Exception {
+        String transformSql1 = "select from_unixtime(numeric1) from source";
+        TransformConfig config1 = new TransformConfig(transformSql1);
+        TransformProcessor<String, String> processor1 = TransformProcessor
+                .create(config1, SourceDecoderFactory.createCsvDecoder(csvSource),
+                        SinkEncoderFactory.createKvEncoder(kvSink));
+        // case1: from_unixtime(44)
+        List<String> output1 = processor1.transform("can|apple|cloud|44|1|3", new HashMap<>());
+        Assert.assertEquals(1, output1.size());
+        Assert.assertEquals(output1.get(0), "result=" + formatInSystemZone(44, "yyyy-MM-dd HH:mm:ss"));
+
+        String transformSql2 = "select from_unixtime(numeric1, 'yyyy/MM/dd HH:mm:ss') from source";
+        TransformConfig config2 = new TransformConfig(transformSql2);
+        TransformProcessor<String, String> processor2 = TransformProcessor
+                .create(config2, SourceDecoderFactory.createCsvDecoder(csvSource),
+                        SinkEncoderFactory.createKvEncoder(kvSink));
+        // case2: from_unixtime(44, 'yyyy/MM/dd HH:mm:ss')
+        List<String> output2 = processor2.transform("can|apple|cloud|44|1|3", new HashMap<>());
+        Assert.assertEquals(1, output2.size());
+        Assert.assertEquals(output2.get(0), "result=" + formatInSystemZone(44, "yyyy/MM/dd HH:mm:ss"));
+
+        String transformSql3 = "select from_unixtime(numeric1, 'MMdd-yyyy') from source";
+        TransformConfig config3 = new TransformConfig(transformSql3);
+        TransformProcessor<String, String> processor3 = TransformProcessor
+                .create(config3, SourceDecoderFactory.createCsvDecoder(csvSource),
+                        SinkEncoderFactory.createKvEncoder(kvSink));
+        // case3: from_unixtime(44, 'MMdd-yyyy')
+        List<String> output3 = processor3.transform("can|apple|cloud|44|1|3", new HashMap<>());
+        Assert.assertEquals(1, output3.size());
+        Assert.assertEquals(output3.get(0), "result=" + formatInSystemZone(44, "MMdd-yyyy"));
+
+        String transformSql4 = "select from_unixtime(numeric1, 'yyyyMMddHHss') from source";
+        TransformConfig config4 = new TransformConfig(transformSql4);
+        TransformProcessor<String, String> processor4 = TransformProcessor
+                .create(config4, SourceDecoderFactory.createCsvDecoder(csvSource),
+                        SinkEncoderFactory.createKvEncoder(kvSink));
+        // case4: from_unixtime(44, 'yyyyMMddHHss')
+        List<String> output4 = processor4.transform("can|apple|cloud|44|1|3", new HashMap<>());
+        Assert.assertEquals(1, output4.size());
+        Assert.assertEquals(output4.get(0), "result=" + formatInSystemZone(44, "yyyyMMddHHss"));
+    }
+
+    @Test
+    public void testUnixTimestampFunction() throws Exception {
+        String transformSql1 = "select unix_timestamp() from source";
+        TransformConfig config1 = new TransformConfig(transformSql1);
+        TransformProcessor<String, String> processor1 = TransformProcessor
+                .create(config1, SourceDecoderFactory.createCsvDecoder(csvSource),
+                        SinkEncoderFactory.createKvEncoder(kvSink));
+        // case1: unix_timestamp() returns the current time in seconds
+        long before = java.time.Instant.now().getEpochSecond();
+        List<String> output1 = processor1.transform("", new HashMap<>());
+        long after = java.time.Instant.now().getEpochSecond();
+        Assert.assertEquals(1, output1.size());
+        long now = Long.parseLong(output1.get(0).substring("result=".length()));
+        Assert.assertTrue(now >= before && now <= after);
+
+        String transformSql2 = "select unix_timestamp(string1, 'yyyy/MM/dd HH:mm:ss') from source";
+        TransformConfig config2 = new TransformConfig(transformSql2);
+        TransformProcessor<String, String> processor2 = TransformProcessor
+                .create(config2, SourceDecoderFactory.createCsvDecoder(csvSource),
+                        SinkEncoderFactory.createKvEncoder(kvSink));
+        // case2: unix_timestamp(<epoch 44 in the JVM default zone>, 'yyyy/MM/dd HH:mm:ss')
+        List<String> output2 = processor2.transform(formatInSystemZone(44, "yyyy/MM/dd HH:mm:ss"), new HashMap<>());
+        Assert.assertEquals(1, output2.size());
+        Assert.assertEquals(output2.get(0), "result=44");
+
+        String transformSql3 = "select unix_timestamp(string1) from source";
+        TransformConfig config3 = new TransformConfig(transformSql3);
+        TransformProcessor<String, String> processor3 = TransformProcessor
+                .create(config3, SourceDecoderFactory.createCsvDecoder(csvSource),
+                        SinkEncoderFactory.createKvEncoder(kvSink));
+        // case3: unix_timestamp(<epoch 44 in the JVM default zone>) with the default 'yyyy-MM-dd HH:mm:ss' format
+        List<String> output3 = processor3.transform(formatInSystemZone(44, "yyyy-MM-dd HH:mm:ss"), new HashMap<>());
+        Assert.assertEquals(1, output3.size());
+        Assert.assertEquals(output3.get(0), "result=44");
+
+        String transformSql4 = "select unix_timestamp(string1, 'yyyyMMddHHmmss') from source";
+        TransformConfig config4 = new TransformConfig(transformSql4);
+        TransformProcessor<String, String> processor4 = TransformProcessor
+                .create(config4, SourceDecoderFactory.createCsvDecoder(csvSource),
+                        SinkEncoderFactory.createKvEncoder(kvSink));
+        // case4: unix_timestamp(<epoch 44 in the JVM default zone>, 'yyyyMMddHHmmss')
+        List<String> output4 = processor4.transform(formatInSystemZone(44, "yyyyMMddHHmmss"), new HashMap<>());
+        Assert.assertEquals(1, output4.size());
+        Assert.assertEquals(output4.get(0), "result=44");
+    }
+
+    @Test
+    public void testToTimestampFunction() throws Exception {
+        String transformSql1 = "select to_timestamp(string1) from source";
+        TransformConfig config1 = new TransformConfig(transformSql1);
+        TransformProcessor<String, String> processor1 = TransformProcessor
+                .create(config1, SourceDecoderFactory.createCsvDecoder(csvSource),
+                        SinkEncoderFactory.createKvEncoder(kvSink));
+        // case1: to_timestamp('1970-01-01 00:00:44')
+        List<String> output1 = processor1.transform("1970-01-01 00:00:44", new HashMap<>());
+        Assert.assertEquals(1, output1.size());
+        Assert.assertEquals(output1.get(0), "result=1970-01-01 00:00:44.0");
+
+        String transformSql2 = "select to_timestamp(string1, 'yyyy/MM/dd HH:mm:ss') from source";
+        TransformConfig config2 = new TransformConfig(transformSql2);
+        TransformProcessor<String, String> processor2 = TransformProcessor
+                .create(config2, SourceDecoderFactory.createCsvDecoder(csvSource),
+                        SinkEncoderFactory.createKvEncoder(kvSink));
+        // case2: to_timestamp('1970/01/01 00:00:44', 'yyyy/MM/dd HH:mm:ss')
+        List<String> output2 = processor2.transform("1970/01/01 00:00:44", new HashMap<>());
+        Assert.assertEquals(1, output2.size());
+        Assert.assertEquals(output2.get(0), "result=1970-01-01 00:00:44.0");
+
+        String transformSql3 = "select to_timestamp(string1, 'yyyyMMddHHmmss') from source";
+        TransformConfig config3 = new TransformConfig(transformSql3);
+        TransformProcessor<String, String> processor3 = TransformProcessor
+                .create(config3, SourceDecoderFactory.createCsvDecoder(csvSource),
+                        SinkEncoderFactory.createKvEncoder(kvSink));
+        // case3: to_timestamp('19700101000044', 'yyyyMMddHHmmss')
+        List<String> output3 = processor3.transform("19700101000044", new HashMap<>());
+        Assert.assertEquals(1, output3.size());
+        Assert.assertEquals(output3.get(0), "result=1970-01-01 00:00:44.0");
+
+        String transformSql4 = "select to_timestamp(string1, 'yyyy-MM-dd') from source";
+        TransformConfig config4 = new TransformConfig(transformSql4);
+        TransformProcessor<String, String> processor4 = TransformProcessor
+                .create(config4, SourceDecoderFactory.createCsvDecoder(csvSource),
+                        SinkEncoderFactory.createKvEncoder(kvSink));
+        // case4: to_timestamp('1970-01-01', 'yyyy-MM-dd')
+        List<String> output4 = processor4.transform("1970-01-01", new HashMap<>());
+        Assert.assertEquals(1, output4.size());
+        Assert.assertEquals(output4.get(0), "result=1970-01-01 00:00:00.0");
+    }
+
+    /**
+     * Formats the epoch second with the given pattern in the JVM default time zone, the zone in which
+     * from_unixtime renders and unix_timestamp interprets date time strings, so that the assertions above
+     * hold regardless of the machine's time zone.
+     */
+    private static String formatInSystemZone(long epochSecond, String pattern) {
+        return java.time.LocalDateTime
+                .ofInstant(java.time.Instant.ofEpochSecond(epochSecond), java.time.ZoneId.systemDefault())
+                .format(java.time.format.DateTimeFormatter.ofPattern(pattern));
+    }
 }