This is an automated email from the ASF dual-hosted git repository.

csringhofer pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/impala.git


The following commit(s) were added to refs/heads/master by this push:
     new 4837cedc7 IMPALA-10319: Support arbitrary encodings on Text files
4837cedc7 is described below

commit 4837cedc795017aa1e7b69ef0914020a3022ca88
Author: Mihaly Szjatinya <[email protected]>
AuthorDate: Sun Jun 1 15:36:48 2025 +0200

    IMPALA-10319: Support arbitrary encodings on Text files
    
    As proposed in Jira, this implements decoding and encoding of text
    buffers for Impala/Hive text tables. Given a table with
    'serialization.encoding' property set, similarly to Hive, Impala should
    be able to encode the inserted data into charset specified, consequently
    saving it into a text file. The opposite decoding operation should be
    performed upon reading data buffers from text files. Both operations
    employ boost::locale::conv library.
    
    Since Hive doesn't encode line delimiters, charsets that would have
    delimiters stored differently from ASCII are not allowed.
    
    One difference from Hive is that Impala implements
    'serialization.encoding' only as a per partition serdeproperty to avoid
    confusion of allowing both serde and tbl properties. (See related
    IMPALA-13748)
    
    Note: Due to precreated non-UTF-8 files present in the patch
    'gerrit-code-review-checks' was performed locally. (See IMPALA-14100)
    
    Change-Id: I787cd01caa52a19d6645519a6cedabe0a5253a65
    Reviewed-on: http://gerrit.cloudera.org:8080/22049
    Reviewed-by: Csaba Ringhofer <[email protected]>
    Tested-by: Impala Public Jenkins <[email protected]>
---
 be/src/exec/hdfs-scanner.h                         |   5 +-
 be/src/exec/hdfs-text-table-writer.cc              |  17 +-
 be/src/exec/hdfs-text-table-writer.h               |   6 +-
 be/src/exec/scanner-context.h                      |   4 +-
 be/src/exec/text/hdfs-text-scanner.cc              |  17 +-
 be/src/exec/text/hdfs-text-scanner.h               |  15 +-
 be/src/runtime/descriptors.cc                      |   1 +
 be/src/runtime/descriptors.h                       |   2 +
 be/src/runtime/mem-tracker.cc                      |  19 +
 be/src/runtime/mem-tracker.h                       |  13 +
 be/src/runtime/row-batch.h                         |   2 +-
 be/src/util/CMakeLists.txt                         |   1 +
 be/src/util/char-codec.cc                          | 194 +++++++++
 be/src/util/char-codec.h                           |  84 ++++
 bin/rat_exclude_files.txt                          |   1 +
 common/thrift/CatalogObjects.thrift                |   1 +
 common/thrift/generate_error_codes.py              |   4 +-
 .../analysis/AlterTableSetTblProperties.java       |  95 +++++
 .../apache/impala/analysis/CreateTableStmt.java    |  44 ++
 .../impala/catalog/HdfsStorageDescriptor.java      |  29 +-
 .../org/apache/impala/analysis/AnalyzeDDLTest.java |  58 +++
 testdata/charcodec/cp1251_names.txt                |   3 +
 testdata/charcodec/cp1251_names_utf8.txt           |   3 +
 testdata/charcodec/gbk_names.txt                   |   3 +
 testdata/charcodec/gbk_names_error.txt             | Bin 0 -> 40 bytes
 testdata/charcodec/gbk_names_utf8.txt              |   3 +
 testdata/charcodec/koi8r_names.txt                 |   3 +
 testdata/charcodec/koi8r_names_utf8.txt            |   3 +
 testdata/charcodec/latin1_names.txt                |   3 +
 testdata/charcodec/latin1_names_utf8.txt           |   3 +
 testdata/charcodec/shift_jis_names.txt             |   3 +
 testdata/charcodec/shift_jis_names_error.txt       | Bin 0 -> 30 bytes
 testdata/charcodec/shift_jis_names_utf8.txt        |   3 +
 tests/query_test/test_charcodec.py                 | 441 +++++++++++++++++++++
 34 files changed, 1063 insertions(+), 20 deletions(-)

diff --git a/be/src/exec/hdfs-scanner.h b/be/src/exec/hdfs-scanner.h
index 77d4018cf..c71e94077 100644
--- a/be/src/exec/hdfs-scanner.h
+++ b/be/src/exec/hdfs-scanner.h
@@ -313,8 +313,9 @@ class HdfsScanner {
   /// The most recently used decompression type.
   THdfsCompression::type decompression_type_ = THdfsCompression::NONE;
 
-  /// Pool to allocate per data block memory.  This should be used with the
-  /// decompressor and any other per data block allocations.
+  /// Pool to allocate per data block memory. This should be used with the 
decompressor
+  /// and any other per data block allocations. In case of decoding it 
contains resulting
+  /// decoded data.
   boost::scoped_ptr<MemPool> data_buffer_pool_;
 
   /// Offsets of string slots in the result tuple that may need to be copied 
as part of
diff --git a/be/src/exec/hdfs-text-table-writer.cc 
b/be/src/exec/hdfs-text-table-writer.cc
index 9ac4a7781..4450dcaf8 100644
--- a/be/src/exec/hdfs-text-table-writer.cc
+++ b/be/src/exec/hdfs-text-table-writer.cc
@@ -28,6 +28,7 @@
 #include "util/coding-util.h"
 #include "util/hdfs-util.h"
 #include "util/runtime-profile-counters.h"
+#include "util/char-codec.h"
 
 #include <hdfs.h>
 #include <stdlib.h>
@@ -52,8 +53,16 @@ HdfsTextTableWriter::HdfsTextTableWriter(TableSinkBase* 
parent,
   rowbatch_stringstream_.precision(RawValue::ASCII_PRECISION);
 }
 
+HdfsTextTableWriter::~HdfsTextTableWriter() {}
+
 Status HdfsTextTableWriter::Init() {
   parent_->mem_tracker()->Consume(flush_size_);
+
+  const auto& encoding = output_->partition_descriptor->encoding_value();
+  if (!encoding.empty() && encoding != "UTF-8") {
+    encoder_.reset(new CharCodec(nullptr, encoding));
+  }
+
   return Status::OK();
 }
 
@@ -147,7 +156,13 @@ Status HdfsTextTableWriter::InitNewFile() {
 }
 
 Status HdfsTextTableWriter::Flush() {
-  string rowbatch_string = rowbatch_stringstream_.str();
+  string rowbatch_string;
+  if (encoder_) {
+    RETURN_IF_ERROR(encoder_->EncodeBuffer(
+        rowbatch_stringstream_.str(), &rowbatch_string));
+  } else {
+    rowbatch_string = rowbatch_stringstream_.str();
+  }
   rowbatch_stringstream_.str(string());
   const uint8_t* data =
       reinterpret_cast<const uint8_t*>(rowbatch_string.data());
diff --git a/be/src/exec/hdfs-text-table-writer.h 
b/be/src/exec/hdfs-text-table-writer.h
index 29be605c9..fe43ea5be 100644
--- a/be/src/exec/hdfs-text-table-writer.h
+++ b/be/src/exec/hdfs-text-table-writer.h
@@ -37,6 +37,7 @@ class RuntimeState;
 class StringValue;
 class TupleDescriptor;
 class TupleRow;
+class CharCodec;
 
 /// The writer consumes all rows passed to it and writes the evaluated 
output_exprs_
 /// as delimited text into Hdfs files.
@@ -47,7 +48,7 @@ class HdfsTextTableWriter : public HdfsTableWriter {
       const HdfsPartitionDescriptor* partition,
       const HdfsTableDescriptor* table_desc);
 
-  ~HdfsTextTableWriter() { }
+  ~HdfsTextTableWriter();
 
   virtual Status Init();
   virtual Status Finalize();
@@ -87,6 +88,9 @@ class HdfsTextTableWriter : public HdfsTableWriter {
   /// Stringstream to buffer output.  The stream is cleared between HDFS
   /// Write calls to allow for the internal buffers to be reused.
   std::stringstream rowbatch_stringstream_;
+
+  /// For non-utf8 text files
+  std::unique_ptr<CharCodec> encoder_;
 };
 
 }
diff --git a/be/src/exec/scanner-context.h b/be/src/exec/scanner-context.h
index d69508258..d9a5dddb8 100644
--- a/be/src/exec/scanner-context.h
+++ b/be/src/exec/scanner-context.h
@@ -45,7 +45,7 @@ class TupleRow;
 /// Columnar formats have multiple streams per context object.
 /// This class handles stitching data split across IO buffers and providing
 /// some basic parsing utilities.
-/// This class it *not* thread safe. It is designed to have a single scanner 
thread
+/// This class is *not* thread safe. It is designed to have a single scanner 
thread
 /// reading from it.
 //
 /// Each scanner context maps to a single hdfs split.  There are three threads 
that
@@ -192,7 +192,7 @@ class ScannerContext {
     bool SkipBytes(int64_t length, Status* status) WARN_UNUSED_RESULT;
 
     /// Read length bytes into the supplied buffer.  The returned buffer is 
owned
-    /// by this object The memory is owned by and should not be modified. The 
contents
+    /// by this object. The memory is owned by and should not be modified. The 
contents
     /// of the buffer are invalidated after subsequent calls to 
GetBytes()/ReadBytes().
     /// Returns true on success, otherwise returns false and sets 'status' to
     /// indicate the error.
diff --git a/be/src/exec/text/hdfs-text-scanner.cc 
b/be/src/exec/text/hdfs-text-scanner.cc
index 2add0c857..b4a95e7f8 100644
--- a/be/src/exec/text/hdfs-text-scanner.cc
+++ b/be/src/exec/text/hdfs-text-scanner.cc
@@ -55,6 +55,7 @@
 #include "util/error-util.h"
 #include "util/runtime-profile-counters.h"
 #include "util/stopwatch.h"
+#include "util/char-codec.h"
 
 #include "common/names.h"
 
@@ -249,6 +250,12 @@ Status HdfsTextScanner::InitNewRange() {
       scan_node_->hdfs_table()->null_column_value(), true,
       state_->strict_mode()));
 
+  const auto& encoding = hdfs_partition->encoding_value();
+  if (!encoding.empty() && encoding != "UTF-8") {
+    decoder_.reset(new CharCodec(data_buffer_pool_.get(), encoding,
+        hdfs_partition->line_delim(), 
scan_node_->tuple_desc()->string_slots().empty()));
+  }
+
   RETURN_IF_ERROR(ResetScanner());
   scan_state_ = SCAN_RANGE_INITIALIZED;
   return Status::OK();
@@ -535,6 +542,12 @@ Status HdfsTextScanner::FillByteBuffer(MemPool* pool, 
bool* eosr, int num_bytes)
     *eosr = byte_buffer_read_size_ == 0 ? true : stream_->eosr();
   }
 
+  if (decoder_.get() != nullptr) {
+    SCOPED_TIMER(decode_timer_);
+    
RETURN_IF_ERROR(decoder_->DecodeBuffer(reinterpret_cast<uint8_t**>(&byte_buffer_ptr_),
+        &byte_buffer_read_size_, pool, *eosr, decompressor_.get() != nullptr, 
context_));
+  }
+
   byte_buffer_end_ = byte_buffer_ptr_ + byte_buffer_read_size_;
   return Status::OK();
 }
@@ -658,6 +671,7 @@ Status HdfsTextScanner::Open(ScannerContext* context) {
   RETURN_IF_ERROR(HdfsScanner::Open(context));
 
   parse_delimiter_timer_ = ADD_TIMER(scan_node_->runtime_profile(), 
"DelimiterParseTime");
+  decode_timer_ = ADD_TIMER(scan_node_->runtime_profile(), "DecodingTime");
 
   // Allocate the scratch space for two pass parsing.  The most fields we can 
go
   // through in one parse pass is the batch size (tuples) * the number of 
fields per tuple
@@ -731,7 +745,8 @@ int HdfsTextScanner::WriteFields(int num_fields, int 
num_tuples, MemPool* pool,
   if (num_tuples > 0) {
     // Need to copy out strings if they may reference the original I/O buffer.
     const bool copy_strings = !string_slot_offsets_.empty() &&
-        stream_->file_desc()->file_compression == THdfsCompression::NONE;
+        stream_->file_desc()->file_compression == THdfsCompression::NONE &&
+        decoder_.get() == nullptr;
     int max_added_tuples = (scan_node_->limit() == -1) ?
         num_tuples :
         scan_node_->limit() - scan_node_->rows_returned_shared();
diff --git a/be/src/exec/text/hdfs-text-scanner.h 
b/be/src/exec/text/hdfs-text-scanner.h
index a6309c8c3..c4c08e7cb 100644
--- a/be/src/exec/text/hdfs-text-scanner.h
+++ b/be/src/exec/text/hdfs-text-scanner.h
@@ -29,6 +29,7 @@ template<bool>
 class DelimitedTextParser;
 class ScannerContext;
 struct HdfsFileDesc;
+class CharCodec;
 
 /// HdfsScanner implementation that understands text-formatted records.
 /// Uses SSE instructions, if available, for performance.
@@ -175,7 +176,8 @@ class HdfsTextScanner : public HdfsScanner {
   /// decompression functions 
DecompressFileToBuffer()/DecompressStreamToBuffer().
   /// If applicable, attaches decompression buffers from previous calls that 
might still
   /// be referenced by returned batches to 'pool'. If 'pool' is nullptr the 
buffers are
-  /// freed instead.
+  /// freed instead. In case of decoding calls decoder_->DecodeBuffer() which 
overwrites
+  /// the byte_buffer_ptr_ with decoded data on data_buffer_pool_.
   ///
   /// Subclasses can override this function to implement different behaviour.
   /// TODO: IMPALA-6146: rethink this interface - having subclasses modify 
member
@@ -223,7 +225,7 @@ class HdfsTextScanner : public HdfsScanner {
   /// Mem pool for boundary_row_, boundary_column_, partial_tuple_ and any 
variable length
   /// data that is pointed at by the partial tuple.  Does not hold any tuple 
data
   /// of returned batches, because the data is always deep-copied into the 
output batch.
-  boost::scoped_ptr<MemPool> boundary_pool_;
+  std::unique_ptr<MemPool> boundary_pool_;
 
   /// Helper string for dealing with input rows that span file blocks.  We 
keep track of
   /// a whole line that spans file blocks to be able to report the line as 
erroneous in
@@ -239,7 +241,7 @@ class HdfsTextScanner : public HdfsScanner {
   int slot_idx_;
 
   /// Helper class for picking fields and rows from delimited text.
-  boost::scoped_ptr<DelimitedTextParser<true>> delimited_text_parser_;
+  std::unique_ptr<DelimitedTextParser<true>> delimited_text_parser_;
 
   /// Return field locations from the Delimited Text Parser.
   std::vector<FieldLocation> field_locations_;
@@ -265,6 +267,13 @@ class HdfsTextScanner : public HdfsScanner {
 
   /// Time parsing text files
   RuntimeProfile::Counter* parse_delimiter_timer_;
+
+  /// For non-utf8 text files
+  std::unique_ptr<CharCodec> decoder_;
+
+  /// Time spent decoding bytes
+  RuntimeProfile::Counter* decode_timer_;
+
 };
 
 }
diff --git a/be/src/runtime/descriptors.cc b/be/src/runtime/descriptors.cc
index eda1bfacc..d4bc2f62d 100644
--- a/be/src/runtime/descriptors.cc
+++ b/be/src/runtime/descriptors.cc
@@ -220,6 +220,7 @@ HdfsPartitionDescriptor::HdfsPartitionDescriptor(
   block_size_ = sd.blockSize;
   file_format_ = sd.fileFormat;
   json_binary_format_ = sd.jsonBinaryFormat;
+  encoding_value_ = sd.__isset.encodingValue ? sd.encodingValue : "";
   DecompressLocation(thrift_table, thrift_partition, &location_);
 }
 
diff --git a/be/src/runtime/descriptors.h b/be/src/runtime/descriptors.h
index 2e432d8bc..8b0457286 100644
--- a/be/src/runtime/descriptors.h
+++ b/be/src/runtime/descriptors.h
@@ -399,6 +399,7 @@ class HdfsPartitionDescriptor {
   char escape_char() const { return escape_char_; }
   THdfsFileFormat::type file_format() const { return file_format_; }
   int block_size() const { return block_size_; }
+  const std::string& encoding_value() const { return encoding_value_; }
   const std::string& location() const { return location_; }
   int64_t id() const { return id_; }
   TJsonBinaryFormat::type json_binary_format() const { return 
json_binary_format_; }
@@ -419,6 +420,7 @@ class HdfsPartitionDescriptor {
   char collection_delim_;
   char escape_char_;
   int block_size_;
+  std::string encoding_value_;
   // TODO: use the same representation as the Catalog does, in which common 
prefixes are
   // stripped.
   std::string location_;
diff --git a/be/src/runtime/mem-tracker.cc b/be/src/runtime/mem-tracker.cc
index fe9c7e76f..8818861cb 100644
--- a/be/src/runtime/mem-tracker.cc
+++ b/be/src/runtime/mem-tracker.cc
@@ -571,4 +571,23 @@ bool MemTracker::GcMemory(int64_t max_consumption) {
   }
   return curr_consumption > max_consumption;
 }
+
+Status ScopedMemTracker::TryConsume(int64_t size) {
+  DCHECK(tracker_ != nullptr);
+  if (tracker_->TryConsume(size)) {
+    size_ += size;
+  } else {
+    Status status = tracker_->MemLimitExceeded(
+        nullptr, "Failed to allocate memory required by DecodeBuffer", size);
+    tracker_ = nullptr;
+    return status;
+  }
+  return Status::OK();
+}
+
+ScopedMemTracker::~ScopedMemTracker() {
+  if (tracker_ != nullptr && size_ > 0) {
+    tracker_->Release(size_);
+  }
+}
 }
diff --git a/be/src/runtime/mem-tracker.h b/be/src/runtime/mem-tracker.h
index 1d20428cd..d8915b2d6 100644
--- a/be/src/runtime/mem-tracker.h
+++ b/be/src/runtime/mem-tracker.h
@@ -619,4 +619,17 @@ class MemTrackerAllocator : public Alloc {
 typedef MemTrackerAllocator<char> CharMemTrackerAllocator;
 typedef std::basic_string<char, std::char_traits<char>, 
CharMemTrackerAllocator>
     TrackedString;
+
+// Auxiliary class to track memory consumption for a scope of code.
+class ScopedMemTracker {
+ public:
+  ScopedMemTracker(MemTracker* tracker) : tracker_(tracker), size_(0) {}
+  ~ScopedMemTracker();
+
+  Status TryConsume(int64_t size);
+
+ private:
+  MemTracker* tracker_ = nullptr;
+  int64_t size_;
+};
 }
diff --git a/be/src/runtime/row-batch.h b/be/src/runtime/row-batch.h
index 392d73f0e..3dd865924 100644
--- a/be/src/runtime/row-batch.h
+++ b/be/src/runtime/row-batch.h
@@ -510,7 +510,7 @@ class RowBatch {
   /// The memory ownership depends on whether legacy joins and aggs are 
enabled.
   ///
   /// Memory is malloc'd and owned by RowBatch and is freed upon its 
destruction. This is
-  /// more performant that allocating the pointers from 'tuple_data_pool_' 
especially
+  /// more performant than allocating the pointers from 'tuple_data_pool_' 
especially
   /// with SubplanNodes in the ExecNode tree because the tuple pointers are not
   /// transferred and do not have to be re-created in every Reset().
   const int tuple_ptrs_size_;
diff --git a/be/src/util/CMakeLists.txt b/be/src/util/CMakeLists.txt
index 9e4314d87..4468f3ba7 100644
--- a/be/src/util/CMakeLists.txt
+++ b/be/src/util/CMakeLists.txt
@@ -44,6 +44,7 @@ set(UTIL_SRCS
   bit-packing.cc
   bit-util.cc
   bloom-filter.cc
+  char-codec.cc
   cgroup-util.cc
   coding-util.cc
   codec.cc
diff --git a/be/src/util/char-codec.cc b/be/src/util/char-codec.cc
new file mode 100644
index 000000000..0d7d0341a
--- /dev/null
+++ b/be/src/util/char-codec.cc
@@ -0,0 +1,194 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements.  See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership.  The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License.  You may obtain a copy of the License at
+//
+//   http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied.  See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+#include "util/char-codec.h"
+
+#include <boost/locale.hpp>
+#include <string>
+
+#include "exec/scanner-context.h"
+#include "gutil/strings/substitute.h"
+#include "runtime/mem-pool.h"
+#include "runtime/mem-tracker.h"
+
+using namespace impala;
+using namespace strings;
+
+CharCodec::CharCodec(MemPool* memory_pool, const std::string& encoding, char 
tuple_delim,
+    bool reuse_buffer)
+    : memory_pool_(memory_pool), encoding_(encoding), 
tuple_delim_(tuple_delim),
+      reuse_buffer_(reuse_buffer) {
+}
+
+const int CharCodec::MAX_SYMBOL = 4;
+
+Status CharCodec::DecodeBuffer(uint8_t** buffer, int64_t* bytes_read, MemPool* 
pool,
+    bool eosr, bool decompress, ScannerContext* context) {
+  std::string result_prefix;
+  std::string result_core;
+  std::string result_suffix;
+
+  // We're about to create a new decoding buffer (if we can't reuse). Attach 
the
+  // memory from previous decoding rounds to 'pool'. In case of streaming 
decompression
+  // this is already done in DecompressStreamToBuffer().
+  if (!decompress && !reuse_buffer_) {
+    if (pool != nullptr) {
+      pool->AcquireData(memory_pool_, false);
+    } else {
+      memory_pool_->FreeAll();
+    }
+    out_buffer_ = nullptr;
+  }
+
+  uint8_t* buf_start = *buffer;
+  uint8_t* buf_end = buf_start + *bytes_read;
+
+  // Allocate memory twice the size of the input buffer to handle the worst 
case
+  ScopedMemTracker scoped_mem_tracker(memory_pool_->mem_tracker());
+  RETURN_IF_ERROR(scoped_mem_tracker.TryConsume((*bytes_read) * 2));
+
+  RETURN_IF_ERROR(HandlePrefix(&buf_start, buf_end, &result_prefix));
+  RETURN_IF_ERROR(HandleCore(&buf_start, buf_end, &result_core));
+  RETURN_IF_ERROR(HandleSuffix(&buf_start, buf_end, &result_suffix));
+
+  if (eosr && !partial_symbol_.empty()) {
+    return Status(TErrorCode::CHARSET_CONVERSION_ERROR,
+        "End of stream reached with partial symbol.");
+  }
+
+  // In case of decompression, decompressed data can be freed up after decoding
+  if (decompress) {
+    memory_pool_->FreeAll();
+  } else if (eosr) {
+    context->ReleaseCompletedResources(false);
+  }
+
+  // Concat the results onto the output buffer
+  *bytes_read = result_prefix.size() + result_core.size() + 
result_suffix.size();
+  if (out_buffer_ == nullptr || buffer_length_ < *bytes_read) {
+    buffer_length_ =  *bytes_read;
+    out_buffer_ = memory_pool_->TryAllocate(buffer_length_);
+    if (UNLIKELY(out_buffer_ == nullptr)) {
+      string details = Substitute(
+          "HdfsTextScanner::DecodeBuffer() failed to allocate $1 bytes.", 
*bytes_read);
+      return memory_pool_->mem_tracker()->MemLimitExceeded(nullptr, details, 
*bytes_read);
+    }
+  }
+  *buffer = out_buffer_;
+  memcpy(*buffer, result_prefix.data(), result_prefix.size());
+  memcpy(*buffer + result_prefix.size(), result_core.data(), 
result_core.size());
+  memcpy(*buffer + result_prefix.size() + result_core.size(),
+      result_suffix.data(), result_suffix.size());
+
+  return Status::OK();
+}
+
+Status CharCodec::HandlePrefix(uint8_t** buf_start, uint8_t* buf_end,
+    std::string* result_prefix) {
+  if (!partial_symbol_.empty()) {
+    std::vector<uint8_t> prefix;
+    prefix.reserve(MAX_SYMBOL);
+    prefix.assign(partial_symbol_.begin(), partial_symbol_.end());
+    bool success = false;
+    DCHECK_LT(partial_symbol_.size(), MAX_SYMBOL);
+    for (int i = 0; partial_symbol_.size() + i < MAX_SYMBOL && *buf_start + i 
< buf_end;
+          ++i) {
+      prefix.push_back((*buf_start)[i]);
+      try {
+        *result_prefix =
+            
boost::locale::conv::to_utf<char>(reinterpret_cast<char*>(prefix.data()),
+                reinterpret_cast<char*>(prefix.data()) + prefix.size(), 
encoding_,
+                boost::locale::conv::stop);
+        success = true;
+        *buf_start += i + 1;
+        break;
+      } catch (boost::locale::conv::conversion_error&) {
+        continue;
+      } catch (const std::exception& e) {
+        return Status(TErrorCode::CHARSET_CONVERSION_ERROR, e.what());
+      }
+    }
+    if (!success) {
+      return Status(TErrorCode::CHARSET_CONVERSION_ERROR, "Unable to decode 
buffer");
+    }
+    partial_symbol_.clear();
+  }
+  return Status::OK();
+}
+
+Status CharCodec::HandleCore(uint8_t** buf_start, uint8_t* buf_end,
+    std::string* result_core) {
+  uint8_t* last_delim =
+      std::find_end(*buf_start, buf_end, &tuple_delim_, &tuple_delim_ + 1);
+  if (last_delim != buf_end) {
+    try {
+      *result_core = boost::locale::conv::to_utf<char>(
+          reinterpret_cast<char*>(*buf_start), 
reinterpret_cast<char*>(last_delim) + 1,
+          encoding_, boost::locale::conv::stop);
+    } catch (boost::locale::conv::conversion_error& e) {
+      return Status(TErrorCode::CHARSET_CONVERSION_ERROR, e.what());
+    } catch (const std::exception& e) {
+      return Status(TErrorCode::CHARSET_CONVERSION_ERROR, e.what());
+    }
+
+    *buf_start = last_delim + 1;
+  }
+  return Status::OK();
+}
+
+Status CharCodec::HandleSuffix(uint8_t** buf_start, uint8_t* buf_end,
+    std::string* result_suffix) {
+  if (*buf_start < buf_end) {
+    bool success = false;
+    uint8_t* end = buf_end;
+    while (buf_end - end < MAX_SYMBOL && end > *buf_start) {
+      try {
+        *result_suffix =
+            
boost::locale::conv::to_utf<char>(reinterpret_cast<char*>(*buf_start),
+                reinterpret_cast<char*>(end), encoding_, 
boost::locale::conv::stop);
+        success = true;
+        break;
+      } catch (boost::locale::conv::conversion_error&) {
+        --end;
+      } catch (const std::exception& e) {
+        return Status(TErrorCode::CHARSET_CONVERSION_ERROR, e.what());
+      }
+    }
+
+    if (!success && end > *buf_start) {
+      return Status(TErrorCode::CHARSET_CONVERSION_ERROR, "Unable to decode 
buffer");
+    }
+    if (end < buf_end) {
+      partial_symbol_.assign(end, buf_end);
+    }
+  }
+
+  return Status::OK();
+}
+
+Status CharCodec::EncodeBuffer(const std::string& str, std::string* result) {
+  try {
+    *result = boost::locale::conv::from_utf<char>(
+        str, encoding_, boost::locale::conv::stop);
+  } catch (boost::locale::conv::conversion_error& e) {
+    return Status(TErrorCode::CHARSET_CONVERSION_ERROR, e.what());
+  } catch (const std::exception& e) {
+    return Status(TErrorCode::CHARSET_CONVERSION_ERROR, e.what());
+  }
+
+  return Status::OK();
+}
diff --git a/be/src/util/char-codec.h b/be/src/util/char-codec.h
new file mode 100644
index 000000000..1f1ec2c8b
--- /dev/null
+++ b/be/src/util/char-codec.h
@@ -0,0 +1,84 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements.  See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership.  The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License.  You may obtain a copy of the License at
+//
+//   http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied.  See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+#pragma once
+
+#include <string>
+
+#include "common/status.h"
+
+namespace impala {
+
+class MemPool;
+class MemTracker;
+class ScannerContext;
+
+/// Class for encoding and decoding character buffers between different 
encodings and
+/// UTF-8. Empolys the Boost.Locale library for encoding and decoding.
+class CharCodec {
+ public:
+  static const int MAX_SYMBOL;
+
+  CharCodec(MemPool* memory_pool, const std::string& encoding, char 
tuple_delim = '\n',
+    bool reuse_buffer = false);
+
+  /// Decodes 'buffer' from 'encoding_' to UTF-8, handling partial symbols and 
delimiters.
+  ///
+  /// The function processes the buffer in three parts:
+  /// 1. Prefix: attempts to complete partial_symbol_, stored from previous 
DecodeBuffer
+  /// call, by adding first bytes from buffer one by one.
+  /// 2. Core: Converts the main part of the buffer up to the last delimiter 
found.
+  /// 3. Suffix: in case buffer is split in the middle of a symbol, 
progressively
+  /// determines the incomplete part and stores it into partial_symbol_.
+  Status DecodeBuffer(uint8_t** buffer, int64_t* bytes_read, MemPool* pool, 
bool eosr,
+      bool decompress, ScannerContext* context);
+
+  /// Encodes 'str' from UTF-8 into a given 'encoding_'. Since
+  /// HdfsTextTableWriter::Flush(), currently being the only client of this 
function,
+  /// always flushes the buffer at the end of the row, we don't need to handle 
partial
+  /// symbols here.
+  Status EncodeBuffer(const std::string& str, std::string* result);
+
+ private:
+  Status HandlePrefix(uint8_t** buf_start, uint8_t* buf_end, std::string* 
result_prefix);
+  Status HandleCore(uint8_t** buf_start, uint8_t* buf_end, std::string* 
result_core);
+  Status HandleSuffix(uint8_t** buf_start, uint8_t* buf_end, std::string* 
result_suffix);
+
+  /// Pool to allocate the buffer to hold transformed data.
+  MemPool* memory_pool_ = nullptr;
+
+  /// Name of the encoding of the input / output data.
+  std::string encoding_;
+
+  /// The following members are only used by DecodeBuffer:
+  /// Delimiter used to separate tuples.
+  const char tuple_delim_;
+
+  /// Buffer to hold the partial symbol that could not be decoded in the 
previous call to
+  /// DecodeBuffer.
+  std::vector<uint8_t> partial_symbol_;
+
+  /// Can we reuse the output buffer or do we need to allocate on each call?
+  bool reuse_buffer_;
+
+  /// Buffer to hold transformed data.
+  uint8_t* out_buffer_ = nullptr;
+
+  /// Length of the output buffer.
+  int64_t buffer_length_ = 0;
+};
+}
diff --git a/bin/rat_exclude_files.txt b/bin/rat_exclude_files.txt
index c72de9099..dc4ff98a0 100644
--- a/bin/rat_exclude_files.txt
+++ b/bin/rat_exclude_files.txt
@@ -137,6 +137,7 @@ testdata/AllTypesErrorNoNulls/*.txt
 *.avsc
 *.parq
 *.parquet
+testdata/charcodec/*
 testdata/cluster/hive/*.diff
 testdata/cluster/node_templates/cdh5/etc/hadoop/conf/*.xml.tmpl
 testdata/cluster/node_templates/common/etc/kudu/*.conf.tmpl
diff --git a/common/thrift/CatalogObjects.thrift 
b/common/thrift/CatalogObjects.thrift
index 03ebdfdfa..204b6527a 100644
--- a/common/thrift/CatalogObjects.thrift
+++ b/common/thrift/CatalogObjects.thrift
@@ -365,6 +365,7 @@ struct THdfsStorageDescriptor {
   7: required THdfsFileFormat fileFormat
   8: required i32 blockSize
   9: optional TJsonBinaryFormat jsonBinaryFormat
+  10: optional string encodingValue
 }
 
 // Represents an HDFS partition
diff --git a/common/thrift/generate_error_codes.py 
b/common/thrift/generate_error_codes.py
index 3390a9dfa..8a6c20f31 100755
--- a/common/thrift/generate_error_codes.py
+++ b/common/thrift/generate_error_codes.py
@@ -500,7 +500,9 @@ error_codes = (
    "cache entry ($0 bytes)"),
 
   ("TUPLE_CACHE_OUTSTANDING_WRITE_LIMIT_EXCEEDED", 163, "Outstanding tuple 
cache writes "
-   "exceeded the limit ($0 bytes)")
+   "exceeded the limit ($0 bytes)"),
+
+  ("CHARSET_CONVERSION_ERROR", 164, "Error during buffer conversion: $0")
 )
 
 import sys
diff --git 
a/fe/src/main/java/org/apache/impala/analysis/AlterTableSetTblProperties.java 
b/fe/src/main/java/org/apache/impala/analysis/AlterTableSetTblProperties.java
index 9191c41f4..1706ae5ae 100644
--- 
a/fe/src/main/java/org/apache/impala/analysis/AlterTableSetTblProperties.java
+++ 
b/fe/src/main/java/org/apache/impala/analysis/AlterTableSetTblProperties.java
@@ -23,6 +23,7 @@ import java.util.Map;
 
 import org.apache.avro.SchemaParseException;
 import org.apache.hadoop.hive.metastore.api.hive_metastoreConstants;
+import org.apache.hadoop.hive.serde.serdeConstants;
 import org.apache.hadoop.hive.serde2.avro.AvroSerdeUtils;
 import org.apache.iceberg.DataFile;
 import org.apache.iceberg.mr.Catalogs;
@@ -30,14 +31,18 @@ import org.apache.iceberg.mr.InputFormatConfig;
 import org.apache.impala.authorization.AuthorizationConfig;
 import org.apache.impala.catalog.DataSourceTable;
 import org.apache.impala.catalog.FeDataSourceTable;
+import org.apache.impala.catalog.FeFsPartition;
 import org.apache.impala.catalog.FeFsTable;
 import org.apache.impala.catalog.FeHBaseTable;
 import org.apache.impala.catalog.FeIcebergTable;
 import org.apache.impala.catalog.FeKuduTable;
 import org.apache.impala.catalog.FeTable;
+import org.apache.impala.catalog.HdfsFileFormat;
+import org.apache.impala.catalog.HdfsStorageDescriptor;
 import org.apache.impala.catalog.IcebergTable;
 import org.apache.impala.catalog.KuduTable;
 import org.apache.impala.catalog.TableLoadingException;
+import org.apache.impala.catalog.local.LocalCatalogException;
 import org.apache.impala.common.AnalysisException;
 import org.apache.impala.common.Pair;
 import org.apache.impala.thrift.TCompressionCodec;
@@ -50,12 +55,15 @@ import org.apache.impala.util.AvroSchemaParser;
 import org.apache.impala.util.AvroSchemaUtils;
 import org.apache.impala.util.IcebergUtil;
 import org.apache.impala.util.MetaStoreUtil;
+import org.apache.hadoop.hive.metastore.api.StorageDescriptor;
 
 import com.google.common.base.Preconditions;
 import com.google.common.base.Splitter;
 import com.google.common.base.Strings;
 import com.google.common.collect.Lists;
 import com.google.common.primitives.Longs;
+import java.nio.charset.Charset;
+import java.nio.charset.StandardCharsets;
 
 /**
 * Represents an ALTER TABLE SET [PARTITION ('k1'='a', 'k2'='b'...)]
@@ -134,6 +142,9 @@ public class AlterTableSetTblProperties extends 
AlterTableSetStmt {
 
     // Analyze 'sort.columns' property.
     analyzeSortColumns(getTargetTable(), tblProperties_);
+
+    // Analyze 'serialization.encoding' property
+    analyzeSerializationEncoding(tblProperties_);
   }
 
   private void analyzeKuduTable(Analyzer analyzer) throws AnalysisException {
@@ -267,6 +278,90 @@ public class AlterTableSetTblProperties extends 
AlterTableSetStmt {
     }
   }
 
+  /**
+   * Analyzes the 'serialization.encoding' property in 'tblProperties' to 
check if its
+   * line delimiter is compatible with ASCII, since multi-byte line delimiters 
are not
+   * supported.
+   */
+  public void analyzeSerializationEncoding(Map<String, String> tblProperties)
+        throws AnalysisException {
+    if (!tblProperties.containsKey(serdeConstants.SERIALIZATION_ENCODING)
+        || !getOperation().equals("SET SERDEPROPERTIES")) {
+      return;
+    }
+    FeTable tbl = getTargetTable();
+    if (!(tbl instanceof FeFsTable)) {
+      throw new AnalysisException(
+          String.format("Property 'serialization.encoding' is only supported "
+                  + "on HDFS tables. Conflicting table: %s", 
tbl.getFullName()));
+    }
+
+    if (partitionSet_ != null) {
+      for (FeFsPartition partition: partitionSet_.getPartitions()) {
+        if (partition.getFileFormat() != HdfsFileFormat.TEXT) {
+          throw new AnalysisException(String.format("Property 
'serialization.encoding' "
+                  + "is only supported on TEXT file format.  "
+                  + "Conflicting partition/format: %s %s",
+              partition.getPartitionName(), partition.getFileFormat().name()));
+        }
+      }
+    } else {
+      StorageDescriptor sd = tbl.getMetaStoreTable().getSd();
+      HdfsFileFormat format = HdfsFileFormat.fromHdfsInputFormatClass(
+              sd.getInputFormat(), sd.getSerdeInfo().getSerializationLib());
+      if (format != HdfsFileFormat.TEXT) {
+        throw new AnalysisException(String.format("Property 
'serialization.encoding' "
+                + "is only supported on TEXT file format. Conflicting "
+                + "table/format: %s %s",
+            tbl.getFullName(), format.name()));
+      }
+    }
+
+    String encoding = tblProperties.get(serdeConstants.SERIALIZATION_ENCODING);
+    if (!Charset.isSupported(encoding)) {
+      throw new AnalysisException(String.format("Unsupported encoding: %s.", 
encoding));
+    }
+
+    Charset charset = Charset.forName(encoding);
+    if (partitionSet_ != null) {
+      for (FeFsPartition partition : partitionSet_.getPartitions()) {
+        if (!isLineDelimiterSameAsAscii(
+            partition.getInputFormatDescriptor().getLineDelim(), charset)) {
+          throw new AnalysisException(String.format(
+              "Property 'serialization.encoding' only supports " +
+              "encodings in which line delimiter is compatible with ASCII. " +
+              "Conflicting partition: %s. " +
+              "Please refer to IMPALA-10319 for more info.",
+              partition.getPartitionName()));
+        }
+      }
+    } else {
+      StorageDescriptor sd = tbl.getMetaStoreTable().getSd();
+      HdfsStorageDescriptor hdfsSD;
+      try {
+        hdfsSD = HdfsStorageDescriptor.fromStorageDescriptor(tbl.getName(), 
sd);
+      } catch (HdfsStorageDescriptor.InvalidStorageDescriptorException e) {
+        throw new LocalCatalogException(String.format(
+            "Invalid input format descriptor for table %s", 
table_.getFullName()), e);
+      }
+      if (!isLineDelimiterSameAsAscii(hdfsSD.getLineDelim(), charset)) {
+        throw new AnalysisException(String.format(
+            "Property 'serialization.encoding' only supports " +
+            "encodings in which line delimiter is compatible with ASCII. " +
+            "Conflicting table: %1$s. " +
+            "Please refer to IMPALA-10319 for more info.",
+            table_.getFullName()));
+      }
+    }
+  }
+
+  public static boolean isLineDelimiterSameAsAscii(byte lineDelim, Charset 
charset) {
+    String lineDelimStr = new String(new byte[]{lineDelim}, charset);
+    byte[] newlineBytesInEncoding = lineDelimStr.getBytes(charset);
+    byte[] newlineBytesInAscii = 
lineDelimStr.getBytes(StandardCharsets.US_ASCII);
+    return java.util.Arrays.equals(newlineBytesInEncoding, 
newlineBytesInAscii);
+  }
+
   /**
    * Analyze the 'skip.header.line.count' property to make sure it is set to a 
valid
    * value. It is looked up in 'tblProperties', which must not be null.
diff --git a/fe/src/main/java/org/apache/impala/analysis/CreateTableStmt.java 
b/fe/src/main/java/org/apache/impala/analysis/CreateTableStmt.java
index fbf569120..8fef3f024 100644
--- a/fe/src/main/java/org/apache/impala/analysis/CreateTableStmt.java
+++ b/fe/src/main/java/org/apache/impala/analysis/CreateTableStmt.java
@@ -17,18 +17,21 @@
 
 package org.apache.impala.analysis;
 
+import java.nio.charset.Charset;
 import java.util.ArrayList;
 import java.util.List;
 import java.util.Map;
 
 import org.apache.avro.Schema;
 import org.apache.avro.SchemaParseException;
+import org.apache.hadoop.hive.serde.serdeConstants;
 import org.apache.hadoop.hive.metastore.api.SQLForeignKey;
 import org.apache.hadoop.hive.metastore.api.SQLPrimaryKey;
 import org.apache.iceberg.TableProperties;
 import org.apache.iceberg.mr.Catalogs;
 import org.apache.impala.authorization.AuthorizationConfig;
 import org.apache.impala.catalog.DataSourceTable;
+import org.apache.impala.catalog.HdfsStorageDescriptor;
 import org.apache.impala.catalog.KuduTable;
 import org.apache.impala.catalog.IcebergTable;
 import org.apache.impala.catalog.RowFormat;
@@ -321,6 +324,8 @@ public class CreateTableStmt extends StatementBase 
implements SingleTableStmt {
     if (BackendConfig.INSTANCE.getComputeLineage() || 
RuntimeEnv.INSTANCE.isTestEnv()) {
        computeLineageGraph(analyzer);
     }
+
+    analyzeSerializationEncoding();
   }
 
   /**
@@ -931,4 +936,43 @@ public class CreateTableStmt extends StatementBase 
implements SingleTableStmt {
     return isExternal() && !Boolean.parseBoolean(getTblProperties().get(
       Table.TBL_PROP_EXTERNAL_TABLE_PURGE));
   }
+
+  /**
+   * Analyzes the 'serialization.encoding' property in 'SerdeProperties' to 
check if its
+   * line delimiter is compatible with ASCII, since multi-byte line delimiters 
are not
+   * supported.
+   */
+  public void analyzeSerializationEncoding()
+        throws AnalysisException {
+    if 
(!getSerdeProperties().containsKey(serdeConstants.SERIALIZATION_ENCODING)) {
+      return;
+    }
+
+    if (getFileFormat() != THdfsFileFormat.TEXT) {
+      throw new AnalysisException(String.format("Property 
'serialization.encoding' is "
+              + "only supported on TEXT file format. Conflicting "
+              + "table/format: %1$s / %2$s",
+          tableDef_.getTblName().toString(), getFileFormat().name()));
+    }
+
+    String encoding = 
getSerdeProperties().get(serdeConstants.SERIALIZATION_ENCODING);
+    if (!Charset.isSupported(encoding)) {
+      throw new AnalysisException("Unsupported encoding: " + encoding + ".");
+    }
+
+    byte lineDelimiter = getRowFormat() == null ?
+        (byte) HdfsStorageDescriptor.DEFAULT_LINE_DELIM :
+        // assuming RowFormat analysis filtered out multi-byte line delimiters
+        (byte) getRowFormat().getLineDelimiter().charAt(0);
+
+    Charset charset = Charset.forName(encoding);
+    if (!AlterTableSetTblProperties.isLineDelimiterSameAsAscii(lineDelimiter, 
charset)) {
+      throw new AnalysisException(
+          String.format("Property 'serialization.encoding' only supports "
+                  + "encodings in which line delimiter is compatible with 
ASCII. "
+                  + "Conflicting table: %1$s. "
+                  + "Please refer to IMPALA-10319 for more info.",
+              tableDef_.getTblName().toString()));
+    }
+  }
 }
diff --git 
a/fe/src/main/java/org/apache/impala/catalog/HdfsStorageDescriptor.java 
b/fe/src/main/java/org/apache/impala/catalog/HdfsStorageDescriptor.java
index 6ce5cc871..94579847f 100644
--- a/fe/src/main/java/org/apache/impala/catalog/HdfsStorageDescriptor.java
+++ b/fe/src/main/java/org/apache/impala/catalog/HdfsStorageDescriptor.java
@@ -95,6 +95,7 @@ public class HdfsStorageDescriptor {
   private final byte quoteChar_;
   private final int blockSize_;
   private final TJsonBinaryFormat jsonBinaryFormat_;
+  private String encodingValue_ = null;
 
   /**
    * Returns a map from delimiter key to a single delimiter character,
@@ -171,7 +172,8 @@ public class HdfsStorageDescriptor {
 
   private HdfsStorageDescriptor(String tblName, HdfsFileFormat fileFormat, 
byte lineDelim,
       byte fieldDelim, byte collectionDelim, byte mapKeyDelim, byte escapeChar,
-      byte quoteChar, int blockSize, TJsonBinaryFormat jsonBinaryFormat) {
+      byte quoteChar, int blockSize, TJsonBinaryFormat jsonBinaryFormat,
+      String encodingValue) {
     this.fileFormat_ = fileFormat;
     this.lineDelim_ = lineDelim;
     this.fieldDelim_ = fieldDelim;
@@ -180,6 +182,7 @@ public class HdfsStorageDescriptor {
     this.quoteChar_ = quoteChar;
     this.blockSize_ = blockSize;
     this.jsonBinaryFormat_ = jsonBinaryFormat;
+    this.encodingValue_ = encodingValue;
 
     // You can set the escape character as a tuple or row delim.  Empirically,
     // this is ignored by hive.
@@ -247,6 +250,8 @@ public class HdfsStorageDescriptor {
       jsonBinaryFormat = null;
     }
 
+    String encodingValue = 
parameters.get(serdeConstants.SERIALIZATION_ENCODING);
+
     try {
       return INTERNER.intern(new HdfsStorageDescriptor(tblName,
           HdfsFileFormat.fromJavaClassName(
@@ -257,7 +262,7 @@ public class HdfsStorageDescriptor {
           delimMap.get(serdeConstants.MAPKEY_DELIM),
           delimMap.get(serdeConstants.ESCAPE_CHAR),
           delimMap.get(serdeConstants.QUOTE_CHAR),
-          blockSize, jsonBinaryFormat));
+          blockSize, jsonBinaryFormat, encodingValue));
     } catch (IllegalArgumentException ex) {
       // Thrown by fromJavaClassName
       throw new InvalidStorageDescriptorException(ex);
@@ -270,19 +275,24 @@ public class HdfsStorageDescriptor {
         HdfsFileFormat.fromThrift(tDesc.getFileFormat()), tDesc.lineDelim,
         tDesc.fieldDelim, tDesc.collectionDelim, tDesc.mapKeyDelim, 
tDesc.escapeChar,
         tDesc.quoteChar, tDesc.blockSize, tDesc.isSetJsonBinaryFormat() ?
-            tDesc.getJsonBinaryFormat() : null));
+            tDesc.getJsonBinaryFormat() : null, tDesc.encodingValue));
   }
 
   public THdfsStorageDescriptor toThrift() {
-    return new THdfsStorageDescriptor(lineDelim_, fieldDelim_, 
collectionDelim_,
-        mapKeyDelim_, escapeChar_, quoteChar_, fileFormat_.toThrift(), 
blockSize_)
-        .setJsonBinaryFormat(jsonBinaryFormat_);
+    THdfsStorageDescriptor tHdfsStorageDescriptor = new THdfsStorageDescriptor(
+        lineDelim_, fieldDelim_, collectionDelim_, mapKeyDelim_, escapeChar_, 
quoteChar_,
+        fileFormat_.toThrift(), blockSize_);
+    if (encodingValue_ != null) {
+      tHdfsStorageDescriptor.setEncodingValue(encodingValue_);
+    }
+    tHdfsStorageDescriptor.setJsonBinaryFormat(jsonBinaryFormat_);
+    return tHdfsStorageDescriptor;
   }
 
   public HdfsStorageDescriptor cloneWithChangedFileFormat(HdfsFileFormat 
newFormat) {
     return INTERNER.intern(new HdfsStorageDescriptor(
         "<unknown>", newFormat, lineDelim_, fieldDelim_, collectionDelim_, 
mapKeyDelim_,
-        escapeChar_, quoteChar_, blockSize_, jsonBinaryFormat_));
+        escapeChar_, quoteChar_, blockSize_, jsonBinaryFormat_, 
encodingValue_));
   }
 
   public byte getLineDelim() { return lineDelim_; }
@@ -293,11 +303,13 @@ public class HdfsStorageDescriptor {
   public HdfsFileFormat getFileFormat() { return fileFormat_; }
   public int getBlockSize() { return blockSize_; }
   public TJsonBinaryFormat getJsonBinaryFormat() { return jsonBinaryFormat_; }
+  public String getEncodingValue() { return encodingValue_; }
 
   @Override
   public int hashCode() {
     return Objects.hash(blockSize_, collectionDelim_, escapeChar_, fieldDelim_,
-        fileFormat_, lineDelim_, mapKeyDelim_, quoteChar_, jsonBinaryFormat_);
+        fileFormat_, lineDelim_, mapKeyDelim_, quoteChar_, jsonBinaryFormat_,
+        encodingValue_);
   }
 
   @Override
@@ -315,6 +327,7 @@ public class HdfsStorageDescriptor {
     if (mapKeyDelim_ != other.mapKeyDelim_) return false;
     if (quoteChar_ != other.quoteChar_) return false;
     if (jsonBinaryFormat_ != other.jsonBinaryFormat_) return false;
+    if (encodingValue_ != other.encodingValue_) return false;
     return true;
   }
 }
diff --git a/fe/src/test/java/org/apache/impala/analysis/AnalyzeDDLTest.java 
b/fe/src/test/java/org/apache/impala/analysis/AnalyzeDDLTest.java
index 8c83feb54..47a64bcc5 100644
--- a/fe/src/test/java/org/apache/impala/analysis/AnalyzeDDLTest.java
+++ b/fe/src/test/java/org/apache/impala/analysis/AnalyzeDDLTest.java
@@ -1063,6 +1063,64 @@ public class AnalyzeDDLTest extends FrontendTestBase {
     // Cannot ALTER TABLE SET on an HBase table.
     AnalysisError("alter table functional_hbase.alltypes set 
tblproperties('a'='b')",
         "ALTER TABLE SET not currently supported on HBase tables.");
+
+    // serialization.encoding
+    AnalyzesOk("alter table functional.alltypes set serdeproperties(" +
+        "'serialization.encoding'='GBK')");
+    AnalyzesOk("alter table functional.text_dollar_hash_pipe set 
serdeproperties(" +
+        "'serialization.encoding'='GBK')");
+    AnalyzesOk("alter table functional.alltypes partition(year=2010, month=12) 
" +
+        "set serdeproperties('serialization.encoding'='GBK')");
+    String tmpTableName =
+        QueryStringBuilder.createTmpTableName("functional", "tmp_table");
+    AnalyzesOk("create table " + tmpTableName + " (id int) with 
serdeproperties(" +
+        "'serialization.encoding'='GBK')");
+
+    String [] unsupportedFileFormatDbs =
+      {"functional_parquet", "functional_rc", "functional_avro"};
+    for (String format: unsupportedFileFormatDbs) {
+      AnalysisError("alter table " + format + ".alltypes set serdeproperties(" 
+
+          "'serialization.encoding'='GBK')", "Property 
'serialization.encoding' is " +
+          "only supported on TEXT file format");
+    }
+    String[] unsupportedFileFormats = {
+        "parquet", "rcfile", "avro", "iceberg"};
+    for (String format: unsupportedFileFormats) {
+      AnalysisError("create table " + tmpTableName + " (id int) with 
serdeproperties(" +
+          "'serialization.encoding'='GBK') stored as " + format,
+          "Property 'serialization.encoding' is only supported on TEXT file 
format");
+    }
+
+    AnalysisError("alter table functional_kudu.alltypes set serdeproperties( " 
+
+        "'serialization.encoding'='GBK')", "Property 'serialization.encoding' 
is only " +
+        "supported on HDFS tables");
+    AnalysisError("alter table functional.alltypesmixedformat 
partition(year=2009) " +
+        "set serdeproperties('serialization.encoding'='GBK')", "Property " +
+        "'serialization.encoding' is only supported on TEXT file format");
+    AnalysisError("create table " + tmpTableName +
+        " (id int, primary key (id)) with serdeproperties(" +
+        "'serialization.encoding'='GBK') stored as kudu " +
+        "tblproperties('kudu.master_addresses'='localhost')",
+        "Property 'serialization.encoding' is only supported on TEXT file 
format");
+
+    AnalysisError("alter table functional.alltypes set serdeproperties(" +
+        "'serialization.encoding'='UTF-16')",
+        "Property 'serialization.encoding' only supports encodings in which 
line " +
+        "delimiter is compatible with ASCII.");
+    AnalysisError("alter table functional.alltypes partition(year=2010, 
month=12) " +
+        "set serdeproperties('serialization.encoding'='UTF-16')",
+        "Property 'serialization.encoding' only supports encodings in which 
line " +
+        "delimiter is compatible with ASCII.");
+    AnalysisError("alter table functional.alltypes set serdeproperties(" +
+        "'serialization.encoding'='NonexistentEncoding')",
+        "Unsupported encoding: NonexistentEncoding.");
+    AnalysisError("create table " + tmpTableName + " (id int) with 
serdeproperties(" +
+        "'serialization.encoding'='UTF-16')",
+        "Property 'serialization.encoding' only supports encodings in which 
line " +
+        "delimiter is compatible with ASCII.");
+    AnalysisError("create table " + tmpTableName + " (id int) with 
serdeproperties(" +
+        "'serialization.encoding'='NonexistentEncoding')",
+        "Unsupported encoding: NonexistentEncoding.");
   }
 
   @Test
diff --git a/testdata/charcodec/cp1251_names.txt 
b/testdata/charcodec/cp1251_names.txt
new file mode 100644
index 000000000..b1c72f772
--- /dev/null
+++ b/testdata/charcodec/cp1251_names.txt
@@ -0,0 +1,3 @@
+�������
+�����
+�������
diff --git a/testdata/charcodec/cp1251_names_utf8.txt 
b/testdata/charcodec/cp1251_names_utf8.txt
new file mode 100644
index 000000000..8fbd22e61
--- /dev/null
+++ b/testdata/charcodec/cp1251_names_utf8.txt
@@ -0,0 +1,3 @@
+Алексей
+Мария
+Дмитрий
diff --git a/testdata/charcodec/gbk_names.txt b/testdata/charcodec/gbk_names.txt
new file mode 100644
index 000000000..349e54469
--- /dev/null
+++ b/testdata/charcodec/gbk_names.txt
@@ -0,0 +1,3 @@
+����
+����
+����
diff --git a/testdata/charcodec/gbk_names_error.txt 
b/testdata/charcodec/gbk_names_error.txt
new file mode 100644
index 000000000..b2d96b6ee
Binary files /dev/null and b/testdata/charcodec/gbk_names_error.txt differ
diff --git a/testdata/charcodec/gbk_names_utf8.txt 
b/testdata/charcodec/gbk_names_utf8.txt
new file mode 100644
index 000000000..f5ac89454
--- /dev/null
+++ b/testdata/charcodec/gbk_names_utf8.txt
@@ -0,0 +1,3 @@
+张三
+李四
+王五
diff --git a/testdata/charcodec/koi8r_names.txt 
b/testdata/charcodec/koi8r_names.txt
new file mode 100644
index 000000000..193c18d58
--- /dev/null
+++ b/testdata/charcodec/koi8r_names.txt
@@ -0,0 +1,3 @@
+�������
+�����
+�������
diff --git a/testdata/charcodec/koi8r_names_utf8.txt 
b/testdata/charcodec/koi8r_names_utf8.txt
new file mode 100644
index 000000000..8fbd22e61
--- /dev/null
+++ b/testdata/charcodec/koi8r_names_utf8.txt
@@ -0,0 +1,3 @@
+Алексей
+Мария
+Дмитрий
diff --git a/testdata/charcodec/latin1_names.txt 
b/testdata/charcodec/latin1_names.txt
new file mode 100644
index 000000000..6b9a76000
--- /dev/null
+++ b/testdata/charcodec/latin1_names.txt
@@ -0,0 +1,3 @@
+Jos�
+Fran�ois
+G�ran
diff --git a/testdata/charcodec/latin1_names_utf8.txt 
b/testdata/charcodec/latin1_names_utf8.txt
new file mode 100644
index 000000000..c30824906
--- /dev/null
+++ b/testdata/charcodec/latin1_names_utf8.txt
@@ -0,0 +1,3 @@
+José
+François
+Göran
diff --git a/testdata/charcodec/shift_jis_names.txt 
b/testdata/charcodec/shift_jis_names.txt
new file mode 100644
index 000000000..cdcb05668
--- /dev/null
+++ b/testdata/charcodec/shift_jis_names.txt
@@ -0,0 +1,3 @@
+�R�c���Y
+�����Ԏq
+��؎��Y
diff --git a/testdata/charcodec/shift_jis_names_error.txt 
b/testdata/charcodec/shift_jis_names_error.txt
new file mode 100644
index 000000000..00a37c73a
Binary files /dev/null and b/testdata/charcodec/shift_jis_names_error.txt differ
diff --git a/testdata/charcodec/shift_jis_names_utf8.txt 
b/testdata/charcodec/shift_jis_names_utf8.txt
new file mode 100644
index 000000000..eca0e90fe
--- /dev/null
+++ b/testdata/charcodec/shift_jis_names_utf8.txt
@@ -0,0 +1,3 @@
+山田太郎
+佐藤花子
+鈴木次郎
diff --git a/tests/query_test/test_charcodec.py 
b/tests/query_test/test_charcodec.py
new file mode 100644
index 000000000..90b4a5df7
--- /dev/null
+++ b/tests/query_test/test_charcodec.py
@@ -0,0 +1,441 @@
+# Licensed to the Apache Software Foundation (ASF) under one
+# or more contributor license agreements.  See the NOTICE file
+# distributed with this work for additional information
+# regarding copyright ownership.  The ASF licenses this file
+# to you under the Apache License, Version 2.0 (the
+# "License"); you may not use this file except in compliance
+# with the License.  You may obtain a copy of the License at
+#
+#   http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing,
+# software distributed under the License is distributed on an
+# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+# KIND, either express or implied.  See the License for the
+# specific language governing permissions and limitations
+# under the License.
+
+from __future__ import absolute_import, division, print_function
+from tests.common.impala_test_suite import ImpalaTestSuite
+from tests.common.test_vector import ImpalaTestDimension
+from tests.common.skip import SkipIfFS
+import codecs
+import os
+import pytest
+import random
+import tempfile
+import shutil
+import sys
+
+if sys.version_info[0] >= 3:
+    unichr = chr  # Python 3
+
+_hiragana_range = [codepoint for codepoint in range(0x3040, 0x309F) if 
codepoint not in
+    # problematic symbols: unassigned, deprecated, etc:
+    set([0x3040, 0x3094, 0x3095, 0x3096, 0x3097, 0x3098, 0x3099, 0x309A, 
0x309B, 0x309C])]
+
+_cyrillic_range = [codepoint for codepoint in range(0x0410, 0x045F) if 
codepoint not in
+    # problematic symbols: unassigned, deprecated, etc:
+    set([0x0450, 0x0452, 0x0453, 0x0454, 0x0455, 0x0456, 0x0457, 0x0458,
+         0x0459, 0x045A, 0x045B, 0x045C, 0x045D, 0x045E])]
+
+_charsets = {
+  'gbk': u''.join(unichr(i) for i in range(0x4E00, 0x9FA6)),
+  'latin1': u''.join(unichr(i) for i in range(0x20, 0x7F)),
+  'shift_jis': u''.join(unichr(i) for i in _hiragana_range),
+  'cp1251': u''.join(unichr(i) for i in range(0x0410, 0x044F)),
+  'koi8-r': u''.join(unichr(i) for i in _cyrillic_range)
+}
+
+
+def _generate_random_word(charset, min_length=1, max_length=20):
+  length = random.randint(min_length, max_length)
+  return u''.join(random.choice(charset) for _ in range(length))
+
+
+def _compare_tables(selfobj, db, utf8_table, encoded_table, row_count):
+    # Compare count(*) of the encoded table with the utf8 table
+    count_utf8 = selfobj.client.execute("""select count(*) from {}.{}"""
+        .format(db, utf8_table))
+    count_encoded = selfobj.client.execute("""select count(*) from {}.{}"""
+        .format(db, encoded_table))
+    assert int(count_utf8.get_data()) == int(count_encoded.get_data()) == 
row_count
+
+    # Compare * of the encoded table with the utf8 table
+    result = selfobj.client.execute("""select * from {}.{} except select * 
from {}.{}
+        union all select * from {}.{} except select * from {}.{}"""
+        .format(db, utf8_table, db, encoded_table, db, encoded_table, db, 
utf8_table))
+    assert result.data == []
+
+
+# Tests with auto-generated data
+class TestCharCodecGen(ImpalaTestSuite):
+  @classmethod
+  def add_test_dimensions(cls):
+    super(TestCharCodecGen, cls).add_test_dimensions()
+    encodings = list(_charsets.keys())
+    # Only run the tests for single 'gbk' encoding in non-exhaustive mode.
+    if cls.exploration_strategy() != 'exhaustive':
+      encodings = [enc for enc in encodings if enc == 'gbk']
+    cls.ImpalaTestMatrix.add_dimension(ImpalaTestDimension(
+        'charset', *encodings))
+    # There is no reason to run these tests using all dimensions.
+    # See IMPALA-14063 for Sequence file format support.
+    cls.ImpalaTestMatrix.add_constraint(
+        lambda v: v.get_value('table_format').file_format == 'text'
+        and v.get_value('table_format').compression_codec == 'none')
+    cls.ImpalaTestMatrix.add_constraint(
+        lambda v: v.get_value('exec_option')['disable_codegen'] is False)
+
+  # Basic Tests
+  ####################################################################
+  def generate_text_files(self, encoding_name, charset, test_name,
+                          num_lines=10000, words_per_line=5, num_files=1,
+                          min_word_length=1, max_word_length=20):
+    lines_per_file = num_lines // num_files
+    file_paths = []
+    tmp_dir = tempfile.mkdtemp(dir=os.path.join(os.environ['IMPALA_HOME'], 
"testdata"))
+    for file_index in range(num_files):
+      data_file_path = os.path.join(tmp_dir, "charcodec_{}_{}_utf8_{}.txt"
+                                    .format(encoding_name, test_name, 
file_index))
+      file_paths.append(data_file_path)
+      with codecs.open(data_file_path, 'w', encoding='utf-8') as file:
+        for _ in range(lines_per_file):
+          words = [_generate_random_word(charset, min_word_length, 
max_word_length)
+                   for _ in range(words_per_line)]
+          line = u','.join(words)
+          file.write(line + u'\n')
+    return tmp_dir, file_paths, num_lines
+
+  def prepare_utf8_test_table(self, db, file_paths, encoding_name, vector):
+    encoding_name_tbl = encoding_name.replace('-', '')
+    tbl_name = "{}_gen_utf8".format(encoding_name_tbl)
+    self.execute_query("""CREATE TABLE IF NOT EXISTS {}.{} (
+        name1 STRING, name2 STRING, name3 STRING, name4 STRING, name5 STRING)
+        ROW FORMAT DELIMITED FIELDS TERMINATED BY ',' STORED AS TEXTFILE"""
+        .format(db, tbl_name))
+    for file_path in file_paths:
+      self.filesystem_client.copy_from_local(file_path,
+          self._get_table_location("{0}.{1}".format(db, tbl_name), vector))
+    # remove REFRESH when IMPALA-13749 is fixed
+    self.execute_query("""REFRESH {}.{}""".format(db, tbl_name))
+    return tbl_name
+
+  def prepare_encoded_test_table(self, db, utf8_table, encoding_name):
+    encoding_name_tbl = encoding_name.replace('-', '')
+    encoded_table = "{}_gen".format(encoding_name_tbl)
+    self.execute_query("""CREATE TABLE IF NOT EXISTS {}.{} (
+        name1 STRING, name2 STRING, name3 STRING, name4 STRING, name5 STRING)
+        STORED AS TEXTFILE""".format(db, encoded_table))
+    self.execute_query("""ALTER TABLE {}.{}
+                       SET SERDEPROPERTIES("serialization.encoding"="{}")"""
+        .format(db, encoded_table, encoding_name))
+    self.execute_query("""REFRESH {}.{}""".format(db, encoded_table))
+    self.execute_query("""INSERT OVERWRITE TABLE {}.{} SELECT * FROM {}.{}"""
+        .format(db, encoded_table, db, utf8_table))
+    return encoded_table
+
+  def test_enc_dec_gen(self, vector, unique_database):
+    """Write encoded table with Impala and read it back."""
+    db = unique_database
+    encoding_name = vector.get_value('charset')
+    charset = _charsets[encoding_name]
+    tmp_dir, file_paths, row_count = self.generate_text_files(
+        encoding_name, charset, "gen")
+    utf8_table = self.prepare_utf8_test_table(db, file_paths, encoding_name, 
vector)
+    shutil.rmtree(tmp_dir)
+    encoded_table = self.prepare_encoded_test_table(db, utf8_table, 
encoding_name)
+    _compare_tables(self, db, utf8_table, encoded_table, row_count)
+
+  def test_enc_dec_gen_long_words(self, vector, unique_database):
+    db = unique_database
+    encoding_name = vector.get_value('charset')
+    charset = _charsets[encoding_name]
+    tmp_dir, file_paths, row_count = self.generate_text_files(
+        encoding_name, charset, "gen", min_word_length=100, 
max_word_length=1000)
+    utf8_table = self.prepare_utf8_test_table(db, file_paths, encoding_name, 
vector)
+    shutil.rmtree(tmp_dir)
+    encoded_table = self.prepare_encoded_test_table(db, utf8_table, 
encoding_name)
+    _compare_tables(self, db, utf8_table, encoded_table, row_count)
+
+  # Split-file tests
+  ####################################################################
+  def test_enc_dec_gen_split(self, vector, unique_database):
+    """Test table is split across multiple files."""
+    db = unique_database
+    encoding_name = vector.get_value('charset')
+    charset = _charsets[encoding_name]
+    tmp_dir, file_paths, row_count = self.generate_text_files(
+        encoding_name, charset, "split", num_lines=10000, words_per_line=5, 
num_files=5)
+    utf8_table = self.prepare_utf8_test_table(db, file_paths, encoding_name, 
vector)
+    shutil.rmtree(tmp_dir)
+    encoded_table = self.prepare_encoded_test_table(db, utf8_table, 
encoding_name)
+    _compare_tables(self, db, utf8_table, encoded_table, row_count)
+
+  # Hive + Compression Tests
+  ####################################################################
+  def prepare_encoded_test_table_compress(self, db, utf8_table, encoding_name, 
codec):
+    encoding_name_tbl = encoding_name.replace('-', '')
+    encoded_table = "{}_gen_{}".format(encoding_name_tbl, codec)
+    self.run_stmt_in_hive("""CREATE TABLE IF NOT EXISTS {}.{} (
+        name1 STRING, name2 STRING, name3 STRING, name4 STRING, name5 STRING)
+        STORED AS TEXTFILE""".format(db, encoded_table))
+    self.run_stmt_in_hive("""ALTER TABLE {}.{}
+                          SET SERDEPROPERTIES("serialization.encoding"="{}")"""
+        .format(db, encoded_table, encoding_name))
+    self.run_stmt_in_hive("""set hive.exec.compress.output={};
+        set mapreduce.output.fileoutputformat.compress.codec=
+        org.apache.hadoop.io.compress.{}Codec;
+        INSERT OVERWRITE TABLE {}.{} SELECT * FROM {}.{}
+        """.format("false" if codec == "None" else "true",
+                   codec, db, encoded_table, db, utf8_table))
+    return encoded_table
+
+  @SkipIfFS.hive
+  def test_enc_dec_gen_compress(self, vector, unique_database):
+    db = unique_database
+    encoding_name = vector.get_value('charset')
+    charset = _charsets[encoding_name]
+
+    tmp_dir, file_paths, row_count = self.generate_text_files(
+        encoding_name, charset, "compress", num_lines=10000)
+    utf8_table = self.prepare_utf8_test_table(db, file_paths, encoding_name, 
vector)
+    shutil.rmtree(tmp_dir)
+    # Snappy codec supports streaming, ZStandard does not
+    for codec in ["None", "Snappy", "ZStandard"]:
+      encoded_table = self.prepare_encoded_test_table_compress(db, utf8_table,
+                                                                encoding_name, 
codec)
+      _compare_tables(self, db, utf8_table, encoded_table, row_count)
+
+  # Partitions Tests
+  ####################################################################
+  def prepare_utf8_test_table_partitions(self, db, file_paths, encoding_name, 
vector):
+    encoding_name_tbl = encoding_name.replace('-', '')
+    tbl_name = "{}_gen_utf8".format(encoding_name_tbl)
+    self.execute_query("""CREATE TABLE IF NOT EXISTS {}.{} (
+        name1 STRING, name2 STRING, name3 STRING, name4 STRING, name5 STRING)
+        PARTITIONED BY (part STRING)
+        ROW FORMAT DELIMITED FIELDS TERMINATED BY ',' STORED AS TEXTFILE"""
+        .format(db, tbl_name))
+    for i in range(len(file_paths)):
+      self.execute_query("""ALTER TABLE {}.{} ADD PARTITION (part='{}')"""
+          .format(db, tbl_name, i))
+      part_url = os.path.join(
+          self._get_table_location("{0}.{1}".format(db, tbl_name), vector),
+          "part={}".format(i))
+      part_dir = part_url[part_url.index("/test-warehouse"):]
+      self.filesystem_client.make_dir(part_dir)
+      self.filesystem_client.copy_from_local(file_paths[i], part_dir)
+    self.execute_query("""REFRESH {}.{}""".format(db, tbl_name))
+    return tbl_name
+
+  def prepare_encoded_test_table_partitions(self, db, utf8_table, 
encoding_name,
+                                            file_paths):
+    encoding_name_tbl = encoding_name.replace('-', '')
+    encoded_table = "{}_gen".format(encoding_name_tbl)
+    self.execute_query("""CREATE TABLE IF NOT EXISTS {}.{} (
+        name1 STRING, name2 STRING, name3 STRING, name4 STRING, name5 STRING)
+        PARTITIONED BY (part STRING)
+        STORED AS TEXTFILE""".format(db, encoded_table))
+    for i in range(len(file_paths)):
+      self.execute_query("""ALTER TABLE {}.{} ADD PARTITION (part='{}')"""
+                         .format(db, encoded_table, i))
+      self.execute_query("""ALTER TABLE {}.{} PARTITION (part='{}')
+                        SET SERDEPROPERTIES("serialization.encoding"="{}")"""
+                        .format(db, encoded_table, i, encoding_name))
+      self.execute_query("""REFRESH {}.{}""".format(db, encoded_table))
+      self.execute_query("""INSERT OVERWRITE TABLE {}.{} PARTITION (part='{}')
+          SELECT name1, name2, name3, name4, name5 FROM {}.{} WHERE 
part='{}'"""
+          .format(db, encoded_table, i, db, utf8_table, i))
+    return encoded_table
+
+  def test_enc_dec_gen_partitions(self, vector, unique_database):
+    db = unique_database
+    encoding_name = vector.get_value('charset')
+    charset = _charsets[encoding_name]
+    tmp_dir, file_paths, row_count = self.generate_text_files(
+        encoding_name, charset, "partitions", num_lines=10000, num_files=5)
+    utf8_table = self.prepare_utf8_test_table_partitions(
+        db, file_paths, encoding_name, vector)
+    shutil.rmtree(tmp_dir)
+    encoded_table = self.prepare_encoded_test_table_partitions(db,
+        utf8_table, encoding_name, file_paths)
+    _compare_tables(self, db, utf8_table, encoded_table, row_count)
+
+
+class TestCharCodecGenMixed(ImpalaTestSuite):
+  @classmethod
+  def add_test_dimensions(cls):
+    super(TestCharCodecGenMixed, cls).add_test_dimensions()
+    # There is no reason to run these tests using all dimensions.
+    cls.ImpalaTestMatrix.add_constraint(
+        lambda v: v.get_value('table_format').file_format == 'text'
+        and v.get_value('table_format').compression_codec == 'none')
+    cls.ImpalaTestMatrix.add_constraint(
+        lambda v: v.get_value('exec_option')['disable_codegen'] is False)
+
+  # Mixed Partitions Tests
+  ####################################################################
+  def generate_text_files_mixed(self, test_file, num_lines=10000, 
words_per_line=5,
+                                num_files=1):
+    lines_per_file = num_lines // num_files
+    file_paths = []
+    encodings = []
+    tmp_dir = tempfile.mkdtemp(dir=os.path.join(os.environ['IMPALA_HOME'], 
"testdata"))
+    for i in range(num_files):
+      encoding_name, charset = random.choice(list(_charsets.items()))
+      data_file_path = os.path.join(tmp_dir, "charcodec_{}_{}_utf8_{}.txt"
+                                    .format(encoding_name, test_file, i))
+      encodings.append(encoding_name)
+      file_paths.append(data_file_path)
+      with codecs.open(data_file_path, 'w', encoding='utf-8') as file:
+        for _ in range(lines_per_file):
+          words = [_generate_random_word(charset) for _ in 
range(words_per_line)]
+          line = u','.join(words)
+          file.write(line + u'\n')
+    return tmp_dir, file_paths, encodings, num_lines
+
+  # Partitioned table with different encodings.
+  def prepare_utf8_test_table_partitions_mixed(self, db, file_paths, vector):
+    tbl_name = "mixed_gen_utf8"
+    self.execute_query("""CREATE TABLE IF NOT EXISTS {}.{} (
+        name1 STRING, name2 STRING, name3 STRING, name4 STRING, name5 STRING)
+        PARTITIONED BY (part STRING)
+        ROW FORMAT DELIMITED FIELDS TERMINATED BY ',' STORED AS TEXTFILE"""
+        .format(db, tbl_name))
+    for i in range(len(file_paths)):
+      self.execute_query("""ALTER TABLE {}.{} ADD PARTITION (part='{}')"""
+        .format(db, tbl_name, i))
+      part_url = os.path.join(
+          self._get_table_location("{0}.{1}".format(db, tbl_name), vector),
+          "part={}".format(i))
+      part_dir = part_url[part_url.index("/test-warehouse"):]
+      self.filesystem_client.make_dir(part_dir)
+      self.filesystem_client.copy_from_local(file_paths[i], part_dir)
+    self.execute_query("""REFRESH {}.{}""".format(db, tbl_name))
+    return tbl_name
+
+  def prepare_encoded_test_table_partitions_mixed(self, db, utf8_table, 
encodings):
+    encoded_table = "mixed_gen"
+    self.execute_query("""CREATE TABLE IF NOT EXISTS {}.{} (
+        name1 STRING, name2 STRING, name3 STRING, name4 STRING, name5 STRING)
+        PARTITIONED BY (part STRING)
+        STORED AS TEXTFILE""".format(db, encoded_table))
+    for i in range(len(encodings)):
+      self.execute_query("""ALTER TABLE {}.{} ADD PARTITION (part='{}')"""
+                         .format(db, encoded_table, i))
+      self.execute_query("""ALTER TABLE {}.{} PARTITION (part='{}')
+                        SET SERDEPROPERTIES("serialization.encoding"="{}")"""
+                         .format(db, encoded_table, i, encodings[i]))
+      self.execute_query("""REFRESH {}.{}""".format(db, encoded_table))
+      self.execute_query("""INSERT OVERWRITE TABLE {}.{} PARTITION (part='{}')
+          SELECT name1, name2, name3, name4, name5 FROM {}.{} WHERE 
part='{}'"""
+          .format(db, encoded_table, i, db, utf8_table, i))
+    return encoded_table
+
+  def test_enc_dec_gen_partitions_mixed(self, unique_database, vector):
+    db = unique_database
+    tmp_dir, file_paths, encodings, row_count = self.generate_text_files_mixed(
+                                                    "mixed", num_lines=10000, 
num_files=5)
+    utf8_table = self.prepare_utf8_test_table_partitions_mixed(db, file_paths, 
vector)
+    shutil.rmtree(tmp_dir)
+    encoded_table = self.prepare_encoded_test_table_partitions_mixed(db,
+        utf8_table, encodings)
+    _compare_tables(self, db, utf8_table, encoded_table, row_count)
+
+
+class TestCharCodecPreCreated(ImpalaTestSuite):
+  @classmethod
+  def add_test_dimensions(cls):
+    super(TestCharCodecPreCreated, cls).add_test_dimensions()
+    encodings = list(_charsets.keys())
+    # Only run the tests for single 'gbk' encoding in non-exhaustive mode.
+    if cls.exploration_strategy() != 'exhaustive':
+      encodings = [enc for enc in encodings if enc == 'gbk']
+    cls.ImpalaTestMatrix.add_dimension(ImpalaTestDimension(
+        'charset', *encodings))
+    # There is no reason to run these tests using all dimensions.
+    cls.ImpalaTestMatrix.add_constraint(
+        lambda v: v.get_value('table_format').file_format == 'text'
+        and v.get_value('table_format').compression_codec == 'none')
+    cls.ImpalaTestMatrix.add_constraint(
+        lambda v: v.get_value('exec_option')['disable_codegen'] is False)
+
+  def prepare_test_table(self, vector, db, tbl_name, datafile, encoding=None):
+    tbl_name = tbl_name.replace('-', '')
+    datafile = datafile.replace('-', '')
+    self.execute_query("""CREATE TABLE IF NOT EXISTS {}.{} (name STRING)
+        STORED AS TEXTFILE""".format(db, tbl_name))
+    if encoding:
+      self.execute_query("""ALTER TABLE {}.{} SET
+          SERDEPROPERTIES("serialization.encoding"="{}")"""
+          .format(db, tbl_name, encoding))
+    data_file_path = os.path.join(os.environ['IMPALA_HOME'], "testdata",
+        "charcodec", datafile)
+    self.filesystem_client.copy_from_local(data_file_path,
+        self._get_table_location("{0}.{1}".format(db, tbl_name), vector))
+    self.execute_query("""REFRESH {}.{}""".format(db, tbl_name))
+    return tbl_name
+
+  def test_precreated_files(self, vector, unique_database):
+    """Read encoded precreated files."""
+    db = unique_database
+    enc = vector.get_value('charset')
+
+    # Without SERDEPROPERTIES("serialization.encoding") data is read 
incorrectly
+    utf8_table = self.prepare_test_table(
+        vector, db, enc + '_names_utf8', enc + '_names_utf8.txt', None)
+    encoded_table = self.prepare_test_table(
+        vector, db, enc + '_names_none', enc + '_names.txt', None)
+    with pytest.raises(AssertionError) as exc_info:
+        _compare_tables(self, db, utf8_table, encoded_table, 3)
+    assert " == []" in str(exc_info.value)
+
+    # With SERDEPROPERTIES("serialization.encoding") data is read correctly
+    encoded_table = self.prepare_test_table(
+        vector, db, enc + '_names', enc + '_names.txt', enc)
+    _compare_tables(self, db, utf8_table, encoded_table, 3)
+
+  def test_precreated_decoding_with_errors(self, vector, unique_database):
+    db = unique_database
+    enc = vector.get_value('charset')
+    # Skip for promiscious encodings
+    if enc not in ['gbk', 'shift_jis']: pytest.skip()
+    encoded_table = self.prepare_test_table(
+        vector, db, enc + '_names_error', enc + '_names_error.txt', enc)
+    err = self.execute_query_expect_failure(
+        self.client, """select * from {}.{}""".format(db, encoded_table))
+    assert "Error during buffer conversion: Conversion failed" in str(err)
+
+  def test_precreated_encoding_with_errors(self, vector, unique_database):
+    db = unique_database
+    enc = vector.get_value('charset')
+    # Skip for promiscious encodings
+    if enc not in ['gbk', 'shift_jis']: pytest.skip()
+    encoded_table = self.prepare_test_table(
+        vector, db, enc + '_names_error', enc + '_names_error.txt', enc)
+    err = self.execute_query_expect_failure(self.client, """insert overwrite 
{}.{}
+        select cast(binary_col as string) from functional.binary_tbl"""
+        .format(db, encoded_table))
+    assert "Error during buffer conversion: Conversion failed" in str(err)
+
+  @SkipIfFS.hive
+  def test_read_from_hive(self, unique_database, vector):
+    """Write table with Impala and read it back with Hive."""
+    db = unique_database
+    enc = vector.get_value('charset')
+
+    utf8_table = self.prepare_test_table(
+        vector, db, enc + '_names_utf8', enc + '_names_utf8.txt', None)
+    encoded_table = self.prepare_test_table(
+        vector, db, enc + '_names', enc + '_names.txt', enc)
+    self.execute_query(
+        """insert overwrite {}.{} select * from {}.{}"""
+        .format(db, encoded_table, db, utf8_table))
+
+    result_hive = self.run_stmt_in_hive(
+        """select name from {}.{}""".format(db, encoded_table))
+    result_impala = self.client.execute(
+        """select name from {}.{}""".format(db, utf8_table))
+    result_hive_list = result_hive.strip().split('\n')[1:]
+    assert result_hive_list == result_impala.data

Reply via email to