klam-shop commented on code in PR #24482:
URL: https://github.com/apache/flink/pull/24482#discussion_r1551822003


##########
flink-formats/flink-protobuf-confluent-registry/src/test/java/org/apache/flink/formats/protobuf/registry/confluent/utils/FlinkToProtoSchemaConverterTest.java:
##########
@@ -0,0 +1,80 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.formats.protobuf.registry.confluent.utils;
+
+import org.apache.flink.table.types.logical.MultisetType;
+import org.apache.flink.table.types.logical.RowType;
+import org.apache.flink.table.types.logical.RowType.RowField;
+import org.apache.flink.table.types.logical.VarCharType;
+import org.apache.flink.util.TestLoggerExtension;
+
+import com.google.protobuf.Descriptors.Descriptor;
+import io.confluent.kafka.schemaregistry.protobuf.ProtobufSchema;
+import org.junit.jupiter.api.extension.ExtendWith;
+import org.junit.jupiter.params.ParameterizedTest;
+import org.junit.jupiter.params.provider.Arguments;
+import org.junit.jupiter.params.provider.MethodSource;
+
+import java.util.Collections;
+import java.util.stream.Stream;
+
+import static org.assertj.core.api.Assertions.assertThat;
+
+/** Tests for {@link FlinkToProtoSchemaConverter}. */
+@ExtendWith(TestLoggerExtension.class)
+class FlinkToProtoSchemaConverterTest {

Review Comment:
   If we're relying on the root message to be first in the generated 
ProtoSchema, so we can set Empty Message Indices when serializing, can we test 
that? 



##########
flink-formats/flink-protobuf-confluent-registry/src/main/java/org/apache/flink/formats/protobuf/registry/confluent/ProtoRegistrySerializationSchema.java:
##########
@@ -0,0 +1,117 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.formats.protobuf.registry.confluent;
+
+import org.apache.flink.api.common.serialization.SerializationSchema;
+import org.apache.flink.table.data.RowData;
+import org.apache.flink.table.types.logical.RowType;
+import org.apache.flink.util.FlinkRuntimeException;
+import org.apache.flink.util.Preconditions;
+
+import com.google.protobuf.DynamicMessage;
+import io.confluent.kafka.schemaregistry.client.SchemaRegistryClient;
+import io.confluent.kafka.schemaregistry.protobuf.ProtobufSchema;
+import org.apache.kafka.common.utils.ByteUtils;
+
+import java.io.ByteArrayOutputStream;
+import java.nio.ByteBuffer;
+import java.util.Objects;
+
+/**
+ * Serialization schema that serializes an object of Flink internal data 
structure with a Protobuf
+ * format.
+ *
+ * <p>Result <code>byte[]</code> messages can be deserialized using {@link
+ * ProtoRegistryDeserializationSchema}.
+ */
+public class ProtoRegistrySerializationSchema implements 
SerializationSchema<RowData> {
+
+    private static final long serialVersionUID = 1L;
+
+    /** RowType to generate the runtime converter. */
+    private final RowType rowType;
+
+    private final SchemaRegistryConfig schemaRegistryConfig;
+
+    /** The converter that converts internal data formats to JsonNode. */
+    private transient RowDataToProtoConverters.RowDataToProtoConverter 
runtimeConverter;
+
+    private transient SchemaRegistryCoder schemaCoder;
+    /** Output stream to write message to. */
+    private transient ByteArrayOutputStream arrayOutputStream;
+
+    public ProtoRegistrySerializationSchema(SchemaRegistryConfig 
registryConfig, RowType rowType) {
+        this.rowType = Preconditions.checkNotNull(rowType);
+        this.schemaRegistryConfig = Preconditions.checkNotNull(registryConfig);
+    }
+
+    @Override
+    public void open(InitializationContext context) throws Exception {
+        final SchemaRegistryClient schemaRegistryClient = 
schemaRegistryConfig.createClient();
+        this.schemaCoder =
+                new SchemaRegistryCoder(schemaRegistryConfig.getSchemaId(), 
schemaRegistryClient);
+        final ProtobufSchema schema =
+                (ProtobufSchema)
+                        
schemaRegistryClient.getSchemaById(schemaRegistryConfig.getSchemaId());
+        this.runtimeConverter =
+                RowDataToProtoConverters.createConverter(rowType, 
schema.toDescriptor());
+        this.arrayOutputStream = new ByteArrayOutputStream();
+    }
+
+    @Override
+    public byte[] serialize(RowData row) {
+        try {
+            final DynamicMessage converted = (DynamicMessage) 
runtimeConverter.convert(row);
+
+            arrayOutputStream.reset();
+            schemaCoder.writeSchema(arrayOutputStream);
+            final ByteBuffer buffer = writeMessageIndexes();
+            arrayOutputStream.write(buffer.array());
+            converted.writeTo(arrayOutputStream);
+            return arrayOutputStream.toByteArray();
+        } catch (Throwable t) {
+            throw new FlinkRuntimeException(String.format("Could not serialize 
row '%s'.", row), t);
+        }
+    }
+
+    private static ByteBuffer writeMessageIndexes() {
+        //write empty message indices for now
+        ByteBuffer buffer = ByteBuffer.allocate(ByteUtils.sizeOfVarint(0));
+        ByteUtils.writeVarint(0, buffer);
+        return buffer;

Review Comment:
   Looks good from a quick look, thanks @anupamaggarwal !!



##########
flink-formats/flink-protobuf-confluent-registry/src/test/java/org/apache/flink/formats/protobuf/registry/confluent/ProtoRegistrySerialisationDeserialisationTest.java:
##########
@@ -0,0 +1,274 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.formats.protobuf.registry.confluent;
+
+import org.apache.flink.api.common.serialization.DeserializationSchema;
+import org.apache.flink.api.common.serialization.SerializationSchema;
+import 
org.apache.flink.formats.protobuf.registry.confluent.utils.FlinkToProtoSchemaConverter;
+import 
org.apache.flink.formats.protobuf.registry.confluent.utils.MockInitializationContext;
+import 
org.apache.flink.formats.protobuf.registry.confluent.utils.ProtoToFlinkSchemaConverter;
+import org.apache.flink.table.data.DecimalData;
+import org.apache.flink.table.data.GenericRowData;
+import org.apache.flink.table.data.RowData;
+import org.apache.flink.table.data.StringData;
+import org.apache.flink.table.data.TimestampData;
+import org.apache.flink.table.runtime.typeutils.InternalTypeInfo;
+import org.apache.flink.table.types.logical.BigIntType;
+import org.apache.flink.table.types.logical.BooleanType;
+import org.apache.flink.table.types.logical.DateType;
+import org.apache.flink.table.types.logical.DecimalType;
+import org.apache.flink.table.types.logical.DoubleType;
+import org.apache.flink.table.types.logical.FloatType;
+import org.apache.flink.table.types.logical.IntType;
+import org.apache.flink.table.types.logical.LocalZonedTimestampType;
+import org.apache.flink.table.types.logical.LogicalType;
+import org.apache.flink.table.types.logical.RowType;
+import org.apache.flink.table.types.logical.RowType.RowField;
+import org.apache.flink.table.types.logical.SmallIntType;
+import org.apache.flink.table.types.logical.TimeType;
+import org.apache.flink.table.types.logical.TinyIntType;
+import org.apache.flink.table.types.logical.VarBinaryType;
+import org.apache.flink.table.types.logical.VarCharType;
+import org.apache.flink.table.utils.DateTimeUtils;
+import org.apache.flink.types.RowKind;
+import org.apache.flink.util.TestLoggerExtension;
+
+import com.google.protobuf.Descriptors.Descriptor;
+import io.confluent.connect.protobuf.ProtobufConverter;
+import io.confluent.kafka.schemaregistry.client.MockSchemaRegistryClient;
+import io.confluent.kafka.schemaregistry.client.SchemaRegistryClient;
+import 
io.confluent.kafka.schemaregistry.client.rest.exceptions.RestClientException;
+import io.confluent.kafka.schemaregistry.protobuf.ProtobufSchema;
+import io.confluent.kafka.serializers.AbstractKafkaSchemaSerDeConfig;
+import org.apache.kafka.connect.data.SchemaAndValue;
+import org.apache.kafka.connect.data.Struct;
+import org.junit.jupiter.api.AfterEach;
+import org.junit.jupiter.api.BeforeAll;
+import org.junit.jupiter.api.Test;
+import org.junit.jupiter.api.extension.ExtendWith;
+
+import java.io.IOException;
+import java.math.BigDecimal;
+import java.time.LocalDate;
+import java.time.LocalTime;
+import java.util.Arrays;
+import java.util.Collections;
+import java.util.Map;
+
+import static org.apache.flink.table.api.DataTypes.BIGINT;
+import static org.apache.flink.table.api.DataTypes.FIELD;
+import static org.apache.flink.table.api.DataTypes.INT;
+import static org.apache.flink.table.api.DataTypes.ROW;
+import static org.apache.flink.table.api.DataTypes.STRING;
+import static org.assertj.core.api.Assertions.assertThat;
+import static org.junit.Assert.assertEquals;
+
+/**
+ * Smoke tests for checking {@link ProtoRegistrySerializationSchema} and {@link
+ * ProtoRegistryDeserializationSchema}.
+ *
+ * <p>For more thorough tests on converting different types see {@link 
RowDataToProtoConvertersTest}
+ * and/or {@link ProtoToRowDataConvertersTest}.
+ */
+@ExtendWith(TestLoggerExtension.class)
+public class ProtoRegistrySerialisationDeserialisationTest {

Review Comment:
   nitpick. rename file and class to fix spelling of Deserialization 
   ```suggestion
   public class ProtoRegistrySerialisationDeserializationTest {
   ```



##########
flink-formats/flink-protobuf-confluent-registry/src/main/java/org/apache/flink/formats/protobuf/registry/confluent/SchemaCoderProviders.java:
##########
@@ -0,0 +1,337 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.formats.protobuf.registry.confluent;
+
+import 
org.apache.flink.formats.protobuf.registry.confluent.utils.FlinkToProtoSchemaConverter;
+import org.apache.flink.table.types.logical.RowType;
+import org.apache.flink.util.Preconditions;
+import org.apache.flink.util.WrappingRuntimeException;
+
+import io.confluent.kafka.schemaregistry.client.SchemaRegistryClient;
+import 
io.confluent.kafka.schemaregistry.client.rest.exceptions.RestClientException;
+import io.confluent.kafka.schemaregistry.protobuf.MessageIndexes;
+import io.confluent.kafka.schemaregistry.protobuf.ProtobufSchema;
+import org.apache.kafka.common.utils.ByteUtils;
+
+import javax.annotation.Nullable;
+
+import java.io.DataInputStream;
+import java.io.IOException;
+import java.io.InputStream;
+import java.io.OutputStream;
+import java.nio.ByteBuffer;
+import java.util.ArrayList;
+import java.util.Collections;
+import java.util.List;
+import java.util.Objects;
+
+import static java.lang.String.format;
+
+/** Factory for {@link 
org.apache.flink.formats.protobuf.registry.confluent.SchemaCoder}. */
+public class SchemaCoderProviders {
+
+    /**
+     * Creates a {@link 
org.apache.flink.formats.protobuf.registry.confluent.SchemaCoder} in cases
+     * where the schema has already been setup before-hand/exists in Confluent 
Schema Registry.
+     *
+     * <p>Useful in scenarios where users want to be more explicit with 
schemas used. In these cases
+     * the external schema specified through schemaId will take precedence for 
encoding/decoding
+     * data. Also, the step of registering with schemaRegistry during 
serialization, will be
+     * skipped.
+     *
+     * <p>A single Schema Registry Protobuf entry may contain multiple 
Protobuf messages, some of
+     * which may have nested messages. The messageName identifies the exact 
message/schema to use
+     * for serialization/deserialization. Consider the following protobuf 
message
+     *
+     * <pre>
+     * package test.package;
+     * message MessageA {
+     *     message MessageB {
+     *         message MessageC {
+     *         ...
+     *         }
+     *     }
+     *     message MessageD {
+     *     ...
+     *     }
+     *     message MessageE {
+     *         message MessageF {
+     *         ...
+     *         }
+     *         message MessageG {
+     *         ...
+     *         }
+     *     ...
+     *     }
+     * ...
+     * }
+     * </pre>
+     *
+     * <p>In order to use messageD the messageName should contain the value of
+     * test.package.messageD. Similarly, for messageF to be used messageName 
should contain
+     * test.package.MessageE.MessageF.
+     *
+     * @param schemaId SchemaId for external schema referenced for 
encoding/decoding of payload.
+     * @param messageName Optional message name to be used to select the right 
{@link
+     *     com.google.protobuf.Message} for Serialialization/Deserialization. 
In absence of
+     *     messageName the outermost message will be used.
+     * @param schemaRegistryClient client handle to Schema Registry {@link
+     *     io.confluent.kafka.schemaregistry.client.SchemaRegistryClient}
+     * @return
+     */
+    public static SchemaCoder createForPreRegisteredSchema(
+            int schemaId, @Nullable String messageName, SchemaRegistryClient 
schemaRegistryClient) {
+        return new PreRegisteredSchemaCoder(schemaId, messageName, 
schemaRegistryClient);
+    }
+
+    /**
+     * Creates a default schema coder.
+     *
+     * <p>For serialization schema coder will infer the schema from Flink 
{@link
+     * org.apache.flink.table.types.logical.RowType}. Schema obtained from 
rowType will also be
+     * registered to Schema Registry using the subject passed in by invoking 
{@link
+     * 
io.confluent.kafka.schemaregistry.client.SchemaRegistryClient#register(String,
+     * io.confluent.kafka.schemaregistry.ParsedSchema)}.
+     *
+     * <p>For deserialization schema coder will infer schema from InputStream. 
In cases where
+     * messageIndexes indicate using a nested schema, the appropriate nested 
schema will be used.
+     *
+     * @param subject Subject to use for registering schema (only required for 
serialization).
+     * @param rowType Flink Row type.
+     * @param schemaRegistryClient Client for SchemaRegistry
+     * @return SchemaCoder to use.
+     */
+    public static SchemaCoder createDefault(
+            String subject, RowType rowType, SchemaRegistryClient 
schemaRegistryClient) {
+        return new DefaultSchemaCoder(subject, rowType, schemaRegistryClient);
+    }
+
+    /**
+     * Default implementation of SchemaCoder.
+     *
+     * <p>Parses schema information from inputStream for de-serialization. For 
Serialization, uses
+     * Flink Row Type to infer schema and registers this schema with Schema 
Registry.
+     */
+    static class DefaultSchemaCoder extends SchemaCoder {
+        private static final String ROW = "row";
+        private static final String PACKAGE = "io.confluent.generated";
+        private static final List<Integer> DEFAULT_INDEX = 
Collections.singletonList(0);
+        /** Subject can be nullable in case coder is only used for 
deserialization. */
+        private @Nullable final String subject;
+
+        private final ProtobufSchema rowSchema;
+        private final SchemaRegistryClient schemaRegistryClient;
+
+        public DefaultSchemaCoder(
+                @Nullable String subject,
+                RowType rowType,
+                SchemaRegistryClient schemaRegistryClient) {
+            this.subject = subject;
+            rowSchema =
+                    FlinkToProtoSchemaConverter.fromFlinkRowType(
+                            Preconditions.checkNotNull(rowType), ROW, PACKAGE);
+            this.schemaRegistryClient = 
Preconditions.checkNotNull(schemaRegistryClient);
+        }
+
+        public static MessageIndexes readMessageIndex(DataInputStream input) 
throws IOException {
+
+            int size = ByteUtils.readVarint(input);
+            if (size == 0) {
+                return new MessageIndexes(DEFAULT_INDEX);
+            } else {
+                List<Integer> indexes = new ArrayList<>(size);
+
+                for (int i = 0; i < size; ++i) {
+                    indexes.add(ByteUtils.readVarint(input));
+                }
+                return new MessageIndexes(indexes);
+            }
+        }
+
+        @Override
+        public ProtobufSchema readSchema(InputStream in) throws IOException {
+            DataInputStream dataInputStream = new DataInputStream(in);
+
+            if (dataInputStream.readByte() != 0) {
+                throw new IOException("Unknown data format. Magic number does 
not match");
+            } else {
+                int schemaId = dataInputStream.readInt();
+                try {
+                    ProtobufSchema schema =
+                            (ProtobufSchema) 
schemaRegistryClient.getSchemaById(schemaId);
+                    MessageIndexes indexes = readMessageIndex(dataInputStream);
+                    String name = schema.toMessageName(indexes);
+                    schema = schema.copy(name);
+                    return schema;
+                } catch (RestClientException e) {
+                    throw new IOException(
+                            format("Could not find schema with id %s in 
registry", schemaId), e);
+                }
+            }
+        }
+
+        @Override
+        public ProtobufSchema writerSchema() {
+            return rowSchema;
+        }
+
+        @Override
+        public void writeSchema(OutputStream out) throws IOException {
+            out.write(CONFLUENT_MAGIC_BYTE);
+            int schemaId = 0;
+            try {
+                schemaId = schemaRegistryClient.register(subject, rowSchema);
+                writeInt(out, schemaId);
+                final ByteBuffer buffer = emptyMessageIndexes();

Review Comment:
   Might be worth adding a comment to clarify that. 
   
   Just note that this is brittle, if the FlinkToProtoSchema code changes such 
that the message order changes, this will break. 



##########
flink-formats/flink-protobuf-confluent-registry/src/main/java/org/apache/flink/formats/protobuf/registry/confluent/SchemaCoderProviders.java:
##########
@@ -0,0 +1,337 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.formats.protobuf.registry.confluent;
+
+import 
org.apache.flink.formats.protobuf.registry.confluent.utils.FlinkToProtoSchemaConverter;
+import org.apache.flink.table.types.logical.RowType;
+import org.apache.flink.util.Preconditions;
+import org.apache.flink.util.WrappingRuntimeException;
+
+import io.confluent.kafka.schemaregistry.client.SchemaRegistryClient;
+import 
io.confluent.kafka.schemaregistry.client.rest.exceptions.RestClientException;
+import io.confluent.kafka.schemaregistry.protobuf.MessageIndexes;
+import io.confluent.kafka.schemaregistry.protobuf.ProtobufSchema;
+import org.apache.kafka.common.utils.ByteUtils;
+
+import javax.annotation.Nullable;
+
+import java.io.DataInputStream;
+import java.io.IOException;
+import java.io.InputStream;
+import java.io.OutputStream;
+import java.nio.ByteBuffer;
+import java.util.ArrayList;
+import java.util.Collections;
+import java.util.List;
+import java.util.Objects;
+
+import static java.lang.String.format;
+
+/** Factory for {@link 
org.apache.flink.formats.protobuf.registry.confluent.SchemaCoder}. */
+public class SchemaCoderProviders {
+
+    /**
+     * Creates a {@link 
org.apache.flink.formats.protobuf.registry.confluent.SchemaCoder} in cases
+     * where the schema has already been setup before-hand/exists in Confluent 
Schema Registry.
+     *
+     * <p>Useful in scenarios where users want to be more explicit with 
schemas used. In these cases
+     * the external schema specified through schemaId will take precedence for 
encoding/decoding
+     * data. Also, the step of registering with schemaRegistry during 
serialization, will be
+     * skipped.
+     *
+     * <p>A single Schema Registry Protobuf entry may contain multiple 
Protobuf messages, some of
+     * which may have nested messages. The messageName identifies the exact 
message/schema to use
+     * for serialization/deserialization. Consider the following protobuf 
message
+     *
+     * <pre>
+     * package test.package;
+     * message MessageA {
+     *     message MessageB {
+     *         message MessageC {
+     *         ...
+     *         }
+     *     }
+     *     message MessageD {
+     *     ...
+     *     }
+     *     message MessageE {
+     *         message MessageF {
+     *         ...
+     *         }
+     *         message MessageG {
+     *         ...
+     *         }
+     *     ...
+     *     }
+     * ...
+     * }
+     * </pre>
+     *
+     * <p>In order to use messageD the messageName should contain the value of
+     * test.package.messageD. Similarly, for messageF to be used messageName 
should contain
+     * test.package.MessageE.MessageF.
+     *
+     * @param schemaId SchemaId for external schema referenced for 
encoding/decoding of payload.
+     * @param messageName Optional message name to be used to select the right 
{@link
+     *     com.google.protobuf.Message} for Serialialization/Deserialization. 
In absence of
+     *     messageName the outermost message will be used.
+     * @param schemaRegistryClient client handle to Schema Registry {@link
+     *     io.confluent.kafka.schemaregistry.client.SchemaRegistryClient}
+     * @return
+     */
+    public static SchemaCoder createForPreRegisteredSchema(
+            int schemaId, @Nullable String messageName, SchemaRegistryClient 
schemaRegistryClient) {
+        return new PreRegisteredSchemaCoder(schemaId, messageName, 
schemaRegistryClient);
+    }
+
+    /**
+     * Creates a default schema coder.
+     *
+     * <p>For serialization schema coder will infer the schema from Flink 
{@link
+     * org.apache.flink.table.types.logical.RowType}. Schema obtained from 
rowType will also be
+     * registered to Schema Registry using the subject passed in by invoking 
{@link
+     * 
io.confluent.kafka.schemaregistry.client.SchemaRegistryClient#register(String,
+     * io.confluent.kafka.schemaregistry.ParsedSchema)}.
+     *
+     * <p>For deserialization schema coder will infer schema from InputStream. 
In cases where
+     * messageIndexes indicate using a nested schema, the appropriate nested 
schema will be used.
+     *
+     * @param subject Subject to use for registering schema (only required for 
serialization).
+     * @param rowType Flink Row type.
+     * @param schemaRegistryClient Client for SchemaRegistry
+     * @return SchemaCoder to use.
+     */
+    public static SchemaCoder createDefault(
+            String subject, RowType rowType, SchemaRegistryClient 
schemaRegistryClient) {
+        return new DefaultSchemaCoder(subject, rowType, schemaRegistryClient);
+    }
+
+    /**
+     * Default implementation of SchemaCoder.
+     *
+     * <p>Parses schema information from inputStream for de-serialization. For 
Serialization, uses
+     * Flink Row Type to infer schema and registers this schema with Schema 
Registry.
+     */
+    static class DefaultSchemaCoder extends SchemaCoder {
+        private static final String ROW = "row";
+        private static final String PACKAGE = "io.confluent.generated";
+        private static final List<Integer> DEFAULT_INDEX = 
Collections.singletonList(0);
+        /** Subject can be nullable in case coder is only used for 
deserialization. */
+        private @Nullable final String subject;
+
+        private final ProtobufSchema rowSchema;
+        private final SchemaRegistryClient schemaRegistryClient;
+
+        public DefaultSchemaCoder(
+                @Nullable String subject,
+                RowType rowType,
+                SchemaRegistryClient schemaRegistryClient) {
+            this.subject = subject;
+            rowSchema =
+                    FlinkToProtoSchemaConverter.fromFlinkRowType(
+                            Preconditions.checkNotNull(rowType), ROW, PACKAGE);
+            this.schemaRegistryClient = 
Preconditions.checkNotNull(schemaRegistryClient);
+        }
+
+        public static MessageIndexes readMessageIndex(DataInputStream input) 
throws IOException {
+
+            int size = ByteUtils.readVarint(input);
+            if (size == 0) {
+                return new MessageIndexes(DEFAULT_INDEX);
+            } else {
+                List<Integer> indexes = new ArrayList<>(size);
+
+                for (int i = 0; i < size; ++i) {
+                    indexes.add(ByteUtils.readVarint(input));
+                }
+                return new MessageIndexes(indexes);
+            }
+        }
+
+        @Override
+        public ProtobufSchema readSchema(InputStream in) throws IOException {
+            DataInputStream dataInputStream = new DataInputStream(in);
+
+            if (dataInputStream.readByte() != 0) {
+                throw new IOException("Unknown data format. Magic number does 
not match");
+            } else {
+                int schemaId = dataInputStream.readInt();
+                try {
+                    ProtobufSchema schema =
+                            (ProtobufSchema) 
schemaRegistryClient.getSchemaById(schemaId);
+                    MessageIndexes indexes = readMessageIndex(dataInputStream);
+                    String name = schema.toMessageName(indexes);
+                    schema = schema.copy(name);
+                    return schema;
+                } catch (RestClientException e) {
+                    throw new IOException(
+                            format("Could not find schema with id %s in 
registry", schemaId), e);
+                }
+            }
+        }
+
+        @Override
+        public ProtobufSchema writerSchema() {
+            return rowSchema;
+        }
+
+        @Override
+        public void writeSchema(OutputStream out) throws IOException {
+            out.write(CONFLUENT_MAGIC_BYTE);
+            int schemaId = 0;
+            try {
+                schemaId = schemaRegistryClient.register(subject, rowSchema);
+                writeInt(out, schemaId);
+                final ByteBuffer buffer = emptyMessageIndexes();

Review Comment:
   We can set empty message indices because the dynamically generated Protobuf 
Message from Flink RowData will have the root message first. 👍 



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: issues-unsubscr...@flink.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org

Reply via email to