This is an automated email from the ASF dual-hosted git repository.
xbli pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/pinot.git
The following commit(s) were added to refs/heads/master by this push:
new e0571294e2 Enable complexType handling in SegmentProcessorFramework (#12942)
e0571294e2 is described below
commit e0571294e27a0b7243b8bfa0257f38665208bf45
Author: swaminathanmanish <[email protected]>
AuthorDate: Thu Apr 18 10:25:53 2024 -0700
Enable complexType handling in SegmentProcessorFramework (#12942)
* Enable complexType handling in SegmentProcessorFramework
---
.../framework/SegmentProcessorFrameworkTest.java | 48 ++++++++++++++++++++++
.../recordtransformer/CompositeTransformer.java | 4 ++
2 files changed, 52 insertions(+)
diff --git
a/pinot-core/src/test/java/org/apache/pinot/core/segment/processing/framework/SegmentProcessorFrameworkTest.java
b/pinot-core/src/test/java/org/apache/pinot/core/segment/processing/framework/SegmentProcessorFrameworkTest.java
index c2c4c51789..00631e778b 100644
---
a/pinot-core/src/test/java/org/apache/pinot/core/segment/processing/framework/SegmentProcessorFrameworkTest.java
+++
b/pinot-core/src/test/java/org/apache/pinot/core/segment/processing/framework/SegmentProcessorFrameworkTest.java
@@ -25,7 +25,9 @@ import java.net.URL;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collections;
+import java.util.HashMap;
import java.util.List;
+import java.util.Map;
import java.util.stream.IntStream;
import org.apache.commons.io.FileUtils;
import org.apache.pinot.core.segment.processing.timehandler.TimeHandler;
@@ -43,6 +45,8 @@ import
org.apache.pinot.segment.spi.index.metadata.SegmentMetadataImpl;
import org.apache.pinot.segment.spi.index.reader.NullValueVectorReader;
import org.apache.pinot.spi.config.table.TableConfig;
import org.apache.pinot.spi.config.table.TableType;
+import org.apache.pinot.spi.config.table.ingestion.ComplexTypeConfig;
+import org.apache.pinot.spi.config.table.ingestion.IngestionConfig;
import org.apache.pinot.spi.data.FieldSpec.DataType;
import org.apache.pinot.spi.data.Schema;
import org.apache.pinot.spi.data.readers.FileFormat;
@@ -52,6 +56,7 @@ import org.apache.pinot.spi.data.readers.RecordReaderFactory;
import org.apache.pinot.spi.data.readers.RecordReaderFileConfig;
import org.apache.pinot.spi.utils.ReadMode;
import org.apache.pinot.spi.utils.builder.TableConfigBuilder;
+import org.testng.Assert;
import org.testng.annotations.AfterClass;
import org.testng.annotations.BeforeClass;
import org.testng.annotations.Test;
@@ -73,6 +78,8 @@ public class SegmentProcessorFrameworkTest {
private List<RecordReader> _singleSegment;
private List<RecordReader> _multipleSegments;
private List<RecordReader> _multiValueSegments;
+ private List<RecordReader> _recordReaderWithComplexType;
+
private TableConfig _tableConfig;
private TableConfig _tableConfigNullValueEnabled;
@@ -113,6 +120,8 @@ public class SegmentProcessorFrameworkTest {
_schema =
new
Schema.SchemaBuilder().setSchemaName("mySchema").addSingleValueDimension("campaign",
DataType.STRING, "")
+ .addSingleValueDimension("campaign.inner1", DataType.STRING)
+ .addSingleValueDimension("campaign.inner1.inner2", DataType.STRING)
// NOTE: Intentionally put 1000 as default value to test skipping
null values during rollup
.addMetric("clicks", DataType.INT, 1000)
.addDateTime("time", DataType.LONG, "1:MILLISECONDS:EPOCH",
"1:MILLISECONDS").build();
@@ -127,6 +136,7 @@ public class SegmentProcessorFrameworkTest {
_multipleSegments = createInputSegments(new File(TEMP_DIR,
"multiple_segments"), _rawData, 3, _schema);
_multiValueSegments =
createInputSegments(new File(TEMP_DIR, "multi_value_segment"),
_rawDataMultiValue, 1, _schemaMV);
+ _recordReaderWithComplexType = createRecordReaderWithComplexType();
}
private List<RecordReader> createInputSegments(File inputDir, List<Object[]>
rawData, int numSegments, Schema schema)
@@ -168,6 +178,22 @@ public class SegmentProcessorFrameworkTest {
return segmentRecordReaders;
}
+ private List<RecordReader> createRecordReaderWithComplexType() {
+ GenericRow genericRow = new GenericRow();
+ genericRow.putValue("a", 1L);
+ Map<String, Object> map1 = new HashMap<>();
+ genericRow.putValue("campaign", map1);
+ map1.put("inner", "innerv");
+ Map<String, Object> innerMap1 = new HashMap<>();
+ innerMap1.put("inner2", "inner2v");
+
+ map1.put("inner1", innerMap1);
+ Map<String, Object> map2 = new HashMap<>();
+ map2.put("c", 3);
+ genericRow.putValue("map2", map2);
+ return List.of(new GenericRowRecordReader(List.of(genericRow)));
+ }
+
private GenericRow getGenericRow(Object[] rawRow) {
GenericRow row = new GenericRow();
row.putValue("campaign", rawRow[0]);
@@ -222,6 +248,28 @@ public class SegmentProcessorFrameworkTest {
assertEquals(recordReaderFileConfig.isRecordReaderClosedFromRecordReaderFileConfig(),
true);
}
+ @Test
+ public void testSegmentGenerationWithComplexType() throws Exception {
+ File workingDir = new File(TEMP_DIR, "single_segment_complex_type_output");
+ FileUtils.forceMkdir(workingDir);
+ IngestionConfig ingestionConfig = new IngestionConfig();
+ ingestionConfig.setComplexTypeConfig(
+ new ComplexTypeConfig(null, ".", null, null));
+ _tableConfig.setIngestionConfig(ingestionConfig);
+ // Default configs
+ SegmentProcessorConfig config =
+ new
SegmentProcessorConfig.Builder().setTableConfig(_tableConfig).setSchema(_schema).build();
+ SegmentProcessorFramework framework =
+ new SegmentProcessorFramework(_recordReaderWithComplexType, config,
workingDir);
+ List<File> outputSegments = framework.process();
+ ImmutableSegment segment =
ImmutableSegmentLoader.load(outputSegments.get(0), ReadMode.mmap);
+ SegmentMetadata segmentMetadata = segment.getSegmentMetadata();
+ // Pick the column created from complex type
+ ColumnMetadata campaignMetadata =
segmentMetadata.getColumnMetadataFor("campaign.inner1.inner2");
+ // Verify we see a specific value parsed from the complexType
+ Assert.assertEquals(campaignMetadata.getMinValue().compareTo("inner2v"),
0);
+ }
+
@Test
public void testSingleSegment()
throws Exception {
diff --git
a/pinot-segment-local/src/main/java/org/apache/pinot/segment/local/recordtransformer/CompositeTransformer.java
b/pinot-segment-local/src/main/java/org/apache/pinot/segment/local/recordtransformer/CompositeTransformer.java
index a1bfcba52a..c32ef1dafa 100644
---
a/pinot-segment-local/src/main/java/org/apache/pinot/segment/local/recordtransformer/CompositeTransformer.java
+++
b/pinot-segment-local/src/main/java/org/apache/pinot/segment/local/recordtransformer/CompositeTransformer.java
@@ -104,6 +104,10 @@ public class CompositeTransformer implements
RecordTransformer {
public static CompositeTransformer
composeAllTransformers(List<RecordTransformer> customTransformers,
TableConfig tableConfig, Schema schema) {
List<RecordTransformer> allTransformers = new
ArrayList<>(customTransformers);
+ ComplexTypeTransformer complexTypeTransformer =
ComplexTypeTransformer.getComplexTypeTransformer(tableConfig);
+ if (complexTypeTransformer != null) {
+ allTransformers.add(complexTypeTransformer);
+ }
allTransformers.addAll(getDefaultTransformers(tableConfig, schema));
return new CompositeTransformer(allTransformers);
}
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]