dmariassy commented on code in PR #24482: URL: https://github.com/apache/flink/pull/24482#discussion_r1679790190
########## flink-formats/flink-protobuf-confluent-registry/src/main/java/org/apache/flink/formats/protobuf/registry/confluent/utils/ProtoToFlinkSchemaConverter.java: ########## Review Comment: Is this class necessary? AFAICT it's only used in tests. I understand that it was adapted from the Kafka connect converter but I think the use case there is different from Flink's. The _connect_ converter needs to **dynamically** map connect types to proto (or vice versa), whereas in Flink, the target rowType is **statically defined** by the user using the SQL DDL. And even in tests, I think it would be more robust to use statically defined Flink schemas. ########## flink-formats/flink-protobuf-confluent-registry/src/main/java/org/apache/flink/formats/protobuf/registry/confluent/ProtoRegistryDeserializationSchema.java: ########## @@ -0,0 +1,130 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.flink.formats.protobuf.registry.confluent; + +import org.apache.flink.api.common.serialization.DeserializationSchema; +import org.apache.flink.api.common.typeinfo.TypeInformation; +import org.apache.flink.formats.protobuf.registry.confluent.utils.MutableByteArrayInputStream; +import org.apache.flink.table.data.RowData; +import org.apache.flink.table.types.logical.RowType; +import org.apache.flink.util.Preconditions; + +import com.google.protobuf.Descriptors.Descriptor; +import com.google.protobuf.DynamicMessage; +import io.confluent.kafka.schemaregistry.protobuf.ProtobufSchema; + +import java.io.IOException; +import java.util.Map; +import java.util.Objects; +import java.util.concurrent.ConcurrentHashMap; +import java.util.function.Function; + +/** + * A {@link DeserializationSchema} that deserializes {@link RowData} from Protobuf messages using + * Schema Registry protocol. + */ +public class ProtoRegistryDeserializationSchema implements DeserializationSchema<RowData> { + + private static final long serialVersionUID = 1L; + + private final RowType rowType; + private final TypeInformation<RowData> producedType; + + /** Input stream to read message from. 
*/ + private transient MutableByteArrayInputStream inputStream; + + private transient SchemaCoder schemaCoder; + + private transient Map<String, ProtoToRowDataConverters.ProtoToRowDataConverter> + runtimeConverterMap; + + public ProtoRegistryDeserializationSchema( + SchemaCoder schemaCoder, RowType rowType, TypeInformation<RowData> producedType) { + this.schemaCoder = schemaCoder; + this.rowType = Preconditions.checkNotNull(rowType); + this.producedType = Preconditions.checkNotNull(producedType); + runtimeConverterMap = new ConcurrentHashMap<>(); + } + + @Override + public void open(InitializationContext context) throws Exception { + schemaCoder.initialize(); + this.inputStream = new MutableByteArrayInputStream(); + } + + @Override + public RowData deserialize(byte[] message) throws IOException { + if (message == null) { + return null; + } + try { + inputStream.setBuffer(message); + ProtobufSchema schema = schemaCoder.readSchema(inputStream); + Descriptor descriptor = schema.toDescriptor(); + final DynamicMessage dynamicMessage = DynamicMessage.parseFrom(descriptor, inputStream); + ProtoToRowDataConverters.ProtoToRowDataConverter converter = + runtimeConverterMap.computeIfAbsent( + descriptor.getName(), initializeConverterFn(descriptor)); Review Comment: I think the key should be `ProtobufSchema.version()` + `Descriptor.getName()` to prevent clashes between different versions of the same protobuf schema ########## flink-formats/flink-protobuf-confluent-registry/src/main/java/org/apache/flink/formats/protobuf/registry/confluent/ProtoToRowDataConverters.java: ########## @@ -0,0 +1,667 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.flink.formats.protobuf.registry.confluent; + +import org.apache.flink.table.data.DecimalData; +import org.apache.flink.table.data.GenericArrayData; +import org.apache.flink.table.data.GenericMapData; +import org.apache.flink.table.data.GenericRowData; +import org.apache.flink.table.data.StringData; +import org.apache.flink.table.data.TimestampData; +import org.apache.flink.table.types.logical.ArrayType; +import org.apache.flink.table.types.logical.DecimalType; +import org.apache.flink.table.types.logical.IntType; +import org.apache.flink.table.types.logical.LogicalType; +import org.apache.flink.table.types.logical.LogicalTypeRoot; +import org.apache.flink.table.types.logical.MapType; +import org.apache.flink.table.types.logical.MultisetType; +import org.apache.flink.table.types.logical.RowType; +import org.apache.flink.table.types.logical.utils.LogicalTypeUtils; +import org.apache.flink.table.utils.DateTimeUtils; + +import com.google.protobuf.ByteString; +import com.google.protobuf.Descriptors.Descriptor; +import com.google.protobuf.Descriptors.FieldDescriptor; +import com.google.protobuf.Descriptors.FieldDescriptor.Type; +import com.google.protobuf.Descriptors.OneofDescriptor; +import com.google.protobuf.Message; +import org.apache.commons.lang3.tuple.Pair; + +import java.io.IOException; +import java.io.Serializable; +import java.lang.reflect.Array; +import java.time.LocalDate; +import java.util.Collection; +import java.util.HashMap; +import java.util.List; +import java.util.Map; +import java.util.function.Function; +import java.util.stream.Collectors; + +/** + * Runtime converters between {@link com.google.protobuf.Message} and {@link + * org.apache.flink.table.data.RowData}. + */ +public class ProtoToRowDataConverters { + + private static final String KEY_FIELD = "key"; + private static final String VALUE_FIELD = "value"; + + /** Creates a runtime converter. */ + public static ProtoToRowDataConverter createConverter( + Descriptor readSchema, RowType targetType) { + if (readSchema.getRealOneofs().isEmpty()) { + return createNoOneOfRowConverter(readSchema, targetType); + } else { + return createOneOfRowConverter(readSchema, targetType); + } + } + + // ------------------------------------------------------------------------------------- + // Runtime Converters + // ------------------------------------------------------------------------------------- + + // -------------------------------------------------------------------------------- + // IMPORTANT! We use anonymous classes instead of lambdas for a reason here. It is + // necessary because the maven shade plugin cannot relocate classes in + // SerializedLambdas (MSHADE-260). 
+ // -------------------------------------------------------------------------------- + + private static ProtoToRowDataConverter createOneOfRowConverter( + Descriptor readSchema, RowType targetType) { + final Map<String, OneofDescriptor> oneOfDescriptors = + readSchema.getRealOneofs().stream() + .collect(Collectors.toMap(OneofDescriptor::getName, Function.identity())); + final Map<String, FieldDescriptor> fieldDescriptors = + readSchema.getFields().stream() + .filter(fieldDescriptor -> fieldDescriptor.getRealContainingOneof() == null) + .collect(Collectors.toMap(FieldDescriptor::getName, Function.identity())); + + final int arity = targetType.getFieldCount(); + final List<OneOfDescriptorWithConverter> oneOfConverters = + targetType.getFields().stream() + .filter(field -> oneOfDescriptors.containsKey(field.getName())) + .map( + rowField -> { + final OneofDescriptor fieldDescriptor = + oneOfDescriptors.get(rowField.getName()); + return new OneOfDescriptorWithConverter( + fieldDescriptor, + createConverter( + fieldDescriptor, (RowType) rowField.getType())); + }) + .collect(Collectors.toList()); + final List<FieldDescriptorWithConverter> fieldConverters = + targetType.getFields().stream() + .filter(rowField -> !oneOfDescriptors.containsKey(rowField.getName())) + .map( + rowField -> { + final FieldDescriptor fieldDescriptor = + fieldDescriptors.get(rowField.getName()); + return new FieldDescriptorWithConverter( + fieldDescriptor, + createFieldConverter( + fieldDescriptor, rowField.getType())); + }) + .collect(Collectors.toList()); + return new ProtoToRowDataConverter() { + @Override + public Object convert(Object object) throws IOException { + final GenericRowData row = new GenericRowData(arity); + final Message message = (Message) object; + int i = 0; + for (OneOfDescriptorWithConverter descriptorWithConverter : oneOfConverters) { + final OneofDescriptor descriptor = descriptorWithConverter.descriptor; + final ProtoToRowDataConverter converter = descriptorWithConverter.converter; + if (message.hasOneof(descriptor)) { + row.setField(i, converter.convert(message)); + } + i++; + } + for (FieldDescriptorWithConverter descriptorWithConverter : fieldConverters) { + final FieldDescriptor fieldDescriptor = descriptorWithConverter.descriptor; + final ProtoToRowDataConverter converter = descriptorWithConverter.converter; + if (!fieldDescriptor.hasPresence() || message.hasField(fieldDescriptor)) { + row.setField(i, converter.convert(message.getField(fieldDescriptor))); + } + i++; + } Review Comment: Will this respect the ordering of the fields as defined in `RowType targetType`?
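For illustration only, here is a minimal sketch of a position-preserving variant that resolves each target column in DDL order, whether it maps to a oneof or to a plain field. It reuses the PR's `createConverter` / `createFieldConverter` helpers and the `ProtoToRowDataConverter` interface; the method name `createOrderedRowConverter` is hypothetical and error handling for unknown column names is omitted:

```java
// Sketch only: resolve one converter per target column, strictly in DDL order, so that
// position i in the produced RowData always corresponds to targetType.getFields().get(i).
private static ProtoToRowDataConverter createOrderedRowConverter(
        Descriptor readSchema, RowType targetType) {
    final List<RowType.RowField> rowFields = targetType.getFields();
    final int arity = rowFields.size();
    final OneofDescriptor[] oneOfs = new OneofDescriptor[arity];
    final FieldDescriptor[] fields = new FieldDescriptor[arity];
    final ProtoToRowDataConverter[] converters = new ProtoToRowDataConverter[arity];
    for (int i = 0; i < arity; i++) {
        final RowType.RowField rowField = rowFields.get(i);
        final OneofDescriptor oneOf =
                readSchema.getRealOneofs().stream()
                        .filter(o -> o.getName().equals(rowField.getName()))
                        .findFirst()
                        .orElse(null);
        if (oneOf != null) {
            oneOfs[i] = oneOf;
            converters[i] = createConverter(oneOf, (RowType) rowField.getType());
        } else {
            // Handling of column names that do not exist in the proto schema is omitted here.
            fields[i] = readSchema.findFieldByName(rowField.getName());
            converters[i] = createFieldConverter(fields[i], rowField.getType());
        }
    }
    return new ProtoToRowDataConverter() {
        @Override
        public Object convert(Object object) throws IOException {
            final Message message = (Message) object;
            final GenericRowData row = new GenericRowData(arity);
            for (int i = 0; i < arity; i++) {
                if (oneOfs[i] != null) {
                    if (message.hasOneof(oneOfs[i])) {
                        row.setField(i, converters[i].convert(message));
                    }
                } else if (!fields[i].hasPresence() || message.hasField(fields[i])) {
                    row.setField(i, converters[i].convert(message.getField(fields[i])));
                }
            }
            return row;
        }
    };
}
```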
########## flink-formats/flink-protobuf-confluent-registry/src/main/java/org/apache/flink/formats/protobuf/registry/confluent/ProtoToRowDataConverters.java: ########## Review Comment: Why do we assume that `oneOf`s would always be represented as `ROW<>`s in Flink?

```proto
message SampleMessage {
  oneof test_oneof {
    string name = 4;
    int32 age = 9;
  }
}
```

could be represented as

```sql
CREATE TABLE sample (
  name VARCHAR,
  age INT
)
```
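If the flattened layout were supported, each oneof member could be treated as an ordinary nullable top-level column. A rough sketch, reusing the PR's `createFieldConverter`; the method name `createFlattenedOneOfMemberConverter` is hypothetical, and the converter deliberately receives the enclosing `Message` rather than an already extracted field value:

```java
// Sketch only: converter for one member of a oneof when that member is mapped to its own
// top-level column (e.g. name VARCHAR or age INT above). The column stays null unless this
// member is the one actually set, which works because oneof members always track presence.
private static ProtoToRowDataConverter createFlattenedOneOfMemberConverter(
        FieldDescriptor memberField, LogicalType targetType) {
    final ProtoToRowDataConverter memberConverter = createFieldConverter(memberField, targetType);
    return new ProtoToRowDataConverter() {
        @Override
        public Object convert(Object object) throws IOException {
            final Message message = (Message) object;
            if (!message.hasField(memberField)) {
                return null; // another member of the oneof (or none at all) was set
            }
            return memberConverter.convert(message.getField(memberField));
        }
    };
}
```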
########## flink-formats/flink-protobuf-confluent-registry/src/main/java/org/apache/flink/formats/protobuf/registry/confluent/ProtoToRowDataConverters.java: ########## Review Comment: There also doesn't seem to be a similar assumption in the "vanilla" flink-protobuf format as far as I can tell
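Coming back to the converter-cache comment on `ProtoRegistryDeserializationSchema` above, a minimal sketch of what the composite key could look like inside `deserialize()`. It assumes the `ProtobufSchema` returned by the `schemaCoder` exposes its registry version via `version()` (which may be null for schemas that did not come from the registry); the variable name `converterCacheKey` is illustrative:

```java
// Sketch only: key the converter cache by schema version plus fully-qualified message name
// so that two versions of the same message type never share a converter.
final Descriptor descriptor = schema.toDescriptor();
final String converterCacheKey = schema.version() + ":" + descriptor.getFullName();
final ProtoToRowDataConverters.ProtoToRowDataConverter converter =
        runtimeConverterMap.computeIfAbsent(converterCacheKey, initializeConverterFn(descriptor));
```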