andygrove commented on code in PR #1593:
URL: https://github.com/apache/datafusion-comet/pull/1593#discussion_r2040094303


##########
common/src/main/java/org/apache/comet/parquet/NativeBatchReader.java:
##########
@@ -263,111 +272,129 @@ public void init() throws URISyntaxException, IOException {
                   sparkSchema.size(), requestedSchema.getColumns().size()));
         }
       }
-    } ////// End get requested schema
-
-    String timeZoneId = conf.get("spark.sql.session.timeZone");
-    // Native code uses "UTC" always as the timeZoneId when converting from spark to arrow schema.
-    Schema arrowSchema = Utils$.MODULE$.toArrowSchema(sparkSchema, "UTC");
-    byte[] serializedRequestedArrowSchema = serializeArrowSchema(arrowSchema);
-    Schema dataArrowSchema = Utils$.MODULE$.toArrowSchema(dataSchema, "UTC");
-    byte[] serializedDataArrowSchema = serializeArrowSchema(dataArrowSchema);
 
-    //// Create Column readers
-    List<ColumnDescriptor> columns = requestedSchema.getColumns();
-    List<Type> fields = requestedSchema.getFields();
-    int numColumns = fields.size();
-    if (partitionSchema != null) numColumns += partitionSchema.size();
-    columnReaders = new AbstractColumnReader[numColumns];
-
-    // Initialize missing columns and use null vectors for them
-    missingColumns = new boolean[columns.size()];
-    List<String[]> paths = requestedSchema.getPaths();
-    StructField[] nonPartitionFields = sparkSchema.fields();
-    //    ShimFileFormat.findRowIndexColumnIndexInSchema(sparkSchema);
-    for (int i = 0; i < requestedSchema.getFieldCount(); i++) {
-      Type t = requestedSchema.getFields().get(i);
-      //      Preconditions.checkState(
-      //          t.isPrimitive() && !t.isRepetition(Type.Repetition.REPEATED),
-      //          "Complex type is not supported");
-      String[] colPath = paths.get(i);
-      if (nonPartitionFields[i].name().equals(ShimFileFormat.ROW_INDEX_TEMPORARY_COLUMN_NAME())) {
-        // Values of ROW_INDEX_TEMPORARY_COLUMN_NAME column are always populated with
-        // generated row indexes, rather than read from the file.
-        // TODO(SPARK-40059): Allow users to include columns named
-        //                    FileFormat.ROW_INDEX_TEMPORARY_COLUMN_NAME in their schemas.
-        // TODO: (ARROW NATIVE) Support row indices ...
-        //        long[] rowIndices = fileReader.getRowIndices();
-        //        columnReaders[i] = new RowIndexColumnReader(nonPartitionFields[i], capacity,
-        // rowIndices);
-        missingColumns[i] = true;
-      } else if (fileSchema.containsPath(colPath)) {
-        ColumnDescriptor fd = fileSchema.getColumnDescription(colPath);
-        if (!fd.equals(columns.get(i))) {
-          throw new UnsupportedOperationException("Schema evolution is not supported");
-        }
-        missingColumns[i] = false;
-      } else {
-        if (columns.get(i).getMaxDefinitionLevel() == 0) {
-          throw new IOException(
-              "Required column '"
-                  + Arrays.toString(colPath)
-                  + "' is missing"
-                  + " in data file "
-                  + filePath);
+      String timeZoneId = conf.get("spark.sql.session.timeZone");
+      // Native code uses "UTC" always as the timeZoneId when converting from spark to arrow schema.
+      Schema arrowSchema = Utils$.MODULE$.toArrowSchema(sparkSchema, "UTC");
+      byte[] serializedRequestedArrowSchema = serializeArrowSchema(arrowSchema);
+      Schema dataArrowSchema = Utils$.MODULE$.toArrowSchema(dataSchema, "UTC");
+      byte[] serializedDataArrowSchema = serializeArrowSchema(dataArrowSchema);
+
+      // Create Column readers
+      List<Type> fields = requestedSchema.getFields();
+      List<Type> fileFields = fileSchema.getFields();
+      int numColumns = fields.size();
+      if (partitionSchema != null) numColumns += partitionSchema.size();
+      columnReaders = new AbstractColumnReader[numColumns];
+
+      // Initialize missing columns and use null vectors for them
+      missingColumns = new boolean[numColumns];
+      StructField[] nonPartitionFields = sparkSchema.fields();
+      boolean hasRowIndexColumn = false;
+      // Ranges of rows to read (needed iff row indexes are being read)
+      List<BlockMetaData> blocks =
+          FileReader.filterRowGroups(readOptions, footer.getBlocks(), fileReader);
+      totalRowCount = fileReader.getFilteredRecordCount();
+      if (totalRowCount == 0) {
+        // all the data us filtered out.

Review Comment:
   ```suggestion
           // all the data is filtered out.
   ```
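
For context on the surrounding hunk: the added lines convert the requested and data Spark schemas to Arrow schemas and serialize them to bytes before handing them to native code. Below is a minimal sketch of serializing an Arrow `Schema` to IPC message bytes with Arrow Java's `MessageSerializer`; the class name `ArrowSchemaSerde` and the example fields are placeholders, and Comet's actual `serializeArrowSchema` helper may be implemented differently.

```java
// Minimal sketch (assumption, not Comet's code) of turning an Arrow Schema into bytes
// that can be passed across the JNI boundary and decoded on the native side.
import java.io.ByteArrayOutputStream;
import java.io.IOException;
import java.nio.channels.Channels;
import java.util.Arrays;

import org.apache.arrow.vector.ipc.WriteChannel;
import org.apache.arrow.vector.ipc.message.MessageSerializer;
import org.apache.arrow.vector.types.pojo.ArrowType;
import org.apache.arrow.vector.types.pojo.Field;
import org.apache.arrow.vector.types.pojo.Schema;

public final class ArrowSchemaSerde {

  static byte[] serializeArrowSchema(Schema schema) throws IOException {
    ByteArrayOutputStream out = new ByteArrayOutputStream();
    // MessageSerializer writes the schema as a length-prefixed Arrow IPC message.
    MessageSerializer.serialize(new WriteChannel(Channels.newChannel(out)), schema);
    return out.toByteArray();
  }

  public static void main(String[] args) throws IOException {
    Schema schema =
        new Schema(
            Arrays.asList(
                Field.nullable("id", new ArrowType.Int(32, true)),
                Field.nullable("name", ArrowType.Utf8.INSTANCE)));
    byte[] bytes = serializeArrowSchema(schema);
    System.out.println("serialized schema: " + bytes.length + " bytes");
  }
}
```

Shipping the schema as a serialized IPC message keeps the JNI interface simple: the native reader can reconstruct the schema from the bytes rather than sharing JVM objects.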



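The removed block above (and its replacement later in the file) also decides, per requested leaf column, whether the column exists in the Parquet file or must be backed by a null vector. Here is a minimal sketch of that check, assuming a flat (leaf-only) schema and using only parquet-mr types that appear in the diff; the class and method names are illustrative, not Comet's actual code.

```java
// Illustrative sketch (not Comet's implementation) of flagging requested columns that are
// absent from the Parquet file, so the reader can back them with null vectors.
import java.io.IOException;
import java.util.Arrays;
import java.util.List;

import org.apache.parquet.column.ColumnDescriptor;
import org.apache.parquet.schema.MessageType;

public final class MissingColumnCheck {

  // Assumes a flat schema, where requested leaf columns and paths line up index-for-index.
  static boolean[] findMissingColumns(
      MessageType requestedSchema, MessageType fileSchema, String filePath) throws IOException {
    List<ColumnDescriptor> requested = requestedSchema.getColumns();
    List<String[]> paths = requestedSchema.getPaths();
    boolean[] missing = new boolean[requested.size()];

    for (int i = 0; i < requested.size(); i++) {
      String[] colPath = paths.get(i);
      if (fileSchema.containsPath(colPath)) {
        ColumnDescriptor fd = fileSchema.getColumnDescription(colPath);
        if (!fd.equals(requested.get(i))) {
          // Same path but a different column description: schema evolution, which is unsupported.
          throw new UnsupportedOperationException("Schema evolution is not supported");
        }
        missing[i] = false;
      } else if (requested.get(i).getMaxDefinitionLevel() == 0) {
        // A required (non-nullable) column cannot be substituted with nulls.
        throw new IOException(
            "Required column '" + Arrays.toString(colPath) + "' is missing in data file " + filePath);
      } else {
        missing[i] = true;
      }
    }
    return missing;
  }
}
```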
