github-advanced-security[bot] commented on code in PR #19579: URL: https://github.com/apache/druid/pull/19579#discussion_r3420357309
########## processing/src/main/java/org/apache/druid/segment/incremental/OnHeapClusteredBaseTable.java: ########## @@ -0,0 +1,491 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +package org.apache.druid.segment.incremental; + +import com.google.common.annotations.VisibleForTesting; +import com.google.common.collect.Ordering; +import org.apache.druid.data.input.InputRow; +import org.apache.druid.data.input.impl.ClusteredValueGroupsBaseTableProjectionSpec; +import org.apache.druid.data.input.impl.DimensionSchema; +import org.apache.druid.error.DruidException; +import org.apache.druid.java.util.common.parsers.ParseException; +import org.apache.druid.segment.ColumnSelectorFactory; +import org.apache.druid.segment.ColumnValueSelector; +import org.apache.druid.segment.DimensionDictionary; +import org.apache.druid.segment.DimensionHandlerUtils; +import org.apache.druid.segment.SortedDimensionDictionary; +import org.apache.druid.segment.StringDimensionDictionary; +import org.apache.druid.segment.VirtualColumn; +import org.apache.druid.segment.VirtualColumns; +import org.apache.druid.segment.column.ColumnType; +import org.apache.druid.segment.column.RowSignature; +import org.apache.druid.segment.column.ValueType; +import org.apache.druid.segment.projections.ClusteredValueGroupsBaseTableSchema; +import org.apache.druid.segment.projections.ClusteringDictionaries; +import org.apache.druid.segment.projections.TableClusterGroupSpec; + +import javax.annotation.Nullable; +import java.util.ArrayList; +import java.util.Arrays; +import java.util.Collections; +import java.util.Comparator; +import java.util.List; +import java.util.Map; +import java.util.NoSuchElementException; +import java.util.concurrent.ConcurrentHashMap; +import java.util.concurrent.atomic.AtomicInteger; +import java.util.concurrent.atomic.AtomicLong; + +/** + * Segment-wide root of the clustered base-table machinery for {@link OnheapIncrementalIndex}: holds the per-type + * shared clustering dictionaries, the clustering virtual-column selector factory, and the map of + * {@link OnHeapClusterGroup} instances keyed by the clustering-value dictionary-ID tuple. The parent + * {@code OnheapIncrementalIndex} delegates row ingestion here when its schema carries a + * {@link ClusteredValueGroupsBaseTableProjectionSpec}, and the base facts holder stays empty in that mode. + * <p> + * On row arrival: each clustering column's value is resolved through the virtual-column selector, then + * dictionary-encoded into the per-type dictionary for the column's clustering type. The resulting dictionary-ID tuple + * becomes the lookup key for the {@link OnHeapClusterGroup} that owns the row, new groups are materialized lazily as + * new tuples arrive. + * <p> + * Dictionaries here are insertion-order (id = first-seen position). The persist path sorts + remaps them into the + * read-side {@link ClusteringDictionaries} shape (sorted, nulls first) at segment-write time. + */ +public final class OnHeapClusteredBaseTable +{ + private static final Comparator<TableClusterGroupSpec> BY_CLUSTERING_VALUE_IDS = + Comparator.comparing(TableClusterGroupSpec::getClusteringValueIds, Ordering.<Integer>natural().lexicographical()); + + private final ClusteredValueGroupsBaseTableProjectionSpec spec; + private final RowSignature clusteringColumns; + private final VirtualColumns virtualColumns; + private final ColumnSelectorFactory virtualSelectorFactory; + private final IncrementalIndex.InputRowHolder inputRowHolder; + + // template state for instantiating new OnHeapClusterGroups + private final List<DimensionSchema> nonClusteringDimensions; + private final boolean rollup; + private final int timePosition; + + private final DimensionDictionary<String> stringDictionary = new StringDimensionDictionary(); + private final DimensionDictionary<Long> longDictionary = new DimensionDictionary<>(Long.class) + { + @Override + public long estimateSizeOfValue(Long value) + { + return Long.BYTES; + } + }; + private final DimensionDictionary<Double> doubleDictionary = new DimensionDictionary<>(Double.class) + { + @Override + public long estimateSizeOfValue(Double value) + { + return Double.BYTES; + } + }; + private final DimensionDictionary<Float> floatDictionary = new DimensionDictionary<>(Float.class) + { + @Override + public long estimateSizeOfValue(Float value) + { + return Float.BYTES; + } + }; + + // Keyed by the clustering-value dictionary-ID tuple + private final ConcurrentHashMap<List<Integer>, OnHeapClusterGroup> groups = new ConcurrentHashMap<>(); + private final AtomicInteger totalNumRows = new AtomicInteger(0); + // min/max bucketed row timestamp across all groups; the base facts holder is empty in clustered mode so the + // parent index's interval accessors delegate here instead. + private final AtomicLong minTimeMillis = new AtomicLong(Long.MAX_VALUE); + private final AtomicLong maxTimeMillis = new AtomicLong(Long.MIN_VALUE); + + public OnHeapClusteredBaseTable( + ClusteredValueGroupsBaseTableProjectionSpec spec, + VirtualColumns segmentVirtualColumns, + IncrementalIndex.InputRowHolder inputRowHolder, + boolean rollup, + int timePosition + ) + { + this.spec = spec; + this.inputRowHolder = inputRowHolder; + this.rollup = rollup; + this.timePosition = timePosition; + this.nonClusteringDimensions = Collections.unmodifiableList( + new ArrayList<>(spec.getNonClusteringColumns()) + ); + + final RowSignature.Builder sigBuilder = RowSignature.builder(); + for (DimensionSchema c : spec.getClusteringColumns()) { + sigBuilder.add(c.getName(), c.getColumnType()); + } + this.clusteringColumns = sigBuilder.build(); + + this.virtualColumns = mergeVirtualColumns(segmentVirtualColumns, spec.getVirtualColumns()); + this.virtualSelectorFactory = new OnheapIncrementalIndex.CachingColumnSelectorFactory( + IncrementalIndex.makeColumnSelectorFactory(this.virtualColumns, inputRowHolder, null) + ); + } + + /** + * Resolve the clustering tuple for {@code row}, locate (or create) the matching {@link OnHeapClusterGroup}, and + * dispatch the row to it. {@code key} carries the bucketed timestamp from the parent's {@code toIncrementalIndexRow} + * pre-processing, its dim slots are ignored here since in clustered mode the non-clustering data lives only on the + * per-group facts holders. Returns true when the row created a new fact entry in its group (vs. rolling up into an + * existing one). + */ + boolean addToFacts( + IncrementalIndexRow key, + InputRow row, + List<String> parseExceptionMessages, + AtomicLong totalSizeInBytes + ) + { + final long clusteringDictSizeBefore = clusteringDictionariesSizeInBytes(); + final Object[] clusteringValues = new Object[clusteringColumns.size()]; + final List<Integer> clusteringValueIds = new ArrayList<>(clusteringColumns.size()); + for (int i = 0; i < clusteringColumns.size(); i++) { + final String name = clusteringColumns.getColumnName(i); + final ColumnType type = clusteringColumns.getColumnType(i) + .orElseThrow(() -> DruidException.defensive( + "clustering column [%s] has no type", + name + )); + final ColumnValueSelector<?> selector = virtualSelectorFactory.makeColumnValueSelector(name); + final Object raw = selector.getObject(); + Object coerced; + try { + coerced = DimensionHandlerUtils.convertObjectToType(raw, type, true, name); + } + catch (ParseException pe) { + parseExceptionMessages.add(pe.getMessage()); + coerced = null; + } + clusteringValues[i] = coerced; + clusteringValueIds.add(internClusteringValue(type, coerced)); + } + totalSizeInBytes.addAndGet(clusteringDictionariesSizeInBytes() - clusteringDictSizeBefore); + + final boolean[] groupCreated = {false}; + final OnHeapClusterGroup group = groups.computeIfAbsent( + clusteringValueIds, + ids -> { + groupCreated[0] = true; + return new OnHeapClusterGroup( + this, + clusteringValues, + ids, + nonClusteringDimensions, + virtualColumns, + inputRowHolder, + rollup, + timePosition + ); + } + ); + if (groupCreated[0]) { + totalSizeInBytes.addAndGet(estimateNewGroupOverhead()); + } + final boolean isNewEntry = group.addToFacts(row, key.getTimestamp(), parseExceptionMessages, totalSizeInBytes); + totalNumRows.incrementAndGet(); + minTimeMillis.accumulateAndGet(key.getTimestamp(), Math::min); + maxTimeMillis.accumulateAndGet(key.getTimestamp(), Math::max); + return isNewEntry; + } + + /** + * Rough running-heap cost of a newly-created cluster group beyond its rows: the {@link #groups}-map node, the boxed + * clustering-id tuple held as the map key, the per-group clustering-values array, and the group object's fixed + * structures. + */ + private long estimateNewGroupOverhead() + { + final int numClusteringColumns = clusteringColumns.size(); + return OnheapIncrementalIndex.ROUGH_OVERHEAD_PER_MAP_ENTRY + + (long) numClusteringColumns * (Long.BYTES * 2 + Long.BYTES) Review Comment: ## CodeQL / Result of multiplication cast to wider type Potential overflow in [int multiplication](1) before it is converted to long by use in a numeric context. [Show more details](https://github.com/apache/druid/security/code-scanning/11309) -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: [email protected] For queries about this service, please contact Infrastructure at: [email protected] --------------------------------------------------------------------- To unsubscribe, e-mail: [email protected] For additional commands, e-mail: [email protected]
