This is an automated email from the ASF dual-hosted git repository.

michaelsmith pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/impala.git


The following commit(s) were added to refs/heads/master by this push:
     new 1b6395b8d IMPALA-13627: Handle legacy Hive timezone conversion
1b6395b8d is described below

commit 1b6395b8db09d271bd166bf501bdf7038d8be644
Author: Michael Smith <[email protected]>
AuthorDate: Thu Dec 19 14:14:03 2024 -0800

    IMPALA-13627: Handle legacy Hive timezone conversion
    
    After HIVE-12191, Hive has two different methods of converting timestamps
    from UTC to the local timezone. When Impala has
    convert_legacy_hive_parquet_utc_timestamps=true, it assumes times
    written by Hive are in UTC and converts them to local time using tzdata,
    which matches the newer method introduced by HIVE-12191.
    
    Some dates convert differently between the two methods, such as
    Asia/Kuala_Lumpur or Singapore prior to 1982 (also seen in HIVE-24074).
    After HIVE-25104, Hive writes 'writer.zone.conversion.legacy' to
    distinguish which method is being used. As a result there are three
    different cases we have to handle:
    1. Hive prior to 3.1 used what’s now called “legacy conversion” using
       SimpleDateFormat.
    2. Hive 3.1.2 (with HIVE-21290) used a new Java API that’s based on
       tzdata and added metadata to identify the timezone.
    3. Hive 4 supports both, and adds new file metadata to identify which is
       in use.
    
    Adds handling for Hive files (identified by created_by=parquet-mr) where
    the correct conversion method can be inferred from Parquet file metadata:
    1. if writer.zone.conversion.legacy is present (Hive 4), use it to
       determine whether to use a legacy conversion method compatible with
       Hive's legacy behavior, or convert using tzdata.
    2. if writer.zone.conversion.legacy is not present but writer.time.zone
       is, we can infer it was written by Hive 3.1.2+ using new APIs.
    3. otherwise it was likely written by an earlier Hive version.
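    The inference above can be sketched as follows. This is an illustrative
    Java sketch, not Impala's actual (C++) implementation; the class and
    method names are hypothetical, while the metadata keys and the fallback
    option come from this change:

```java
import java.util.Map;

public class ZoneConversionDecision {
    // Hypothetical helper mirroring the three cases above. The metadata keys
    // come from HIVE-21290/HIVE-25104; the fallback flag stands in for the
    // new use_legacy_hive_timestamp_conversion option.
    public static boolean useLegacyConversion(
            Map<String, String> keyValueMetadata, boolean legacyOptionFallback) {
        String legacy = keyValueMetadata.get("writer.zone.conversion.legacy");
        if (legacy != null && !legacy.isEmpty()) {
            // Case 1: Hive 4 wrote an explicit marker; trust it.
            return Boolean.parseBoolean(legacy);
        }
        if (keyValueMetadata.containsKey("writer.time.zone")) {
            // Case 2: Hive 3.1.2+ wrote the timezone via the new APIs.
            return false;
        }
        // Case 3: likely an older Hive; fall back to the configured option.
        return legacyOptionFallback;
    }
}
```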
    
    Adds a new CLI and query option - use_legacy_hive_timestamp_conversion -
    to select which conversion method to use in the third case above, when
    Impala determines that the file was written by Hive older than 3.1.2.
    Defaults to false to minimize changes in Impala's behavior and because
    going through JNI is ~50x slower even when the results would not differ;
    Hive defaults to true for its equivalent setting:
    hive.parquet.timestamp.legacy.conversion.enabled.
    
    Hive legacy-compatible conversion uses a Java method that would be
    complicated to mimic in C++:
    
      DateFormat formatter = new SimpleDateFormat("yyyy-MM-dd HH:mm:ss");
      formatter.setTimeZone(TimeZone.getTimeZone(timezone_string));
      java.util.Date date = formatter.parse(date_time_string);
      formatter.setTimeZone(TimeZone.getTimeZone("UTC"));
      return formatter.format(date);
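    A self-contained sketch of that round trip (the wrapper class and method
    name here are illustrative, not Hive's actual API): parse the wall-clock
    string in the writer's zone, then re-render the instant in UTC.

```java
import java.text.DateFormat;
import java.text.SimpleDateFormat;
import java.util.TimeZone;

public class LegacyConvertDemo {
    // Illustrative wrapper around the SimpleDateFormat round trip quoted
    // above; names are hypothetical.
    public static String toUtc(String dateTimeString, String timezoneString)
            throws java.text.ParseException {
        DateFormat formatter = new SimpleDateFormat("yyyy-MM-dd HH:mm:ss");
        formatter.setTimeZone(TimeZone.getTimeZone(timezoneString));
        java.util.Date date = formatter.parse(dateTimeString);
        formatter.setTimeZone(TimeZone.getTimeZone("UTC"));
        return formatter.format(date);
    }

    public static void main(String[] args) throws Exception {
        // Pre-1982 Asia/Kuala_Lumpur is one of the zones where this
        // java.util-based path can disagree with plain tzdata conversion
        // (see HIVE-24074).
        System.out.println(toUtc("1970-01-01 00:00:00", "Asia/Kuala_Lumpur"));
    }
}
```

    Impala invokes the Java implementation over JNI rather than mimicking it
    in C++, hence the performance caveat above.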
    
    IMPALA-9385 added a check against a Timezone pointer in
    FromUnixTimestamp. That dominates the time in FromUnixTimeNanos,
    overriding any benchmark gains from IMPALA-7417. Moves FromUnixTime to
    allow inlining, and switches to using UTCPTR in the benchmark - as
    IMPALA-9385 did in most other code - to restore benchmark results.
    
    Testing:
    - Adds JVM conversion method to convert-timestamp-benchmark.
    - Adds tests for several cases from Hive conversion tests.
    
    Change-Id: I1271ed1da0b74366ab8315e7ec2d4ee47111e067
    Reviewed-on: http://gerrit.cloudera.org:8080/22293
    Tested-by: Impala Public Jenkins <[email protected]>
    Reviewed-by: Csaba Ringhofer <[email protected]>
---
 be/src/benchmarks/convert-timestamp-benchmark.cc   | 144 ++++++++++++---------
 be/src/benchmarks/date-benchmark.cc                |   2 +
 be/src/exec/parquet/hdfs-parquet-scanner.cc        |  74 ++++++++---
 be/src/exec/parquet/hdfs-parquet-scanner.h         |  27 ++--
 be/src/exec/parquet/parquet-common.cc              |  23 +++-
 be/src/exec/parquet/parquet-common.h               |  13 +-
 be/src/runtime/raw-value-test.cc                   |   2 +
 be/src/runtime/timestamp-value.cc                  |  39 ++++--
 be/src/runtime/timestamp-value.h                   |   5 +
 be/src/runtime/timestamp-value.inline.h            |   9 ++
 be/src/service/frontend.cc                         |  12 +-
 be/src/service/frontend.h                          |   5 +
 be/src/service/query-options.cc                    |   4 +
 be/src/service/query-options.h                     |   4 +-
 common/thrift/Frontend.thrift                      |  14 ++
 common/thrift/ImpalaService.thrift                 |   9 ++
 common/thrift/Query.thrift                         |   3 +
 docs/topics/impala_timestamp.xml                   |  37 ++++++
 .../org/apache/impala/service/JniFrontend.java     |  27 ++++
 .../data/employee_hive_3_1_3_us_pacific.parquet    | Bin 0 -> 446 bytes
 testdata/data/hive_kuala_lumpur_legacy.parquet     | Bin 0 -> 507 bytes
 testdata/data/tbl_parq1/000000_0                   | Bin 0 -> 286 bytes
 testdata/data/tbl_parq1/000000_1                   | Bin 0 -> 286 bytes
 testdata/data/tbl_parq1/000000_2                   | Bin 0 -> 327 bytes
 .../QueryTest/timestamp-conversion-hive-313.test   |  46 +++++++
 .../QueryTest/timestamp-conversion-hive-3m.test    |  34 +++++
 .../QueryTest/timestamp-conversion-hive-4.test     |  28 ++++
 tests/query_test/test_hive_timestamp_conversion.py |  76 +++++++++++
 28 files changed, 530 insertions(+), 107 deletions(-)

diff --git a/be/src/benchmarks/convert-timestamp-benchmark.cc b/be/src/benchmarks/convert-timestamp-benchmark.cc
index a39829ab7..18b14fd47 100644
--- a/be/src/benchmarks/convert-timestamp-benchmark.cc
+++ b/be/src/benchmarks/convert-timestamp-benchmark.cc
@@ -37,8 +37,11 @@
 #include "exprs/timezone_db.h"
 #include "exprs/timestamp-functions.h"
 #include "runtime/datetime-simple-date-format-parser.h"
+#include "runtime/test-env.h"
 #include "runtime/timestamp-value.h"
 #include "runtime/timestamp-value.inline.h"
+#include "service/fe-support.h"
+#include "service/frontend.h"
 #include "util/benchmark.h"
 #include "util/cpu-info.h"
 #include "util/pretty-printer.h"
@@ -49,118 +52,120 @@
 using std::random_device;
 using std::mt19937;
 using std::uniform_int_distribution;
-using std::thread;
 
 using namespace impala;
 using namespace datetime_parse_util;
 
 // Benchmark tests for timestamp time-zone conversions
 /*
-Machine Info: Intel(R) Core(TM) i5-6600 CPU @ 3.30GHz
+Machine Info: 12th Gen Intel(R) Core(TM) i9-12900K
 
 UtcToUnixTime:             Function  iters/ms   10%ile   50%ile   90%ile     10%ile     50%ile     90%ile
                                                                          (relative) (relative) (relative)
 ---------------------------------------------------------------------------------------------------------
-                            (glibc)               7.98     8.14      8.3         1X         1X         1X
-                      (Google/CCTZ)               17.9     18.2     18.5      2.24X      2.24X      2.23X
-                            (boost)                301      306      311      37.7X      37.5X      37.5X
+                            (glibc)                 12     12.1     12.2         1X         1X         1X
+                      (Google/CCTZ)               25.2     25.5     25.5       2.1X      2.11X      2.09X
+                            (boost)                635      643      646        53X      53.3X      52.9X
 
 LocalToUnixTime:           Function  iters/ms   10%ile   50%ile   90%ile     10%ile     50%ile     90%ile
                                                                          (relative) (relative) (relative)
 ---------------------------------------------------------------------------------------------------------
-                            (glibc)              0.717    0.732    0.745         1X         1X         1X
-                      (Google/CCTZ)               15.3     15.5     15.8      21.3X      21.2X      21.2X
+                            (glibc)              0.739    0.745    0.745         1X         1X         1X
+                      (Google/CCTZ)               23.2     23.4     23.7      31.4X      31.4X      31.7X
 
 FromUtc:                   Function  iters/ms   10%ile   50%ile   90%ile     10%ile     50%ile     90%ile
                                                                          (relative) (relative) (relative)
 ---------------------------------------------------------------------------------------------------------
-                            (boost)                1.6     1.63     1.67         1X         1X         1X
-                      (Google/CCTZ)               14.5     14.8     15.2      9.06X      9.09X      9.11X
+                            (boost)               2.59      2.6      2.6         1X         1X         1X
+                      (Google/CCTZ)               24.4     24.7     24.9      9.45X       9.5X      9.58X
 
 ToUtc:                     Function  iters/ms   10%ile   50%ile   90%ile     10%ile     50%ile     90%ile
                                                                          (relative) (relative) (relative)
 ---------------------------------------------------------------------------------------------------------
-                            (boost)               1.63     1.67     1.68         1X         1X         1X
-                      (Google/CCTZ)                8.7      8.9     9.05      5.34X      5.34X      5.38X
+                            (boost)                2.6     2.64     2.65         1X         1X         1X
+                      (Google/CCTZ)               13.5     13.6     13.7       5.2X      5.16X      5.18X
 
 UtcToLocal:                Function  iters/ms   10%ile   50%ile   90%ile     10%ile     50%ile     90%ile
                                                                          (relative) (relative) (relative)
 ---------------------------------------------------------------------------------------------------------
-                            (glibc)               2.68     2.75      2.8         1X         1X         1X
-                      (Google/CCTZ)                 15     15.2     15.5      5.59X      5.55X      5.53X
+                            (glibc)               4.42     4.42     4.44         1X         1X         1X
+                      (Google/CCTZ)               25.6     26.6       27      5.78X      6.01X      6.08X
+                              (JVM)              0.511    0.596      0.6     0.115X     0.135X     0.135X
 
 UnixTimeToLocalPtime:      Function  iters/ms   10%ile   50%ile   90%ile     10%ile     50%ile     90%ile
                                                                          (relative) (relative) (relative)
 ---------------------------------------------------------------------------------------------------------
-                            (glibc)               2.69     2.75      2.8         1X         1X         1X
-                      (Google/CCTZ)               14.8     15.1     15.4       5.5X      5.49X      5.52X
+                            (glibc)               4.51     4.53     4.53         1X         1X         1X
+                      (Google/CCTZ)               23.7     24.1     24.4      5.25X      5.32X       5.4X
 
 UnixTimeToUtcPtime:        Function  iters/ms   10%ile   50%ile   90%ile     10%ile     50%ile     90%ile
                                                                          (relative) (relative) (relative)
 ---------------------------------------------------------------------------------------------------------
-                            (glibc)                 17     17.6     17.9         1X         1X         1X
-                      (Google/CCTZ)               6.45     6.71     6.81     0.379X     0.382X      0.38X
-                        (fast path)               25.1       26     26.4      1.47X      1.48X      1.48X
-                        (day split)               48.6     50.3     51.3      2.85X      2.87X      2.86X
+                            (glibc)               24.8     25.1     25.6         1X         1X         1X
+                      (Google/CCTZ)               13.9       14     14.1     0.562X     0.557X     0.553X
+                        (fast path)               54.3     54.8     55.4      2.19X      2.18X      2.17X
+                        (day split)                196      199      200      7.89X      7.92X      7.81X
 
 UtcFromUnixTimeMicros:     Function  iters/ms   10%ile   50%ile   90%ile     10%ile     50%ile     90%ile
                                                                          (relative) (relative) (relative)
 ---------------------------------------------------------------------------------------------------------
-                  (sec split (old))               17.9     18.7     19.1         1X         1X         1X
-                        (day split)                111      116      118      6.21X      6.19X      6.19X
+                  (sec split (old))               30.1       31     31.7         1X         1X         1X
+                        (day split)                526      532      534      17.5X      17.2X      16.9X
 
 FromUnixTimeNanos:         Function  iters/ms   10%ile   50%ile   90%ile     10%ile     50%ile     90%ile
                                                                          (relative) (relative) (relative)
 ---------------------------------------------------------------------------------------------------------
-                  (sec split (old))               18.7     19.5     19.8         1X         1X         1X
-                  (sec split (new))                104      108      110      5.58X      5.55X      5.57X
+                  (sec split (old))               36.8     37.5       39         1X         1X         1X
+                  (sec split (new))                319      323      324      8.68X      8.61X      8.33X
 
 FromSubsecondUnixTime:     Function  iters/ms   10%ile   50%ile   90%ile     10%ile     50%ile     90%ile
                                                                          (relative) (relative) (relative)
 ---------------------------------------------------------------------------------------------------------
-                              (old)               18.7     18.7     18.7         1X         1X         1X
-                              (new)               73.5     74.1     74.1      3.94X      3.96X      3.96X
+                              (old)               37.1       38     38.7         1X         1X         1X
+                              (new)                175      175      177      4.71X       4.6X      4.59X
 
 Number of threads: 8
 
 UtcToUnixTime:
-             (glibc) elapsed time: 1s020ms
-       (Google/CCTZ) elapsed time: 144ms
-             (boost) elapsed time: 10ms
-cctz speedup: 7.0784
-boost speedup: 95.1732
+             (glibc) elapsed time: 793ms
+       (Google/CCTZ) elapsed time: 152ms
+             (boost) elapsed time: 3ms
+cctz speedup: 5.20977
+boost speedup: 260.517
 
 LocalToUnixTime:
-             (glibc) elapsed time: 18s050ms
-       (Google/CCTZ) elapsed time: 212ms
-speedup: 84.9949
+             (glibc) elapsed time: 21s750ms
+       (Google/CCTZ) elapsed time: 242ms
+speedup: 89.6388
 
 FromUtc:
-             (boost) elapsed time: 1s519ms
-       (Google/CCTZ) elapsed time: 263ms
-speedup: 5.77003
+             (boost) elapsed time: 1s298ms
+       (Google/CCTZ) elapsed time: 279ms
+speedup: 4.64642
 
 ToUtc:
-             (boost) elapsed time: 1s674ms
-       (Google/CCTZ) elapsed time: 325ms
-speedup: 5.13874
+             (boost) elapsed time: 1s436ms
+       (Google/CCTZ) elapsed time: 496ms
+speedup: 2.89263
 
 UtcToLocal:
-             (glibc) elapsed time: 4s862ms
-       (Google/CCTZ) elapsed time: 263ms
-speedup: 18.4253
+             (glibc) elapsed time: 4s565ms
+       (Google/CCTZ) elapsed time: 260ms
+               (JVM) elapsed time: 2s507ms
+cctz speedup: 17.5058
+jvm speedup: 1.82038
 
 UnixTimeToLocalPtime:
-             (glibc) elapsed time: 4s856ms
-       (Google/CCTZ) elapsed time: 259ms
-speedup: 18.7398
+             (glibc) elapsed time: 4s576ms
+       (Google/CCTZ) elapsed time: 268ms
+speedup: 17.0547
 
 UnixTimeToUtcPtime:
-             (glibc) elapsed time: 928ms
-       (Google/CCTZ) elapsed time: 282ms
-         (fast path) elapsed time: 90ms
-cctz speedup: 3.28187
-fast path speedup: 10.2951
+             (glibc) elapsed time: 478ms
+       (Google/CCTZ) elapsed time: 123ms
+         (fast path) elapsed time: 25ms
+cctz speedup: 3.87304
+fast path speedup: 18.9835
 */
 
 vector<TimestampValue> AddTestDataDateTimes(int n, const string& startstr) {
@@ -223,11 +228,11 @@ public:
     }
 
     // Create and start threads.
-    vector<unique_ptr<thread>> threads(num_of_threads);
+    vector<unique_ptr<std::thread>> threads(num_of_threads);
     StopWatch sw;
     sw.Start();
     for (int i = 0; i < num_of_threads; ++i) {
-      threads[i] = make_unique<thread>(
+      threads[i] = make_unique<std::thread>(
           run_test, batch_size, test_data[i].get());
     }
 
@@ -403,6 +408,13 @@ TimestampValue cctz_utc_to_local(const TimestampValue& ts_value) {
   return TimestampValue(d, t);
 }
 
+// JVM
+TimestampValue jvm_utc_to_local(const TimestampValue& ts_value) {
+  TimestampValue result = ts_value;
+  result.HiveLegacyUtcToLocal(*PTR_CCTZ_LOCAL_TZ);
+  return result;
+}
+
 
 //
 // Test FromUtc (CCTZ is expected to be faster than boost)
@@ -603,8 +615,7 @@ TimestampValue old_split_utc_from_unix_time_nanos(const SplitNanoAndSecond& unix
 TimestampValue new_split_utc_from_unix_time_nanos(const SplitNanoAndSecond& unix_time) {
   // The TimestampValue version is used as it is hard to reproduce the same logic without
   // accessing private members.
-  return TimestampValue::FromUnixTimeNanos(unix_time.seconds, unix_time.nanos,
-      &TimezoneDatabase::GetUtcTimezone());
+  return TimestampValue::FromUnixTimeNanos(unix_time.seconds, unix_time.nanos, UTCPTR);
 }
 
 TimestampValue from_subsecond_unix_time_old(const double& unix_time) {
@@ -621,8 +632,7 @@ TimestampValue from_subsecond_unix_time_new(const double& unix_time) {
   const double ONE_BILLIONTH = 0.000000001;
   int64_t unix_time_whole = unix_time;
   int64_t nanos = (unix_time - unix_time_whole) / ONE_BILLIONTH;
-  return TimestampValue::FromUnixTimeNanos(
-      unix_time_whole, nanos, &TimezoneDatabase::GetUtcTimezone());
+  return TimestampValue::FromUnixTimeNanos(unix_time_whole, nanos, UTCPTR);
 }
 
 //
@@ -698,7 +708,11 @@ TimestampVal cctz_to_utc(const TimestampVal& ts_val) {
 
 
 int main(int argc, char* argv[]) {
-  CpuInfo::Init();
+  impala::InitCommonRuntime(argc, argv, true, impala::TestInfo::BE_TEST);
+  impala::InitFeSupport();
+  TestEnv test_env;
+  CHECK(test_env.Init().ok());
+
   cout << Benchmark::GetMachineInfo() << endl;
 
   ABORT_IF_ERROR(TimezoneDatabase::Initialize());
@@ -783,9 +797,12 @@ int main(int argc, char* argv[]) {
       tsvalue_data;
   TestData<TimestampValue, TimestampValue, cctz_utc_to_local> cctz_utc_to_local_data =
       tsvalue_data;
+  TestData<TimestampValue, TimestampValue, jvm_utc_to_local> jvm_utc_to_local_data =
+      tsvalue_data;
 
   glibc_utc_to_local_data.add_to_benchmark(bm_utc_to_local, "(glibc)");
   cctz_utc_to_local_data.add_to_benchmark(bm_utc_to_local, "(Google/CCTZ)");
+  jvm_utc_to_local_data.add_to_benchmark(bm_utc_to_local, "(JVM)");
   cout << bm_utc_to_local.Measure() << endl;
 
   bail_if_results_dont_match(vector<const vector<TimestampValue>*>{
@@ -795,7 +812,7 @@ int main(int argc, char* argv[]) {
   vector<time_t> time_data;
   for (const TimestampValue& tsvalue: tsvalue_data) {
     time_t unix_time;
-    tsvalue.ToUnixTime(&TimezoneDatabase::GetUtcTimezone(), &unix_time);
+    tsvalue.ToUnixTime(UTCPTR, &unix_time);
     time_data.push_back(unix_time);
   }
 
@@ -859,7 +876,7 @@ int main(int argc, char* argv[]) {
   for (int i = 0; i < tsvalue_data.size(); ++i) {
     const TimestampValue& tsvalue = tsvalue_data[i];
     time_t unix_time;
-    tsvalue.ToUnixTime(&TimezoneDatabase::GetUtcTimezone(), &unix_time);
+    tsvalue.ToUnixTime(UTCPTR, &unix_time);
     int micros = (i * 1001) % MICROS_PER_SEC; // add some sub-second part
     microsec_data.push_back(unix_time * MICROS_PER_SEC + micros);
   }
@@ -885,7 +902,7 @@ int main(int argc, char* argv[]) {
   for (int i = 0; i < tsvalue_data.size(); ++i) {
     const TimestampValue& tsvalue = tsvalue_data[i];
     time_t unix_time;
-    tsvalue.ToUnixTime(&TimezoneDatabase::GetUtcTimezone(), &unix_time);
+    tsvalue.ToUnixTime(UTCPTR, &unix_time);
     int nanos = (i * 1001) % NANOS_PER_SEC; // add some sub-second part
     nanosec_data.push_back(SplitNanoAndSecond {unix_time, nanos} );
   }
@@ -911,7 +928,7 @@ int main(int argc, char* argv[]) {
   for (int i = 0; i < tsvalue_data.size(); ++i) {
     const TimestampValue& tsvalue = tsvalue_data[i];
     time_t unix_time;
-    tsvalue.ToUnixTime(&TimezoneDatabase::GetUtcTimezone(), &unix_time);
+    tsvalue.ToUnixTime(UTCPTR, &unix_time);
     double nanos = (i * 1001) % NANOS_PER_SEC; // add some sub-second part
     double_data.push_back((double)unix_time + nanos / NANOS_PER_SEC);
   }
@@ -979,7 +996,10 @@ int main(int argc, char* argv[]) {
         num_of_threads, BATCH_SIZE, tsvalue_data, "(glibc)");
     m2 = cctz_utc_to_local_data.measure_multithreaded_elapsed_time(
         num_of_threads, BATCH_SIZE, tsvalue_data, "(Google/CCTZ)");
-    cout << "speedup: " << double(m1)/double(m2) << endl;
+    m3 = jvm_utc_to_local_data.measure_multithreaded_elapsed_time(
+        num_of_threads, BATCH_SIZE, tsvalue_data, "(JVM)");
+    cout << "cctz speedup: " << double(m1)/double(m2) << endl;
+    cout << "jvm speedup: " << double(m1)/double(m3) << endl;
 
     // UnixTimeToLocalPtime
     cout << endl << "UnixTimeToLocalPtime:" << endl;
diff --git a/be/src/benchmarks/date-benchmark.cc b/be/src/benchmarks/date-benchmark.cc
index 84ccd6324..cca4b0bb0 100644
--- a/be/src/benchmarks/date-benchmark.cc
+++ b/be/src/benchmarks/date-benchmark.cc
@@ -26,6 +26,8 @@
 #include "runtime/datetime-simple-date-format-parser.h"
 #include "runtime/date-parse-util.h"
 #include "runtime/timestamp-parse-util.h"
+#include "runtime/timestamp-value.h"
+#include "runtime/timestamp-value.inline.h"
 #include "util/benchmark.h"
 #include "util/cpu-info.h"
 
diff --git a/be/src/exec/parquet/hdfs-parquet-scanner.cc b/be/src/exec/parquet/hdfs-parquet-scanner.cc
index 9e33ff360..14aec5520 100644
--- a/be/src/exec/parquet/hdfs-parquet-scanner.cc
+++ b/be/src/exec/parquet/hdfs-parquet-scanner.cc
@@ -21,6 +21,8 @@
 #include <queue>
 #include <stack>
 
+#include <boost/algorithm/string.hpp>
+#include <boost/range/adaptor/transformed.hpp>
 #include <gflags/gflags.h>
 #include <gutil/strings/substitute.h>
 
@@ -53,6 +55,9 @@
 
 #include "common/names.h"
 
+using boost::adaptors::transformed;
+using boost::algorithm::iequals;
+using boost::algorithm::join;
 using std::move;
 using std::sort;
 using namespace impala;
@@ -564,14 +569,13 @@ Status HdfsParquetScanner::ResolveSchemaForStatFiltering(SlotDescriptor* slot_de
 }
 
 ColumnStatsReader HdfsParquetScanner::CreateStatsReader(
-    const parquet::FileMetaData& file_metadata, const parquet::RowGroup& row_group,
-    SchemaNode* node, const ColumnType& col_type) {
+    const parquet::RowGroup& row_group, SchemaNode* node, const ColumnType& col_type) {
   DCHECK(node);
 
   int col_idx = node->col_idx;
   DCHECK_LT(col_idx, row_group.columns.size());
 
-  const vector<parquet::ColumnOrder>& col_orders = file_metadata.column_orders;
+  const vector<parquet::ColumnOrder>& col_orders = file_metadata_.column_orders;
   const parquet::ColumnOrder* col_order =
       col_idx < col_orders.size() ? &col_orders[col_idx] : nullptr;
 
@@ -585,8 +589,7 @@ ColumnStatsReader HdfsParquetScanner::CreateStatsReader(
   return stat_reader;
 }
 
-Status HdfsParquetScanner::EvaluateStatsConjuncts(
-    const parquet::FileMetaData& file_metadata, const parquet::RowGroup& row_group,
+Status HdfsParquetScanner::EvaluateStatsConjuncts(const parquet::RowGroup& row_group,
     bool* skip_row_group) {
   *skip_row_group = false;
 
@@ -623,7 +626,7 @@ Status HdfsParquetScanner::EvaluateStatsConjuncts(
     }
 
     ColumnStatsReader stats_reader =
-        CreateStatsReader(file_metadata, row_group, node, slot_desc->type());
+        CreateStatsReader(row_group, node, slot_desc->type());
 
     bool all_nulls = false;
     if (stats_reader.AllNulls(&all_nulls) && all_nulls) {
@@ -686,8 +689,7 @@ bool HdfsParquetScanner::FilterAlreadyDisabledOrOverlapWithColumnStats(
 }
 
 Status HdfsParquetScanner::EvaluateOverlapForRowGroup(
-    const parquet::FileMetaData& file_metadata, const parquet::RowGroup& row_group,
-    bool* skip_row_group) {
+    const parquet::RowGroup& row_group, bool* skip_row_group) {
   *skip_row_group = false;
 
   if (!state_->query_options().parquet_read_statistics) return Status::OK();
@@ -764,7 +766,7 @@ Status HdfsParquetScanner::EvaluateOverlapForRowGroup(
       break;
     }
     ColumnStatsReader stats_reader =
-        CreateStatsReader(file_metadata, row_group, node, slot_desc->type());
+        CreateStatsReader(row_group, node, slot_desc->type());
 
     bool all_nulls = false;
     if (stats_reader.AllNulls(&all_nulls) && all_nulls) {
@@ -926,8 +928,7 @@ Status HdfsParquetScanner::NextRowGroup() {
 
     // Evaluate row group statistics with stats conjuncts.
     bool skip_row_group_on_stats;
-    RETURN_IF_ERROR(
-        EvaluateStatsConjuncts(file_metadata_, row_group, &skip_row_group_on_stats));
+    RETURN_IF_ERROR(EvaluateStatsConjuncts(row_group, &skip_row_group_on_stats));
     if (skip_row_group_on_stats) {
       COUNTER_ADD(num_stats_filtered_row_groups_counter_, 1);
       continue;
@@ -935,8 +936,7 @@ Status HdfsParquetScanner::NextRowGroup() {
 
     // Evaluate row group statistics with min/max filters.
     bool skip_row_group_on_minmax;
-    RETURN_IF_ERROR(
-      EvaluateOverlapForRowGroup(file_metadata_, row_group, &skip_row_group_on_minmax));
+    RETURN_IF_ERROR(EvaluateOverlapForRowGroup(row_group, &skip_row_group_on_minmax));
     if (skip_row_group_on_minmax) {
       COUNTER_ADD(num_minmax_filtered_row_groups_counter_, 1);
       continue;
@@ -1487,7 +1487,7 @@ Status HdfsParquetScanner::FindSkipRangesForPagesWithMinMaxFilters(
     }
 
     ColumnStatsReader stats_reader =
-        CreateStatsReader(file_metadata_, row_group, node, slot_desc->type());
+        CreateStatsReader(row_group, node, slot_desc->type());
 
     DCHECK_LT(col_idx, row_group.columns.size());
     const parquet::ColumnChunk& col_chunk = row_group.columns[col_idx];
@@ -1570,7 +1570,7 @@ Status HdfsParquetScanner::EvaluatePageIndex() {
     }
     int col_idx = node->col_idx;;
     ColumnStatsReader stats_reader =
-        CreateStatsReader(file_metadata_, row_group, node, slot_desc->type());
+        CreateStatsReader(row_group, node, slot_desc->type());
 
     DCHECK_LT(col_idx, row_group.columns.size());
     const parquet::ColumnChunk& col_chunk = row_group.columns[col_idx];
@@ -2826,6 +2826,13 @@ Status HdfsParquetScanner::ProcessFooter() {
 
   RETURN_IF_ERROR(ParquetMetadataUtils::ValidateFileVersion(file_metadata_, filename()));
 
+  if (VLOG_FILE_IS_ON) {
+    VLOG_FILE << "Parquet metadata for " << filename() << " created by "
+              << file_metadata_.created_by << ":\n"
+              << join(file_metadata_.key_value_metadata | transformed(
+                  [](parquet::KeyValue kv) { return kv.key + "=" + kv.value; }), "\n");
+  }
+
   // IMPALA-3943: Do not throw an error for empty files for backwards compatibility.
   if (file_metadata_.num_rows == 0) {
     // Warn if the num_rows is inconsistent with the row group metadata.
@@ -3157,8 +3164,41 @@ ParquetTimestampDecoder HdfsParquetScanner::CreateTimestampDecoder(
       state_->query_options().convert_legacy_hive_parquet_utc_timestamps &&
       state_->local_time_zone() != UTCPTR;
 
-  return ParquetTimestampDecoder(element, state_->local_time_zone(),
-      timestamp_conversion_needed_for_int96_timestamps);
+  const Timezone* timezone = state_->local_time_zone();
+  bool hive_legacy_conversion = false;
+  if (timestamp_conversion_needed_for_int96_timestamps && GetHiveZoneConversionLegacy()) {
+    VLOG_FILE << "Using Hive legacy timezone conversion";
+    hive_legacy_conversion = true;
+  }
+
+  return ParquetTimestampDecoder(element, timezone,
+      timestamp_conversion_needed_for_int96_timestamps, hive_legacy_conversion);
+}
+
+bool HdfsParquetScanner::GetHiveZoneConversionLegacy() const {
+  string writer_zone_conversion_legacy;
+  string writer_time_zone;
+  for (const parquet::KeyValue& kv : file_metadata_.key_value_metadata) {
+    if (kv.key == "writer.zone.conversion.legacy") {
+      writer_zone_conversion_legacy = kv.value;
+    } else if (kv.key == "writer.time.zone") {
+      writer_time_zone = kv.value;
+    }
+  }
+
+  if (writer_zone_conversion_legacy != "") {
+    return iequals(writer_zone_conversion_legacy, "true");
+  }
+
+  // There is no explicit metadata about the legacy conversion.
+  if (writer_time_zone != "") {
+    // There is metadata about the timezone, thus we can infer that the file
+    // was written using the new APIs.
+    return false;
+  }
+
+  // There is no (relevant) metadata in the file, use the configuration.
+  return state_->query_options().use_legacy_hive_timestamp_conversion;
 }
 
 void HdfsParquetScanner::UpdateCompressedPageSizeCounter(int64_t compressed_page_size) {
diff --git a/be/src/exec/parquet/hdfs-parquet-scanner.h b/be/src/exec/parquet/hdfs-parquet-scanner.h
index f3c8b51e1..42cfd7d2e 100644
--- a/be/src/exec/parquet/hdfs-parquet-scanner.h
+++ b/be/src/exec/parquet/hdfs-parquet-scanner.h
@@ -587,10 +587,9 @@ class HdfsParquetScanner : public HdfsColumnarScanner {
   }
 
   /// Evaluates the min/max predicates of the 'scan_node_' using the parquet::Statistics
-  /// of 'row_group'. 'file_metadata' is used to determine the ordering that was used to
-  /// compute the statistics. Sets 'skip_row_group' to true if the row group can be
-  /// skipped, 'false' otherwise.
-  Status EvaluateStatsConjuncts(const parquet::FileMetaData& file_metadata,
+  /// of 'row_group'. Sets 'skip_row_group' to true if the row group can be skipped,
+  /// 'false' otherwise.
+  Status EvaluateStatsConjuncts(
       const parquet::RowGroup& row_group, bool* skip_row_group) WARN_UNUSED_RESULT;
 
   /// Advances 'row_group_idx_' to the next non-empty row group and initializes
@@ -601,12 +600,10 @@ class HdfsParquetScanner : public HdfsColumnarScanner {
   Status NextRowGroup() WARN_UNUSED_RESULT;
 
   /// Evaluates the overlap predicates of the 'scan_node_' using the parquet::Statistics
-  /// of 'row_group'. 'file_metadata' is used to determine the ordering that was used to
-  /// compute the statistics. Sets 'skip_row_group' to true if the row group can be
-  /// skipped, 'false' otherwise.
+  /// of 'row_group'. Sets 'skip_row_group' to true if the row group can be skipped,
+  /// 'false' otherwise.
   Status EvaluateOverlapForRowGroup(
-    const parquet::FileMetaData& file_metadata, const parquet::RowGroup& row_group,
-    bool* skip_row_group);
+      const parquet::RowGroup& row_group, bool* skip_row_group);
 
   /// Return true if filter 'minmax_filter' of filter id 'filter_id' is too close to
   /// column min/max stats available at the target desc entry targets[0] in
@@ -631,12 +628,12 @@ class HdfsParquetScanner : public HdfsColumnarScanner {
       SchemaNode** schema_node_ptr = nullptr);
 
   /// Create a ColumnStatsReader object for a column chunk described by a schema
-  /// path in a slot descriptor 'slot_desc'. 'file_metadata', 'row_group', 'node',
-  /// and 'col_type' provide extra data needed.
+  /// path in a slot descriptor 'slot_desc'. 'row_group', 'node', and 'col_type' provide
+  /// extra data needed.
   /// On return:
   ///   A column chunk stats reader ('ColumnStatsReader') is returned.
-  ColumnStatsReader CreateStatsReader(const parquet::FileMetaData& file_metadata,
-      const parquet::RowGroup& row_group, SchemaNode* node, const ColumnType& col_type);
+  ColumnStatsReader CreateStatsReader(const parquet::RowGroup& row_group,
+      SchemaNode* node, const ColumnType& col_type);
 
   /// Return the overlap predicate descs from the HDFS scan plan.
   const vector<TOverlapPredicateDesc>& GetOverlapPredicateDescs();
@@ -944,6 +941,10 @@ class HdfsParquetScanner : public HdfsColumnarScanner {
   /// then we skip to row index 'skip_to_row'.
   Status SkipRowsForColumns(const vector<ParquetColumnReader*>& column_readers,
       int64_t* num_rows_to_skip, int64_t* skip_to_row);
+
+  /// Returns whether Hive legacy zone conversion should be used for transforming
+  /// timestamps based on file metadata and configuration.
+  bool GetHiveZoneConversionLegacy() const;
 };
 
 } // namespace impala
diff --git a/be/src/exec/parquet/parquet-common.cc b/be/src/exec/parquet/parquet-common.cc
index 34133ea4d..ee284e4b3 100644
--- a/be/src/exec/parquet/parquet-common.cc
+++ b/be/src/exec/parquet/parquet-common.cc
@@ -255,7 +255,8 @@ bool ParquetTimestampDecoder::GetTimestampInfoFromSchema(const parquet::SchemaEl
 }
 
 ParquetTimestampDecoder::ParquetTimestampDecoder(const parquet::SchemaElement& e,
-    const Timezone* timezone, bool convert_int96_timestamps) {
+    const Timezone* timezone, bool convert_int96_timestamps,
+    bool hive_legacy_conversion) : hive_legacy_conversion_(hive_legacy_conversion) {
   bool needs_conversion = false;
   bool valid_schema = GetTimestampInfoFromSchema(e, precision_, needs_conversion);
   DCHECK(valid_schema); // Invalid schemas should be rejected in an earlier step.
@@ -267,7 +268,15 @@ void ParquetTimestampDecoder::ConvertMinStatToLocalTime(TimestampValue* v) const
   DCHECK(timezone_ != nullptr);
   if (!v->HasDateAndTime()) return;
   TimestampValue repeated_period_start;
-  v->UtcToLocal(*timezone_, &repeated_period_start);
+  if (hive_legacy_conversion_) {
+    // Hive legacy conversion does not have efficient tools for identifying repeated
+    // periods, so subtract a day to ensure we cover all possible repeated periods
+    // (such as switching from UTC- to UTC+ near the international date line).
+    v->HiveLegacyUtcToLocal(*timezone_);
+    v->Subtract(boost::posix_time::hours(24));
+  } else {
+    v->UtcToLocal(*timezone_, &repeated_period_start);
+  }
   if (repeated_period_start.HasDateAndTime()) *v = repeated_period_start;
 }
 
@@ -275,7 +284,15 @@ void ParquetTimestampDecoder::ConvertMaxStatToLocalTime(TimestampValue* v) const
   DCHECK(timezone_ != nullptr);
   if (!v->HasDateAndTime()) return;
   TimestampValue repeated_period_end;
-  v->UtcToLocal(*timezone_, nullptr, &repeated_period_end);
+  if (hive_legacy_conversion_) {
+    // Hive legacy conversion does not have efficient tools for identifying repeated
+    // periods, so add a day to ensure we cover all possible repeated periods
+    // (such as switching from UTC- to UTC+ near the international date line).
+    v->HiveLegacyUtcToLocal(*timezone_);
+    v->Add(boost::posix_time::hours(24));
+  } else {
+    v->UtcToLocal(*timezone_, nullptr, &repeated_period_end);
+  }
   if (repeated_period_end.HasDateAndTime()) *v = repeated_period_end;
 }
 }
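The one-day widening in the two hunks above can be sketched outside the scanner. A minimal Python illustration (the function name and naive datetimes are illustrative, not Impala's API): because the legacy path yields a single converted value with no repeated-period bounds, the min stat is moved a day earlier and the max a day later so row-group filtering stays conservative.

```python
from datetime import datetime, timedelta

DAY = timedelta(hours=24)

def widen_stats_for_legacy(local_min, local_max):
    # Legacy conversion cannot report the bounds of a repeated period
    # (e.g. a DST fall-back hour), so conservatively assume the converted
    # min/max stats may be off by up to a day in either direction.
    return local_min - DAY, local_max + DAY

lo, hi = widen_stats_for_legacy(datetime(1900, 1, 1), datetime(1900, 1, 2))
```

The widened range can only cause extra row groups to be scanned, never correct rows to be skipped.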
diff --git a/be/src/exec/parquet/parquet-common.h b/be/src/exec/parquet/parquet-common.h
index 2b215e150..72ad18015 100644
--- a/be/src/exec/parquet/parquet-common.h
+++ b/be/src/exec/parquet/parquet-common.h
@@ -764,7 +764,7 @@ public:
   ParquetTimestampDecoder() {}
 
   ParquetTimestampDecoder( const parquet::SchemaElement& e, const Timezone* timezone,
-      bool convert_int96_timestamps);
+      bool convert_int96_timestamps, bool hive_legacy_conversion);
 
   bool NeedsConversion() const { return timezone_ != nullptr; }
 
@@ -798,7 +798,13 @@ public:
 
   void ConvertToLocalTime(TimestampValue* v) const {
     DCHECK(timezone_ != nullptr);
-    if (v->HasDateAndTime()) v->UtcToLocal(*timezone_);
+    if (v->HasDateAndTime()) {
+      if (hive_legacy_conversion_) {
+        v->HiveLegacyUtcToLocal(*timezone_);
+      } else {
+        v->UtcToLocal(*timezone_);
+      }
+    }
   }
 
   /// Timezone conversion of min/max stats need some extra logic because UTC->local
@@ -831,6 +837,9 @@ private:
   /// INT64 decoding. INT64 with nanosecond precision (and reduced range) is also planned
   /// to be implemented once it is added in Parquet (PARQUET-1387).
   Precision precision_ = NANO;
+
+  /// Use Hive legacy-compatible conversion with Java DateFormat.
+  bool hive_legacy_conversion_ = false;
 };
 
 template <>
diff --git a/be/src/runtime/raw-value-test.cc b/be/src/runtime/raw-value-test.cc
index 090178f1e..066adca69 100644
--- a/be/src/runtime/raw-value-test.cc
+++ b/be/src/runtime/raw-value-test.cc
@@ -19,6 +19,8 @@
 
 #include "exprs/timezone_db.h"
 #include "runtime/raw-value.inline.h"
+#include "runtime/timestamp-value.h"
+#include "runtime/timestamp-value.inline.h"
 #include "testutil/gtest-util.h"
 
 #include "common/names.h"
diff --git a/be/src/runtime/timestamp-value.cc b/be/src/runtime/timestamp-value.cc
index 417ef467d..9429704b8 100644
--- a/be/src/runtime/timestamp-value.cc
+++ b/be/src/runtime/timestamp-value.cc
@@ -17,14 +17,19 @@
 
 #include "runtime/timestamp-value.h"
 
+#include <boost/algorithm/string/predicate.hpp>
+
 #include "exprs/timestamp-functions.h"
 #include "exprs/timezone_db.h"
 #include "runtime/datetime-simple-date-format-parser.h"
+#include "runtime/exec-env.h"
 #include "runtime/timestamp-parse-util.h"
 #include "runtime/timestamp-value.inline.h"
+#include "service/frontend.h"
 
 #include "common/names.h"
 
+using boost::algorithm::starts_with;
 using boost::date_time::not_a_date_time;
 using boost::gregorian::date;
 using boost::gregorian::date_duration;
@@ -155,6 +160,32 @@ void TimestampValue::UtcToLocal(const Timezone& local_tz,
   }
 }
 
+void TimestampValue::HiveLegacyUtcToLocal(const Timezone& local_tz) {
+  DCHECK(HasDateAndTime());
+  int64_t utc_time_millis;
+  if (UNLIKELY(!FloorUtcToUnixTimeMillis(&utc_time_millis))) {
+    SetToInvalidDateTime();
+    return;
+  }
+
+  string tz = local_tz.name();
+  static constexpr std::string_view zoneinfo = "/usr/share/zoneinfo/";
+  if (starts_with(tz, zoneinfo)) {
+    tz = tz.substr(zoneinfo.size());
+  }
+
+  TCivilTime cs;
+  Status status = ExecEnv::GetInstance()->frontend()->HiveLegacyTimezoneConvert(
+      tz, utc_time_millis, &cs);
+  if (UNLIKELY(!status.ok())) {
+    // This would result in log spam. However it should be impossible to fail.
+    LOG(ERROR) << "Timezone " << tz << " cannot be used with legacy Hive conversion.";
+    return;
+  }
+  date_ = boost::gregorian::date(cs.year, cs.month, cs.day);
+  time_ = time_duration(cs.hour, cs.minute, cs.second, time_.fractional_seconds());
+}
+
 void TimestampValue::LocalToUtc(const Timezone& local_tz,
     TimestampValue* pre_utc_if_repeated, TimestampValue* post_utc_if_repeated) {
   DCHECK(HasDateAndTime());
@@ -227,14 +258,6 @@ TimestampValue TimestampValue::UnixTimeToLocal(
   }
 }
 
-TimestampValue TimestampValue::FromUnixTime(time_t unix_time, const Timezone* local_tz) {
-  if (local_tz != UTCPTR) {
-    return UnixTimeToLocal(unix_time, *local_tz);
-  } else {
-    return UtcFromUnixTimeTicks<1>(unix_time);
-  }
-}
-
 void TimestampValue::ToString(string& dst) const {
   dst.resize(SimpleDateFormatTokenizer::DEFAULT_DATE_TIME_FMT_LEN);
   const int out_len = TimestampParser::FormatDefault(date(), time(), dst.data());
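HiveLegacyUtcToLocal above strips a `/usr/share/zoneinfo/` prefix before handing the name to Java, since `TimeZone.getTimeZone` silently maps anything it does not recognize (such as a filesystem path) to GMT. The same normalization as a standalone Python sketch; the function name is illustrative:

```python
ZONEINFO_PREFIX = "/usr/share/zoneinfo/"

def canonical_tz_name(tz_name: str) -> str:
    # Reduce a zoneinfo file path to the IANA name Java expects,
    # e.g. /usr/share/zoneinfo/Asia/Singapore -> Asia/Singapore.
    if tz_name.startswith(ZONEINFO_PREFIX):
        return tz_name[len(ZONEINFO_PREFIX):]
    return tz_name
```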
diff --git a/be/src/runtime/timestamp-value.h b/be/src/runtime/timestamp-value.h
index f1fb126a8..02cfe7a62 100644
--- a/be/src/runtime/timestamp-value.h
+++ b/be/src/runtime/timestamp-value.h
@@ -296,6 +296,11 @@ class TimestampValue {
       TimestampValue* start_of_repeated_period = nullptr,
       TimestampValue* end_of_repeated_period = nullptr);
 
+  /// Converts from UTC to 'local_tz' time zone in-place. The caller must ensure the
+  /// TimestampValue this function is called upon has both a valid date and time. Uses
+  /// Java Calendar for conversion to match Hive's legacy conversion process.
+  void HiveLegacyUtcToLocal(const Timezone& local_tz);
+
   /// Converts from 'local_tz' to UTC time zone in-place. The caller must ensure the
   /// TimestampValue this function is called upon has both a valid date and time.
   ///
diff --git a/be/src/runtime/timestamp-value.inline.h b/be/src/runtime/timestamp-value.inline.h
index d49dbb2ae..6cfc63b6d 100644
--- a/be/src/runtime/timestamp-value.inline.h
+++ b/be/src/runtime/timestamp-value.inline.h
@@ -77,6 +77,15 @@ inline TimestampValue TimestampValue::UtcFromUnixTimeLimitedRangeNanos(
   return UtcFromUnixTimeTicks<NANOS_PER_SEC>(unix_time_nanos);
 }
 
+inline TimestampValue TimestampValue::FromUnixTime(time_t unix_time,
+    const Timezone* local_tz) {
+  if (local_tz != UTCPTR) {
+    return UnixTimeToLocal(unix_time, *local_tz);
+  } else {
+    return UtcFromUnixTimeTicks<1>(unix_time);
+  }
+}
+
 inline TimestampValue TimestampValue::FromUnixTimeNanos(time_t unix_time, int64_t nanos,
     const Timezone* local_tz) {
   unix_time =
diff --git a/be/src/service/frontend.cc b/be/src/service/frontend.cc
index aff23809f..22f3cdb77 100644
--- a/be/src/service/frontend.cc
+++ b/be/src/service/frontend.cc
@@ -157,7 +157,8 @@ Frontend::Frontend() {
   };
 
   JniMethodDescriptor staticMethods[] = {
-    {"getSecretFromKeyStore", "([B)Ljava/lang/String;", 
&get_secret_from_key_store_}
+    {"getSecretFromKeyStore", "([B)Ljava/lang/String;", 
&get_secret_from_key_store_},
+    {"hiveLegacyTimezoneConvert", "([BJ)[B", &hive_legacy_timezone_convert_}
   };
 
   JNIEnv* jni_env = JniUtil::GetJNIEnv();
@@ -436,3 +437,12 @@ Status Frontend::GetSecretFromKeyStore(const string& secret_key, string* secret)
   return JniUtil::CallStaticJniMethod(fe_class_, get_secret_from_key_store_, secret_key_t,
       secret);
 }
+
+Status Frontend::HiveLegacyTimezoneConvert(
+    const string& timezone, long utc_time_millis, TCivilTime* local_time) {
+  TStringLiteral timezone_t;
+  timezone_t.__set_value(timezone);
+  return JniCall::static_method(fe_class_, hive_legacy_timezone_convert_)
+      .with_thrift_arg(timezone_t).with_primitive_arg(utc_time_millis)
+      .Call(local_time);
+}
diff --git a/be/src/service/frontend.h b/be/src/service/frontend.h
index 823f98ca8..e2f0b551f 100644
--- a/be/src/service/frontend.h
+++ b/be/src/service/frontend.h
@@ -249,6 +249,10 @@ class Frontend {
   /// Get secret from jceks key store for the input secret_key.
   Status GetSecretFromKeyStore(const string& secret_key, string* secret);
 
+  /// Convert UTC UNIX time (in millis) to target timezone using Hive legacy conversion.
+  Status HiveLegacyTimezoneConvert(
+      const string& timezone, long utc_time_millis, TCivilTime* local_time);
+
  private:
   jclass fe_class_; // org.apache.impala.service.JniFrontend class
   jobject fe_;  // instance of org.apache.impala.service.JniFrontend
@@ -292,6 +296,7 @@ class Frontend {
   jmethodID commit_kudu_txn_; // JniFrontend.commitKuduTransaction()
   jmethodID convertTable; // JniFrontend.convertTable
   jmethodID get_secret_from_key_store_; // JniFrontend.getSecretFromKeyStore()
+  jmethodID hive_legacy_timezone_convert_; // JniFrontend.hiveLegacyTimezoneConvert()
 
   // Only used for testing.
   jmethodID build_test_descriptor_table_id_; // JniFrontend.buildTestDescriptorTable()
diff --git a/be/src/service/query-options.cc b/be/src/service/query-options.cc
index 44d571a80..5da1c6eb8 100644
--- a/be/src/service/query-options.cc
+++ b/be/src/service/query-options.cc
@@ -1330,6 +1330,10 @@ Status impala::SetQueryOption(TImpalaQueryOptions::type option, const string& va
         query_options->__set_estimate_duplicate_in_preagg(IsTrue(value));
         break;
       }
+      case TImpalaQueryOptions::USE_LEGACY_HIVE_TIMESTAMP_CONVERSION: {
+        query_options->__set_use_legacy_hive_timestamp_conversion(IsTrue(value));
+        break;
+      }
       default:
         string key = to_string(option);
         if (IsRemovedQueryOption(key)) {
diff --git a/be/src/service/query-options.h b/be/src/service/query-options.h
index b5587ec2a..6ce06aea7 100644
--- a/be/src/service/query-options.h
+++ b/be/src/service/query-options.h
@@ -51,7 +51,7 @@ typedef std::unordered_map<string, beeswax::TQueryOptionLevel::type>
 // plus one. Thus, the second argument to the DCHECK has to be updated every
 // time we add or remove a query option to/from the enum TImpalaQueryOptions.
 constexpr unsigned NUM_QUERY_OPTIONS =
-    TImpalaQueryOptions::ESTIMATE_DUPLICATE_IN_PREAGG + 1;
+    TImpalaQueryOptions::USE_LEGACY_HIVE_TIMESTAMP_CONVERSION + 1;
 #define QUERY_OPTS_TABLE                                                                \
   DCHECK_EQ(_TImpalaQueryOptions_VALUES_TO_NAMES.size(), NUM_QUERY_OPTIONS);            \
   REMOVED_QUERY_OPT_FN(abort_on_default_limit_exceeded, ABORT_ON_DEFAULT_LIMIT_EXCEEDED) \
@@ -364,6 +364,8 @@ constexpr unsigned NUM_QUERY_OPTIONS =
       ENABLE_TUPLE_ANALYSIS_IN_AGGREGATE, TQueryOptionLevel::ADVANCED)                  \
   QUERY_OPT_FN(estimate_duplicate_in_preagg,                                            \
       ESTIMATE_DUPLICATE_IN_PREAGG, TQueryOptionLevel::ADVANCED)                        \
+  QUERY_OPT_FN(use_legacy_hive_timestamp_conversion,                                    \
+      USE_LEGACY_HIVE_TIMESTAMP_CONVERSION, TQueryOptionLevel::ADVANCED)
   ;
 
 /// Enforce practical limits on some query options to avoid undesired query state.
diff --git a/common/thrift/Frontend.thrift b/common/thrift/Frontend.thrift
index bd70374e9..15007ae2a 100644
--- a/common/thrift/Frontend.thrift
+++ b/common/thrift/Frontend.thrift
@@ -1092,3 +1092,17 @@ struct TWrappedHttpResponse {
   5: optional string content
   6: optional string content_type
 }
+
+// Captures civil time - local time in a specific time zone - mirroring
+// cctz::civil_second. Used to serialize Java timezone conversions back to C++ code.
+// Omits subsecond measurements because
+// - matches cctz::civil_second; no known timezone libraries have subsecond adjustments
+// - Java timezone conversion is only accurate to milliseconds, but we use nanoseconds
+struct TCivilTime {
+  1: required i32 year
+  2: required i32 month
+  3: required i32 day
+  4: required i32 hour
+  5: required i32 minute
+  6: required i32 second
+}
diff --git a/common/thrift/ImpalaService.thrift b/common/thrift/ImpalaService.thrift
index 90992a269..b28754848 100644
--- a/common/thrift/ImpalaService.thrift
+++ b/common/thrift/ImpalaService.thrift
@@ -977,6 +977,15 @@ enum TImpalaQueryOptions {
   // If True, account for probability of having duplicate grouping key exist in multiple
   // nodes during preaggregation.
   ESTIMATE_DUPLICATE_IN_PREAGG = 185
+
+  // When true and CONVERT_LEGACY_HIVE_PARQUET_UTC_TIMESTAMPS is also enabled, TIMESTAMP
+  // conversion to local time will fall back to the timestamp conversion method from Hive
+  // 3.0 and earlier if not specified in the file. This matches the Hive option
+  // 'hive.parquet.timestamp.legacy.conversion.enabled', which defaults to true. Impala
+  // defaults to false because conversion is ~50x slower than Impala's default conversion
+  // method and they produce the same results for modern time periods (post 1970, and in
+  // most instances before that).
+  USE_LEGACY_HIVE_TIMESTAMP_CONVERSION = 186
 }
 
 // The summary of a DML statement.
diff --git a/common/thrift/Query.thrift b/common/thrift/Query.thrift
index 851f5a573..0e8458082 100644
--- a/common/thrift/Query.thrift
+++ b/common/thrift/Query.thrift
@@ -759,6 +759,9 @@ struct TQueryOptions {
 
   // See comment in ImpalaService.thrift
   186: optional bool estimate_duplicate_in_preagg = true
+
+  // See comment in ImpalaService.thrift
+  187: optional bool use_legacy_hive_timestamp_conversion = false
 }
 
 // Impala currently has three types of sessions: Beeswax, HiveServer2 and external
diff --git a/docs/topics/impala_timestamp.xml b/docs/topics/impala_timestamp.xml
index efa367e83..33e7d40ba 100644
--- a/docs/topics/impala_timestamp.xml
+++ b/docs/topics/impala_timestamp.xml
@@ -263,6 +263,43 @@ DATE_ADD (<varname>timestamp</varname>, INTERVAL <varname>interval</varname> <va
           Parquet files written by Hive.
         </p>
 
+        <p>
+          Hive versions prior to 3.1 wrote Parquet files in local time using Java's
+          SimpleDateFormat. That method has some cases that differ from both Impala's
+          method and the default method used in Hive 3.1.2+, which are based on the
+          <xref href="https://www.iana.org/time-zones" format="html" scope="external">
+          IANA Time Zone Database</xref>. Hive 4 added the
+          <codeph>writer.zone.conversion.legacy</codeph> Parquet file metadata property
+          to identify which method was used to write the file (controlled by
+          <codeph>hive.parquet.timestamp.write.legacy.conversion.enabled</codeph>). When
+          the Parquet file was written by Parquet Java (<codeph>parquet-mr</codeph>),
+          Hive's behavior - and Impala's behavior when
+          <codeph>convert_legacy_hive_parquet_utc_timestamps</codeph> is
+          <codeph>true</codeph> - is as follows:
+          <ul>
+            <li>
+              If <codeph>writer.zone.conversion.legacy</codeph> is present, use the
+              legacy conversion method if it is true and the newer method if it is
+              false.
+            </li>
+            <li>
+              If <codeph>writer.zone.conversion.legacy</codeph> is not present but
+              <codeph>writer.time.zone</codeph> is, infer that the file was written by
+              Hive 3.1.2+ using the new APIs and use the newer method.
+            </li>
+            <li>
+              Otherwise assume the file was written by an earlier Hive release. In that
+              case Hive selects the conversion method based on
+              <codeph>hive.parquet.timestamp.legacy.conversion.enabled</codeph> (defaults
+              to <codeph>true</codeph>). <keyword keyref="impala45"/> adds the query
+              option <codeph>use_legacy_hive_timestamp_conversion</codeph> to select this
+              behavior. It defaults to <codeph>false</codeph> because legacy conversion
+              is ~50x slower than Impala's default conversion method and the two produce
+              the same results for modern time periods (post 1970, and in most instances
+              before that).
+            </li>
+          </ul>
+        </p>
+
         <p>
           Hive currently cannot write <codeph>INT64</codeph> <codeph>TIMESTAMP</codeph> values.
         </p>
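The selection rules above reduce to a small decision function. A hedged Python sketch (the dict-based metadata lookup and function name are illustrative, not Impala's actual API; Parquet key/value metadata values are strings):

```python
def use_hive_legacy_conversion(metadata: dict, query_option_legacy: bool) -> bool:
    """Decide whether to apply Hive legacy timezone conversion to a file
    written by parquet-mr, following the three cases described above."""
    # Hive 4 records the conversion method explicitly.
    if "writer.zone.conversion.legacy" in metadata:
        return metadata["writer.zone.conversion.legacy"] == "true"
    # Hive 3.1.2+ wrote writer.time.zone and used the new tzdata-based APIs.
    if "writer.time.zone" in metadata:
        return False
    # Older Hive: fall back to the query option, mirroring Hive's
    # hive.parquet.timestamp.legacy.conversion.enabled.
    return query_option_legacy
```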
diff --git a/fe/src/main/java/org/apache/impala/service/JniFrontend.java b/fe/src/main/java/org/apache/impala/service/JniFrontend.java
index 3a3e4d77f..c737d2f17 100644
--- a/fe/src/main/java/org/apache/impala/service/JniFrontend.java
+++ b/fe/src/main/java/org/apache/impala/service/JniFrontend.java
@@ -54,6 +54,7 @@ import org.apache.impala.service.Frontend.PlanCtx;
 import org.apache.impala.thrift.TBackendGflags;
 import org.apache.impala.thrift.TBuildTestDescriptorTableParams;
 import org.apache.impala.thrift.TCatalogObject;
+import org.apache.impala.thrift.TCivilTime;
 import org.apache.impala.thrift.TDatabase;
 import org.apache.impala.thrift.TDescribeDbParams;
 import org.apache.impala.thrift.TDescribeResult;
@@ -115,11 +116,14 @@ import org.slf4j.LoggerFactory;
 import java.io.File;
 import java.io.IOException;
 import java.lang.IllegalArgumentException;
+import java.util.Calendar;
 import java.util.Collections;
+import java.util.GregorianCalendar;
 import java.util.Enumeration;
 import java.util.List;
 import java.util.Map;
 import java.util.Set;
+import java.util.TimeZone;
 
 /**
  * JNI-callable interface onto a wrapped Frontend instance. The main point is to serialise
@@ -808,6 +812,29 @@ public class JniFrontend {
     return secret;
   }
 
+  /**
+   * Performs Hive legacy-compatible timezone conversion.
+   */
+  public static byte[] hiveLegacyTimezoneConvert(byte[] timezoneT, long utc_time_millis)
+      throws ImpalaException {
+    final TStringLiteral timezone = new TStringLiteral();
+    JniUtil.deserializeThrift(protocolFactory_, timezone, timezoneT);
+    // TimeZone.getTimeZone defaults to GMT if it doesn't recognize the timezone.
+    Calendar c = new GregorianCalendar(TimeZone.getTimeZone(timezone.getValue()));
+    c.setTimeInMillis(utc_time_millis);
+
+    final TCivilTime civilTime = new TCivilTime(
+        // Normalize month so January starts at 1.
+        c.get(Calendar.YEAR), c.get(Calendar.MONTH)+1, c.get(Calendar.DAY_OF_MONTH),
+        c.get(Calendar.HOUR_OF_DAY), c.get(Calendar.MINUTE), c.get(Calendar.SECOND));
+    try {
+      TSerializer serializer = new TSerializer(protocolFactory_);
+      return serializer.serialize(civilTime);
+    } catch (TException e) {
+      throw new InternalException(e.getMessage());
+    }
+  }
+
   public String validateSaml2Bearer(byte[] serializedRequest) throws ImpalaException{
     Preconditions.checkNotNull(frontend_);
     Preconditions.checkNotNull(frontend_.getSaml2Client());
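For contrast with the Calendar-based legacy path above, the non-legacy (tzdata) conversion can be sketched in Python; this is an illustration of the equivalent civil-time decomposition, not code from the patch, and it returns the same fields TCivilTime carries (no subseconds):

```python
from datetime import datetime, timezone
from zoneinfo import ZoneInfo

def utc_millis_to_civil(utc_millis: int, tz_name: str):
    # Convert UTC epoch millis to local civil fields using tzdata.
    dt = datetime.fromtimestamp(utc_millis / 1000, tz=timezone.utc)
    local = dt.astimezone(ZoneInfo(tz_name))
    return (local.year, local.month, local.day,
            local.hour, local.minute, local.second)
```

Where the two methods disagree (e.g. pre-1982 Asia/Singapore offsets), this tzdata-based result matches Impala's UtcToLocal path, while the Calendar path matches Hive's legacy behavior.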
diff --git a/testdata/data/employee_hive_3_1_3_us_pacific.parquet b/testdata/data/employee_hive_3_1_3_us_pacific.parquet
new file mode 100644
index 000000000..2125bc688
Binary files /dev/null and b/testdata/data/employee_hive_3_1_3_us_pacific.parquet differ
diff --git a/testdata/data/hive_kuala_lumpur_legacy.parquet b/testdata/data/hive_kuala_lumpur_legacy.parquet
new file mode 100644
index 000000000..f03efd4ff
Binary files /dev/null and b/testdata/data/hive_kuala_lumpur_legacy.parquet differ
diff --git a/testdata/data/tbl_parq1/000000_0 b/testdata/data/tbl_parq1/000000_0
new file mode 100644
index 000000000..17a910f52
Binary files /dev/null and b/testdata/data/tbl_parq1/000000_0 differ
diff --git a/testdata/data/tbl_parq1/000000_1 b/testdata/data/tbl_parq1/000000_1
new file mode 100644
index 000000000..cc0c27e6a
Binary files /dev/null and b/testdata/data/tbl_parq1/000000_1 differ
diff --git a/testdata/data/tbl_parq1/000000_2 b/testdata/data/tbl_parq1/000000_2
new file mode 100644
index 000000000..ba04470e2
Binary files /dev/null and b/testdata/data/tbl_parq1/000000_2 differ
diff --git a/testdata/workloads/functional-query/queries/QueryTest/timestamp-conversion-hive-313.test b/testdata/workloads/functional-query/queries/QueryTest/timestamp-conversion-hive-313.test
new file mode 100644
index 000000000..b92aa96fc
--- /dev/null
+++ b/testdata/workloads/functional-query/queries/QueryTest/timestamp-conversion-hive-313.test
@@ -0,0 +1,46 @@
+====
+---- QUERY
+SET timezone=PST;
+SELECT * FROM employee_hive_3_1_3_us_pacific;
+---- TYPES
+INT,TIMESTAMP
+---- RESULTS
+1,1880-01-01 07:52:58
+2,1884-01-01 08:00:00
+3,1990-01-01 08:00:00
+====
+---- QUERY
+SET timezone=PST;
+SET convert_legacy_hive_parquet_utc_timestamps=true;
+SELECT * FROM employee_hive_3_1_3_us_pacific;
+---- TYPES
+INT,TIMESTAMP
+---- RESULTS
+1,1880-01-01 00:00:00
+2,1884-01-01 00:00:00
+3,1990-01-01 00:00:00
+====
+---- QUERY
+SET timezone=PST;
+SET convert_legacy_hive_parquet_utc_timestamps=true;
+SET use_legacy_hive_timestamp_conversion=true;
+SELECT * FROM employee_hive_3_1_3_us_pacific;
+---- TYPES
+INT,TIMESTAMP
+---- RESULTS
+1,1880-01-01 00:00:00
+2,1884-01-01 00:00:00
+3,1990-01-01 00:00:00
+====
+---- QUERY
+SET timezone=UTC;
+SET convert_legacy_hive_parquet_utc_timestamps=true;
+SET use_legacy_hive_timestamp_conversion=true;
+SELECT * FROM employee_hive_3_1_3_us_pacific;
+---- TYPES
+INT,TIMESTAMP
+---- RESULTS
+1,1880-01-01 07:52:58
+2,1884-01-01 08:00:00
+3,1990-01-01 08:00:00
+====
diff --git a/testdata/workloads/functional-query/queries/QueryTest/timestamp-conversion-hive-3m.test b/testdata/workloads/functional-query/queries/QueryTest/timestamp-conversion-hive-3m.test
new file mode 100644
index 000000000..84e61bff3
--- /dev/null
+++ b/testdata/workloads/functional-query/queries/QueryTest/timestamp-conversion-hive-3m.test
@@ -0,0 +1,34 @@
+====
+---- QUERY
+SET timezone="Asia/Singapore";
+SELECT * FROM t;
+---- RESULTS
+1899-12-31 16:00:00
+1899-12-31 16:00:00
+1899-12-31 17:04:35
+2020-04-08 05:17:05.215000000
+2020-04-08 05:17:05.215000000
+====
+---- QUERY
+SET timezone="Asia/Singapore";
+SET convert_legacy_hive_parquet_utc_timestamps=true;
+SELECT * FROM t;
+---- RESULTS
+1899-12-31 22:55:25
+1899-12-31 22:55:25
+1900-01-01 00:00:00
+2020-04-08 13:17:05.215000000
+2020-04-08 13:17:05.215000000
+====
+---- QUERY
+SET timezone="Asia/Singapore";
+SET convert_legacy_hive_parquet_utc_timestamps=true;
+SET use_legacy_hive_timestamp_conversion=true;
+SELECT * FROM t;
+---- RESULTS
+1899-12-31 22:55:25
+1900-01-01 00:00:00
+1900-01-01 00:00:00
+2020-04-08 13:17:05.215000000
+2020-04-08 13:17:05.215000000
+====
diff --git a/testdata/workloads/functional-query/queries/QueryTest/timestamp-conversion-hive-4.test b/testdata/workloads/functional-query/queries/QueryTest/timestamp-conversion-hive-4.test
new file mode 100644
index 000000000..f97a805c3
--- /dev/null
+++ b/testdata/workloads/functional-query/queries/QueryTest/timestamp-conversion-hive-4.test
@@ -0,0 +1,28 @@
+====
+---- QUERY
+SET timezone="Asia/Kuala_Lumpur";
+SELECT * FROM hive_kuala_lumpur_legacy;
+---- RESULTS
+1899-12-31 16:00:00
+1909-12-31 17:00:00
+1934-12-31 16:40:00
+1939-12-31 16:40:00
+1941-12-31 16:30:00
+1943-12-31 15:00:00
+1969-01-28 16:30:00
+1999-12-31 16:00:00
+====
+---- QUERY
+SET timezone="Asia/Kuala_Lumpur";
+SET convert_legacy_hive_parquet_utc_timestamps=true;
+SELECT * FROM hive_kuala_lumpur_legacy;
+---- RESULTS
+1900-01-01 00:00:00
+1910-01-01 00:00:00
+1935-01-01 00:00:00
+1940-01-01 00:00:00
+1942-01-01 00:00:00
+1944-01-01 00:00:00
+1969-01-29 00:00:00
+2000-01-01 00:00:00
+====
diff --git a/tests/query_test/test_hive_timestamp_conversion.py b/tests/query_test/test_hive_timestamp_conversion.py
new file mode 100644
index 000000000..2d594f144
--- /dev/null
+++ b/tests/query_test/test_hive_timestamp_conversion.py
@@ -0,0 +1,76 @@
+# Licensed to the Apache Software Foundation (ASF) under one
+# or more contributor license agreements.  See the NOTICE file
+# distributed with this work for additional information
+# regarding copyright ownership.  The ASF licenses this file
+# to you under the Apache License, Version 2.0 (the
+# "License"); you may not use this file except in compliance
+# with the License.  You may obtain a copy of the License at
+#
+#   http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing,
+# software distributed under the License is distributed on an
+# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+# KIND, either express or implied.  See the License for the
+# specific language governing permissions and limitations
+# under the License.
+
+from __future__ import absolute_import, division, print_function
+
+from tests.common.impala_test_suite import ImpalaTestSuite
+from tests.common.file_utils import create_table_and_copy_files, create_table_from_parquet
+
+
+class TestHiveParquetTimestampConversion(ImpalaTestSuite):
+  """Tests that Impala can read parquet files written by older versions of 
Hive or with
+  Hive legacy conversion enabled. Tests use 
convert_legacy_hive_parquet_utc_timestamps,
+  use_legacy_hive_timestamp_conversion, and timezone to test conversion."""
+
+  @classmethod
+  def get_workload(self):
+    return 'functional-query'
+
+  @classmethod
+  def add_test_dimensions(cls):
+    super(TestHiveParquetTimestampConversion, cls).add_test_dimensions()
+    cls.ImpalaTestMatrix.add_constraint(lambda v:
+        v.get_value('table_format').file_format == 'parquet'
+        and v.get_value('table_format').compression_codec == 'none')
+
+  def test_hive_4_legacy(self, vector, unique_database):
+    """Test that legacy conversion uses the same timezone conversion as Hive 
when
+    Parquet metadata contains writer.zone.conversion.legacy=true.
+
+    Load test data generated via Hive with TZ=Asia/Kuala_Lumpur:
+
+        create table t (d timestamp) stored as parquet;
+        set hive.parquet.timestamp.write.legacy.conversion.enabled=true;
+        insert into t values ("1900-01-01 00:00:00"), ("1910-01-01 00:00:00"),
+            ("1935-01-01 00:00:00"), ("1940-01-01 00:00:00"), ("1942-01-01 
00:00:00"),
+            ("1944-01-01 00:00:00"), ("1969-01-29 00:00:00"), ("2000-01-01 
00:00:00");
+    """
+    create_table_from_parquet(self.client, unique_database, 
"hive_kuala_lumpur_legacy")
+    self.run_test_case("QueryTest/timestamp-conversion-hive-4", vector, 
unique_database)
+
+  def test_hive_313(self, vector, unique_database):
+    """The parquet file was written with Hive 3.1.3 using the new Date/Time 
APIs
+    (legacy=false) to convert from US/Pacific to UTC. The presence of 
writer.time.zone in
+    the metadata of the file allow us to infer that new Date/Time APIS should 
be used for
+    the conversion. The use_legacy_hive_timestamp_conversion property 
shouldn't be taken
+    into account in this case.
+
+    Test file from 
https://github.com/apache/hive/blob/rel/release-4.0.1/data/files/
+        employee_hive_3_1_3_us_pacific.parquet"""
+    create_table_from_parquet(
+        self.client, unique_database, "employee_hive_3_1_3_us_pacific")
+    self.run_test_case("QueryTest/timestamp-conversion-hive-313", vector, 
unique_database)
+
+  def test_hive_3_mixed(self, vector, unique_database):
+    """Test table containing Hive legacy timestamps written with Hive prior to 
3.1.3.
+
+    Test files target timezone=Asia/Singapore, sourced from
+    
https://github.com/apache/hive/tree/rel/release-4.0.1/data/files/tbl_parq1.""";
+    create_stmt = "create table %s.t (d timestamp) stored as parquet" % 
unique_database
+    create_table_and_copy_files(self.client, create_stmt, unique_database, "t",
+        ["testdata/data/tbl_parq1/" + f for f in ["000000_0", "000000_1", 
"000000_2"]])
+    self.run_test_case("QueryTest/timestamp-conversion-hive-3m", vector, 
unique_database)
