alexeykudinkin commented on a change in pull request #5077:
URL: https://github.com/apache/hudi/pull/5077#discussion_r831528135
##########
File path: hudi-common/src/main/java/org/apache/hudi/avro/HoodieAvroUtils.java
##########
@@ -511,6 +512,132 @@ public static Object getNestedFieldVal(GenericRecord record, String fieldName, b
}
}
+ /**
+ * Get schema for the given field and record. Field can be nested, denoted by dot notation. e.g: a.b.c
+ *
+ * @param record - record containing the value of the given field
+ * @param fieldName - name of the field
+ * @return
+ */
+ public static Schema getNestedFieldSchemaFromRecord(GenericRecord record, String fieldName) {
Review comment:
Do we really need this one if we already have a method obtaining it from
the schema? We can use that one by just passing the record's schema, can't we?
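For context, the dot-path traversal both helpers implement can be sketched with plain maps standing in for Avro records and schemas (the class and method names below are hypothetical illustrations, not Hudi code):

```java
import java.util.Map;

// Hypothetical sketch: walking a dot-path like "a.b.c" through nested
// maps -- the same shape of traversal both helpers in the diff perform
// over Avro records/schemas.
public class DotPathSketch {
  static Object lookup(Map<String, Object> root, String fieldName) {
    Object node = root;
    for (String part : fieldName.split("\\.")) {
      if (!(node instanceof Map)) {
        throw new IllegalStateException("Cannot find a record at part: " + part);
      }
      node = ((Map<?, ?>) node).get(part);
    }
    return node;
  }

  public static void main(String[] args) {
    Map<String, Object> rec = Map.of("a", Map.of("b", Map.of("c", 7)));
    System.out.println(lookup(rec, "a.b.c")); // prints 7
  }
}
```

Since the walk only needs the structure, a schema-only variant covers the record case by starting from the record's schema.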
##########
File path:
hudi-common/src/main/java/org/apache/hudi/metadata/HoodieTableMetadataUtil.java
##########
@@ -1012,11 +1015,12 @@ public static void accumulateColumnRanges(Schema.Field field, String filePath,
Map<String, HoodieColumnRangeMetadata<Comparable>> columnRangeMap,
Map<String, Map<String, Object>> columnToStats) {
Map<String, Object> columnStats = columnToStats.get(field.name());
+ field.schema().getType();
Review comment:
Can delete this one
##########
File path: hudi-common/src/main/java/org/apache/hudi/avro/HoodieAvroUtils.java
##########
@@ -511,6 +512,132 @@ public static Object getNestedFieldVal(GenericRecord record, String fieldName, b
}
}
+ /**
+ * Get schema for the given field and record. Field can be nested, denoted by dot notation. e.g: a.b.c
+ *
+ * @param record - record containing the value of the given field
+ * @param fieldName - name of the field
+ * @return
+ */
+ public static Schema getNestedFieldSchemaFromRecord(GenericRecord record, String fieldName) {
+ String[] parts = fieldName.split("\\.");
+ GenericRecord valueNode = record;
+ int i = 0;
+ for (; i < parts.length; i++) {
+ String part = parts[i];
+ Object val = valueNode.get(part);
+
+ if (i == parts.length - 1) {
+ return resolveUnion(valueNode.getSchema().getField(part).schema());
+ } else {
+ if (!(val instanceof GenericRecord)) {
+ throw new HoodieException("Cannot find a record at part value :" + part);
+ }
+ valueNode = (GenericRecord) val;
+ }
+ }
+ throw new HoodieException("Failed to get schema. Not a valid field name: " + fieldName);
+ }
+
+
+ /**
+ * Get schema for the given field and write schema. Field can be nested, denoted by dot notation. e.g: a.b.c
+ * Use this method when record is not available. Otherwise, prefer to use {@link #getNestedFieldSchemaFromRecord(GenericRecord, String)}
+ *
+ * @param writeSchema - write schema of the record
+ * @param fieldName - name of the field
+ * @return
+ */
+ public static Schema getNestedFieldSchemaFromWriteSchema(Schema writeSchema, String fieldName) {
+ String[] parts = fieldName.split("\\.");
+ int i = 0;
+ for (; i < parts.length; i++) {
+ String part = parts[i];
+ Schema schema = writeSchema.getField(part).schema();
+
+ if (i == parts.length - 1) {
+ return resolveUnion(schema);
+ }
+ }
+ throw new HoodieException("Failed to get schema. Not a valid field name: " + fieldName);
+ }
+
+ /**
+ * Given a field schema, convert its value to native Java type.
+ *
+ * @param schema - field schema
+ * @param val - field value
+ * @return
+ */
+ public static Comparable<?> convertToNativeJavaType(Schema schema, Object val) {
+ if (val == null) {
+ return StringUtils.EMPTY_STRING;
+ }
+ if (schema.getLogicalType() == LogicalTypes.date()) {
+ return java.sql.Date.valueOf((val.toString()));
+ }
+ switch (schema.getType()) {
+ case RECORD:
Review comment:
There's no sensible way we can compare records -- let's set min/max for
composite columns to null
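The "null min/max for composites" rule sketched above amounts to a type classification step. A hypothetical stdlib-only illustration (the `FieldType` enum below is a stand-in mirroring Avro's `Schema.Type`, not Hudi code):

```java
// Hypothetical sketch of the proposed rule: min/max stats are kept only
// for primitive column types; composite types (records, arrays, maps)
// get null stats since they carry no sensible ordering.
public class StatsTypeSketch {
  enum FieldType { INT, LONG, FLOAT, DOUBLE, BOOLEAN, STRING, BYTES,
                   RECORD, ARRAY, MAP, ENUM, FIXED, NULL }

  static boolean supportsMinMax(FieldType t) {
    switch (t) {
      case INT: case LONG: case FLOAT: case DOUBLE:
      case BOOLEAN: case STRING: case BYTES:
        return true;
      default:
        return false; // composite/opaque: min/max stay null
    }
  }

  public static void main(String[] args) {
    System.out.println(supportsMinMax(FieldType.INT));    // prints true
    System.out.println(supportsMinMax(FieldType.RECORD)); // prints false
  }
}
```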
##########
File path:
hudi-common/src/main/java/org/apache/hudi/metadata/HoodieTableMetadataUtil.java
##########
@@ -1042,22 +1046,27 @@ public static void aggregateColumnStats(IndexedRecord record, Schema schema,
schema.getFields().forEach(field -> {
Map<String, Object> columnStats = columnToStats.getOrDefault(field.name(), new HashMap<>());
- final String fieldVal = getNestedFieldValAsString((GenericRecord) record, field.name(), true, consistentLogicalTimestampEnabled);
+ final Object fieldVal = getNestedFieldVal((GenericRecord) record, field.name(), true, consistentLogicalTimestampEnabled);
+ final Schema fieldSchema = getNestedFieldSchemaFromRecord((GenericRecord) record, field.name());
// update stats
- final int fieldSize = fieldVal == null ? 0 : fieldVal.length();
+ final int fieldSize = fieldVal == null ? 0 : StringUtils.objToString(fieldVal).length();
Review comment:
For example, Parquet does provide such metrics, but these are also much more
sensible at the Parquet level: the encoding is fixed (Thrift), and it directly
translates into size on disk. In our case I don't think there's a sensible
high-level use case for such a metric.
##########
File path: hudi-common/src/main/java/org/apache/hudi/avro/HoodieAvroUtils.java
##########
@@ -511,6 +512,132 @@ public static Object getNestedFieldVal(GenericRecord record, String fieldName, b
}
}
+ /**
+ * Get schema for the given field and record. Field can be nested, denoted by dot notation. e.g: a.b.c
+ *
+ * @param record - record containing the value of the given field
+ * @param fieldName - name of the field
+ * @return
+ */
+ public static Schema getNestedFieldSchemaFromRecord(GenericRecord record, String fieldName) {
+ String[] parts = fieldName.split("\\.");
+ GenericRecord valueNode = record;
+ int i = 0;
+ for (; i < parts.length; i++) {
+ String part = parts[i];
+ Object val = valueNode.get(part);
+
+ if (i == parts.length - 1) {
+ return resolveUnion(valueNode.getSchema().getField(part).schema());
+ } else {
+ if (!(val instanceof GenericRecord)) {
+ throw new HoodieException("Cannot find a record at part value :" + part);
+ }
+ valueNode = (GenericRecord) val;
+ }
+ }
+ throw new HoodieException("Failed to get schema. Not a valid field name: " + fieldName);
+ }
+
+
+ /**
+ * Get schema for the given field and write schema. Field can be nested, denoted by dot notation. e.g: a.b.c
+ * Use this method when record is not available. Otherwise, prefer to use {@link #getNestedFieldSchemaFromRecord(GenericRecord, String)}
+ *
+ * @param writeSchema - write schema of the record
+ * @param fieldName - name of the field
+ * @return
+ */
+ public static Schema getNestedFieldSchemaFromWriteSchema(Schema writeSchema, String fieldName) {
+ String[] parts = fieldName.split("\\.");
+ int i = 0;
+ for (; i < parts.length; i++) {
+ String part = parts[i];
+ Schema schema = writeSchema.getField(part).schema();
+
+ if (i == parts.length - 1) {
+ return resolveUnion(schema);
+ }
+ }
+ throw new HoodieException("Failed to get schema. Not a valid field name: " + fieldName);
+ }
+
+ /**
+ * Given a field schema, convert its value to native Java type.
+ *
+ * @param schema - field schema
+ * @param val - field value
+ * @return
+ */
+ public static Comparable<?> convertToNativeJavaType(Schema schema, Object val) {
+ if (val == null) {
+ return StringUtils.EMPTY_STRING;
+ }
+ if (schema.getLogicalType() == LogicalTypes.date()) {
+ return java.sql.Date.valueOf((val.toString()));
+ }
+ switch (schema.getType()) {
+ case RECORD:
Review comment:
Actually, a better approach would be to align with whatever Parquet is
doing for such columns.
##########
File path:
hudi-common/src/main/java/org/apache/hudi/metadata/HoodieTableMetadataUtil.java
##########
@@ -1042,22 +1046,27 @@ public static void aggregateColumnStats(IndexedRecord record, Schema schema,
schema.getFields().forEach(field -> {
Map<String, Object> columnStats = columnToStats.getOrDefault(field.name(), new HashMap<>());
- final String fieldVal = getNestedFieldValAsString((GenericRecord) record, field.name(), true, consistentLogicalTimestampEnabled);
+ final Object fieldVal = getNestedFieldVal((GenericRecord) record, field.name(), true, consistentLogicalTimestampEnabled);
Review comment:
Also, since we rely on Parquet for collecting stats for base files, we
need to make sure that this sequence is _compatible_ with it.
As such, we can't use the comparator for Avro records and assume it would be
identical to Parquet's (Thrift), as it's very likely not to be: since the
binary representations of these encodings differ, we can't really compare
composite records like that (arrays and maps are all treated as binary in
Parquet; see the ref at the bottom).
My proposal, therefore, is to limit min/max stats to primitive types only.
Ref: Take a look at the
[PrimitiveComparator](https://github.com/apache/parquet-mr/blob/master/parquet-column/src/main/java/org/apache/parquet/schema/PrimitiveComparator.java)
implementation.
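A minimal sketch of what "primitive types only" looks like for the accumulator, using only boxed JDK types as a stand-in for decoded Avro values (hypothetical class, not Hudi code):

```java
import java.util.Map;

// Hypothetical sketch of the proposal: accumulate min/max only when the
// value is an orderable primitive; composite values are ignored so the
// resulting stats stay compatible with Parquet's primitive comparators.
public class MinMaxSketch {
  Comparable<Object> min;
  Comparable<Object> max;

  @SuppressWarnings("unchecked")
  void accept(Object val) {
    // Lists, maps and nested records have no portable ordering across
    // Avro and Parquet encodings, so they contribute no min/max.
    if (!(val instanceof Number || val instanceof String || val instanceof Boolean)) {
      return;
    }
    Comparable<Object> c = (Comparable<Object>) val;
    if (min == null || c.compareTo(min) < 0) {
      min = c;
    }
    if (max == null || c.compareTo(max) > 0) {
      max = c;
    }
  }

  public static void main(String[] args) {
    MinMaxSketch s = new MinMaxSketch();
    s.accept(3);
    s.accept(1);
    s.accept(Map.of()); // composite: skipped
    s.accept(2);
    System.out.println(s.min + ".." + s.max); // prints 1..3
  }
}
```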
##########
File path:
hudi-common/src/main/java/org/apache/hudi/metadata/HoodieTableMetadataUtil.java
##########
@@ -1042,22 +1046,27 @@ public static void aggregateColumnStats(IndexedRecord record, Schema schema,
schema.getFields().forEach(field -> {
Map<String, Object> columnStats = columnToStats.getOrDefault(field.name(), new HashMap<>());
- final String fieldVal = getNestedFieldValAsString((GenericRecord) record, field.name(), true, consistentLogicalTimestampEnabled);
+ final Object fieldVal = getNestedFieldVal((GenericRecord) record, field.name(), true, consistentLogicalTimestampEnabled);
+ final Schema fieldSchema = getNestedFieldSchemaFromRecord((GenericRecord) record, field.name());
// update stats
- final int fieldSize = fieldVal == null ? 0 : fieldVal.length();
+ final int fieldSize = fieldVal == null ? 0 : StringUtils.objToString(fieldVal).length();
Review comment:
This is an incorrect value-size calculation: string size != binary size.
##########
File path: hudi-common/src/main/java/org/apache/hudi/avro/HoodieAvroUtils.java
##########
@@ -511,6 +512,132 @@ public static Object getNestedFieldVal(GenericRecord record, String fieldName, b
}
}
+ /**
+ * Get schema for the given field and record. Field can be nested, denoted by dot notation. e.g: a.b.c
+ *
+ * @param record - record containing the value of the given field
+ * @param fieldName - name of the field
+ * @return
+ */
+ public static Schema getNestedFieldSchemaFromRecord(GenericRecord record, String fieldName) {
+ String[] parts = fieldName.split("\\.");
+ GenericRecord valueNode = record;
+ int i = 0;
+ for (; i < parts.length; i++) {
+ String part = parts[i];
+ Object val = valueNode.get(part);
+
+ if (i == parts.length - 1) {
+ return resolveUnion(valueNode.getSchema().getField(part).schema());
+ } else {
+ if (!(val instanceof GenericRecord)) {
+ throw new HoodieException("Cannot find a record at part value :" + part);
+ }
+ valueNode = (GenericRecord) val;
+ }
+ }
+ throw new HoodieException("Failed to get schema. Not a valid field name: " + fieldName);
+ }
+
+
+ /**
+ * Get schema for the given field and write schema. Field can be nested, denoted by dot notation. e.g: a.b.c
+ * Use this method when record is not available. Otherwise, prefer to use {@link #getNestedFieldSchemaFromRecord(GenericRecord, String)}
+ *
+ * @param writeSchema - write schema of the record
+ * @param fieldName - name of the field
+ * @return
+ */
+ public static Schema getNestedFieldSchemaFromWriteSchema(Schema writeSchema, String fieldName) {
+ String[] parts = fieldName.split("\\.");
+ int i = 0;
+ for (; i < parts.length; i++) {
+ String part = parts[i];
+ Schema schema = writeSchema.getField(part).schema();
+
+ if (i == parts.length - 1) {
+ return resolveUnion(schema);
+ }
+ }
+ throw new HoodieException("Failed to get schema. Not a valid field name: " + fieldName);
+ }
+
+ /**
+ * Given a field schema, convert its value to native Java type.
+ *
+ * @param schema - field schema
+ * @param val - field value
+ * @return
+ */
+ public static Comparable<?> convertToNativeJavaType(Schema schema, Object val) {
Review comment:
I don't think we need this method. Instead, at the place where we
collect min/max stats, we should make sure they're proper Java objects, and
simply treat min/max as `Comparable`.
##########
File path:
hudi-common/src/main/java/org/apache/hudi/metadata/HoodieTableMetadataUtil.java
##########
@@ -1042,22 +1046,27 @@ public static void aggregateColumnStats(IndexedRecord
record, Schema schema,
schema.getFields().forEach(field -> {
Map<String, Object> columnStats =
columnToStats.getOrDefault(field.name(), new HashMap<>());
- final String fieldVal = getNestedFieldValAsString((GenericRecord)
record, field.name(), true, consistentLogicalTimestampEnabled);
+ final Object fieldVal = getNestedFieldVal((GenericRecord) record,
field.name(), true, consistentLogicalTimestampEnabled);
Review comment:
A few notes:
1. `getNestedFieldVal` is too heavyweight an operation, which we don't
actually need here (we don't need to parse the dot-path, etc.). Instead, we can
access the field value directly and perform the logical-type conversion by
invoking `convertValueForSpecificDataTypes`.
2. The `fieldVal` we're getting should be of type `Comparable<?>`; otherwise we
set it to null (since there's no plausible order we can impose on these values).
##########
File path: hudi-common/src/main/java/org/apache/hudi/avro/HoodieAvroUtils.java
##########
@@ -511,6 +512,132 @@ public static Object getNestedFieldVal(GenericRecord record, String fieldName, b
}
}
+ /**
+ * Get schema for the given field and record. Field can be nested, denoted by dot notation. e.g: a.b.c
+ *
+ * @param record - record containing the value of the given field
+ * @param fieldName - name of the field
+ * @return
+ */
+ public static Schema getNestedFieldSchemaFromRecord(GenericRecord record, String fieldName) {
+ String[] parts = fieldName.split("\\.");
+ GenericRecord valueNode = record;
+ int i = 0;
+ for (; i < parts.length; i++) {
+ String part = parts[i];
+ Object val = valueNode.get(part);
+
+ if (i == parts.length - 1) {
+ return resolveUnion(valueNode.getSchema().getField(part).schema());
+ } else {
+ if (!(val instanceof GenericRecord)) {
+ throw new HoodieException("Cannot find a record at part value :" + part);
+ }
+ valueNode = (GenericRecord) val;
+ }
+ }
+ throw new HoodieException("Failed to get schema. Not a valid field name: " + fieldName);
+ }
+
+
+ /**
+ * Get schema for the given field and write schema. Field can be nested, denoted by dot notation. e.g: a.b.c
+ * Use this method when record is not available. Otherwise, prefer to use {@link #getNestedFieldSchemaFromRecord(GenericRecord, String)}
+ *
+ * @param writeSchema - write schema of the record
+ * @param fieldName - name of the field
+ * @return
+ */
+ public static Schema getNestedFieldSchemaFromWriteSchema(Schema writeSchema, String fieldName) {
+ String[] parts = fieldName.split("\\.");
+ int i = 0;
+ for (; i < parts.length; i++) {
+ String part = parts[i];
+ Schema schema = writeSchema.getField(part).schema();
+
+ if (i == parts.length - 1) {
+ return resolveUnion(schema);
+ }
+ }
+ throw new HoodieException("Failed to get schema. Not a valid field name: " + fieldName);
+ }
+
+ /**
+ * Given a field schema, convert its value to native Java type.
+ *
+ * @param schema - field schema
+ * @param val - field value
+ * @return
+ */
+ public static Comparable<?> convertToNativeJavaType(Schema schema, Object val) {
Review comment:
Please check other comments for context
##########
File path: hudi-common/src/main/java/org/apache/hudi/avro/HoodieAvroUtils.java
##########
@@ -511,6 +512,132 @@ public static Object getNestedFieldVal(GenericRecord record, String fieldName, b
}
}
+ /**
+ * Get schema for the given field and record. Field can be nested, denoted by dot notation. e.g: a.b.c
+ *
+ * @param record - record containing the value of the given field
+ * @param fieldName - name of the field
+ * @return
+ */
+ public static Schema getNestedFieldSchemaFromRecord(GenericRecord record, String fieldName) {
+ String[] parts = fieldName.split("\\.");
+ GenericRecord valueNode = record;
+ int i = 0;
+ for (; i < parts.length; i++) {
+ String part = parts[i];
+ Object val = valueNode.get(part);
+
+ if (i == parts.length - 1) {
+ return resolveUnion(valueNode.getSchema().getField(part).schema());
+ } else {
+ if (!(val instanceof GenericRecord)) {
+ throw new HoodieException("Cannot find a record at part value :" + part);
+ }
+ valueNode = (GenericRecord) val;
+ }
+ }
+ throw new HoodieException("Failed to get schema. Not a valid field name: " + fieldName);
+ }
+
+
+ /**
+ * Get schema for the given field and write schema. Field can be nested, denoted by dot notation. e.g: a.b.c
+ * Use this method when record is not available. Otherwise, prefer to use {@link #getNestedFieldSchemaFromRecord(GenericRecord, String)}
+ *
+ * @param writeSchema - write schema of the record
+ * @param fieldName - name of the field
+ * @return
+ */
+ public static Schema getNestedFieldSchemaFromWriteSchema(Schema writeSchema, String fieldName) {
+ String[] parts = fieldName.split("\\.");
+ int i = 0;
+ for (; i < parts.length; i++) {
+ String part = parts[i];
+ Schema schema = writeSchema.getField(part).schema();
+
+ if (i == parts.length - 1) {
+ return resolveUnion(schema);
+ }
+ }
+ throw new HoodieException("Failed to get schema. Not a valid field name: " + fieldName);
+ }
+
+ /**
+ * Given a field schema, convert its value to native Java type.
+ *
+ * @param schema - field schema
+ * @param val - field value
+ * @return
+ */
+ public static Comparable<?> convertToNativeJavaType(Schema schema, Object val) {
+ if (val == null) {
+ return StringUtils.EMPTY_STRING;
+ }
+ if (schema.getLogicalType() == LogicalTypes.date()) {
+ return java.sql.Date.valueOf((val.toString()));
+ }
+ switch (schema.getType()) {
+ case RECORD:
+ final List<Schema.Field> schemaFields = schema.getFields();
+ final GenericRecord recordVal = (GenericRecord) val;
+ for (Schema.Field f : schemaFields) {
+ return convertToNativeJavaType(f.schema(), recordVal.get(f.name()));
+ }
+ case ARRAY:
+ Schema elementSchema = schema.getElementType();
+ List<Object> listRes = new ArrayList<>();
+ for (Object v : (List) val) {
+ listRes.add(convertToNativeJavaType(elementSchema, v));
+ }
+ return listRes.toString();
+ case UNION:
+ return convertToNativeJavaType(resolveUnion(schema), val);
+ case STRING:
+ return val.toString();
+ case BYTES:
+ return (ByteBuffer) val;
+ case INT:
+ return (Integer) val;
+ case LONG:
+ return (Long) val;
+ case FLOAT:
+ return (Float) val;
+ case DOUBLE:
+ return (Double) val;
+ case BOOLEAN:
+ return (Boolean) val;
+ case ENUM:
+ case MAP:
+ case FIXED:
+ case NULL:
+ // TODO: implement for above types based on logical types
+ return null;
+ default:
+ throw new IllegalStateException("Unexpected value: " + schema.getType());
+ }
+ }
+
+ /**
+ * Type-aware object comparison. Used to compare two objects for an Avro field.
+ */
+ public static int compare(Object o1, Object o2, Schema schema) {
+ if (Schema.Type.MAP.equals(schema.getType())) {
+ return ((Map) o1).equals(o2) ? 0 : 1;
+ }
+ return GenericData.get().compare(o1, o2, schema);
+ }
+
+ private static Schema resolveUnion(Schema fieldSchema) {
Review comment:
There's already such a utility (in this file, actually):
`resolveNullableSchema`.
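What that utility does can be illustrated with plain strings standing in for Avro `Schema` objects (hypothetical sketch, not the Avro or Hudi API):

```java
import java.util.ArrayList;
import java.util.List;

// Hypothetical sketch of nullable-union resolution: strip the "null"
// branch from a union's branches and return the single remaining schema.
public class NullableUnionSketch {
  static String resolveNullable(List<String> unionBranches) {
    List<String> nonNull = new ArrayList<>();
    for (String branch : unionBranches) {
      if (!"null".equals(branch)) {
        nonNull.add(branch);
      }
    }
    if (nonNull.size() != 1) {
      throw new IllegalArgumentException("Not a nullable union: " + unionBranches);
    }
    return nonNull.get(0);
  }

  public static void main(String[] args) {
    System.out.println(resolveNullable(List.of("null", "string"))); // prints string
  }
}
```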
##########
File path:
hudi-common/src/main/java/org/apache/hudi/metadata/HoodieTableMetadataUtil.java
##########
@@ -1042,22 +1046,27 @@ public static void aggregateColumnStats(IndexedRecord record, Schema schema,
schema.getFields().forEach(field -> {
Map<String, Object> columnStats = columnToStats.getOrDefault(field.name(), new HashMap<>());
- final String fieldVal = getNestedFieldValAsString((GenericRecord) record, field.name(), true, consistentLogicalTimestampEnabled);
+ final Object fieldVal = getNestedFieldVal((GenericRecord) record, field.name(), true, consistentLogicalTimestampEnabled);
+ final Schema fieldSchema = getNestedFieldSchemaFromRecord((GenericRecord) record, field.name());
// update stats
- final int fieldSize = fieldVal == null ? 0 : fieldVal.length();
+ final int fieldSize = fieldVal == null ? 0 : StringUtils.objToString(fieldVal).length();
Review comment:
I actually don't think this metric is useful at all without fixing the
encoding we use for the payload. Is the encoding here Avro? If it's Avro, then
the question is... why Avro?
I'd suggest that before we try to implement some of these metrics, we start
with the use case we have in mind for them and then take it up from there.
--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
To unsubscribe, e-mail: [email protected]
For queries about this service, please contact Infrastructure at:
[email protected]