[ 
https://issues.apache.org/jira/browse/HIVE-27186?focusedWorklogId=860988&page=com.atlassian.jira.plugin.system.issuetabpanels:worklog-tabpanel#worklog-860988
 ]

ASF GitHub Bot logged work on HIVE-27186:
-----------------------------------------

                Author: ASF GitHub Bot
            Created on: 08/May/23 10:35
            Start Date: 08/May/23 10:35
    Worklog Time Spent: 10m 
      Work Description: dengzhhu653 commented on code in PR #4194:
URL: https://github.com/apache/hive/pull/4194#discussion_r1187292000


##########
standalone-metastore/metastore-common/src/main/java/org/apache/hadoop/hive/metastore/properties/SerializationProxy.java:
##########
@@ -0,0 +1,614 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.hive.metastore.properties;
+
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.io.ByteArrayInputStream;
+import java.io.ByteArrayOutputStream;
+import java.io.DataInput;
+import java.io.DataOutput;
+import java.io.Externalizable;
+import java.io.File;
+import java.io.IOException;
+import java.io.InputStream;
+import java.io.ObjectInput;
+import java.io.ObjectInputStream;
+import java.io.ObjectOutput;
+import java.io.ObjectOutputStream;
+import java.io.OutputStream;
+import java.io.Serializable;
+import java.lang.reflect.Constructor;
+import java.lang.reflect.Executable;
+import java.lang.reflect.InvocationTargetException;
+import java.lang.reflect.Method;
+import java.util.ArrayList;
+import java.util.Arrays;
+import java.util.Collections;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+import java.util.concurrent.ConcurrentHashMap;
+import java.util.concurrent.ConcurrentMap;
+
+import static 
org.apache.hadoop.hive.metastore.properties.Serializer.SERIALIZER;
+
+/**
+ * The serialization proxy template.
+ * <p>
+ * This allows a class that defines final members to be made serializable in 
an easy way.
+ * The class <em>must</em> implement:
+ * <ul>
+ * <li>a constructor that takes a DataInput (or derived class) as 
parameter</li>
+ * <li>a write method that takes a DataOutput (or derived class) as 
parameter</li>
+ * </ul>
+ * <p>
+ *   One should consider the constructor as being potentially fed with an 
invalid stream so
+ *   all usual checks of a public constructor should apply.
+ * </p>
+ * Standard usage is to add the Serializable interface implementation through 
the following 2 methods:
+ * <code>
+ * private Object writeReplace() throws ObjectStreamException {
+ *     return new SerializationProxy&lt;TheClass&gt;(this);
+ * }
+ * private void readObject(ObjectInputStream in)throws 
IOException,ClassNotFoundException{
+ *     throw new InvalidObjectException("proxy required");
+ * }
+ * </code>
+ * @param <T> the serializable object type
+ */
+public class SerializationProxy<T extends Serializable> implements 
Externalizable {
+  /** Serial version. */
+  private static final long serialVersionUID = 202212281757L;
+  /** The logger. */
+  public static final Logger LOGGER = 
LoggerFactory.getLogger(SerializationProxy.class);
+  /** The map of class names to types. */
+  private static final ConcurrentMap<String, Type<?>> TYPES = new 
ConcurrentHashMap<>();
+  /** The list of registered pre-defined classes. */
+  private static final List<Type<?>> REGISTERED = new ArrayList<>();
+  /** A thread local context used for arguments passing during 
serialization/de-serialization. */
+  private static final ThreadLocal<Object[]> EXTRA_ARGUMENTS = new 
ThreadLocal<>();
+
+  /** The type of instance being read or written. */
+  private transient Type<T> type = null;
+  /** The instance being read or written. */
+  private transient T proxied = null;
+
+  /**
+   * Wraps any error that may occur whilst using reflective calls.
+   */
+  public static class ProxyException extends RuntimeException {
+    public ProxyException(Throwable cause) {
+      super(cause);
+    }
+
+    public ProxyException(String msg) {
+      super(msg);
+    }
+
+    /**
+     * Converts an exception to a ProxyException.
+     * @param cause the exception to convert
+     * @return the wrapping ProxyException, or the cause itself if it already is one
+     */
+    public static ProxyException convert(Throwable cause) {
+      if (cause instanceof ProxyException) {
+        return (ProxyException) cause;
+      } else {
+        return new ProxyException(cause);
+      }
+    }
+  }
+
+  /**
+   * Constructor called from proxify.writeReplace().
+   * @param proxify the instance to proxy
+   */
+  @SuppressWarnings("unchecked")
+  public SerializationProxy(T proxify) {
+    Class<T> clazz = (Class<T>) proxify.getClass();
+    type = (Type<T>) TYPES.computeIfAbsent(clazz.getName(), this::createType);
+    proxied = proxify;
+  }
+
+  /**
+   * Default constructor.
+   */
+  public SerializationProxy() {
+    // do nothing
+  }
+
+  /**
+   * Sets the extra-arguments as a thread local context.
+   * <p>Used to pass extra arguments to constructors/write methods.</p>
+   * @param o the arguments
+   */
+  public static void setExtraArguments(Object[] o) {
+    if (null == o) {
+      EXTRA_ARGUMENTS.remove();
+    } else {
+      EXTRA_ARGUMENTS.set(o);
+    }
+  }
+
+  /**
+   * Gets the extra-arguments to ctor/write executable stored in a thread 
local context.
+   * @return the arguments
+   */
+  public static Object[] getExtraArguments() {
+    return EXTRA_ARGUMENTS.get();
+  }
+
+  /**
+   * Swaps the thread local context.
+   * <p>This may be used to stack up contexts during cascading calls.</p>
+   * @param newArgs the new arguments
+   * @return the down-stack caller arguments
+   */
+  public static Object[] swapExtraArguments(Object[] newArgs) {
+    Object[] previous = EXTRA_ARGUMENTS.get();
+    setExtraArguments(newArgs);
+    return previous;
+  }
+
+  /**
+   * Unloads the proxy.
+   */
+  public static void unload() {
+    EXTRA_ARGUMENTS.remove();
+    TYPES.clear();
+  }
+
+  /**
+   * Registers a pre-defined class (known to be used throughout the whole 
application).
+   * @param <T> the type
+   * @param slot the slot number
+   * @param clazz the class
+   */
+  public static <T extends Serializable> void registerType(final int slot, 
Class<T> clazz) {
+    synchronized (REGISTERED) {
+      Type<T> ntype = new Type<>(clazz);
+      ntype.slot = slot;
+      if (slot >= 255) {
+        throw new IllegalArgumentException(ntype + "@" + slot + ": can not 
register more than 254 types");
+      }
+      List<Type<?>> types = REGISTERED;
+      while (types.size() <= slot) {
+        types.add(null);
+      }
+      if (types.get(slot) != null) {
+        throw new IllegalArgumentException(ntype + "@" + slot + ": slot 
already used by " + types.get(slot));
+      }
+      types.set(slot, ntype);
+      TYPES.put(clazz.getName(), ntype);
+    }
+  }
+
+  /**
+   * Called by serialization after readExternal.
+   * @return the proxied instance
+   * @throws IOException for signature compliance
+   */
+  public Object readResolve() throws IOException {
+    return proxied;
+  }
+
+  @Override
+  public void readExternal(ObjectInput in) throws IOException {
+    long serial = in.readLong();
+    if (serial != serialVersionUID) {
+      throw new ProxyException("invalid serial version, got " + serial +", 
expected " + serialVersionUID);
+    }
+    type = readType(in);
+    proxied = type.proxyNew(in);
+  }
+
+  @Override
+  public void writeExternal(ObjectOutput out) throws IOException {
+    out.writeLong(serialVersionUID);
+    writeType(type, out);
+    type.proxyWrite(proxied, out);
+  }
+
+  /**
+   * Converts a serializable object to an array of bytes.
+   * @param serializable the object to serialize
+   * @param args the proxy arguments
+   * @return the array of bytes
+   * @throws ProxyException on any underlying error
+   */
+  public static byte[] toBytes(Serializable serializable, Object... args) {
+    ByteArrayOutputStream bos = new ByteArrayOutputStream(512);
+    final Object[] stack = SerializationProxy.swapExtraArguments(args);
+    try (ObjectOutput oos = new ObjectOutputStream(bos)) {
+      oos.writeObject(serializable);
+      oos.flush();
+      return bos.toByteArray();
+    } catch (IOException xany) {
+      throw ProxyException.convert(xany);
+    } finally {
+      SerializationProxy.swapExtraArguments(stack);
+    }
+  }
+
+  /**
+   * Materialize a serializable object from an array of bytes.
+   * @param bytes the bytes
+   * @param args the proxy arguments
+   * @return the object
+   * @throws ProxyException on any underlying error
+   */
+  public static <T extends Serializable> T fromBytes(byte[] bytes, Object... 
args) {
+    ByteArrayInputStream bis = new ByteArrayInputStream(bytes);
+    final Object[] stack = SerializationProxy.swapExtraArguments(args);
+    try (ObjectInput ois = new ObjectInputStream(bis)) {
+      return (T) ois.readObject();
+    } catch (IOException | ClassNotFoundException | ClassCastException xany) {
+      throw ProxyException.convert(xany);
+    } finally {
+      SerializationProxy.swapExtraArguments(stack);
+    }
+  }
+
+  /**
+   * Saves an object to persistent storage.
+   * @param file the file to write to
+   * @param persist the object to serialize
+   * @param args the proxy constructor arguments
+   * @return true if successful, false if file is null
+   * @throws ProxyException in case of low level error
+   */
+  public static boolean write(File file, Serializable persist, Object... args) 
{
+    return SERIALIZER.write(file, persist, args);
+  }
+
+  /**
+   * Saves an object to persistent storage.
+   * @param out the stream to write to
+   * @param persist the object to serialize
+   * @param args the proxy write method arguments
+   * @return true if successful, false if file is null
+   * @throws ProxyException in case of low level error
+   */
+  public static boolean write(OutputStream out, Serializable persist, 
Object... args) {
+    return SERIALIZER.write(out, persist, args);
+  }
+
+  /**
+   * Loads an object from the persistent storage.
+   * @param file the file to read from
+   * @param args the proxy arguments
+   * @return the object or null if file is null
+   * @throws ProxyException in case of low level error
+   */
+  public static Serializable read(File file, Object... args) {
+    return SERIALIZER.read(file, args);
+  }
+
+  /**
+   * Loads an object from the persistent storage.
+   * @param in the stream to read from
+   * @param args the proxy arguments
+   * @return the object or null if file is null
+   * @throws ProxyException in case of low level error
+   */
+  public static <T extends Serializable> T read(InputStream in, Object... 
args) {
+    return SERIALIZER.read(in, args);
+  }
+
+  /**
+   * Creates a Type using a class name.
+   * @param cname the class name
+   * @return a type instance
+   * @throws ProxyException on any underlying error
+   */
+   protected Type<T> createType(String cname) {
+    try {
+      @SuppressWarnings("unchecked")
+      Class<T> clazz = (Class<T>) Class.forName(cname);
+      return new Type<>(clazz);
+    } catch (ClassNotFoundException xnotfound) {
+      throw ProxyException.convert(xnotfound);
+    }
+  }
+
+  /**
+   * When writing out this instance, write down the canonical class name it 
proxifies.
+   * @param out the output
+   * @throws IOException if things go wrong
+   */
+  protected void writeType(Type<?> type, DataOutput out) throws IOException {
+    int slot = type.getSlot();
+    out.write(slot);
+    if (slot == 255) {
+      out.writeUTF(type.getTargetName());
+    }
+  }
+
+  /**
+   * When reading an instance, fetch the type through the canonical class name 
that was persisted.
+   * @param in the input
+   * @throws IOException on read error
+   * @throws ProxyException if class was expected to be registered but can not 
be found
+   */
+  @SuppressWarnings("unchecked")
+  protected Type<T> readType(DataInput in) throws IOException {
+    final Type<T> type;
+    String className = "?";
+    int slot = (int) in.readByte() & 0xff;
+    if (slot == 255) {
+      className = in.readUTF();
+      type = (Type<T>) TYPES.computeIfAbsent(className, this::createType);
+    } else if (slot < REGISTERED.size()) {
+      type = (Type<T>) REGISTERED.get(slot);
+    } else {
+      type = null;
+    }
+    if (type == null) {
+      throw new ProxyException("can not resolve class @ " + slot +", " + 
className);
+    }
+    return type;
+  }
+
+  /**
+   * Encapsulates the mandatory constructor and write methods for a given 
proxified class.
+   * @param <T> the proxified class
+   */
+  protected static class Type<T extends Serializable> {
+    private final Constructor<T>[] ctors;
+    private final Method[] writes;
+    private transient int slot = 255;
+
+    /**
+     * Creates a new instance of type.
+     * @param clazz the proxified class
+     */
+    public Type(Class<T> clazz) {
+        ctors = typeConstructors(clazz);
+        writes = typeWrites(clazz);
+    }
+
+    /**
+     * The slot number if the class is registered.
+     * @return the slot number, 255 means not-registered
+     */
+    public int getSlot() {
+      return slot;
+    }
+
+    /**
+     * @return the target class
+     */
+    public String getTargetName() {
+      // there is always at least one ctor
+      return ctors[0].getDeclaringClass().getName();
+    }
+
+    /**
+     * Compare parameter signatures of executables.
+     * @param lhs left-hand side
+     * @param rhs right-hand side
+     * @return 0 if equal, +/- 1 if left &lt;/&gt; than right
+     */
+    private static int compareSignatures(Executable lhs, Executable rhs) {
+      return compareSignatures(lhs.getParameterTypes(), 
rhs.getParameterTypes());
+    }
+
+    /**
+     * Compare parameter type arrays.
+     * @param lhs left-hand side parameter types
+     * @param rhs right-hand side parameter types
+     * @return 0 if equal, +/- 1 if left &lt;/&gt; than right
+     */
+    private static int compareSignatures(Class<?>[] lhs, Class<?>[] rhs) {
+      if (lhs.length < rhs.length) {
+        return -1;
+      }
+      if (lhs.length > rhs.length) {
+        return 1;
+      }
+      int cmp = 0;
+      // lhs.length == rhs.length
+      final int length = lhs.length;
+      for (int p = 0; p < length; ++p) {
+        Class<?> actual = lhs[p];
+        Class<?> formal = rhs[p];
+        if (formal != null && actual != null && 
!formal.isAssignableFrom(actual)) {
+          // if formal parameter is primitive and actual argument is compatible
+          int dist;
+          if (formal.isPrimitive() && (dist = 
CONVERTIBLES.get(formal).indexOf(actual)) >= 0) {
+            cmp +=  dist;
+            continue;
+          }
+          dist = formal.getName().compareTo(actual.getName());
+          if (dist != 0) {
+            return cmp * (length - p);

Review Comment:
   nit: Is there a case where `cmp * (length - p)` evaluates to 0 even though the args mismatch (e.g. when `cmp` is still 0 at the first mismatching parameter)?





Issue Time Tracking
-------------------

    Worklog Id:     (was: 860988)
    Time Spent: 16h 10m  (was: 16h)

> A persistent property store 
> ----------------------------
>
>                 Key: HIVE-27186
>                 URL: https://issues.apache.org/jira/browse/HIVE-27186
>             Project: Hive
>          Issue Type: Improvement
>          Components: Metastore
>    Affects Versions: 4.0.0-alpha-2
>            Reporter: Henri Biestro
>            Assignee: Henri Biestro
>            Priority: Major
>              Labels: pull-request-available
>          Time Spent: 16h 10m
>  Remaining Estimate: 0h
>
> WHAT
> A persistent property store usable as a support facility for any metadata 
> augmentation feature.
> WHY
> When adding new meta-data oriented features, we usually need to persist 
> information linking the feature data and the HiveMetaStore objects it applies 
> to. Any information related to a database, a table or the cluster - like 
> statistics for example or any operational data state or data (think rolling 
> backup) - falls in this use-case.
> Typically, accommodating such a feature requires modifying the Metastore 
> database schema by adding or altering a table. It also usually implies 
> modifying the thrift APIs to expose such meta-data to consumers.
> The proposed feature aims to solve persistence and query/transport for 
> these types of use-cases by providing a 'key/(meta)value' store exposed as a 
> property system.
> HOW
> A property-value model is the simple and generic exposed API.
> To provision for several usage scenarios, the model entry point is a 
> 'namespace' that qualifies the feature-component property manager. For 
> example, 'stats' could be the namespace for all properties related to the 
> 'statistics' feature.
> The namespace identifies a manager that handles property-groups persisted as 
> property-maps. For instance, all statistics pertaining to a given table would 
> be collocated in the same property-group. As such, all properties (say number 
> of 'unique_values' per columns) for a given HMS table 'relation0' would all 
> be stored and persisted in the same property-map instance.
> Property-maps may be decorated by an (optional) schema that may declare the 
> name and value-type of allowed properties (and their optional default value). 
> Each property is addressed by a name, a path uniquely identifying the 
> property in a given property map.
> The manager also handles transforming property-map names to the property-map 
> keys used to persist them in the DB.
> The API provides inserting/updating properties in bulk transactionally. It 
> also provides selection/projection to help reduce the volume of exchange 
> between client/server; selection can use (JEXL expression) predicates to 
> filter maps.



--
This message was sent by Atlassian Jira
(v8.20.10#820010)

Reply via email to