This is an automated email from the ASF dual-hosted git repository.

stigahuang pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/impala.git


The following commit(s) were added to refs/heads/master by this push:
     new 3333af5d9 IMPALA-12043: Fix "MaxMessageSize reached" exception in Thrift JNI
3333af5d9 is described below

commit 3333af5d97c9367474fca6177a6dd415d747793f
Author: Riza Suminto <[email protected]>
AuthorDate: Thu Apr 6 13:17:14 2023 -0700

    IMPALA-12043: Fix "MaxMessageSize reached" exception in Thrift JNI
    
    Large results from operations such as COMPUTE INCREMENTAL STATS can
    fail with "TTransportException: MaxMessageSize reached". This happens
    when CatalogdMetaProvider.updateCatalogCache() (or the equivalent loop
    in ImpaladCatalog) receives a buffer larger than 100MB through JNI from
    NativeGetNextCatalogObjectUpdate. TByteBuffer inherits from
    TEndpointTransport, which in Thrift 0.16.0 enforces a MaxMessageSize
    limit (100MB by default). TMemoryBuffer has a constructor that accepts
    a TConfiguration object to customize this limit, which IMPALA-11669
    already uses, but TByteBuffer has no equivalent constructor
    (THRIFT-5696), so this code path was overlooked.
    
    This patch fixes the issue by copying
    org.apache.thrift.transport.TByteBuffer from thrift-0.16.0 to
    org.apache.impala.util.TByteBuffer and patching it with the fix from
    THRIFT-5696. It then passes the thrift_rpc_max_message_size value, as a
    TConfiguration, to the TByteBuffer constructor.
    
    Testing:
    - Pass CatalogdMetaProviderTest tests.
    
    Change-Id: I105db332cd312d80bac0313090576bc47064ee16
    Reviewed-on: http://gerrit.cloudera.org:8080/19712
    Reviewed-by: Impala Public Jenkins <[email protected]>
    Tested-by: Impala Public Jenkins <[email protected]>
---
 be/src/util/backend-gflag-util.cc                  |   2 +
 common/thrift/BackendGflags.thrift                 |   2 +
 .../java/org/apache/thrift/TConfiguration.java     |  91 ++++++++++++++
 .../thrift/transport/TEndpointTransport.java       |  97 ++++++++++++++
 .../org/apache/impala/catalog/ImpaladCatalog.java  |   9 +-
 .../impala/catalog/local/CatalogdMetaProvider.java |   8 +-
 .../org/apache/impala/service/BackendConfig.java   |   4 +
 .../java/org/apache/impala/util/TByteBuffer.java   | 140 +++++++++++++++++++++
 .../catalog/local/CatalogdMetaProviderTest.java    |  17 +++
 9 files changed, 366 insertions(+), 4 deletions(-)

diff --git a/be/src/util/backend-gflag-util.cc 
b/be/src/util/backend-gflag-util.cc
index cd8f1dc01..e4449575a 100644
--- a/be/src/util/backend-gflag-util.cc
+++ b/be/src/util/backend-gflag-util.cc
@@ -104,6 +104,7 @@ DECLARE_bool(enable_sync_to_latest_event_on_ddls);
 DECLARE_bool(pull_table_types_and_comments);
 DECLARE_bool(enable_reload_events);
 DECLARE_string(geospatial_library);
+DECLARE_int32(thrift_rpc_max_message_size);
 
 // HS2 SAML2.0 configuration
 // Defined here because TAG_FLAG caused issues in global-flags.cc
@@ -407,6 +408,7 @@ Status PopulateThriftBackendGflags(TBackendGflags& cfg) {
   cfg.__set_min_processing_per_thread(FLAGS_min_processing_per_thread);
   cfg.__set_skip_resource_checking_on_last_executor_group_set(
       FLAGS_skip_resource_checking_on_last_executor_group_set);
+  cfg.__set_thrift_rpc_max_message_size(FLAGS_thrift_rpc_max_message_size);
   return Status::OK();
 }
 
diff --git a/common/thrift/BackendGflags.thrift 
b/common/thrift/BackendGflags.thrift
index 3f1f03749..ba92d239e 100644
--- a/common/thrift/BackendGflags.thrift
+++ b/common/thrift/BackendGflags.thrift
@@ -250,4 +250,6 @@ struct TBackendGflags {
   109: required i64 min_processing_per_thread
 
   110: required bool skip_resource_checking_on_last_executor_group_set
+
+  111: required i32 thrift_rpc_max_message_size
 }
diff --git 
a/fe/src/compat-apache-hive-3/java/org/apache/thrift/TConfiguration.java 
b/fe/src/compat-apache-hive-3/java/org/apache/thrift/TConfiguration.java
new file mode 100644
index 000000000..9dd039f06
--- /dev/null
+++ b/fe/src/compat-apache-hive-3/java/org/apache/thrift/TConfiguration.java
@@ -0,0 +1,91 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.thrift;
+
+/**
+ * Limits applied by Thrift transports and protocols: maximum message size, maximum
+ * frame size and maximum recursion depth.
+ * <p>
+ * NOTE(review): presumably provided under compat-apache-hive-3 because the Thrift
+ * library on that build's classpath lacks this class; keep in sync with Thrift 0.16.0.
+ */
+public class TConfiguration {
+  // Default max message size: 100 MiB.
+  public static final int DEFAULT_MAX_MESSAGE_SIZE = 100 * 1024 * 1024;
+  // this value is used consistently across all Thrift libraries
+  public static final int DEFAULT_MAX_FRAME_SIZE = 16384000;
+  public static final int DEFAULT_RECURSION_DEPTH = 64;
+
+  private int maxMessageSize;
+  private int maxFrameSize;
+  private int recursionLimit;
+
+  /** Creates a configuration using the DEFAULT_* limits. */
+  public TConfiguration() {
+    this(DEFAULT_MAX_MESSAGE_SIZE, DEFAULT_MAX_FRAME_SIZE, 
DEFAULT_RECURSION_DEPTH);
+  }
+  /** Creates a configuration with the given limits; the values are not validated. */
+  public TConfiguration(int maxMessageSize, int maxFrameSize, int 
recursionLimit) {
+    this.maxFrameSize = maxFrameSize;
+    this.maxMessageSize = maxMessageSize;
+    this.recursionLimit = recursionLimit;
+  }
+
+  // Plain accessors. The setters mutate this instance in place.
+  public int getMaxMessageSize() { return maxMessageSize; }
+
+  public int getMaxFrameSize() { return maxFrameSize; }
+
+  public int getRecursionLimit() { return recursionLimit; }
+
+  public void setMaxMessageSize(int maxMessageSize) {
+    this.maxMessageSize = maxMessageSize;
+  }
+
+  public void setMaxFrameSize(int maxFrameSize) { this.maxFrameSize = 
maxFrameSize; }
+
+  public void setRecursionLimit(int recursionLimit) {
+    this.recursionLimit = recursionLimit;
+  }
+
+  // Shared instance with the DEFAULT_* limits. Its setters are public, so callers
+  // must not mutate it; create a new instance (or use custom()) instead.
+  public static final TConfiguration DEFAULT = new Builder().build();
+
+  /** Returns a new Builder pre-populated with the DEFAULT_* limits. */
+  public static TConfiguration.Builder custom() { return new Builder(); }
+
+  /** Fluent builder for TConfiguration; limits that are not set keep their defaults. */
+  public static class Builder {
+    private int maxMessageSize;
+    private int maxFrameSize;
+    private int recursionLimit;
+
+    // Package-private: obtain instances through TConfiguration.custom().
+    Builder() {
+      super();
+      this.maxFrameSize = DEFAULT_MAX_FRAME_SIZE;
+      this.maxMessageSize = DEFAULT_MAX_MESSAGE_SIZE;
+      this.recursionLimit = DEFAULT_RECURSION_DEPTH;
+    }
+
+    public Builder setMaxMessageSize(int maxMessageSize) {
+      this.maxMessageSize = maxMessageSize;
+      return this;
+    }
+
+    public Builder setMaxFrameSize(int maxFrameSize) {
+      this.maxFrameSize = maxFrameSize;
+      return this;
+    }
+
+    public Builder setRecursionLimit(int recursionLimit) {
+      this.recursionLimit = recursionLimit;
+      return this;
+    }
+
+    /** Creates a new TConfiguration from the current builder values. */
+    public TConfiguration build() {
+      return new TConfiguration(maxMessageSize, maxFrameSize, recursionLimit);
+    }
+  }
+}
diff --git 
a/fe/src/compat-apache-hive-3/java/org/apache/thrift/transport/TEndpointTransport.java
 
b/fe/src/compat-apache-hive-3/java/org/apache/thrift/transport/TEndpointTransport.java
new file mode 100644
index 000000000..c4e6454a4
--- /dev/null
+++ 
b/fe/src/compat-apache-hive-3/java/org/apache/thrift/transport/TEndpointTransport.java
@@ -0,0 +1,97 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.thrift.transport;
+
+import org.apache.thrift.TConfiguration;
+
+import java.util.Objects;
+
+/**
+ * Base class for transports that track how many bytes of the current message may still
+ * be consumed, bounded by the max message size of their TConfiguration. Exceeding the
+ * bound raises TTransportException(END_OF_FILE, "MaxMessageSize reached").
+ */
+public abstract class TEndpointTransport extends TTransport {
+  protected long getMaxMessageSize() { return 
getConfiguration().getMaxMessageSize(); }
+
+  // Upper bound on the size of the current message; it can only shrink until the next
+  // full reset.
+  protected long knownMessageSize;
+  // Bytes of the current message that may still be consumed.
+  protected long remainingMessageSize;
+
+  private TConfiguration _configuration;
+  public TConfiguration getConfiguration() { return _configuration; }
+
+  /**
+   * @param config transport limits; if null, a default TConfiguration is used.
+   * @throws TTransportException declared by the size reset (a full reset, as done
+   *     here, does not actually throw).
+   */
+  public TEndpointTransport(TConfiguration config) throws TTransportException {
+    _configuration = Objects.isNull(config) ? new TConfiguration() : config;
+
+    resetConsumedMessageSize(-1);
+  }
+
+  /**
+   * Resets the message size accounting. A negative newSize performs a full reset: the
+   * known and remaining sizes are both set to the configured max message size.
+   * Otherwise both are set to newSize, which may shrink the known size but never grow it.
+   * @param newSize the new message size, or a negative value for a full reset
+   * @throws TTransportException if newSize is larger than the current known size
+   */
+  protected void resetConsumedMessageSize(long newSize) throws 
TTransportException {
+    // full reset
+    if (newSize < 0) {
+      knownMessageSize = getMaxMessageSize();
+      remainingMessageSize = getMaxMessageSize();
+      return;
+    }
+
+    // update only: message size can shrink, but not grow
+    if (newSize > knownMessageSize)
+      throw new TTransportException(
+          TTransportException.END_OF_FILE, "MaxMessageSize reached");
+
+    knownMessageSize = newSize;
+    remainingMessageSize = newSize;
+  }
+
+  /**
+   * Updates the known message size to the real message size (e.g. reported by a framed
+   * transport) while preserving the bytes already consumed. A size of 0 means "unknown"
+   * and performs a full reset to the configured max message size.
+   * @param size the real message size, or 0 if unknown
+   * @throws TTransportException if size is larger than the current known size, or if
+   *     more bytes were already consumed than the new size allows
+   */
+  public void updateKnownMessageSize(long size) throws TTransportException {
+    // Bytes consumed so far; re-charged against the new size below.
+    long consumed = knownMessageSize - remainingMessageSize;
+    resetConsumedMessageSize(size == 0 ? -1 : size);
+    countConsumedMessageBytes(consumed);
+  }
+
+  /**
+   * Checks that numBytes may still be consumed from the current message, without
+   * consuming anything.
+   * @param numBytes number of bytes about to be read
+   * @throws TTransportException if numBytes exceeds the remaining message size
+   */
+  public void checkReadBytesAvailable(long numBytes) throws 
TTransportException {
+    if (remainingMessageSize < numBytes)
+      throw new TTransportException(
+          TTransportException.END_OF_FILE, "MaxMessageSize reached");
+  }
+
+  /**
+   * Deducts numBytes from the remaining message size.
+   * @param numBytes number of bytes consumed
+   * @throws TTransportException if numBytes exceeds the remaining message size; the
+   *     remaining size is set to 0 before throwing
+   */
+  protected void countConsumedMessageBytes(long numBytes) throws 
TTransportException {
+    if (remainingMessageSize >= numBytes) {
+      remainingMessageSize -= numBytes;
+    } else {
+      remainingMessageSize = 0;
+      throw new TTransportException(
+          TTransportException.END_OF_FILE, "MaxMessageSize reached");
+    }
+  }
+}
diff --git a/fe/src/main/java/org/apache/impala/catalog/ImpaladCatalog.java 
b/fe/src/main/java/org/apache/impala/catalog/ImpaladCatalog.java
index bed23effa..965039948 100644
--- a/fe/src/main/java/org/apache/impala/catalog/ImpaladCatalog.java
+++ b/fe/src/main/java/org/apache/impala/catalog/ImpaladCatalog.java
@@ -33,6 +33,7 @@ import org.apache.impala.authorization.AuthorizationChecker;
 import org.apache.impala.authorization.AuthorizationPolicy;
 import org.apache.impala.common.InternalException;
 import org.apache.impala.common.Pair;
+import org.apache.impala.service.BackendConfig;
 import org.apache.impala.service.FeSupport;
 import org.apache.impala.thrift.TAuthzCacheInvalidation;
 import org.apache.impala.thrift.TCatalogObject;
@@ -48,10 +49,11 @@ import org.apache.impala.thrift.TUniqueId;
 import org.apache.impala.thrift.TUpdateCatalogCacheRequest;
 import org.apache.impala.thrift.TUpdateCatalogCacheResponse;
 import org.apache.impala.util.PatternMatcher;
+import org.apache.impala.util.TByteBuffer;
 import org.apache.impala.util.TUniqueIdUtil;
+import org.apache.thrift.TConfiguration;
 import org.apache.thrift.TException;
 import org.apache.thrift.protocol.TBinaryProtocol;
-import org.apache.thrift.transport.TByteBuffer;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
@@ -208,11 +210,14 @@ public class ImpaladCatalog extends Catalog implements 
FeCatalog {
     Map<TableName, PartitionMetaSummary> partUpdates = new HashMap<>();
     long newCatalogVersion = lastSyncedCatalogVersion_.get();
     Pair<Boolean, ByteBuffer> update;
+    int maxMessageSize = BackendConfig.INSTANCE.getThriftRpcMaxMessageSize();
+    final TConfiguration config = new TConfiguration(maxMessageSize,
+        TConfiguration.DEFAULT_MAX_FRAME_SIZE, 
TConfiguration.DEFAULT_RECURSION_DEPTH);
     while ((update = 
FeSupport.NativeGetNextCatalogObjectUpdate(req.native_iterator_ptr))
         != null) {
       boolean isDelete = update.first;
       TCatalogObject obj = new TCatalogObject();
-      obj.read(new TBinaryProtocol(new TByteBuffer(update.second)));
+      obj.read(new TBinaryProtocol(new TByteBuffer(config, update.second)));
       String key = Catalog.toCatalogObjectKey(obj);
       int len = update.second.capacity();
       if (len > 100 * 1024 * 1024 /* 100MB */) {
diff --git 
a/fe/src/main/java/org/apache/impala/catalog/local/CatalogdMetaProvider.java 
b/fe/src/main/java/org/apache/impala/catalog/local/CatalogdMetaProvider.java
index fee9311a0..de0f7d653 100644
--- a/fe/src/main/java/org/apache/impala/catalog/local/CatalogdMetaProvider.java
+++ b/fe/src/main/java/org/apache/impala/catalog/local/CatalogdMetaProvider.java
@@ -99,12 +99,13 @@ import org.apache.impala.thrift.TValidWriteIdList;
 import org.apache.impala.util.AcidUtils;
 import org.apache.impala.util.IcebergUtil;
 import org.apache.impala.util.ListMap;
+import org.apache.impala.util.TByteBuffer;
 import org.apache.impala.util.ThreadNameAnnotator;
+import org.apache.thrift.TConfiguration;
 import org.apache.thrift.TDeserializer;
 import org.apache.thrift.TException;
 import org.apache.thrift.TSerializer;
 import org.apache.thrift.protocol.TBinaryProtocol;
-import org.apache.thrift.transport.TByteBuffer;
 import org.ehcache.sizeof.SizeOf;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
@@ -1223,12 +1224,15 @@ public class CatalogdMetaProvider implements 
MetaProvider {
     ObjectUpdateSequencer hdfsCachePoolSequencer = new ObjectUpdateSequencer();
 
     Pair<Boolean, ByteBuffer> update;
+    int maxMessageSize = BackendConfig.INSTANCE.getThriftRpcMaxMessageSize();
+    final TConfiguration config = new TConfiguration(maxMessageSize,
+        TConfiguration.DEFAULT_MAX_FRAME_SIZE, 
TConfiguration.DEFAULT_RECURSION_DEPTH);
     while ((update = 
FeSupport.NativeGetNextCatalogObjectUpdate(req.native_iterator_ptr))
         != null) {
       boolean isDelete = update.first;
       TCatalogObject obj = new TCatalogObject();
       try {
-        obj.read(new TBinaryProtocol(new TByteBuffer(update.second)));
+        obj.read(new TBinaryProtocol(new TByteBuffer(config, update.second)));
       } catch (TException e) {
         // TODO(todd) include the bad key here! currently the JNI bridge 
doesn't expose
         // the key in any way.
diff --git a/fe/src/main/java/org/apache/impala/service/BackendConfig.java 
b/fe/src/main/java/org/apache/impala/service/BackendConfig.java
index 820d60a17..4031e8483 100644
--- a/fe/src/main/java/org/apache/impala/service/BackendConfig.java
+++ b/fe/src/main/java/org/apache/impala/service/BackendConfig.java
@@ -394,4 +394,8 @@ public class BackendConfig {
   public boolean isSkipResourceCheckingOnLastExecutorGroupSet() {
     return backendCfg_.skip_resource_checking_on_last_executor_group_set;
   }
+
+  /**
+   * Returns the value of the thrift_rpc_max_message_size backend flag, as passed to the
+   * frontend in TBackendGflags. Used as the max message size when deserializing catalog
+   * updates received through JNI.
+   */
+  public int getThriftRpcMaxMessageSize() {
+    return backendCfg_.thrift_rpc_max_message_size;
+  }
 }
diff --git a/fe/src/main/java/org/apache/impala/util/TByteBuffer.java 
b/fe/src/main/java/org/apache/impala/util/TByteBuffer.java
new file mode 100644
index 000000000..3295085f5
--- /dev/null
+++ b/fe/src/main/java/org/apache/impala/util/TByteBuffer.java
@@ -0,0 +1,140 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements.  See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership.  The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License.  You may obtain a copy of the License at
+//
+//   http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied.  See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+package org.apache.impala.util;
+
+import org.apache.thrift.TConfiguration;
+import org.apache.thrift.transport.TEndpointTransport;
+import org.apache.thrift.transport.TTransportException;
+
+import java.nio.BufferOverflowException;
+import java.nio.BufferUnderflowException;
+import java.nio.ByteBuffer;
+
+/**
+ * ByteBuffer-backed implementation of TTransport.
+ * <p>
+ * This is a copy of org.apache.thrift.transport.TByteBuffer from thrift-0.16.0, patched
+ * with THRIFT-5696, which adds a constructor accepting a TConfiguration so that the max
+ * message size can be raised above the default. Once Impala upgrades its
+ * IMPALA_THRIFT_POM_VERSION to thrift-0.19.0, this class can be replaced with the
+ * patched TByteBuffer from the Thrift Java library.
+ */
+public final class TByteBuffer extends TEndpointTransport {
+  private final ByteBuffer byteBuffer;
+
+  /**
+   * Creates a new TByteBuffer wrapping a given NIO ByteBuffer and custom TConfiguration.
+   * The known message size is set to byteBuffer.capacity(), so construction fails when
+   * the capacity exceeds the configuration's max message size.
+   *
+   * @param configuration the custom TConfiguration; null selects the default one.
+   * @param byteBuffer the NIO ByteBuffer to wrap.
+   * @throws TTransportException if the buffer capacity exceeds the max message size.
+   */
+  public TByteBuffer(TConfiguration configuration, ByteBuffer byteBuffer)
+      throws TTransportException {
+    super(configuration);
+    this.byteBuffer = byteBuffer;
+    // Bound the message by the buffer capacity; throws "MaxMessageSize reached" when
+    // the capacity is larger than the configured max message size.
+    updateKnownMessageSize(byteBuffer.capacity());
+  }
+
+  /**
+   * Creates a new TByteBuffer wrapping a given NIO ByteBuffer, using a default
+   * TConfiguration and therefore the default max message size.
+   *
+   * @param byteBuffer the NIO ByteBuffer to wrap.
+   * @throws TTransportException if the buffer capacity exceeds the default max
+   *     message size.
+   */
+  public TByteBuffer(ByteBuffer byteBuffer) throws TTransportException {
+    this(new TConfiguration(), byteBuffer);
+  }
+
+  /** Always returns true; there is no connection to manage. */
+  @Override
+  public boolean isOpen() {
+    return true;
+  }
+
+  /** No-op. */
+  @Override
+  public void open() {
+  }
+
+  /** No-op; the wrapped ByteBuffer is left untouched. */
+  @Override
+  public void close() {
+  }
+
+  /**
+   * Copies up to len bytes from the buffer's current position into buf at offset off.
+   *
+   * @return the number of bytes copied; less than len (possibly 0) when fewer bytes
+   *     remain in the buffer.
+   * @throws TTransportException if len exceeds the remaining message size.
+   */
+  @Override
+  public int read(byte[] buf, int off, int len) throws TTransportException {
+    // Only compares len with the remaining message size; bytes read are not deducted
+    // via countConsumedMessageBytes(). NOTE(review): same as the copied upstream code.
+    checkReadBytesAvailable(len);
+
+    final int n = Math.min(byteBuffer.remaining(), len);
+    if (n > 0) {
+      try {
+        byteBuffer.get(buf, off, n);
+      } catch (BufferUnderflowException e) {
+        throw new TTransportException("Unexpected end of input buffer", e);
+      }
+    }
+    return n;
+  }
+
+  /**
+   * Writes len bytes from buf, starting at offset off, at the buffer's current position.
+   *
+   * @throws TTransportException if the buffer does not have enough room.
+   */
+  @Override
+  public void write(byte[] buf, int off, int len) throws TTransportException {
+    try {
+      byteBuffer.put(buf, off, len);
+    } catch (BufferOverflowException e) {
+      throw new TTransportException("Not enough room in output buffer", e);
+    }
+  }
+
+  /**
+   * Gets the underlying NIO ByteBuffer.
+   *
+   * @return the underlying NIO ByteBuffer.
+   */
+  public ByteBuffer getByteBuffer() {
+    return byteBuffer;
+  }
+
+  /**
+   * Convenience method to call clear() on the underlying NIO ByteBuffer.
+   *
+   * @return this instance.
+   */
+  public TByteBuffer clear() {
+    byteBuffer.clear();
+    return this;
+  }
+
+  /**
+   * Convenience method to call flip() on the underlying NIO ByteBuffer.
+   *
+   * @return this instance.
+   */
+  public TByteBuffer flip() {
+    byteBuffer.flip();
+    return this;
+  }
+
+  /**
+   * Copies the bytes between the underlying buffer's position and limit into a new
+   * array. The buffer's position is not changed (the copy is read through a slice).
+   *
+   * @return a new byte array holding the remaining bytes (not the backing array).
+   */
+  public byte[] toByteArray() {
+    final byte[] data = new byte[byteBuffer.remaining()];
+    byteBuffer.slice().get(data);
+    return data;
+  }
+}
diff --git 
a/fe/src/test/java/org/apache/impala/catalog/local/CatalogdMetaProviderTest.java
 
b/fe/src/test/java/org/apache/impala/catalog/local/CatalogdMetaProviderTest.java
index 7eb6f1fbf..dbfb59639 100644
--- 
a/fe/src/test/java/org/apache/impala/catalog/local/CatalogdMetaProviderTest.java
+++ 
b/fe/src/test/java/org/apache/impala/catalog/local/CatalogdMetaProviderTest.java
@@ -19,6 +19,7 @@ package org.apache.impala.catalog.local;
 
 import static org.junit.Assert.*;
 
+import java.nio.ByteBuffer;
 import java.util.ArrayList;
 import java.util.Collection;
 import java.util.List;
@@ -57,6 +58,9 @@ import org.apache.impala.thrift.TNetworkAddress;
 import org.apache.impala.thrift.TRuntimeProfileNode;
 import org.apache.impala.thrift.TTable;
 import org.apache.impala.util.ListMap;
+import org.apache.impala.util.TByteBuffer;
+import org.apache.thrift.TConfiguration;
+import org.apache.thrift.transport.TTransportException;
 import org.junit.Assume;
 import org.junit.Rule;
 import org.junit.Test;
@@ -641,4 +645,17 @@ public class CatalogdMetaProviderTest {
       assertEquals("Actual paths: " + paths, 1, paths.size());
     }
   }
+
+  /**
+   * THRIFT-5696: verifies that TByteBuffer can wrap a buffer larger than
+   * TConfiguration.DEFAULT_MAX_MESSAGE_SIZE when given a TConfiguration whose max
+   * message size comes from the thrift_rpc_max_message_size flag, and that the default
+   * configuration still rejects the same buffer.
+   */
+  @Test
+  public void testLargeTConfiguration() throws Exception {
+    int maxSize = BackendConfig.INSTANCE.getThriftRpcMaxMessageSize();
+    // Expects the test to run with the 1 GiB default of thrift_rpc_max_message_size.
+    assertEquals(1024 * 1024 * 1024, maxSize);
+    // 100.5 MiB: just above the 100 MiB default max message size.
+    int bufferSize = (100 * 1024 + 512) * 1024;
+    ByteBuffer buffer = ByteBuffer.allocate(bufferSize);
+
+    // The default configuration must reject the buffer...
+    try {
+      new TByteBuffer(buffer);
+      fail("Expected TTransportException for a buffer above the default limit");
+    } catch (TTransportException e) {
+      assertTrue(e.getMessage(), e.getMessage().contains("MaxMessageSize reached"));
+    }
+
+    // ...while a configuration with the larger flag-based limit accepts it.
+    final TConfiguration configLarge = new TConfiguration(maxSize,
+        TConfiguration.DEFAULT_MAX_FRAME_SIZE, TConfiguration.DEFAULT_RECURSION_DEPTH);
+    TByteBuffer validTByteBuffer = new TByteBuffer(configLarge, buffer);
+    validTByteBuffer.close();
+  }
 }

Reply via email to