andygrove commented on code in PR #1593:
URL: https://github.com/apache/datafusion-comet/pull/1593#discussion_r2040094303
##########
common/src/main/java/org/apache/comet/parquet/NativeBatchReader.java:
##########
@@ -263,111 +272,129 @@ public void init() throws URISyntaxException, IOException {
                   sparkSchema.size(), requestedSchema.getColumns().size()));
         }
       }
-    } ////// End get requested schema
-
-    String timeZoneId = conf.get("spark.sql.session.timeZone");
-    // Native code uses "UTC" always as the timeZoneId when converting from spark to arrow schema.
-    Schema arrowSchema = Utils$.MODULE$.toArrowSchema(sparkSchema, "UTC");
-    byte[] serializedRequestedArrowSchema = serializeArrowSchema(arrowSchema);
-    Schema dataArrowSchema = Utils$.MODULE$.toArrowSchema(dataSchema, "UTC");
-    byte[] serializedDataArrowSchema = serializeArrowSchema(dataArrowSchema);
-    //// Create Column readers
-    List<ColumnDescriptor> columns = requestedSchema.getColumns();
-    List<Type> fields = requestedSchema.getFields();
-    int numColumns = fields.size();
-    if (partitionSchema != null) numColumns += partitionSchema.size();
-    columnReaders = new AbstractColumnReader[numColumns];
-
-    // Initialize missing columns and use null vectors for them
-    missingColumns = new boolean[columns.size()];
-    List<String[]> paths = requestedSchema.getPaths();
-    StructField[] nonPartitionFields = sparkSchema.fields();
-    // ShimFileFormat.findRowIndexColumnIndexInSchema(sparkSchema);
-    for (int i = 0; i < requestedSchema.getFieldCount(); i++) {
-      Type t = requestedSchema.getFields().get(i);
-      // Preconditions.checkState(
-      //     t.isPrimitive() && !t.isRepetition(Type.Repetition.REPEATED),
-      //     "Complex type is not supported");
-      String[] colPath = paths.get(i);
-      if (nonPartitionFields[i].name().equals(ShimFileFormat.ROW_INDEX_TEMPORARY_COLUMN_NAME())) {
-        // Values of ROW_INDEX_TEMPORARY_COLUMN_NAME column are always populated with
-        // generated row indexes, rather than read from the file.
-        // TODO(SPARK-40059): Allow users to include columns named
-        // FileFormat.ROW_INDEX_TEMPORARY_COLUMN_NAME in their schemas.
-        // TODO: (ARROW NATIVE) Support row indices ...
-        // long[] rowIndices = fileReader.getRowIndices();
-        // columnReaders[i] = new RowIndexColumnReader(nonPartitionFields[i], capacity,
-        // rowIndices);
-        missingColumns[i] = true;
-      } else if (fileSchema.containsPath(colPath)) {
-        ColumnDescriptor fd = fileSchema.getColumnDescription(colPath);
-        if (!fd.equals(columns.get(i))) {
-          throw new UnsupportedOperationException("Schema evolution is not supported");
-        }
-        missingColumns[i] = false;
-      } else {
-        if (columns.get(i).getMaxDefinitionLevel() == 0) {
-          throw new IOException(
-              "Required column '"
-                  + Arrays.toString(colPath)
-                  + "' is missing"
-                  + " in data file "
-                  + filePath);
+      String timeZoneId = conf.get("spark.sql.session.timeZone");
+      // Native code uses "UTC" always as the timeZoneId when converting from spark to arrow schema.
+      Schema arrowSchema = Utils$.MODULE$.toArrowSchema(sparkSchema, "UTC");
+      byte[] serializedRequestedArrowSchema = serializeArrowSchema(arrowSchema);
+      Schema dataArrowSchema = Utils$.MODULE$.toArrowSchema(dataSchema, "UTC");
+      byte[] serializedDataArrowSchema = serializeArrowSchema(dataArrowSchema);
+
+      // Create Column readers
+      List<Type> fields = requestedSchema.getFields();
+      List<Type> fileFields = fileSchema.getFields();
+      int numColumns = fields.size();
+      if (partitionSchema != null) numColumns += partitionSchema.size();
+      columnReaders = new AbstractColumnReader[numColumns];
+
+      // Initialize missing columns and use null vectors for them
+      missingColumns = new boolean[numColumns];
+      StructField[] nonPartitionFields = sparkSchema.fields();
+      boolean hasRowIndexColumn = false;
+      // Ranges of rows to read (needed iff row indexes are being read)
+      List<BlockMetaData> blocks =
+          FileReader.filterRowGroups(readOptions, footer.getBlocks(), fileReader);
+      totalRowCount = fileReader.getFilteredRecordCount();
+      if (totalRowCount == 0) {
+        // all the data us filtered out.

Review Comment:
```suggestion
        // all the data is filtered out.
```
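Side note on the hunk above: the body of `serializeArrowSchema` is not shown in this diff. A minimal sketch of what such a helper could look like, assuming Arrow's Java IPC `MessageSerializer` (an illustration only, not necessarily Comet's actual implementation):

```java
import java.io.ByteArrayOutputStream;
import java.io.IOException;
import java.nio.channels.Channels;

import org.apache.arrow.vector.ipc.WriteChannel;
import org.apache.arrow.vector.ipc.message.MessageSerializer;
import org.apache.arrow.vector.types.pojo.Schema;

final class ArrowSchemaIpcSketch {
  // Hypothetical stand-in for NativeBatchReader#serializeArrowSchema: encodes the
  // schema as a single Arrow IPC message, so the resulting bytes can be handed to
  // the native reader in one call and decoded there.
  static byte[] serializeArrowSchema(Schema schema) throws IOException {
    ByteArrayOutputStream out = new ByteArrayOutputStream();
    MessageSerializer.serialize(new WriteChannel(Channels.newChannel(out)), schema);
    return out.toByteArray();
  }
}
```

Serializing the schema once up front, as the diff does for both the requested and the data schema, presumably avoids rebuilding field metadata on every batch crossing the JNI boundary.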