This is an automated email from the ASF dual-hosted git repository.

jackie pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/pinot.git


The following commit(s) were added to refs/heads/master by this push:
     new 4a6826eebcf fix bugs with skipped rows in TransformPipeline (#17769)
4a6826eebcf is described below

commit 4a6826eebcf30556b4006136db219f6bb04e4315
Author: Johan Adami <[email protected]>
AuthorDate: Thu Feb 26 15:48:00 2026 -0500

    fix bugs with skipped rows in TransformPipeline (#17769)
---
 .../local/segment/creator/TransformPipeline.java   |  4 +-
 .../segment/creator/TransformPipelineTest.java     | 79 ++++++++++++++++++++++
 2 files changed, 81 insertions(+), 2 deletions(-)

diff --git a/pinot-segment-local/src/main/java/org/apache/pinot/segment/local/segment/creator/TransformPipeline.java b/pinot-segment-local/src/main/java/org/apache/pinot/segment/local/segment/creator/TransformPipeline.java
index 6b8d62be1a9..9976cb9d98b 100644
--- a/pinot-segment-local/src/main/java/org/apache/pinot/segment/local/segment/creator/TransformPipeline.java
+++ b/pinot-segment-local/src/main/java/org/apache/pinot/segment/local/segment/creator/TransformPipeline.java
@@ -81,7 +81,7 @@ public class TransformPipeline {
 
   public Result processRow(GenericRow decodedRow) {
     if (Boolean.TRUE.equals(decodedRow.getValue(GenericRow.SKIP_RECORD_KEY))) {
-      return new Result(List.of(), 0, 0, 0);
+      return new Result(List.of(), 0, 1, 0);
     }
     //noinspection unchecked
     List<GenericRow> rows = (List<GenericRow>) decodedRow.getValue(GenericRow.MULTIPLE_RECORDS_KEY);
@@ -110,7 +110,7 @@ public class TransformPipeline {
         _numRowsSanitized++;
       }
     }
-    return new Result(rows, skippedRowCount, incompleteRowCount, sanitizedRowCount);
+    return new Result(rows, incompleteRowCount, skippedRowCount, sanitizedRowCount);
   }
 
   /// Reports stats after all rows are processed.
diff --git a/pinot-segment-local/src/test/java/org/apache/pinot/segment/local/segment/creator/TransformPipelineTest.java b/pinot-segment-local/src/test/java/org/apache/pinot/segment/local/segment/creator/TransformPipelineTest.java
index 847d94a6653..c32d03ca6c9 100644
--- a/pinot-segment-local/src/test/java/org/apache/pinot/segment/local/segment/creator/TransformPipelineTest.java
+++ b/pinot-segment-local/src/test/java/org/apache/pinot/segment/local/segment/creator/TransformPipelineTest.java
@@ -23,14 +23,20 @@ import java.util.HashMap;
 import java.util.List;
 import java.util.Map;
 import org.apache.pinot.spi.config.table.TableConfig;
+import org.apache.pinot.spi.config.table.TableType;
+import org.apache.pinot.spi.config.table.ingestion.IngestionConfig;
+import org.apache.pinot.spi.data.FieldSpec.DataType;
+import org.apache.pinot.spi.data.FieldSpec.MaxLengthExceedStrategy;
 import org.apache.pinot.spi.data.Schema;
 import org.apache.pinot.spi.data.readers.GenericRow;
 import org.apache.pinot.spi.utils.JsonUtils;
+import org.apache.pinot.spi.utils.builder.TableConfigBuilder;
 import org.testng.annotations.Test;
 
 import static org.testng.Assert.assertEquals;
 import static org.testng.Assert.assertNotNull;
 import static org.testng.Assert.assertNull;
+import static org.testng.Assert.assertTrue;
 
 
 public class TransformPipelineTest {
@@ -92,6 +98,22 @@ public class TransformPipelineTest {
     pipeline.processRow(multipleRow);
   }
 
+  @Test
+  public void testSkipRecordRow()
+      throws Exception {
+    TableConfig config = createTestTableConfig();
+    Schema schema = Fixtures.createSchema();
+    TransformPipeline pipeline = new TransformPipeline(config, schema);
+    GenericRow skipRow = new GenericRow();
+    skipRow.putValue(GenericRow.SKIP_RECORD_KEY, true);
+    TransformPipeline.Result result = pipeline.processRow(skipRow);
+    assertNotNull(result);
+    assertTrue(result.getTransformedRows().isEmpty());
+    assertEquals(result.getSkippedRowCount(), 1);
+    assertEquals(result.getIncompleteRowCount(), 0);
+    assertEquals(result.getSanitizedRowCount(), 0);
+  }
+
   @Test
   public void testUnnestFieldWithTransform()
       throws Exception {
@@ -786,4 +808,61 @@ public class TransformPipelineTest {
     assertEquals(transformedRow.getValue("r.name"), "LimeVista/Tapes");
     assertEquals(transformedRow.getValue("r.url"), "https://api.github.com/repos/LimeVista/Tapes");
   }
+
+  @Test
+  public void testSanitizedRowCount()
+      throws Exception {
+    Schema schema = new Schema.SchemaBuilder()
+        .setSchemaName("testTable")
+        .addSingleValueDimension("limitedStr", DataType.STRING, 2, "")
+        .addSingleValueDimension("intCol", DataType.INT)
+        .build();
+    schema.getFieldSpecFor("limitedStr").setMaxLengthExceedStrategy(MaxLengthExceedStrategy.TRIM_LENGTH);
+
+    TableConfig tableConfig = new TableConfigBuilder(TableType.OFFLINE)
+        .setTableName("testTable")
+        .build();
+
+    TransformPipeline pipeline = new TransformPipeline(tableConfig, schema);
+    GenericRow row = new GenericRow();
+    row.putValue("limitedStr", "abc"); // exceeds maxLength=2, triggers sanitization
+    row.putValue("intCol", 42);
+
+    TransformPipeline.Result result = pipeline.processRow(row);
+    assertNotNull(result);
+    assertEquals(result.getTransformedRows().size(), 1);
+    assertEquals(result.getSanitizedRowCount(), 1);
+    assertEquals(result.getIncompleteRowCount(), 0);
+    assertEquals(result.getSkippedRowCount(), 0);
+  }
+
+  @Test
+  public void testIncompleteRowCount()
+      throws Exception {
+    Schema schema = new Schema.SchemaBuilder()
+        .setSchemaName("testTable")
+        .addSingleValueDimension("intCol", DataType.INT)
+        .addSingleValueDimension("strCol", DataType.STRING)
+        .build();
+
+    IngestionConfig ingestionConfig = new IngestionConfig();
+    ingestionConfig.setContinueOnError(true);
+
+    TableConfig tableConfig = new TableConfigBuilder(TableType.OFFLINE)
+        .setTableName("testTable")
+        .setIngestionConfig(ingestionConfig)
+        .build();
+
+    TransformPipeline pipeline = new TransformPipeline(tableConfig, schema);
+    GenericRow row = new GenericRow();
+    row.putValue("intCol", "NOT_A_NUMBER"); // invalid type triggers incomplete
+    row.putValue("strCol", "hello");
+
+    TransformPipeline.Result result = pipeline.processRow(row);
+    assertNotNull(result);
+    assertEquals(result.getTransformedRows().size(), 1);
+    assertEquals(result.getIncompleteRowCount(), 1);
+    assertEquals(result.getSanitizedRowCount(), 0);
+    assertEquals(result.getSkippedRowCount(), 0);
+  }
 }


---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]

Reply via email to