twalthr commented on code in PR #26655:
URL: https://github.com/apache/flink/pull/26655#discussion_r2137336904


##########
flink-core/src/main/java/org/apache/flink/types/variant/BinaryVariant.java:
##########
@@ -0,0 +1,490 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.types.variant;
+
+import org.apache.flink.annotation.Internal;
+
+import org.apache.flink.shaded.jackson2.com.fasterxml.jackson.core.JsonFactory;
+import 
org.apache.flink.shaded.jackson2.com.fasterxml.jackson.core.JsonGenerator;
+
+import java.io.CharArrayWriter;
+import java.io.IOException;
+import java.math.BigDecimal;
+import java.time.Instant;
+import java.time.LocalDate;
+import java.time.LocalDateTime;
+import java.time.ZoneId;
+import java.time.ZoneOffset;
+import java.time.temporal.ChronoUnit;
+import java.util.Arrays;
+import java.util.Base64;
+import java.util.Objects;
+
+import static 
org.apache.flink.types.variant.BinaryVariantUtil.BINARY_SEARCH_THRESHOLD;
+import static org.apache.flink.types.variant.BinaryVariantUtil.SIZE_LIMIT;
+import static 
org.apache.flink.types.variant.BinaryVariantUtil.TIMESTAMP_FORMATTER;
+import static 
org.apache.flink.types.variant.BinaryVariantUtil.TIMESTAMP_LTZ_FORMATTER;
+import static org.apache.flink.types.variant.BinaryVariantUtil.VERSION;
+import static org.apache.flink.types.variant.BinaryVariantUtil.VERSION_MASK;
+import static org.apache.flink.types.variant.BinaryVariantUtil.checkIndex;
+import static org.apache.flink.types.variant.BinaryVariantUtil.getMetadataKey;
+import static org.apache.flink.types.variant.BinaryVariantUtil.handleArray;
+import static org.apache.flink.types.variant.BinaryVariantUtil.handleObject;
+import static 
org.apache.flink.types.variant.BinaryVariantUtil.malformedVariant;
+import static org.apache.flink.types.variant.BinaryVariantUtil.readUnsigned;
+import static org.apache.flink.types.variant.BinaryVariantUtil.unexpectedType;
+import static org.apache.flink.types.variant.BinaryVariantUtil.valueSize;
+import static 
org.apache.flink.types.variant.BinaryVariantUtil.variantConstructorSizeLimit;
+
+/**
+ * A data structure that represents a semi-structured value. It consists of 
two binary values: value
+ * and metadata. The value encodes types and values, but not field names. The 
metadata currently
+ * contains a version flag and a list of field names. We can extend/modify the 
detailed binary
+ * format given the version flag.
+ *
+ * @see <a 
href="https://github.com/apache/parquet-format/blob/master/VariantEncoding.md">Variant
+ *     Binary Encoding</a> for the detailed layout of the data structure.
+ */
+@Internal
+public final class BinaryVariant implements Variant {
+
+    private final byte[] value;
+    private final byte[] metadata;
+    // The variant value doesn't use the whole `value` binary, but starts from 
its `pos` index and
+    // spans a size of `valueSize(value, pos)`. This design avoids frequent 
copies of the value
+    // binary when reading a sub-variant in the array/object element.
+    private final int pos;
+
+    public BinaryVariant(byte[] value, byte[] metadata) {
+        this(value, metadata, 0);
+    }
+
+    private BinaryVariant(byte[] value, byte[] metadata, int pos) {
+        this.value = value;
+        this.metadata = metadata;
+        this.pos = pos;
+        // There is currently only one allowed version.
+        if (metadata.length < 1 || (metadata[0] & VERSION_MASK) != VERSION) {
+            throw malformedVariant();
+        }
+        // Don't attempt to use a Variant larger than 16 MiB. We'll never 
produce one, and it risks
+        // memory instability.
+        if (metadata.length > SIZE_LIMIT || value.length > SIZE_LIMIT) {
+            throw variantConstructorSizeLimit();
+        }
+    }
+
+    @Override
+    public boolean isPrimitive() {
+        return !isArray() && !isObject();
+    }
+
+    @Override
+    public boolean isArray() {
+        return getType() == Type.ARRAY;
+    }
+
+    @Override
+    public boolean isObject() {
+        return getType() == Type.OBJECT;
+    }
+
+    @Override
+    public boolean isNull() {
+        return getType() == Type.NULL;
+    }
+
+    @Override
+    public Type getType() {
+        return BinaryVariantUtil.getType(value, pos);
+    }
+
+    @Override
+    public boolean getBoolean() throws VariantTypeException {
+        checkType(Type.BOOLEAN, getType());
+        return BinaryVariantUtil.getBoolean(value, pos);
+    }
+
+    @Override
+    public byte getByte() throws VariantTypeException {
+        checkType(Type.TINYINT, getType());
+        return (byte) BinaryVariantUtil.getLong(value, pos);
+    }
+
+    @Override
+    public short getShort() throws VariantTypeException {
+        checkType(Type.SMALLINT, getType());
+        return (short) BinaryVariantUtil.getLong(value, pos);
+    }
+
+    @Override
+    public int getInt() throws VariantTypeException {
+        checkType(Type.INT, getType());
+        return (int) BinaryVariantUtil.getLong(value, pos);
+    }
+
+    @Override
+    public long getLong() throws VariantTypeException {
+        checkType(Type.BIGINT, getType());
+        return BinaryVariantUtil.getLong(value, pos);
+    }
+
+    @Override
+    public float getFloat() throws VariantTypeException {
+        checkType(Type.FLOAT, getType());
+        return BinaryVariantUtil.getFloat(value, pos);
+    }
+
+    @Override
+    public BigDecimal getDecimal() throws VariantTypeException {
+        checkType(Type.DECIMAL, getType());
+        return BinaryVariantUtil.getDecimal(value, pos);
+    }
+
+    @Override
+    public double getDouble() throws VariantTypeException {
+        checkType(Type.DOUBLE, getType());
+        return BinaryVariantUtil.getDouble(value, pos);
+    }
+
+    @Override
+    public String getString() throws VariantTypeException {
+        checkType(Type.STRING, getType());
+        return BinaryVariantUtil.getString(value, pos);
+    }
+
+    @Override
+    public LocalDate getDate() throws VariantTypeException {
+        checkType(Type.DATE, getType());
+        return LocalDate.ofEpochDay(BinaryVariantUtil.getLong(value, pos));
+    }
+
+    @Override
+    public LocalDateTime getTimestamp() throws VariantTypeException {
+        checkType(Type.TIMESTAMP, getType());
+        return microsToInstant(BinaryVariantUtil.getLong(value, pos))
+                .atZone(ZoneOffset.UTC)
+                .toLocalDateTime();
+    }
+
+    @Override
+    public Instant getInstant() throws VariantTypeException {
+        checkType(Type.TIMESTAMP_LTZ, getType());
+        return microsToInstant(BinaryVariantUtil.getLong(value, pos));
+    }
+
+    @Override
+    public byte[] getBytes() throws VariantTypeException {
+        checkType(Type.BINARY, getType());
+        return BinaryVariantUtil.getBinary(value, pos);
+    }
+
+    @Override
+    public Object get() throws VariantTypeException {
+        switch (getType()) {
+            case NULL:
+                return null;
+            case BOOLEAN:
+                return getBoolean();
+            case TINYINT:
+                return getByte();
+            case SMALLINT:
+                return getShort();
+            case INT:
+                return getInt();
+            case BIGINT:
+                return getLong();
+            case FLOAT:
+                return getFloat();
+            case DOUBLE:
+                return getDouble();
+            case DECIMAL:
+                return getDecimal();
+            case STRING:
+                return getString();
+            case DATE:
+                return getDate();
+            case TIMESTAMP:
+                return getTimestamp();
+            case TIMESTAMP_LTZ:
+                return getInstant();
+            case BINARY:
+                return getBytes();
+            default:
+                throw new VariantTypeException(
+                        String.format("Expecting a scalar variant but got %s", 
getType()));

Review Comment:
   ```suggestion
                           String.format("Expecting a primitive type but got 
%s", getType()));
   ```



##########
flink-core/src/main/java/org/apache/flink/types/variant/VariantBuilder.java:
##########
@@ -0,0 +1,104 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.types.variant;
+
+import org.apache.flink.annotation.PublicEvolving;
+
+import java.math.BigDecimal;
+import java.time.Instant;
+import java.time.LocalDate;
+import java.time.LocalDateTime;
+
+/** Builder for variants. */
+@PublicEvolving
+public interface VariantBuilder {
+
+    /** Create a variant from a byte. */
+    Variant of(byte b);
+
+    /** Create a variant from a short. */
+    Variant of(short s);
+
+    /** Create a variant from an int. */
+    Variant of(int i);
+
+    /** Create a variant from a long. */
+    Variant of(long l);
+
+    /** Create a variant from a string. */
+    Variant of(String s);
+
+    /** Create a variant from a double. */
+    Variant of(Double d);

Review Comment:
   ```suggestion
       Variant of(double d);
   ```



##########
flink-core/src/test/java/org/apache/flink/types/variant/BinaryVariantInternalBuilderTest.java:
##########
@@ -0,0 +1,106 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.types.variant;
+
+import org.junit.jupiter.api.Test;
+
+import java.io.IOException;
+import java.math.BigDecimal;
+
+import static org.assertj.core.api.Assertions.assertThat;
+
+class BinaryVariantInternalBuilderTest {
+
+    @Test
+    void testParseScalarJson() throws IOException {
+        assertThat(BinaryVariantInternalBuilder.parseJson("1", 
false).getByte())
+                .isEqualTo((byte) 1);
+        short s = (short) (Byte.MAX_VALUE + 1L);
+        assertThat(BinaryVariantInternalBuilder.parseJson(String.valueOf(s), 
false).getShort())
+                .isEqualTo(s);
+        int i = (int) (Short.MAX_VALUE + 1L);
+        assertThat(BinaryVariantInternalBuilder.parseJson(String.valueOf(i), 
false).getInt())
+                .isEqualTo(i);
+        long l = Integer.MAX_VALUE + 1L;
+        assertThat(BinaryVariantInternalBuilder.parseJson(String.valueOf(l), 
false).getLong())
+                .isEqualTo(l);
+
+        BigDecimal bigDecimal = 
BigDecimal.valueOf(Long.MAX_VALUE).add(BigDecimal.ONE);
+        assertThat(
+                        
BinaryVariantInternalBuilder.parseJson(bigDecimal.toPlainString(), false)
+                                .getDecimal())
+                .isEqualTo(bigDecimal);
+
+        assertThat(BinaryVariantInternalBuilder.parseJson("1.123", 
false).getDecimal())
+                .isEqualTo(BigDecimal.valueOf(1.123));
+        assertThat(
+                        BinaryVariantInternalBuilder.parseJson(
+                                        String.valueOf(Double.MAX_VALUE), 
false)
+                                .getDouble())
+                .isEqualTo(Double.MAX_VALUE);
+
+        assertThat(BinaryVariantInternalBuilder.parseJson("\"hello\"", 
false).getString())
+                .isEqualTo("hello");
+
+        assertThat(BinaryVariantInternalBuilder.parseJson("true", 
false).getBoolean()).isTrue();
+
+        assertThat(BinaryVariantInternalBuilder.parseJson("false", 
false).getBoolean()).isFalse();
+
+        assertThat(BinaryVariantInternalBuilder.parseJson("null", 
false).isNull()).isTrue();

Review Comment:
   Test parsing of time types here as well



##########
flink-core/src/main/java/org/apache/flink/types/variant/Variant.java:
##########
@@ -0,0 +1,213 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.types.variant;
+
+import org.apache.flink.annotation.PublicEvolving;
+
+import java.io.Serializable;
+import java.math.BigDecimal;
+import java.time.Instant;
+import java.time.LocalDate;
+import java.time.LocalDateTime;
+
+/** Variant represents semi-structured data. */
+@PublicEvolving
+public interface Variant extends Serializable {
+
+    /** Returns true if the variant is a primitive typed value, such as INT, 
DOUBLE, STRING, etc. */
+    boolean isPrimitive();
+
+    /** Returns true if this variant is an Array, false otherwise. */
+    boolean isArray();
+
+    /** Returns true if this variant is an Object, false otherwise. */
+    boolean isObject();
+
+    /** Checks if this variant is null. */
+    boolean isNull();
+
+    /** Get the type of variant. */
+    Type getType();
+
+    /**
+     * Get the scalar value of variant as boolean, if the variant type is 
{@link Type#BOOLEAN}.
+     *
+     * @throws VariantTypeException If this variant is not a scalar value or 
is not {@link
+     *     Type#BOOLEAN}.
+     */
+    boolean getBoolean() throws VariantTypeException;
+
+    /**
+     * Get the scalar value of variant as byte, if the variant type is {@link 
Type#TINYINT}.
+     *
+     * @throws VariantTypeException If this variant is not a scalar value or 
is not {@link
+     *     Type#TINYINT}.
+     */
+    byte getByte() throws VariantTypeException;
+
+    /**
+     * Get the scalar value of variant as short, if the variant type is {@link 
Type#SMALLINT}.
+     *
+     * @throws VariantTypeException If this variant is not a scalar value or 
is not {@link
+     *     Type#SMALLINT}.
+     */
+    short getShort() throws VariantTypeException;
+
+    /**
+     * Get the scalar value of variant as int, if the variant type is {@link 
Type#INT}.
+     *
+     * @throws VariantTypeException if this variant is not a scalar value or 
is not {@link
+     *     Type#INT}.
+     */
+    int getInt() throws VariantTypeException;
+
+    /**
+     * Get the scalar value of variant as long, if the variant type is {@link 
Type#BIGINT}.
+     *
+     * @throws VariantTypeException If this variant is not a scalar value or 
is not {@link
+     *     Type#BIGINT}.
+     */
+    long getLong() throws VariantTypeException;
+
+    /**
+     * Get the scalar value of variant as float, if the variant type is {@link 
Type#FLOAT}.
+     *
+     * @throws VariantTypeException If this variant is not a scalar value or 
is not {@link
+     *     Type#FLOAT}.
+     */
+    float getFloat() throws VariantTypeException;
+
+    /**
+     * Get the scalar value of variant as BigDecimal, if the variant type is 
{@link Type#DECIMAL}.
+     *
+     * @throws VariantTypeException If this variant is not a scalar value or 
is not {@link
+     *     Type#DECIMAL}.
+     */
+    BigDecimal getDecimal() throws VariantTypeException;
+
+    /**
+     * Get the scalar value of variant as double, if the variant type is 
{@link Type#DOUBLE}.
+     *
+     * @throws VariantTypeException If this variant is not a scalar value or 
is not {@link
+     *     Type#DOUBLE}.
+     */
+    double getDouble() throws VariantTypeException;
+
+    /**
+     * Get the scalar value of variant as string, if the variant type is 
{@link Type#STRING}.
+     *
+     * @throws VariantTypeException If this variant is not a scalar value or 
is not {@link
+     *     Type#STRING}.
+     */
+    String getString() throws VariantTypeException;
+
+    /**
+     * Get the scalar value of variant as LocalDate, if the variant type is 
{@link Type#DATE}.
+     *
+     * @throws VariantTypeException If this variant is not a scalar value or 
is not {@link
+     *     Type#DATE}.
+     */
+    LocalDate getDate() throws VariantTypeException;
+
+    /**
+     * Get the scalar value of variant as LocalDateTime, if the variant type 
is {@link
+     * Type#TIMESTAMP}.
+     *
+     * @throws VariantTypeException If this variant is not a scalar value or 
is not {@link
+     *     Type#TIMESTAMP}.
+     */
+    LocalDateTime getTimestamp() throws VariantTypeException;
+
+    /**
+     * Get the scalar value of variant as Instant, if the variant type is 
{@link Type#TIMESTAMP_LTZ}.
+     *
+     * @throws VariantTypeException If this variant is not a scalar value or 
is not {@link
+     *     Type#TIMESTAMP_LTZ}.
+     */
+    Instant getInstant() throws VariantTypeException;
+
+    /**
+     * Get the scalar value of variant as byte array, if the variant type is 
{@link Type#BINARY}.
+     *
+     * @throws VariantTypeException If this variant is not a scalar value or 
is not {@link
+     *     Type#BINARY}.
+     */
+    byte[] getBytes() throws VariantTypeException;
+
+    /**
+     * Get the scalar value of variant.
+     *
+     * @throws VariantTypeException If this variant is not a scalar value.
+     */
+    Object get() throws VariantTypeException;
+
+    /**
+     * Get the scalar value of variant.
+     *
+     * @throws VariantTypeException If this variant is not a scalar value.
+     */
+    <T> T getAs() throws VariantTypeException;
+
+    /**
+     * Access value of the specified element of an array variant. If index is 
out of range, null is
+     * returned.
+     *
+     * <p>NOTE: if the element value has been explicitly set as 
<code>null</code> (which is
+     * different from removal!), a variant for which {@link Variant#isNull()} 
returns true will be
+     * returned, not null.
+     *
+     * @throws VariantTypeException If this variant is not an array.
+     */
+    Variant getElement(int index) throws VariantTypeException;
+
+    /**
+     * Access value of the specified field of an object variant. If there is 
no field with the
+     * specified name, null is returned.
+     *
+     * <p>NOTE: if the property value has been explicitly set as 
<code>null</code>, a variant
+     * for which {@link Variant#isNull()} returns true will be returned, not null.
+     *
+     * @throws VariantTypeException If this variant is not an object.
+     */
+    Variant getField(String fieldName) throws VariantTypeException;
+
+    /** Parses the variant to json. */
+    String toJson();
+
+    /** The type of variant. */
+    @PublicEvolving
+    enum Type {
+        OBJECT,
+        ARRAY,
+        NULL,
+        BOOLEAN,
+        TINYINT,
+        SMALLINT,
+        INT,
+        BIGINT,
+        FLOAT,
+        DOUBLE,
+        DECIMAL,
+        STRING,
+        DATE,
+        TIMESTAMP,
+        TIMESTAMP_LTZ,
+        BINARY
+    }

Review Comment:
   Don't we want to offer static `of()` and builder methods directly in this 
interface? A regular user should not need to work with BinaryVariant. Similar 
to how `BinaryStringData` is hidden from the user when `StringData.fromString` is 
used.



##########
flink-table/flink-table-planner/src/main/java/org/apache/calcite/sql/fun/SqlCastFunction.java:
##########
@@ -161,12 +161,19 @@ public boolean checkOperandTypes(SqlCallBinding 
callBinding, boolean throwOnFail
 
     private boolean canCastFrom(RelDataType toType, RelDataType fromType) {
         SqlTypeName fromTypeName = fromType.getSqlTypeName();
+
+        // Cast to Variant is not supported at the moment.
+        // TODO: Support cast to variant

Review Comment:
   Link to corresponding JIRA ticket



##########
flink-table/flink-table-planner/src/main/scala/org/apache/flink/table/planner/codegen/calls/FunctionGenerator.scala:
##########
@@ -514,6 +514,62 @@ class FunctionGenerator private (tableConfig: 
ReadableConfig) {
     Seq(VARCHAR),
     new NotCallGen(new MethodCallGen(BuiltInMethods.IS_JSON_SCALAR, 
argsNullable = true)))
 
+  addSqlFunctionMethod(

Review Comment:
   This is not necessary when using the new `BuiltInFunctionDefinition` stack.



##########
flink-table/flink-table-runtime/src/main/java/org/apache/flink/table/runtime/functions/VariantUtils.java:
##########
@@ -0,0 +1,64 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.table.runtime.functions;
+
+import org.apache.flink.annotation.Internal;
+import org.apache.flink.types.variant.BinaryVariantInternalBuilder;
+import org.apache.flink.types.variant.Variant;
+
+import javax.annotation.Nullable;
+
+@Internal
+public class VariantUtils {
+
+    public static @Nullable Variant parseJson(@Nullable String jsonString) {
+        return parseJson(jsonString, false);
+    }
+
+    public static @Nullable Variant parseJson(
+            @Nullable String jsonString, boolean allowDuplicateKeys) {
+        if (jsonString == null) {
+            return null;
+        }
+
+        try {
+            return BinaryVariantInternalBuilder.parseJson(jsonString, 
allowDuplicateKeys);
+        } catch (Throwable e) {
+            throw new RuntimeException(

Review Comment:
   ```suggestion
               throw new TableRuntimeException(
   ```



##########
flink-core/src/test/java/org/apache/flink/api/common/typeutils/base/VariantSerializerUpgradeTest.java:
##########
@@ -0,0 +1,107 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.api.common.typeutils.base;
+
+import org.apache.flink.FlinkVersion;
+import org.apache.flink.api.common.typeutils.TypeSerializer;
+import org.apache.flink.api.common.typeutils.TypeSerializerConditions;
+import org.apache.flink.api.common.typeutils.TypeSerializerSchemaCompatibility;
+import org.apache.flink.api.common.typeutils.TypeSerializerUpgradeTestBase;
+import 
org.apache.flink.api.common.typeutils.base.VariantSerializer.VariantSerializerSnapshot;
+import org.apache.flink.test.util.MigrationTest;
+import org.apache.flink.types.variant.BinaryVariantBuilder;
+import org.apache.flink.types.variant.Variant;
+
+import org.assertj.core.api.Condition;
+import org.junit.jupiter.api.Disabled;
+
+import java.util.ArrayList;
+import java.util.Collection;
+
+/** A {@link TypeSerializerUpgradeTestBase} for {@link 
VariantSerializerSnapshot}. */
+@Disabled("FLINK-XXXXX")

Review Comment:
   Not updated?



##########
flink-core/src/main/java/org/apache/flink/types/variant/VariantBuilder.java:
##########
@@ -0,0 +1,104 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.types.variant;
+
+import org.apache.flink.annotation.PublicEvolving;
+
+import java.math.BigDecimal;
+import java.time.Instant;
+import java.time.LocalDate;
+import java.time.LocalDateTime;
+
+/** Builder for variants. */
+@PublicEvolving
+public interface VariantBuilder {
+
+    /** Create a variant from a byte. */
+    Variant of(byte b);
+
+    /** Create a variant from a short. */
+    Variant of(short s);
+
+    /** Create a variant from an int. */
+    Variant of(int i);
+
+    /** Create a variant from a long. */
+    Variant of(long l);
+
+    /** Create a variant from a string. */
+    Variant of(String s);
+
+    /** Create a variant from a double. */
+    Variant of(Double d);
+
+    /** Create a variant from a float. */
+    Variant of(Float f);

Review Comment:
   ```suggestion
       Variant of(float f);
   ```



##########
flink-core/src/main/java/org/apache/flink/types/variant/BinaryVariantUtil.java:
##########
@@ -0,0 +1,631 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.types.variant;
+
+import org.apache.flink.annotation.Internal;
+import org.apache.flink.types.variant.Variant.Type;
+
+import java.math.BigDecimal;
+import java.math.BigInteger;
+import java.time.format.DateTimeFormatter;
+import java.time.format.DateTimeFormatterBuilder;
+import java.util.Arrays;
+import java.util.Locale;
+
+/* This file is based on source code from the Spark Project 
(http://spark.apache.org/), licensed by the Apache
+ * Software Foundation (ASF) under the Apache License, Version 2.0. See the 
NOTICE file distributed with this work for
+ * additional information regarding copyright ownership. */
+
+/**
+ * This class defines constants related to the variant format and provides 
functions for
+ * manipulating variant binaries.
+ *
+ * <p>A variant is made up of 2 binaries: value and metadata. A variant value 
consists of a one-byte
+ * header and a number of content bytes (can be zero). The header byte is 
divided into upper 6 bits
+ * (called "type info") and lower 2 bits (called "basic type"). The content 
format is explained in
+ * the below constants for all possible basic type and type info values.
+ *
+ * <p>The variant metadata includes a version id and a dictionary of distinct 
strings
+ * (case-sensitive). Its binary format is: - Version: 1-byte unsigned integer. 
The only acceptable
+ * value is 1 currently. - Dictionary size: 4-byte little-endian unsigned 
integer. The number of
+ * keys in the dictionary. - Offsets: (size + 1) * 4-byte little-endian 
unsigned integers.
+ * `offsets[i]` represents the starting position of string i, counting 
starting from the address of
+ * `offsets[0]`. Strings must be stored contiguously, so we don’t need to 
store the string size,
+ * instead, we compute it with `offset[i + 1] - offset[i]`. - UTF-8 string 
data.
+ */
+@Internal
+public class BinaryVariantUtil {
+    public static final int BASIC_TYPE_BITS = 2;
+    public static final int BASIC_TYPE_MASK = 0x3;
+    public static final int TYPE_INFO_MASK = 0x3F;
+    // The inclusive maximum value of the type info value. It is the size 
limit of `SHORT_STR`.
+    public static final int MAX_SHORT_STR_SIZE = 0x3F;
+
+    // Below is all possible basic type values.
+    // Primitive value. The type info value must be one of the values in the 
below section.
+    public static final int PRIMITIVE = 0;
+    // Short string value. The type info value is the string size, which must 
be in `[0,
+    // MAX_SHORT_STR_SIZE]`.
+    // The string content bytes directly follow the header byte.
+    public static final int SHORT_STR = 1;
+    // Object value. The content contains a size, a list of field ids, a list 
of field offsets, and
+    // the actual field data. The length of the id list is `size`, while the 
length of the offset
+    // list is `size + 1`, where the last offset represent the total size of 
the field data. The
+    // fields in an object must be sorted by the field name in alphabetical 
order. Duplicate field
+    // names in one object are not allowed.
+    // We use 5 bits in the type info to specify the integer type of the 
object header: it should
+    // be 0_b4_b3b2_b1b0 (MSB is 0), where:
+    // - b4 specifies the type of size. When it is 0/1, `size` is a 
little-endian 1/4-byte
+    // unsigned integer.
+    // - b3b2/b1b0 specifies the integer type of id and offset. When the 2 
bits are  0/1/2, the
+    // list contains 1/2/3-byte little-endian unsigned integers.
+    public static final int OBJECT = 2;
+    // Array value. The content contains a size, a list of field offsets, and 
the actual element
+    // data. It is similar to an object without the id list. The length of the 
offset list
+    // is `size + 1`, where the last offset represent the total size of the 
element data.
+    // Its type info should be: 000_b2_b1b0:
+    // - b2 specifies the type of size.
+    // - b1b0 specifies the integer type of offset.
+    public static final int ARRAY = 3;
+
+    // Below is all possible type info values for `PRIMITIVE`.
+    // JSON Null value. Empty content.
+    public static final int NULL = 0;
+    // True value. Empty content.
+    public static final int TRUE = 1;
+    // False value. Empty content.
+    public static final int FALSE = 2;
+    // 1-byte little-endian signed integer.
+    public static final int INT1 = 3;
+    // 2-byte little-endian signed integer.
+    public static final int INT2 = 4;
+    // 4-byte little-endian signed integer.
+    public static final int INT4 = 5;
+    // 8-byte little-endian signed integer.
+    public static final int INT8 = 6;
+    // 8-byte IEEE double.
+    public static final int DOUBLE = 7;
+    // 4-byte decimal. Content is 1-byte scale + 4-byte little-endian signed 
integer.
+    public static final int DECIMAL4 = 8;
+    // 8-byte decimal. Content is 1-byte scale + 8-byte little-endian signed 
integer.
+    public static final int DECIMAL8 = 9;
+    // 16-byte decimal. Content is 1-byte scale + 16-byte little-endian signed 
integer.
+    public static final int DECIMAL16 = 10;
+    // Date value. Content is 4-byte little-endian signed integer that 
represents the number of days
+    // from the Unix epoch.
+    public static final int DATE = 11;
+    // Timestamp value. Content is 8-byte little-endian signed integer that 
represents the number of
+    // microseconds elapsed since the Unix epoch, 1970-01-01 00:00:00 UTC. It 
is displayed to users
+    // in
+    // their local time zones and may be displayed differently depending on 
the execution
+    // environment.
+    public static final int TIMESTAMP = 12;

Review Comment:
   The naming of timestamp types is not consistent in this PR. Call this 
`TIMESTAMP_LTZ` then? And the one in `Types` `Types.TIMESTAMP_NTZ`?



##########
flink-core/src/main/java/org/apache/flink/types/variant/BinaryVariant.java:
##########
@@ -0,0 +1,490 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.types.variant;
+
+import org.apache.flink.annotation.Internal;
+
+import org.apache.flink.shaded.jackson2.com.fasterxml.jackson.core.JsonFactory;
+import 
org.apache.flink.shaded.jackson2.com.fasterxml.jackson.core.JsonGenerator;
+
+import java.io.CharArrayWriter;
+import java.io.IOException;
+import java.math.BigDecimal;
+import java.time.Instant;
+import java.time.LocalDate;
+import java.time.LocalDateTime;
+import java.time.ZoneId;
+import java.time.ZoneOffset;
+import java.time.temporal.ChronoUnit;
+import java.util.Arrays;
+import java.util.Base64;
+import java.util.Objects;
+
+import static 
org.apache.flink.types.variant.BinaryVariantUtil.BINARY_SEARCH_THRESHOLD;
+import static org.apache.flink.types.variant.BinaryVariantUtil.SIZE_LIMIT;
+import static 
org.apache.flink.types.variant.BinaryVariantUtil.TIMESTAMP_FORMATTER;
+import static 
org.apache.flink.types.variant.BinaryVariantUtil.TIMESTAMP_LTZ_FORMATTER;
+import static org.apache.flink.types.variant.BinaryVariantUtil.VERSION;
+import static org.apache.flink.types.variant.BinaryVariantUtil.VERSION_MASK;
+import static org.apache.flink.types.variant.BinaryVariantUtil.checkIndex;
+import static org.apache.flink.types.variant.BinaryVariantUtil.getMetadataKey;
+import static org.apache.flink.types.variant.BinaryVariantUtil.handleArray;
+import static org.apache.flink.types.variant.BinaryVariantUtil.handleObject;
+import static 
org.apache.flink.types.variant.BinaryVariantUtil.malformedVariant;
+import static org.apache.flink.types.variant.BinaryVariantUtil.readUnsigned;
+import static org.apache.flink.types.variant.BinaryVariantUtil.unexpectedType;
+import static org.apache.flink.types.variant.BinaryVariantUtil.valueSize;
+import static 
org.apache.flink.types.variant.BinaryVariantUtil.variantConstructorSizeLimit;
+
+/**
+ * A data structure that represents a semi-structured value. It consists of 
two binary values: value
+ * and metadata. The value encodes types and values, but not field names. The 
metadata currently
+ * contains a version flag and a list of field names. We can extend/modify the 
detailed binary
+ * format given the version flag.
+ *
+ * @see <a 
href="https://github.com/apache/parquet-format/blob/master/VariantEncoding.md">Variant
+ *     Binary Encoding</a> for the detailed layout of the data structure.
+ */
+@Internal
+public final class BinaryVariant implements Variant {
+
+    private final byte[] value;
+    private final byte[] metadata;
+    // The variant value doesn't use the whole `value` binary, but starts from 
its `pos` index and
+    // spans a size of `valueSize(value, pos)`. This design avoids frequent 
copies of the value
+    // binary when reading a sub-variant in the array/object element.
+    private final int pos;
+
+    public BinaryVariant(byte[] value, byte[] metadata) {
+        this(value, metadata, 0);
+    }
+
+    private BinaryVariant(byte[] value, byte[] metadata, int pos) {
+        this.value = value;
+        this.metadata = metadata;
+        this.pos = pos;
+        // There is currently only one allowed version.
+        if (metadata.length < 1 || (metadata[0] & VERSION_MASK) != VERSION) {
+            throw malformedVariant();
+        }
+        // Don't attempt to use a Variant larger than 16 MiB. We'll never 
produce one, and it risks
+        // memory instability.
+        if (metadata.length > SIZE_LIMIT || value.length > SIZE_LIMIT) {
+            throw variantConstructorSizeLimit();
+        }
+    }
+
+    @Override
+    public boolean isPrimitive() {
+        return !isArray() && !isObject();
+    }
+
+    @Override
+    public boolean isArray() {
+        return getType() == Type.ARRAY;
+    }
+
+    @Override
+    public boolean isObject() {
+        return getType() == Type.OBJECT;
+    }
+
+    @Override
+    public boolean isNull() {
+        return getType() == Type.NULL;
+    }
+
+    @Override
+    public Type getType() {
+        return BinaryVariantUtil.getType(value, pos);
+    }
+
+    @Override
+    public boolean getBoolean() throws VariantTypeException {
+        checkType(Type.BOOLEAN, getType());
+        return BinaryVariantUtil.getBoolean(value, pos);
+    }
+
+    @Override
+    public byte getByte() throws VariantTypeException {
+        checkType(Type.TINYINT, getType());
+        return (byte) BinaryVariantUtil.getLong(value, pos);
+    }
+
+    @Override
+    public short getShort() throws VariantTypeException {
+        checkType(Type.SMALLINT, getType());
+        return (short) BinaryVariantUtil.getLong(value, pos);
+    }
+
+    @Override
+    public int getInt() throws VariantTypeException {
+        checkType(Type.INT, getType());
+        return (int) BinaryVariantUtil.getLong(value, pos);
+    }
+
+    @Override
+    public long getLong() throws VariantTypeException {
+        checkType(Type.BIGINT, getType());
+        return BinaryVariantUtil.getLong(value, pos);
+    }
+
+    @Override
+    public float getFloat() throws VariantTypeException {
+        checkType(Type.FLOAT, getType());
+        return BinaryVariantUtil.getFloat(value, pos);
+    }
+
+    @Override
+    public BigDecimal getDecimal() throws VariantTypeException {
+        checkType(Type.DECIMAL, getType());
+        return BinaryVariantUtil.getDecimal(value, pos);
+    }
+
+    @Override
+    public double getDouble() throws VariantTypeException {
+        checkType(Type.DOUBLE, getType());
+        return BinaryVariantUtil.getDouble(value, pos);
+    }
+
+    @Override
+    public String getString() throws VariantTypeException {
+        checkType(Type.STRING, getType());
+        return BinaryVariantUtil.getString(value, pos);
+    }
+
+    @Override
+    public LocalDate getDate() throws VariantTypeException {
+        checkType(Type.DATE, getType());
+        return LocalDate.ofEpochDay(BinaryVariantUtil.getLong(value, pos));
+    }
+
+    @Override
+    public LocalDateTime getTimestamp() throws VariantTypeException {

Review Comment:
   The naming seems a bit inconsistent here. Either we call this 
`getDateTime()` or the method below `getTimestampLtz()`. I would vote for 
calling this `getDateTime()`.



##########
flink-core/src/main/java/org/apache/flink/types/variant/BinaryVariantUtil.java:
##########
@@ -0,0 +1,631 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.types.variant;
+
+import org.apache.flink.annotation.Internal;
+import org.apache.flink.types.variant.Variant.Type;
+
+import java.math.BigDecimal;
+import java.math.BigInteger;
+import java.time.format.DateTimeFormatter;
+import java.time.format.DateTimeFormatterBuilder;
+import java.util.Arrays;
+import java.util.Locale;
+
+/* This file is based on source code from the Spark Project 
(http://spark.apache.org/), licensed by the Apache
+ * Software Foundation (ASF) under the Apache License, Version 2.0. See the 
NOTICE file distributed with this work for
+ * additional information regarding copyright ownership. */
+
+/**
+ * This class defines constants related to the variant format and provides 
functions for
+ * manipulating variant binaries.
+ *
+ * <p>A variant is made up of 2 binaries: value and metadata. A variant value 
consists of a one-byte
+ * header and a number of content bytes (can be zero). The header byte is 
divided into upper 6 bits
+ * (called "type info") and lower 2 bits (called "basic type"). The content 
format is explained in
+ * the below constants for all possible basic type and type info values.
+ *
+ * <p>The variant metadata includes a version id and a dictionary of distinct 
strings
+ * (case-sensitive). Its binary format is: - Version: 1-byte unsigned integer. 
The only acceptable
+ * value is 1 currently. - Dictionary size: 4-byte little-endian unsigned 
integer. The number of
+ * keys in the dictionary. - Offsets: (size + 1) * 4-byte little-endian 
unsigned integers.
+ * `offsets[i]` represents the starting position of string i, counting 
starting from the address of
+ * `offsets[0]`. Strings must be stored contiguously, so we don’t need to 
store the string size,
+ * instead, we compute it with `offset[i + 1] - offset[i]`. - UTF-8 string 
data.
+ */
+@Internal
+public class BinaryVariantUtil {
+    public static final int BASIC_TYPE_BITS = 2;
+    public static final int BASIC_TYPE_MASK = 0x3;
+    public static final int TYPE_INFO_MASK = 0x3F;
+    // The inclusive maximum value of the type info value. It is the size 
limit of `SHORT_STR`.
+    public static final int MAX_SHORT_STR_SIZE = 0x3F;
+
+    // Below is all possible basic type values.
+    // Primitive value. The type info value must be one of the values in the 
below section.
+    public static final int PRIMITIVE = 0;
+    // Short string value. The type info value is the string size, which must 
be in `[0,
+    // MAX_SHORT_STR_SIZE]`.
+    // The string content bytes directly follow the header byte.
+    public static final int SHORT_STR = 1;
+    // Object value. The content contains a size, a list of field ids, a list 
of field offsets, and
+    // the actual field data. The length of the id list is `size`, while the 
length of the offset
+    // list is `size + 1`, where the last offset represent the total size of 
the field data. The
+    // fields in an object must be sorted by the field name in alphabetical 
order. Duplicate field
+    // names in one object are not allowed.
+    // We use 5 bits in the type info to specify the integer type of the 
object header: it should
+    // be 0_b4_b3b2_b1b0 (MSB is 0), where:
+    // - b4 specifies the type of size. When it is 0/1, `size` is a 
little-endian 1/4-byte
+    // unsigned integer.
+    // - b3b2/b1b0 specifies the integer type of id and offset. When the 2 
bits are  0/1/2, the
+    // list contains 1/2/3-byte little-endian unsigned integers.
+    public static final int OBJECT = 2;
+    // Array value. The content contains a size, a list of field offsets, and 
the actual element
+    // data. It is similar to an object without the id list. The length of the 
offset list
+    // is `size + 1`, where the last offset represent the total size of the 
element data.
+    // Its type info should be: 000_b2_b1b0:
+    // - b2 specifies the type of size.
+    // - b1b0 specifies the integer type of offset.
+    public static final int ARRAY = 3;
+
+    // Below is all possible type info values for `PRIMITIVE`.
+    // JSON Null value. Empty content.
+    public static final int NULL = 0;
+    // True value. Empty content.
+    public static final int TRUE = 1;
+    // False value. Empty content.
+    public static final int FALSE = 2;
+    // 1-byte little-endian signed integer.
+    public static final int INT1 = 3;
+    // 2-byte little-endian signed integer.
+    public static final int INT2 = 4;
+    // 4-byte little-endian signed integer.
+    public static final int INT4 = 5;
+    // 8-byte little-endian signed integer.
+    public static final int INT8 = 6;
+    // 8-byte IEEE double.
+    public static final int DOUBLE = 7;
+    // 4-byte decimal. Content is 1-byte scale + 4-byte little-endian signed 
integer.
+    public static final int DECIMAL4 = 8;
+    // 8-byte decimal. Content is 1-byte scale + 8-byte little-endian signed 
integer.
+    public static final int DECIMAL8 = 9;
+    // 16-byte decimal. Content is 1-byte scale + 16-byte little-endian signed 
integer.
+    public static final int DECIMAL16 = 10;
+    // Date value. Content is 4-byte little-endian signed integer that 
represents the number of days
+    // from the Unix epoch.
+    public static final int DATE = 11;
+    // Timestamp value. Content is 8-byte little-endian signed integer that 
represents the number of
+    // microseconds elapsed since the Unix epoch, 1970-01-01 00:00:00 UTC. It 
is displayed to users
+    // in
+    // their local time zones and may be displayed differently depending on 
the execution
+    // environment.
+    public static final int TIMESTAMP = 12;
+    // Timestamp_ntz value. It has the same content as `TIMESTAMP` but should 
always be interpreted
+    // as if the local time zone is UTC.
+    public static final int TIMESTAMP_NTZ = 13;
+    // 4-byte IEEE float.
+    public static final int FLOAT = 14;
+    // Binary value. The content is (4-byte little-endian unsigned integer 
representing the binary
+    // size) + (size bytes of binary content).
+    public static final int BINARY = 15;
+    // Long string value. The content is (4-byte little-endian unsigned 
integer representing the
+    // string size) + (size bytes of string content).
+    public static final int LONG_STR = 16;
+
+    public static final byte VERSION = 1;
+    // The lower 4 bits of the first metadata byte contain the version.
+    public static final byte VERSION_MASK = 0x0F;
+
+    public static final int U8_MAX = 0xFF;
+    public static final int U16_MAX = 0xFFFF;
+    public static final int U24_MAX = 0xFFFFFF;
+    public static final int U24_SIZE = 3;
+    public static final int U32_SIZE = 4;
+
+    // Both variant value and variant metadata need to be no longer than 16MiB.
+    public static final int SIZE_LIMIT = U24_MAX + 1;
+
+    public static final int MAX_DECIMAL4_PRECISION = 9;
+    public static final int MAX_DECIMAL8_PRECISION = 18;
+    public static final int MAX_DECIMAL16_PRECISION = 38;
+
+    public static final int BINARY_SEARCH_THRESHOLD = 32;
+
+    public static final DateTimeFormatter TIMESTAMP_FORMATTER =
+            new DateTimeFormatterBuilder()
+                    .append(DateTimeFormatter.ISO_LOCAL_DATE)
+                    .appendLiteral(' ')

Review Comment:
   If we want to format timestamps according to JSON we should actually use `T` 
in-between and `Z` at the end for TIMESTAMP_LTZ.
   
https://stackoverflow.com/questions/10286204/what-is-the-right-json-date-format
   



##########
flink-core/src/main/java/org/apache/flink/types/variant/BinaryVariantUtil.java:
##########
@@ -0,0 +1,631 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.types.variant;
+
+import org.apache.flink.annotation.Internal;
+import org.apache.flink.types.variant.Variant.Type;
+
+import java.math.BigDecimal;
+import java.math.BigInteger;
+import java.time.format.DateTimeFormatter;
+import java.time.format.DateTimeFormatterBuilder;
+import java.util.Arrays;
+import java.util.Locale;
+
+/* This file is based on source code from the Spark Project 
(http://spark.apache.org/), licensed by the Apache
+ * Software Foundation (ASF) under the Apache License, Version 2.0. See the 
NOTICE file distributed with this work for
+ * additional information regarding copyright ownership. */
+
+/**
+ * This class defines constants related to the variant format and provides 
functions for
+ * manipulating variant binaries.
+ *
+ * <p>A variant is made up of 2 binaries: value and metadata. A variant value 
consists of a one-byte
+ * header and a number of content bytes (can be zero). The header byte is 
divided into upper 6 bits
+ * (called "type info") and lower 2 bits (called "basic type"). The content 
format is explained in
+ * the below constants for all possible basic type and type info values.
+ *
+ * <p>The variant metadata includes a version id and a dictionary of distinct 
strings
+ * (case-sensitive). Its binary format is: - Version: 1-byte unsigned integer. 
The only acceptable
+ * value is 1 currently. - Dictionary size: 4-byte little-endian unsigned 
integer. The number of
+ * keys in the dictionary. - Offsets: (size + 1) * 4-byte little-endian 
unsigned integers.
+ * `offsets[i]` represents the starting position of string i, counting 
starting from the address of
+ * `offsets[0]`. Strings must be stored contiguously, so we don’t need to 
store the string size,
+ * instead, we compute it with `offset[i + 1] - offset[i]`. - UTF-8 string 
data.
+ */
+@Internal
+public class BinaryVariantUtil {
+    public static final int BASIC_TYPE_BITS = 2;
+    public static final int BASIC_TYPE_MASK = 0x3;
+    public static final int TYPE_INFO_MASK = 0x3F;
+    // The inclusive maximum value of the type info value. It is the size 
limit of `SHORT_STR`.
+    public static final int MAX_SHORT_STR_SIZE = 0x3F;
+
+    // Below is all possible basic type values.
+    // Primitive value. The type info value must be one of the values in the 
below section.
+    public static final int PRIMITIVE = 0;
+    // Short string value. The type info value is the string size, which must 
be in `[0,
+    // MAX_SHORT_STR_SIZE]`.
+    // The string content bytes directly follow the header byte.
+    public static final int SHORT_STR = 1;
+    // Object value. The content contains a size, a list of field ids, a list 
of field offsets, and
+    // the actual field data. The length of the id list is `size`, while the 
length of the offset
+    // list is `size + 1`, where the last offset represent the total size of 
the field data. The
+    // fields in an object must be sorted by the field name in alphabetical 
order. Duplicate field
+    // names in one object are not allowed.
+    // We use 5 bits in the type info to specify the integer type of the 
object header: it should
+    // be 0_b4_b3b2_b1b0 (MSB is 0), where:
+    // - b4 specifies the type of size. When it is 0/1, `size` is a 
little-endian 1/4-byte
+    // unsigned integer.
+    // - b3b2/b1b0 specifies the integer type of id and offset. When the 2 
bits are  0/1/2, the
+    // list contains 1/2/3-byte little-endian unsigned integers.
+    public static final int OBJECT = 2;
+    // Array value. The content contains a size, a list of field offsets, and 
the actual element
+    // data. It is similar to an object without the id list. The length of the 
offset list
+    // is `size + 1`, where the last offset represent the total size of the 
element data.
+    // Its type info should be: 000_b2_b1b0:
+    // - b2 specifies the type of size.
+    // - b1b0 specifies the integer type of offset.
+    public static final int ARRAY = 3;
+
+    // Below is all possible type info values for `PRIMITIVE`.
+    // JSON Null value. Empty content.
+    public static final int NULL = 0;
+    // True value. Empty content.
+    public static final int TRUE = 1;
+    // False value. Empty content.
+    public static final int FALSE = 2;
+    // 1-byte little-endian signed integer.
+    public static final int INT1 = 3;
+    // 2-byte little-endian signed integer.
+    public static final int INT2 = 4;
+    // 4-byte little-endian signed integer.
+    public static final int INT4 = 5;
+    // 8-byte little-endian signed integer.
+    public static final int INT8 = 6;
+    // 8-byte IEEE double.
+    public static final int DOUBLE = 7;
+    // 4-byte decimal. Content is 1-byte scale + 4-byte little-endian signed 
integer.
+    public static final int DECIMAL4 = 8;
+    // 8-byte decimal. Content is 1-byte scale + 8-byte little-endian signed 
integer.
+    public static final int DECIMAL8 = 9;
+    // 16-byte decimal. Content is 1-byte scale + 16-byte little-endian signed 
integer.
+    public static final int DECIMAL16 = 10;
+    // Date value. Content is 4-byte little-endian signed integer that 
represents the number of days
+    // from the Unix epoch.
+    public static final int DATE = 11;
+    // Timestamp value. Content is 8-byte little-endian signed integer that 
represents the number of
+    // microseconds elapsed since the Unix epoch, 1970-01-01 00:00:00 UTC. It 
is displayed to users
+    // in
+    // their local time zones and may be displayed differently depending on 
the execution
+    // environment.
+    public static final int TIMESTAMP = 12;
+    // Timestamp_ntz value. It has the same content as `TIMESTAMP` but should 
always be interpreted
+    // as if the local time zone is UTC.
+    public static final int TIMESTAMP_NTZ = 13;
+    // 4-byte IEEE float.
+    public static final int FLOAT = 14;
+    // Binary value. The content is (4-byte little-endian unsigned integer 
representing the binary
+    // size) + (size bytes of binary content).
+    public static final int BINARY = 15;
+    // Long string value. The content is (4-byte little-endian unsigned 
integer representing the
+    // string size) + (size bytes of string content).
+    public static final int LONG_STR = 16;
+
+    public static final byte VERSION = 1;
+    // The lower 4 bits of the first metadata byte contain the version.
+    public static final byte VERSION_MASK = 0x0F;
+
+    public static final int U8_MAX = 0xFF;
+    public static final int U16_MAX = 0xFFFF;
+    public static final int U24_MAX = 0xFFFFFF;
+    public static final int U24_SIZE = 3;
+    public static final int U32_SIZE = 4;
+
+    // Both variant value and variant metadata need to be no longer than 16MiB.
+    public static final int SIZE_LIMIT = U24_MAX + 1;
+
+    public static final int MAX_DECIMAL4_PRECISION = 9;
+    public static final int MAX_DECIMAL8_PRECISION = 18;
+    public static final int MAX_DECIMAL16_PRECISION = 38;
+
+    public static final int BINARY_SEARCH_THRESHOLD = 32;
+
+    public static final DateTimeFormatter TIMESTAMP_FORMATTER =
+            new DateTimeFormatterBuilder()
+                    .append(DateTimeFormatter.ISO_LOCAL_DATE)
+                    .appendLiteral(' ')
+                    .append(DateTimeFormatter.ISO_LOCAL_TIME)
+                    .toFormatter(Locale.US);
+
+    public static final DateTimeFormatter TIMESTAMP_LTZ_FORMATTER =
+            new DateTimeFormatterBuilder()
+                    .append(TIMESTAMP_FORMATTER)
+                    .appendOffset("+HH:MM", "+00:00")
+                    .toFormatter(Locale.US);
+
+    // Write the least significant `numBytes` bytes in `value` into 
`bytes[pos, pos + numBytes)` in
+    // little endian.
+    public static void writeLong(byte[] bytes, int pos, long value, int 
numBytes) {
+        for (int i = 0; i < numBytes; ++i) {
+            bytes[pos + i] = (byte) ((value >>> (8 * i)) & 0xFF);
+        }
+    }
+
+    public static byte primitiveHeader(int type) {
+        return (byte) (type << 2 | PRIMITIVE);
+    }
+
+    public static byte shortStrHeader(int size) {
+        return (byte) (size << 2 | SHORT_STR);
+    }
+
+    public static byte objectHeader(boolean largeSize, int idSize, int 
offsetSize) {
+        return (byte)
+                (((largeSize ? 1 : 0) << (BASIC_TYPE_BITS + 4))
+                        | ((idSize - 1) << (BASIC_TYPE_BITS + 2))
+                        | ((offsetSize - 1) << BASIC_TYPE_BITS)
+                        | OBJECT);
+    }
+
+    public static byte arrayHeader(boolean largeSize, int offsetSize) {
+        return (byte)
+                (((largeSize ? 1 : 0) << (BASIC_TYPE_BITS + 2))
+                        | ((offsetSize - 1) << BASIC_TYPE_BITS)
+                        | ARRAY);
+    }
+
+    // An exception indicating that the variant value or metadata doesn't conform to the
+    // expected variant binary format.
+    static RuntimeException malformedVariant() {
+        return new RuntimeException("MALFORMED_VARIANT");

Review Comment:
   Use `VariantTypeException` everywhere in this class? 



##########
flink-core/src/main/java/org/apache/flink/types/variant/BinaryVariant.java:
##########
@@ -0,0 +1,490 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.types.variant;
+
+import org.apache.flink.annotation.Internal;
+
+import org.apache.flink.shaded.jackson2.com.fasterxml.jackson.core.JsonFactory;
+import 
org.apache.flink.shaded.jackson2.com.fasterxml.jackson.core.JsonGenerator;
+
+import java.io.CharArrayWriter;
+import java.io.IOException;
+import java.math.BigDecimal;
+import java.time.Instant;
+import java.time.LocalDate;
+import java.time.LocalDateTime;
+import java.time.ZoneId;
+import java.time.ZoneOffset;
+import java.time.temporal.ChronoUnit;
+import java.util.Arrays;
+import java.util.Base64;
+import java.util.Objects;
+
+import static 
org.apache.flink.types.variant.BinaryVariantUtil.BINARY_SEARCH_THRESHOLD;
+import static org.apache.flink.types.variant.BinaryVariantUtil.SIZE_LIMIT;
+import static 
org.apache.flink.types.variant.BinaryVariantUtil.TIMESTAMP_FORMATTER;
+import static 
org.apache.flink.types.variant.BinaryVariantUtil.TIMESTAMP_LTZ_FORMATTER;
+import static org.apache.flink.types.variant.BinaryVariantUtil.VERSION;
+import static org.apache.flink.types.variant.BinaryVariantUtil.VERSION_MASK;
+import static org.apache.flink.types.variant.BinaryVariantUtil.checkIndex;
+import static org.apache.flink.types.variant.BinaryVariantUtil.getMetadataKey;
+import static org.apache.flink.types.variant.BinaryVariantUtil.handleArray;
+import static org.apache.flink.types.variant.BinaryVariantUtil.handleObject;
+import static 
org.apache.flink.types.variant.BinaryVariantUtil.malformedVariant;
+import static org.apache.flink.types.variant.BinaryVariantUtil.readUnsigned;
+import static org.apache.flink.types.variant.BinaryVariantUtil.unexpectedType;
+import static org.apache.flink.types.variant.BinaryVariantUtil.valueSize;
+import static 
org.apache.flink.types.variant.BinaryVariantUtil.variantConstructorSizeLimit;
+
+/**
+ * A data structure that represents a semi-structured value. It consists of 
two binary values: value
+ * and metadata. The value encodes types and values, but not field names. The 
metadata currently
+ * contains a version flag and a list of field names. We can extend/modify the 
detailed binary
+ * format given the version flag.
+ *
+ * @see <a 
href="https://github.com/apache/parquet-format/blob/master/VariantEncoding.md">Variant
+ *     Binary Encoding</a> for the detail layout of the data structure.
+ */
+@Internal
+public final class BinaryVariant implements Variant {
+
+    private final byte[] value;
+    private final byte[] metadata;
+    // The variant value doesn't use the whole `value` binary, but starts from 
its `pos` index and
+    // spans a size of `valueSize(value, pos)`. This design avoids frequent 
copies of the value
+    // binary when reading a sub-variant in the array/object element.
+    private final int pos;
+
+    public BinaryVariant(byte[] value, byte[] metadata) {
+        this(value, metadata, 0);
+    }
+
+    private BinaryVariant(byte[] value, byte[] metadata, int pos) {
+        this.value = value;
+        this.metadata = metadata;
+        this.pos = pos;
+        // There is currently only one allowed version.
+        if (metadata.length < 1 || (metadata[0] & VERSION_MASK) != VERSION) {
+            throw malformedVariant();
+        }
+        // Don't attempt to use a Variant larger than 16 MiB. We'll never 
produce one, and it risks
+        // memory instability.
+        if (metadata.length > SIZE_LIMIT || value.length > SIZE_LIMIT) {
+            throw variantConstructorSizeLimit();
+        }
+    }
+
+    @Override
+    public boolean isPrimitive() {
+        return !isArray() && !isObject();
+    }
+
+    @Override
+    public boolean isArray() {
+        return getType() == Type.ARRAY;
+    }
+
+    @Override
+    public boolean isObject() {
+        return getType() == Type.OBJECT;
+    }
+
+    @Override
+    public boolean isNull() {
+        return getType() == Type.NULL;
+    }
+
+    @Override
+    public Type getType() {
+        return BinaryVariantUtil.getType(value, pos);
+    }
+
+    @Override
+    public boolean getBoolean() throws VariantTypeException {
+        checkType(Type.BOOLEAN, getType());
+        return BinaryVariantUtil.getBoolean(value, pos);
+    }
+
+    @Override
+    public byte getByte() throws VariantTypeException {
+        checkType(Type.TINYINT, getType());
+        return (byte) BinaryVariantUtil.getLong(value, pos);
+    }
+
+    @Override
+    public short getShort() throws VariantTypeException {
+        checkType(Type.SMALLINT, getType());
+        return (short) BinaryVariantUtil.getLong(value, pos);
+    }
+
+    @Override
+    public int getInt() throws VariantTypeException {
+        checkType(Type.INT, getType());
+        return (int) BinaryVariantUtil.getLong(value, pos);
+    }
+
+    @Override
+    public long getLong() throws VariantTypeException {
+        checkType(Type.BIGINT, getType());
+        return BinaryVariantUtil.getLong(value, pos);
+    }
+
+    @Override
+    public float getFloat() throws VariantTypeException {
+        checkType(Type.FLOAT, getType());
+        return BinaryVariantUtil.getFloat(value, pos);
+    }
+
+    @Override
+    public BigDecimal getDecimal() throws VariantTypeException {
+        checkType(Type.DECIMAL, getType());
+        return BinaryVariantUtil.getDecimal(value, pos);
+    }
+
+    @Override
+    public double getDouble() throws VariantTypeException {
+        checkType(Type.DOUBLE, getType());
+        return BinaryVariantUtil.getDouble(value, pos);
+    }
+
+    @Override
+    public String getString() throws VariantTypeException {
+        checkType(Type.STRING, getType());
+        return BinaryVariantUtil.getString(value, pos);
+    }
+
+    @Override
+    public LocalDate getDate() throws VariantTypeException {
+        checkType(Type.DATE, getType());
+        return LocalDate.ofEpochDay(BinaryVariantUtil.getLong(value, pos));
+    }
+
+    @Override
+    public LocalDateTime getTimestamp() throws VariantTypeException {
+        checkType(Type.TIMESTAMP, getType());
+        return microsToInstant(BinaryVariantUtil.getLong(value, pos))
+                .atZone(ZoneOffset.UTC)
+                .toLocalDateTime();
+    }
+
+    @Override
+    public Instant getInstant() throws VariantTypeException {
+        checkType(Type.TIMESTAMP_LTZ, getType());
+        return microsToInstant(BinaryVariantUtil.getLong(value, pos));
+    }
+
+    @Override
+    public byte[] getBytes() throws VariantTypeException {
+        checkType(Type.BINARY, getType());
+        return BinaryVariantUtil.getBinary(value, pos);
+    }
+
+    @Override
+    public Object get() throws VariantTypeException {
+        switch (getType()) {
+            case NULL:
+                return null;
+            case BOOLEAN:
+                return getBoolean();
+            case TINYINT:
+                return getByte();
+            case SMALLINT:
+                return getShort();
+            case INT:
+                return getInt();
+            case BIGINT:
+                return getLong();
+            case FLOAT:
+                return getFloat();
+            case DOUBLE:
+                return getDouble();
+            case DECIMAL:
+                return getDecimal();
+            case STRING:
+                return getString();
+            case DATE:
+                return getDate();
+            case TIMESTAMP:
+                return getTimestamp();
+            case TIMESTAMP_LTZ:
+                return getInstant();
+            case BINARY:
+                return getBytes();
+            default:
+                throw new VariantTypeException(
+                        String.format("Expecting a scalar variant but got %s", 
getType()));
+        }
+    }
+
+    @Override
+    public <T> T getAs() throws VariantTypeException {
+        return (T) get();
+    }
+
+    @Override
+    public Variant getElement(int index) throws VariantTypeException {
+        return getElementAtIndex(index);
+    }
+
+    @Override
+    public Variant getField(String fieldName) throws VariantTypeException {
+        return getFieldByKey(fieldName);
+    }
+
+    @Override
+    public String toJson() {
+        StringBuilder sb = new StringBuilder();
+        toJsonImpl(value, metadata, pos, sb, ZoneId.systemDefault());

Review Comment:
   ```suggestion
           toJsonImpl(value, metadata, pos, sb, ZoneOffset.UTC);
   ```
   We should ensure deterministic behavior across JVMs / machines.



##########
flink-table/flink-table-common/src/main/java/org/apache/flink/table/types/logical/VariantType.java:
##########
@@ -0,0 +1,83 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.table.types.logical;
+
+import org.apache.flink.annotation.PublicEvolving;
+import org.apache.flink.types.variant.BinaryVariant;
+import org.apache.flink.types.variant.Variant;
+
+import java.util.Collections;
+import java.util.List;
+import java.util.Set;
+
+/**
+ * Data type of semi-structured data.
+ *
+ * <p>The type supports storing any semi-structured data, including ARRAY, 
MAP, and scalar types.

Review Comment:
   Add more JavaDoc and explain the core characteristics of this type. For 
example:
   - stores the data type as part of the payload
   - close to the semantics of JSON
   - serializable string representation is `VARIANT`
   - has a kind of schema evolution story built in
   - explain differences to ROW and STRUCTURED



##########
flink-core/src/test/java/org/apache/flink/api/common/typeutils/base/VariantSerializerTest.java:
##########
@@ -0,0 +1,48 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.api.common.typeutils.base;
+
+import org.apache.flink.api.common.typeutils.SerializerTestBase;
+import org.apache.flink.api.common.typeutils.TypeSerializer;
+import org.apache.flink.types.variant.BinaryVariantBuilder;
+import org.apache.flink.types.variant.Variant;
+
+class VariantSerializerTest extends SerializerTestBase<Variant> {
+
+    @Override
+    protected TypeSerializer<Variant> createSerializer() {
+        return VariantSerializer.INSTANCE;
+    }
+
+    @Override
+    protected int getLength() {
+        return -1;
+    }
+
+    @Override
+    protected Class<Variant> getTypeClass() {
+        return Variant.class;
+    }
+
+    @Override
+    protected Variant[] getTestData() {
+        BinaryVariantBuilder builder = new BinaryVariantBuilder();
+        return new Variant[] {builder.object().add("k", 
builder.of(1)).build()};

Review Comment:
   Add more complex test data: more than one field, arrays, and other types.



##########
flink-core/src/test/java/org/apache/flink/api/common/typeutils/base/VariantSerializerUpgradeTest.java:
##########
@@ -0,0 +1,107 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.api.common.typeutils.base;
+
+import org.apache.flink.FlinkVersion;
+import org.apache.flink.api.common.typeutils.TypeSerializer;
+import org.apache.flink.api.common.typeutils.TypeSerializerConditions;
+import org.apache.flink.api.common.typeutils.TypeSerializerSchemaCompatibility;
+import org.apache.flink.api.common.typeutils.TypeSerializerUpgradeTestBase;
+import 
org.apache.flink.api.common.typeutils.base.VariantSerializer.VariantSerializerSnapshot;
+import org.apache.flink.test.util.MigrationTest;
+import org.apache.flink.types.variant.BinaryVariantBuilder;
+import org.apache.flink.types.variant.Variant;
+
+import org.assertj.core.api.Condition;
+import org.junit.jupiter.api.Disabled;
+
+import java.util.ArrayList;
+import java.util.Collection;
+
+/** A {@link TypeSerializerUpgradeTestBase} for {@link 
VariantSerializerSnapshot}. */
+@Disabled("FLINK-XXXXX")
+class VariantSerializerUpgradeTest extends 
TypeSerializerUpgradeTestBase<Variant, Variant> {
+
+    private static final String SPEC_NAME = "variant-serializer";
+
+    @Override
+    public Collection<TestSpecification<?, ?>> 
createTestSpecifications(FlinkVersion currentVersion)
+            throws Exception {
+        ArrayList<TestSpecification<?, ?>> testSpecifications = new 
ArrayList<>();
+        testSpecifications.add(
+                new TestSpecification<>(
+                        SPEC_NAME,
+                        currentVersion,
+                        VariantSerializerSetup.class,
+                        VariantSerializerVerifier.class));
+
+        return testSpecifications;
+    }
+
+    @Override
+    public Collection<FlinkVersion> getMigrationVersions() {
+        return FlinkVersion.rangeOf(
+                FlinkVersion.V2_1, 
MigrationTest.getMostRecentlyPublishedVersion());
+    }
+
+    // 
----------------------------------------------------------------------------------------------
+    //  Specification for "variant-serializer"
+    // 
----------------------------------------------------------------------------------------------
+
+    /**
+     * This class is only public to work with {@link
+     * org.apache.flink.api.common.typeutils.ClassRelocator}.
+     */
+    public static final class VariantSerializerSetup implements 
PreUpgradeSetup<Variant> {
+        @Override
+        public TypeSerializer<Variant> createPriorSerializer() {
+            return VariantSerializer.INSTANCE;
+        }
+
+        @Override
+        public Variant createTestData() {
+            BinaryVariantBuilder builder = new BinaryVariantBuilder();
+            return builder.object().add("k", builder.of(1)).build();

Review Comment:
   Add more complex test data: more than one field, arrays, and other types.



##########
flink-core/src/test/java/org/apache/flink/types/variant/BinaryVariantTest.java:
##########
@@ -0,0 +1,262 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.types.variant;
+
+import org.junit.jupiter.api.BeforeEach;
+import org.junit.jupiter.api.Test;
+
+import java.math.BigDecimal;
+import java.time.Instant;
+import java.time.LocalDate;
+import java.time.LocalDateTime;
+
+import static org.assertj.core.api.Assertions.assertThat;
+import static org.assertj.core.api.Assertions.assertThatThrownBy;
+
+class BinaryVariantTest {
+
+    private BinaryVariantBuilder builder;
+
+    @BeforeEach
+    void setUp() {
+        builder = new BinaryVariantBuilder();
+    }
+
+    @Test
+    void testScalarVariant() {
+
+        assertThat(builder.of((byte) 10).isPrimitive()).isTrue();
+        assertThat(builder.of((byte) 10).isNull()).isFalse();
+        assertThat(builder.of((byte) 10).isArray()).isFalse();
+        assertThat(builder.of((byte) 10).isObject()).isFalse();
+        assertThat(builder.of((byte) 
10).getType()).isEqualTo(Variant.Type.TINYINT);
+
+        assertThat(builder.of((byte) 10).getByte()).isEqualTo((byte) 10);
+        assertThat(builder.of((byte) 10).get()).isEqualTo((byte) 10);
+        assertThat((byte) builder.of((byte) 10).getAs()).isEqualTo((byte) 10);
+
+        assertThat(builder.of((short) 10).getShort()).isEqualTo((short) 10);
+        assertThat(builder.of((short) 10).get()).isEqualTo((short) 10);
+
+        assertThat(builder.of(10).getInt()).isEqualTo(10);
+        assertThat(builder.of(10).get()).isEqualTo(10);
+
+        assertThat(builder.of(10L).getLong()).isEqualTo(10L);
+        assertThat(builder.of(10L).get()).isEqualTo(10L);
+
+        assertThat(builder.of(10.0).getDouble()).isEqualTo(10.0d);
+        assertThat(builder.of(10.0).get()).isEqualTo(10.0d);
+
+        assertThat(builder.of(10.0f).getFloat()).isEqualTo(10.0f);
+        assertThat(builder.of(10.0f).get()).isEqualTo(10.0f);
+
+        assertThat(builder.of("hello").getString()).isEqualTo("hello");
+        assertThat(builder.of("hello").get()).isEqualTo("hello");
+
+        
assertThat(builder.of("hello".getBytes()).getBytes()).isEqualTo("hello".getBytes());
+        
assertThat(builder.of("hello".getBytes()).get()).isEqualTo("hello".getBytes());
+
+        assertThat(builder.of(true).getBoolean()).isTrue();
+        assertThat(builder.of(true).get()).isEqualTo(true);
+
+        assertThat(builder.of(BigDecimal.valueOf(100)).getDecimal())
+                .isEqualByComparingTo(BigDecimal.valueOf(100));
+        assertThat((BigDecimal) builder.of(BigDecimal.valueOf(100)).get())
+                .isEqualByComparingTo(BigDecimal.valueOf(100));
+
+        Instant instant = Instant.now();
+        assertThat(builder.of(instant).getInstant()).isEqualTo(instant);
+        assertThat(builder.of(instant).get()).isEqualTo(instant);
+
+        LocalDateTime localDateTime = LocalDateTime.now();
+        
assertThat(builder.of(localDateTime).getTimestamp()).isEqualTo(localDateTime);
+        assertThat(builder.of(localDateTime).get()).isEqualTo(localDateTime);
+
+        LocalDate localDate = LocalDate.now();
+        assertThat(builder.of(localDate).getDate()).isEqualTo(localDate);
+        assertThat(builder.of(localDate).get()).isEqualTo(localDate);
+
+        assertThat(builder.ofNull().get()).isEqualTo(null);
+        assertThat(builder.ofNull().isNull()).isTrue();
+    }
+
+    @Test
+    void testArrayVariant() {
+        Instant now = Instant.now();
+        Variant variant =
+                builder.array()
+                        .add(builder.of(1))
+                        .add(builder.of("hello"))
+                        .add(builder.of(now))
+                        
.add(builder.array().add(builder.of("hello2")).add(builder.of(10f)).build())
+                        .build();
+
+        assertThat(variant.isArray()).isTrue();
+        assertThat(variant.isPrimitive()).isFalse();
+        assertThat(variant.isObject()).isFalse();
+        assertThat(variant.getType()).isEqualTo(Variant.Type.ARRAY);
+
+        assertThat(variant.getElement(-1)).isNull();
+        assertThat(variant.getElement(0).getInt()).isEqualTo(1);
+        assertThat(variant.getElement(1).getString()).isEqualTo("hello");
+        assertThat(variant.getElement(2).getInstant()).isEqualTo(now);
+        
assertThat(variant.getElement(3).getElement(0).getString()).isEqualTo("hello2");
+        
assertThat(variant.getElement(3).getElement(1).getFloat()).isEqualTo(10f);
+        assertThat(variant.getElement(4)).isNull();
+    }
+
+    @Test
+    void testObjectVariant() {
+        Variant variant =
+                builder.object()
+                        .add(
+                                "list",
+                                
builder.array().add(builder.of("hello")).add(builder.of(1)).build())
+                        .add(
+                                "object",
+                                builder.object()
+                                        .add("ss", builder.of((short) 1))
+                                        .add("ff", builder.of(10.0f))
+                                        .build())
+                        .add("bb", builder.of((byte) 10))
+                        .build();
+
+        assertThat(variant.isArray()).isFalse();
+        assertThat(variant.isPrimitive()).isFalse();
+        assertThat(variant.isObject()).isTrue();
+        assertThat(variant.getType()).isEqualTo(Variant.Type.OBJECT);
+
+        assertThat(variant.getField("list").isArray()).isTrue();
+        
assertThat(variant.getField("list").getElement(0).getString()).isEqualTo("hello");
+        
assertThat(variant.getField("list").getElement(1).getInt()).isEqualTo(1);
+
+        assertThat(variant.getField("object").isObject()).isTrue();
+        
assertThat(variant.getField("object").getField("ss").getShort()).isEqualTo((short)
 1);
+        
assertThat(variant.getField("object").getField("ff").getFloat()).isEqualTo((10.0f));
+
+        assertThat(variant.getField("bb").getByte()).isEqualTo((byte) 10);
+        assertThat(variant.getField("non_exist")).isNull();
+
+        BinaryVariantBuilder.VariantObjectBuilder objectBuilder = 
builder.object();
+
+        for (int i = 0; i < 100; i++) {
+            objectBuilder.add(String.valueOf(i), builder.of(i));
+        }
+        variant = objectBuilder.build();
+        for (int i = 0; i < 100; i++) {
+            
assertThat(variant.getField(String.valueOf(i)).getInt()).isEqualTo(i);
+        }
+    }
+
+    @Test
+    void testDuplicatedKeyObjectVariant() {
+        assertThatThrownBy(
+                        () ->
+                                builder.object(false)
+                                        .add("k", builder.of((byte) 10))
+                                        .add("k", builder.of("hello"))
+                                        .build())
+                .isInstanceOf(RuntimeException.class)
+                .hasMessage("VARIANT_DUPLICATE_KEY");
+
+        Variant variant =
+                builder.object(true)
+                        .add("k", builder.of((byte) 10))
+                        .add("k", builder.of("hello"))
+                        .add("k1", builder.of(10))
+                        .build();
+
+        assertThat(variant.getField("k").getString()).isEqualTo("hello");
+        assertThat(variant.getField("k1").getInt()).isEqualTo(10);
+    }
+
+    @Test
+    void testToJsonScalar() {
+        Instant instant = Instant.EPOCH;
+        LocalDateTime localDateTime = LocalDateTime.of(2000, 1, 1, 0, 0);
+        LocalDate localDate = LocalDate.of(2000, 1, 1);
+
+        assertThat(builder.of((byte) 1).toJson()).isEqualTo("1");
+        assertThat(builder.of((short) 1).toJson()).isEqualTo("1");
+        assertThat(builder.of(1L).toJson()).isEqualTo("1");
+        assertThat(builder.of(1).toJson()).isEqualTo("1");
+        assertThat(builder.of("hello").toJson()).isEqualTo("\"hello\"");
+        assertThat(builder.of(true).toJson()).isEqualTo("true");
+        assertThat(builder.of(10.0f).toJson()).isEqualTo("10.0");
+        assertThat(builder.of(10.0d).toJson()).isEqualTo("10.0");
+        
assertThat(builder.of(BigDecimal.valueOf(100)).toJson()).isEqualTo("100");
+        assertThat(builder.of(instant).toJson()).isEqualTo("\"1970-01-01 
08:00:00+08:00\"");

Review Comment:
   This is not correct. It should not materialize the timezone but return the 
timestamp in UTC like `1970-01-01T00:00:00Z`. Otherwise the semantics are 
TIMESTAMP_TZ and not TIMESTAMP_LTZ.



##########
flink-table/flink-table-common/src/main/java/org/apache/flink/table/types/utils/TypeInfoDataTypeConverter.java:
##########
@@ -147,6 +147,7 @@ public final class TypeInfoDataTypeConverter {
                 PrimitiveArrayTypeInfo.DOUBLE_PRIMITIVE_ARRAY_TYPE_INFO,
                 
DataTypes.ARRAY(DataTypes.DOUBLE().notNull().bridgedTo(double.class))
                         .bridgedTo(double[].class));
+        conversionMap.put(Types.VARIANT, DataTypes.VARIANT());

Review Comment:
   nit: to make it explicit
   ```suggestion
           conversionMap.put(Types.VARIANT, 
DataTypes.VARIANT().bridgedTo(Variant.class));
   ```



##########
flink-table/flink-table-common/src/main/java/org/apache/flink/table/types/logical/VariantType.java:
##########
@@ -0,0 +1,83 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.table.types.logical;
+
+import org.apache.flink.annotation.PublicEvolving;
+import org.apache.flink.types.variant.BinaryVariant;
+import org.apache.flink.types.variant.Variant;
+
+import java.util.Collections;
+import java.util.List;
+import java.util.Set;
+
+/**
+ * Data type of semi-structured data.
+ *
+ * <p>The type supports storing any semi-structured data, including ARRAY, 
MAP, and scalar types.
+ * VARIANT can only store MAP types with keys of type STRING.
+ */
+@PublicEvolving
+public class VariantType extends LogicalType {

Review Comment:
   ```suggestion
   public final class VariantType extends LogicalType {
   ```



##########
flink-core/src/test/java/org/apache/flink/types/variant/BinaryVariantInternalBuilderTest.java:
##########
@@ -0,0 +1,106 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.types.variant;
+
+import org.junit.jupiter.api.Test;
+
+import java.io.IOException;
+import java.math.BigDecimal;
+
+import static org.assertj.core.api.Assertions.assertThat;
+
+class BinaryVariantInternalBuilderTest {
+
+    @Test
+    void testParseScalarJson() throws IOException {
+        assertThat(BinaryVariantInternalBuilder.parseJson("1", 
false).getByte())
+                .isEqualTo((byte) 1);
+        short s = (short) (Byte.MAX_VALUE + 1L);
+        assertThat(BinaryVariantInternalBuilder.parseJson(String.valueOf(s), 
false).getShort())
+                .isEqualTo(s);
+        int i = (int) (Short.MAX_VALUE + 1L);
+        assertThat(BinaryVariantInternalBuilder.parseJson(String.valueOf(i), 
false).getInt())
+                .isEqualTo(i);
+        long l = Integer.MAX_VALUE + 1L;
+        assertThat(BinaryVariantInternalBuilder.parseJson(String.valueOf(l), 
false).getLong())
+                .isEqualTo(l);
+
+        BigDecimal bigDecimal = 
BigDecimal.valueOf(Long.MAX_VALUE).add(BigDecimal.ONE);
+        assertThat(
+                        
BinaryVariantInternalBuilder.parseJson(bigDecimal.toPlainString(), false)
+                                .getDecimal())
+                .isEqualTo(bigDecimal);
+
+        assertThat(BinaryVariantInternalBuilder.parseJson("1.123", 
false).getDecimal())
+                .isEqualTo(BigDecimal.valueOf(1.123));
+        assertThat(
+                        BinaryVariantInternalBuilder.parseJson(
+                                        String.valueOf(Double.MAX_VALUE), 
false)
+                                .getDouble())
+                .isEqualTo(Double.MAX_VALUE);
+
+        assertThat(BinaryVariantInternalBuilder.parseJson("\"hello\"", 
false).getString())
+                .isEqualTo("hello");
+
+        assertThat(BinaryVariantInternalBuilder.parseJson("true", 
false).getBoolean()).isTrue();
+
+        assertThat(BinaryVariantInternalBuilder.parseJson("false", 
false).getBoolean()).isFalse();
+
+        assertThat(BinaryVariantInternalBuilder.parseJson("null", 
false).isNull()).isTrue();
+    }
+
+    @Test
+    void testParseJsonArray() throws IOException {
+        BinaryVariant variant = BinaryVariantInternalBuilder.parseJson("[]", 
false);
+        assertThat(variant.getElement(0)).isNull();
+
+        variant = BinaryVariantInternalBuilder.parseJson("[1,\"hello\",3.1]", 
false);
+        assertThat(variant.getElement(0).getByte()).isEqualTo((byte) 1);
+        assertThat(variant.getElement(1).getString()).isEqualTo("hello");
+        
assertThat(variant.getElement(2).getDecimal()).isEqualTo(BigDecimal.valueOf(3.1));
+
+        variant = 
BinaryVariantInternalBuilder.parseJson("[1,[\"hello\",[3.1]]]", false);
+        assertThat(variant.getElement(0).getByte()).isEqualTo((byte) 1);
+        
assertThat(variant.getElement(1).getElement(0).getString()).isEqualTo("hello");
+        
assertThat(variant.getElement(1).getElement(1).getElement(0).getDecimal())
+                .isEqualTo(BigDecimal.valueOf(3.1));
+    }
+
+    @Test
+    void testParseJsonObject() throws IOException {
+        BinaryVariant variant = BinaryVariantInternalBuilder.parseJson("{}", 
false);
+        assertThat(variant.getField("a")).isNull();
+
+        variant =
+                BinaryVariantInternalBuilder.parseJson(
+                        "{\"a\":1,\"b\":\"hello\",\"c\":3.1}", false);
+
+        assertThat(variant.getField("a").getByte()).isEqualTo((byte) 1);
+        assertThat(variant.getField("b").getString()).isEqualTo("hello");
+        
assertThat(variant.getField("c").getDecimal()).isEqualTo(BigDecimal.valueOf(3.1));
+
+        variant =
+                BinaryVariantInternalBuilder.parseJson(
+                        "{\"a\":1,\"b\":{\"c\":\"hello\",\"d\":[3.1]}}", 
false);
+        assertThat(variant.getField("a").getByte()).isEqualTo((byte) 1);
+        
assertThat(variant.getField("b").getField("c").getString()).isEqualTo("hello");
+        
assertThat(variant.getField("b").getField("d").getElement(0).getDecimal())

Review Comment:
   test parsing of duplicate keys as well



##########
flink-core/src/test/java/org/apache/flink/types/variant/BinaryVariantTest.java:
##########
@@ -0,0 +1,262 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.types.variant;
+
+import org.junit.jupiter.api.BeforeEach;
+import org.junit.jupiter.api.Test;
+
+import java.math.BigDecimal;
+import java.time.Instant;
+import java.time.LocalDate;
+import java.time.LocalDateTime;
+
+import static org.assertj.core.api.Assertions.assertThat;
+import static org.assertj.core.api.Assertions.assertThatThrownBy;
+
+class BinaryVariantTest {
+
+    private BinaryVariantBuilder builder;
+
+    @BeforeEach
+    void setUp() {
+        builder = new BinaryVariantBuilder();
+    }
+
+    @Test
+    void testScalarVariant() {
+
+        assertThat(builder.of((byte) 10).isPrimitive()).isTrue();
+        assertThat(builder.of((byte) 10).isNull()).isFalse();
+        assertThat(builder.of((byte) 10).isArray()).isFalse();
+        assertThat(builder.of((byte) 10).isObject()).isFalse();
+        assertThat(builder.of((byte) 
10).getType()).isEqualTo(Variant.Type.TINYINT);
+
+        assertThat(builder.of((byte) 10).getByte()).isEqualTo((byte) 10);
+        assertThat(builder.of((byte) 10).get()).isEqualTo((byte) 10);
+        assertThat((byte) builder.of((byte) 10).getAs()).isEqualTo((byte) 10);
+
+        assertThat(builder.of((short) 10).getShort()).isEqualTo((short) 10);
+        assertThat(builder.of((short) 10).get()).isEqualTo((short) 10);
+
+        assertThat(builder.of(10).getInt()).isEqualTo(10);
+        assertThat(builder.of(10).get()).isEqualTo(10);
+
+        assertThat(builder.of(10L).getLong()).isEqualTo(10L);
+        assertThat(builder.of(10L).get()).isEqualTo(10L);
+
+        assertThat(builder.of(10.0).getDouble()).isEqualTo(10.0d);
+        assertThat(builder.of(10.0).get()).isEqualTo(10.0d);
+
+        assertThat(builder.of(10.0f).getFloat()).isEqualTo(10.0f);
+        assertThat(builder.of(10.0f).get()).isEqualTo(10.0f);
+
+        assertThat(builder.of("hello").getString()).isEqualTo("hello");
+        assertThat(builder.of("hello").get()).isEqualTo("hello");
+
+        
assertThat(builder.of("hello".getBytes()).getBytes()).isEqualTo("hello".getBytes());
+        
assertThat(builder.of("hello".getBytes()).get()).isEqualTo("hello".getBytes());
+
+        assertThat(builder.of(true).getBoolean()).isTrue();
+        assertThat(builder.of(true).get()).isEqualTo(true);
+
+        assertThat(builder.of(BigDecimal.valueOf(100)).getDecimal())
+                .isEqualByComparingTo(BigDecimal.valueOf(100));
+        assertThat((BigDecimal) builder.of(BigDecimal.valueOf(100)).get())
+                .isEqualByComparingTo(BigDecimal.valueOf(100));
+
+        Instant instant = Instant.now();
+        assertThat(builder.of(instant).getInstant()).isEqualTo(instant);
+        assertThat(builder.of(instant).get()).isEqualTo(instant);
+
+        LocalDateTime localDateTime = LocalDateTime.now();
+        
assertThat(builder.of(localDateTime).getTimestamp()).isEqualTo(localDateTime);
+        assertThat(builder.of(localDateTime).get()).isEqualTo(localDateTime);
+
+        LocalDate localDate = LocalDate.now();
+        assertThat(builder.of(localDate).getDate()).isEqualTo(localDate);
+        assertThat(builder.of(localDate).get()).isEqualTo(localDate);
+
+        assertThat(builder.ofNull().get()).isEqualTo(null);
+        assertThat(builder.ofNull().isNull()).isTrue();
+    }
+
+    @Test
+    void testArrayVariant() {
+        Instant now = Instant.now();
+        Variant variant =
+                builder.array()
+                        .add(builder.of(1))
+                        .add(builder.of("hello"))
+                        .add(builder.of(now))

Review Comment:
   test null element in array



##########
flink-table/flink-table-common/src/main/java/org/apache/flink/table/api/DataTypes.java:
##########
@@ -976,6 +977,10 @@ public static <T> DataType STRUCTURED(Class<T> 
implementationClass, Field... fie
         return new FieldsDataType(builder.build(), implementationClass, 
fieldDataTypes);
     }
 
+    public static DataType VARIANT() {

Review Comment:
   This is public facing API. We need an extensive JavaDoc that explains the 
characteristics of this type. Take `ROW()` or `STRUCTURED()` in this class as 
an example.



##########
flink-table/flink-table-planner/src/main/scala/org/apache/flink/table/planner/codegen/calls/BuiltInMethods.scala:
##########
@@ -499,6 +499,14 @@ object BuiltInMethods {
 
   val IS_JSON_SCALAR = Types.lookupMethod(classOf[SqlJsonUtils], 
"isJsonScalar", classOf[String])
 
+  // VARIANT functions
+
+  val PARSE_JSON =

Review Comment:
   Use the new function stack instead that uses `BuiltInFunctionDefinition` + 
runtime implementation. Take a look at `TypeOf` function as an example.



##########
flink-core/src/main/java/org/apache/flink/types/variant/Variant.java:
##########
@@ -0,0 +1,213 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.types.variant;
+
+import org.apache.flink.annotation.PublicEvolving;
+
+import java.io.Serializable;
+import java.math.BigDecimal;
+import java.time.Instant;
+import java.time.LocalDate;
+import java.time.LocalDateTime;
+
+/** A Variant represents semi-structured data. */
+@PublicEvolving
+public interface Variant extends Serializable {

Review Comment:
   Is there a test that tests `Serializable`?



##########
flink-table/flink-table-planner/src/test/scala/org/apache/flink/table/planner/runtime/stream/sql/VariantScalaITCase.scala:
##########
@@ -0,0 +1,200 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.flink.table.planner.runtime.stream.sql
+
+import org.apache.flink.table.functions.{AggregateFunction, ScalarFunction}
+import org.apache.flink.table.planner.JLong
+import org.apache.flink.table.planner.runtime.utils.StreamingTestBase
+import org.apache.flink.types.{Row, RowKind}
+import org.apache.flink.types.variant.{BinaryVariantBuilder, Variant}
+import org.apache.flink.util.CollectionUtil
+
+import org.assertj.core.api.Assertions.assertThatList
+import org.junit.jupiter.api.Test
+
+class VariantScalaITCase extends StreamingTestBase {

Review Comment:
   Use the new SemanticTestBase, like e.g. ProcessTableFunctionSemanticTests; 
these make testing input and output results very easy.
   
   We shouldn't add more Scala code.



##########
flink-table/flink-table-planner/src/test/java/org/apache/flink/table/planner/runtime/stream/sql/VariantJavaITCase.java:
##########
@@ -0,0 +1,128 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.table.planner.runtime.stream.sql;
+
+import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
+import org.apache.flink.table.api.DataTypes;
+import org.apache.flink.table.api.Schema;
+import org.apache.flink.table.api.Table;
+import org.apache.flink.table.api.bridge.java.StreamTableEnvironment;
+import org.apache.flink.table.planner.runtime.utils.StreamingTestBase;
+import org.apache.flink.table.types.DataType;
+import org.apache.flink.types.Row;
+import org.apache.flink.types.RowKind;
+import org.apache.flink.types.variant.BinaryVariantBuilder;
+import org.apache.flink.types.variant.Variant;
+import org.apache.flink.util.CollectionUtil;
+
+import org.junit.jupiter.api.BeforeEach;
+import org.junit.jupiter.api.Test;
+
+import static org.assertj.core.api.Assertions.assertThat;
+import static org.assertj.core.api.Assertions.assertThatThrownBy;
+
+public class VariantJavaITCase extends StreamingTestBase {

Review Comment:
   Use the new `SemanticTestBase`, like e.g. `ProcessTableFunctionSemanticTests`; 
these make testing input and output results very easy.



##########
flink-table/flink-table-planner/src/main/java/org/apache/flink/table/planner/functions/sql/FlinkSqlOperatorTable.java:
##########
@@ -1333,6 +1333,32 @@ public List<SqlGroupedWindowFunction> 
getAuxiliaryFunctions() {
     public static final SqlPostfixOperator IS_NOT_JSON_SCALAR =
             SqlStdOperatorTable.IS_NOT_JSON_SCALAR;
 
+    // VARIANT FUNCTIONS
+    public static final SqlFunction PARSE_JSON =

Review Comment:
   Use the new function stack in `BuiltInFunctionDefinitions` instead of 
Calcite's `SqlFunction`.



##########
flink-core/src/main/java/org/apache/flink/types/variant/BinaryVariantUtil.java:
##########
@@ -0,0 +1,631 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.types.variant;
+
+import org.apache.flink.annotation.Internal;
+import org.apache.flink.types.variant.Variant.Type;
+
+import java.math.BigDecimal;
+import java.math.BigInteger;
+import java.time.format.DateTimeFormatter;
+import java.time.format.DateTimeFormatterBuilder;
+import java.util.Arrays;
+import java.util.Locale;
+
+/* This file is based on source code from the Spark Project 
(http://spark.apache.org/), licensed by the Apache
+ * Software Foundation (ASF) under the Apache License, Version 2.0. See the 
NOTICE file distributed with this work for
+ * additional information regarding copyright ownership. */
+
+/**
+ * This class defines constants related to the variant format and provides 
functions for
+ * manipulating variant binaries.
+ *
+ * <p>A variant is made up of 2 binaries: value and metadata. A variant value 
consists of a one-byte
+ * header and a number of content bytes (can be zero). The header byte is 
divided into upper 6 bits
+ * (called "type info") and lower 2 bits (called "basic type"). The content 
format is explained in
+ * the below constants for all possible basic type and type info values.
+ *
+ * <p>The variant metadata includes a version id and a dictionary of distinct 
strings
+ * (case-sensitive). Its binary format is: - Version: 1-byte unsigned integer. 
The only acceptable
+ * value is 1 currently. - Dictionary size: 4-byte little-endian unsigned 
integer. The number of
+ * keys in the dictionary. - Offsets: (size + 1) * 4-byte little-endian 
unsigned integers.
+ * `offsets[i]` represents the starting position of string i, counting 
starting from the address of
+ * `offsets[0]`. Strings must be stored contiguously, so we don’t need to 
store the string size,
+ * instead, we compute it with `offset[i + 1] - offset[i]`. - UTF-8 string 
data.
+ */
+@Internal
+public class BinaryVariantUtil {
+    public static final int BASIC_TYPE_BITS = 2;
+    public static final int BASIC_TYPE_MASK = 0x3;
+    public static final int TYPE_INFO_MASK = 0x3F;
+    // The inclusive maximum value of the type info value. It is the size 
limit of `SHORT_STR`.
+    public static final int MAX_SHORT_STR_SIZE = 0x3F;
+
+    // Below is all possible basic type values.
+    // Primitive value. The type info value must be one of the values in the 
below section.
+    public static final int PRIMITIVE = 0;
+    // Short string value. The type info value is the string size, which must 
be in `[0,
+    // MAX_SHORT_STR_SIZE]`.
+    // The string content bytes directly follow the header byte.
+    public static final int SHORT_STR = 1;
+    // Object value. The content contains a size, a list of field ids, a list 
of field offsets, and
+    // the actual field data. The length of the id list is `size`, while the 
length of the offset
+    // list is `size + 1`, where the last offset represents the total size of 
the field data. The
+    // fields in an object must be sorted by the field name in alphabetical 
order. Duplicate field
+    // names in one object are not allowed.
+    // We use 5 bits in the type info to specify the integer type of the 
object header: it should
+    // be 0_b4_b3b2_b1b0 (MSB is 0), where:
+    // - b4 specifies the type of size. When it is 0/1, `size` is a 
little-endian 1/4-byte
+    // unsigned integer.
+    // - b3b2/b1b0 specifies the integer type of id and offset. When the 2 
bits are  0/1/2, the
+    // list contains 1/2/3-byte little-endian unsigned integers.
+    public static final int OBJECT = 2;
+    // Array value. The content contains a size, a list of field offsets, and 
the actual element
+    // data. It is similar to an object without the id list. The length of the 
offset list
+    // is `size + 1`, where the last offset represents the total size of the 
element data.
+    // Its type info should be: 000_b2_b1b0:
+    // - b2 specifies the type of size.
+    // - b1b0 specifies the integer type of offset.
+    public static final int ARRAY = 3;
+
+    // Below is all possible type info values for `PRIMITIVE`.
+    // JSON Null value. Empty content.
+    public static final int NULL = 0;
+    // True value. Empty content.
+    public static final int TRUE = 1;
+    // False value. Empty content.
+    public static final int FALSE = 2;
+    // 1-byte little-endian signed integer.
+    public static final int INT1 = 3;
+    // 2-byte little-endian signed integer.
+    public static final int INT2 = 4;
+    // 4-byte little-endian signed integer.
+    public static final int INT4 = 5;
+    // 8-byte little-endian signed integer.
+    public static final int INT8 = 6;
+    // 8-byte IEEE double.
+    public static final int DOUBLE = 7;
+    // 4-byte decimal. Content is 1-byte scale + 4-byte little-endian signed 
integer.
+    public static final int DECIMAL4 = 8;
+    // 8-byte decimal. Content is 1-byte scale + 8-byte little-endian signed 
integer.
+    public static final int DECIMAL8 = 9;
+    // 16-byte decimal. Content is 1-byte scale + 16-byte little-endian signed 
integer.
+    public static final int DECIMAL16 = 10;
+    // Date value. Content is 4-byte little-endian signed integer that 
represents the number of days
+    // from the Unix epoch.
+    public static final int DATE = 11;
+    // Timestamp value. Content is 8-byte little-endian signed integer that 
represents the number of
+    // microseconds elapsed since the Unix epoch, 1970-01-01 00:00:00 UTC. It 
is displayed to users
+    // in
+    // their local time zones and may be displayed differently depending on 
the execution
+    // environment.
+    public static final int TIMESTAMP = 12;
+    // Timestamp_ntz value. It has the same content as `TIMESTAMP` but should 
always be interpreted
+    // as if the local time zone is UTC.
+    public static final int TIMESTAMP_NTZ = 13;
+    // 4-byte IEEE float.
+    public static final int FLOAT = 14;
+    // Binary value. The content is (4-byte little-endian unsigned integer 
representing the binary
+    // size) + (size bytes of binary content).
+    public static final int BINARY = 15;
+    // Long string value. The content is (4-byte little-endian unsigned 
integer representing the
+    // string size) + (size bytes of string content).
+    public static final int LONG_STR = 16;
+
+    public static final byte VERSION = 1;
+    // The lower 4 bits of the first metadata byte contain the version.
+    public static final byte VERSION_MASK = 0x0F;
+
+    public static final int U8_MAX = 0xFF;
+    public static final int U16_MAX = 0xFFFF;
+    public static final int U24_MAX = 0xFFFFFF;
+    public static final int U24_SIZE = 3;
+    public static final int U32_SIZE = 4;
+
+    // Both variant value and variant metadata need to be no longer than 16MiB.
+    public static final int SIZE_LIMIT = U24_MAX + 1;
+
+    public static final int MAX_DECIMAL4_PRECISION = 9;
+    public static final int MAX_DECIMAL8_PRECISION = 18;
+    public static final int MAX_DECIMAL16_PRECISION = 38;
+
+    public static final int BINARY_SEARCH_THRESHOLD = 32;
+
+    public static final DateTimeFormatter TIMESTAMP_FORMATTER =
+            new DateTimeFormatterBuilder()
+                    .append(DateTimeFormatter.ISO_LOCAL_DATE)
+                    .appendLiteral(' ')

Review Comment:
   See also 
https://nightlies.apache.org/flink/flink-docs-master/docs/connectors/table/formats/json/#json-timestamp-format-standard.
   Or is the intention to use the SQL format? That is also fine for me. I just 
wanted to raise awareness here.



##########
flink-table/flink-table-planner/src/test/scala/org/apache/flink/table/planner/runtime/stream/sql/AggregateITCase.scala:
##########
@@ -2119,6 +2119,45 @@ class AggregateITCase(
 
     tEnv.dropTemporarySystemFunction("PERCENTILE")
   }
+
+  @TestTemplate
+  def testVariantAggFunction(): Unit = {

Review Comment:
   Use the new SemanticTestBase, like e.g. ProcessTableFunctionSemanticTests; 
these make testing input and output results very easy.
   
   We shouldn't add more Scala code.



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: issues-unsubscr...@flink.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org

Reply via email to