This is an automated email from the ASF dual-hosted git repository.

jackie pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/pinot.git


The following commit(s) were added to refs/heads/master by this push:
     new aaf65b1aab8 Remove unnecessary string conversion in SanitizationTransformer (#17778)
aaf65b1aab8 is described below

commit aaf65b1aab8a2a642d9ff2c8b33e245b41ec0d06
Author: Xiaotian (Jackie) Jiang <[email protected]>
AuthorDate: Fri Feb 27 13:31:00 2026 -0800

    Remove unnecessary string conversion in SanitizationTransformer (#17778)
---
 .../recordtransformer/SanitizationTransformer.java | 49 +++++++----------
 .../recordtransformer/RecordTransformerTest.java   | 63 ++++++++++++----------
 2 files changed, 54 insertions(+), 58 deletions(-)

diff --git a/pinot-segment-local/src/main/java/org/apache/pinot/segment/local/recordtransformer/SanitizationTransformer.java b/pinot-segment-local/src/main/java/org/apache/pinot/segment/local/recordtransformer/SanitizationTransformer.java
index 640975a2e08..2c0abae691e 100644
--- a/pinot-segment-local/src/main/java/org/apache/pinot/segment/local/recordtransformer/SanitizationTransformer.java
+++ b/pinot-segment-local/src/main/java/org/apache/pinot/segment/local/recordtransformer/SanitizationTransformer.java
@@ -31,25 +31,23 @@ import org.apache.pinot.spi.recordtransformer.RecordTransformer;
 import org.apache.pinot.spi.utils.StringUtil;
 
 
-/**
- * The {@code SanitizationTransformer} class will sanitize the values to follow certain rules including:
- * <ul>
- *   <li>No {@code null} characters in string values</li>
- *   <li>String values are within the length limit</li>
- * </ul>
- * <p>NOTE: should put this after the {@link DataTypeTransformer} so that all values follow the data types in
- * {@link FieldSpec}.
- * This uses the MaxLengthExceedStrategy in the {@link FieldSpec} to decide what to do when the value exceeds the max.
- * For TRIM_LENGTH, the value is trimmed to the max length.
- * For SUBSTITUTE_DEFAULT_VALUE, the value is replaced with the default null value string.
- * For ERROR, an exception is thrown and the record is skipped.
- * For NO_ACTION, the value is kept as is if no NULL_CHARACTER present else trimmed till NULL.
- * In the first 2 scenarios, this metric REALTIME_ROWS_SANITIZED can be tracked to know if a trimmed /
- * default record was persisted.
- * In the third scenario, this metric ROWS_WITH_ERRORS can be tracked  to know if a record was skipped.
- * In the last scenario, this metric REALTIME_ROWS_SANITIZED can be tracked to know if a record was trimmed
- * due to having a null character.
- */
+/// The `SanitizationTransformer` class sanitizes values so that they follow certain rules, including:
+/// - No `null` characters in string values
+/// - String/bytes values are within the length limit
+///
+/// NOTE: This transformer should be placed after the [DataTypeTransformer] so that all values follow the data types
+/// in [FieldSpec].
+/// It uses the `MaxLengthExceedStrategy` in the [FieldSpec] to decide what to do when a value exceeds the max
+/// length:
+/// - TRIM_LENGTH: Trim the value to the max length
+/// - SUBSTITUTE_DEFAULT_VALUE: Replace the value with the default null value
+/// - ERROR: Throw an exception when the value doesn't conform to the rules
+/// - NO_ACTION: Keep the value as is if no `NULL_CHARACTER` is present, else truncate it at the first
+///   `NULL_CHARACTER`
+///
+/// In the first 2 scenarios, the metric `REALTIME_ROWS_SANITIZED` can be tracked to know if a trimmed / default
+/// record was persisted.
+/// In the third scenario, the metric `ROWS_WITH_ERRORS` can be tracked to know if a record was skipped.
+/// In the last scenario, the metric `REALTIME_ROWS_SANITIZED` can be tracked to know if a record was trimmed due to
+/// having a `NULL_CHARACTER`.
 public class SanitizationTransformer implements RecordTransformer {
   private static final String NULL_CHARACTER = "\0";
   private final Map<String, SanitizedColumnInfo> _columnToColumnInfoMap = new 
HashMap<>();
@@ -62,8 +60,7 @@ public class SanitizationTransformer implements 
RecordTransformer {
           MaxLengthExceedStrategy strategy = 
fieldSpec.getEffectiveMaxLengthExceedStrategy();
           if (dataType == DataType.STRING || strategy != 
MaxLengthExceedStrategy.NO_ACTION) {
             _columnToColumnInfoMap.put(fieldSpec.getName(),
-                new SanitizedColumnInfo(fieldSpec.getName(), 
fieldSpec.getEffectiveMaxLength(), strategy,
-                    fieldSpec.getDefaultNullValue()));
+                new SanitizedColumnInfo(fieldSpec.getEffectiveMaxLength(), 
strategy, fieldSpec.getDefaultNullValue()));
           }
         }
       }
@@ -132,7 +129,7 @@ public class SanitizationTransformer implements 
RecordTransformer {
         case TRIM_LENGTH:
           return Pair.of(sanitizedValue, true);
         case SUBSTITUTE_DEFAULT_VALUE:
-          return 
Pair.of(FieldSpec.getStringValue(sanitizedColumnInfo.getDefaultNullValue()), 
true);
+          return Pair.of((String) sanitizedColumnInfo.getDefaultNullValue(), 
true);
         case ERROR:
           index = value.indexOf(NULL_CHARACTER);
           if (index < 0) {
@@ -190,23 +187,17 @@ public class SanitizationTransformer implements 
RecordTransformer {
   }
 
   private static class SanitizedColumnInfo {
-    private final String _columnName;
     private final int _maxLength;
     private final MaxLengthExceedStrategy _maxLengthExceedStrategy;
     private final Object _defaultNullValue;
 
-    private SanitizedColumnInfo(String columnName, int maxLength, 
MaxLengthExceedStrategy maxLengthExceedStrategy,
+    private SanitizedColumnInfo(int maxLength, MaxLengthExceedStrategy 
maxLengthExceedStrategy,
         Object defaultNullValue) {
-      _columnName = columnName;
       _maxLength = maxLength;
       _maxLengthExceedStrategy = maxLengthExceedStrategy;
       _defaultNullValue = defaultNullValue;
     }
 
-    public String getColumnName() {
-      return _columnName;
-    }
-
     public int getMaxLength() {
       return _maxLength;
     }
diff --git a/pinot-segment-local/src/test/java/org/apache/pinot/segment/local/recordtransformer/RecordTransformerTest.java b/pinot-segment-local/src/test/java/org/apache/pinot/segment/local/recordtransformer/RecordTransformerTest.java
index a790883bce0..0cc0def4856 100644
--- a/pinot-segment-local/src/test/java/org/apache/pinot/segment/local/recordtransformer/RecordTransformerTest.java
+++ b/pinot-segment-local/src/test/java/org/apache/pinot/segment/local/recordtransformer/RecordTransformerTest.java
@@ -42,7 +42,6 @@ import org.apache.pinot.spi.data.Schema;
 import org.apache.pinot.spi.data.readers.GenericRow;
 import org.apache.pinot.spi.env.PinotConfiguration;
 import org.apache.pinot.spi.recordtransformer.RecordTransformer;
-import org.apache.pinot.spi.utils.BytesUtils;
 import org.apache.pinot.spi.utils.CommonConstants;
 import org.apache.pinot.spi.utils.builder.TableConfigBuilder;
 import org.testng.annotations.Test;
@@ -129,6 +128,13 @@ public class RecordTransformerTest {
     return record;
   }
 
+  private static GenericRow getTypeConformingRecord() {
+    DataTypeTransformer dataTypeTransformer = new 
DataTypeTransformer(TABLE_CONFIG, SCHEMA);
+    GenericRow record = getRecord();
+    dataTypeTransformer.transform(record);
+    return record;
+  }
+
   @Test
   public void testFilterTransformer() {
     IngestionConfig ingestionConfig = new IngestionConfig();
@@ -255,7 +261,7 @@ public class RecordTransformerTest {
     ingestionConfig.setRowTimeValueCheck(true);
     tableConfig.setIngestionConfig(ingestionConfig);
     RecordTransformer transformerWithValidation = new 
TimeValidationTransformer(tableConfig, schema);
-    GenericRow record1 = getRecord();
+    GenericRow record1 = getTypeConformingRecord();
     record1.putValue(timeCol, 1L);
     for (int i = 0; i < NUM_ROUNDS; i++) {
       assertThrows(() -> transformerWithValidation.transform(record1));
@@ -264,7 +270,7 @@ public class RecordTransformerTest {
     // Invalid timestamp, validation enabled and ignoreErrors enabled
     ingestionConfig.setContinueOnError(true);
     transformer = new TimeValidationTransformer(tableConfig, schema);
-    GenericRow record2 = getRecord();
+    GenericRow record2 = getTypeConformingRecord();
     record2.putValue(timeCol, 1L);
     for (int i = 0; i < NUM_ROUNDS; i++) {
       transformer.transform(record2);
@@ -274,7 +280,7 @@ public class RecordTransformerTest {
     // Valid timestamp, validation enabled
     ingestionConfig.setContinueOnError(false);
     transformer = new TimeValidationTransformer(tableConfig, schema);
-    GenericRow record3 = getRecord();
+    GenericRow record3 = getTypeConformingRecord();
     Long currentTimeMillis = System.currentTimeMillis();
     record3.putValue(timeCol, currentTimeMillis);
     for (int i = 0; i < NUM_ROUNDS; i++) {
@@ -288,7 +294,7 @@ public class RecordTransformerTest {
     // scenario where string contains null and exceeds max length
     // and fieldSpec maxLengthExceedStrategy is default (TRIM_LENGTH)
     RecordTransformer transformer = new SanitizationTransformer(SCHEMA);
-    GenericRow record = getRecord();
+    GenericRow record = getTypeConformingRecord();
     for (int i = 0; i < NUM_ROUNDS; i++) {
       transformer.transform(record);
       assertEquals(record.getValue("svStringWithNullCharacters"), "1");
@@ -309,7 +315,7 @@ public class RecordTransformerTest {
     FieldSpec svStringWithNullCharacters = 
schema.getFieldSpecFor("svStringWithNullCharacters");
     
svStringWithNullCharacters.setMaxLengthExceedStrategy(MaxLengthExceedStrategy.ERROR);
     transformer = new SanitizationTransformer(schema);
-    record = getRecord();
+    record = getTypeConformingRecord();
     for (int i = 0; i < NUM_ROUNDS; i++) {
       try {
         transformer.transform(record);
@@ -323,7 +329,7 @@ public class RecordTransformerTest {
     // scenario where string contains null and fieldSpec 
maxLengthExceedStrategy is to SUBSTITUTE_DEFAULT_VALUE
     
svStringWithNullCharacters.setMaxLengthExceedStrategy(MaxLengthExceedStrategy.SUBSTITUTE_DEFAULT_VALUE);
     transformer = new SanitizationTransformer(schema);
-    record = getRecord();
+    record = getTypeConformingRecord();
     for (int i = 0; i < NUM_ROUNDS; i++) {
       transformer.transform(record);
       assertEquals(record.getValue("svStringWithNullCharacters"), "null");
@@ -333,7 +339,7 @@ public class RecordTransformerTest {
     // scenario where string contains null and fieldSpec 
maxLengthExceedStrategy is to NO_ACTION
     
svStringWithNullCharacters.setMaxLengthExceedStrategy(MaxLengthExceedStrategy.NO_ACTION);
     transformer = new SanitizationTransformer(schema);
-    record = getRecord();
+    record = getTypeConformingRecord();
     for (int i = 0; i < NUM_ROUNDS; i++) {
       transformer.transform(record);
       assertEquals(record.getValue("svStringWithNullCharacters"), "1");
@@ -347,7 +353,7 @@ public class RecordTransformerTest {
     // scenario where string exceeds max length and fieldSpec 
maxLengthExceedStrategy is to ERROR
     
svStringWithLengthLimit.setMaxLengthExceedStrategy(MaxLengthExceedStrategy.ERROR);
     transformer = new SanitizationTransformer(schema);
-    record = getRecord();
+    record = getTypeConformingRecord();
     for (int i = 0; i < NUM_ROUNDS; i++) {
       try {
         transformer.transform(record);
@@ -361,7 +367,7 @@ public class RecordTransformerTest {
     // scenario where string exceeds max length and fieldSpec 
maxLengthExceedStrategy is to SUBSTITUTE_DEFAULT_VALUE
     
svStringWithLengthLimit.setMaxLengthExceedStrategy(MaxLengthExceedStrategy.SUBSTITUTE_DEFAULT_VALUE);
     transformer = new SanitizationTransformer(schema);
-    record = getRecord();
+    record = getTypeConformingRecord();
     for (int i = 0; i < NUM_ROUNDS; i++) {
       transformer.transform(record);
       assertEquals(record.getValue("svStringWithLengthLimit"), "null");
@@ -371,7 +377,7 @@ public class RecordTransformerTest {
     // scenario where string exceeds max length and fieldSpec 
maxLengthExceedStrategy is to NO_ACTION
     
svStringWithLengthLimit.setMaxLengthExceedStrategy(MaxLengthExceedStrategy.NO_ACTION);
     transformer = new SanitizationTransformer(schema);
-    record = getRecord();
+    record = getTypeConformingRecord();
     for (int i = 0; i < NUM_ROUNDS; i++) {
       transformer.transform(record);
       assertEquals(record.getValue("svStringWithLengthLimit"), "123");
@@ -386,27 +392,27 @@ public class RecordTransformerTest {
     svJson.setMaxLength(10);
     svJson.setMaxLengthExceedStrategy(MaxLengthExceedStrategy.NO_ACTION);
     transformer = new SanitizationTransformer(schema);
-    record = getRecord();
+    record = getTypeConformingRecord();
     for (int i = 0; i < NUM_ROUNDS; i++) {
       transformer.transform(record);
-      assertEquals(record.getValue("svJson"), "{\"first\": \"daffy\", 
\"last\": \"duck\"}");
+      assertEquals(record.getValue("svJson"), 
"{\"first\":\"daffy\",\"last\":\"duck\"}");
       assertFalse(record.isSanitized());
     }
 
     // scenario where json field exceeds max length and fieldSpec 
maxLengthExceedStrategy is to TRIM_LENGTH
     svJson.setMaxLengthExceedStrategy(MaxLengthExceedStrategy.TRIM_LENGTH);
     transformer = new SanitizationTransformer(schema);
-    record = getRecord();
+    record = getTypeConformingRecord();
     for (int i = 0; i < NUM_ROUNDS; i++) {
       transformer.transform(record);
-      assertEquals(record.getValue("svJson"), "{\"first\": ");
+      assertEquals(record.getValue("svJson"), "{\"first\":\"");
       assertTrue(record.isSanitized());
     }
 
     // scenario where json field exceeds max length and fieldSpec 
maxLengthExceedStrategy is to SUBSTITUTE_DEFAULT_VALUE
     
svJson.setMaxLengthExceedStrategy(MaxLengthExceedStrategy.SUBSTITUTE_DEFAULT_VALUE);
     transformer = new SanitizationTransformer(schema);
-    record = getRecord();
+    record = getTypeConformingRecord();
     for (int i = 0; i < NUM_ROUNDS; i++) {
       transformer.transform(record);
       assertEquals(record.getValue("svJson"), "null");
@@ -416,14 +422,14 @@ public class RecordTransformerTest {
     // scenario where json field exceeds max length and fieldSpec 
maxLengthExceedStrategy is to ERROR
     svJson.setMaxLengthExceedStrategy(MaxLengthExceedStrategy.ERROR);
     transformer = new SanitizationTransformer(schema);
-    record = getRecord();
+    record = getTypeConformingRecord();
     for (int i = 0; i < NUM_ROUNDS; i++) {
       try {
         transformer.transform(record);
         fail();
       } catch (IllegalStateException e) {
         assertEquals(e.getMessage(),
-            "Throwing exception as value: {\"first\": \"daffy\", \"last\": 
\"duck\"} for column svJson exceeds "
+            "Throwing exception as value: 
{\"first\":\"daffy\",\"last\":\"duck\"} for column svJson exceeds "
                 + "configured max length 10.");
       }
     }
@@ -433,23 +439,23 @@ public class RecordTransformerTest {
 
     // scenario where bytes field exceeds max length and fieldSpec 
maxLengthExceedStrategy is to NO_ACTION
     FieldSpec svBytes = schema.getFieldSpecFor("svBytes");
-    svBytes.setMaxLength(2);
+    svBytes.setMaxLength(1);
     svBytes.setMaxLengthExceedStrategy(MaxLengthExceedStrategy.NO_ACTION);
     transformer = new SanitizationTransformer(schema);
-    record = getRecord();
+    record = getTypeConformingRecord();
     for (int i = 0; i < NUM_ROUNDS; i++) {
       transformer.transform(record);
-      assertEquals(record.getValue("svBytes"), "7b7b");
+      assertEquals(record.getValue("svBytes"), new byte[]{123, 123});
       assertFalse(record.isSanitized());
     }
 
     // scenario where bytes field exceeds max length and fieldSpec 
maxLengthExceedStrategy is to TRIM_LENGTH
     svBytes.setMaxLengthExceedStrategy(MaxLengthExceedStrategy.TRIM_LENGTH);
     transformer = new SanitizationTransformer(schema);
-    record = getRecord();
+    record = getTypeConformingRecord();
     for (int i = 0; i < NUM_ROUNDS; i++) {
       transformer.transform(record);
-      assertEquals(record.getValue("svBytes"), "7b");
+      assertEquals(record.getValue("svBytes"), new byte[]{123});
       assertTrue(record.isSanitized());
     }
 
@@ -457,24 +463,23 @@ public class RecordTransformerTest {
     // SUBSTITUTE_DEFAULT_VALUE
     
svBytes.setMaxLengthExceedStrategy(MaxLengthExceedStrategy.SUBSTITUTE_DEFAULT_VALUE);
     transformer = new SanitizationTransformer(schema);
-    record = getRecord();
+    record = getTypeConformingRecord();
     for (int i = 0; i < NUM_ROUNDS; i++) {
       transformer.transform(record);
-      assertEquals(record.getValue("svBytes"), BytesUtils.toHexString(new 
byte[0]));
+      assertEquals(record.getValue("svBytes"), new byte[0]);
       assertTrue(record.isSanitized());
     }
 
     // scenario where bytes field exceeds max length and fieldSpec 
maxLengthExceedStrategy is to ERROR
     svBytes.setMaxLengthExceedStrategy(MaxLengthExceedStrategy.ERROR);
     transformer = new SanitizationTransformer(schema);
-    record = getRecord();
+    record = getTypeConformingRecord();
     for (int i = 0; i < NUM_ROUNDS; i++) {
       try {
         transformer.transform(record);
         fail();
       } catch (IllegalStateException e) {
-        assertEquals(e.getMessage(),
-            "Throwing exception as value: 7b7b for column svBytes exceeds 
configured max length 2.");
+        assertEquals(e.getMessage(), "Throwing exception as value for column 
svBytes exceeds configured max length 1.");
       }
     }
   }
@@ -482,7 +487,7 @@ public class RecordTransformerTest {
   @Test
   public void testSpecialValueTransformer() {
     RecordTransformer transformer = new SpecialValueTransformer(SCHEMA);
-    GenericRow record = getRecord();
+    GenericRow record = getTypeConformingRecord();
     for (int i = 0; i < NUM_ROUNDS; i++) {
       transformer.transform(record);
       assertEquals(Float.floatToRawIntBits((float) 
record.getValue("svFloatNegativeZero")),


---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]

Reply via email to