[
https://issues.apache.org/jira/browse/HIVE-27186?focusedWorklogId=860988&page=com.atlassian.jira.plugin.system.issuetabpanels:worklog-tabpanel#worklog-860988
]
ASF GitHub Bot logged work on HIVE-27186:
-----------------------------------------
Author: ASF GitHub Bot
Created on: 08/May/23 10:35
Start Date: 08/May/23 10:35
Worklog Time Spent: 10m
Work Description: dengzhhu653 commented on code in PR #4194:
URL: https://github.com/apache/hive/pull/4194#discussion_r1187292000
##########
standalone-metastore/metastore-common/src/main/java/org/apache/hadoop/hive/metastore/properties/SerializationProxy.java:
##########
@@ -0,0 +1,614 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.hive.metastore.properties;
+
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.io.ByteArrayInputStream;
+import java.io.ByteArrayOutputStream;
+import java.io.DataInput;
+import java.io.DataOutput;
+import java.io.Externalizable;
+import java.io.File;
+import java.io.IOException;
+import java.io.InputStream;
+import java.io.ObjectInput;
+import java.io.ObjectInputStream;
+import java.io.ObjectOutput;
+import java.io.ObjectOutputStream;
+import java.io.OutputStream;
+import java.io.Serializable;
+import java.lang.reflect.Constructor;
+import java.lang.reflect.Executable;
+import java.lang.reflect.InvocationTargetException;
+import java.lang.reflect.Method;
+import java.util.ArrayList;
+import java.util.Arrays;
+import java.util.Collections;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+import java.util.concurrent.ConcurrentHashMap;
+import java.util.concurrent.ConcurrentMap;
+
+import static
org.apache.hadoop.hive.metastore.properties.Serializer.SERIALIZER;
+
+/**
+ * The serialization proxy template.
+ * <p>
+ * This allows a class that defines final members to be made serializable in
an easy way.
+ * The class <em>must</em> implement:
+ * <ul>
+ * <li>a constructor that takes a DataInput (or derived class) as
parameter</li>
+ * <li>a write method that takes a DataOutput (or derived class) as
parameter</li>
+ * </ul>
+ * <p>
+ * One should consider the constructor as being potentially fed with an
invalid stream so
+ * all usual checks of a public constructor should apply.
+ * </p>
+ * Standard usage is to add the Serializable interface implementation through
the following 2 methods:
+ * <code>
+ * private Object writeReplace() throws ObjectStreamException {
+ * return new SerializationProxy<TheClass>(this);
+ * }
+ * private void readObject(ObjectInputStream in)throws
IOException,ClassNotFoundException{
+ * throw new InvalidObjectException("proxy required");
+ * }
+ * </code>
+ * @param <T> the serializable object type
+ */
+public class SerializationProxy<T extends Serializable> implements
Externalizable {
+ /** Serial version. */
+ private static final long serialVersionUID = 202212281757L;
+ /** The logger. */
+ public static final Logger LOGGER =
LoggerFactory.getLogger(SerializationProxy.class);
+ /** The map of class names to types. */
+ private static final ConcurrentMap<String, Type<?>> TYPES = new
ConcurrentHashMap<>();
+ /** The list of registered pre-defined classes. */
+ private static final List<Type<?>> REGISTERED = new ArrayList<>();
+ /** A thread local context used for arguments passing during
serialization/de-serialization. */
+ private static final ThreadLocal<Object[]> EXTRA_ARGUMENTS = new
ThreadLocal<>();
+
+ /** The type of instance being read or written. */
+ private transient Type<T> type = null;
+ /** The instance being read or written. */
+ private transient T proxied = null;
+
+ /**
+ * Wraps any error that may occur whilst using reflective calls.
+ */
+ public static class ProxyException extends RuntimeException {
+ public ProxyException(Throwable cause) {
+ super(cause);
+ }
+
+ public ProxyException(String msg) {
+ super(msg);
+ }
+
+ /**
+ * Convert an exception to a VDBRuntimeException.
+ * @param cause the exception to convert
+ * @return the wrapping CubeException
+ */
+ public static ProxyException convert(Throwable cause) {
+ if (cause instanceof ProxyException) {
+ return (ProxyException) cause;
+ } else {
+ return new ProxyException(cause);
+ }
+ }
+ }
+
+ /**
+ * Constructor called from proxify.writeReplace().
+ * @param proxify the instance to proxy
+ */
+ @SuppressWarnings("unchecked")
+ public SerializationProxy(T proxify) {
+ Class<T> clazz = (Class<T>) proxify.getClass();
+ type = (Type<T>) TYPES.computeIfAbsent(clazz.getName(), this::createType);
+ proxied = proxify;
+ }
+
+ /**
+ * Default constructor.
+ */
+ public SerializationProxy() {
+ // do nothing
+ }
+
+ /**
+ * Sets the extra-arguments as a thread local context.
+ * <p>Used to pass extra arguments o constructors/write methods.</p>
+ * @param o the arguments
+ */
+ public static void setExtraArguments(Object[] o) {
+ if (null == o) {
+ EXTRA_ARGUMENTS.remove();
+ } else {
+ EXTRA_ARGUMENTS.set(o);
+ }
+ }
+
+ /**
+ * Gets the extra-arguments to ctor/write executable stored in a thread
local context.
+ * @return the arguments
+ */
+ public static Object[] getExtraArguments() {
+ return EXTRA_ARGUMENTS.get();
+ }
+
+ /**
+ * Swaps the thread local context.
+ * <p>This may be used to stack up contexts during cascading calls.</p>
+ * @param newArgs the new arguments
+ * @return the down-stack caller arguments
+ */
+ public static Object[] swapExtraArguments(Object[] newArgs) {
+ Object[] previous = EXTRA_ARGUMENTS.get();
+ setExtraArguments(newArgs);
+ return previous;
+ }
+
+ /**
+ * Unloads the proxy.
+ */
+ public static void unload() {
+ EXTRA_ARGUMENTS.remove();
+ TYPES.clear();
+ }
+
+ /**
+ * Registers a pre-defined class (known to be used throughout the whole
application).
+ * @param <T> the type
+ * @param slot the slot number
+ * @param clazz the class
+ */
+ public static <T extends Serializable> void registerType(final int slot,
Class<T> clazz) {
+ synchronized (REGISTERED) {
+ Type<T> ntype = new Type<>(clazz);
+ ntype.slot = slot;
+ if (slot >= 255) {
+ throw new IllegalArgumentException(ntype + "@" + slot + ": can not
register more than 254 types");
+ }
+ List<Type<?>> types = REGISTERED;
+ while (types.size() <= slot) {
+ types.add(null);
+ }
+ if (types.get(slot) != null) {
+ throw new IllegalArgumentException(ntype + "@" + slot + ": slot
already used by " + types.get(slot));
+ }
+ types.set(slot, ntype);
+ TYPES.put(clazz.getName(), ntype);
+ }
+ }
+
+ /**
+ * Called by serialization after readExternal.
+ * @return the proxied instance
+ * @throws IOException for signature compliance
+ */
+ public Object readResolve() throws IOException {
+ return proxied;
+ }
+
+ @Override
+ public void readExternal(ObjectInput in) throws IOException {
+ long serial = in.readLong();
+ if (serial != serialVersionUID) {
+ throw new ProxyException("invalid serial version, got " + serial +",
expected " + serialVersionUID);
+ }
+ type = readType(in);
+ proxied = type.proxyNew(in);
+ }
+
+ @Override
+ public void writeExternal(ObjectOutput out) throws IOException {
+ out.writeLong(serialVersionUID);
+ writeType(type, out);
+ type.proxyWrite(proxied, out);
+ }
+
+ /**
+ * Converts a serializable object to an array of bytes.
+ * @param serializable the object to serialize
+ * @param args the proxy arguments
+ * @return the array of bytes
+ * @throws ProxyException on any underlying error
+ */
+ public static byte[] toBytes(Serializable serializable, Object... args) {
+ ByteArrayOutputStream bos = new ByteArrayOutputStream(512);
+ final Object[] stack = SerializationProxy.swapExtraArguments(args);
+ try (ObjectOutput oos = new ObjectOutputStream(bos)) {
+ oos.writeObject(serializable);
+ oos.flush();
+ return bos.toByteArray();
+ } catch (IOException xany) {
+ throw ProxyException.convert(xany);
+ } finally {
+ SerializationProxy.swapExtraArguments(stack);
+ }
+ }
+
+ /**
+ * Materialize a serializable object from an array of bytes.
+ * @param bytes the bytes
+ * @param args the proxy arguments
+ * @return the object
+ * @throws ProxyException on any underlying error
+ */
+ public static <T extends Serializable> T fromBytes(byte[] bytes, Object...
args) {
+ ByteArrayInputStream bis = new ByteArrayInputStream(bytes);
+ final Object[] stack = SerializationProxy.swapExtraArguments(args);
+ try (ObjectInput ois = new ObjectInputStream(bis)) {
+ return (T) ois.readObject();
+ } catch (IOException | ClassNotFoundException | ClassCastException xany) {
+ throw ProxyException.convert(xany);
+ } finally {
+ SerializationProxy.swapExtraArguments(stack);
+ }
+ }
+
+ /**
+ * Saves an object to persistent storage.
+ * @param file the file to write to
+ * @param persist the object to serialize
+ * @param args the proxy constructor arguments
+ * @return true if successful, false if file is null
+ * @throws ProxyException in case of low level error
+ */
+ public static boolean write(File file, Serializable persist, Object... args)
{
+ return SERIALIZER.write(file, persist, args);
+ }
+
+ /**
+ * Saves an object to persistent storage.
+ * @param out the stream to write to
+ * @param persist the object to serialize
+ * @param args the proxy write method arguments
+ * @return true if successful, false if file is null
+ * @throws ProxyException in case of low level error
+ */
+ public static boolean write(OutputStream out, Serializable persist,
Object... args) {
+ return SERIALIZER.write(out, persist, args);
+ }
+
+ /**
+ * Loads an object from the persistent storage.
+ * @param file the file to read from
+ * @param args the proxy arguments
+ * @return the object or null if file is null
+ * @throws ProxyException in case of low level error
+ */
+ public static Serializable read(File file, Object... args) {
+ return SERIALIZER.read(file, args);
+ }
+
+ /**
+ * Loads an object from the persistent storage.
+ * @param in the stream to read from
+ * @param args the proxy arguments
+ * @return the object or null if file is null
+ * @throws ProxyException in case of low level error
+ */
+ public static <T extends Serializable> T read(InputStream in, Object...
args) {
+ return SERIALIZER.read(in, args);
+ }
+
+ /**
+ * Creates a Type using a class name.
+ * @param cname the class name
+ * @return a type instance
+ * @throws ProxyException on any underlying error
+ */
+ protected Type<T> createType(String cname) {
+ try {
+ @SuppressWarnings("unchecked")
+ Class<T> clazz = (Class<T>) Class.forName(cname);
+ return new Type<>(clazz);
+ } catch (ClassNotFoundException xnotfound) {
+ throw ProxyException.convert(xnotfound);
+ }
+ }
+
+ /**
+ * When writing out this instance, write down the canonical class name it
proxifies.
+ * @param out the output
+ * @throws IOException if things go wrong
+ */
+ protected void writeType(Type<?> type, DataOutput out) throws IOException {
+ int slot = type.getSlot();
+ out.write(slot);
+ if (slot == 255) {
+ out.writeUTF(type.getTargetName());
+ }
+ }
+
+ /**
+ * When reading an instance, fetch the type through the canonical class name
that was persisted.
+ * @param in the input
+ * @throws IOException on read error
+ * @throws ProxyException if class was expected to be registered but can not
be found
+ */
+ @SuppressWarnings("unchecked")
+ protected Type<T> readType(DataInput in) throws IOException {
+ final Type<T> type;
+ String className = "?";
+ int slot = (int) in.readByte() & 0xff;
+ if (slot == 255) {
+ className = in.readUTF();
+ type = (Type<T>) TYPES.computeIfAbsent(className, this::createType);
+ } else if (slot < REGISTERED.size()) {
+ type = (Type<T>) REGISTERED.get(slot);
+ } else {
+ type = null;
+ }
+ if (type == null) {
+ throw new ProxyException("can not resolve class @ " + slot +", " +
className);
+ }
+ return type;
+ }
+
+ /**
+ * Encapsulates the mandatory constructor and write methods for a given
proxified class.
+ * @param <T> the proxified class
+ */
+ protected static class Type<T extends Serializable> {
+ private final Constructor<T>[] ctors;
+ private final Method[] writes;
+ private transient int slot = 255;
+
+ /**
+ * Creates a new instance of type.
+ * @param clazz the proxified class
+ */
+ public Type(Class<T> clazz) {
+ ctors = typeConstructors(clazz);
+ writes = typeWrites(clazz);
+ }
+
+ /**
+ * The slot number if the class is registered.
+ * @return the slot number, 255 means not-registered
+ */
+ public int getSlot() {
+ return slot;
+ }
+
+ /**
+ * @return the target class
+ */
+ public String getTargetName() {
+ // there is always at least one ctor
+ return ctors[0].getDeclaringClass().getName();
+ }
+
+ /**
+ * Compare parameter signatures of executables.
+ * @param lhs left-hand side
+ * @param rhs right-hand side
+ * @return 0 if equal, +/- 1 if left </> than right
+ */
+ private static int compareSignatures(Executable lhs, Executable rhs) {
+ return compareSignatures(lhs.getParameterTypes(),
rhs.getParameterTypes());
+ }
+
+ /**
+ * Compare executables parameter signatures.
+ * @param lhs left-hand side executable
+ * @param rhs right-hand side executable
+ * @return 0 if equal, +/- 1 if left </> than right
+ */
+ private static int compareSignatures(Class<?>[] lhs, Class<?>[] rhs) {
+ if (lhs.length < rhs.length) {
+ return -1;
+ }
+ if (lhs.length > rhs.length) {
+ return 1;
+ }
+ int cmp = 0;
+ // lhs.length == rhs.length
+ final int length = lhs.length;
+ for (int p = 0; p < length; ++p) {
+ Class<?> actual = lhs[p];
+ Class<?> formal = rhs[p];
+ if (formal != null && actual != null &&
!formal.isAssignableFrom(actual)) {
+ // if formal parameter is primitive and actual argument is compatible
+ int dist;
+ if (formal.isPrimitive() && (dist =
CONVERTIBLES.get(formal).indexOf(actual)) >= 0) {
+ cmp += dist;
+ continue;
+ }
+ dist = formal.getName().compareTo(actual.getName());
+ if (dist != 0) {
+ return cmp * (length - p);
Review Comment:
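nit: Could `cmp * (length - p)` be 0 even though the arguments mismatch? `cmp` is still 0 when no earlier parameter needed a primitive conversion, so such a mismatch would be reported as equal signatures.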
Issue Time Tracking
-------------------
Worklog Id: (was: 860988)
Time Spent: 16h 10m (was: 16h)
> A persistent property store
> ----------------------------
>
> Key: HIVE-27186
> URL: https://issues.apache.org/jira/browse/HIVE-27186
> Project: Hive
> Issue Type: Improvement
> Components: Metastore
> Affects Versions: 4.0.0-alpha-2
> Reporter: Henri Biestro
> Assignee: Henri Biestro
> Priority: Major
> Labels: pull-request-available
> Time Spent: 16h 10m
> Remaining Estimate: 0h
>
> WHAT
> A persistent property store usable as a support facility for any metadata
> augmentation feature.
> WHY
> When adding new metadata-oriented features, we usually need to persist
> information linking the feature data to the HiveMetaStore objects it applies
> to. Any information related to a database, a table or the cluster - statistics,
> for example, or any operational state or data (think rolling backup) - falls
> into this use case.
> Typically, accommodating such a feature requires modifying the Metastore
> database schema by adding or altering a table. It also usually implies
> modifying the Thrift APIs to expose such metadata to consumers.
> The proposed feature aims to solve persistence and query/transport for
> these use cases by providing a 'key/(meta)value' store exposed as a
> property system.
> HOW
> The exposed API is a simple, generic property-value model.
> To accommodate several usage scenarios, the model's entry point is a
> 'namespace' that designates the property manager of a feature component. For
> example, 'stats' could be the namespace for all properties related to the
> 'statistics' feature.
> The namespace identifies a manager that handles property-groups persisted as
> property-maps. For instance, all statistics pertaining to a given table would
> be collocated in the same property-group. As such, all properties (say, the
> number of 'unique_values' per column) for a given HMS table 'relation0' would
> all be stored and persisted in the same property-map instance.
> Property-maps may be decorated by an (optional) schema that may declare the
> name and value-type of allowed properties (and their optional default value).
> Each property is addressed by a name, a path uniquely identifying the
> property in a given property map.
> The manager also handles transforming property-map names to the property-map
> keys used to persist them in the DB.
> The API supports inserting/updating properties in bulk, within a transaction.
> It also provides selection/projection to reduce the volume of data exchanged
> between client and server; selection can use predicates (JEXL expressions) to
> filter maps.
--
This message was sent by Atlassian Jira
(v8.20.10#820010)