This is an automated email from the ASF dual-hosted git repository.
xiangfu pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/pinot.git
The following commit(s) were added to refs/heads/master by this push:
new 3b8390784d [multistage] Support TIMESTAMP type and date ops functions
(#11350)
3b8390784d is described below
commit 3b8390784da62cd1da564e74afe93ca8dfef2d21
Author: Xiang Fu <[email protected]>
AuthorDate: Thu Aug 17 08:52:01 2023 -0700
[multistage] Support TIMESTAMP type and date ops functions (#11350)
---
.../common/function/TransformFunctionType.java | 20 ++
.../common/request/context/LiteralContext.java | 6 +-
.../org/apache/pinot/common/utils/DataSchema.java | 11 +-
.../core/common/datablock/DataBlockBuilder.java | 13 +-
.../function/ScalarTransformFunctionWrapper.java | 8 +
.../integration/tests/custom/TimestampTest.java | 326 +++++++++++++++++++++
.../query/parser/CalciteRexExpressionParser.java | 5 +
.../planner/logical/RelToPlanNodeConverter.java | 13 +
.../pinot/query/planner/logical/RexExpression.java | 5 +
.../query/planner/logical/RexExpressionUtils.java | 9 +
.../pinot/query/planner/plannode/ValueNode.java | 12 +
.../query/service/dispatch/QueryDispatcher.java | 9 +-
.../pinot/query/runtime/QueryRunnerTestBase.java | 2 +
13 files changed, 427 insertions(+), 12 deletions(-)
diff --git a/pinot-common/src/main/java/org/apache/pinot/common/function/TransformFunctionType.java b/pinot-common/src/main/java/org/apache/pinot/common/function/TransformFunctionType.java
index f741ff223e..b27c6cd53b 100644
--- a/pinot-common/src/main/java/org/apache/pinot/common/function/TransformFunctionType.java
+++ b/pinot-common/src/main/java/org/apache/pinot/common/function/TransformFunctionType.java
@@ -39,6 +39,11 @@ import org.apache.pinot.spi.data.DateTimeFieldSpec;
import org.apache.pinot.spi.data.DateTimeFormatSpec;
+/**
+ * The {@code TransformFunctionType} enum represents all the transform functions supported by Calcite SQL parser in
+ * v2 engine.
+ * TODO: Add support for scalar functions auto registration.
+ */
public enum TransformFunctionType {
// arithmetic functions for single-valued columns
ADD("add", "plus"),
@@ -124,6 +129,21 @@ public enum TransformFunctionType {
SqlTypeFamily.CHARACTER),
ordinal -> ordinal > 1)),
+  FROMDATETIME("fromDateTime", ReturnTypes.TIMESTAMP_NULLABLE,
+      OperandTypes.family(ImmutableList.of(SqlTypeFamily.CHARACTER, SqlTypeFamily.CHARACTER, SqlTypeFamily.CHARACTER),
+          ordinal -> ordinal > 1)),
+
+  TODATETIME("toDateTime", ReturnTypes.VARCHAR_2000_NULLABLE,
+      OperandTypes.family(ImmutableList.of(SqlTypeFamily.ANY, SqlTypeFamily.CHARACTER, SqlTypeFamily.CHARACTER),
+          ordinal -> ordinal > 1)),
+
+  TIMESTAMPADD("timestampAdd", ReturnTypes.TIMESTAMP_NULLABLE,
+      OperandTypes.family(ImmutableList.of(SqlTypeFamily.CHARACTER, SqlTypeFamily.NUMERIC, SqlTypeFamily.ANY)),
+      "dateAdd"),
+
+  TIMESTAMPDIFF("timestampDiff", ReturnTypes.BIGINT_NULLABLE,
+      OperandTypes.family(ImmutableList.of(SqlTypeFamily.CHARACTER, SqlTypeFamily.ANY, SqlTypeFamily.ANY)), "dateDiff"),
+
YEAR("year"),
YEAR_OF_WEEK("yearOfWeek", "yow"),
QUARTER("quarter"),
diff --git
a/pinot-common/src/main/java/org/apache/pinot/common/request/context/LiteralContext.java
b/pinot-common/src/main/java/org/apache/pinot/common/request/context/LiteralContext.java
index 3436e826f3..0fa430a0c7 100644
---
a/pinot-common/src/main/java/org/apache/pinot/common/request/context/LiteralContext.java
+++
b/pinot-common/src/main/java/org/apache/pinot/common/request/context/LiteralContext.java
@@ -122,14 +122,14 @@ public class LiteralContext {
Pair<FieldSpec.DataType, Object> typeAndValue =
inferLiteralDataTypeAndValue(literal.getFieldValue().toString());
_type = typeAndValue.getLeft();
- _value = typeAndValue.getRight();
if (_type == FieldSpec.DataType.BIG_DECIMAL) {
- _bigDecimalValue = (BigDecimal) _value;
+ _bigDecimalValue = (BigDecimal) typeAndValue.getRight();
} else if (_type == FieldSpec.DataType.TIMESTAMP) {
-      _bigDecimalValue = PinotDataType.TIMESTAMP.toBigDecimal(Timestamp.valueOf(_value.toString()));
+      _bigDecimalValue = PinotDataType.TIMESTAMP.toBigDecimal(typeAndValue.getRight());
} else {
_bigDecimalValue = BigDecimal.ZERO;
}
+ _value = literal.getFieldValue().toString();
break;
case NULL_VALUE:
_type = FieldSpec.DataType.UNKNOWN;
diff --git
a/pinot-common/src/main/java/org/apache/pinot/common/utils/DataSchema.java
b/pinot-common/src/main/java/org/apache/pinot/common/utils/DataSchema.java
index 282a3d7416..80c51fb846 100644
--- a/pinot-common/src/main/java/org/apache/pinot/common/utils/DataSchema.java
+++ b/pinot-common/src/main/java/org/apache/pinot/common/utils/DataSchema.java
@@ -353,7 +353,10 @@ public class DataSchema {
case BOOLEAN:
return ((Number) value).intValue() == 1;
case TIMESTAMP:
- return new Timestamp((long) value);
+ if (value instanceof Timestamp) {
+ return (Timestamp) value;
+ }
+ return new Timestamp(((Number) value).longValue());
case STRING:
case JSON:
return value.toString();
@@ -416,8 +419,14 @@ public class DataSchema {
case BIG_DECIMAL:
return (BigDecimal) value;
case BOOLEAN:
+ if (value instanceof Boolean) {
+ return (boolean) value;
+ }
return ((Number) value).intValue() == 1;
case TIMESTAMP:
+ if (value instanceof Timestamp) {
+ return value.toString();
+ }
return new Timestamp((long) value).toString();
case STRING:
case JSON:
diff --git
a/pinot-core/src/main/java/org/apache/pinot/core/common/datablock/DataBlockBuilder.java
b/pinot-core/src/main/java/org/apache/pinot/core/common/datablock/DataBlockBuilder.java
index 8b486b4012..8bce975c70 100644
---
a/pinot-core/src/main/java/org/apache/pinot/core/common/datablock/DataBlockBuilder.java
+++
b/pinot-core/src/main/java/org/apache/pinot/core/common/datablock/DataBlockBuilder.java
@@ -148,10 +148,19 @@ public class DataBlockBuilder {
setColumn(rowBuilder, byteBuffer, (BigDecimal) value);
break;
case BOOLEAN:
- byteBuffer.putInt(((Boolean) value) ? 1 : 0);
+ if (value instanceof Boolean) {
+ byteBuffer.putInt(((Boolean) value) ? 1 : 0);
+ } else {
+ byteBuffer.putInt(((Number) value).intValue() > 0 ? 1 : 0);
+ }
break;
case TIMESTAMP:
- byteBuffer.putLong(((Timestamp) value).getTime());
+        // Certain non strong typed functions in v2 might return long value instead of Timestamp.
+ if (value instanceof Long) {
+ byteBuffer.putLong((long) value);
+ } else {
+ byteBuffer.putLong(((Timestamp) value).getTime());
+ }
break;
case STRING:
setColumn(rowBuilder, byteBuffer, (String) value);
diff --git
a/pinot-core/src/main/java/org/apache/pinot/core/operator/transform/function/ScalarTransformFunctionWrapper.java
b/pinot-core/src/main/java/org/apache/pinot/core/operator/transform/function/ScalarTransformFunctionWrapper.java
index 3a2c35b90a..09f3c27421 100644
---
a/pinot-core/src/main/java/org/apache/pinot/core/operator/transform/function/ScalarTransformFunctionWrapper.java
+++
b/pinot-core/src/main/java/org/apache/pinot/core/operator/transform/function/ScalarTransformFunctionWrapper.java
@@ -117,10 +117,18 @@ public class ScalarTransformFunctionWrapper extends
BaseTransformFunction {
parameterTypes[i].convert(literalTransformFunction.getDoubleLiteral(),
PinotDataType.DOUBLE);
break;
case BIG_DECIMAL:
+ if (parameterTypes[i] == PinotDataType.STRING) {
+ _scalarArguments[i] =
literalTransformFunction.getStringLiteral();
+ break;
+ }
_scalarArguments[i] =
parameterTypes[i].convert(literalTransformFunction.getBigDecimalLiteral(),
PinotDataType.BIG_DECIMAL);
break;
case TIMESTAMP:
+ if (parameterTypes[i] == PinotDataType.STRING) {
+ _scalarArguments[i] =
literalTransformFunction.getStringLiteral();
+ break;
+ }
_scalarArguments[i] =
parameterTypes[i].convert(literalTransformFunction.getLongLiteral(),
PinotDataType.TIMESTAMP);
break;
diff --git
a/pinot-integration-tests/src/test/java/org/apache/pinot/integration/tests/custom/TimestampTest.java
b/pinot-integration-tests/src/test/java/org/apache/pinot/integration/tests/custom/TimestampTest.java
new file mode 100644
index 0000000000..c9995e3785
--- /dev/null
+++
b/pinot-integration-tests/src/test/java/org/apache/pinot/integration/tests/custom/TimestampTest.java
@@ -0,0 +1,326 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.pinot.integration.tests.custom;
+
+import com.fasterxml.jackson.databind.JsonNode;
+import com.google.common.collect.ImmutableList;
+import java.io.File;
+import java.sql.Timestamp;
+import java.util.TimeZone;
+import org.apache.avro.file.DataFileWriter;
+import org.apache.avro.generic.GenericData;
+import org.apache.avro.generic.GenericDatumWriter;
+import org.apache.pinot.common.function.scalar.DateTimeFunctions;
+import org.apache.pinot.spi.data.FieldSpec;
+import org.apache.pinot.spi.data.Schema;
+import org.testng.annotations.AfterClass;
+import org.testng.annotations.BeforeClass;
+import org.testng.annotations.Test;
+
+import static org.testng.Assert.assertEquals;
+
+
+@Test(suiteName = "CustomClusterIntegrationTest")
+public class TimestampTest extends CustomDataQueryClusterIntegrationTest {
+
+ private static final String DEFAULT_TABLE_NAME = "TimestampTest";
+ private static final String TIMESTAMP_1 = "ts1";
+ private static final String TIMESTAMP_2 = "ts2";
+ private static final String LONG_1 = "long1";
+ private static final String LONG_2 = "long2";
+
+ private static final TimeZone DEFAULT_TIME_ZONE = TimeZone.getDefault();
+
+ @Override
+ protected long getCountStarResult() {
+ return 1000;
+ }
+
+ @BeforeClass
+ public void setUpTimeZone() {
+ TimeZone.setDefault(TimeZone.getTimeZone("GMT"));
+ }
+
+ @AfterClass
+ public void removeTimeZone() {
+ TimeZone.setDefault(DEFAULT_TIME_ZONE);
+ }
+
+ @Test(dataProvider = "useBothQueryEngines")
+ public void testSelectQueries(boolean useMultiStageQueryEngine)
+ throws Exception {
+ setUseMultiStageQueryEngine(useMultiStageQueryEngine);
+ String query = String.format("SELECT ts1, ts2, long1,long2 FROM %s LIMIT
%d", getTableName(), getCountStarResult());
+ JsonNode jsonNode = postQuery(query);
+    long expectedTs1 = DateTimeFunctions.fromDateTime("2019-01-01 00:00:00", "yyyy-MM-dd HH:mm:ss");
+    long expectedTs2 = DateTimeFunctions.fromDateTime("2019-01-01 12:00:00", "yyyy-MM-dd HH:mm:ss");
+
+ for (int i = 0; i < getCountStarResult(); i++) {
+ String ts1 =
jsonNode.get("resultTable").get("rows").get(i).get(0).asText();
+ String ts2 =
jsonNode.get("resultTable").get("rows").get(i).get(1).asText();
+ long long1 =
jsonNode.get("resultTable").get("rows").get(i).get(2).asLong();
+ long long2 =
jsonNode.get("resultTable").get("rows").get(i).get(3).asLong();
+ assertEquals(ts1, new Timestamp(expectedTs1).toString());
+ assertEquals(ts2, new Timestamp(expectedTs2).toString());
+ assertEquals(long1, expectedTs1);
+ assertEquals(long2, expectedTs2);
+ expectedTs1 += 86400000;
+ expectedTs2 += 86400000;
+ }
+ }
+
+ @Test(dataProvider = "useBothQueryEngines")
+ public void testSelectWithCastQueries(boolean useMultiStageQueryEngine)
+ throws Exception {
+ setUseMultiStageQueryEngine(useMultiStageQueryEngine);
+ String query = String.format("\n"
+ + "SELECT CAST(DATETRUNC('DAY',
CAST(FROMDATETIME(TODATETIME(FROMDATETIME(CAST(CAST(ts1 AS TIMESTAMP) AS "
+ + "VARCHAR), 'yyyy-MM-dd HH:mm:ss.S'), 'yyyy-MM-dd'), 'yyyy-MM-dd') AS
TIMESTAMP), 'MILLISECONDS') AS "
+ + "TIMESTAMP) AS tdy_Calculation_2683863928708153344_ok\n"
+ + "FROM %s\n"
+ + "GROUP BY tdy_Calculation_2683863928708153344_ok\n"
+ + "ORDER BY tdy_Calculation_2683863928708153344_ok ASC\n"
+ + "LIMIT %d", getTableName(), getCountStarResult());
+ JsonNode jsonNode = postQuery(query);
+ long expectedTs1 = DateTimeFunctions.fromDateTime("2019-01-01 00:00:00",
"yyyy-MM-dd HH:mm:ss");
+ for (int i = 0; i < getCountStarResult(); i++) {
+ String ts1 =
jsonNode.get("resultTable").get("rows").get(i).get(0).asText();
+ assertEquals(ts1, new Timestamp(expectedTs1).toString());
+ expectedTs1 += 86400000;
+ }
+ }
+
+ @Test(dataProvider = "useBothQueryEngines")
+ public void testSelectWithCastAndFilterQueries(boolean
useMultiStageQueryEngine)
+ throws Exception {
+ setUseMultiStageQueryEngine(useMultiStageQueryEngine);
+ String query = String.format("\n"
+ + "SELECT CAST(DATETRUNC('DAY',
CAST(FROMDATETIME(TODATETIME(FROMDATETIME(CAST(CAST(ts1 AS TIMESTAMP) AS "
+ + "VARCHAR), 'yyyy-MM-dd HH:mm:ss.S'), 'yyyy-MM-dd'), 'yyyy-MM-dd') AS
TIMESTAMP), 'MILLISECONDS') AS "
+ + "TIMESTAMP) AS tdy_Calculation_2683863928708153344_ok\n"
+ + "FROM %s\n"
+ + "WHERE CAST(DATETRUNC('DAY',
CAST(FROMDATETIME(TODATETIME(FROMDATETIME(CAST(CAST(ts1 AS TIMESTAMP) AS "
+ + "VARCHAR), 'yyyy-MM-dd HH:mm:ss.S'), 'yyyy-MM-dd'), 'yyyy-MM-dd') AS
TIMESTAMP), 'MILLISECONDS') AS "
+ + "TIMESTAMP) = FROMDATETIME( '2019-01-01 00:00:00', 'yyyy-MM-dd
HH:mm:ss')\n", getTableName());
+ JsonNode jsonNode = postQuery(query);
+ assertEquals(jsonNode.get("resultTable").get("rows").size(), 1);
+
assertEquals(jsonNode.get("resultTable").get("rows").get(0).get(0).asText(),
"2019-01-01 00:00:00.0");
+ }
+
+ @Test(dataProvider = "useBothQueryEngines")
+ public void testTimeExtractFunction(boolean useMultiStageQueryEngine)
+ throws Exception {
+ setUseMultiStageQueryEngine(useMultiStageQueryEngine);
+ String query = String.format("\n"
+ + "SELECT HOUR(ts1), HOUR(ts2),\n"
+ + "MINUTE(ts1), MINUTE(ts2),\n"
+ + "SECOND(ts1), SECOND(ts2),\n"
+ + "MILLISECOND(ts1), MILLISECOND(ts2),\n"
+ + "YEAR(ts1), YEAR(ts2),\n"
+ + "YEAR_OF_WEEK(ts1), YEAR_OF_WEEK(ts2),\n"
+ + "MONTH_OF_YEAR(ts1), MONTH_OF_YEAR(ts2),\n"
+ + "WEEK_OF_YEAR(ts1), WEEK_OF_YEAR(ts2),\n"
+ + "DAY_OF_YEAR(ts1), DAY_OF_YEAR(ts2),\n"
+ + "DAY_OF_MONTH(ts1), DAY_OF_MONTH(ts2),\n"
+ + "DAY_OF_WEEK(ts1), DAY_OF_WEEK(ts2)\n"
+ + "FROM %s\n"
+ + "LIMIT %d\n", getTableName(), getCountStarResult());
+ JsonNode jsonNode = postQuery(query);
+ for (int i = 0; i < getCountStarResult(); i++) {
+
assertEquals(jsonNode.get("resultTable").get("rows").get(i).get(0).asInt(), 0);
+
assertEquals(jsonNode.get("resultTable").get("rows").get(i).get(1).asInt(), 12);
+
assertEquals(jsonNode.get("resultTable").get("rows").get(i).get(2).asInt(), 0);
+
assertEquals(jsonNode.get("resultTable").get("rows").get(i).get(3).asInt(), 0);
+
assertEquals(jsonNode.get("resultTable").get("rows").get(i).get(4).asInt(), 0);
+
assertEquals(jsonNode.get("resultTable").get("rows").get(i).get(5).asInt(), 0);
+
assertEquals(jsonNode.get("resultTable").get("rows").get(i).get(6).asInt(), 0);
+
assertEquals(jsonNode.get("resultTable").get("rows").get(i).get(7).asInt(), 0);
+
assertEquals(jsonNode.get("resultTable").get("rows").get(i).get(8).asInt(),
+ jsonNode.get("resultTable").get("rows").get(i).get(9).asInt());
+
assertEquals(jsonNode.get("resultTable").get("rows").get(i).get(10).asInt(),
+ jsonNode.get("resultTable").get("rows").get(i).get(11).asInt());
+
assertEquals(jsonNode.get("resultTable").get("rows").get(i).get(12).asInt(),
+ jsonNode.get("resultTable").get("rows").get(i).get(13).asInt());
+
assertEquals(jsonNode.get("resultTable").get("rows").get(i).get(14).asInt(),
+ jsonNode.get("resultTable").get("rows").get(i).get(15).asInt());
+
assertEquals(jsonNode.get("resultTable").get("rows").get(i).get(16).asInt(),
+ jsonNode.get("resultTable").get("rows").get(i).get(17).asInt());
+
assertEquals(jsonNode.get("resultTable").get("rows").get(i).get(18).asInt(),
+ jsonNode.get("resultTable").get("rows").get(i).get(19).asInt());
+
assertEquals(jsonNode.get("resultTable").get("rows").get(i).get(20).asInt(),
+ jsonNode.get("resultTable").get("rows").get(i).get(21).asInt());
+ }
+ }
+
+ @Test(dataProvider = "useBothQueryEngines")
+ public void testTimestampDiffQueries(boolean useMultiStageQueryEngine)
+ throws Exception {
+ setUseMultiStageQueryEngine(useMultiStageQueryEngine);
+ String query = String.format("\n"
+ + "SELECT TIMESTAMPDIFF(second, ts1, ts2)\n"
+ + "FROM %s\n"
+ + "LIMIT %d\n", getTableName(), getCountStarResult());
+ JsonNode jsonNode = postQuery(query);
+ for (int i = 0; i < getCountStarResult(); i++) {
+
assertEquals(jsonNode.get("resultTable").get("rows").get(i).get(0).asLong(),
43200);
+ }
+
+ query = String.format("\n"
+ + "SELECT TIMESTAMPDIFF(minute, ts1, ts2)\n"
+ + "FROM %s\n"
+ + "LIMIT %d\n", getTableName(), getCountStarResult());
+ jsonNode = postQuery(query);
+ for (int i = 0; i < getCountStarResult(); i++) {
+
assertEquals(jsonNode.get("resultTable").get("rows").get(i).get(0).asLong(),
720);
+ }
+
+ query = String.format("\n"
+ + "SELECT TIMESTAMPDIFF(hour, ts1, ts2)\n"
+ + "FROM %s\n"
+ + "LIMIT %d\n", getTableName(), getCountStarResult());
+ jsonNode = postQuery(query);
+ for (int i = 0; i < getCountStarResult(); i++) {
+
assertEquals(jsonNode.get("resultTable").get("rows").get(i).get(0).asLong(),
12);
+ }
+ }
+
+ @Test(dataProvider = "useBothQueryEngines")
+ public void testTimestampAddQueries(boolean useMultiStageQueryEngine)
+ throws Exception {
+ setUseMultiStageQueryEngine(useMultiStageQueryEngine);
+ String query = String.format("\n"
+ + "SELECT TIMESTAMPADD(MINUTE, 720, ts1), ts2\n"
+ + "FROM %s\n"
+ + "LIMIT %d\n", getTableName(), getCountStarResult());
+ JsonNode jsonNode = postQuery(query);
+ for (int i = 0; i < getCountStarResult(); i++) {
+ if (useMultiStageQueryEngine) {
+
assertEquals(jsonNode.get("resultTable").get("rows").get(i).get(0).asText(),
+ jsonNode.get("resultTable").get("rows").get(i).get(1).textValue());
+ } else {
+ assertEquals(new
Timestamp(jsonNode.get("resultTable").get("rows").get(i).get(0).longValue()).toString(),
+ jsonNode.get("resultTable").get("rows").get(i).get(1).textValue());
+ }
+ }
+
+ query = String.format("\n"
+ + "SELECT TIMESTAMPADD(SECOND, 43200, ts1), ts2\n"
+ + "FROM %s\n"
+ + "LIMIT %d\n", getTableName(), getCountStarResult());
+ jsonNode = postQuery(query);
+ for (int i = 0; i < getCountStarResult(); i++) {
+ if (useMultiStageQueryEngine) {
+
assertEquals(jsonNode.get("resultTable").get("rows").get(i).get(0).asText(),
+ jsonNode.get("resultTable").get("rows").get(i).get(1).textValue());
+ } else {
+ assertEquals(new
Timestamp(jsonNode.get("resultTable").get("rows").get(i).get(0).longValue()).toString(),
+ jsonNode.get("resultTable").get("rows").get(i).get(1).textValue());
+ }
+ }
+
+ query = String.format("\n"
+ + "SELECT TIMESTAMPADD(HOUR, 12, ts1), ts2\n"
+ + "FROM %s\n"
+ + "LIMIT %d\n", getTableName(), getCountStarResult());
+ jsonNode = postQuery(query);
+ for (int i = 0; i < getCountStarResult(); i++) {
+ if (useMultiStageQueryEngine) {
+
assertEquals(jsonNode.get("resultTable").get("rows").get(i).get(0).asText(),
+ jsonNode.get("resultTable").get("rows").get(i).get(1).textValue());
+ } else {
+ assertEquals(new
Timestamp(jsonNode.get("resultTable").get("rows").get(i).get(0).longValue()).toString(),
+ jsonNode.get("resultTable").get("rows").get(i).get(1).textValue());
+ }
+ }
+ }
+
+ @Test(dataProvider = "useBothQueryEngines")
+ public void testToDateTimeQueries(boolean useMultiStageQueryEngine)
+ throws Exception {
+ setUseMultiStageQueryEngine(useMultiStageQueryEngine);
+ String query = String.format("\n"
+ + "SELECT "
+ + (
+ useMultiStageQueryEngine
+ ? "TODATETIME(CAST(MIN(ts1) AS BIGINT), 'yyyy-MM-dd HH:mm:ss'),\n"
+ : "TODATETIME(MIN(ts1), 'yyyy-MM-dd HH:mm:ss'),\n")
+ + (
+ useMultiStageQueryEngine
+ ? "TODATETIME(CAST(MIN(ts2) AS BIGINT), 'yyyy-MM-dd HH:mm:ss')\n"
+ : "TODATETIME(MIN(ts2), 'yyyy-MM-dd HH:mm:ss')\n")
+ + "FROM %s\n", getTableName());
+ JsonNode jsonNode = postQuery(query);
+
assertEquals(jsonNode.get("resultTable").get("rows").get(0).get(0).asText(),
"2019-01-01 00:00:00");
+
assertEquals(jsonNode.get("resultTable").get("rows").get(0).get(1).textValue(),
"2019-01-01 12:00:00");
+ }
+
+ @Override
+ public String getTableName() {
+ return DEFAULT_TABLE_NAME;
+ }
+
+ @Override
+ public Schema createSchema() {
+ return new Schema.SchemaBuilder().setSchemaName(getTableName())
+ .addSingleValueDimension(TIMESTAMP_1, FieldSpec.DataType.TIMESTAMP)
+ .addSingleValueDimension(TIMESTAMP_2, FieldSpec.DataType.TIMESTAMP)
+ .addSingleValueDimension(LONG_1, FieldSpec.DataType.LONG)
+ .addSingleValueDimension(LONG_2, FieldSpec.DataType.LONG)
+ .build();
+ }
+
+ @Override
+ public File createAvroFile()
+ throws Exception {
+ // create avro schema
+ org.apache.avro.Schema avroSchema =
org.apache.avro.Schema.createRecord("myRecord", null, null, false);
+ avroSchema.setFields(ImmutableList.of(
+ new org.apache.avro.Schema.Field(TIMESTAMP_1,
org.apache.avro.Schema.create(org.apache.avro.Schema.Type.LONG),
+ null, null),
+ new org.apache.avro.Schema.Field(TIMESTAMP_2,
org.apache.avro.Schema.create(org.apache.avro.Schema.Type.LONG),
+ null, null),
+ new org.apache.avro.Schema.Field(LONG_1,
org.apache.avro.Schema.create(org.apache.avro.Schema.Type.LONG), null,
+ null),
+ new org.apache.avro.Schema.Field(LONG_2,
org.apache.avro.Schema.create(org.apache.avro.Schema.Type.LONG), null,
+ null)
+ ));
+
+ // create avro file
+ File avroFile = new File(_tempDir, "data.avro");
+ try (DataFileWriter<GenericData.Record> fileWriter = new
DataFileWriter<>(new GenericDatumWriter<>(avroSchema))) {
+ fileWriter.create(avroSchema, avroFile);
+ long ts1 = DateTimeFunctions.fromDateTime("2019-01-01 00:00:00",
"yyyy-MM-dd HH:mm:ss");
+ long ts2 = DateTimeFunctions.fromDateTime("2019-01-01 12:00:00",
"yyyy-MM-dd HH:mm:ss");
+
+ for (int i = 0; i < getCountStarResult(); i++) {
+ // create avro record
+ GenericData.Record record = new GenericData.Record(avroSchema);
+ record.put(TIMESTAMP_1, ts1);
+ record.put(TIMESTAMP_2, ts2);
+ record.put(LONG_1, ts1);
+ record.put(LONG_2, ts2);
+ // add avro record to file
+ fileWriter.append(record);
+ ts1 += 86400000;
+ ts2 += 86400000;
+ }
+ }
+ return avroFile;
+ }
+}
diff --git
a/pinot-query-planner/src/main/java/org/apache/pinot/query/parser/CalciteRexExpressionParser.java
b/pinot-query-planner/src/main/java/org/apache/pinot/query/parser/CalciteRexExpressionParser.java
index 4c31fc86f4..7e4fe191dd 100644
---
a/pinot-query-planner/src/main/java/org/apache/pinot/query/parser/CalciteRexExpressionParser.java
+++
b/pinot-query-planner/src/main/java/org/apache/pinot/query/parser/CalciteRexExpressionParser.java
@@ -20,6 +20,7 @@ package org.apache.pinot.query.parser;
import java.util.ArrayList;
import java.util.Arrays;
+import java.util.GregorianCalendar;
import java.util.HashMap;
import java.util.Iterator;
import java.util.List;
@@ -35,6 +36,7 @@ import org.apache.pinot.common.utils.request.RequestUtils;
import org.apache.pinot.query.planner.logical.RexExpression;
import org.apache.pinot.query.planner.plannode.SortNode;
import org.apache.pinot.segment.spi.AggregationFunctionType;
+import org.apache.pinot.spi.data.FieldSpec;
import org.apache.pinot.sql.FilterKind;
import org.apache.pinot.sql.parsers.SqlCompilationException;
import org.slf4j.Logger;
@@ -194,6 +196,9 @@ public class CalciteRexExpressionParser {
private static Expression rexLiteralToExpression(RexExpression.Literal
rexLiteral) {
// TODO: currently literals are encoded as strings for V1, remove this and
use directly literal type when it
// supports strong-type in V1.
+    if (rexLiteral.getDataType() == FieldSpec.DataType.TIMESTAMP) {
+      return RequestUtils.getLiteralExpression(((GregorianCalendar) rexLiteral.getValue()).getTimeInMillis());
+    }
return RequestUtils.getLiteralExpression(rexLiteral.getValue());
}
diff --git
a/pinot-query-planner/src/main/java/org/apache/pinot/query/planner/logical/RelToPlanNodeConverter.java
b/pinot-query-planner/src/main/java/org/apache/pinot/query/planner/logical/RelToPlanNodeConverter.java
index b0b7545677..7beaab8e42 100644
---
a/pinot-query-planner/src/main/java/org/apache/pinot/query/planner/logical/RelToPlanNodeConverter.java
+++
b/pinot-query-planner/src/main/java/org/apache/pinot/query/planner/logical/RelToPlanNodeConverter.java
@@ -217,6 +217,19 @@ public final class RelToPlanNodeConverter {
case INTEGER:
return isArray ? DataSchema.ColumnDataType.INT_ARRAY :
DataSchema.ColumnDataType.INT;
case BIGINT:
+ case INTERVAL_DAY:
+ case INTERVAL_DAY_HOUR:
+ case INTERVAL_DAY_MINUTE:
+ case INTERVAL_DAY_SECOND:
+ case INTERVAL_HOUR:
+ case INTERVAL_HOUR_MINUTE:
+ case INTERVAL_HOUR_SECOND:
+ case INTERVAL_MINUTE:
+ case INTERVAL_MINUTE_SECOND:
+ case INTERVAL_SECOND:
+ case INTERVAL_MONTH:
+ case INTERVAL_YEAR:
+ case INTERVAL_YEAR_MONTH:
return isArray ? DataSchema.ColumnDataType.LONG_ARRAY :
DataSchema.ColumnDataType.LONG;
case DECIMAL:
return resolveDecimal(relDataType, isArray);
diff --git
a/pinot-query-planner/src/main/java/org/apache/pinot/query/planner/logical/RexExpression.java
b/pinot-query-planner/src/main/java/org/apache/pinot/query/planner/logical/RexExpression.java
index ab78924548..bacde0d30b 100644
---
a/pinot-query-planner/src/main/java/org/apache/pinot/query/planner/logical/RexExpression.java
+++
b/pinot-query-planner/src/main/java/org/apache/pinot/query/planner/logical/RexExpression.java
@@ -20,6 +20,7 @@ package org.apache.pinot.query.planner.logical;
import java.math.BigDecimal;
import java.util.ArrayList;
+import java.util.GregorianCalendar;
import java.util.List;
import java.util.stream.Collectors;
import org.apache.calcite.rel.core.AggregateCall;
@@ -62,6 +63,8 @@ public interface RexExpression {
switch (rexCall.getKind()) {
case CAST:
return RexExpressionUtils.handleCast(rexCall);
+ case REINTERPRET:
+ return RexExpressionUtils.handleReinterpret(rexCall);
case SEARCH:
return RexExpressionUtils.handleSearch(rexCall);
case CASE:
@@ -99,6 +102,8 @@ public interface RexExpression {
return ((BigDecimal) value).doubleValue();
case STRING:
return ((NlsString) value).getValue();
+ case TIMESTAMP:
+ return ((GregorianCalendar) value).getTimeInMillis();
default:
return value;
}
diff --git
a/pinot-query-planner/src/main/java/org/apache/pinot/query/planner/logical/RexExpressionUtils.java
b/pinot-query-planner/src/main/java/org/apache/pinot/query/planner/logical/RexExpressionUtils.java
index 242291c280..d3bf8d5a3b 100644
---
a/pinot-query-planner/src/main/java/org/apache/pinot/query/planner/logical/RexExpressionUtils.java
+++
b/pinot-query-planner/src/main/java/org/apache/pinot/query/planner/logical/RexExpressionUtils.java
@@ -62,6 +62,15 @@ public class RexExpressionUtils {
"CAST", operands);
}
+ /**
+ * Reinterpret is a pass-through function that does not change the type of
the input.
+ */
+ static RexExpression handleReinterpret(RexCall rexCall) {
+ List<RexNode> operands = rexCall.getOperands();
+ Preconditions.checkState(operands.size() == 1, "REINTERPRET takes only 1
argument");
+ return RexExpression.toRexExpression(operands.get(0));
+ }
+
// TODO: Add support for range filter expressions (e.g. a > 0 and a < 30)
static RexExpression handleSearch(RexCall rexCall) {
List<RexNode> operands = rexCall.getOperands();
diff --git
a/pinot-query-planner/src/main/java/org/apache/pinot/query/planner/plannode/ValueNode.java
b/pinot-query-planner/src/main/java/org/apache/pinot/query/planner/plannode/ValueNode.java
index 248af5d8a2..acb00487e8 100644
---
a/pinot-query-planner/src/main/java/org/apache/pinot/query/planner/plannode/ValueNode.java
+++
b/pinot-query-planner/src/main/java/org/apache/pinot/query/planner/plannode/ValueNode.java
@@ -20,11 +20,14 @@ package org.apache.pinot.query.planner.plannode;
import com.google.common.collect.ImmutableList;
import java.util.ArrayList;
+import java.util.GregorianCalendar;
import java.util.List;
import org.apache.calcite.rex.RexLiteral;
+import org.apache.calcite.sql.type.SqlTypeName;
import org.apache.pinot.common.utils.DataSchema;
import org.apache.pinot.query.planner.logical.RexExpression;
import org.apache.pinot.query.planner.serde.ProtoProperties;
+import org.apache.pinot.spi.data.FieldSpec;
public class ValueNode extends AbstractPlanNode {
@@ -42,6 +45,15 @@ public class ValueNode extends AbstractPlanNode {
for (List<RexLiteral> literalTuple : literalTuples) {
List<RexExpression> literalRow = new ArrayList<>();
for (RexLiteral literal : literalTuple) {
+ if (literal == null) {
+ literalRow.add(null);
+ continue;
+ }
+        if (literal.getTypeName() == SqlTypeName.TIMESTAMP) {
+          GregorianCalendar tsLiteral = (GregorianCalendar) literal.getValue();
+          literalRow.add(new RexExpression.Literal(FieldSpec.DataType.TIMESTAMP, tsLiteral.getTimeInMillis()));
+          continue;
+        }
literalRow.add(RexExpression.toRexExpression(literal));
}
_literalRows.add(literalRow);
diff --git
a/pinot-query-runtime/src/main/java/org/apache/pinot/query/service/dispatch/QueryDispatcher.java
b/pinot-query-runtime/src/main/java/org/apache/pinot/query/service/dispatch/QueryDispatcher.java
index 9eff0bc6c4..7b0ad3e3cb 100644
---
a/pinot-query-runtime/src/main/java/org/apache/pinot/query/service/dispatch/QueryDispatcher.java
+++
b/pinot-query-runtime/src/main/java/org/apache/pinot/query/service/dispatch/QueryDispatcher.java
@@ -65,7 +65,6 @@ import
org.apache.pinot.query.runtime.plan.pipeline.PipelineBreakerResult;
import org.apache.pinot.query.runtime.plan.serde.QueryPlanSerDeUtils;
import org.apache.pinot.query.service.QueryConfig;
import org.apache.pinot.spi.trace.RequestContext;
-import org.apache.pinot.spi.utils.ByteArray;
import org.roaringbitmap.RoaringBitmap;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
@@ -251,6 +250,7 @@ public class QueryDispatcher {
DataBlock dataBlock = transferableBlock.getDataBlock();
int numColumns = resultSchema.getColumnNames().length;
int numRows = dataBlock.getNumberOfRows();
+ DataSchema.ColumnDataType[] columnDataTypes =
resultSchema.getColumnDataTypes();
List<Object[]> rows = new ArrayList<>(dataBlock.getNumberOfRows());
if (numRows > 0) {
RoaringBitmap[] nullBitmaps = new RoaringBitmap[numColumns];
@@ -268,11 +268,8 @@ public class QueryDispatcher {
row[colId++] = null;
} else {
int colRef = field.left;
- if (rawRow[colRef] instanceof ByteArray) {
- row[colId++] = ((ByteArray) rawRow[colRef]).toHexString();
- } else {
- row[colId++] = rawRow[colRef];
- }
+ DataSchema.ColumnDataType dataType = columnDataTypes[colId];
+ row[colId++] = dataType.convertAndFormat(rawRow[colRef]);
}
}
rows.add(row);
diff --git
a/pinot-query-runtime/src/test/java/org/apache/pinot/query/runtime/QueryRunnerTestBase.java
b/pinot-query-runtime/src/test/java/org/apache/pinot/query/runtime/QueryRunnerTestBase.java
index c23e8f29fc..52f9c6b0b9 100644
---
a/pinot-query-runtime/src/test/java/org/apache/pinot/query/runtime/QueryRunnerTestBase.java
+++
b/pinot-query-runtime/src/test/java/org/apache/pinot/query/runtime/QueryRunnerTestBase.java
@@ -228,6 +228,8 @@ public abstract class QueryRunnerTestBase extends
QueryTestSet {
} else if (l instanceof String) {
if (r instanceof byte[]) {
return ((String) l).compareTo(BytesUtils.toHexString((byte[]) r));
+ } else if (r instanceof Timestamp) {
+ return ((String) l).compareTo((r).toString());
}
return ((String) l).compareTo((String) r);
} else if (l instanceof Boolean) {
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]