This is an automated email from the ASF dual-hosted git repository.

xbli pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/pinot.git


The following commit(s) were added to refs/heads/master by this push:
     new e0571294e2 Enable complexType handling in SegmentProcessFramework (#12942)
e0571294e2 is described below

commit e0571294e27a0b7243b8bfa0257f38665208bf45
Author: swaminathanmanish <[email protected]>
AuthorDate: Thu Apr 18 10:25:53 2024 -0700

    Enable complexType handling in SegmentProcessFramework (#12942)
    
    * Enable complexType handling in SegmentProcessFramework
---
 .../framework/SegmentProcessorFrameworkTest.java   | 48 ++++++++++++++++++++++
 .../recordtransformer/CompositeTransformer.java    |  4 ++
 2 files changed, 52 insertions(+)

diff --git a/pinot-core/src/test/java/org/apache/pinot/core/segment/processing/framework/SegmentProcessorFrameworkTest.java b/pinot-core/src/test/java/org/apache/pinot/core/segment/processing/framework/SegmentProcessorFrameworkTest.java
index c2c4c51789..00631e778b 100644
--- a/pinot-core/src/test/java/org/apache/pinot/core/segment/processing/framework/SegmentProcessorFrameworkTest.java
+++ b/pinot-core/src/test/java/org/apache/pinot/core/segment/processing/framework/SegmentProcessorFrameworkTest.java
@@ -25,7 +25,9 @@ import java.net.URL;
 import java.util.ArrayList;
 import java.util.Arrays;
 import java.util.Collections;
+import java.util.HashMap;
 import java.util.List;
+import java.util.Map;
 import java.util.stream.IntStream;
 import org.apache.commons.io.FileUtils;
 import org.apache.pinot.core.segment.processing.timehandler.TimeHandler;
@@ -43,6 +45,8 @@ import 
org.apache.pinot.segment.spi.index.metadata.SegmentMetadataImpl;
 import org.apache.pinot.segment.spi.index.reader.NullValueVectorReader;
 import org.apache.pinot.spi.config.table.TableConfig;
 import org.apache.pinot.spi.config.table.TableType;
+import org.apache.pinot.spi.config.table.ingestion.ComplexTypeConfig;
+import org.apache.pinot.spi.config.table.ingestion.IngestionConfig;
 import org.apache.pinot.spi.data.FieldSpec.DataType;
 import org.apache.pinot.spi.data.Schema;
 import org.apache.pinot.spi.data.readers.FileFormat;
@@ -52,6 +56,7 @@ import org.apache.pinot.spi.data.readers.RecordReaderFactory;
 import org.apache.pinot.spi.data.readers.RecordReaderFileConfig;
 import org.apache.pinot.spi.utils.ReadMode;
 import org.apache.pinot.spi.utils.builder.TableConfigBuilder;
+import org.testng.Assert;
 import org.testng.annotations.AfterClass;
 import org.testng.annotations.BeforeClass;
 import org.testng.annotations.Test;
@@ -73,6 +78,8 @@ public class SegmentProcessorFrameworkTest {
   private List<RecordReader> _singleSegment;
   private List<RecordReader> _multipleSegments;
   private List<RecordReader> _multiValueSegments;
+  private List<RecordReader> _recordReaderWithComplexType;
+
 
   private TableConfig _tableConfig;
   private TableConfig _tableConfigNullValueEnabled;
@@ -113,6 +120,8 @@ public class SegmentProcessorFrameworkTest {
 
     _schema =
         new 
Schema.SchemaBuilder().setSchemaName("mySchema").addSingleValueDimension("campaign",
 DataType.STRING, "")
+            .addSingleValueDimension("campaign.inner1", DataType.STRING)
+            .addSingleValueDimension("campaign.inner1.inner2", DataType.STRING)
             // NOTE: Intentionally put 1000 as default value to test skipping 
null values during rollup
             .addMetric("clicks", DataType.INT, 1000)
             .addDateTime("time", DataType.LONG, "1:MILLISECONDS:EPOCH", 
"1:MILLISECONDS").build();
@@ -127,6 +136,7 @@ public class SegmentProcessorFrameworkTest {
     _multipleSegments = createInputSegments(new File(TEMP_DIR, 
"multiple_segments"), _rawData, 3, _schema);
     _multiValueSegments =
         createInputSegments(new File(TEMP_DIR, "multi_value_segment"), 
_rawDataMultiValue, 1, _schemaMV);
+    _recordReaderWithComplexType = createRecordReaderWithComplexType();
   }
 
   private List<RecordReader> createInputSegments(File inputDir, List<Object[]> 
rawData, int numSegments, Schema schema)
@@ -168,6 +178,22 @@ public class SegmentProcessorFrameworkTest {
     return segmentRecordReaders;
   }
 
+  private List<RecordReader> createRecordReaderWithComplexType() {
+    GenericRow genericRow = new GenericRow();
+    genericRow.putValue("a", 1L);
+    Map<String, Object> map1 = new HashMap<>();
+    genericRow.putValue("campaign", map1);
+    map1.put("inner", "innerv");
+    Map<String, Object> innerMap1 = new HashMap<>();
+    innerMap1.put("inner2", "inner2v");
+
+    map1.put("inner1", innerMap1);
+    Map<String, Object> map2 = new HashMap<>();
+    map2.put("c", 3);
+    genericRow.putValue("map2", map2);
+    return List.of(new GenericRowRecordReader(List.of(genericRow)));
+  }
+
   private GenericRow getGenericRow(Object[] rawRow) {
     GenericRow row = new GenericRow();
     row.putValue("campaign", rawRow[0]);
@@ -222,6 +248,28 @@ public class SegmentProcessorFrameworkTest {
     
assertEquals(recordReaderFileConfig.isRecordReaderClosedFromRecordReaderFileConfig(),
 true);
   }
 
+  @Test
+  public void testSegmentGenerationWithComplexType() throws Exception {
+    File workingDir = new File(TEMP_DIR, "single_segment_complex_type_output");
+    FileUtils.forceMkdir(workingDir);
+    IngestionConfig ingestionConfig = new IngestionConfig();
+    ingestionConfig.setComplexTypeConfig(
+        new ComplexTypeConfig(null, ".", null, null));
+    _tableConfig.setIngestionConfig(ingestionConfig);
+    // Default configs
+    SegmentProcessorConfig config =
+        new 
SegmentProcessorConfig.Builder().setTableConfig(_tableConfig).setSchema(_schema).build();
+    SegmentProcessorFramework framework =
+        new SegmentProcessorFramework(_recordReaderWithComplexType, config, 
workingDir);
+    List<File> outputSegments = framework.process();
+    ImmutableSegment segment = 
ImmutableSegmentLoader.load(outputSegments.get(0), ReadMode.mmap);
+    SegmentMetadata segmentMetadata = segment.getSegmentMetadata();
+    // Pick the column created from complex type
+    ColumnMetadata campaignMetadata = 
segmentMetadata.getColumnMetadataFor("campaign.inner1.inner2");
+    // Verify we see a specific value parsed from the complexType
+    Assert.assertEquals(campaignMetadata.getMinValue().compareTo("inner2v"), 
0);
+  }
+
   @Test
   public void testSingleSegment()
       throws Exception {
diff --git a/pinot-segment-local/src/main/java/org/apache/pinot/segment/local/recordtransformer/CompositeTransformer.java b/pinot-segment-local/src/main/java/org/apache/pinot/segment/local/recordtransformer/CompositeTransformer.java
index a1bfcba52a..c32ef1dafa 100644
--- a/pinot-segment-local/src/main/java/org/apache/pinot/segment/local/recordtransformer/CompositeTransformer.java
+++ b/pinot-segment-local/src/main/java/org/apache/pinot/segment/local/recordtransformer/CompositeTransformer.java
@@ -104,6 +104,10 @@ public class CompositeTransformer implements 
RecordTransformer {
   public static CompositeTransformer 
composeAllTransformers(List<RecordTransformer> customTransformers,
       TableConfig tableConfig, Schema schema) {
     List<RecordTransformer> allTransformers = new 
ArrayList<>(customTransformers);
+    ComplexTypeTransformer complexTypeTransformer = 
ComplexTypeTransformer.getComplexTypeTransformer(tableConfig);
+    if (complexTypeTransformer != null) {
+      allTransformers.add(complexTypeTransformer);
+    }
     allTransformers.addAll(getDefaultTransformers(tableConfig, schema));
     return new CompositeTransformer(allTransformers);
   }


---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]

Reply via email to