Sxnan commented on code in PR #26655:
URL: https://github.com/apache/flink/pull/26655#discussion_r2285263543
##########
flink-core/src/main/java/org/apache/flink/types/variant/BinaryVariantInternalBuilder.java:
##########
@@ -0,0 +1,655 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.types.variant;
+
+import org.apache.flink.annotation.Internal;
+
+import org.apache.flink.shaded.jackson2.com.fasterxml.jackson.core.JsonFactory;
+import org.apache.flink.shaded.jackson2.com.fasterxml.jackson.core.JsonParseException;
+import org.apache.flink.shaded.jackson2.com.fasterxml.jackson.core.JsonParser;
+import org.apache.flink.shaded.jackson2.com.fasterxml.jackson.core.JsonToken;
+import org.apache.flink.shaded.jackson2.com.fasterxml.jackson.core.exc.InputCoercionException;
+
+import java.io.IOException;
+import java.math.BigDecimal;
+import java.math.BigInteger;
+import java.nio.charset.StandardCharsets;
+import java.util.ArrayList;
+import java.util.Arrays;
+import java.util.Collections;
+import java.util.Comparator;
+import java.util.HashMap;
+
+import static org.apache.flink.types.variant.BinaryVariantUtil.ARRAY;
+import static org.apache.flink.types.variant.BinaryVariantUtil.BASIC_TYPE_MASK;
+import static org.apache.flink.types.variant.BinaryVariantUtil.BINARY;
+import static org.apache.flink.types.variant.BinaryVariantUtil.DATE;
+import static org.apache.flink.types.variant.BinaryVariantUtil.DECIMAL16;
+import static org.apache.flink.types.variant.BinaryVariantUtil.DECIMAL4;
+import static org.apache.flink.types.variant.BinaryVariantUtil.DECIMAL8;
+import static org.apache.flink.types.variant.BinaryVariantUtil.DOUBLE;
+import static org.apache.flink.types.variant.BinaryVariantUtil.FALSE;
+import static org.apache.flink.types.variant.BinaryVariantUtil.FLOAT;
+import static org.apache.flink.types.variant.BinaryVariantUtil.INT1;
+import static org.apache.flink.types.variant.BinaryVariantUtil.INT2;
+import static org.apache.flink.types.variant.BinaryVariantUtil.INT4;
+import static org.apache.flink.types.variant.BinaryVariantUtil.INT8;
+import static org.apache.flink.types.variant.BinaryVariantUtil.LONG_STR;
+import static org.apache.flink.types.variant.BinaryVariantUtil.MAX_DECIMAL16_PRECISION;
+import static org.apache.flink.types.variant.BinaryVariantUtil.MAX_DECIMAL4_PRECISION;
+import static org.apache.flink.types.variant.BinaryVariantUtil.MAX_DECIMAL8_PRECISION;
+import static org.apache.flink.types.variant.BinaryVariantUtil.MAX_SHORT_STR_SIZE;
+import static org.apache.flink.types.variant.BinaryVariantUtil.NULL;
+import static org.apache.flink.types.variant.BinaryVariantUtil.OBJECT;
+import static org.apache.flink.types.variant.BinaryVariantUtil.SIZE_LIMIT;
+import static org.apache.flink.types.variant.BinaryVariantUtil.TIMESTAMP;
+import static org.apache.flink.types.variant.BinaryVariantUtil.TIMESTAMP_LTZ;
+import static org.apache.flink.types.variant.BinaryVariantUtil.TRUE;
+import static org.apache.flink.types.variant.BinaryVariantUtil.U16_MAX;
+import static org.apache.flink.types.variant.BinaryVariantUtil.U24_MAX;
+import static org.apache.flink.types.variant.BinaryVariantUtil.U24_SIZE;
+import static org.apache.flink.types.variant.BinaryVariantUtil.U32_SIZE;
+import static org.apache.flink.types.variant.BinaryVariantUtil.U8_MAX;
+import static org.apache.flink.types.variant.BinaryVariantUtil.VERSION;
+import static org.apache.flink.types.variant.BinaryVariantUtil.arrayHeader;
+import static org.apache.flink.types.variant.BinaryVariantUtil.checkIndex;
+import static org.apache.flink.types.variant.BinaryVariantUtil.getMetadataKey;
+import static org.apache.flink.types.variant.BinaryVariantUtil.handleArray;
+import static org.apache.flink.types.variant.BinaryVariantUtil.handleObject;
+import static org.apache.flink.types.variant.BinaryVariantUtil.objectHeader;
+import static org.apache.flink.types.variant.BinaryVariantUtil.primitiveHeader;
+import static org.apache.flink.types.variant.BinaryVariantUtil.readUnsigned;
+import static org.apache.flink.types.variant.BinaryVariantUtil.shortStrHeader;
+import static org.apache.flink.types.variant.BinaryVariantUtil.valueSize;
+import static org.apache.flink.types.variant.BinaryVariantUtil.writeLong;
+
+/* This file is based on source code from the Spark Project (http://spark.apache.org/), licensed by the Apache
+ * Software Foundation (ASF) under the Apache License, Version 2.0. See the NOTICE file distributed with this work for
+ * additional information regarding copyright ownership. */
+
+/** The internal builder for {@link BinaryVariant}. */
+@Internal
+public class BinaryVariantInternalBuilder {
+
+    public static final VariantTypeException VARIANT_SIZE_LIMIT_EXCEPTION =
+            new VariantTypeException("VARIANT_SIZE_LIMIT");
+    public static final VariantTypeException VARIANT_DUPLICATE_KEY_EXCEPTION =
+            new VariantTypeException("VARIANT_DUPLICATE_KEY");
+
+    public BinaryVariantInternalBuilder(boolean allowDuplicateKeys) {
+        this.allowDuplicateKeys = allowDuplicateKeys;
+    }
+
+    /**
+     * Parses a JSON string as a Variant value.
+     *
+     * @throws IOException if any JSON parsing error happens.
+     */
+    public static BinaryVariant parseJson(String json, boolean allowDuplicateKeys)
+            throws IOException {
+        try (JsonParser parser = new JsonFactory().createParser(json)) {
+            parser.nextToken();
+            return parseJson(parser, allowDuplicateKeys);
+        }
+    }
+
+    /**
+     * Similar to {@link #parseJson(String, boolean)}, but takes a JSON parser instead of string
+     * input.
+     */
+    private static BinaryVariant parseJson(JsonParser parser, boolean allowDuplicateKeys)
+            throws IOException {
+        BinaryVariantInternalBuilder builder = new BinaryVariantInternalBuilder(allowDuplicateKeys);
+        builder.buildJson(parser);
+        return builder.build();
+    }
+
+    // Build the variant metadata from `dictionaryKeys` and return the variant result.
+    public BinaryVariant build() {
+        int numKeys = dictionaryKeys.size();
+        // Use long to avoid overflow in accumulating lengths.
+        long dictionaryStringSize = 0;
+        for (byte[] key : dictionaryKeys) {
+            dictionaryStringSize += key.length;
+        }
+        // Determine the number of bytes required per offset entry.
+        // The largest offset is the one-past-the-end value, which is the total string size.
+        // It's very unlikely that the number of keys could be larger, but incorporate that
+        // into the calculation in case of pathological data.
+        long maxSize = Math.max(dictionaryStringSize, numKeys);
+        if (maxSize > SIZE_LIMIT) {
+            throw VARIANT_SIZE_LIMIT_EXCEPTION;
+        }
+        int offsetSize = getIntegerSize((int) maxSize);
+
+        int offsetStart = 1 + offsetSize;
+        int stringStart = offsetStart + (numKeys + 1) * offsetSize;
+        long metadataSize = stringStart + dictionaryStringSize;
+
+        if (metadataSize > SIZE_LIMIT) {
+            throw VARIANT_SIZE_LIMIT_EXCEPTION;
+        }
+        byte[] metadata = new byte[(int) metadataSize];
+        int headerByte = VERSION | ((offsetSize - 1) << 6);
+        writeLong(metadata, 0, headerByte, 1);
+        writeLong(metadata, 1, numKeys, offsetSize);
+        int currentOffset = 0;
+        for (int i = 0; i < numKeys; ++i) {
+            writeLong(metadata, offsetStart + i * offsetSize, currentOffset, offsetSize);
+            byte[] key = dictionaryKeys.get(i);
+            System.arraycopy(key, 0, metadata, stringStart + currentOffset, key.length);
+            currentOffset += key.length;
+        }
+        writeLong(metadata, offsetStart + numKeys * offsetSize, currentOffset, offsetSize);
+        return new BinaryVariant(Arrays.copyOfRange(writeBuffer, 0, writePos), metadata);
+    }
+
+    public void appendString(String str) {
+        byte[] text = str.getBytes(StandardCharsets.UTF_8);
+        boolean longStr = text.length > MAX_SHORT_STR_SIZE;
+        checkCapacity((longStr ? 1 + U32_SIZE : 1) + text.length);
+        if (longStr) {
+            writeBuffer[writePos++] = primitiveHeader(LONG_STR);
+            writeLong(writeBuffer, writePos, text.length, U32_SIZE);
+            writePos += U32_SIZE;
+        } else {
+            writeBuffer[writePos++] = shortStrHeader(text.length);
+        }
+        System.arraycopy(text, 0, writeBuffer, writePos, text.length);
+        writePos += text.length;
+    }
+
+    public void appendNull() {
+        checkCapacity(1);
+        writeBuffer[writePos++] = primitiveHeader(NULL);
+    }
+
+    public void appendBoolean(boolean b) {
+        checkCapacity(1);
+        writeBuffer[writePos++] = primitiveHeader(b ? TRUE : FALSE);
+    }
+
+    public void appendByte(byte b) {
+        checkCapacity(1 + 1);
+        writeBuffer[writePos++] = primitiveHeader(INT1);
+        writeLong(writeBuffer, writePos, b, 1);
+        writePos += 1;
+    }
+
+    public void appendShort(short s) {
+        checkCapacity(1 + 2);
+        writeBuffer[writePos++] = primitiveHeader(INT2);
+        writeLong(writeBuffer, writePos, s, 2);
+        writePos += 2;
+    }
+
+    public void appendInt(int i) {
+        checkCapacity(1 + 4);
+        writeBuffer[writePos++] = primitiveHeader(INT4);
+        writeLong(writeBuffer, writePos, i, 4);
+        writePos += 4;
+    }
+
+    public void appendLong(long l) {
+        checkCapacity(1 + 8);
+        writeBuffer[writePos++] = primitiveHeader(INT8);
+        writeLong(writeBuffer, writePos, l, 8);
+        writePos += 8;
+    }
+
+    public void appendNumeric(long l) {
+        if (l == (byte) l) {
+            appendByte((byte) l);
+        } else if (l == (short) l) {
+            appendShort((short) l);
+        } else if (l == (int) l) {
+            appendInt((int) l);
+        } else {
+            appendLong(l);
+        }
+    }
+
+    public void appendDouble(double d) {
+        checkCapacity(1 + 8);
+        writeBuffer[writePos++] = primitiveHeader(DOUBLE);
+        writeLong(writeBuffer, writePos, Double.doubleToLongBits(d), 8);
+        writePos += 8;
+    }
+
+    // Append a decimal value to the variant builder. The caller should guarantee that its
+    // precision and scale fit into `MAX_DECIMAL16_PRECISION`.
+    public void appendDecimal(BigDecimal d) {
+        checkCapacity(2 + 16);
+        BigInteger unscaled = d.unscaledValue();
+        if (d.scale() <= MAX_DECIMAL4_PRECISION && d.precision() <= MAX_DECIMAL4_PRECISION) {
+            writeBuffer[writePos++] = primitiveHeader(DECIMAL4);
+            writeBuffer[writePos++] = (byte) d.scale();
+            writeLong(writeBuffer, writePos, unscaled.intValueExact(), 4);
+            writePos += 4;
+        } else if (d.scale() <= MAX_DECIMAL8_PRECISION && d.precision() <= MAX_DECIMAL8_PRECISION) {
+            writeBuffer[writePos++] = primitiveHeader(DECIMAL8);
+            writeBuffer[writePos++] = (byte) d.scale();
+            writeLong(writeBuffer, writePos, unscaled.longValueExact(), 8);
+            writePos += 8;
+        } else {
+            assert d.scale() <= MAX_DECIMAL16_PRECISION && d.precision() <= MAX_DECIMAL16_PRECISION;
+            writeBuffer[writePos++] = primitiveHeader(DECIMAL16);
+            writeBuffer[writePos++] = (byte) d.scale();
+            // `toByteArray` returns a big-endian representation. We need to copy it in reverse
+            // and sign-extend it to 16 bytes.
+            byte[] bytes = unscaled.toByteArray();
+            for (int i = 0; i < bytes.length; ++i) {
+                writeBuffer[writePos + i] = bytes[bytes.length - 1 - i];
+            }
+            byte sign = (byte) (bytes[0] < 0 ? -1 : 0);
+            for (int i = bytes.length; i < 16; ++i) {
+                writeBuffer[writePos + i] = sign;
+            }
+            writePos += 16;
+        }
+    }
+
+    public void appendDate(int daysSinceEpoch) {
+        checkCapacity(1 + 4);
+        writeBuffer[writePos++] = primitiveHeader(DATE);
+        writeLong(writeBuffer, writePos, daysSinceEpoch, 4);
+        writePos += 4;
+    }
+
+    public void appendTimestampLtz(long microsSinceEpoch) {
+        checkCapacity(1 + 8);
+        writeBuffer[writePos++] = primitiveHeader(TIMESTAMP_LTZ);
+        writeLong(writeBuffer, writePos, microsSinceEpoch, 8);
+        writePos += 8;
+    }
+
+    public void appendTimestamp(long microsSinceEpoch) {
+        checkCapacity(1 + 8);
+        writeBuffer[writePos++] = primitiveHeader(TIMESTAMP);
+        writeLong(writeBuffer, writePos, microsSinceEpoch, 8);
+        writePos += 8;
+    }
+
+    public void appendFloat(float f) {
+        checkCapacity(1 + 4);
+        writeBuffer[writePos++] = primitiveHeader(FLOAT);
+        writeLong(writeBuffer, writePos, Float.floatToIntBits(f), 8);

Review Comment:
   Good catch! It appears this was also fixed in Spark recently; see [SPARK-52833](https://issues.apache.org/jira/browse/SPARK-52833). @cshuo Feel free to pick it up if you want to.
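For reference, a minimal sketch of what the corrected method could look like, in the spirit of the SPARK-52833 fix. This is not the actual Flink patch: the `writePos += 4` line is an assumption inferred from `checkCapacity(1 + 4)` reserving a 1-byte header plus a 4-byte payload, and the helpers (`checkCapacity`, `primitiveHeader`, `writeLong`) are the ones already used throughout this class.

```java
public void appendFloat(float f) {
    checkCapacity(1 + 4);
    writeBuffer[writePos++] = primitiveHeader(FLOAT);
    // Float.floatToIntBits yields a 32-bit pattern, so only 4 bytes belong in
    // the value stream; writing 8 bytes (the bug flagged above) would spill
    // past the payload reserved by checkCapacity, mirroring appendInt's width.
    writeLong(writeBuffer, writePos, Float.floatToIntBits(f), 4);
    writePos += 4;
}
```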