Sxnan commented on code in PR #26655:
URL: https://github.com/apache/flink/pull/26655#discussion_r2285263543


##########
flink-core/src/main/java/org/apache/flink/types/variant/BinaryVariantInternalBuilder.java:
##########
@@ -0,0 +1,655 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.types.variant;
+
+import org.apache.flink.annotation.Internal;
+
+import org.apache.flink.shaded.jackson2.com.fasterxml.jackson.core.JsonFactory;
+import org.apache.flink.shaded.jackson2.com.fasterxml.jackson.core.JsonParseException;
+import org.apache.flink.shaded.jackson2.com.fasterxml.jackson.core.JsonParser;
+import org.apache.flink.shaded.jackson2.com.fasterxml.jackson.core.JsonToken;
+import org.apache.flink.shaded.jackson2.com.fasterxml.jackson.core.exc.InputCoercionException;
+
+import java.io.IOException;
+import java.math.BigDecimal;
+import java.math.BigInteger;
+import java.nio.charset.StandardCharsets;
+import java.util.ArrayList;
+import java.util.Arrays;
+import java.util.Collections;
+import java.util.Comparator;
+import java.util.HashMap;
+
+import static org.apache.flink.types.variant.BinaryVariantUtil.ARRAY;
+import static org.apache.flink.types.variant.BinaryVariantUtil.BASIC_TYPE_MASK;
+import static org.apache.flink.types.variant.BinaryVariantUtil.BINARY;
+import static org.apache.flink.types.variant.BinaryVariantUtil.DATE;
+import static org.apache.flink.types.variant.BinaryVariantUtil.DECIMAL16;
+import static org.apache.flink.types.variant.BinaryVariantUtil.DECIMAL4;
+import static org.apache.flink.types.variant.BinaryVariantUtil.DECIMAL8;
+import static org.apache.flink.types.variant.BinaryVariantUtil.DOUBLE;
+import static org.apache.flink.types.variant.BinaryVariantUtil.FALSE;
+import static org.apache.flink.types.variant.BinaryVariantUtil.FLOAT;
+import static org.apache.flink.types.variant.BinaryVariantUtil.INT1;
+import static org.apache.flink.types.variant.BinaryVariantUtil.INT2;
+import static org.apache.flink.types.variant.BinaryVariantUtil.INT4;
+import static org.apache.flink.types.variant.BinaryVariantUtil.INT8;
+import static org.apache.flink.types.variant.BinaryVariantUtil.LONG_STR;
+import static org.apache.flink.types.variant.BinaryVariantUtil.MAX_DECIMAL16_PRECISION;
+import static org.apache.flink.types.variant.BinaryVariantUtil.MAX_DECIMAL4_PRECISION;
+import static org.apache.flink.types.variant.BinaryVariantUtil.MAX_DECIMAL8_PRECISION;
+import static org.apache.flink.types.variant.BinaryVariantUtil.MAX_SHORT_STR_SIZE;
+import static org.apache.flink.types.variant.BinaryVariantUtil.NULL;
+import static org.apache.flink.types.variant.BinaryVariantUtil.OBJECT;
+import static org.apache.flink.types.variant.BinaryVariantUtil.SIZE_LIMIT;
+import static org.apache.flink.types.variant.BinaryVariantUtil.TIMESTAMP;
+import static org.apache.flink.types.variant.BinaryVariantUtil.TIMESTAMP_LTZ;
+import static org.apache.flink.types.variant.BinaryVariantUtil.TRUE;
+import static org.apache.flink.types.variant.BinaryVariantUtil.U16_MAX;
+import static org.apache.flink.types.variant.BinaryVariantUtil.U24_MAX;
+import static org.apache.flink.types.variant.BinaryVariantUtil.U24_SIZE;
+import static org.apache.flink.types.variant.BinaryVariantUtil.U32_SIZE;
+import static org.apache.flink.types.variant.BinaryVariantUtil.U8_MAX;
+import static org.apache.flink.types.variant.BinaryVariantUtil.VERSION;
+import static org.apache.flink.types.variant.BinaryVariantUtil.arrayHeader;
+import static org.apache.flink.types.variant.BinaryVariantUtil.checkIndex;
+import static org.apache.flink.types.variant.BinaryVariantUtil.getMetadataKey;
+import static org.apache.flink.types.variant.BinaryVariantUtil.handleArray;
+import static org.apache.flink.types.variant.BinaryVariantUtil.handleObject;
+import static org.apache.flink.types.variant.BinaryVariantUtil.objectHeader;
+import static org.apache.flink.types.variant.BinaryVariantUtil.primitiveHeader;
+import static org.apache.flink.types.variant.BinaryVariantUtil.readUnsigned;
+import static org.apache.flink.types.variant.BinaryVariantUtil.shortStrHeader;
+import static org.apache.flink.types.variant.BinaryVariantUtil.valueSize;
+import static org.apache.flink.types.variant.BinaryVariantUtil.writeLong;
+
+/* This file is based on source code from the Spark Project (http://spark.apache.org/), licensed by the Apache
+ * Software Foundation (ASF) under the Apache License, Version 2.0. See the NOTICE file distributed with this work for
+ * additional information regarding copyright ownership. */
+
+/** The internal builder for {@link BinaryVariant}. */
+@Internal
+public class BinaryVariantInternalBuilder {
+
+    public static final VariantTypeException VARIANT_SIZE_LIMIT_EXCEPTION =
+            new VariantTypeException("VARIANT_SIZE_LIMIT");
+    public static final VariantTypeException VARIANT_DUPLICATE_KEY_EXCEPTION =
+            new VariantTypeException("VARIANT_DUPLICATE_KEY");
+
+    public BinaryVariantInternalBuilder(boolean allowDuplicateKeys) {
+        this.allowDuplicateKeys = allowDuplicateKeys;
+    }
+
+    /**
+     * Parse a JSON string as a Variant value.
+     *
+     * @throws IOException if a JSON parsing error occurs.
+     */
+    public static BinaryVariant parseJson(String json, boolean allowDuplicateKeys)
+            throws IOException {
+        try (JsonParser parser = new JsonFactory().createParser(json)) {
+            parser.nextToken();
+            return parseJson(parser, allowDuplicateKeys);
+        }
+    }
+
+    /**
+     * Similar to {@link #parseJson(String, boolean)}, but takes a JSON parser instead of string input.
+     */
+    private static BinaryVariant parseJson(JsonParser parser, boolean allowDuplicateKeys)
+            throws IOException {
+        BinaryVariantInternalBuilder builder = new BinaryVariantInternalBuilder(allowDuplicateKeys);
+        builder.buildJson(parser);
+        return builder.build();
+    }
+
+    // Build the variant metadata from `dictionaryKeys` and return the variant result.
+    public BinaryVariant build() {
+        int numKeys = dictionaryKeys.size();
+        // Use long to avoid overflow in accumulating lengths.
+        long dictionaryStringSize = 0;
+        for (byte[] key : dictionaryKeys) {
+            dictionaryStringSize += key.length;
+        }
+        // Determine the number of bytes required per offset entry.
+        // The largest offset is the one-past-the-end value, which is the total string size. It's
+        // very unlikely that the number of keys could be larger, but incorporate that into the
+        // calculation in case of pathological data.
+        long maxSize = Math.max(dictionaryStringSize, numKeys);
+        if (maxSize > SIZE_LIMIT) {
+            throw VARIANT_SIZE_LIMIT_EXCEPTION;
+        }
+        int offsetSize = getIntegerSize((int) maxSize);
+
+        int offsetStart = 1 + offsetSize;
+        int stringStart = offsetStart + (numKeys + 1) * offsetSize;
+        long metadataSize = stringStart + dictionaryStringSize;
+
+        if (metadataSize > SIZE_LIMIT) {
+            throw VARIANT_SIZE_LIMIT_EXCEPTION;
+        }
+        byte[] metadata = new byte[(int) metadataSize];
+        int headerByte = VERSION | ((offsetSize - 1) << 6);
+        writeLong(metadata, 0, headerByte, 1);
+        writeLong(metadata, 1, numKeys, offsetSize);
+        int currentOffset = 0;
+        for (int i = 0; i < numKeys; ++i) {
+            writeLong(metadata, offsetStart + i * offsetSize, currentOffset, offsetSize);
+            byte[] key = dictionaryKeys.get(i);
+            System.arraycopy(key, 0, metadata, stringStart + currentOffset, key.length);
+            currentOffset += key.length;
+        }
+        writeLong(metadata, offsetStart + numKeys * offsetSize, currentOffset, offsetSize);
+        return new BinaryVariant(Arrays.copyOfRange(writeBuffer, 0, writePos), metadata);
+    }
+
+    public void appendString(String str) {
+        byte[] text = str.getBytes(StandardCharsets.UTF_8);
+        boolean longStr = text.length > MAX_SHORT_STR_SIZE;
+        checkCapacity((longStr ? 1 + U32_SIZE : 1) + text.length);
+        if (longStr) {
+            writeBuffer[writePos++] = primitiveHeader(LONG_STR);
+            writeLong(writeBuffer, writePos, text.length, U32_SIZE);
+            writePos += U32_SIZE;
+        } else {
+            writeBuffer[writePos++] = shortStrHeader(text.length);
+        }
+        System.arraycopy(text, 0, writeBuffer, writePos, text.length);
+        writePos += text.length;
+    }
+
+    public void appendNull() {
+        checkCapacity(1);
+        writeBuffer[writePos++] = primitiveHeader(NULL);
+    }
+
+    public void appendBoolean(boolean b) {
+        checkCapacity(1);
+        writeBuffer[writePos++] = primitiveHeader(b ? TRUE : FALSE);
+    }
+
+    public void appendByte(byte b) {
+        checkCapacity(1 + 1);
+        writeBuffer[writePos++] = primitiveHeader(INT1);
+        writeLong(writeBuffer, writePos, b, 1);
+        writePos += 1;
+    }
+
+    public void appendShort(short s) {
+        checkCapacity(1 + 2);
+        writeBuffer[writePos++] = primitiveHeader(INT2);
+        writeLong(writeBuffer, writePos, s, 2);
+        writePos += 2;
+    }
+
+    public void appendInt(int i) {
+        checkCapacity(1 + 4);
+        writeBuffer[writePos++] = primitiveHeader(INT4);
+        writeLong(writeBuffer, writePos, i, 4);
+        writePos += 4;
+    }
+
+    public void appendLong(long l) {
+        checkCapacity(1 + 8);
+        writeBuffer[writePos++] = primitiveHeader(INT8);
+        writeLong(writeBuffer, writePos, l, 8);
+        writePos += 8;
+    }
+
+    public void appendNumeric(long l) {
+        if (l == (byte) l) {
+            appendByte((byte) l);
+        } else if (l == (short) l) {
+            appendShort((short) l);
+        } else if (l == (int) l) {
+            appendInt((int) l);
+        } else {
+            appendLong(l);
+        }
+    }
+
+    public void appendDouble(double d) {
+        checkCapacity(1 + 8);
+        writeBuffer[writePos++] = primitiveHeader(DOUBLE);
+        writeLong(writeBuffer, writePos, Double.doubleToLongBits(d), 8);
+        writePos += 8;
+    }
+
+    // Append a decimal value to the variant builder. The caller should guarantee that its precision
+    // and scale fit into `MAX_DECIMAL16_PRECISION`.
+    public void appendDecimal(BigDecimal d) {
+        checkCapacity(2 + 16);
+        BigInteger unscaled = d.unscaledValue();
+        if (d.scale() <= MAX_DECIMAL4_PRECISION && d.precision() <= MAX_DECIMAL4_PRECISION) {
+            writeBuffer[writePos++] = primitiveHeader(DECIMAL4);
+            writeBuffer[writePos++] = (byte) d.scale();
+            writeLong(writeBuffer, writePos, unscaled.intValueExact(), 4);
+            writePos += 4;
+        } else if (d.scale() <= MAX_DECIMAL8_PRECISION && d.precision() <= MAX_DECIMAL8_PRECISION) {
+            writeBuffer[writePos++] = primitiveHeader(DECIMAL8);
+            writeBuffer[writePos++] = (byte) d.scale();
+            writeLong(writeBuffer, writePos, unscaled.longValueExact(), 8);
+            writePos += 8;
+        } else {
+            assert d.scale() <= MAX_DECIMAL16_PRECISION && d.precision() <= MAX_DECIMAL16_PRECISION;
+            writeBuffer[writePos++] = primitiveHeader(DECIMAL16);
+            writeBuffer[writePos++] = (byte) d.scale();
+            // `toByteArray` returns a big-endian representation. We need to copy it in reverse
+            // and sign-extend it to 16 bytes.
+            byte[] bytes = unscaled.toByteArray();
+            for (int i = 0; i < bytes.length; ++i) {
+                writeBuffer[writePos + i] = bytes[bytes.length - 1 - i];
+            }
+            byte sign = (byte) (bytes[0] < 0 ? -1 : 0);
+            for (int i = bytes.length; i < 16; ++i) {
+                writeBuffer[writePos + i] = sign;
+            }
+            writePos += 16;
+        }
+    }
+
+    public void appendDate(int daysSinceEpoch) {
+        checkCapacity(1 + 4);
+        writeBuffer[writePos++] = primitiveHeader(DATE);
+        writeLong(writeBuffer, writePos, daysSinceEpoch, 4);
+        writePos += 4;
+    }
+
+    public void appendTimestampLtz(long microsSinceEpoch) {
+        checkCapacity(1 + 8);
+        writeBuffer[writePos++] = primitiveHeader(TIMESTAMP_LTZ);
+        writeLong(writeBuffer, writePos, microsSinceEpoch, 8);
+        writePos += 8;
+    }
+
+    public void appendTimestamp(long microsSinceEpoch) {
+        checkCapacity(1 + 8);
+        writeBuffer[writePos++] = primitiveHeader(TIMESTAMP);
+        writeLong(writeBuffer, writePos, microsSinceEpoch, 8);
+        writePos += 8;
+    }
+
+    public void appendFloat(float f) {
+        checkCapacity(1 + 4);
+        writeBuffer[writePos++] = primitiveHeader(FLOAT);
+        writeLong(writeBuffer, writePos, Float.floatToIntBits(f), 8);

Review Comment:
   Good catch! It appears this was also fixed in Spark recently; see [SPARK-52833](https://issues.apache.org/jira/browse/SPARK-52833). @cshuo Feel free to pick it up if you want to.
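   
   For reference, a minimal sketch of what the fix presumably looks like (assuming the Spark patch makes the same change: `Float.floatToIntBits` yields a 4-byte payload, so write 4 bytes and advance `writePos` by 4, mirroring `appendInt`):
   
   ```java
   public void appendFloat(float f) {
       checkCapacity(1 + 4);
       writeBuffer[writePos++] = primitiveHeader(FLOAT);
       // A float payload is 4 bytes; writing 8 bytes here would corrupt
       // the value layout in the buffer.
       writeLong(writeBuffer, writePos, Float.floatToIntBits(f), 4);
       writePos += 4;
   }
   ```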



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: issues-unsubscr...@flink.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org