abhishekrb19 commented on code in PR #19571:
URL: https://github.com/apache/druid/pull/19571#discussion_r3397609974
##########
indexing-service/src/main/java/org/apache/druid/indexing/seekablestream/SeekableStreamIndexTaskRunner.java:
##########
@@ -999,15 +1060,80 @@ private void checkPublishAndHandoffFailure() throws
ExecutionException, Interrup
handOffWaitList.removeAll(handoffFinished);
}
+ @VisibleForTesting
+ void recordObservedDimensionValueForTest(String segmentId, String dimension,
@Nullable String value)
+ {
+ observedPartitionDimValuesBySegment
+ .computeIfAbsent(segmentId, k -> new ConcurrentHashMap<>())
+ .computeIfAbsent(dimension, k -> Collections.synchronizedSet(new
HashSet<>()))
+ .add(value);
+ }
+
+ @VisibleForTesting
+ void markSegmentRestartSpannedForTest(String segmentId)
+ {
+ restartSpannedSegments.add(segmentId);
+ }
+
+ /**
+ * Stamps a segment with a {@link StreamRangeShardSpec} declaring its
observed dimension values so the broker can
+ * prune it. When the feature is on we always return a {@link
StreamRangeShardSpec}, falling back to an empty
+ * (non-pruning) filter map when values can't be safely declared, so
segments in an interval stay class-uniform for
+ * {@link
org.apache.druid.segment.realtime.appenderator.SegmentPublisherHelper}. A null
observed value is carried
+ * through (distinct from {@code ""}) so {@code IS NULL} queries are not
pruned.
+ */
+ @VisibleForTesting
+ DataSegment annotateSegmentWithPartitionDimensionValues(DataSegment s)
+ {
+ final List<String> partitionDimensions =
+
StreamingPartitionsSpec.getPartitionDimensionsOrEmpty(tuningConfig.getStreamingPartitionsSpec());
+ if (CollectionUtils.isNullOrEmpty(partitionDimensions)) {
+ return s;
+ }
+ final Map<String, List<String>> snapshotFilters = new HashMap<>();
+ final String lookupKey =
SegmentIdWithShardSpec.fromDataSegment(s).toString();
+ final Map<String, Set<String>> segObserved =
observedPartitionDimValuesBySegment.get(lookupKey);
+ // Leave filters empty for restart-spanned segments: their pre-restart
values can't be re-observed.
+ if (!restartSpannedSegments.contains(lookupKey) && segObserved != null) {
+ for (String dim : partitionDimensions) {
+ final Set<String> vals = segObserved.get(dim);
+ if (vals == null) {
+ continue;
+ }
+ // vals is a synchronized set written by the run loop; copy it under
its monitor to iterate safely.
+ final List<String> snapshot;
+ synchronized (vals) {
+ if (vals.isEmpty()) {
+ continue;
+ }
+ snapshot = new ArrayList<>(vals);
Review Comment:
Updated
--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
To unsubscribe, e-mail: [email protected]
For queries about this service, please contact Infrastructure at:
[email protected]
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]