This is an automated email from the ASF dual-hosted git repository.
jackie pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/pinot.git
The following commit(s) were added to refs/heads/master by this push:
new 4a6826eebcf fix bugs with skipped rows in TransformPipeline (#17769)
4a6826eebcf is described below
commit 4a6826eebcf30556b4006136db219f6bb04e4315
Author: Johan Adami <[email protected]>
AuthorDate: Thu Feb 26 15:48:00 2026 -0500
fix bugs with skipped rows in TransformPipeline (#17769)
---
.../local/segment/creator/TransformPipeline.java | 4 +-
.../segment/creator/TransformPipelineTest.java | 79 ++++++++++++++++++++++
2 files changed, 81 insertions(+), 2 deletions(-)
diff --git a/pinot-segment-local/src/main/java/org/apache/pinot/segment/local/segment/creator/TransformPipeline.java b/pinot-segment-local/src/main/java/org/apache/pinot/segment/local/segment/creator/TransformPipeline.java
index 6b8d62be1a9..9976cb9d98b 100644
--- a/pinot-segment-local/src/main/java/org/apache/pinot/segment/local/segment/creator/TransformPipeline.java
+++ b/pinot-segment-local/src/main/java/org/apache/pinot/segment/local/segment/creator/TransformPipeline.java
@@ -81,7 +81,7 @@ public class TransformPipeline {
public Result processRow(GenericRow decodedRow) {
if (Boolean.TRUE.equals(decodedRow.getValue(GenericRow.SKIP_RECORD_KEY))) {
- return new Result(List.of(), 0, 0, 0);
+ return new Result(List.of(), 0, 1, 0);
}
//noinspection unchecked
List<GenericRow> rows = (List<GenericRow>)
decodedRow.getValue(GenericRow.MULTIPLE_RECORDS_KEY);
@@ -110,7 +110,7 @@ public class TransformPipeline {
_numRowsSanitized++;
}
}
- return new Result(rows, skippedRowCount, incompleteRowCount, sanitizedRowCount);
+ return new Result(rows, incompleteRowCount, skippedRowCount, sanitizedRowCount);
}
/// Reports stats after all rows are processed.
diff --git a/pinot-segment-local/src/test/java/org/apache/pinot/segment/local/segment/creator/TransformPipelineTest.java b/pinot-segment-local/src/test/java/org/apache/pinot/segment/local/segment/creator/TransformPipelineTest.java
index 847d94a6653..c32d03ca6c9 100644
--- a/pinot-segment-local/src/test/java/org/apache/pinot/segment/local/segment/creator/TransformPipelineTest.java
+++ b/pinot-segment-local/src/test/java/org/apache/pinot/segment/local/segment/creator/TransformPipelineTest.java
@@ -23,14 +23,20 @@ import java.util.HashMap;
import java.util.List;
import java.util.Map;
import org.apache.pinot.spi.config.table.TableConfig;
+import org.apache.pinot.spi.config.table.TableType;
+import org.apache.pinot.spi.config.table.ingestion.IngestionConfig;
+import org.apache.pinot.spi.data.FieldSpec.DataType;
+import org.apache.pinot.spi.data.FieldSpec.MaxLengthExceedStrategy;
import org.apache.pinot.spi.data.Schema;
import org.apache.pinot.spi.data.readers.GenericRow;
import org.apache.pinot.spi.utils.JsonUtils;
+import org.apache.pinot.spi.utils.builder.TableConfigBuilder;
import org.testng.annotations.Test;
import static org.testng.Assert.assertEquals;
import static org.testng.Assert.assertNotNull;
import static org.testng.Assert.assertNull;
+import static org.testng.Assert.assertTrue;
public class TransformPipelineTest {
@@ -92,6 +98,22 @@ public class TransformPipelineTest {
pipeline.processRow(multipleRow);
}
+ @Test
+ public void testSkipRecordRow()
+ throws Exception {
+ TableConfig config = createTestTableConfig();
+ Schema schema = Fixtures.createSchema();
+ TransformPipeline pipeline = new TransformPipeline(config, schema);
+ GenericRow skipRow = new GenericRow();
+ skipRow.putValue(GenericRow.SKIP_RECORD_KEY, true);
+ TransformPipeline.Result result = pipeline.processRow(skipRow);
+ assertNotNull(result);
+ assertTrue(result.getTransformedRows().isEmpty());
+ assertEquals(result.getSkippedRowCount(), 1);
+ assertEquals(result.getIncompleteRowCount(), 0);
+ assertEquals(result.getSanitizedRowCount(), 0);
+ }
+
@Test
public void testUnnestFieldWithTransform()
throws Exception {
@@ -786,4 +808,61 @@ public class TransformPipelineTest {
assertEquals(transformedRow.getValue("r.name"), "LimeVista/Tapes");
assertEquals(transformedRow.getValue("r.url"), "https://api.github.com/repos/LimeVista/Tapes");
}
+
+ @Test
+ public void testSanitizedRowCount()
+ throws Exception {
+ Schema schema = new Schema.SchemaBuilder()
+ .setSchemaName("testTable")
+ .addSingleValueDimension("limitedStr", DataType.STRING, 2, "")
+ .addSingleValueDimension("intCol", DataType.INT)
+ .build();
+ schema.getFieldSpecFor("limitedStr").setMaxLengthExceedStrategy(MaxLengthExceedStrategy.TRIM_LENGTH);
+
+ TableConfig tableConfig = new TableConfigBuilder(TableType.OFFLINE)
+ .setTableName("testTable")
+ .build();
+
+ TransformPipeline pipeline = new TransformPipeline(tableConfig, schema);
+ GenericRow row = new GenericRow();
+ row.putValue("limitedStr", "abc"); // exceeds maxLength=2, triggers sanitization
+ row.putValue("intCol", 42);
+
+ TransformPipeline.Result result = pipeline.processRow(row);
+ assertNotNull(result);
+ assertEquals(result.getTransformedRows().size(), 1);
+ assertEquals(result.getSanitizedRowCount(), 1);
+ assertEquals(result.getIncompleteRowCount(), 0);
+ assertEquals(result.getSkippedRowCount(), 0);
+ }
+
+ @Test
+ public void testIncompleteRowCount()
+ throws Exception {
+ Schema schema = new Schema.SchemaBuilder()
+ .setSchemaName("testTable")
+ .addSingleValueDimension("intCol", DataType.INT)
+ .addSingleValueDimension("strCol", DataType.STRING)
+ .build();
+
+ IngestionConfig ingestionConfig = new IngestionConfig();
+ ingestionConfig.setContinueOnError(true);
+
+ TableConfig tableConfig = new TableConfigBuilder(TableType.OFFLINE)
+ .setTableName("testTable")
+ .setIngestionConfig(ingestionConfig)
+ .build();
+
+ TransformPipeline pipeline = new TransformPipeline(tableConfig, schema);
+ GenericRow row = new GenericRow();
+ row.putValue("intCol", "NOT_A_NUMBER"); // invalid type triggers incomplete
+ row.putValue("strCol", "hello");
+
+ TransformPipeline.Result result = pipeline.processRow(row);
+ assertNotNull(result);
+ assertEquals(result.getTransformedRows().size(), 1);
+ assertEquals(result.getIncompleteRowCount(), 1);
+ assertEquals(result.getSanitizedRowCount(), 0);
+ assertEquals(result.getSkippedRowCount(), 0);
+ }
}
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]