FrankChen021 commented on code in PR #19379: URL: https://github.com/apache/druid/pull/19379#discussion_r3154598898
########## processing/src/main/java/org/apache/druid/segment/transform/ScanTransformer.java: ########## @@ -0,0 +1,285 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +package org.apache.druid.segment.transform; + +import org.apache.druid.data.input.InputRow; +import org.apache.druid.data.input.InputRowListPlusRawValues; +import org.apache.druid.data.input.MapBasedInputRow; +import org.apache.druid.java.util.common.ISE; +import org.apache.druid.java.util.common.Intervals; +import org.apache.druid.query.DataSource; +import org.apache.druid.query.QueryContexts; +import org.apache.druid.query.UnnestDataSource; +import org.apache.druid.query.scan.ScanQuery; +import org.apache.druid.segment.ColumnSelectorFactory; +import org.apache.druid.segment.Cursor; +import org.apache.druid.segment.CursorBuildSpec; +import org.apache.druid.segment.CursorFactory; +import org.apache.druid.segment.CursorHolder; +import org.apache.druid.segment.Segment; +import org.apache.druid.segment.SegmentMapFunction; +import org.apache.druid.segment.VirtualColumn; +import org.apache.druid.segment.column.ColumnHolder; +import org.apache.druid.segment.column.ColumnType; +import org.apache.druid.segment.column.RowSignature; +import 
org.apache.druid.segment.filter.Filters; +import org.apache.druid.timeline.SegmentId; +import org.joda.time.Interval; + +import javax.annotation.Nonnull; +import javax.annotation.Nullable; +import java.io.IOException; +import java.util.ArrayList; +import java.util.LinkedHashMap; +import java.util.LinkedHashSet; +import java.util.List; +import java.util.Map; +import java.util.Optional; +import java.util.Set; + +/** + * A {@link BaseTransformer} that processes input rows through a reusable scan query cursor pipeline. + * + * <p>The pipeline is built once at construction: a {@link SettableRowCursorFactory} is wrapped by the + * scan query's {@link SegmentMapFunction} (e.g., unnest, filter). For each input row, the row is set + * on the factory and the cursor is {@link Cursor#reset reset} — no per-row segment or cursor allocation. + * + * <p>When the scan query produces zero output rows (e.g., null/missing arrays, or filter rejection), + * the input row passes through with unnest output columns set to null and virtual columns still evaluated. + * This differs from {@link TransformSpec}'s filter behavior which drops the row entirely. + * + * <p>This class is not thread-safe. Each reader thread should have its own instance. 
+ */ +public class ScanTransformer implements BaseTransformer +{ + private final ScanQuery query; + private final SettableRowCursorFactory baseCursorFactory; + private final CursorHolder cursorHolder; + private Cursor cursor; + + ScanTransformer(final ScanQuery scanQuery) + { + this.query = scanQuery.withOverriddenContext( + Map.of(QueryContexts.TIMEOUT_KEY, 0) + ); + + final RowSignature broadSignature = RowSignature.builder() + .add(ColumnHolder.TIME_COLUMN_NAME, ColumnType.LONG) + .build(); + + final CursorBuildSpec cursorBuildSpec = CursorBuildSpec.builder() + .setInterval(query.getSingleInterval()) + .setFilter(Filters.toFilter(query.getFilter())) + .setVirtualColumns(query.getVirtualColumns()) + .build(); + + this.baseCursorFactory = new SettableRowCursorFactory(broadSignature); + final SegmentMapFunction segmentMapFunction = query.getDataSource().createSegmentMapFunction(query); + final Segment mappedSegment = segmentMapFunction.apply(Optional.of(new CursorFactorySegment(baseCursorFactory))) + .orElseThrow(() -> new ISE("SegmentMapFunction returned empty")); + final CursorFactory mappedCursorFactory = mappedSegment.as(CursorFactory.class); + this.cursorHolder = mappedCursorFactory.makeCursorHolder(cursorBuildSpec); + } + + @Override + public boolean hasMultiRowTransform() + { + return true; + } + + @Override + @Nullable + public InputRow transform(@Nullable final InputRow row) + { + throw new UnsupportedOperationException( + "ScanTransformer does not support single-row transform; use transformToList()" + ); + } + + @Override + public List<InputRow> transformToList(@Nullable final InputRow row) + { + if (row == null) { + return List.of(); + } + + return process(row); + } + + @Override + @Nullable + public InputRowListPlusRawValues transform(@Nullable final InputRowListPlusRawValues row) + { + if (row == null || row.getInputRows() == null) { + return row; + } + + final List<InputRow> inputRows = row.getInputRows(); + final List<Map<String, Object>> 
inputRawValues = row.getRawValuesList(); + final List<InputRow> outputRows = new ArrayList<>(); + final List<Map<String, Object>> outputRawValues = inputRawValues == null ? null : new ArrayList<>(); + + for (int i = 0; i < inputRows.size(); i++) { + final List<InputRow> expandedRows = transformToList(inputRows.get(i)); + outputRows.addAll(expandedRows); + if (outputRawValues != null) { + for (int j = 0; j < expandedRows.size(); j++) { + outputRawValues.add(inputRawValues.get(i)); + } + } + } + + return InputRowListPlusRawValues.ofList(outputRawValues, outputRows, row.getParseException()); + } + + @Override + public void close() throws IOException + { + cursorHolder.close(); + } + + private List<InputRow> process(final InputRow inputRow) + { + baseCursorFactory.set(inputRow); + + if (cursor == null) { + cursor = cursorHolder.asCursor(); + } else { + cursor.reset(); + } + + if (cursor == null || cursor.isDone()) { + return List.of(buildPassthroughRow(inputRow)); + } + + final List<String> columns = resolveColumnsForRow(inputRow); + final List<String> dimensionColumns = resolveDimensionColumns(inputRow, columns); + final ColumnSelectorFactory selectorFactory = cursor.getColumnSelectorFactory(); + + final List<InputRow> result = new ArrayList<>(); + while (!cursor.isDone()) { + final Map<String, Object> event = new LinkedHashMap<>(); + for (final String col : columns) { + event.put(col, selectorFactory.makeColumnValueSelector(col).getObject()); + } + result.add(new MapBasedInputRow(inputRow.getTimestampFromEpoch(), dimensionColumns, event)); + cursor.advance(); + } + + return result.isEmpty() ? 
List.of(buildPassthroughRow(inputRow)) : result; + } + + private InputRow buildPassthroughRow(final InputRow inputRow) + { + final Set<String> unnestOutputColumns = new LinkedHashSet<>(); + collectOutputColumnNames(query.getDataSource(), unnestOutputColumns); + + final List<String> columns = resolveColumnsForRow(inputRow); + final List<String> dimensionColumns = resolveDimensionColumns(inputRow, columns); + final ColumnSelectorFactory factory = baseCursorFactory.getColumnSelectorFactory(query.getVirtualColumns()); + final Map<String, Object> event = new LinkedHashMap<>(); + for (final String col : columns) { + if (unnestOutputColumns.contains(col)) { + event.put(col, null); + } else { + event.put(col, factory.makeColumnValueSelector(col).getObject()); + } + } + return new MapBasedInputRow(inputRow.getTimestampFromEpoch(), dimensionColumns, event); + } + + private List<String> resolveColumnsForRow(final InputRow inputRow) + { + final Set<String> columns = new LinkedHashSet<>(); Review Comment: [P1] Preserve non-dimension raw columns in scan output. resolveColumnsForRow only emits __time, inputRow.getDimensions(), scan virtual columns, and unnest output columns. In ingestion, fields used by metricsSpec are commonly excluded from dimensions, so after a scan transform expands a row, those raw metric fields disappear from the MapBasedInputRow event and downstream aggregators read null/zero instead of the original value. The previous implementation explicitly included MapBasedInputRow event keys, and the incremental diff removes the test that covered this case. Include all raw event keys, where available, when constructing scan output rows. ########## processing/src/main/java/org/apache/druid/segment/transform/ScanTransformer.java: ########## @@ -0,0 +1,285 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements.
See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +package org.apache.druid.segment.transform; + +import org.apache.druid.data.input.InputRow; +import org.apache.druid.data.input.InputRowListPlusRawValues; +import org.apache.druid.data.input.MapBasedInputRow; +import org.apache.druid.java.util.common.ISE; +import org.apache.druid.java.util.common.Intervals; +import org.apache.druid.query.DataSource; +import org.apache.druid.query.QueryContexts; +import org.apache.druid.query.UnnestDataSource; +import org.apache.druid.query.scan.ScanQuery; +import org.apache.druid.segment.ColumnSelectorFactory; +import org.apache.druid.segment.Cursor; +import org.apache.druid.segment.CursorBuildSpec; +import org.apache.druid.segment.CursorFactory; +import org.apache.druid.segment.CursorHolder; +import org.apache.druid.segment.Segment; +import org.apache.druid.segment.SegmentMapFunction; +import org.apache.druid.segment.VirtualColumn; +import org.apache.druid.segment.column.ColumnHolder; +import org.apache.druid.segment.column.ColumnType; +import org.apache.druid.segment.column.RowSignature; +import org.apache.druid.segment.filter.Filters; +import org.apache.druid.timeline.SegmentId; +import org.joda.time.Interval; + +import javax.annotation.Nonnull; +import javax.annotation.Nullable; +import java.io.IOException; +import 
java.util.ArrayList; +import java.util.LinkedHashMap; +import java.util.LinkedHashSet; +import java.util.List; +import java.util.Map; +import java.util.Optional; +import java.util.Set; + +/** + * A {@link BaseTransformer} that processes input rows through a reusable scan query cursor pipeline. + * + * <p>The pipeline is built once at construction: a {@link SettableRowCursorFactory} is wrapped by the + * scan query's {@link SegmentMapFunction} (e.g., unnest, filter). For each input row, the row is set + * on the factory and the cursor is {@link Cursor#reset reset} — no per-row segment or cursor allocation. + * + * <p>When the scan query produces zero output rows (e.g., null/missing arrays, or filter rejection), + * the input row passes through with unnest output columns set to null and virtual columns still evaluated. + * This differs from {@link TransformSpec}'s filter behavior which drops the row entirely. + * + * <p>This class is not thread-safe. Each reader thread should have its own instance. 
+ */ +public class ScanTransformer implements BaseTransformer +{ + private final ScanQuery query; + private final SettableRowCursorFactory baseCursorFactory; + private final CursorHolder cursorHolder; + private Cursor cursor; + + ScanTransformer(final ScanQuery scanQuery) + { + this.query = scanQuery.withOverriddenContext( + Map.of(QueryContexts.TIMEOUT_KEY, 0) + ); + + final RowSignature broadSignature = RowSignature.builder() + .add(ColumnHolder.TIME_COLUMN_NAME, ColumnType.LONG) + .build(); + + final CursorBuildSpec cursorBuildSpec = CursorBuildSpec.builder() + .setInterval(query.getSingleInterval()) + .setFilter(Filters.toFilter(query.getFilter())) + .setVirtualColumns(query.getVirtualColumns()) + .build(); + + this.baseCursorFactory = new SettableRowCursorFactory(broadSignature); + final SegmentMapFunction segmentMapFunction = query.getDataSource().createSegmentMapFunction(query); + final Segment mappedSegment = segmentMapFunction.apply(Optional.of(new CursorFactorySegment(baseCursorFactory))) + .orElseThrow(() -> new ISE("SegmentMapFunction returned empty")); + final CursorFactory mappedCursorFactory = mappedSegment.as(CursorFactory.class); + this.cursorHolder = mappedCursorFactory.makeCursorHolder(cursorBuildSpec); + } + + @Override + public boolean hasMultiRowTransform() + { + return true; + } + + @Override + @Nullable + public InputRow transform(@Nullable final InputRow row) + { + throw new UnsupportedOperationException( + "ScanTransformer does not support single-row transform; use transformToList()" + ); + } + + @Override + public List<InputRow> transformToList(@Nullable final InputRow row) + { + if (row == null) { + return List.of(); + } + + return process(row); + } + + @Override + @Nullable + public InputRowListPlusRawValues transform(@Nullable final InputRowListPlusRawValues row) + { + if (row == null || row.getInputRows() == null) { + return row; + } + + final List<InputRow> inputRows = row.getInputRows(); + final List<Map<String, Object>> 
inputRawValues = row.getRawValuesList(); + final List<InputRow> outputRows = new ArrayList<>(); + final List<Map<String, Object>> outputRawValues = inputRawValues == null ? null : new ArrayList<>(); + + for (int i = 0; i < inputRows.size(); i++) { + final List<InputRow> expandedRows = transformToList(inputRows.get(i)); + outputRows.addAll(expandedRows); + if (outputRawValues != null) { + for (int j = 0; j < expandedRows.size(); j++) { + outputRawValues.add(inputRawValues.get(i)); + } + } + } + + return InputRowListPlusRawValues.ofList(outputRawValues, outputRows, row.getParseException()); + } + + @Override + public void close() throws IOException + { + cursorHolder.close(); + } + + private List<InputRow> process(final InputRow inputRow) + { + baseCursorFactory.set(inputRow); + + if (cursor == null) { + cursor = cursorHolder.asCursor(); + } else { + cursor.reset(); + } + + if (cursor == null || cursor.isDone()) { Review Comment: [P2] Do not pass through rows rejected by scan filters. When the scan cursor is done, the transformer always returns buildPassthroughRow(inputRow). That makes a scan query filter unable to drop rows: a row rejected by query.getFilter() is still ingested once with unnest outputs set to null, as the updated test now asserts. This diverges from normal TransformSpec filtering and from scan/unnest filtering semantics, and it inflates row counts for specs that use scan filters to exclude input or unnested rows. Distinguish empty/null unnest passthrough from filter rejection, and return an empty list when the filter rejects the row. -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment.
To unsubscribe, e-mail: [email protected] For queries about this service, please contact Infrastructure at: [email protected] --------------------------------------------------------------------- To unsubscribe, e-mail: [email protected] For additional commands, e-mail: [email protected]
