aho135 commented on code in PR #19571:
URL: https://github.com/apache/druid/pull/19571#discussion_r3384708633


##########
indexing-service/src/main/java/org/apache/druid/indexing/seekablestream/SeekableStreamIndexTaskRunner.java:
##########
@@ -999,15 +1057,86 @@ private void checkPublishAndHandoffFailure() throws ExecutionException, Interrup
     handOffWaitList.removeAll(handoffFinished);
   }
 
+  @VisibleForTesting
+  void recordObservedDimensionValueForTest(String segmentId, String dimension, @Nullable String value)
+  {
+    observedDimensionValuesBySegment
+        .computeIfAbsent(segmentId, k -> new ConcurrentHashMap<>())
+        .computeIfAbsent(dimension, k -> Collections.synchronizedSet(new HashSet<>()))
+        .add(value);
+  }
+
+  @VisibleForTesting
+  void markSegmentRestartSpannedForTest(String segmentId)
+  {
+    restartSpannedSegments.add(segmentId);
+  }
+
+  /**
+   * Stamps a segment with a {@link StreamRangeShardSpec} declaring its observed dimension values so the broker can
+   * prune it, or returns it unchanged when pruning would be unsafe or pointless (see the guard clauses). A null
+   * observed value is carried through (distinct from {@code ""}) so {@code IS NULL} queries are not pruned.
+   */
+  @VisibleForTesting
+  DataSegment annotateSegmentWithPartitionFilters(DataSegment s)
+  {
+    final List<String> filterDims = ioConfig.getPartitionFilterDimensions();
+    if (CollectionUtils.isNullOrEmpty(filterDims)) {
+      return s;
+    }
+    final String lookupKey = SegmentIdWithShardSpec.fromDataSegment(s).toString();
+    if (restartSpannedSegments.contains(lookupKey)) {
+      return s;
+    }
+    final Map<String, Set<String>> segObserved = observedDimensionValuesBySegment.get(lookupKey);
+    if (segObserved == null || segObserved.isEmpty()) {
+      return s;
+    }
+    final Map<String, List<String>> snapshotFilters = new HashMap<>();
+    for (String dim : filterDims) {

Review Comment:
   Claude recommendation for race condition
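The hunk stops at the head of the per-dimension loop, so the snapshot step itself is not visible here. As a hypothetical sketch only (the class name, the helper `snapshotObservedValues`, and the choice to skip annotation when a filter dimension has no recorded values are all assumptions, not code from the PR), one race-safe way to copy each `Collections.synchronizedSet` into a list is to hold the set's own lock for the emptiness check and the copy, since the `synchronizedSet` wrapper guards `add()` with that same lock. Note that this only makes the copy internally consistent; it does not cover values recorded after the snapshot is taken.

```java
import java.util.ArrayList;
import java.util.Collections;
import java.util.HashMap;
import java.util.HashSet;
import java.util.List;
import java.util.Map;
import java.util.Set;
import java.util.concurrent.ConcurrentHashMap;

final class PartitionFilterSnapshotSketch
{
  /**
   * Hypothetical helper: copies the observed values of each filter dimension into an
   * unmodifiable list. Returns null when any dimension has no observations, which the
   * caller (by assumption) treats as "leave the segment unannotated".
   */
  static Map<String, List<String>> snapshotObservedValues(
      List<String> filterDims,
      Map<String, Set<String>> segObserved
  )
  {
    final Map<String, List<String>> snapshot = new HashMap<>();
    for (String dim : filterDims) {
      final Set<String> values = segObserved.get(dim);
      if (values == null) {
        return null;
      }
      final List<String> copy;
      // A Collections.synchronizedSet wrapper locks on itself, so holding its monitor
      // makes the isEmpty() check and the copy atomic with respect to concurrent add() calls.
      synchronized (values) {
        if (values.isEmpty()) {
          return null;
        }
        copy = new ArrayList<>(values);
      }
      // ArrayList keeps a null entry distinct from "", so IS NULL queries stay unpruned.
      snapshot.put(dim, Collections.unmodifiableList(copy));
    }
    return snapshot;
  }

  public static void main(String[] args)
  {
    final Map<String, Set<String>> observed = new ConcurrentHashMap<>();
    observed.computeIfAbsent("country", k -> Collections.synchronizedSet(new HashSet<>())).add("US");
    observed.computeIfAbsent("country", k -> Collections.synchronizedSet(new HashSet<>())).add(null);
    System.out.println(snapshotObservedValues(List.of("country"), observed));
    System.out.println(snapshotObservedValues(List.of("country", "city"), observed));
  }
}
```

`Collections.unmodifiableList` is used rather than `List.copyOf` because `List.copyOf` throws `NullPointerException` on null elements, which would break the null-versus-empty-string distinction the Javadoc relies on.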



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.