aho135 commented on code in PR #19571:
URL: https://github.com/apache/druid/pull/19571#discussion_r3384705725
##########
indexing-service/src/main/java/org/apache/druid/indexing/seekablestream/SeekableStreamIndexTaskRunner.java:
##########
@@ -999,15 +1057,86 @@ private void checkPublishAndHandoffFailure() throws ExecutionException, Interrup
handOffWaitList.removeAll(handoffFinished);
}
+ @VisibleForTesting
+ void recordObservedDimensionValueForTest(String segmentId, String dimension, @Nullable String value)
+ {
+ observedDimensionValuesBySegment
+ .computeIfAbsent(segmentId, k -> new ConcurrentHashMap<>())
+ .computeIfAbsent(dimension, k -> Collections.synchronizedSet(new HashSet<>()))
+ .add(value);
+ }
+
+ @VisibleForTesting
+ void markSegmentRestartSpannedForTest(String segmentId)
+ {
+ restartSpannedSegments.add(segmentId);
+ }
+
+ /**
+ * Stamps a segment with a {@link StreamRangeShardSpec} declaring its observed dimension values so the broker can
+ * prune it, or returns it unchanged when pruning would be unsafe or pointless (see the guard clauses). A null
+ * observed value is carried through (distinct from {@code ""}) so {@code IS NULL} queries are not pruned.
+ */
+ @VisibleForTesting
+ DataSegment annotateSegmentWithPartitionFilters(DataSegment s)
+ {
+ final List<String> filterDims = ioConfig.getPartitionFilterDimensions();
+ if (CollectionUtils.isNullOrEmpty(filterDims)) {
+ return s;
+ }
+ final String lookupKey = SegmentIdWithShardSpec.fromDataSegment(s).toString();
+ if (restartSpannedSegments.contains(lookupKey)) {
+ return s;
+ }
+ final Map<String, Set<String>> segObserved = observedDimensionValuesBySegment.get(lookupKey);
+ if (segObserved == null || segObserved.isEmpty()) {
+ return s;
+ }
+ final Map<String, List<String>> snapshotFilters = new HashMap<>();
+ for (String dim : filterDims) {
Review Comment:
```
for (String dim : filterDims) {
  segObserved.computeIfPresent(dim, (k, vals) -> {
    synchronized (vals) {
      if (!vals.isEmpty()) {
        snapshotFilters.put(dim, new ArrayList<>(vals));
      }
    }
    return vals; // Return unchanged - we're just reading
  });
}
```
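
For anyone who wants to try the suggested loop outside the task runner, here is a minimal standalone sketch. It is not the Druid code: the class name `SnapshotSketch`, the `snapshot` helper, and the sample dimension names are hypothetical, and it assumes the per-segment map is a `ConcurrentHashMap` holding `Collections.synchronizedSet` values, as the diff above suggests. Holding the set's monitor makes the emptiness check and the copy one step with respect to concurrent `add` calls.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collections;
import java.util.HashMap;
import java.util.HashSet;
import java.util.List;
import java.util.Map;
import java.util.Set;
import java.util.concurrent.ConcurrentHashMap;

public class SnapshotSketch
{
  // Hypothetical helper mirroring the suggested loop: copy each non-empty
  // observed-value set into a plain list, leaving the source map untouched.
  public static Map<String, List<String>> snapshot(
      Map<String, Set<String>> segObserved,
      List<String> filterDims
  )
  {
    final Map<String, List<String>> snapshotFilters = new HashMap<>();
    for (String dim : filterDims) {
      segObserved.computeIfPresent(dim, (k, vals) -> {
        // Lock the synchronized set so isEmpty() and the copy see the same contents.
        synchronized (vals) {
          if (!vals.isEmpty()) {
            snapshotFilters.put(k, new ArrayList<>(vals));
          }
        }
        return vals; // Same value back: the mapping is read, not changed.
      });
    }
    return snapshotFilters;
  }

  public static void main(String[] args)
  {
    Map<String, Set<String>> segObserved = new ConcurrentHashMap<>();
    Set<String> tenants = Collections.synchronizedSet(new HashSet<>());
    tenants.add("a");
    tenants.add(null); // a null observed value is kept, distinct from ""
    segObserved.put("tenant", tenants);
    segObserved.put("region", Collections.synchronizedSet(new HashSet<>()));

    Map<String, List<String>> result =
        snapshot(segObserved, Arrays.asList("tenant", "region", "missing"));
    System.out.println(result.keySet()); // prints [tenant]
  }
}
```

In this sketch the empty `region` set and the absent `missing` dimension are both skipped, so only `tenant` appears in the snapshot.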