This is an automated email from the ASF dual-hosted git repository.
csringhofer pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/impala.git
The following commit(s) were added to refs/heads/master by this push:
new a99de990b IMPALA-12370: Allow converting timestamps to UTC when
writing to Kudu
a99de990b is described below
commit a99de990b0a6fcc89a07a8eedc61fc68fdf3c546
Author: Csaba Ringhofer <[email protected]>
AuthorDate: Fri Jun 7 17:51:44 2024 +0200
IMPALA-12370: Allow converting timestamps to UTC when writing to Kudu
Before this commit, only read support was implemented
(convert_kudu_utc_timestamps=true). This change adds write support:
if write_kudu_utc_timestamps=true, then timestamps are converted
from local time to UTC during DMLs to Kudu. In case of
ambiguous conversions (DST changes) the earlier possible UTC
timestamp is written.
All DMLs supported with Kudu tables are affected:
INSERT, UPSERT, UPDATE, DELETE
To be able to read back Kudu tables written by Impala correctly
convert_kudu_utc_timestamps and write_kudu_utc_timestamps need to
have the same value. Having the same value in the two query options
is also critical for UPDATE/DELETE if the primary key contains a
timestamp column - these operations do a scan first (affected by
convert_kudu_utc_timestamps) and then use the keys from the scan to
select updated/deleted rows (affected by write_kudu_utc_timestamps).
The conversion is implemented by adding to_utc_timestamp() to inserted
timestamp expressions during planning. This allows doing the same
conversion during the pre-insert sorting and partitioning.
Read support is implemented differently - in that case the plan is not
changed and the scanner does the conversion.
Other changes:
- Before this change, verification of tests with TIMESTAMP results
was skipped when the file format is Kudu. This shouldn't be
necessary so the skipping was removed.
Change-Id: Ibb4995a64e042e7bb261fcc6e6bf7ffce61e9bd1
Reviewed-on: http://gerrit.cloudera.org:8080/21492
Tested-by: Impala Public Jenkins <[email protected]>
Reviewed-by: Peter Rozsa <[email protected]>
---
be/src/service/query-options.cc | 4 +
be/src/service/query-options.h | 4 +-
common/thrift/ImpalaService.thrift | 10 +-
common/thrift/Query.thrift | 3 +
.../org/apache/impala/analysis/InsertStmt.java | 18 ++-
.../org/apache/impala/analysis/KuduModifyImpl.java | 22 ++-
.../org/apache/impala/analysis/UpdateStmt.java | 2 +-
.../main/java/org/apache/impala/util/ExprUtil.java | 11 +-
.../org/apache/impala/planner/PlannerTest.java | 11 ++
.../functional/functional_schema_template.sql | 17 ++
.../datasets/functional/schema_constraints.csv | 10 +-
.../PlannerTest/kudu-dml-with-utc-conversion.test | 113 +++++++++++++
.../kudu_predicate_with_timestamp_conversion.test | 8 +-
...u_runtime_filter_with_timestamp_conversion.test | 10 +-
.../QueryTest/kudu_timestamp_conversion.test | 178 +++++++++++++++++++--
tests/common/test_result_verifier.py | 4 +-
tests/query_test/test_kudu.py | 8 +-
17 files changed, 391 insertions(+), 42 deletions(-)
diff --git a/be/src/service/query-options.cc b/be/src/service/query-options.cc
index 7e561200b..47c49bdd0 100644
--- a/be/src/service/query-options.cc
+++ b/be/src/service/query-options.cc
@@ -1297,6 +1297,10 @@ Status impala::SetQueryOption(TImpalaQueryOptions::type
option, const string& va
query_options->__set_use_null_slots_cache(IsTrue(value));
break;
}
+ case TImpalaQueryOptions::WRITE_KUDU_UTC_TIMESTAMPS: {
+ query_options->__set_write_kudu_utc_timestamps(IsTrue(value));
+ break;
+ }
default:
string key = to_string(option);
if (IsRemovedQueryOption(key)) {
diff --git a/be/src/service/query-options.h b/be/src/service/query-options.h
index 96402d4e8..ac17ac035 100644
--- a/be/src/service/query-options.h
+++ b/be/src/service/query-options.h
@@ -52,7 +52,7 @@ typedef std::unordered_map<string,
beeswax::TQueryOptionLevel::type>
// time we add or remove a query option to/from the enum TImpalaQueryOptions.
#define QUERY_OPTS_TABLE
\
DCHECK_EQ(_TImpalaQueryOptions_VALUES_TO_NAMES.size(),
\
- TImpalaQueryOptions::USE_NULL_SLOTS_CACHE + 1);
\
+ TImpalaQueryOptions::WRITE_KUDU_UTC_TIMESTAMPS + 1);
\
REMOVED_QUERY_OPT_FN(abort_on_default_limit_exceeded,
ABORT_ON_DEFAULT_LIMIT_EXCEEDED) \
QUERY_OPT_FN(abort_on_error, ABORT_ON_ERROR, TQueryOptionLevel::REGULAR)
\
REMOVED_QUERY_OPT_FN(allow_unsupported_formats, ALLOW_UNSUPPORTED_FORMATS)
\
@@ -333,6 +333,8 @@ typedef std::unordered_map<string,
beeswax::TQueryOptionLevel::type>
QUERY_OPT_FN(slot_count_strategy, SLOT_COUNT_STRATEGY,
TQueryOptionLevel::ADVANCED) \
QUERY_OPT_FN(clean_dbcp_ds_cache, CLEAN_DBCP_DS_CACHE,
TQueryOptionLevel::ADVANCED) \
QUERY_OPT_FN(use_null_slots_cache, USE_NULL_SLOTS_CACHE,
TQueryOptionLevel::ADVANCED) \
+ QUERY_OPT_FN(write_kudu_utc_timestamps,
\
+ WRITE_KUDU_UTC_TIMESTAMPS, TQueryOptionLevel::ADVANCED)
\
;
/// Enforce practical limits on some query options to avoid undesired query
state.
diff --git a/common/thrift/ImpalaService.thrift
b/common/thrift/ImpalaService.thrift
index 3884d2fd1..7ab658f87 100644
--- a/common/thrift/ImpalaService.thrift
+++ b/common/thrift/ImpalaService.thrift
@@ -881,8 +881,9 @@ enum TImpalaQueryOptions {
// See KUDU-3326 for details.
KUDU_TABLE_RESERVE_SECONDS = 168
- // When true, TIMESTAMPs read from Kudu will be converted from UTC to local
time.
- // Writes are unaffected.
+ // When true, UNIXTIME_MICRO columns read from Kudu will be interpreted as
UTC
+ // and UTC->local timezone conversion is applied when converting to Impala
TIMESTAMP.
+ // Writes are unaffected (see WRITE_KUDU_UTC_TIMESTAMPS).
CONVERT_KUDU_UTC_TIMESTAMPS = 169
// This only makes sense when 'CONVERT_KUDU_UTC_TIMESTAMPS' is true. When
true, it
@@ -947,6 +948,11 @@ enum TImpalaQueryOptions {
// of expressions. The cache helps with generated expressions, which often
contain lots
// of repeated patterns.
USE_NULL_SLOTS_CACHE = 179
+
+ // When true, Impala TIMESTAMPs are converted from local timezone to UTC
before being
+ // written to Kudu as UNIXTIME_MICRO.
+ // Reads are unaffected (see CONVERT_KUDU_UTC_TIMESTAMPS).
+ WRITE_KUDU_UTC_TIMESTAMPS = 180
}
// The summary of a DML statement.
diff --git a/common/thrift/Query.thrift b/common/thrift/Query.thrift
index 31e6a6aca..7bba052ee 100644
--- a/common/thrift/Query.thrift
+++ b/common/thrift/Query.thrift
@@ -732,6 +732,9 @@ struct TQueryOptions {
// See comment in ImpalaService.thrift
180: optional bool use_null_slots_cache = true;
+
+ // See comment in ImpalaService.thrift
+ 181: optional bool write_kudu_utc_timestamps = false;
}
// Impala currently has three types of sessions: Beeswax, HiveServer2 and
external
diff --git a/fe/src/main/java/org/apache/impala/analysis/InsertStmt.java
b/fe/src/main/java/org/apache/impala/analysis/InsertStmt.java
index 82f3cbf4c..f16e8c20b 100644
--- a/fe/src/main/java/org/apache/impala/analysis/InsertStmt.java
+++ b/fe/src/main/java/org/apache/impala/analysis/InsertStmt.java
@@ -61,6 +61,7 @@ import org.apache.impala.planner.TableSink;
import org.apache.impala.rewrite.ExprRewriter;
import org.apache.impala.thrift.TIcebergPartitionTransformType;
import org.apache.impala.thrift.TSortingOrder;
+import org.apache.impala.util.ExprUtil;
import org.apache.impala.util.IcebergUtil;
/**
@@ -849,6 +850,9 @@ public class InsertStmt extends DmlStatementBase {
widestTypeExprList = unionStmt.getWidestExprs();
}
+ boolean convertToUtc =
+ isKuduTable &&
analyzer.getQueryOptions().isWrite_kudu_utc_timestamps();
+
// Check dynamic partition columns for type compatibility.
for (int i = 0; i < selectListExprs.size(); ++i) {
Column targetColumn = selectExprTargetColumns.get(i);
@@ -899,7 +903,12 @@ public class InsertStmt extends DmlStatementBase {
Column c = table_.getColumns().get(i);
for (int j = 0; j < tmpPartitionKeyNames.size(); ++j) {
if (c.getName().equals(tmpPartitionKeyNames.get(j))) {
- partitionKeyExprs_.add(tmpPartitionKeyExprs.get(j));
+ Expr expr = tmpPartitionKeyExprs.get(j);
+ if (convertToUtc && expr.getType().isTimestamp()) {
+ expr = ExprUtil.toUtcTimestampExpr(
+ analyzer, expr, true /*expectPreIfNonUnique*/);
+ }
+ partitionKeyExprs_.add(expr);
partitionColPos_.add(i);
break;
}
@@ -938,7 +947,12 @@ public class InsertStmt extends DmlStatementBase {
boolean matchFound = false;
for (int i = 0; i < selectListExprs.size(); ++i) {
if
(selectExprTargetColumns.get(i).getName().equals(tblColumn.getName())) {
- resultExprs_.add(selectListExprs.get(i));
+ Expr expr = selectListExprs.get(i);
+ if (convertToUtc && expr.getType().isTimestamp()) {
+ expr = ExprUtil.toUtcTimestampExpr(
+ analyzer, expr, true /*expectPreIfNonUnique*/);
+ }
+ resultExprs_.add(expr);
if (isKuduTable) mentionedColumns_.add(col);
matchFound = true;
break;
diff --git a/fe/src/main/java/org/apache/impala/analysis/KuduModifyImpl.java
b/fe/src/main/java/org/apache/impala/analysis/KuduModifyImpl.java
index d82919a7c..1153758de 100644
--- a/fe/src/main/java/org/apache/impala/analysis/KuduModifyImpl.java
+++ b/fe/src/main/java/org/apache/impala/analysis/KuduModifyImpl.java
@@ -24,6 +24,7 @@ import org.apache.impala.catalog.FeKuduTable;
import org.apache.impala.catalog.KuduColumn;
import org.apache.impala.common.AnalysisException;
import org.apache.impala.common.Pair;
+import org.apache.impala.util.ExprUtil;
import java.util.ArrayList;
import java.util.Collections;
@@ -94,6 +95,8 @@ abstract class KuduModifyImpl extends ModifyImpl {
addKeyColumns(analyzer, selectList, referencedColumns_, uniqueSlots,
keySlots, colIndexMap);
+ boolean convertToUtc =
analyzer.getQueryOptions().isWrite_kudu_utc_timestamps();
+
// Assignments are only used in the context of updates.
for (Pair<SlotRef, Expr> valueAssignment : modifyStmt_.assignments_) {
SlotRef lhsSlotRef = valueAssignment.first;
@@ -125,6 +128,11 @@ abstract class KuduModifyImpl extends ModifyImpl {
rhsExpr = StatementBase.checkTypeCompatibility(
modifyStmt_.targetTableRef_.getDesc().getTable().getFullName(),
c, rhsExpr, analyzer, null /*widestTypeSrcExpr*/);
+
+ if (convertToUtc && rhsExpr.getType().isTimestamp()) {
+ rhsExpr = ExprUtil.toUtcTimestampExpr(
+ analyzer, rhsExpr, true /*expectPreIfNonUnique*/);
+ }
uniqueSlots.add(lhsSlotRef.getSlotId());
selectList.add(new SelectListItem(rhsExpr, null));
referencedColumns_.add(colIndexMap.get(c.getName()));
@@ -148,23 +156,29 @@ abstract class KuduModifyImpl extends ModifyImpl {
List<Integer> referencedColumns, Set<SlotId> uniqueSlots, Set<SlotId>
keySlots,
Map<String, Integer> colIndexMap, String colName)
throws AnalysisException {
- SlotRef ref = addSlotRef(analyzer, selectList, referencedColumns,
uniqueSlots,
+ Expr ref = addSlotRef(analyzer, selectList, referencedColumns, uniqueSlots,
keySlots, colIndexMap, colName);
resultExprs_.add(ref);
}
- private SlotRef addSlotRef(Analyzer analyzer, List<SelectListItem>
selectList,
+ private Expr addSlotRef(Analyzer analyzer, List<SelectListItem> selectList,
List<Integer> referencedColumns, Set<SlotId> uniqueSlots, Set<SlotId>
keySlots,
Map<String, Integer> colIndexMap, String colName) throws
AnalysisException {
List<String> path =
Path.createRawPath(modifyStmt_.targetTableRef_.getUniqueAlias(),
colName);
SlotRef ref = new SlotRef(path);
ref.analyze(analyzer);
- selectList.add(new SelectListItem(ref, null));
+ Expr expr = ref;
+ boolean convertToUtc =
analyzer.getQueryOptions().isWrite_kudu_utc_timestamps();
+ if (convertToUtc && expr.getType().isTimestamp()) {
+ expr = ExprUtil.toUtcTimestampExpr(
+ analyzer, expr, true /*expectPreIfNonUnique*/);
+ }
+ selectList.add(new SelectListItem(expr, null));
uniqueSlots.add(ref.getSlotId());
keySlots.add(ref.getSlotId());
referencedColumns.add(colIndexMap.get(colName));
- return ref;
+ return expr;
}
@Override
diff --git a/fe/src/main/java/org/apache/impala/analysis/UpdateStmt.java
b/fe/src/main/java/org/apache/impala/analysis/UpdateStmt.java
index 0779cf808..3f22d17dc 100644
--- a/fe/src/main/java/org/apache/impala/analysis/UpdateStmt.java
+++ b/fe/src/main/java/org/apache/impala/analysis/UpdateStmt.java
@@ -45,7 +45,7 @@ import com.google.common.base.Preconditions;
* clause. The type of the right-hand side of each assignments must be
* assignment compatible with the left-hand side column type.
*
- * Currently, only Kudu tables can be updated.
+ * Currently, only Kudu and Iceberg tables can be updated.
*/
public class UpdateStmt extends ModifyStmt {
public UpdateStmt(List<String> targetTablePath, FromClause tableRefs,
diff --git a/fe/src/main/java/org/apache/impala/util/ExprUtil.java
b/fe/src/main/java/org/apache/impala/util/ExprUtil.java
index 87a0a8c29..3ced390fe 100644
--- a/fe/src/main/java/org/apache/impala/util/ExprUtil.java
+++ b/fe/src/main/java/org/apache/impala/util/ExprUtil.java
@@ -83,6 +83,7 @@ public class ExprUtil {
*/
public static long localTimestampToUnixTimeMicros(Analyzer analyzer, Expr
timestampExpr)
throws AnalysisException, InternalException {
+ Preconditions.checkArgument(timestampExpr.isConstant());
return utcTimestampToUnixTimeMicros(analyzer,
toUtcTimestampExpr(analyzer, timestampExpr, null));
}
@@ -100,6 +101,7 @@ public class ExprUtil {
*/
public static Long localTimestampToUnixTimeMicros(Analyzer analyzer, Expr
timestampExpr,
Boolean expectPreIfNonUnique) throws AnalysisException,
InternalException {
+ Preconditions.checkArgument(timestampExpr.isConstant());
Expr toUtcTimestampExpr = toUtcTimestampExpr(analyzer, timestampExpr,
expectPreIfNonUnique);
Expr toUnixTimeExpr = new FunctionCallExpr("utc_to_unix_micros",
@@ -116,14 +118,19 @@ public class ExprUtil {
*/
public static String localTimestampToString(Analyzer analyzer, Expr
timestampExpr)
throws AnalysisException, InternalException {
+ Preconditions.checkArgument(timestampExpr.isConstant());
return utcTimestampToSpecifiedTimeZoneTimestamp(analyzer,
toUtcTimestampExpr(analyzer, timestampExpr, null));
}
- private static Expr toUtcTimestampExpr(Analyzer analyzer, Expr timestampExpr,
+ /**
+ * Wraps 'timestampExpr' in to_utc_timestamp() that converts it to UTC from
local
+ * time zone. If the conversion is ambiguous, the earlier/later result will
be returned
+ * based on 'expectPreIfNonUnique'.
+ */
+ public static Expr toUtcTimestampExpr(Analyzer analyzer, Expr timestampExpr,
Boolean expectPreIfNonUnique) throws AnalysisException {
Preconditions.checkArgument(timestampExpr.isAnalyzed());
- Preconditions.checkArgument(timestampExpr.isConstant());
Preconditions.checkArgument(timestampExpr.getType() == Type.TIMESTAMP);
List<Expr> params = Lists.newArrayList(timestampExpr,
new StringLiteral(analyzer.getQueryCtx().getLocal_time_zone()));
diff --git a/fe/src/test/java/org/apache/impala/planner/PlannerTest.java
b/fe/src/test/java/org/apache/impala/planner/PlannerTest.java
index 7af466f9f..403a15e60 100644
--- a/fe/src/test/java/org/apache/impala/planner/PlannerTest.java
+++ b/fe/src/test/java/org/apache/impala/planner/PlannerTest.java
@@ -742,6 +742,17 @@ public class PlannerTest extends PlannerTestBase {
runPlannerTestFile("kudu-stats-agg");
}
+ @Test
+ public void testKuduDmlWithUtcConversion() {
+ TQueryOptions options = defaultQueryOptions();
+ options.setExplain_level(TExplainLevel.VERBOSE);
+ options.setWrite_kudu_utc_timestamps(true);
+ // convert_kudu_utc_timestamps is not really needed for the planner test,
but it would
+ // be critical for update/delete if the queries were actually executed.
+ options.setConvert_kudu_utc_timestamps(true);
+ runPlannerTestFile("kudu-dml-with-utc-conversion", options);
+ }
+
@Test
public void testMtDopValidation() {
// Tests that queries planned with mt_dop > 0 produce a parallel plan.
diff --git a/testdata/datasets/functional/functional_schema_template.sql
b/testdata/datasets/functional/functional_schema_template.sql
index a380b17db..559b78c45 100644
--- a/testdata/datasets/functional/functional_schema_template.sql
+++ b/testdata/datasets/functional/functional_schema_template.sql
@@ -4723,3 +4723,20 @@ SELECT id,
case when id % 2 = 0 then date_add(DATE '2023-12-31', interval id days) else
null end
FROM functional.alltypessmall order by id;
====
+---- DATASET
+functional
+---- BASE_TABLE_NAME
+timestamp_primary_key
+---- COLUMNS
+tkey timestamp
+t timestamp
+id int
+---- CREATE_KUDU
+DROP TABLE IF EXISTS {db_name}{db_suffix}.{table_name};
+CREATE TABLE {db_name}{db_suffix}.{table_name} (
+ tkey TIMESTAMP PRIMARY KEY,
+ t TIMESTAMP,
+ id INT
+)
+PARTITION BY HASH (tkey) PARTITIONS 3 STORED AS KUDU;
+====
diff --git a/testdata/datasets/functional/schema_constraints.csv
b/testdata/datasets/functional/schema_constraints.csv
index 035fccf41..4b2668ec9 100644
--- a/testdata/datasets/functional/schema_constraints.csv
+++ b/testdata/datasets/functional/schema_constraints.csv
@@ -273,7 +273,7 @@ table_name:large_bzip2_tbl, constraint:restrict_to,
table_format:text/bzip/block
table_name:multistream_bzip2_tbl, constraint:restrict_to,
table_format:text/bzip/block
table_name:large_multistream_bzip2_tbl, constraint:restrict_to,
table_format:text/bzip/block
-# Kudu can't handle certain types such as timestamp so we pick and choose the
tables
+# Kudu needs a CREATE_KUDU section in the schema sql so we pick and choose the
tables
# we actually use for Kudu related tests.
table_name:alltypes, constraint:only, table_format:kudu/none/none
table_name:alltypessmall, constraint:only, table_format:kudu/none/none
@@ -300,6 +300,7 @@ table_name:date_tbl, constraint:only,
table_format:kudu/none/none
table_name:timestamp_at_dst_changes, constraint:only,
table_format:kudu/none/none
table_name:binary_tbl, constraint:only, table_format:kudu/none/none
table_name:binary_tbl_big, constraint:only, table_format:kudu/none/none
+table_name:timestamp_primary_key, constraint:only, table_format:kudu/none/none
# Skipping header lines is only effective with text tables
table_name:table_with_header, constraint:restrict_to,
table_format:text/none/none
@@ -422,10 +423,15 @@ table_name:empty_parquet_page_source_impala10186,
constraint:restrict_to, table_
table_name:empty_stream_tbl, constraint:restrict_to, table_format:orc/def/block
# The table is used to test DST changes in timestamps, the timestamps in the
table near
-# DST changes in the 'America/Los_Angeles' time zone.
+# DST changes in the 'America/Los_Angeles' time zone. (DST: UTC-7 / outside
DST: UTC-8)
+# The table contains timestamps from 2011. In 2011 Los Angeles, DST starts at
2 a.m. on
+# Sunday, March 13 (clocks were turned forward 1 hour) and falls back to
standard time
+# again at 2 a.m. on Sunday, November 6 (clocks were turned backward 1 hour).
table_name:timestamp_at_dst_changes, constraint:restrict_to,
table_format:text/none/none
table_name:timestamp_at_dst_changes, constraint:restrict_to,
table_format:kudu/none/none
+table_name:timestamp_primary_key, constraint:restrict_to,
table_format:kudu/none/none
+
# Table where all column values are unique but have some NULLs.
# Depends on functional.alltypessmall.
table_name:unique_with_nulls, constraint:restrict_to,
table_format:parquet/none/none
diff --git
a/testdata/workloads/functional-planner/queries/PlannerTest/kudu-dml-with-utc-conversion.test
b/testdata/workloads/functional-planner/queries/PlannerTest/kudu-dml-with-utc-conversion.test
new file mode 100644
index 000000000..55f196125
--- /dev/null
+++
b/testdata/workloads/functional-planner/queries/PlannerTest/kudu-dml-with-utc-conversion.test
@@ -0,0 +1,113 @@
+insert into functional_kudu.timestamp_primary_key
+values ("1970-01-01", "1970-01-01", 1)
+---- PLAN
+F00:PLAN FRAGMENT [UNPARTITIONED] hosts=1 instances=1
+Per-Host Resources: mem-estimate=20.00MB mem-reservation=0B
thread-reservation=1
+ INSERT INTO KUDU [functional_kudu.timestamp_primary_key]
+ | output exprs: to_utc_timestamp(CAST('1970-01-01' AS TIMESTAMP), 'UTC',
TRUE), to_utc_timestamp(CAST('1970-01-01' AS TIMESTAMP), 'UTC', TRUE), CAST(1
AS INT)
+ | mem-estimate=20.00MB mem-reservation=0B thread-reservation=0
+ |
+ 00:UNION
+ constant-operands=1
+ mem-estimate=0B mem-reservation=0B thread-reservation=0
+ tuple-ids=0 row-size=25B cardinality=1
+ in pipelines: <none>
+---- DISTRIBUTEDPLAN
+F01:PLAN FRAGMENT [KUDU(KuduPartition(to_utc_timestamp('1970-01-01', 'UTC',
TRUE)))] hosts=1 instances=1
+Per-Host Resources: mem-estimate=24.02MB mem-reservation=4.00MB
thread-reservation=1
+ INSERT INTO KUDU [functional_kudu.timestamp_primary_key]
+ | output exprs: to_utc_timestamp('1970-01-01', 'UTC', TRUE),
to_utc_timestamp(CAST('1970-01-01' AS TIMESTAMP), 'UTC', TRUE), CAST(1 AS INT)
+ | mem-estimate=20.00MB mem-reservation=0B thread-reservation=0
+ |
+ 02:PARTIAL SORT
+ | order by: KuduPartition(to_utc_timestamp('1970-01-01', 'UTC', TRUE)) ASC
NULLS LAST, to_utc_timestamp('1970-01-01', 'UTC', TRUE) ASC NULLS LAST
+ | materialized: KuduPartition(to_utc_timestamp('1970-01-01', 'UTC', TRUE)),
to_utc_timestamp('1970-01-01', 'UTC', TRUE)
+ | mem-estimate=4.00MB mem-reservation=4.00MB spill-buffer=2.00MB
thread-reservation=0
+ | tuple-ids=1 row-size=33B cardinality=1
+ | in pipelines: <none>
+ |
+ 01:EXCHANGE [KUDU(KuduPartition(to_utc_timestamp('1970-01-01', 'UTC',
TRUE)))]
+ mem-estimate=16.00KB mem-reservation=0B thread-reservation=0
+ tuple-ids=0 row-size=25B cardinality=1
+ in pipelines: <none>
+
+F00:PLAN FRAGMENT [UNPARTITIONED] hosts=1 instances=1
+Per-Host Resources: mem-estimate=116.00KB mem-reservation=0B
thread-reservation=1
+ DATASTREAM SINK [FRAGMENT=F01, EXCHANGE=01,
KUDU(KuduPartition(to_utc_timestamp('1970-01-01', 'UTC', TRUE)))]
+ | mem-estimate=116.00KB mem-reservation=0B thread-reservation=0
+ 00:UNION
+ constant-operands=1
+ mem-estimate=0B mem-reservation=0B thread-reservation=0
+ tuple-ids=0 row-size=25B cardinality=1
+ in pipelines: <none>
+====
+upsert into functional_kudu.timestamp_primary_key
+select ts, ts, id from functional_kudu.timestamp_at_dst_changes
+---- PLAN
+F00:PLAN FRAGMENT [UNPARTITIONED] hosts=1 instances=1
+Per-Host Resources: mem-estimate=21.50MB mem-reservation=0B
thread-reservation=2
+ UPSERT INTO KUDU [functional_kudu.timestamp_primary_key]
+ | output exprs: to_utc_timestamp(ts, 'UTC', TRUE), to_utc_timestamp(ts,
'UTC', TRUE), id
+ | mem-estimate=20.00MB mem-reservation=0B thread-reservation=0
+ |
+ 00:SCAN KUDU [functional_kudu.timestamp_at_dst_changes]
+ mem-estimate=1.50MB mem-reservation=0B thread-reservation=1
+ tuple-ids=0 row-size=20B cardinality=unavailable
+ in pipelines: 00(GETNEXT)
+---- DISTRIBUTEDPLAN
+F01:PLAN FRAGMENT [KUDU(KuduPartition(to_utc_timestamp(ts, 'UTC', TRUE)))]
hosts=3 instances=3
+Per-Host Resources: mem-estimate=148.07MB mem-reservation=2.00MB
thread-reservation=1
+ UPSERT INTO KUDU [functional_kudu.timestamp_primary_key]
+ | output exprs: to_utc_timestamp(ts, 'UTC', TRUE), to_utc_timestamp(ts,
'UTC', TRUE), id
+ | mem-estimate=20.00MB mem-reservation=0B thread-reservation=0
+ |
+ 02:PARTIAL SORT
+ | order by: KuduPartition(to_utc_timestamp(ts, 'UTC', TRUE)) ASC NULLS
LAST, to_utc_timestamp(ts, 'UTC', TRUE) ASC NULLS LAST
+ | materialized: KuduPartition(to_utc_timestamp(ts, 'UTC', TRUE)),
to_utc_timestamp(ts, 'UTC', TRUE)
+ | mem-estimate=128.00MB mem-reservation=2.00MB spill-buffer=2.00MB
thread-reservation=0
+ | tuple-ids=1 row-size=24B cardinality=unavailable
+ | in pipelines: 00(GETNEXT)
+ |
+ 01:EXCHANGE [KUDU(KuduPartition(to_utc_timestamp(ts, 'UTC', TRUE)))]
+ mem-estimate=71.99KB mem-reservation=0B thread-reservation=0
+ tuple-ids=0 row-size=20B cardinality=unavailable
+ in pipelines: 00(GETNEXT)
+
+F00:PLAN FRAGMENT [RANDOM] hosts=3 instances=3
+Per-Host Resources: mem-estimate=1.78MB mem-reservation=0B thread-reservation=2
+ DATASTREAM SINK [FRAGMENT=F01, EXCHANGE=01,
KUDU(KuduPartition(to_utc_timestamp(ts, 'UTC', TRUE)))]
+ | mem-estimate=288.00KB mem-reservation=0B thread-reservation=0
+ 00:SCAN KUDU [functional_kudu.timestamp_at_dst_changes]
+ mem-estimate=1.50MB mem-reservation=0B thread-reservation=1
+ tuple-ids=0 row-size=20B cardinality=unavailable
+ in pipelines: 00(GETNEXT)
+====
+update functional_kudu.timestamp_primary_key set t="1970-01-01"
+where tkey="1970-01-01"
+---- PLAN
+F00:PLAN FRAGMENT [UNPARTITIONED] hosts=1 instances=1
+Per-Host Resources: mem-estimate=20.75MB mem-reservation=0B
thread-reservation=2
+ UPDATE KUDU [functional_kudu.timestamp_primary_key]
+ | output exprs:
to_utc_timestamp(functional_kudu.timestamp_primary_key.tkey, 'UTC', TRUE),
TIMESTAMP '1970-01-01 00:00:00'
+ | mem-estimate=20.00MB mem-reservation=0B thread-reservation=0
+ |
+ 00:SCAN KUDU [functional_kudu.timestamp_primary_key]
+ kudu predicates: tkey = TIMESTAMP '1970-01-01 00:00:00'
+ mem-estimate=768.00KB mem-reservation=0B thread-reservation=1
+ tuple-ids=0 row-size=16B cardinality=1
+ in pipelines: 00(GETNEXT)
+====
+delete from functional_kudu.timestamp_primary_key where tkey="1970-01-01"
+---- PLAN
+F00:PLAN FRAGMENT [UNPARTITIONED] hosts=1 instances=1
+Per-Host Resources: mem-estimate=20.75MB mem-reservation=0B
thread-reservation=2
+ DELETE FROM KUDU [functional_kudu.timestamp_primary_key]
+ | output exprs:
to_utc_timestamp(functional_kudu.timestamp_primary_key.tkey, 'UTC', TRUE)
+ | mem-estimate=20.00MB mem-reservation=0B thread-reservation=0
+ |
+ 00:SCAN KUDU [functional_kudu.timestamp_primary_key]
+ kudu predicates: tkey = TIMESTAMP '1970-01-01 00:00:00'
+ mem-estimate=768.00KB mem-reservation=0B thread-reservation=1
+ tuple-ids=0 row-size=16B cardinality=1
+ in pipelines: 00(GETNEXT)
+====
diff --git
a/testdata/workloads/functional-query/queries/QueryTest/kudu_predicate_with_timestamp_conversion.test
b/testdata/workloads/functional-query/queries/QueryTest/kudu_predicate_with_timestamp_conversion.test
index ccd7cb9dd..294c6f7eb 100644
---
a/testdata/workloads/functional-query/queries/QueryTest/kudu_predicate_with_timestamp_conversion.test
+++
b/testdata/workloads/functional-query/queries/QueryTest/kudu_predicate_with_timestamp_conversion.test
@@ -17,7 +17,7 @@ aggregation(SUM, RowsRead): 0
# RowsRead should be 2 (2011-11-06 08:00:00 and 09:00:00 in UTC).
select * from timestamp_at_dst_changes where ts = '2011-11-06 01:00:00';
---- TYPES
-BIGINT,BIGINT,TIMESTAMP
+INT,BIGINT,TIMESTAMP
---- RESULTS
5,1320566400,2011-11-06 01:00:00
8,1320570000,2011-11-06 01:00:00
@@ -32,7 +32,7 @@ aggregation(SUM, RowsRead): 2
# (2011-11-06 08:40:00, 09:00:00, 09:20:00 and 09:40:00 in UTC).
select * from timestamp_at_dst_changes where ts > '2011-11-06 01:30:00';
---- TYPES
-BIGINT,BIGINT,TIMESTAMP
+INT,BIGINT,TIMESTAMP
---- RESULTS
7,1320568800,2011-11-06 01:40:00
10,1320572400,2011-11-06 01:40:00
@@ -45,7 +45,7 @@ aggregation(SUM, RowsRead): 4
# for less compare
select * from timestamp_at_dst_changes where ts < '2011-11-06 01:30:00';
---- TYPES
-BIGINT,BIGINT,TIMESTAMP
+INT,BIGINT,TIMESTAMP
---- RESULTS
1,1300006800,2011-03-13 01:00:00
2,1300008600,2011-03-13 01:30:00
@@ -65,7 +65,7 @@ aggregation(SUM, RowsRead): 9
select * from timestamp_at_dst_changes where ts in
('2011-03-13 02:00:00', '2011-11-06 01:00:00');
---- TYPES
-BIGINT,BIGINT,TIMESTAMP
+INT,BIGINT,TIMESTAMP
---- RESULTS
5,1320566400,2011-11-06 01:00:00
8,1320570000,2011-11-06 01:00:00
diff --git
a/testdata/workloads/functional-query/queries/QueryTest/kudu_runtime_filter_with_timestamp_conversion.test
b/testdata/workloads/functional-query/queries/QueryTest/kudu_runtime_filter_with_timestamp_conversion.test
index 66afae507..3be96d5fd 100644
---
a/testdata/workloads/functional-query/queries/QueryTest/kudu_runtime_filter_with_timestamp_conversion.test
+++
b/testdata/workloads/functional-query/queries/QueryTest/kudu_runtime_filter_with_timestamp_conversion.test
@@ -8,7 +8,7 @@ select straight_join t1.id, t1.ts, t2.id,
from_utc_timestamp(t2.ts, 'America/Los
on t1.ts = from_utc_timestamp(t2.ts, 'America/Los_Angeles')
order by t1.id, t2.id;
---- TYPES
-BIGINT,TIMESTAMP,BIGINT,TIMESTAMP
+INT,TIMESTAMP,INT,TIMESTAMP
---- RESULTS
1,2011-03-13 01:00:00,1,2011-03-13 01:00:00
2,2011-03-13 01:30:00,2,2011-03-13 01:30:00
@@ -36,10 +36,10 @@ set timezone='America/Los_Angeles';
select straight_join * from timestamp_at_dst_changes t1
join (select '2011-11-06 01:00:00' as ts) t2 on t1.ts = t2.ts;
---- TYPES
-BIGINT,BIGINT,TIMESTAMP,TIMESTAMP
+INT,BIGINT,TIMESTAMP,STRING
---- RESULTS
-5,1320566400,2011-11-06 01:00:00,2011-11-06 01:00:00
-8,1320570000,2011-11-06 01:00:00,2011-11-06 01:00:00
+5,1320566400,2011-11-06 01:00:00,'2011-11-06 01:00:00'
+8,1320570000,2011-11-06 01:00:00,'2011-11-06 01:00:00'
---- RUNTIME_PROFILE
row_regex: .*RF00.\[min_max\] <- t2.ts.*
====
@@ -56,7 +56,7 @@ select straight_join t1.id, t1.ts, t2.ts from
timestamp_at_dst_changes t1
on t1.ts = from_utc_timestamp(t2.ts, 'Asia/Shanghai')
order by t1.id;
---- TYPES
-BIGINT,TIMESTAMP,TIMESTAMP
+INT,TIMESTAMP,TIMESTAMP
---- RESULTS
1,2011-03-13 17:00:00,2011-03-13 09:00:00
2,2011-03-13 17:30:00,2011-03-13 09:30:00
diff --git
a/testdata/workloads/functional-query/queries/QueryTest/kudu_timestamp_conversion.test
b/testdata/workloads/functional-query/queries/QueryTest/kudu_timestamp_conversion.test
index c62b2a3a3..b453bfe8b 100644
---
a/testdata/workloads/functional-query/queries/QueryTest/kudu_timestamp_conversion.test
+++
b/testdata/workloads/functional-query/queries/QueryTest/kudu_timestamp_conversion.test
@@ -1,27 +1,175 @@
====
---- QUERY
-# Test that kutu UTC timestamp can be convert to local time.
-select id, from_unixtime(unixtime), ts from timestamp_at_dst_changes;
+# Test that TIMESTAMPs stored in Kudu are converted from UTC to local time when
+# convert_kudu_utc_timestamps=true.
+select id, from_unixtime(unixtime), ts from
functional_kudu.timestamp_at_dst_changes;
---- TYPES
-BIGINT,TIMESTAMP,TIMESTAMP
+INT,STRING,TIMESTAMP
---- RESULTS
-1,2011-03-13 09:00:00,2011-03-13 01:00:00
-2,2011-03-13 09:30:00,2011-03-13 01:30:00
-3,2011-03-13 10:00:00,2011-03-13 03:00:00
-4,2011-03-13 10:30:00,2011-03-13 03:30:00
-5,2011-11-06 08:00:00,2011-11-06 01:00:00
-6,2011-11-06 08:20:00,2011-11-06 01:20:00
-7,2011-11-06 08:40:00,2011-11-06 01:40:00
-8,2011-11-06 09:00:00,2011-11-06 01:00:00
-9,2011-11-06 09:20:00,2011-11-06 01:20:00
-10,2011-11-06 09:40:00,2011-11-06 01:40:00
+1,'2011-03-13 09:00:00',2011-03-13 01:00:00
+2,'2011-03-13 09:30:00',2011-03-13 01:30:00
+3,'2011-03-13 10:00:00',2011-03-13 03:00:00
+4,'2011-03-13 10:30:00',2011-03-13 03:30:00
+5,'2011-11-06 08:00:00',2011-11-06 01:00:00
+6,'2011-11-06 08:20:00',2011-11-06 01:20:00
+7,'2011-11-06 08:40:00',2011-11-06 01:40:00
+8,'2011-11-06 09:00:00',2011-11-06 01:00:00
+9,'2011-11-06 09:20:00',2011-11-06 01:20:00
+10,'2011-11-06 09:40:00',2011-11-06 01:40:00
====
---- QUERY
-# Test that kutu UTC timestamp conversion results are consistent with
from_utc_timestamp().
-select count(*) from functional.alltypes t1 join alltypes t2
+# Test that Kudu UTC timestamp conversion results are consistent with
from_utc_timestamp().
+select count(*) from functional.alltypes t1 join functional_kudu.alltypes t2
on from_utc_timestamp(t1.timestamp_col, 'America/Los_Angeles') =
t2.timestamp_col;
---- TYPES
BIGINT
---- RESULTS
7300
====
+---- QUERY
+# Shift the timestamp key by 'id' seconds to make primary keys unique.
+create table utc_kudu (ts_pk_col timestamp primary key, ts_col timestamp, id
int)
+ partition by hash(ts_pk_col) partitions 2 stored as kudu;
+insert into utc_kudu select date_add(ts, interval id seconds), ts, id
+ from functional_kudu.timestamp_at_dst_changes;
+select * from utc_kudu where id = 1
+---- TYPES
+TIMESTAMP,TIMESTAMP,INT
+---- RESULTS
+2011-03-13 01:00:01,2011-03-13 01:00:00,1
+====
+---- QUERY
+select count(*) from utc_kudu t1 join functional_kudu.timestamp_at_dst_changes
t2
+ on t1.id=t2.id and t1.ts_pk_col=date_add(t2.ts, interval t2.id seconds) and
t1.ts_col=t2.ts;
+---- TYPES
+BIGINT
+---- RESULTS
+10
+====
+---- QUERY
+# Insert values.
+insert into utc_kudu values
+ ("2011-03-13 01:50:00","2011-03-13 01:50:00",11),
+ ("2011-11-06 01:50:00","2011-11-06 01:50:00",12);
+select * from utc_kudu where id > 10;
+---- TYPES
+TIMESTAMP,TIMESTAMP,INT
+---- RESULTS
+2011-03-13 01:50:00,2011-03-13 01:50:00,11
+2011-11-06 01:50:00,2011-11-06 01:50:00,12
+====
+---- QUERY
+# Upsert values.
+upsert into utc_kudu values
+ ("2011-03-13 01:50:00","2011-03-13 01:50:00",13),
+ ("2011-11-06 01:55:00","2011-11-06 01:55:00",14);
+select * from utc_kudu where id > 10;
+---- TYPES
+TIMESTAMP,TIMESTAMP,INT
+---- RESULTS
+2011-11-06 01:50:00,2011-11-06 01:50:00,12
+2011-03-13 01:50:00,2011-03-13 01:50:00,13
+2011-11-06 01:55:00,2011-11-06 01:55:00,14
+====
+---- QUERY
+# Upsert from self, decrease ids by 1.
+upsert into utc_kudu
+ select ts_pk_col, ts_col, cast(id-1 as int) from utc_kudu
+ union all select "2011-03-13 01:55:00","2011-03-13 01:55:00",15;
+select * from utc_kudu where id > 10;
+---- TYPES
+TIMESTAMP,TIMESTAMP,INT
+---- RESULTS
+2011-11-06 01:50:00,2011-11-06 01:50:00,11
+2011-03-13 01:50:00,2011-03-13 01:50:00,12
+2011-11-06 01:55:00,2011-11-06 01:55:00,13
+2011-03-13 01:55:00,2011-03-13 01:55:00,15
+====
+---- QUERY
+# Simple update on timestamp column.
+update utc_kudu set ts_col = "2011-03-13 01:55:01" where ts_pk_col="2011-03-13
01:55:00";
+select * from utc_kudu where ts_pk_col = "2011-03-13 01:55:00";
+---- TYPES
+TIMESTAMP,TIMESTAMP,INT
+---- RESULTS
+2011-03-13 01:55:00,2011-03-13 01:55:01,15
+====
+---- QUERY
+# Simple update on non-timestamp column.
+update utc_kudu set id=16 where ts_pk_col="2011-03-13 01:55:00";
+select * from utc_kudu where ts_pk_col = "2011-03-13 01:55:00";
+---- TYPES
+TIMESTAMP,TIMESTAMP,INT
+---- RESULTS
+2011-03-13 01:55:00,2011-03-13 01:55:01,16
+====
+---- QUERY
+# Update with non-const expression.
+update utc_kudu set ts_col=date_add(ts_col, interval 1 second) where id=16;
+select * from utc_kudu where ts_pk_col = "2011-03-13 01:55:00";
+---- TYPES
+TIMESTAMP,TIMESTAMP,INT
+---- RESULTS
+2011-03-13 01:55:00,2011-03-13 01:55:02,16
+====
+---- QUERY
+# Update all rows (no WHERE clause) with non-const expression.
+update utc_kudu set ts_col=date_add(ts_col, interval 1 second);
+select * from utc_kudu where ts_pk_col = "2011-03-13 01:55:00";
+---- TYPES
+TIMESTAMP,TIMESTAMP,INT
+---- RESULTS
+2011-03-13 01:55:00,2011-03-13 01:55:03,16
+====
+---- QUERY
+# Update with join.
+update t1 set t1.ts_col=date_add(t2.ts, interval t2.id second)
+ from utc_kudu t1 join functional_kudu.timestamp_at_dst_changes t2
+ on t1.id + 1 = t2.id;
+select * from utc_kudu where ts_pk_col = ts_col;
+---- TYPES
+TIMESTAMP,TIMESTAMP,INT
+---- RESULTS
+2011-03-13 01:00:01,2011-03-13 01:00:01,0
+2011-03-13 01:30:02,2011-03-13 01:30:02,1
+2011-03-13 03:00:03,2011-03-13 03:00:03,2
+2011-03-13 03:30:04,2011-03-13 03:30:04,3
+2011-11-06 01:00:05,2011-11-06 01:00:05,4
+2011-11-06 01:20:06,2011-11-06 01:20:06,5
+2011-11-06 01:40:07,2011-11-06 01:40:07,6
+2011-11-06 01:00:08,2011-11-06 01:00:08,7
+2011-11-06 01:20:09,2011-11-06 01:20:09,8
+2011-11-06 01:40:10,2011-11-06 01:40:10,9
+====
+---- QUERY
+# Delete based on timestamp primary key column.
+delete from utc_kudu where ts_pk_col="2011-03-13 01:55:00";
+select * from utc_kudu where id > 10;
+---- TYPES
+TIMESTAMP,TIMESTAMP,INT
+---- RESULTS
+2011-11-06 01:50:00,2011-11-06 01:50:01,11
+2011-03-13 01:50:00,2011-03-13 01:50:01,12
+2011-11-06 01:55:00,2011-11-06 01:55:01,13
+====
+---- QUERY
+# Delete based on a non-primary-key timestamp column.
+delete from utc_kudu where ts_col="2011-11-06 01:55:01";
+select * from utc_kudu where id > 10;
+---- TYPES
+TIMESTAMP,TIMESTAMP,INT
+---- RESULTS
+2011-11-06 01:50:00,2011-11-06 01:50:01,11
+2011-03-13 01:50:00,2011-03-13 01:50:01,12
+====
+---- QUERY
+# Delete rows inserted from timestamp_at_dst_changes.
+delete t1 from utc_kudu t1 join functional_kudu.timestamp_at_dst_changes t2
+ on t1.ts_pk_col = date_add(t2.ts, interval t2.id seconds);
+select * from utc_kudu;
+---- TYPES
+TIMESTAMP,TIMESTAMP,INT
+---- RESULTS
+2011-11-06 01:50:00,2011-11-06 01:50:01,11
+2011-03-13 01:50:00,2011-03-13 01:50:01,12
+====
diff --git a/tests/common/test_result_verifier.py
b/tests/common/test_result_verifier.py
index ab3680ce6..98db6a574 100644
--- a/tests/common/test_result_verifier.py
+++ b/tests/common/test_result_verifier.py
@@ -422,9 +422,9 @@ def verify_raw_results(test_section, exec_result,
file_format, result_section,
expected_types = [c.strip().upper()
for c in
remove_comments(section).rstrip('\n').split(',')]
- # Avro and Kudu represent TIMESTAMP columns as strings, so tests using
TIMESTAMP are
+ # Avro represents TIMESTAMP columns as strings, so tests using TIMESTAMP
are
# skipped because results will be wrong.
- if file_format in ('avro', 'kudu') and 'TIMESTAMP' in expected_types:
+ if file_format == 'avro' and 'TIMESTAMP' in expected_types:
LOG.info("TIMESTAMP columns unsupported in %s, skipping verification."
%\
file_format)
return
diff --git a/tests/query_test/test_kudu.py b/tests/query_test/test_kudu.py
index 4869e0450..f3176ddc6 100644
--- a/tests/query_test/test_kudu.py
+++ b/tests/query_test/test_kudu.py
@@ -105,11 +105,15 @@ class TestKuduTimestampConvert(KuduTestSuite):
def add_test_dimensions(cls):
super(TestKuduTimestampConvert, cls).add_test_dimensions()
cls.ImpalaTestMatrix.add_mandatory_exec_option('convert_kudu_utc_timestamps',
'true')
+
cls.ImpalaTestMatrix.add_mandatory_exec_option('write_kudu_utc_timestamps',
'true')
+ cls.ImpalaTestMatrix.add_mandatory_exec_option(
+ 'use_local_tz_for_unix_timestamp_conversions', 'false')
cls.ImpalaTestMatrix.add_mandatory_exec_option('timezone',
'"America/Los_Angeles"')
@SkipIfKudu.no_hybrid_clock()
- def test_kudu_timestamp_conversion(self, vector):
- self.run_test_case('QueryTest/kudu_timestamp_conversion', vector)
+ def test_kudu_timestamp_conversion(self, vector, unique_database):
+ self.run_test_case(
+ 'QueryTest/kudu_timestamp_conversion', vector, use_db=unique_database)
@SkipIfKudu.no_hybrid_clock()
def test_kudu_predicate_with_timestamp_conversion(self, vector):