FrankChen021 commented on code in PR #19534:
URL: https://github.com/apache/druid/pull/19534#discussion_r3334891669


##########
extensions-contrib/druid-iceberg-extensions/src/main/java/org/apache/druid/iceberg/input/IcebergNativeRecordReader.java:
##########
@@ -0,0 +1,417 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.druid.iceberg.input;
+
+import org.apache.druid.data.input.InputRow;
+import org.apache.druid.data.input.InputRowListPlusRawValues;
+import org.apache.druid.data.input.InputRowSchema;
+import org.apache.druid.data.input.InputSourceFactory;
+import org.apache.druid.data.input.InputSourceReader;
+import org.apache.druid.data.input.InputStats;
+import org.apache.druid.data.input.MapBasedInputRow;
+import org.apache.druid.java.util.common.IAE;
+import org.apache.druid.java.util.common.RE;
+import org.apache.druid.java.util.common.logger.Logger;
+import org.apache.druid.java.util.common.parsers.CloseableIterator;
+import org.apache.iceberg.Schema;
+import org.apache.iceberg.SchemaParser;
+import org.apache.iceberg.catalog.Catalog;
+import org.apache.iceberg.catalog.Namespace;
+import org.apache.iceberg.catalog.TableIdentifier;
+import org.apache.iceberg.data.Record;
+import org.apache.iceberg.data.parquet.GenericParquetReaders;
+import org.apache.iceberg.io.CloseableIterable;
+import org.apache.iceberg.io.DeleteSchemaUtil;
+import org.apache.iceberg.io.FileIO;
+import org.apache.iceberg.parquet.Parquet;
+import org.apache.iceberg.types.Types;
+
+import javax.annotation.Nullable;
+import java.io.IOException;
+import java.util.ArrayList;
+import java.util.HashMap;
+import java.util.HashSet;
+import java.util.Iterator;
+import java.util.List;
+import java.util.Map;
+import java.util.NoSuchElementException;
+import java.util.Set;
+import java.util.stream.Collectors;
+
+/**
+ * An {@link InputSourceReader} that reads one Iceberg data file and applies 
any associated
+ * position-delete and equality-delete files before converting records to 
Druid {@link InputRow}s.
+ *
+ * <p>The table schema is deserialized from a JSON string (captured on the 
coordinator at
+ * planning time) — no catalog interaction is needed for schema resolution on 
the worker.
+ *
+ * <p>File I/O is handled by {@link WarehouseFileIO} for local filesystem 
paths.  For non-local
+ * paths (S3, HDFS) a fallback is made to the catalog's own {@link FileIO} 
(Phase 1 limitation;
+ * see {@code WarehouseFileIO} Javadoc for details).
+ *
+ * <p>This reader is constructed by {@link IcebergFileTaskInputSource#reader} 
and handles the
+ * Iceberg V2 delete semantics in a straightforward way:
+ * <ol>
+ *   <li>Deserialize the table schema from the embedded JSON string.</li>
+ *   <li>Resolve the {@link FileIO} to use for this data file's path.</li>
+ *   <li>Load position delete files and build a set of row positions to 
skip.</li>
+ *   <li>Load equality delete files and build sets of field-value maps to 
skip.</li>
+ *   <li>Stream through the data file, filtering out any deleted rows.</li>
+ *   <li>Convert surviving {@link Record}s to {@link MapBasedInputRow} via
+ *       {@link IcebergRecordConverter}.</li>
+ * </ol>
+ *
+ * <p>Only Parquet format is supported for the initial implementation.
+ */
+public class IcebergNativeRecordReader implements InputSourceReader
+{
+  private static final Logger log = new 
Logger(IcebergNativeRecordReader.class);
+
+  private static final WarehouseFileIO WAREHOUSE_FILE_IO = new 
WarehouseFileIO();

Review Comment:
   [P1] Undefined FileIO class breaks compilation
   
   The new reader instantiates `WarehouseFileIO`, but no class with that name 
is defined in this package or elsewhere in the repo, and there is no import 
that could resolve it. Any build that compiles this extension will fail before 
the V2 ingestion path can run. Use an existing Iceberg `FileIO` implementation 
or add the missing implementation.



##########
extensions-contrib/druid-iceberg-extensions/src/main/java/org/apache/druid/iceberg/input/IcebergCatalog.java:
##########
@@ -121,8 +150,7 @@ public List<String> extractSnapshotDataFiles(
         }
       }
 
-      // Handle residual filter based on mode
-      if (detectedResidual != null) {
+      if (detectedResidual == null) {

Review Comment:
   [P2] Residual-filter check is inverted
   
   This condition now enters the residual-handling branch when 
`detectedResidual` is null, even though null means no residual was found. With 
`ResidualFilterMode.FAIL`, valid scans that have no residual, such as 
partition-pruned scans, will throw a residual-filter error, while scans that do 
have a residual skip the warning/failure path. Restore the check to run only 
when `detectedResidual != null`.



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: [email protected]

For queries about this service, please contact Infrastructure at:
[email protected]


---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]

Reply via email to