rdblue commented on code in PR #3212: URL: https://github.com/apache/parquet-java/pull/3212#discussion_r2085704162
########## parquet-variant/src/main/java/org/apache/parquet/variant/VariantElementConverter.java: ########## @@ -0,0 +1,668 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ +package org.apache.parquet.variant; + +import static org.apache.parquet.schema.LogicalTypeAnnotation.TimeUnit.MICROS; +import static org.apache.parquet.schema.PrimitiveType.PrimitiveTypeName.BINARY; +import static org.apache.parquet.schema.PrimitiveType.PrimitiveTypeName.BOOLEAN; +import static org.apache.parquet.schema.PrimitiveType.PrimitiveTypeName.DOUBLE; +import static org.apache.parquet.schema.PrimitiveType.PrimitiveTypeName.FLOAT; +import static org.apache.parquet.schema.PrimitiveType.PrimitiveTypeName.INT32; +import static org.apache.parquet.schema.PrimitiveType.PrimitiveTypeName.INT64; +import static org.apache.parquet.schema.Type.Repetition.REPEATED; + +import java.math.BigDecimal; +import java.math.BigInteger; +import java.util.HashSet; +import java.util.List; +import java.util.Set; +import org.apache.parquet.column.Dictionary; +import org.apache.parquet.io.api.Binary; +import org.apache.parquet.io.api.Converter; +import org.apache.parquet.io.api.GroupConverter; +import org.apache.parquet.io.api.PrimitiveConverter; +import 
org.apache.parquet.schema.GroupType; +import org.apache.parquet.schema.LogicalTypeAnnotation; +import org.apache.parquet.schema.PrimitiveType; +import org.apache.parquet.schema.Type; + +/** + * Converter for a shredded Variant containing a value and/or typed_value field: either a top-level + * Variant column, or a nested array element or object field. The top-level converter is handled + * by a subclass (VariantColumnConverter) that also reads metadata. + * + * Converters for the `value` and `typed_value` fields are implemented as nested classes in this + * class. + * + * All converters for a Variant column append their results to a VariantBuilder as values are read from Parquet. + * + * Values in `typed_value` are appended by the child converter. Values in `value` are stored by a + * child converter, but only appended when completing this group. Additionally, object fields are + * appended by the `typed_value` converter, but because residual values are stored in `value`, this + * converter is responsible for finalizing the object. + */ +class VariantElementConverter extends GroupConverter implements VariantConverter { + + // We need to remember the start position in order to tell if the child typed_value was non-null. + // Maybe we should just add a callback to each child's end()? + private int startWritePos; + private boolean typedValueIsObject = false; + private int valueIdx = -1; + private int typedValueIdx = -1; + protected VariantBuilderHolder holder; + protected Converter[] converters; + + // The following are only used if this is an object field. + private String objectFieldName = null; + private int objectFieldId = -1; + private VariantObjectConverter parent = null; + + // Only used if typedValueIsObject is true. 
+ private Set<String> shreddedObjectKeys; + + @Override + public void init(VariantBuilderHolder holder) { + this.holder = holder; + for (Converter converter : converters) { + if (converter != null) { + ((VariantConverter) converter).init(holder); + } + } + } + + public VariantElementConverter(GroupType variantSchema, String objectFieldName, VariantObjectConverter parent) { + this(variantSchema); + this.objectFieldName = objectFieldName; + this.parent = parent; + } + + public VariantElementConverter(GroupType variantSchema) { + converters = new Converter[variantSchema.getFieldCount()]; + + List<Type> fields = variantSchema.getFields(); + + for (int i = 0; i < fields.size(); i++) { + Type field = fields.get(i); + String fieldName = field.getName(); + if (fieldName.equals("value")) { + this.valueIdx = i; + if (!field.isPrimitive() || field.asPrimitiveType().getPrimitiveTypeName() != BINARY) { + throw new IllegalArgumentException("Value must be a binary value"); + } + } else if (fieldName.equals("typed_value")) { + this.typedValueIdx = i; + } + } + + if (valueIdx >= 0) { + converters[valueIdx] = new VariantValueConverter(this); + } + + if (typedValueIdx >= 0) { + Converter typedConverter = null; + Type field = fields.get(typedValueIdx); + LogicalTypeAnnotation annotation = field.getLogicalTypeAnnotation(); + if (annotation instanceof LogicalTypeAnnotation.ListLogicalTypeAnnotation) { + typedConverter = new VariantArrayConverter(field.asGroupType()); + } else if (!field.isPrimitive()) { + GroupType typedValue = field.asGroupType(); + typedConverter = new VariantObjectConverter(typedValue); + typedValueIsObject = true; + shreddedObjectKeys = new HashSet<>(); + for (Type f : typedValue.getFields()) { + shreddedObjectKeys.add(f.getName()); + } + } else { + typedConverter = VariantScalarConverter.create(field.asPrimitiveType()); + } + + assert (typedConverter != null); + converters[typedValueIdx] = typedConverter; + } + } + + @Override + public Converter getConverter(int 
fieldIndex) { + return converters[fieldIndex]; + } + + /** runtime calls **/ + @Override + public void start() { + if (objectFieldName != null) { + // Having a non-null element does not guarantee that we actually want to add this field, because + // if value and typed_value are both null, it's a missing field. In that case, we'll detect it + // in end() and reverse our decision to add this key. + ((VariantObjectBuilder) holder.builder).appendKey(objectFieldName); + } + if (valueIdx >= 0) { + ((VariantValueConverter) converters[valueIdx]).reset(); + } + startWritePos = holder.builder.getWritePos(); + } + + @Override + public void end() { + VariantBuilder builder = this.holder.builder; + + Binary variantValue = null; + VariantObjectBuilder objectBuilder = null; + if (typedValueIsObject) { + // Get the builder that the child typed_value has been adding its fields to. We need to possibly add + // more values from the `value` field, then finalize. If the value was not an object, fields will be null. + objectBuilder = ((VariantObjectConverter) converters[typedValueIdx]).getObjectBuilder(); + } + + // If objectBuilder is non-null, then we have a partially complete object ready to write. + // Otherwise, the typed_value converter should have written something if it was non-null. + boolean hasTypedValue = objectBuilder != null || (startWritePos != builder.getWritePos()); + + if (valueIdx >= 0) { + variantValue = ((VariantValueConverter) converters[valueIdx]).getValue(); + } + if (variantValue != null) { + if (!hasTypedValue) { + // Nothing else was added. We can directly append this value. + builder.shallowAppendVariant(variantValue.toByteBuffer()); + } else { + // Both value and typed_value were non-null. This is only valid for an object. 
+ Variant value = + new Variant(variantValue.toByteBuffer(), this.holder.topLevelHolder.metadata.toByteBuffer()); + Variant.Type basicType = value.getType(); + if (hasTypedValue && basicType != Variant.Type.OBJECT) { + throw new IllegalArgumentException("Invalid variant, conflicting value and typed_value"); + } + + for (int i = 0; i < value.numObjectElements(); i++) { + Variant.ObjectField field = value.getFieldAtIndex(i); + if (shreddedObjectKeys.contains(field.key)) { + // Skip any field ID that is also in the typed schema. This check is needed because readers with + // pushdown may not look at the value column, causing inconsistent results if a writer put a + // given key + // only in the value column when it was present in the typed_value schema. + // Alternatively, we could fail at this point, since the shredding is invalid according to the + // spec. + continue; + } + objectBuilder.appendKey(field.key); + objectBuilder.shallowAppendVariant(field.value.getValueRawBytes()); + } + builder.endObject(); + } + } else if (objectBuilder != null) { + // typed_value was an object, and there's nothing left to append. + builder.endObject(); + } else if (!hasTypedValue) { + // There was no value or typed_value. + if (objectFieldName != null) { + // Missing field. + ((VariantObjectBuilder) this.holder.builder).dropLastKey(); + } else { + // For arrays and top-level fields, the spec considers this invalid, but recommends using VariantNull. + builder.appendNull(); + } + } + } + + /** + * Converter for the metadata column. It sets the current metadata in the parent converter, + * so that it can be used by the typed_value converter on the same row. + */ + static class VariantMetadataConverter extends PrimitiveConverter implements VariantConverter { Review Comment: Why is this an inner class of `VariantElementConverter` when it doesn't extend `VariantElementConverter`? This is independent and seems like it should be completely separate. 
Also, why not create/extend a `BinaryConverter` interface to avoid duplicating the dictionary code? That would make sense to me. -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: [email protected] For queries about this service, please contact Infrastructure at: [email protected] --------------------------------------------------------------------- To unsubscribe, e-mail: [email protected] For additional commands, e-mail: [email protected]
