This is an automated email from the ASF dual-hosted git repository.
tingchen pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/pinot.git
The following commit(s) were added to refs/heads/master by this push:
new d91e5db76c Add SchemaConformingTransformer to transform records with
varying keys to fit a table's schema without dropping fields. (#11210)
d91e5db76c is described below
commit d91e5db76c53af9774db1c57c71520f4c196c059
Author: kirkrodrigues <[email protected]>
AuthorDate: Thu Aug 24 17:08:55 2023 -0400
Add SchemaConformingTransformer to transform records with varying keys to
fit a table's schema without dropping fields. (#11210)
* Add JsonLogTransformer to transform semi-structured log events to fit a
table's schema without dropping fields.
* Minor fix
* Add unit tests for JsonLogTransformer.
* Minor fixes.
* Refactor docstrings.
* Rename JsonLogTransformer to SchemaConformingTransformer
* JsonLogTransformer: Pass through GenericRow's special keys
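
For illustration, a minimal sketch (not part of this commit) of how a table could enable the new
transformer through the config API added here; the table name, extras field names, and suffix are
examples, mirroring the setup used in SchemaConformingTransformerTest:

    import org.apache.pinot.spi.config.table.TableConfig;
    import org.apache.pinot.spi.config.table.TableType;
    import org.apache.pinot.spi.config.table.ingestion.IngestionConfig;
    import org.apache.pinot.spi.config.table.ingestion.SchemaConformingTransformerConfig;
    import org.apache.pinot.spi.utils.builder.TableConfigBuilder;

    // Unrecognized fields go to "indexableExtras", fields suffixed with "_noIndex" go to
    // "unindexableExtras", and no field paths are dropped.
    SchemaConformingTransformerConfig transformerConfig =
        new SchemaConformingTransformerConfig("indexableExtras", "unindexableExtras", "_noIndex", null);
    IngestionConfig ingestionConfig = new IngestionConfig();
    ingestionConfig.setSchemaConformingTransformerConfig(transformerConfig);
    TableConfig tableConfig = new TableConfigBuilder(TableType.OFFLINE)
        .setTableName("myLogs")
        .setIngestionConfig(ingestionConfig)
        .build();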
---
.../apache/pinot/queries/TransformQueriesTest.java | 2 +-
.../recordtransformer/CompositeTransformer.java | 13 +-
.../SchemaConformingTransformer.java | 545 +++++++++++++++
.../pinot/segment/local/utils/IngestionUtils.java | 7 +
.../segment/local/utils/TableConfigUtils.java | 8 +
.../SchemaConformingTransformerTest.java | 731 +++++++++++++++++++++
.../config/table/ingestion/IngestionConfig.java | 15 +
.../SchemaConformingTransformerConfig.java | 78 +++
.../apache/pinot/spi/data/readers/GenericRow.java | 7 +
.../pinot/spi/stream/StreamDataDecoderImpl.java | 7 +
10 files changed, 1408 insertions(+), 5 deletions(-)
diff --git
a/pinot-core/src/test/java/org/apache/pinot/queries/TransformQueriesTest.java
b/pinot-core/src/test/java/org/apache/pinot/queries/TransformQueriesTest.java
index f3abbbc534..572080e44e 100644
---
a/pinot-core/src/test/java/org/apache/pinot/queries/TransformQueriesTest.java
+++
b/pinot-core/src/test/java/org/apache/pinot/queries/TransformQueriesTest.java
@@ -135,7 +135,7 @@ public class TransformQueriesTest extends BaseQueriesTest {
.setIngestionConfig(new IngestionConfig(null, null, null,
Arrays.asList(new TransformConfig(M1_V2, "Groovy({INT_COL1_V3 ==
null || "
+ "INT_COL1_V3 == Integer.MIN_VALUE ? INT_COL1 : INT_COL1_V3
}, INT_COL1, INT_COL1_V3)")),
- null, null))
+ null, null, null))
.build();
Schema schema =
new
Schema.SchemaBuilder().setSchemaName(TABLE_NAME).addSingleValueDimension(D1,
FieldSpec.DataType.STRING)
diff --git
a/pinot-segment-local/src/main/java/org/apache/pinot/segment/local/recordtransformer/CompositeTransformer.java
b/pinot-segment-local/src/main/java/org/apache/pinot/segment/local/recordtransformer/CompositeTransformer.java
index e584944bda..dbc6cdbe3c 100644
---
a/pinot-segment-local/src/main/java/org/apache/pinot/segment/local/recordtransformer/CompositeTransformer.java
+++
b/pinot-segment-local/src/main/java/org/apache/pinot/segment/local/recordtransformer/CompositeTransformer.java
@@ -48,7 +48,12 @@ public class CompositeTransformer implements
RecordTransformer {
* destination columns
* </li>
* <li>
- * {@link DataTypeTransformer} after {@link FilterTransformer} to
convert values to comply with the schema
+ * Optional {@link SchemaConformingTransformer} after {@link
FilterTransformer}, so that we can transform input
+ * records that have varying fields to a fixed schema without dropping
any fields
+ * </li>
+ * <li>
+ * {@link DataTypeTransformer} after {@link SchemaConformingTransformer}
to convert values to comply with the
+ * schema
* </li>
* <li>
* Optional {@link TimeValidationTransformer} after {@link
DataTypeTransformer} so that time value is converted to
@@ -67,9 +72,9 @@ public class CompositeTransformer implements
RecordTransformer {
public static CompositeTransformer getDefaultTransformer(TableConfig
tableConfig, Schema schema) {
return new CompositeTransformer(
Stream.of(new ExpressionTransformer(tableConfig, schema), new
FilterTransformer(tableConfig),
- new DataTypeTransformer(tableConfig, schema), new
TimeValidationTransformer(tableConfig, schema),
- new NullValueTransformer(tableConfig, schema), new
SanitizationTransformer(schema))
- .filter(t -> !t.isNoOp()).collect(Collectors.toList()));
+ new SchemaConformingTransformer(tableConfig, schema), new
DataTypeTransformer(tableConfig, schema),
+ new TimeValidationTransformer(tableConfig, schema), new
NullValueTransformer(tableConfig, schema),
+ new SanitizationTransformer(schema)).filter(t ->
!t.isNoOp()).collect(Collectors.toList()));
}
/**
diff --git
a/pinot-segment-local/src/main/java/org/apache/pinot/segment/local/recordtransformer/SchemaConformingTransformer.java
b/pinot-segment-local/src/main/java/org/apache/pinot/segment/local/recordtransformer/SchemaConformingTransformer.java
new file mode 100644
index 0000000000..d8c7da6297
--- /dev/null
+++
b/pinot-segment-local/src/main/java/org/apache/pinot/segment/local/recordtransformer/SchemaConformingTransformer.java
@@ -0,0 +1,545 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.pinot.segment.local.recordtransformer;
+
+import com.fasterxml.jackson.core.JsonProcessingException;
+import com.google.common.base.Preconditions;
+import java.util.ArrayList;
+import java.util.HashMap;
+import java.util.HashSet;
+import java.util.List;
+import java.util.Map;
+import java.util.Set;
+import javax.annotation.Nonnull;
+import javax.annotation.Nullable;
+import org.apache.pinot.spi.config.table.TableConfig;
+import
org.apache.pinot.spi.config.table.ingestion.SchemaConformingTransformerConfig;
+import org.apache.pinot.spi.data.FieldSpec;
+import org.apache.pinot.spi.data.FieldSpec.DataType;
+import org.apache.pinot.spi.data.Schema;
+import org.apache.pinot.spi.data.readers.GenericRow;
+import org.apache.pinot.spi.stream.StreamDataDecoderImpl;
+import org.apache.pinot.spi.utils.JsonUtils;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+
+/**
+ * This transformer transforms records with varying keys such that they can be
stored in a table with a fixed schema.
+ * Since these records have varying keys, it is impractical to store each
field in its own table column. At the same
+ * time, most (if not all) fields may be important to the user, so we should
not drop any field unnecessarily. Instead, this
+ * transformer takes record fields that don't exist in the schema and stores them in catchall "extras" fields.
+ * <p>
+ * For example, consider this record:
+ * <pre>
+ * {
+ * "timestamp": 1687786535928,
+ * "hostname": "host1",
+ * "HOSTNAME": "host1",
+ * "level": "INFO",
+ * "message": "Started processing job1",
+ * "tags": {
+ * "platform": "data",
+ * "service": "serializer",
+ * "params": {
+ * "queueLength": 5,
+ * "timeout": 299,
+ * "userData_noIndex": {
+ * "nth": 99
+ * }
+ * }
+ * }
+ * }
+ * </pre>
+ * And let's say the table's schema contains these fields:
+ * <ul>
+ * <li>timestamp</li>
+ * <li>hostname</li>
+ * <li>level</li>
+ * <li>message</li>
+ * <li>tags.platform</li>
+ * <li>tags.service</li>
+ * <li>indexableExtras</li>
+ * <li>unindexableExtras</li>
+ * </ul>
+ * <p>
+ * Without this transformer, the entire "tags" field would be dropped when
storing the record in the table. However,
+ * with this transformer, the record would be transformed into the following:
+ * <pre>
+ * {
+ * "timestamp": 1687786535928,
+ * "hostname": "host1",
+ * "level": "INFO",
+ * "message": "Started processing job1",
+ * "tags.platform": "data",
+ * "tags.service": "serializer",
+ * "indexableExtras": {
+ * "tags": {
+ * "params": {
+ * "queueLength": 5,
+ * "timeout": 299
+ * }
+ * }
+ * },
+ * "unindexableExtras": {
+ * "tags": {
+ * "userData_noIndex": {
+ * "nth": 99
+ * }
+ * }
+ * }
+ * }
+ * </pre>
+ * Notice that the transformer:
+ * <ul>
+ * <li>Flattens nested fields which exist in the schema, like
"tags.platform"</li>
+ * <li>Drops fields which are listed in the config option "fieldPathsToDrop", like "HOSTNAME" in this
+ * example</li>
+ * <li>Moves fields which don't exist in the schema and have the suffix
"_noIndex" into the "unindexableExtras" field
+ * (the field name is configurable)</li>
+ * <li>Moves any remaining fields which don't exist in the schema into the
"indexableExtras" field (the field name is
+ * configurable)</li>
+ * </ul>
+ * <p>
+ * The "unindexableExtras" field allows the transformer to separate fields
which don't need indexing (because they are
+ * only retrieved, not searched) from those that do. The transformer also has
other configuration options specified in
+ * {@link SchemaConformingTransformerConfig}.
+ */
+public class SchemaConformingTransformer implements RecordTransformer {
+ private static final Logger _logger =
LoggerFactory.getLogger(SchemaConformingTransformer.class);
+
+ private final boolean _continueOnError;
+ private final SchemaConformingTransformerConfig _transformerConfig;
+ private final DataType _indexableExtrasFieldType;
+ private final DataType _unindexableExtrasFieldType;
+
+ private Map<String, Object> _schemaTree;
+
+ /**
+ * Validates the schema against the given transformer's configuration.
+ */
+ public static void validateSchema(@Nonnull Schema schema,
+ @Nonnull SchemaConformingTransformerConfig transformerConfig) {
+ validateSchemaFieldNames(schema.getPhysicalColumnNames(),
transformerConfig);
+
+ String indexableExtrasFieldName =
transformerConfig.getIndexableExtrasField();
+ getAndValidateExtrasFieldType(schema, indexableExtrasFieldName);
+ String unindexableExtrasFieldName =
transformerConfig.getUnindexableExtrasField();
+ if (null != unindexableExtrasFieldName) {
+ getAndValidateExtrasFieldType(schema, unindexableExtrasFieldName);
+ }
+
+ validateSchemaAndCreateTree(schema);
+ }
+
+ /**
+ * Validates that none of the schema fields have names that conflict with
the transformer's configuration.
+ */
+ private static void validateSchemaFieldNames(Set<String> schemaFields,
+ SchemaConformingTransformerConfig transformerConfig) {
+ // Validate that none of the columns in the schema end with
unindexableFieldSuffix
+ String unindexableFieldSuffix =
transformerConfig.getUnindexableFieldSuffix();
+ if (null != unindexableFieldSuffix) {
+ for (String field : schemaFields) {
+ Preconditions.checkState(!field.endsWith(unindexableFieldSuffix),
"Field '%s' has no-index suffix '%s'", field,
+ unindexableFieldSuffix);
+ }
+ }
+
+ // Validate that none of the columns in the schema overlap with the
fields in fieldPathsToDrop
+ Set<String> fieldPathsToDrop = transformerConfig.getFieldPathsToDrop();
+ if (null != fieldPathsToDrop) {
+ Set<String> fieldIntersection = new HashSet<>(schemaFields);
+ fieldIntersection.retainAll(fieldPathsToDrop);
+ Preconditions.checkState(fieldIntersection.isEmpty(), "Fields in schema
overlap with fieldPathsToDrop");
+ }
+ }
+
+ /**
+ * @return The field type for the given extras field
+ */
+ private static DataType getAndValidateExtrasFieldType(Schema schema,
@Nonnull String extrasFieldName) {
+ FieldSpec fieldSpec = schema.getFieldSpecFor(extrasFieldName);
+ Preconditions.checkState(null != fieldSpec, "Field '%s' doesn't exist in
schema", extrasFieldName);
+ DataType fieldDataType = fieldSpec.getDataType();
+ Preconditions.checkState(DataType.JSON == fieldDataType || DataType.STRING
== fieldDataType,
+ "Field '%s' has unsupported type %s", fieldDataType.toString());
+ return fieldDataType;
+ }
+
+ /**
+ * Validates the schema and creates a tree representing the fields in the schema, to be used when transforming
+ * input records. For instance, the field "a.b" in the schema would be un-flattened into "{a: {b: null}}" in the
+ * tree, allowing us to more easily
process records containing the latter.
+ * @throws IllegalArgumentException if schema validation fails in one of two
ways:
+ * <ul>
+ * <li>One of the fields in the schema has a name which when interpreted
as a JSON path, corresponds to an object
+ * with an empty sub-key. E.g., the field name "a..b" corresponds to the
JSON {"a": {"": {"b": ...}}}</li>
+ * <li>Two fields in the schema have names which correspond to JSON paths
where one is a child of the other. E.g.,
+ * the field names "a.b" and "a.b.c" are considered invalid since "a.b.c"
is a child of "a.b".</li>
+ * </ul>
+ */
+ private static Map<String, Object> validateSchemaAndCreateTree(@Nonnull
Schema schema)
+ throws IllegalArgumentException {
+ Set<String> schemaFields = schema.getPhysicalColumnNames();
+
+ Map<String, Object> schemaTree = new HashMap<>();
+ List<String> subKeys = new ArrayList<>();
+ for (String field : schemaFields) {
+ int keySeparatorIdx = field.indexOf(JsonUtils.KEY_SEPARATOR);
+ if (-1 == keySeparatorIdx) {
+ // Not a flattened key
+ schemaTree.put(field, null);
+ continue;
+ }
+
+ subKeys.clear();
+ getAndValidateSubKeys(field, keySeparatorIdx, subKeys);
+
+ // Add all sub-keys except the leaf to the tree
+ Map<String, Object> currentNode = schemaTree;
+ for (int i = 0; i < subKeys.size() - 1; i++) {
+ String subKey = subKeys.get(i);
+
+ Map<String, Object> childNode;
+ if (currentNode.containsKey(subKey)) {
+ childNode = (Map<String, Object>) currentNode.get(subKey);
+ if (null == childNode) {
+ throw new IllegalArgumentException(
+ "Cannot handle field '" + String.join(JsonUtils.KEY_SEPARATOR,
subKeys.subList(0, i + 1))
+ + "' which overlaps with another field in the schema.");
+ }
+ } else {
+ childNode = new HashMap<>();
+ currentNode.put(subKey, childNode);
+ }
+ currentNode = childNode;
+ }
+ // Add the leaf pointing at null
+ String subKey = subKeys.get(subKeys.size() - 1);
+ if (currentNode.containsKey(subKey)) {
+ throw new IllegalArgumentException(
+ "Cannot handle field '" + field + "' which overlaps with another
field in the schema.");
+ }
+ currentNode.put(subKey, null);
+ }
+
+ return schemaTree;
+ }
+
+ /**
+ * Given a JSON path (e.g. "k1.k2.k3"), returns all the sub-keys (e.g.
["k1", "k2", "k3"])
+ * @param key The complete key
+ * @param firstKeySeparatorIdx The index of the first key separator in
{@code key}
+ * @param subKeys Returns the sub-keys
+ * @throws IllegalArgumentException if any sub-key is empty
+ */
+ private static void getAndValidateSubKeys(String key, int
firstKeySeparatorIdx, List<String> subKeys)
+ throws IllegalArgumentException {
+ int subKeyBeginIdx = 0;
+ int subKeyEndIdx = firstKeySeparatorIdx;
+ int keyLength = key.length();
+ while (true) {
+ // Validate and add the sub-key
+ String subKey = key.substring(subKeyBeginIdx, subKeyEndIdx);
+ if (subKey.isEmpty()) {
+ throw new IllegalArgumentException("Unsupported empty sub-key in '" +
key + "'.");
+ }
+ subKeys.add(subKey);
+
+ // Advance to the beginning of the next sub-key
+ subKeyBeginIdx = subKeyEndIdx + 1;
+ if (subKeyBeginIdx >= keyLength) {
+ break;
+ }
+
+ // Find the end of the next sub-key
+ int keySeparatorIdx = key.indexOf(JsonUtils.KEY_SEPARATOR,
subKeyBeginIdx);
+ if (-1 != keySeparatorIdx) {
+ subKeyEndIdx = keySeparatorIdx;
+ } else {
+ subKeyEndIdx = key.length();
+ }
+ }
+ }
+
+ public SchemaConformingTransformer(TableConfig tableConfig, Schema schema) {
+ if (null == tableConfig.getIngestionConfig() || null ==
tableConfig.getIngestionConfig()
+ .getSchemaConformingTransformerConfig()) {
+ _continueOnError = false;
+ _transformerConfig = null;
+ _indexableExtrasFieldType = null;
+ _unindexableExtrasFieldType = null;
+ return;
+ }
+
+ _continueOnError = tableConfig.getIngestionConfig().isContinueOnError();
+ _transformerConfig =
tableConfig.getIngestionConfig().getSchemaConformingTransformerConfig();
+ String indexableExtrasFieldName =
_transformerConfig.getIndexableExtrasField();
+ _indexableExtrasFieldType = getAndValidateExtrasFieldType(schema,
indexableExtrasFieldName);
+ String unindexableExtrasFieldName =
_transformerConfig.getUnindexableExtrasField();
+ _unindexableExtrasFieldType =
+ null == unindexableExtrasFieldName ? null :
getAndValidateExtrasFieldType(schema, unindexableExtrasFieldName);
+
+ _schemaTree = validateSchemaAndCreateTree(schema);
+ }
+
+ @Override
+ public boolean isNoOp() {
+ return null == _transformerConfig;
+ }
+
+ @Nullable
+ @Override
+ public GenericRow transform(GenericRow record) {
+ GenericRow outputRecord = new GenericRow();
+
+ try {
+ ExtraFieldsContainer extraFieldsContainer =
+ new ExtraFieldsContainer(null !=
_transformerConfig.getUnindexableExtrasField());
+ for (Map.Entry<String, Object> recordEntry :
record.getFieldToValueMap().entrySet()) {
+ String recordKey = recordEntry.getKey();
+ Object recordValue = recordEntry.getValue();
+ processField(_schemaTree, recordKey, recordKey, recordValue,
extraFieldsContainer, outputRecord);
+ }
+ putExtrasField(_transformerConfig.getIndexableExtrasField(),
_indexableExtrasFieldType,
+ extraFieldsContainer.getIndexableExtras(), outputRecord);
+ putExtrasField(_transformerConfig.getUnindexableExtrasField(),
_unindexableExtrasFieldType,
+ extraFieldsContainer.getUnindexableExtras(), outputRecord);
+ } catch (Exception e) {
+ if (!_continueOnError) {
+ throw e;
+ }
+ _logger.debug("Couldn't transform record: {}", record.toString(), e);
+ outputRecord.putValue(GenericRow.INCOMPLETE_RECORD_KEY, true);
+ }
+
+ return outputRecord;
+ }
+
+ /**
+ * Processes a field from the record and either:
+ * <ul>
+ * <li>Drops it if it's in fieldPathsToDrop</li>
+ * <li>Adds it to the output record if it's special or exists in the
schema</li>
+ * <li>Adds it to one of the extras fields</li>
+ * </ul>
+ * <p>
+ * This method works recursively to build the output record. It is similar
to {@code addIndexableField} except it
+ * handles fields which exist in the schema.
+ * <p>
+ * One notable complication that this method (and {@code addIndexableField})
handles is adding nested fields (even
+ * ones more than two levels deep) to the "extras" fields. E.g., consider
this record:
+ * <pre>
+ * {
+ * a: {
+ * b: {
+ * c: 0,
+ * d: 1
+ * }
+ * }
+ * }
+ * </pre>
+ * Assume "a.b.c" exists in the schema but "a.b.d" doesn't. This class
processes the record recursively from the root
+ * node to the children, so it would only know that "a.b.d" doesn't exist
when it gets to "d". At this point we need
+ * to add "d" and all of its parents to the indexableExtrasField. To do so
efficiently, the class builds this branch
+ * starting from the leaf and attaches it to parent nodes as we return from
each recursive call.
+ * @param schemaNode The current node in the schema tree
+ * @param keyJsonPath The JSON path (without the "$." prefix) of the current
field
+ * @param key The current field's key (its last sub-key)
+ * @param value The current field's value
+ * @param extraFieldsContainer A container for the "extras" fields
corresponding to this node.
+ * @param outputRecord Returns the record after transformation
+ */
+ private void processField(Map<String, Object> schemaNode, String
keyJsonPath, String key, Object value,
+ ExtraFieldsContainer extraFieldsContainer, GenericRow outputRecord) {
+
+ if (StreamDataDecoderImpl.isSpecialKeyType(key) ||
GenericRow.isSpecialKeyType(key)) {
+ outputRecord.putValue(key, value);
+ return;
+ }
+
+ Set<String> fieldPathsToDrop = _transformerConfig.getFieldPathsToDrop();
+ if (null != fieldPathsToDrop && fieldPathsToDrop.contains(keyJsonPath)) {
+ return;
+ }
+
+ String unindexableFieldSuffix =
_transformerConfig.getUnindexableFieldSuffix();
+ if (null != unindexableFieldSuffix &&
key.endsWith(unindexableFieldSuffix)) {
+ extraFieldsContainer.addUnindexableEntry(key, value);
+ return;
+ }
+
+ if (!schemaNode.containsKey(key)) {
+ addIndexableField(keyJsonPath, key, value, extraFieldsContainer);
+ return;
+ }
+
+ Map<String, Object> childSchemaNode = (Map<String, Object>)
schemaNode.get(key);
+ boolean storeUnindexableExtras =
_transformerConfig.getUnindexableExtrasField() != null;
+ if (null == childSchemaNode) {
+ if (!(value instanceof Map) || null == unindexableFieldSuffix) {
+ outputRecord.putValue(keyJsonPath, value);
+ } else {
+ // The field's value is a map which could contain a no-index field, so
we need to keep traversing the map
+ ExtraFieldsContainer container = new
ExtraFieldsContainer(storeUnindexableExtras);
+ addIndexableField(keyJsonPath, key, value, container);
+ Map<String, Object> indexableFields = container.getIndexableExtras();
+ outputRecord.putValue(keyJsonPath, indexableFields.get(key));
+ Map<String, Object> unindexableFields =
container.getUnindexableExtras();
+ if (null != unindexableFields) {
+ extraFieldsContainer.addUnindexableEntry(key,
unindexableFields.get(key));
+ }
+ }
+ } else {
+ if (!(value instanceof Map)) {
+ _logger.debug("Record doesn't match schema: Schema node '{}' is a map
but record value is a {}", keyJsonPath,
+ value.getClass().getName());
+ extraFieldsContainer.addIndexableEntry(key, value);
+ } else {
+ ExtraFieldsContainer childExtraFieldsContainer = new
ExtraFieldsContainer(storeUnindexableExtras);
+ Map<String, Object> valueAsMap = (Map<String, Object>) value;
+ for (Map.Entry<String, Object> entry : valueAsMap.entrySet()) {
+ String childKey = entry.getKey();
+ processField(childSchemaNode, keyJsonPath + JsonUtils.KEY_SEPARATOR
+ childKey, childKey, entry.getValue(),
+ childExtraFieldsContainer, outputRecord);
+ }
+ extraFieldsContainer.addChild(key, childExtraFieldsContainer);
+ }
+ }
+ }
+
+ /**
+ * Adds an indexable field to the given {@code ExtraFieldsContainer}.
+ * <p>
+ * This method is similar to {@code processField} except it doesn't handle
fields which exist in the schema.
+ */
+ void addIndexableField(String recordJsonPath, String key, Object value,
ExtraFieldsContainer extraFieldsContainer) {
+ Set<String> fieldPathsToDrop = _transformerConfig.getFieldPathsToDrop();
+ if (null != fieldPathsToDrop && fieldPathsToDrop.contains(recordJsonPath))
{
+ return;
+ }
+
+ String unindexableFieldSuffix =
_transformerConfig.getUnindexableFieldSuffix();
+ if (null != unindexableFieldSuffix &&
key.endsWith(unindexableFieldSuffix)) {
+ extraFieldsContainer.addUnindexableEntry(key, value);
+ return;
+ }
+
+ boolean storeUnindexableExtras =
_transformerConfig.getUnindexableExtrasField() != null;
+ if (!(value instanceof Map)) {
+ extraFieldsContainer.addIndexableEntry(key, value);
+ } else {
+ ExtraFieldsContainer childExtraFieldsContainer = new
ExtraFieldsContainer(storeUnindexableExtras);
+ Map<String, Object> valueAsMap = (Map<String, Object>) value;
+ for (Map.Entry<String, Object> entry : valueAsMap.entrySet()) {
+ String childKey = entry.getKey();
+ addIndexableField(recordJsonPath + JsonUtils.KEY_SEPARATOR + childKey,
childKey, entry.getValue(),
+ childExtraFieldsContainer);
+ }
+ extraFieldsContainer.addChild(key, childExtraFieldsContainer);
+ }
+ }
+
+ /**
+ * Converts (if necessary) and adds the given extras field to the output
record
+ */
+ private void putExtrasField(String fieldName, DataType fieldType,
Map<String, Object> field,
+ GenericRow outputRecord) {
+ if (null == field) {
+ return;
+ }
+
+ switch (fieldType) {
+ case JSON:
+ outputRecord.putValue(fieldName, field);
+ break;
+ case STRING:
+ try {
+ outputRecord.putValue(fieldName, JsonUtils.objectToString(field));
+ } catch (JsonProcessingException e) {
+ throw new RuntimeException("Failed to convert '" + fieldName + "' to
string", e);
+ }
+ break;
+ default:
+ throw new UnsupportedOperationException("Cannot convert '" + fieldName
+ "' to " + fieldType.name());
+ }
+ }
+}
+
+/**
+ * A class to encapsulate the "extras" fields (indexableExtras and
unindexableExtras) at a node in the record (when
+ * viewed as a tree).
+ */
+class ExtraFieldsContainer {
+ private Map<String, Object> _indexableExtras = null;
+ private Map<String, Object> _unindexableExtras = null;
+ private final boolean _storeUnindexableExtras;
+
+ ExtraFieldsContainer(boolean storeUnindexableExtras) {
+ _storeUnindexableExtras = storeUnindexableExtras;
+ }
+
+ public Map<String, Object> getIndexableExtras() {
+ return _indexableExtras;
+ }
+
+ public Map<String, Object> getUnindexableExtras() {
+ return _unindexableExtras;
+ }
+
+ /**
+ * Adds the given kv-pair to the indexable extras field
+ */
+ public void addIndexableEntry(String key, Object value) {
+ if (null == _indexableExtras) {
+ _indexableExtras = new HashMap<>();
+ }
+ _indexableExtras.put(key, value);
+ }
+
+ /**
+ * Adds the given kv-pair to the unindexable extras field (if any)
+ */
+ public void addUnindexableEntry(String key, Object value) {
+ if (!_storeUnindexableExtras) {
+ return;
+ }
+ if (null == _unindexableExtras) {
+ _unindexableExtras = new HashMap<>();
+ }
+ _unindexableExtras.put(key, value);
+ }
+
+ /**
+ * Given a container corresponding to a child node, attach the extras from
the child node to the extras in this node
+ * at the given key.
+ */
+ public void addChild(String key, ExtraFieldsContainer child) {
+ Map<String, Object> childIndexableFields = child.getIndexableExtras();
+ if (null != childIndexableFields) {
+ addIndexableEntry(key, childIndexableFields);
+ }
+
+ Map<String, Object> childUnindexableFields = child.getUnindexableExtras();
+ if (null != childUnindexableFields) {
+ addUnindexableEntry(key, childUnindexableFields);
+ }
+ }
+}
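
As a usage sketch (illustrative, not part of the commit), the transformer can also be constructed and applied
to a GenericRow directly, mirroring the transformRow helper in the test below; tableConfig is assumed to carry
a SchemaConformingTransformerConfig whose indexableExtrasField is "indexableExtras":

    Schema schema = new Schema.SchemaBuilder().setSchemaName("myLogs")
        .addSingleValueDimension("message", FieldSpec.DataType.STRING)
        .addSingleValueDimension("indexableExtras", FieldSpec.DataType.JSON)
        .build();
    SchemaConformingTransformer transformer = new SchemaConformingTransformer(tableConfig, schema);

    GenericRow record = new GenericRow();
    record.putValue("message", "Started processing job1");
    record.putValue("tags", java.util.Collections.singletonMap("platform", "data"));
    GenericRow output = transformer.transform(record);
    // "message" is kept as-is; "tags" doesn't exist in the schema, so it is nested under "indexableExtras".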
diff --git
a/pinot-segment-local/src/main/java/org/apache/pinot/segment/local/utils/IngestionUtils.java
b/pinot-segment-local/src/main/java/org/apache/pinot/segment/local/utils/IngestionUtils.java
index 1d73c4cfd5..f6943e5468 100644
---
a/pinot-segment-local/src/main/java/org/apache/pinot/segment/local/utils/IngestionUtils.java
+++
b/pinot-segment-local/src/main/java/org/apache/pinot/segment/local/utils/IngestionUtils.java
@@ -308,6 +308,13 @@ public final class IngestionUtils {
*/
public static Set<String> getFieldsForRecordExtractor(@Nullable
IngestionConfig ingestionConfig, Schema schema) {
Set<String> fieldsForRecordExtractor = new HashSet<>();
+
+ if (null != ingestionConfig && null !=
ingestionConfig.getSchemaConformingTransformerConfig()) {
+ // The SchemaConformingTransformer requires that all fields are extracted; returning an empty set here
+ // signals that. The set of all fields is a superset of the fields that would otherwise be collected below.
+ return fieldsForRecordExtractor;
+ }
+
extractFieldsFromIngestionConfig(ingestionConfig,
fieldsForRecordExtractor);
extractFieldsFromSchema(schema, fieldsForRecordExtractor);
fieldsForRecordExtractor =
getFieldsToReadWithComplexType(fieldsForRecordExtractor, ingestionConfig);
diff --git
a/pinot-segment-local/src/main/java/org/apache/pinot/segment/local/utils/TableConfigUtils.java
b/pinot-segment-local/src/main/java/org/apache/pinot/segment/local/utils/TableConfigUtils.java
index 6e82dea79e..a7d65d6343 100644
---
a/pinot-segment-local/src/main/java/org/apache/pinot/segment/local/utils/TableConfigUtils.java
+++
b/pinot-segment-local/src/main/java/org/apache/pinot/segment/local/utils/TableConfigUtils.java
@@ -44,6 +44,7 @@ import org.apache.pinot.common.tier.TierFactory;
import org.apache.pinot.common.utils.config.TagNameUtils;
import org.apache.pinot.segment.local.function.FunctionEvaluator;
import org.apache.pinot.segment.local.function.FunctionEvaluatorFactory;
+import
org.apache.pinot.segment.local.recordtransformer.SchemaConformingTransformer;
import
org.apache.pinot.segment.local.segment.creator.impl.inv.BitSlicedRangeIndexCreator;
import org.apache.pinot.segment.spi.AggregationFunctionType;
import org.apache.pinot.segment.spi.index.IndexService;
@@ -68,6 +69,7 @@ import
org.apache.pinot.spi.config.table.ingestion.BatchIngestionConfig;
import org.apache.pinot.spi.config.table.ingestion.ComplexTypeConfig;
import org.apache.pinot.spi.config.table.ingestion.FilterConfig;
import org.apache.pinot.spi.config.table.ingestion.IngestionConfig;
+import
org.apache.pinot.spi.config.table.ingestion.SchemaConformingTransformerConfig;
import org.apache.pinot.spi.config.table.ingestion.StreamIngestionConfig;
import org.apache.pinot.spi.config.table.ingestion.TransformConfig;
import org.apache.pinot.spi.data.FieldSpec;
@@ -498,6 +500,12 @@ public final class TableConfigUtils {
}
}
}
+
+ SchemaConformingTransformerConfig schemaConformingTransformerConfig =
+ ingestionConfig.getSchemaConformingTransformerConfig();
+ if (null != schemaConformingTransformerConfig && null != schema) {
+ SchemaConformingTransformer.validateSchema(schema,
schemaConformingTransformerConfig);
+ }
}
}
diff --git
a/pinot-segment-local/src/test/java/org/apache/pinot/segment/local/recordtransformer/SchemaConformingTransformerTest.java
b/pinot-segment-local/src/test/java/org/apache/pinot/segment/local/recordtransformer/SchemaConformingTransformerTest.java
new file mode 100644
index 0000000000..e9b3ec3d6d
--- /dev/null
+++
b/pinot-segment-local/src/test/java/org/apache/pinot/segment/local/recordtransformer/SchemaConformingTransformerTest.java
@@ -0,0 +1,731 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.pinot.segment.local.recordtransformer;
+
+import com.fasterxml.jackson.core.type.TypeReference;
+import com.fasterxml.jackson.databind.ObjectMapper;
+import java.io.IOException;
+import java.util.Arrays;
+import java.util.HashSet;
+import java.util.LinkedList;
+import java.util.List;
+import java.util.Map;
+import java.util.Set;
+import javax.annotation.Nonnull;
+import org.apache.pinot.segment.local.utils.IngestionUtils;
+import org.apache.pinot.spi.config.table.TableConfig;
+import org.apache.pinot.spi.config.table.TableType;
+import org.apache.pinot.spi.config.table.ingestion.FilterConfig;
+import org.apache.pinot.spi.config.table.ingestion.IngestionConfig;
+import
org.apache.pinot.spi.config.table.ingestion.SchemaConformingTransformerConfig;
+import org.apache.pinot.spi.data.FieldSpec.DataType;
+import org.apache.pinot.spi.data.Schema;
+import org.apache.pinot.spi.data.readers.GenericRow;
+import org.apache.pinot.spi.utils.builder.TableConfigBuilder;
+import org.testng.Assert;
+import org.testng.annotations.Test;
+
+import static org.testng.AssertJUnit.fail;
+
+
+public class SchemaConformingTransformerTest {
+ static final private String INDEXABLE_EXTRAS_FIELD_NAME = "indexableExtras";
+ static final private String UNINDEXABLE_EXTRAS_FIELD_NAME =
"unindexableExtras";
+ static final private String UNINDEXABLE_FIELD_SUFFIX = "_noIndex";
+
+ static final private ObjectMapper OBJECT_MAPPER = new ObjectMapper();
+
+ private TableConfig createDefaultTableConfig(String indexableExtrasField,
String unindexableExtrasField,
+ String unindexableFieldSuffix, Set<String> fieldPathsToDrop) {
+ IngestionConfig ingestionConfig = new IngestionConfig();
+ SchemaConformingTransformerConfig schemaConformingTransformerConfig =
+ new SchemaConformingTransformerConfig(indexableExtrasField,
unindexableExtrasField, unindexableFieldSuffix,
+ fieldPathsToDrop);
+
ingestionConfig.setSchemaConformingTransformerConfig(schemaConformingTransformerConfig);
+ return new
TableConfigBuilder(TableType.OFFLINE).setTableName("testTable").setIngestionConfig(ingestionConfig)
+ .build();
+ }
+
+ private Schema.SchemaBuilder createDefaultSchemaBuilder() {
+ return new
Schema.SchemaBuilder().addSingleValueDimension(INDEXABLE_EXTRAS_FIELD_NAME,
DataType.JSON)
+ .addSingleValueDimension(UNINDEXABLE_EXTRAS_FIELD_NAME, DataType.JSON);
+ }
+
+ @Test
+ public void testWithNoUnindexableFields() {
+ /*
+ {
+ "arrayField":[0, 1, 2, 3],
+ "nullField":null,
+ "stringField":"a",
+ "mapField":{
+ "arrayField":[0, 1, 2, 3],
+ "nullField":null,
+ "stringField":"a"
+ },
+ "nestedFields":{
+ "arrayField":[0, 1, 2, 3],
+ "nullField":null,
+ "stringField":"a",
+ "mapField":{
+ "arrayField":[0, 1, 2, 3],
+ "nullField":null,
+ "stringField":"a"
+ }
+ }
+ }
+ */
+ final String inputRecordJSONString =
+
"{\"arrayField\":[0,1,2,3],\"nullField\":null,\"stringField\":\"a\",\"mapField\":{\"arrayField\":[0,1,2,3],"
+ +
"\"nullField\":null,\"stringField\":\"a\"},\"nestedFields\":{\"arrayField\":[0,1,2,3],"
+ +
"\"nullField\":null,\"stringField\":\"a\",\"mapField\":{\"arrayField\":[0,1,2,3],\"nullField\":null,"
+ + "\"stringField\":\"a\"}}}";
+ String expectedOutputRecordJSONString;
+ Schema schema;
+
+ schema = createDefaultSchemaBuilder().build();
+ /*
+ {
+ "indexableExtras":{
+ "arrayField":[0, 1, 2, 3],
+ "nullField":null,
+ "stringField":"a",
+ "mapField":{
+ "arrayField":[0, 1, 2, 3],
+ "nullField":null,
+ "stringField":"a"
+ },
+ "nestedFields":{
+ "arrayField":[0, 1, 2, 3],
+ "nullField":null,
+ "stringField":"a",
+ "mapField":{
+ "arrayField":[0, 1, 2, 3],
+ "nullField":null,
+ "stringField":"a"
+ }
+ }
+ }
+ }
+ */
+ expectedOutputRecordJSONString =
+
"{\"indexableExtras\":{\"arrayField\":[0,1,2,3],\"nullField\":null,\"stringField\":\"a\","
+ +
"\"mapField\":{\"arrayField\":[0,1,2,3],\"nullField\":null,\"stringField\":\"a\"},"
+ +
"\"nestedFields\":{\"arrayField\":[0,1,2,3],\"nullField\":null,\"stringField\":\"a\","
+ +
"\"mapField\":{\"arrayField\":[0,1,2,3],\"nullField\":null,\"stringField\":\"a\"}}}}";
+ testTransformWithNoUnindexableFields(schema, inputRecordJSONString,
expectedOutputRecordJSONString);
+
+ schema = createDefaultSchemaBuilder().addMultiValueDimension("arrayField",
DataType.INT)
+ .addSingleValueDimension("mapField", DataType.JSON)
+ .addSingleValueDimension("nestedFields.stringField",
DataType.STRING).build();
+ /*
+ {
+ "arrayField":[0, 1, 2, 3],
+ "mapField":{
+ "arrayField":[0, 1, 2, 3],
+ "nullField":null,
+ "stringField":"a"
+ },
+ "nestedFields.stringField":"a",
+ "indexableExtras":{
+ "nullField":null,
+ "stringField":"a",
+ "nestedFields":{
+ "arrayField":[0, 1, 2, 3],
+ "nullField":null,
+ "mapField":{
+ "arrayField":[0, 1, 2, 3],
+ "nullField":null,
+ "stringField":"a"
+ }
+ }
+ }
+ }
+ */
+ expectedOutputRecordJSONString =
+
"{\"arrayField\":[0,1,2,3],\"mapField\":{\"arrayField\":[0,1,2,3],\"nullField\":null,\"stringField\":\"a\"},"
+ +
"\"nestedFields.stringField\":\"a\",\"indexableExtras\":{\"nullField\":null,\"stringField\":\"a\","
+ +
"\"nestedFields\":{\"arrayField\":[0,1,2,3],\"nullField\":null,\"mapField\":{\"arrayField\":[0,1,2,3],"
+ + "\"nullField\":null,\"stringField\":\"a\"}}}}";
+ testTransformWithNoUnindexableFields(schema, inputRecordJSONString,
expectedOutputRecordJSONString);
+
+ schema = createDefaultSchemaBuilder().addMultiValueDimension("arrayField",
DataType.INT)
+ .addSingleValueDimension("nullField",
DataType.STRING).addSingleValueDimension("stringField", DataType.STRING)
+ .addSingleValueDimension("mapField", DataType.JSON)
+ .addMultiValueDimension("nestedFields.arrayField", DataType.INT)
+ .addSingleValueDimension("nestedFields.nullField", DataType.STRING)
+ .addSingleValueDimension("nestedFields.stringField", DataType.STRING)
+ .addSingleValueDimension("nestedFields.mapField",
DataType.JSON).build();
+ /*
+ {
+ "arrayField":[0, 1, 2, 3],
+ "nullField":null,
+ "stringField":"a",
+ "mapField":{
+ "arrayField":[0, 1, 2, 3],
+ "nullField":null,
+ "stringField":"a"
+ },
+ "nestedFields.arrayField":[0, 1, 2, 3],
+ "nestedFields.nullField":null,
+ "nestedFields.stringField":"a",
+ "nestedFields.mapField":{
+ "arrayField":[0, 1, 2, 3],
+ "nullField":null,
+ "stringField":"a"
+ }
+ }
+ */
+ expectedOutputRecordJSONString =
+
"{\"arrayField\":[0,1,2,3],\"nullField\":null,\"stringField\":\"a\",\"mapField\":{\"arrayField\":[0,1,2,3],"
+ +
"\"nullField\":null,\"stringField\":\"a\"},\"nestedFields.arrayField\":[0,1,2,3],\"nestedFields"
+ +
".nullField\":null,\"nestedFields.stringField\":\"a\",\"nestedFields.mapField\":{\"arrayField\":[0,1,2,"
+ + "3],\"nullField\":null,\"stringField\":\"a\"}}";
+ testTransformWithNoUnindexableFields(schema, inputRecordJSONString,
expectedOutputRecordJSONString);
+ }
+
+ private void testTransformWithNoUnindexableFields(Schema schema, String
inputRecordJSONString,
+ String expectedOutputRecordJSONString) {
+ testTransform(null, null, schema, null, inputRecordJSONString,
expectedOutputRecordJSONString);
+ testTransform(null, UNINDEXABLE_FIELD_SUFFIX, schema, null,
inputRecordJSONString, expectedOutputRecordJSONString);
+ testTransform(UNINDEXABLE_EXTRAS_FIELD_NAME, UNINDEXABLE_FIELD_SUFFIX,
schema, null, inputRecordJSONString,
+ expectedOutputRecordJSONString);
+ }
+
+ @Test
+ public void testWithUnindexableFields() {
+ /*
+ {
+ "arrayField":[0, 1, 2, 3],
+ "nullField":null,
+ "stringField":"a",
+ "intField_noIndex":9,
+ "string_noIndex":"z",
+ "mapField":{
+ "arrayField":[0, 1, 2, 3],
+ "nullField":null,
+ "stringField":"a",
+ "intField_noIndex":9,
+ "string_noIndex":"z"
+ },
+ "nestedFields":{
+ "arrayField":[0, 1, 2, 3],
+ "nullField":null,
+ "stringField":"a",
+ "intField_noIndex":9,
+ "string_noIndex":"z",
+ "mapField":{
+ "arrayField":[0, 1, 2, 3],
+ "nullField":null,
+ "stringField":"a",
+ "intField_noIndex":9,
+ "string_noIndex":"z"
+ }
+ }
+ }
+ */
+ final String inputRecordJSONString =
+
"{\"arrayField\":[0,1,2,3],\"nullField\":null,\"stringField\":\"a\",\"intField_noIndex\":9,"
+ +
"\"string_noIndex\":\"z\",\"mapField\":{\"arrayField\":[0,1,2,3],\"nullField\":null,"
+ +
"\"stringField\":\"a\",\"intField_noIndex\":9,\"string_noIndex\":\"z\"},"
+ +
"\"nestedFields\":{\"arrayField\":[0,1,2,3],\"nullField\":null,\"stringField\":\"a\","
+ +
"\"intField_noIndex\":9,\"string_noIndex\":\"z\",\"mapField\":{\"arrayField\":[0,1,2,3],"
+ +
"\"nullField\":null,\"stringField\":\"a\",\"intField_noIndex\":9,\"string_noIndex\":\"z\"}}}";
+ String expectedOutputRecordJSONString;
+ Schema schema;
+
+ schema = createDefaultSchemaBuilder().build();
+ /*
+ {
+ "indexableExtras":{
+ "arrayField":[0, 1, 2, 3],
+ "nullField":null,
+ "stringField":"a",
+ "mapField":{
+ "arrayField":[0, 1, 2, 3],
+ "nullField":null,
+ "stringField":"a"
+ },
+ "nestedFields":{
+ "arrayField":[0, 1, 2, 3],
+ "nullField":null,
+ "stringField":"a",
+ "mapField":{
+ "arrayField":[0, 1, 2, 3],
+ "nullField":null,
+ "stringField":"a"
+ }
+ }
+ }
+ }
+ */
+ expectedOutputRecordJSONString =
+
"{\"indexableExtras\":{\"arrayField\":[0,1,2,3],\"nullField\":null,\"stringField\":\"a\","
+ +
"\"mapField\":{\"arrayField\":[0,1,2,3],\"nullField\":null,\"stringField\":\"a\"},"
+ +
"\"nestedFields\":{\"arrayField\":[0,1,2,3],\"nullField\":null,\"stringField\":\"a\","
+ +
"\"mapField\":{\"arrayField\":[0,1,2,3],\"nullField\":null,\"stringField\":\"a\"}}}}";
+ testTransform(null, UNINDEXABLE_FIELD_SUFFIX, schema, null,
inputRecordJSONString, expectedOutputRecordJSONString);
+ /*
+ {
+ "indexableExtras":{
+ "arrayField":[0, 1, 2, 3],
+ "nullField":null,
+ "stringField":"a",
+ "mapField":{
+ "arrayField":[0, 1, 2, 3],
+ "nullField":null,
+ "stringField":"a"
+ },
+ "nestedFields":{
+ "arrayField":[0, 1, 2, 3],
+ "nullField":null,
+ "stringField":"a",
+ "mapField":{
+ "arrayField":[0, 1, 2, 3],
+ "nullField":null,
+ "stringField":"a"
+ }
+ }
+ },
+ "unindexableExtras":{
+ "intField_noIndex":9,
+ "string_noIndex":"z",
+ "mapField":{
+ "intField_noIndex":9,
+ "string_noIndex":"z"
+ },
+ "nestedFields":{
+ "intField_noIndex":9,
+ "string_noIndex":"z",
+ "mapField":{
+ "intField_noIndex":9,
+ "string_noIndex":"z"
+ }
+ }
+ }
+ }
+ */
+ expectedOutputRecordJSONString =
+
"{\"indexableExtras\":{\"arrayField\":[0,1,2,3],\"nullField\":null,\"stringField\":\"a\","
+ +
"\"mapField\":{\"arrayField\":[0,1,2,3],\"nullField\":null,\"stringField\":\"a\"},"
+ +
"\"nestedFields\":{\"arrayField\":[0,1,2,3],\"nullField\":null,\"stringField\":\"a\","
+ +
"\"mapField\":{\"arrayField\":[0,1,2,3],\"nullField\":null,\"stringField\":\"a\"}}},"
+ +
"\"unindexableExtras\":{\"intField_noIndex\":9,\"string_noIndex\":\"z\","
+ + "\"mapField\":{\"intField_noIndex\":9,\"string_noIndex\":\"z\"},"
+ +
"\"nestedFields\":{\"intField_noIndex\":9,\"string_noIndex\":\"z\","
+ +
"\"mapField\":{\"intField_noIndex\":9,\"string_noIndex\":\"z\"}}}}";
+ testTransform(UNINDEXABLE_EXTRAS_FIELD_NAME, UNINDEXABLE_FIELD_SUFFIX,
schema, null, inputRecordJSONString,
+ expectedOutputRecordJSONString);
+
+ schema = createDefaultSchemaBuilder().addMultiValueDimension("arrayField",
DataType.INT)
+ .addSingleValueDimension("mapField", DataType.JSON)
+ .addSingleValueDimension("nestedFields.stringField",
DataType.STRING).build();
+ /*
+ {
+ "arrayField":[0, 1, 2, 3],
+ "mapField":{
+ "arrayField":[0, 1, 2, 3],
+ "nullField":null,
+ "stringField":"a"
+ },
+ "nestedFields.stringField":"a",
+ "indexableExtras":{
+ "nullField":null,
+ "stringField":"a",
+ "nestedFields":{
+ "arrayField":[0, 1, 2, 3],
+ "nullField":null,
+ "mapField":{
+ "arrayField":[0, 1, 2, 3],
+ "nullField":null,
+ "stringField":"a"
+ }
+ }
+ }
+ }
+ */
+ expectedOutputRecordJSONString =
+
"{\"arrayField\":[0,1,2,3],\"mapField\":{\"arrayField\":[0,1,2,3],\"nullField\":null,\"stringField\":\"a\"},"
+ +
"\"nestedFields.stringField\":\"a\",\"indexableExtras\":{\"nullField\":null,\"stringField\":\"a\","
+ +
"\"nestedFields\":{\"arrayField\":[0,1,2,3],\"nullField\":null,\"mapField\":{\"arrayField\":[0,1,2,3],"
+ + "\"nullField\":null,\"stringField\":\"a\"}}}}";
+ testTransform(null, UNINDEXABLE_FIELD_SUFFIX, schema, null,
inputRecordJSONString, expectedOutputRecordJSONString);
+ /*
+ {
+ "arrayField":[0, 1, 2, 3],
+ "mapField":{
+ "arrayField":[0, 1, 2, 3],
+ "nullField":null,
+ "stringField":"a"
+ },
+ "nestedFields.stringField":"a",
+ "indexableExtras":{
+ "nullField":null,
+ "stringField":"a",
+ "nestedFields":{
+ "arrayField":[0, 1, 2, 3],
+ "nullField":null,
+ "mapField":{
+ "arrayField":[0, 1, 2, 3],
+ "nullField":null,
+ "stringField":"a"
+ }
+ }
+ },
+ "unindexableExtras":{
+ "intField_noIndex":9,
+ "string_noIndex":"z",
+ "mapField":{
+ "intField_noIndex":9,
+ "string_noIndex":"z"
+ },
+ "nestedFields":{
+ "intField_noIndex":9,
+ "string_noIndex":"z",
+ "mapField":{
+ "intField_noIndex":9,
+ "string_noIndex":"z"
+ }
+ }
+ }
+ }
+ */
+ expectedOutputRecordJSONString =
+
"{\"arrayField\":[0,1,2,3],\"mapField\":{\"arrayField\":[0,1,2,3],\"nullField\":null,\"stringField\":\"a\"},"
+ +
"\"nestedFields.stringField\":\"a\",\"indexableExtras\":{\"nullField\":null,\"stringField\":\"a\","
+ +
"\"nestedFields\":{\"arrayField\":[0,1,2,3],\"nullField\":null,\"mapField\":{\"arrayField\":[0,1,2,3],"
+ +
"\"nullField\":null,\"stringField\":\"a\"}}},\"unindexableExtras\":{\"intField_noIndex\":9,"
+ +
"\"string_noIndex\":\"z\",\"mapField\":{\"intField_noIndex\":9,\"string_noIndex\":\"z\"},"
+ +
"\"nestedFields\":{\"intField_noIndex\":9,\"string_noIndex\":\"z\","
+ +
"\"mapField\":{\"intField_noIndex\":9,\"string_noIndex\":\"z\"}}}}";
+ testTransform(UNINDEXABLE_EXTRAS_FIELD_NAME, UNINDEXABLE_FIELD_SUFFIX,
schema, null, inputRecordJSONString,
+ expectedOutputRecordJSONString);
+
+ schema = createDefaultSchemaBuilder().addMultiValueDimension("arrayField",
DataType.INT)
+ .addSingleValueDimension("nullField",
DataType.STRING).addSingleValueDimension("stringField", DataType.STRING)
+ .addSingleValueDimension("mapField", DataType.JSON)
+ .addMultiValueDimension("nestedFields.arrayField", DataType.INT)
+ .addSingleValueDimension("nestedFields.nullField", DataType.STRING)
+ .addSingleValueDimension("nestedFields.stringField", DataType.STRING)
+ .addSingleValueDimension("nestedFields.mapField",
DataType.JSON).build();
+ /*
+ {
+ "arrayField":[0, 1, 2, 3],
+ "nullField":null,
+ "stringField":"a",
+ "mapField":{
+ "arrayField":[0, 1, 2, 3],
+ "nullField":null,
+ "stringField":"a"
+ },
+ "nestedFields.arrayField":[0, 1, 2, 3],
+ "nestedFields.nullField":null,
+ "nestedFields.stringField":"a",
+ "nestedFields.mapField":{
+ "arrayField":[0, 1, 2, 3],
+ "nullField":null,
+ "stringField":"a"
+ }
+ }
+ */
+ expectedOutputRecordJSONString =
+
"{\"arrayField\":[0,1,2,3],\"nullField\":null,\"stringField\":\"a\",\"mapField\":{\"arrayField\":[0,1,2,3],"
+ +
"\"nullField\":null,\"stringField\":\"a\"},\"nestedFields.arrayField\":[0,1,2,3],\"nestedFields"
+ +
".nullField\":null,\"nestedFields.stringField\":\"a\",\"nestedFields.mapField\":{\"arrayField\":[0,1,2,"
+ + "3],\"nullField\":null,\"stringField\":\"a\"} }";
+ testTransform(null, UNINDEXABLE_FIELD_SUFFIX, schema, null,
inputRecordJSONString, expectedOutputRecordJSONString);
+ /*
+ {
+ "arrayField":[0, 1, 2, 3],
+ "nullField":null,
+ "stringField":"a",
+ "mapField":{
+ "arrayField":[0, 1, 2, 3],
+ "nullField":null,
+ "stringField":"a"
+ },
+ "nestedFields.arrayField":[0, 1, 2, 3],
+ "nestedFields.nullField":null,
+ "nestedFields.stringField":"a",
+ "nestedFields.mapField":{
+ "arrayField":[0, 1, 2, 3],
+ "nullField":null,
+ "stringField":"a"
+ },
+ "unindexableExtras":{
+ "intField_noIndex":9,
+ "string_noIndex":"z",
+ "mapField":{
+ "intField_noIndex":9,
+ "string_noIndex":"z"
+ },
+ "nestedFields":{
+ "intField_noIndex":9,
+ "string_noIndex":"z",
+ "mapField":{
+ "intField_noIndex":9,
+ "string_noIndex":"z"
+ }
+ }
+ }
+ }
+ */
+ expectedOutputRecordJSONString =
+
"{\"arrayField\":[0,1,2,3],\"nullField\":null,\"stringField\":\"a\",\"mapField\":{\"arrayField\":[0,1,2,3],"
+ +
"\"nullField\":null,\"stringField\":\"a\"},\"nestedFields.arrayField\":[0,1,2,3],\"nestedFields"
+ +
".nullField\":null,\"nestedFields.stringField\":\"a\",\"nestedFields.mapField\":{\"arrayField\":[0,1,2,"
+ +
"3],\"nullField\":null,\"stringField\":\"a\"},\"unindexableExtras\":{\"intField_noIndex\":9,"
+ +
"\"string_noIndex\":\"z\",\"mapField\":{\"intField_noIndex\":9,\"string_noIndex\":\"z\"},"
+ +
"\"nestedFields\":{\"intField_noIndex\":9,\"string_noIndex\":\"z\","
+ +
"\"mapField\":{\"intField_noIndex\":9,\"string_noIndex\":\"z\"}}}}";
+ testTransform(UNINDEXABLE_EXTRAS_FIELD_NAME, UNINDEXABLE_FIELD_SUFFIX,
schema, null, inputRecordJSONString,
+ expectedOutputRecordJSONString);
+ }
+
+ @Test
+ public void testFieldPathsToDrop() {
+ /*
+ {
+ "arrayField":[0, 1, 2, 3],
+ "nullField":null,
+ "stringField":"a",
+ "boolField":false,
+ "nestedFields":{
+ "arrayField":[0, 1, 2, 3],
+ "nullField":null,
+ "stringField":"a",
+ "boolField":false
+ }
+ }
+ */
+ final String inputRecordJSONString =
+
"{\"arrayField\":[0,1,2,3],\"nullField\":null,\"stringField\":\"a\",\"boolField\":false,"
+ +
"\"nestedFields\":{\"arrayField\":[0,1,2,3],\"nullField\":null,\"stringField\":\"a\","
+ + "\"boolField\":false}}";
+ String expectedOutputRecordJSONString;
+ Schema schema;
+
+ schema = createDefaultSchemaBuilder().addMultiValueDimension("arrayField",
DataType.INT)
+ .addSingleValueDimension("nullField", DataType.STRING)
+ .addSingleValueDimension("nestedFields.stringField", DataType.STRING)
+ .addSingleValueDimension("nestedFields.boolField",
DataType.BOOLEAN).build();
+ Set<String> fieldPathsToDrop = new HashSet<>(Arrays.asList("stringField",
"nestedFields.arrayField"));
+ /*
+ {
+ "arrayField":[0, 1, 2, 3],
+ "nullField":null,
+ "indexableExtras": {
+ "boolField":false,
+ "nestedFields": {
+ "nullField":null
+ }
+ },
+ "nestedFields":{
+ "stringField":"a",
+ "boolField":false
+ }
+ }
+ */
+ expectedOutputRecordJSONString =
+
"{\"arrayField\":[0,1,2,3],\"nullField\":null,\"nestedFields.stringField\":\"a\",\"nestedFields"
+ +
".boolField\":false,\"indexableExtras\":{\"boolField\":false,\"nestedFields\":{\"nullField\":null}}}";
+ testTransform(UNINDEXABLE_EXTRAS_FIELD_NAME, UNINDEXABLE_FIELD_SUFFIX,
schema, fieldPathsToDrop,
+ inputRecordJSONString, expectedOutputRecordJSONString);
+ }
+
+ @Test
+ public void testIgnoringSpecialRowKeys() {
+ // Configure a FilterTransformer and a SchemaConformingTransformer such
that the filter will introduce a special
+ // key ($SKIP_RECORD_KEY$) that the SchemaConformingTransformer should
ignore
+ IngestionConfig ingestionConfig = new IngestionConfig();
+ ingestionConfig.setFilterConfig(new FilterConfig("intField = 1"));
+ SchemaConformingTransformerConfig schemaConformingTransformerConfig =
+ new SchemaConformingTransformerConfig(INDEXABLE_EXTRAS_FIELD_NAME,
UNINDEXABLE_EXTRAS_FIELD_NAME,
+ UNINDEXABLE_FIELD_SUFFIX, null);
+
ingestionConfig.setSchemaConformingTransformerConfig(schemaConformingTransformerConfig);
+ TableConfig tableConfig =
+ new
TableConfigBuilder(TableType.OFFLINE).setTableName("testTable").setIngestionConfig(ingestionConfig).build();
+
+ // Create a series of transformers: FilterTransformer ->
SchemaConformingTransformer
+ List<RecordTransformer> transformers = new LinkedList<>();
+ transformers.add(new FilterTransformer(tableConfig));
+ Schema schema =
createDefaultSchemaBuilder().addSingleValueDimension("intField",
DataType.INT).build();
+ transformers.add(new SchemaConformingTransformer(tableConfig, schema));
+ CompositeTransformer compositeTransformer = new
CompositeTransformer(transformers);
+
+ Map<String, Object> inputRecordMap = jsonStringToMap("{\"intField\":1}");
+ GenericRow inputRecord = createRowFromMap(inputRecordMap);
+ GenericRow outputRecord = compositeTransformer.transform(inputRecord);
+ Assert.assertNotNull(outputRecord);
+ // Check that the transformed record has $SKIP_RECORD_KEY$
+ Assert.assertFalse(IngestionUtils.shouldIngestRow(outputRecord));
+ }
+
+ @Test
+ public void testOverlappingSchemaFields() {
+ Assert.assertThrows(IllegalArgumentException.class, () -> {
+ Schema schema =
createDefaultSchemaBuilder().addSingleValueDimension("a.b", DataType.STRING)
+ .addSingleValueDimension("a.b.c", DataType.INT).build();
+ SchemaConformingTransformer.validateSchema(schema,
+ new SchemaConformingTransformerConfig(INDEXABLE_EXTRAS_FIELD_NAME,
null, null, null));
+ });
+
+ // This is a repeat of the previous test but with fields reversed just in
case they are processed in order
+ Assert.assertThrows(IllegalArgumentException.class, () -> {
+ Schema schema =
createDefaultSchemaBuilder().addSingleValueDimension("a.b.c", DataType.INT)
+ .addSingleValueDimension("a.b", DataType.STRING).build();
+ SchemaConformingTransformer.validateSchema(schema,
+ new SchemaConformingTransformerConfig(INDEXABLE_EXTRAS_FIELD_NAME,
null, null, null));
+ });
+ }
+
+ @Test
+ public void testInvalidFieldNamesInSchema() {
+ // Ensure schema fields which end with unindexableFieldSuffix are caught
as invalid
+ Assert.assertThrows(() -> {
+ Schema schema =
+ createDefaultSchemaBuilder().addSingleValueDimension("a" +
UNINDEXABLE_FIELD_SUFFIX, DataType.STRING)
+ .addSingleValueDimension("a.b" + UNINDEXABLE_FIELD_SUFFIX,
DataType.INT).build();
+ SchemaConformingTransformer.validateSchema(schema,
+ new SchemaConformingTransformerConfig(INDEXABLE_EXTRAS_FIELD_NAME,
null, UNINDEXABLE_FIELD_SUFFIX, null));
+ });
+
+ // Ensure schema fields which are in fieldPathsToDrop are caught as invalid
+ Assert.assertThrows(() -> {
+ Schema schema =
createDefaultSchemaBuilder().addSingleValueDimension("a", DataType.STRING)
+ .addSingleValueDimension("b.c", DataType.INT).build();
+ Set<String> fieldPathsToDrop = new HashSet<>(Arrays.asList("a", "b.c"));
+ SchemaConformingTransformer.validateSchema(schema,
+ new SchemaConformingTransformerConfig(INDEXABLE_EXTRAS_FIELD_NAME,
null, null, fieldPathsToDrop));
+ });
+ }
+
+ @Test
+ public void testSchemaRecordMismatch() {
+ Schema schema =
+
createDefaultSchemaBuilder().addSingleValueDimension("nestedFields.mapField",
DataType.JSON).build();
+ /*
+ {
+ "indexableExtras":{
+ "nestedFields":0,
+ }
+ }
+ */
+ // The schema declares "nestedFields.mapField", so "nestedFields" must be a map, but the record's
+ // "nestedFields" value is an int, so it should be stored in indexableExtras
+ testTransform(UNINDEXABLE_EXTRAS_FIELD_NAME, UNINDEXABLE_FIELD_SUFFIX,
schema, null, "{\"nestedFields\":0}",
+ "{\"indexableExtras\":{\"nestedFields\":0}}");
+ }
+
+ @Test
+ public void testFieldTypesForExtras() {
+ final String inputRecordJSONString = "{\"arrayField\":[0,1,2,3]}";
+
+ TableConfig tableConfig =
+ createDefaultTableConfig(INDEXABLE_EXTRAS_FIELD_NAME,
UNINDEXABLE_EXTRAS_FIELD_NAME, UNINDEXABLE_FIELD_SUFFIX,
+ null);
+ Schema validSchema =
+ new
Schema.SchemaBuilder().addSingleValueDimension(INDEXABLE_EXTRAS_FIELD_NAME,
DataType.STRING)
+ .addSingleValueDimension(UNINDEXABLE_EXTRAS_FIELD_NAME,
DataType.STRING).build();
+ GenericRow outputRecord = transformRow(tableConfig, validSchema,
inputRecordJSONString);
+
+ Assert.assertNotNull(outputRecord);
+ // Validate that the indexable extras field contains the input record as a
string
+ Assert.assertEquals(outputRecord.getValue(INDEXABLE_EXTRAS_FIELD_NAME),
inputRecordJSONString);
+
+ // Validate that invalid field types are caught
+ Schema invalidSchema = new
Schema.SchemaBuilder().addSingleValueDimension(INDEXABLE_EXTRAS_FIELD_NAME,
DataType.INT)
+ .addSingleValueDimension(UNINDEXABLE_EXTRAS_FIELD_NAME,
DataType.BOOLEAN).build();
+ Assert.assertThrows(() -> {
+ transformRow(tableConfig, invalidSchema, inputRecordJSONString);
+ });
+ }
+
+ @Test
+ public void testInvalidTransformerConfig() {
+ Assert.assertThrows(() -> {
+ createDefaultTableConfig(null, null, null, null);
+ });
+ Assert.assertThrows(() -> {
+ createDefaultTableConfig(null, UNINDEXABLE_EXTRAS_FIELD_NAME, null,
null);
+ });
+ Assert.assertThrows(() -> {
+ createDefaultTableConfig(null, null, UNINDEXABLE_FIELD_SUFFIX, null);
+ });
+ Assert.assertThrows(() -> {
+ createDefaultTableConfig(null, UNINDEXABLE_EXTRAS_FIELD_NAME,
UNINDEXABLE_FIELD_SUFFIX, null);
+ });
+ Assert.assertThrows(() -> {
+ createDefaultTableConfig(INDEXABLE_EXTRAS_FIELD_NAME,
UNINDEXABLE_EXTRAS_FIELD_NAME, null, null);
+ });
+ }
+
+ /**
+ * Validates transforming the given row results in the expected row, where
both rows are given as JSON strings
+ */
+ private void testTransform(String unindexableExtrasField, String
unindexableFieldSuffix, Schema schema,
+ Set<String> fieldPathsToDrop, String inputRecordJSONString, String
expectedOutputRecordJSONString) {
+ TableConfig tableConfig =
+ createDefaultTableConfig(INDEXABLE_EXTRAS_FIELD_NAME,
unindexableExtrasField, unindexableFieldSuffix,
+ fieldPathsToDrop);
+ GenericRow outputRecord = transformRow(tableConfig, schema,
inputRecordJSONString);
+
+ Assert.assertNotNull(outputRecord);
+ Map<String, Object> expectedOutputRecordMap =
jsonStringToMap(expectedOutputRecordJSONString);
+ Assert.assertEquals(outputRecord.getFieldToValueMap(),
expectedOutputRecordMap);
+ }
+
+ /**
+ * Transforms the given row (given as a JSON string) using the transformer
+ * @return The transformed row
+ */
+ private GenericRow transformRow(TableConfig tableConfig, Schema schema,
String inputRecordJSONString) {
+ Map<String, Object> inputRecordMap =
jsonStringToMap(inputRecordJSONString);
+ GenericRow inputRecord = createRowFromMap(inputRecordMap);
+ SchemaConformingTransformer schemaConformingTransformer = new
SchemaConformingTransformer(tableConfig, schema);
+ return schemaConformingTransformer.transform(inputRecord);
+ }
+
+ /**
+ * @return A map representing the given JSON string
+ */
+ @Nonnull
+ private Map<String, Object> jsonStringToMap(String jsonString) {
+ try {
+ TypeReference<Map<String, Object>> typeRef = new TypeReference<>() {
+ };
+ return OBJECT_MAPPER.readValue(jsonString, typeRef);
+ } catch (IOException e) {
+ fail(e.getMessage());
+ }
+ // Should never reach here
+ return null;
+ }
+
+ /**
+ * @return A new generic row with all the kv-pairs from the given map
+ */
+ private GenericRow createRowFromMap(Map<String, Object> map) {
+ GenericRow record = new GenericRow();
+ for (Map.Entry<String, Object> entry : map.entrySet()) {
+ record.putValue(entry.getKey(), entry.getValue());
+ }
+ return record;
+ }
+}
diff --git
a/pinot-spi/src/main/java/org/apache/pinot/spi/config/table/ingestion/IngestionConfig.java
b/pinot-spi/src/main/java/org/apache/pinot/spi/config/table/ingestion/IngestionConfig.java
index 4b26f33cd7..86ff9cec6f 100644
---
a/pinot-spi/src/main/java/org/apache/pinot/spi/config/table/ingestion/IngestionConfig.java
+++
b/pinot-spi/src/main/java/org/apache/pinot/spi/config/table/ingestion/IngestionConfig.java
@@ -45,6 +45,9 @@ public class IngestionConfig extends BaseJsonConfig {
@JsonPropertyDescription("Config related to handling complex type")
private ComplexTypeConfig _complexTypeConfig;
+ @JsonPropertyDescription("Config related to the SchemaConformingTransformer")
+ private SchemaConformingTransformerConfig _schemaConformingTransformerConfig;
+
@JsonPropertyDescription("Configs related to record aggregation function
applied during ingestion")
private List<AggregationConfig> _aggregationConfigs;
@@ -61,12 +64,14 @@ public class IngestionConfig extends BaseJsonConfig {
public IngestionConfig(@Nullable BatchIngestionConfig batchIngestionConfig,
@Nullable StreamIngestionConfig streamIngestionConfig, @Nullable FilterConfig filterConfig,
@Nullable List<TransformConfig> transformConfigs, @Nullable ComplexTypeConfig complexTypeConfig,
+ @Nullable SchemaConformingTransformerConfig schemaConformingTransformerConfig,
@Nullable List<AggregationConfig> aggregationConfigs) {
_batchIngestionConfig = batchIngestionConfig;
_streamIngestionConfig = streamIngestionConfig;
_filterConfig = filterConfig;
_transformConfigs = transformConfigs;
_complexTypeConfig = complexTypeConfig;
+ _schemaConformingTransformerConfig = schemaConformingTransformerConfig;
_aggregationConfigs = aggregationConfigs;
}
@@ -98,6 +103,11 @@ public class IngestionConfig extends BaseJsonConfig {
return _complexTypeConfig;
}
+ @Nullable
+ public SchemaConformingTransformerConfig getSchemaConformingTransformerConfig() {
+ return _schemaConformingTransformerConfig;
+ }
+
@Nullable
public List<AggregationConfig> getAggregationConfigs() {
return _aggregationConfigs;
@@ -135,6 +145,11 @@ public class IngestionConfig extends BaseJsonConfig {
_complexTypeConfig = complexTypeConfig;
}
+ public void setSchemaConformingTransformerConfig(
+ SchemaConformingTransformerConfig schemaConformingTransformerConfig) {
+ _schemaConformingTransformerConfig = schemaConformingTransformerConfig;
+ }
+
public void setAggregationConfigs(List<AggregationConfig> aggregationConfigs) {
_aggregationConfigs = aggregationConfigs;
}
diff --git a/pinot-spi/src/main/java/org/apache/pinot/spi/config/table/ingestion/SchemaConformingTransformerConfig.java b/pinot-spi/src/main/java/org/apache/pinot/spi/config/table/ingestion/SchemaConformingTransformerConfig.java
new file mode 100644
index 0000000000..96231f39d9
--- /dev/null
+++ b/pinot-spi/src/main/java/org/apache/pinot/spi/config/table/ingestion/SchemaConformingTransformerConfig.java
@@ -0,0 +1,78 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.pinot.spi.config.table.ingestion;
+
+import com.fasterxml.jackson.annotation.JsonCreator;
+import com.fasterxml.jackson.annotation.JsonProperty;
+import com.fasterxml.jackson.annotation.JsonPropertyDescription;
+import com.google.common.base.Preconditions;
+import java.util.Set;
+import javax.annotation.Nullable;
+import org.apache.pinot.spi.config.BaseJsonConfig;
+
+
+public class SchemaConformingTransformerConfig extends BaseJsonConfig {
+ @JsonPropertyDescription("Name of the field that should contain extra fields
that are not part of the schema.")
+ private final String _indexableExtrasField;
+
+ @JsonPropertyDescription(
+ "Like indexableExtrasField except it only contains fields with the suffix in unindexableFieldSuffix.")
+ private final String _unindexableExtrasField;
+
+ @JsonPropertyDescription("The suffix of fields that must be stored in
unindexableExtrasField")
+ private final String _unindexableFieldSuffix;
+
+ @JsonPropertyDescription("Array of field paths to drop")
+ private final Set<String> _fieldPathsToDrop;
+
+ @JsonCreator
+ public SchemaConformingTransformerConfig(@JsonProperty("indexableExtrasField") String indexableExtrasField,
+ @JsonProperty("unindexableExtrasField") @Nullable String unindexableExtrasField,
+ @JsonProperty("unindexableFieldSuffix") @Nullable String unindexableFieldSuffix,
+ @JsonProperty("fieldPathsToDrop") @Nullable Set<String> fieldPathsToDrop) {
+ Preconditions.checkArgument(indexableExtrasField != null, "indexableExtrasField must be set");
+ if (null != unindexableExtrasField) {
+ Preconditions.checkArgument(null != unindexableFieldSuffix, "unindexableFieldSuffix must be set if unindexableExtrasField is set");
+ }
+ _indexableExtrasField = indexableExtrasField;
+ _unindexableExtrasField = unindexableExtrasField;
+ _unindexableFieldSuffix = unindexableFieldSuffix;
+ _fieldPathsToDrop = fieldPathsToDrop;
+ }
+
+ public String getIndexableExtrasField() {
+ return _indexableExtrasField;
+ }
+
+ @Nullable
+ public String getUnindexableExtrasField() {
+ return _unindexableExtrasField;
+ }
+
+ @Nullable
+ public String getUnindexableFieldSuffix() {
+ return _unindexableFieldSuffix;
+ }
+
+ @Nullable
+ public Set<String> getFieldPathsToDrop() {
+ return _fieldPathsToDrop;
+ }
+}
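
Note: for context, a minimal sketch of how this config might be constructed programmatically and attached to an IngestionConfig. The field names "json_data" and "json_data_no_idx", the suffix "_noIndex", and the dropped path "tags.dropMe" are illustrative, not part of this commit.

    // Unmapped fields are gathered under "json_data"; fields ending in "_noIndex" are
    // gathered under "json_data_no_idx" instead; the path "tags.dropMe" is dropped entirely.
    SchemaConformingTransformerConfig transformerConfig = new SchemaConformingTransformerConfig(
        "json_data", "json_data_no_idx", "_noIndex", Set.of("tags.dropMe"));
    IngestionConfig ingestionConfig =
        new IngestionConfig(null, null, null, null, null, transformerConfig, null);

The config can also be attached to an existing IngestionConfig after construction via setSchemaConformingTransformerConfig.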
diff --git a/pinot-spi/src/main/java/org/apache/pinot/spi/data/readers/GenericRow.java b/pinot-spi/src/main/java/org/apache/pinot/spi/data/readers/GenericRow.java
index 8ef3838189..2bc15a0800 100644
--- a/pinot-spi/src/main/java/org/apache/pinot/spi/data/readers/GenericRow.java
+++ b/pinot-spi/src/main/java/org/apache/pinot/spi/data/readers/GenericRow.java
@@ -78,6 +78,13 @@ public class GenericRow implements Serializable {
private final Map<String, Object> _fieldToValueMap = new HashMap<>();
private final Set<String> _nullValueFields = new HashSet<>();
+ /**
+ * @return Whether the given key is one of the special types of keys ($SKIP_RECORD_KEY$, etc.)
+ */
+ public static boolean isSpecialKeyType(String key) {
+ return key.equals(SKIP_RECORD_KEY) || key.equals(INCOMPLETE_RECORD_KEY) || key.equals(MULTIPLE_RECORDS_KEY);
+ }
+
/**
* Initializes the generic row from the given generic row (shallow copy). The row should be new created or cleared
* before calling this method.
diff --git a/pinot-spi/src/main/java/org/apache/pinot/spi/stream/StreamDataDecoderImpl.java b/pinot-spi/src/main/java/org/apache/pinot/spi/stream/StreamDataDecoderImpl.java
index 5093c2c8b9..89c70acaf8 100644
--- a/pinot-spi/src/main/java/org/apache/pinot/spi/stream/StreamDataDecoderImpl.java
+++ b/pinot-spi/src/main/java/org/apache/pinot/spi/stream/StreamDataDecoderImpl.java
@@ -34,6 +34,13 @@ public class StreamDataDecoderImpl implements StreamDataDecoder {
private final StreamMessageDecoder _valueDecoder;
private final GenericRow _reuse = new GenericRow();
+ /**
+ * @return Whether the given key is one of the special types of keys (__key, __header$, etc.)
+ */
+ public static boolean isSpecialKeyType(String key) {
+ return key.equals(KEY) || key.startsWith(HEADER_KEY_PREFIX) || key.startsWith(METADATA_KEY_PREFIX);
+ }
+
public StreamDataDecoderImpl(StreamMessageDecoder valueDecoder) {
_valueDecoder = valueDecoder;
}
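
Note: as a rough sketch (not code from this commit), a transformer that reshapes records could combine these two helpers to pass special keys through untouched, along the following lines. The method name passThroughSpecialKeys is hypothetical.

    // Illustrative helper: copy decoder and GenericRow special keys from the input row to the output row.
    private static void passThroughSpecialKeys(GenericRow from, GenericRow to) {
      for (Map.Entry<String, Object> entry : from.getFieldToValueMap().entrySet()) {
        String key = entry.getKey();
        if (GenericRow.isSpecialKeyType(key) || StreamDataDecoderImpl.isSpecialKeyType(key)) {
          to.putValue(key, entry.getValue());
        }
      }
    }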
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]