FrankChen021 commented on code in PR #19571:
URL: https://github.com/apache/druid/pull/19571#discussion_r3388204698
##########
indexing-service/src/main/java/org/apache/druid/indexing/seekablestream/SeekableStreamIndexTaskRunner.java:
##########
@@ -999,15 +1057,86 @@ private void checkPublishAndHandoffFailure() throws ExecutionException, Interrup
handOffWaitList.removeAll(handoffFinished);
}
+ @VisibleForTesting
+ void recordObservedDimensionValueForTest(String segmentId, String dimension, @Nullable String value)
+ {
+ observedDimensionValuesBySegment
+ .computeIfAbsent(segmentId, k -> new ConcurrentHashMap<>())
+ .computeIfAbsent(dimension, k -> Collections.synchronizedSet(new HashSet<>()))
+ .add(value);
+ }
+
+ @VisibleForTesting
+ void markSegmentRestartSpannedForTest(String segmentId)
+ {
+ restartSpannedSegments.add(segmentId);
+ }
+
+ /**
+ * Stamps a segment with a {@link StreamRangeShardSpec} declaring its observed dimension values so the broker can
+ * prune it, or returns it unchanged when pruning would be unsafe or pointless (see the guard clauses). A null
+ * observed value is carried through (distinct from {@code ""}) so {@code IS NULL} queries are not pruned.
+ */
+ @VisibleForTesting
+ DataSegment annotateSegmentWithPartitionFilters(DataSegment s)
+ {
+ final List<String> filterDims = ioConfig.getPartitionFilterDimensions();
+ if (CollectionUtils.isNullOrEmpty(filterDims)) {
+ return s;
+ }
+ final String lookupKey = SegmentIdWithShardSpec.fromDataSegment(s).toString();
+ if (restartSpannedSegments.contains(lookupKey)) {
Review Comment:
P1 Mixed shard specs can fail publish after restart
Restart-spanned segments are returned unchanged with a NumberedShardSpec,
while new segments for the same interval in the same publish batch can be
annotated with a StreamRangeShardSpec. TransactionalSegmentPublisher then runs
SegmentPublisherHelper.annotateShardSpec, which rejects mixed shard-spec
classes within an interval, so a restarted task can fail publish/handoff.
Either make the fallback interval-wide, or stamp restored segments with a
non-pruning stream_range spec.
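A minimal sketch of the interval-wide fallback suggested above, under stated
assumptions: Seg, unsafeIntervals, chooseShardSpecKinds and the string labels
"numbered" / "stream_range" are hypothetical stand-ins for illustration, not
Druid classes or the PR's actual code. The idea is that if any segment in an
interval is restart-spanned, every segment in that interval skips annotation,
so no interval in the batch ends up with mixed shard-spec classes.

```java
import java.util.Collection;
import java.util.HashSet;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;
import java.util.Set;

// Hypothetical, simplified stand-ins; these are not Druid classes.
final class IntervalWideFallbackSketch
{
  /** Minimal segment model: an id plus the interval it belongs to. */
  static final class Seg
  {
    final String id;
    final String interval;

    Seg(String id, String interval)
    {
      this.id = id;
      this.interval = interval;
    }
  }

  /** Intervals that contain at least one restart-spanned segment in this batch. */
  static Set<String> unsafeIntervals(Collection<Seg> batch, Set<String> restartSpannedIds)
  {
    final Set<String> unsafe = new HashSet<>();
    for (Seg s : batch) {
      if (restartSpannedIds.contains(s.id)) {
        unsafe.add(s.interval);
      }
    }
    return unsafe;
  }

  /** Picks one shard-spec kind per segment so that no interval ends up mixed. */
  static Map<String, String> chooseShardSpecKinds(List<Seg> batch, Set<String> restartSpannedIds)
  {
    final Set<String> unsafe = unsafeIntervals(batch, restartSpannedIds);
    final Map<String, String> kinds = new LinkedHashMap<>();
    for (Seg s : batch) {
      // The decision is made per interval, not per segment.
      kinds.put(s.id, unsafe.contains(s.interval) ? "numbered" : "stream_range");
    }
    return kinds;
  }
}
```

The trade-off in this sketch is that one restart-spanned segment disables
pruning for its whole interval in that batch; the alternative in the comment
(a non-pruning stream_range spec on restored segments) would keep pruning for
the new segments.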
##########
extensions-core/kafka-indexing-service/src/main/java/org/apache/druid/indexing/kafka/supervisor/KafkaSupervisorIOConfig.java:
##########
@@ -55,6 +57,62 @@ public class KafkaSupervisorIOConfig extends
SeekableStreamSupervisorIOConfig
private final String topic;
private final String topicPattern;
private final boolean emitTimeLagMetrics;
+ @Nullable
+ private final List<String> partitionFilterDimensions;
+
+ public KafkaSupervisorIOConfig(
+ String topic,
+ String topicPattern,
+ InputFormat inputFormat,
+ Integer replicas,
+ Integer taskCount,
+ Period taskDuration,
+ Map<String, Object> consumerProperties,
+ AutoScalerConfig autoScalerConfig,
+ LagAggregator lagAggregator,
+ Long pollTimeout,
+ Period startDelay,
+ Period period,
+ Boolean useEarliestOffset,
+ Period completionTimeout,
+ Period lateMessageRejectionPeriod,
+ Period earlyMessageRejectionPeriod,
+ DateTime lateMessageRejectionStartDateTime,
+ KafkaConfigOverrides configOverrides,
+ IdleConfig idleConfig,
+ Integer stopTaskCount,
+ Boolean emitTimeLagMetrics,
+ Map<Integer, Integer> serverPriorityToReplicas,
+ BoundedStreamConfig boundedStreamConfig
+ )
+ {
+ this(
+ topic,
+ topicPattern,
+ inputFormat,
+ replicas,
+ taskCount,
+ taskDuration,
+ consumerProperties,
+ autoScalerConfig,
+ lagAggregator,
+ pollTimeout,
+ startDelay,
+ period,
+ useEarliestOffset,
+ completionTimeout,
+ lateMessageRejectionPeriod,
+ earlyMessageRejectionPeriod,
+ lateMessageRejectionStartDateTime,
+ configOverrides,
+ idleConfig,
+ stopTaskCount,
+ emitTimeLagMetrics,
+ serverPriorityToReplicas,
+ boundedStreamConfig,
+ null
Review Comment:
P3 Backfill specs drop partitionFilterDimensions
This compatibility constructor always forwards null for
partitionFilterDimensions. KafkaSupervisorSpec.createBackfillSpec still uses
this overload when deriving bounded backfill specs, so a supervisor configured
with partitionFilterDimensions silently creates backfill tasks without the
pruning annotations. Pass the existing dimension list through for backfill
specs.
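A minimal sketch of the failure mode and the suggested fix, under stated
assumptions: BackfillConfigSketch and its methods are hypothetical and reduced
to two fields; they are not the real KafkaSupervisorIOConfig or
KafkaSupervisorSpec.createBackfillSpec signatures. It only illustrates how a
compatibility overload that forwards null loses the setting when a derived
config is built through it.

```java
import java.util.List;

// Hypothetical, simplified stand-in; this is not the Druid config class.
final class BackfillConfigSketch
{
  private final String topic;
  private final List<String> partitionFilterDimensions; // may be null

  /** Compatibility overload: always forwards null for the new field. */
  BackfillConfigSketch(String topic)
  {
    this(topic, null);
  }

  BackfillConfigSketch(String topic, List<String> partitionFilterDimensions)
  {
    this.topic = topic;
    this.partitionFilterDimensions = partitionFilterDimensions;
  }

  List<String> getPartitionFilterDimensions()
  {
    return partitionFilterDimensions;
  }

  /** Derives a config via the old overload, silently dropping the dimensions. */
  BackfillConfigSketch createBackfillConfigLossy()
  {
    return new BackfillConfigSketch(topic);
  }

  /** Derives a config via the full constructor, passing the dimensions through. */
  BackfillConfigSketch createBackfillConfig()
  {
    return new BackfillConfigSketch(topic, partitionFilterDimensions);
  }
}
```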
--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
To unsubscribe, e-mail: [email protected]
For queries about this service, please contact Infrastructure at:
[email protected]