This is an automated email from the ASF dual-hosted git repository.

csringhofer pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/impala.git


The following commit(s) were added to refs/heads/master by this push:
     new 8ff51fbf7 IMPALA-5323: Support BINARY columns in Kudu tables
8ff51fbf7 is described below

commit 8ff51fbf74b4572ce3d1e43389fc10d35a8dd576
Author: Csaba Ringhofer <[email protected]>
AuthorDate: Fri Mar 22 18:56:48 2024 +0100

    IMPALA-5323: Support BINARY columns in Kudu tables
    
    The patch adds read and write support for BINARY columns in Kudu
    tables.
    
    Predicate pushdown is implemented but incomplete:
    a constant BINARY argument is only pushed down if
    constant folding never encounters non-ASCII strings.
    Examples:
     - cast(unhex(hex("aa")) as binary) can be pushed down
     - cast(hex(unhex("aa")) as binary) can't be pushed
       down because unhex("aa") is not ASCII (even though
       the final result is ASCII)
    See IMPALA-10349 for more details on this limitation.
    
    The patch also changes casting BINARY <-> STRING from noop
    to calling an actual function. While this may add some small
    overhead it allows the backend to know whether an expression
    returns STRING or BINARY.
    
    Change-Id: Iff701a4b3a09ce7b6982c5d238e65f3d4f3d1151
    Reviewed-on: http://gerrit.cloudera.org:8080/18868
    Reviewed-by: Impala Public Jenkins <[email protected]>
    Tested-by: Impala Public Jenkins <[email protected]>
---
 be/src/exec/kudu/kudu-util-ir.cc                   | 50 +++++++++++++---------
 be/src/exec/kudu/kudu-util.cc                      |  2 +-
 be/src/exprs/cast-functions-ir.cc                  | 13 +++++-
 be/src/exprs/cast-functions.h                      |  1 +
 be/src/runtime/types.cc                            | 17 +++-----
 be/src/runtime/types.h                             |  9 +++-
 .../java/org/apache/impala/analysis/CastExpr.java  | 47 ++++++++++----------
 .../org/apache/impala/planner/KuduScanNode.java    |  5 +++
 .../main/java/org/apache/impala/util/KuduUtil.java |  3 +-
 .../functional/functional_schema_template.sql      | 28 ++++++++++++
 .../datasets/functional/schema_constraints.csv     |  4 +-
 .../queries/PlannerTest/kudu.test                  | 29 +++++++++++++
 .../queries/QueryTest/binary-type.test             | 35 +++++++++++++++
 tests/common/kudu_test_suite.py                    |  8 +++-
 tests/query_test/test_kudu.py                      | 22 ++--------
 tests/query_test/test_scanners.py                  |  3 +-
 16 files changed, 196 insertions(+), 80 deletions(-)

diff --git a/be/src/exec/kudu/kudu-util-ir.cc b/be/src/exec/kudu/kudu-util-ir.cc
index e78e5a35e..2bd28fa52 100644
--- a/be/src/exec/kudu/kudu-util-ir.cc
+++ b/be/src/exec/kudu/kudu-util-ir.cc
@@ -53,71 +53,81 @@ Status WriteKuduValue(int col, const ColumnType& col_type, const void* value,
     bool copy_strings, kudu::KuduPartialRow* row) {
   // TODO: codegen this to eliminate branching on type.
   PrimitiveType type = col_type.type;
+  const char* VALUE_ERROR_MSG = "Could not set Kudu row value.";
   switch (type) {
     case TYPE_VARCHAR: {
       const StringValue* sv = reinterpret_cast<const StringValue*>(value);
       kudu::Slice slice(reinterpret_cast<uint8_t*>(sv->Ptr()), sv->Len());
       if (copy_strings) {
-        KUDU_RETURN_IF_ERROR(row->SetVarchar(col, slice),
-            "Could not set Kudu row value.");
+        KUDU_RETURN_IF_ERROR(row->SetVarchar(col, slice), VALUE_ERROR_MSG);
       } else {
-        KUDU_RETURN_IF_ERROR(row->SetVarcharNoCopyUnsafe(col, slice),
-            "Could not set Kudu row value.");
+        KUDU_RETURN_IF_ERROR(row->SetVarcharNoCopyUnsafe(col, slice), 
VALUE_ERROR_MSG);
       }
       break;
     }
     case TYPE_STRING: {
       const StringValue* sv = reinterpret_cast<const StringValue*>(value);
       kudu::Slice slice(reinterpret_cast<uint8_t*>(sv->Ptr()), sv->Len());
-      if (copy_strings) {
-        KUDU_RETURN_IF_ERROR(row->SetString(col, slice), "Could not set Kudu 
row value.");
+      if (col_type.IsBinaryType()) {
+        if (copy_strings) {
+          KUDU_RETURN_IF_ERROR(
+              row->SetBinary(col, slice), VALUE_ERROR_MSG);
+        } else {
+          KUDU_RETURN_IF_ERROR(
+              row->SetBinaryNoCopy(col, slice), VALUE_ERROR_MSG);
+        }
       } else {
-        KUDU_RETURN_IF_ERROR(
-            row->SetStringNoCopy(col, slice), "Could not set Kudu row value.");
+        if (copy_strings) {
+          KUDU_RETURN_IF_ERROR(
+              row->SetString(col, slice), VALUE_ERROR_MSG);
+        } else {
+          KUDU_RETURN_IF_ERROR(
+              row->SetStringNoCopy(col, slice), VALUE_ERROR_MSG);
+        }
       }
       break;
     }
     case TYPE_FLOAT:
       KUDU_RETURN_IF_ERROR(row->SetFloat(col, *reinterpret_cast<const 
float*>(value)),
-          "Could not set Kudu row value.");
+          VALUE_ERROR_MSG);
       break;
     case TYPE_DOUBLE:
       KUDU_RETURN_IF_ERROR(row->SetDouble(col, *reinterpret_cast<const 
double*>(value)),
-          "Could not set Kudu row value.");
+          VALUE_ERROR_MSG);
       break;
     case TYPE_BOOLEAN:
       KUDU_RETURN_IF_ERROR(row->SetBool(col, *reinterpret_cast<const 
bool*>(value)),
-          "Could not set Kudu row value.");
+          VALUE_ERROR_MSG);
       break;
     case TYPE_TINYINT:
       KUDU_RETURN_IF_ERROR(row->SetInt8(col, *reinterpret_cast<const 
int8_t*>(value)),
-          "Could not set Kudu row value.");
+          VALUE_ERROR_MSG);
       break;
     case TYPE_SMALLINT:
       KUDU_RETURN_IF_ERROR(row->SetInt16(col, *reinterpret_cast<const 
int16_t*>(value)),
-          "Could not set Kudu row value.");
+          VALUE_ERROR_MSG);
       break;
     case TYPE_INT:
       KUDU_RETURN_IF_ERROR(row->SetInt32(col, *reinterpret_cast<const 
int32_t*>(value)),
-          "Could not set Kudu row value.");
+          VALUE_ERROR_MSG);
       break;
     case TYPE_BIGINT:
       KUDU_RETURN_IF_ERROR(row->SetInt64(col, *reinterpret_cast<const 
int64_t*>(value)),
-          "Could not set Kudu row value.");
+          VALUE_ERROR_MSG);
       break;
     case TYPE_TIMESTAMP:
       int64_t ts_micros;
       RETURN_IF_ERROR(ConvertTimestampValueToKudu(
           reinterpret_cast<const TimestampValue*>(value), &ts_micros));
       KUDU_RETURN_IF_ERROR(
-          row->SetUnixTimeMicros(col, ts_micros), "Could not set Kudu row 
value.");
+          row->SetUnixTimeMicros(col, ts_micros), VALUE_ERROR_MSG);
       break;
     case TYPE_DATE:
     {
       int32_t days = 0;
       RETURN_IF_ERROR(ConvertDateValueToKudu(
             reinterpret_cast<const DateValue*>(value), &days));
-      KUDU_RETURN_IF_ERROR(row->SetDate(col, days), "Could not set Kudu row 
value.");
+      KUDU_RETURN_IF_ERROR(row->SetDate(col, days), VALUE_ERROR_MSG);
       break;
     }
     case TYPE_DECIMAL:
@@ -126,19 +136,19 @@ Status WriteKuduValue(int col, const ColumnType& col_type, const void* value,
           KUDU_RETURN_IF_ERROR(
               row->SetUnscaledDecimal(
                   col, reinterpret_cast<const Decimal4Value*>(value)->value()),
-              "Could not set Kudu row value.");
+              VALUE_ERROR_MSG);
           break;
         case 8:
           KUDU_RETURN_IF_ERROR(
               row->SetUnscaledDecimal(
                   col, reinterpret_cast<const Decimal8Value*>(value)->value()),
-              "Could not set Kudu row value.");
+              VALUE_ERROR_MSG);
           break;
         case 16:
           KUDU_RETURN_IF_ERROR(
               row->SetUnscaledDecimal(
                   col, reinterpret_cast<const 
Decimal16Value*>(value)->value()),
-              "Could not set Kudu row value.");
+              VALUE_ERROR_MSG);
           break;
         default:
           DCHECK(false) << "Unknown decimal byte size: " << 
col_type.GetByteSize();
diff --git a/be/src/exec/kudu/kudu-util.cc b/be/src/exec/kudu/kudu-util.cc
index 02bf208be..957df97e9 100644
--- a/be/src/exec/kudu/kudu-util.cc
+++ b/be/src/exec/kudu/kudu-util.cc
@@ -150,7 +150,7 @@ ColumnType KuduDataTypeToColumnType(
     case DataType::BOOL: return ColumnType(PrimitiveType::TYPE_BOOLEAN);
     case DataType::FLOAT: return ColumnType(PrimitiveType::TYPE_FLOAT);
     case DataType::DOUBLE: return ColumnType(PrimitiveType::TYPE_DOUBLE);
-    case DataType::BINARY: return ColumnType(PrimitiveType::TYPE_BINARY);
+    case DataType::BINARY: return ColumnType::CreateBinaryType();
     case DataType::UNIXTIME_MICROS: return 
ColumnType(PrimitiveType::TYPE_TIMESTAMP);
     case DataType::DECIMAL:
       return ColumnType::CreateDecimalType(
diff --git a/be/src/exprs/cast-functions-ir.cc b/be/src/exprs/cast-functions-ir.cc
index 84246ada5..fa5ce41f2 100644
--- a/be/src/exprs/cast-functions-ir.cc
+++ b/be/src/exprs/cast-functions-ir.cc
@@ -368,7 +368,9 @@ StringVal CastFunctions::CastToStringVal(FunctionContext* ctx, const DateVal& va
   return sv;
 }
 
-StringVal CastFunctions::CastToStringVal(FunctionContext* ctx, const 
StringVal& val) {
+// This function handles the following casts:
+// STRING / CHAR(N) / VARCHAR(N) -> VARCHAR(N)
+StringVal CastFunctions::CastToVarchar(FunctionContext* ctx, const StringVal& 
val) {
   if (val.is_null) return StringVal::null();
   StringVal sv;
   sv.ptr = val.ptr;
@@ -377,6 +379,15 @@ StringVal CastFunctions::CastToStringVal(FunctionContext* ctx, const StringVal&
   return sv;
 }
 
+// This function handles the following casts:
+// BINARY / CHAR(N) / VARCHAR(N) -> STRING
+// STRING -> BINARY
+// These casts are essentially NOOP, but it is useful to have a function in the
+// expression tree to be able to get the exact return type.
+StringVal CastFunctions::CastToStringVal(FunctionContext* ctx, const 
StringVal& val) {
+  return val;
+}
+
 StringVal CastFunctions::CastToChar(FunctionContext* ctx, const StringVal& 
val) {
   if (val.is_null) return StringVal::null();
 
diff --git a/be/src/exprs/cast-functions.h b/be/src/exprs/cast-functions.h
index 61389e00b..8342d4d78 100644
--- a/be/src/exprs/cast-functions.h
+++ b/be/src/exprs/cast-functions.h
@@ -120,6 +120,7 @@ class CastFunctions {
   static StringVal CastToStringVal(FunctionContext* context, const StringVal& 
val);
 
   static StringVal CastToChar(FunctionContext* context, const StringVal& val);
+  static StringVal CastToVarchar(FunctionContext* context, const StringVal& 
val);
 
   static TimestampVal CastToTimestampVal(FunctionContext* context, const 
BooleanVal& val);
   static TimestampVal CastToTimestampVal(FunctionContext* context, const 
TinyIntVal& val);
diff --git a/be/src/runtime/types.cc b/be/src/runtime/types.cc
index dfe209fbf..535ce97fc 100644
--- a/be/src/runtime/types.cc
+++ b/be/src/runtime/types.cc
@@ -18,9 +18,9 @@
 #include "runtime/types.h"
 
 #include <ostream>
-#include <sstream>
 
 #include "codegen/llvm-codegen.h"
+#include "gutil/strings/substitute.h"
 
 #include "common/names.h"
 
@@ -258,20 +258,17 @@ void ColumnType::ToThrift(TColumnType* thrift_type) const {
 }
 
 string ColumnType::DebugString() const {
-  stringstream ss;
   switch (type) {
+    case TYPE_STRING:
+      return is_binary_ ? "BINARY" : "STRING";
     case TYPE_CHAR:
-      ss << "CHAR(" << len << ")";
-      return ss.str();
+      return Substitute("CHAR($0)", len);
     case TYPE_DECIMAL:
-      ss << "DECIMAL(" << precision << "," << scale << ")";
-      return ss.str();
+      return Substitute("DECIMAL($0,$1)", precision, scale);
     case TYPE_VARCHAR:
-      ss << "VARCHAR(" << len << ")";
-      return ss.str();
+      return Substitute("VARCHAR($0)", len);
     case TYPE_FIXED_UDA_INTERMEDIATE:
-      ss << "FIXED_UDA_INTERMEDIATE(" << len << ")";
-      return ss.str();
+      return Substitute("FIXED_UDA_INTERMEDIATE($0)", len);
     default:
       return TypeToString(type);
   }
diff --git a/be/src/runtime/types.h b/be/src/runtime/types.h
index 94d8e0258..578dfa819 100644
--- a/be/src/runtime/types.h
+++ b/be/src/runtime/types.h
@@ -136,6 +136,12 @@ struct ColumnType {
     return ret;
   }
 
+  static ColumnType CreateBinaryType() {
+    ColumnType ret(TYPE_STRING);
+    ret.is_binary_ = true;
+    return ret;
+  }
+
   static bool ValidateDecimalParams(int precision, int scale) {
     return precision >= 1 && precision <= MAX_PRECISION && scale >= 0
         && scale <= MAX_SCALE && scale <= precision;
@@ -175,6 +181,7 @@ struct ColumnType {
     if (children != o.children) return false;
     if (type == TYPE_CHAR || type == TYPE_FIXED_UDA_INTERMEDIATE) return len 
== o.len;
     if (type == TYPE_DECIMAL) return precision == o.precision && scale == 
o.scale;
+    if (type == TYPE_STRING) return is_binary_ == o.is_binary_;
     return true;
   }
 
@@ -265,7 +272,7 @@ struct ColumnType {
   //
   // This variable is true if 'type' is TYPE_STRING and this object represents 
the BINARY
   // type, and false in all other cases.
-  bool  is_binary_ = false;
+  bool is_binary_ = false;
 
   /// Recursive implementation of ToThrift() that populates 'thrift_type' with 
the
   /// TTypeNodes for this type and its children.
diff --git a/fe/src/main/java/org/apache/impala/analysis/CastExpr.java b/fe/src/main/java/org/apache/impala/analysis/CastExpr.java
index 400d4d399..7de8e074a 100644
--- a/fe/src/main/java/org/apache/impala/analysis/CastExpr.java
+++ b/fe/src/main/java/org/apache/impala/analysis/CastExpr.java
@@ -48,6 +48,8 @@ public class CastExpr extends Expr {
 
   // Prefix for naming cast functions.
   protected final static String CAST_FUNCTION_PREFIX = "castto";
+  private final static String CAST_TO_CHAR_FN = 
"impala::CastFunctions::CastToChar";
+  private final static String CAST_TO_VARCHAR_FN = 
"impala::CastFunctions::CastToVarchar";
 
   // Stores the value of the FORMAT clause.
   private final String castFormat_;
@@ -152,55 +154,63 @@ public class CastExpr extends Expr {
         if (fromType.getPrimitiveType() == PrimitiveType.STRING
             && toType.getPrimitiveType() == PrimitiveType.CHAR) {
           // Allow casting from String to Char(N)
-          String beSymbol = "impala::CastFunctions::CastToChar";
           
db.addBuiltin(ScalarFunction.createBuiltin(getFnName(ScalarType.CHAR),
               Lists.newArrayList((Type) ScalarType.STRING), false, 
ScalarType.CHAR,
-              beSymbol, null, null, true));
+              CAST_TO_CHAR_FN, null, null, true));
           continue;
         }
         if (fromType.getPrimitiveType() == PrimitiveType.CHAR
             && toType.getPrimitiveType() == PrimitiveType.CHAR) {
           // Allow casting from CHAR(N) to Char(N)
-          String beSymbol = "impala::CastFunctions::CastToChar";
           
db.addBuiltin(ScalarFunction.createBuiltin(getFnName(ScalarType.CHAR),
               Lists.newArrayList((Type) ScalarType.createCharType(-1)), false,
-              ScalarType.CHAR, beSymbol, null, null, true));
+              ScalarType.CHAR, CAST_TO_CHAR_FN, null, null, true));
           continue;
         }
         if (fromType.getPrimitiveType() == PrimitiveType.VARCHAR
             && toType.getPrimitiveType() == PrimitiveType.VARCHAR) {
           // Allow casting from VARCHAR(N) to VARCHAR(M)
-          String beSymbol = "impala::CastFunctions::CastToStringVal";
           
db.addBuiltin(ScalarFunction.createBuiltin(getFnName(ScalarType.VARCHAR),
               Lists.newArrayList((Type) ScalarType.VARCHAR), false, 
ScalarType.VARCHAR,
-              beSymbol, null, null, true));
+              CAST_TO_VARCHAR_FN, null, null, true));
           continue;
         }
         if (fromType.getPrimitiveType() == PrimitiveType.VARCHAR
             && toType.getPrimitiveType() == PrimitiveType.CHAR) {
           // Allow casting from VARCHAR(N) to CHAR(M)
-          String beSymbol = "impala::CastFunctions::CastToChar";
           
db.addBuiltin(ScalarFunction.createBuiltin(getFnName(ScalarType.CHAR),
               Lists.newArrayList((Type) ScalarType.VARCHAR), false, 
ScalarType.CHAR,
-              beSymbol, null, null, true));
+              CAST_TO_CHAR_FN, null, null, true));
           continue;
         }
         if (fromType.getPrimitiveType() == PrimitiveType.CHAR
             && toType.getPrimitiveType() == PrimitiveType.VARCHAR) {
           // Allow casting from CHAR(N) to VARCHAR(M)
-          String beSymbol = "impala::CastFunctions::CastToStringVal";
           
db.addBuiltin(ScalarFunction.createBuiltin(getFnName(ScalarType.VARCHAR),
               Lists.newArrayList((Type) ScalarType.CHAR), false, 
ScalarType.VARCHAR,
-              beSymbol, null, null, true));
+              CAST_TO_VARCHAR_FN, null, null, true));
           continue;
         }
-        // Disable no-op casts
-        if (fromType.equals(toType) && !fromType.isDecimal()) continue;
-        // No built-in function needed for BINARY <-> STRING conversion, while 
there is
-        // no conversion from / to any other type.
-        if (fromType.isBinary() || toType.isBinary()) {
+       if (fromType.getPrimitiveType() == PrimitiveType.STRING
+            && toType.getPrimitiveType() == PrimitiveType.VARCHAR) {
+          // Allow casting from STRING to VARCHAR(M)
+          
db.addBuiltin(ScalarFunction.createBuiltin(getFnName(ScalarType.VARCHAR),
+              Lists.newArrayList((Type) ScalarType.STRING), false, 
ScalarType.VARCHAR,
+              CAST_TO_VARCHAR_FN, null, null, true));
+          continue;
+        }
+        // Disable binary<->non-string casts.
+        // TODO(IMPALA-7998): invalid cases could be identified in ScalarType's
+        //                    compatibility matrix
+        if (fromType.isBinary() && !toType.isString()) {
+          continue;
+        }
+        if (toType.isBinary() && !fromType.isString()) {
           continue;
         }
+        // Disable no-op casts
+        if (fromType.equals(toType) && !fromType.isDecimal()) continue;
+
         String beClass = toType.isDecimal() || fromType.isDecimal() ?
             "DecimalOperators" : "CastFunctions";
         String beSymbol = "impala::" + beClass + "::CastTo" + 
Function.getUdfType(toType);
@@ -390,13 +400,6 @@ public class CastExpr extends Expr {
     noOp_ = childType.equals(type_);
     if (noOp_) return;
 
-    // BINARY can be only converted from / to STRING and the conversion is 
NOOP.
-    if ((childType.isBinary() && type_.getPrimitiveType() == 
PrimitiveType.STRING)
-        || (type_.isBinary() && childType.getPrimitiveType() == 
PrimitiveType.STRING)) {
-      noOp_ = true;
-      return;
-    }
-
     FunctionName fnName = new FunctionName(BuiltinsDb.NAME, getFnName(type_));
     Type[] args = { childType };
     Function searchDesc = new Function(fnName, args, Type.INVALID, false);
diff --git a/fe/src/main/java/org/apache/impala/planner/KuduScanNode.java b/fe/src/main/java/org/apache/impala/planner/KuduScanNode.java
index 2ae581525..1ac0eea87 100644
--- a/fe/src/main/java/org/apache/impala/planner/KuduScanNode.java
+++ b/fe/src/main/java/org/apache/impala/planner/KuduScanNode.java
@@ -580,6 +580,11 @@ public class KuduScanNode extends ScanNode {
             ((StringLiteral)literal).getUnescapedValue());
         break;
       }
+      case BINARY: {
+        kuduPredicate = KuduPredicate.newComparisonPredicate(column, op,
+            ((StringLiteral)literal).getUnescapedValue().getBytes());
+        break;
+      }
       case TIMESTAMP: {
         try {
           // TODO: Simplify when Impala supports a 64-bit TIMESTAMP type.
diff --git a/fe/src/main/java/org/apache/impala/util/KuduUtil.java b/fe/src/main/java/org/apache/impala/util/KuduUtil.java
index 74940df45..ee873a735 100644
--- a/fe/src/main/java/org/apache/impala/util/KuduUtil.java
+++ b/fe/src/main/java/org/apache/impala/util/KuduUtil.java
@@ -442,10 +442,10 @@ public class KuduUtil {
       case DECIMAL: return org.apache.kudu.Type.DECIMAL;
       case DATE: return org.apache.kudu.Type.DATE;
       case VARCHAR: return org.apache.kudu.Type.VARCHAR;
+      case BINARY: return org.apache.kudu.Type.BINARY;
       /* Fall through below */
       case INVALID_TYPE:
       case NULL_TYPE:
-      case BINARY:
       case DATETIME:
       case CHAR:
       default:
@@ -471,6 +471,7 @@ public class KuduUtil {
         return ScalarType.createDecimalType(
             typeAttributes.getPrecision(), typeAttributes.getScale());
       case VARCHAR: return 
ScalarType.createVarcharType(typeAttributes.getLength());
+      case BINARY: return Type.BINARY;
       default:
         throw new ImpalaRuntimeException(String.format(
             "Kudu type '%s' is not supported in Impala", t.getName()));
diff --git a/testdata/datasets/functional/functional_schema_template.sql b/testdata/datasets/functional/functional_schema_template.sql
index abf8b44ad..8567bb94f 100644
--- a/testdata/datasets/functional/functional_schema_template.sql
+++ b/testdata/datasets/functional/functional_schema_template.sql
@@ -4362,6 +4362,17 @@ LOAD DATA LOCAL INPATH '{impala_home}/testdata/data/binary_tbl/000000_0.txt' OVE
 ---- DEPENDENT_LOAD
 insert overwrite table {db_name}{db_suffix}.{table_name}
 select id, string_col, binary_col from functional.{table_name};
+---- CREATE_KUDU
+DROP TABLE IF EXISTS {db_name}{db_suffix}.{table_name};
+CREATE TABLE {db_name}{db_suffix}.{table_name} (
+  id INT PRIMARY KEY,
+  string_col STRING,
+  binary_col BINARY
+)
+PARTITION BY HASH (id) PARTITIONS 3 STORED AS KUDU;
+---- DEPENDENT_LOAD_KUDU
+insert into table {db_name}{db_suffix}.{table_name}
+select id, string_col, binary_col from functional.{table_name};
 ====
 ---- DATASET
 functional
@@ -4385,6 +4396,23 @@ select id, int_col, cast(string_col as binary),
     from functional.alltypes;
 ---- DEPENDENT_LOAD
 insert overwrite table {db_name}{db_suffix}.{table_name} partition(year, month)
+select id, int_col, cast(string_col as binary),
+       cast(case when id % 2 = 0 then date_string_col else NULL end as binary),
+       year, month
+    from functional.alltypes;
+---- CREATE_KUDU
+DROP TABLE IF EXISTS {db_name}{db_suffix}.{table_name};
+CREATE TABLE {db_name}{db_suffix}.{table_name} (
+  id INT PRIMARY KEY,
+  int_col INT,
+  binary_col BINARY,
+  binary_col_with_nulls BINARY,
+  year INT,
+  month INT
+)
+PARTITION BY HASH (id) PARTITIONS 3 STORED AS KUDU;
+---- DEPENDENT_LOAD_KUDU
+insert into table {db_name}{db_suffix}.{table_name}
 select id, int_col, cast(string_col as binary),
        cast(case when id % 2 = 0 then date_string_col else NULL end as binary),
        year, month
diff --git a/testdata/datasets/functional/schema_constraints.csv b/testdata/datasets/functional/schema_constraints.csv
index 7c78757ee..da430313a 100644
--- a/testdata/datasets/functional/schema_constraints.csv
+++ b/testdata/datasets/functional/schema_constraints.csv
@@ -293,6 +293,8 @@ table_name:strings_with_quotes, constraint:only, table_format:kudu/none/none
 table_name:manynulls, constraint:only, table_format:kudu/none/none
 table_name:date_tbl, constraint:only, table_format:kudu/none/none
 table_name:timestamp_at_dst_changes, constraint:only, 
table_format:kudu/none/none
+table_name:binary_tbl, constraint:only, table_format:kudu/none/none
+table_name:binary_tbl_big, constraint:only, table_format:kudu/none/none
 
 # Skipping header lines is only effective with text tables
 table_name:table_with_header, constraint:restrict_to, 
table_format:text/none/none
@@ -329,8 +331,6 @@ table_name:date_tbl_error, constraint:restrict_to, table_format:text/snap/block
 table_name:date_tbl_error, constraint:restrict_to, table_format:text/def/block
 table_name:insert_date_tbl, constraint:restrict_to, 
table_format:hbase/none/none
 
-table_name:binary_tbl, constraint:exclude, table_format:kudu/none/none
-table_name:binary_tbl_big, constraint:exclude, table_format:kudu/none/none
 table_name:binary_in_complex_types, constraint:restrict_to, 
table_format:parquet/none/none
 table_name:binary_in_complex_types, constraint:restrict_to, 
table_format:orc/def/block
 
diff --git a/testdata/workloads/functional-planner/queries/PlannerTest/kudu.test b/testdata/workloads/functional-planner/queries/PlannerTest/kudu.test
index 69fb70ce1..eefbe5bcc 100644
--- a/testdata/workloads/functional-planner/queries/PlannerTest/kudu.test
+++ b/testdata/workloads/functional-planner/queries/PlannerTest/kudu.test
@@ -750,4 +750,33 @@ PLAN-ROOT SINK
 00:SCAN KUDU [functional_kudu.alltypes]
    kudu predicates: double_col = 1.0
    row-size=97B cardinality=730
+====
+# BINARY predicate.
+select * from functional_kudu.binary_tbl where binary_col=cast("a" as binary);
+---- PLAN
+PLAN-ROOT SINK
+|
+00:SCAN KUDU [functional_kudu.binary_tbl]
+   kudu predicates: binary_col = 'a'
+   row-size=36B cardinality=0
+====
+# BINARY predicate.
+# Non-ASCII strings cannot be pushed down to Kudu (IMPALA-10349)
+select * from functional_kudu.binary_tbl where binary_col=cast("á" as binary);
+---- PLAN
+PLAN-ROOT SINK
+|
+00:SCAN KUDU [functional_kudu.binary_tbl]
+   predicates: binary_col = CAST('á' AS BINARY)
+   row-size=36B cardinality=0
+====
+# BINARY predicate.
+# Invalid UTF-8 strings cannot be pushed down to Kudu (IMPALA-10349)
+select * from functional_kudu.binary_tbl where binary_col=cast(unhex("aa") as 
binary);
+---- PLAN
+PLAN-ROOT SINK
+|
+00:SCAN KUDU [functional_kudu.binary_tbl]
+   predicates: binary_col = CAST(unhex('aa') AS BINARY)
+   row-size=36B cardinality=0
 ====
\ No newline at end of file
diff --git a/testdata/workloads/functional-query/queries/QueryTest/binary-type.test b/testdata/workloads/functional-query/queries/QueryTest/binary-type.test
index 8d8941d9d..f7b278644 100644
--- a/testdata/workloads/functional-query/queries/QueryTest/binary-type.test
+++ b/testdata/workloads/functional-query/queries/QueryTest/binary-type.test
@@ -155,3 +155,38 @@ INT,  INT, BINARY, BINARY
 2,2,'01/01/09','01/01/09'
 2,0,'01/01/09','01/01/09'
 ====
+---- QUERY
+# Check that simple filters on binary columns can be pushed down to Kudu.
+select count(*) from binary_tbl
+where binary_col = cast("not_in_table" as binary)
+---- TYPES
+BIGINT
+---- RESULTS
+0
+---- RUNTIME_PROFILE: table_format=kudu
+!row_regex:.*TotalKuduScanRoundTrips: 1.*
+====
+---- QUERY
+# Check that filters with constant folding and non-ASCII characters cannot
+# be pushed down to Kudu (IMPALA-10349).
+select count(*) from binary_tbl
+where binary_col = cast("á" as binary)
+---- TYPES
+BIGINT
+---- RESULTS
+0
+---- RUNTIME_PROFILE: table_format=kudu
+row_regex:.*TotalKuduScanRoundTrips: 1.*
+====
+---- QUERY
+# Check that filters with constant folding and invalid UTF-8 characters cannot
+# be pushed down to Kudu (IMPALA-10349).
+select count(*) from binary_tbl
+where binary_col = cast(unhex("AA") as binary)
+---- TYPES
+BIGINT
+---- RESULTS
+0
+---- RUNTIME_PROFILE: table_format=kudu
+row_regex:.*TotalKuduScanRoundTrips: 1.*
+====
diff --git a/tests/common/kudu_test_suite.py b/tests/common/kudu_test_suite.py
index 74791b0a9..f9cf36f56 100644
--- a/tests/common/kudu_test_suite.py
+++ b/tests/common/kudu_test_suite.py
@@ -33,7 +33,8 @@ from kudu.schema import (
     SchemaBuilder,
     STRING,
     BINARY,
-    UNIXTIME_MICROS)
+    UNIXTIME_MICROS,
+    DATE)
 from kudu.client import Partitioning
 from random import choice, sample
 from string import ascii_lowercase, digits
@@ -188,7 +189,10 @@ class KuduTestSuite(ImpalaTestSuite):
         INT32: "INT",
         INT64: "BIGINT",
         INT8: "TINYINT",
-        STRING: "STRING"}
+        STRING: "STRING",
+        BINARY: "BINARY",
+        UNIXTIME_MICROS: "TIMESTAMP",
+        DATE: "DATE"}
     if col_type not in mapping:
       raise Exception("Unexpected column type: %s" % col_type)
     return mapping[col_type]
diff --git a/tests/query_test/test_kudu.py b/tests/query_test/test_kudu.py
index ca69a8dda..4869e0450 100644
--- a/tests/query_test/test_kudu.py
+++ b/tests/query_test/test_kudu.py
@@ -30,7 +30,8 @@ from kudu.schema import (
     SchemaBuilder,
     STRING,
     BINARY,
-    UNIXTIME_MICROS)
+    UNIXTIME_MICROS,
+    DATE)
 from kudu.client import Partitioning
 from kudu.util import to_unixtime_micros
 import logging
@@ -732,7 +733,8 @@ class TestCreateExternalTable(KuduTestSuite):
   def test_col_types(self, cursor, kudu_client):
     """Check that a table can be created using all available column types."""
     # TODO: Add DECIMAL when the Kudu python client supports decimal
-    kudu_types = [STRING, BOOL, DOUBLE, FLOAT, INT16, INT32, INT64, INT8]
+    kudu_types = [STRING, BOOL, DOUBLE, FLOAT, INT16, INT32, INT64, INT8, 
BINARY,
+                  UNIXTIME_MICROS, DATE]
     with self.temp_kudu_table(kudu_client, kudu_types) as kudu_table:
       impala_table_name = self.get_kudu_table_base_name(kudu_table.name)
       props = "TBLPROPERTIES('kudu.table_name'='%s')" % kudu_table.name
@@ -747,22 +749,6 @@ class TestCreateExternalTable(KuduTestSuite):
           assert col_type.upper() == \
               self.kudu_col_type_to_impala_col_type(kudu_col.type.type)
 
-  @SkipIfKudu.hms_integration_enabled()
-  def test_unsupported_binary_col(self, cursor, kudu_client):
-    """Check that external tables with BINARY columns fail gracefully.
-    """
-    with self.temp_kudu_table(kudu_client, [INT32, BINARY]) as kudu_table:
-      impala_table_name = self.random_table_name()
-      try:
-        cursor.execute("""
-            CREATE EXTERNAL TABLE %s
-            STORED AS KUDU
-            TBLPROPERTIES('kudu.table_name' = '%s')""" % (impala_table_name,
-                kudu_table.name))
-        assert False
-      except Exception as e:
-        assert "Kudu type 'binary' is not supported in Impala" in str(e)
-
   @SkipIfKudu.hms_integration_enabled()
   def test_drop_external_table(self, cursor, kudu_client):
     """Check that dropping an external table only affects the catalog and does 
not delete
diff --git a/tests/query_test/test_scanners.py b/tests/query_test/test_scanners.py
index 6b2bb4fbe..30602e9b0 100644
--- a/tests/query_test/test_scanners.py
+++ b/tests/query_test/test_scanners.py
@@ -1995,9 +1995,8 @@ class TestBinaryType(ImpalaTestSuite):
   @classmethod
   def add_test_dimensions(cls):
     super(TestBinaryType, cls).add_test_dimensions()
-    # todo: IMPALA-5323: Support Kudu BINARY
     cls.ImpalaTestMatrix.add_constraint(
-        lambda v: v.get_value('table_format').file_format not in ['kudu', 
'json'])
+      lambda v: v.get_value('table_format').file_format != 'json')
 
   def test_binary_type(self, vector):
     self.run_test_case('QueryTest/binary-type', vector)

Reply via email to