This is an automated email from the ASF dual-hosted git repository.
gianm pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/druid.git
The following commit(s) were added to refs/heads/master by this push:
new a4d49211573 feat: MSQ storage counters. (#19316)
a4d49211573 is described below
commit a4d4921157348d2c344ab723a072a52d42260fec
Author: Gian Merlino <[email protected]>
AuthorDate: Fri Apr 17 15:14:59 2026 -0700
feat: MSQ storage counters. (#19316)
This patch adds storage counters, providing information on the amount of
bytes and files written to local and durable storage, as well as the
current state of the local ByteTracker.
This patch also adds per-worker storage counters to the web console.
To make room for them, the per-channel counters are consolidated into a
single column, as they are in the stage summary row.
---
.../apache/druid/msq/counters/CounterNames.java | 10 +
.../apache/druid/msq/counters/CounterTracker.java | 12 +
.../apache/druid/msq/counters/StorageCounters.java | 232 +++++++++++++++++
.../org/apache/druid/msq/exec/RunWorkOrder.java | 47 ++--
.../druid/msq/exec/std/StandardStageRunner.java | 6 +-
.../apache/druid/msq/guice/MSQIndexingModule.java | 2 +
...va => ChannelCountingOutputChannelFactory.java} | 4 +-
...va => StorageCountingOutputChannelFactory.java} | 45 ++--
.../StorageCountingWritableFrameChannel.java | 102 ++++++++
.../druid/msq/counters/StorageCountersTest.java | 171 +++++++++++++
.../apache/druid/frame/channel/ByteTracker.java | 10 +
web-console/src/druid-models/stages/stages.mock.ts | 27 ++
web-console/src/druid-models/stages/stages.ts | 26 ++
.../execution-stages-pane.scss | 18 ++
.../execution-stages-pane.tsx | 275 ++++++++++++---------
15 files changed, 837 insertions(+), 150 deletions(-)
diff --git
a/multi-stage-query/src/main/java/org/apache/druid/msq/counters/CounterNames.java
b/multi-stage-query/src/main/java/org/apache/druid/msq/counters/CounterNames.java
index 19d5da36efb..2b7579deb72 100644
---
a/multi-stage-query/src/main/java/org/apache/druid/msq/counters/CounterNames.java
+++
b/multi-stage-query/src/main/java/org/apache/druid/msq/counters/CounterNames.java
@@ -20,6 +20,7 @@
package org.apache.druid.msq.counters;
import com.google.common.collect.ImmutableMap;
+import org.apache.druid.frame.channel.ByteTracker;
import org.apache.druid.java.util.common.StringUtils;
import org.apache.druid.query.ordering.StringComparators;
@@ -37,6 +38,7 @@ public class CounterNames
private static final String CPU = "cpu";
private static final String SORT_PROGRESS = "sortProgress";
private static final String SEGMENT_GENERATION_PROGRESS =
"segmentGenerationProgress";
+ private static final String STORAGE = "storage";
private static final String WARNINGS = "warnings";
private static final Comparator<String> COMPARATOR = new NameComparator();
@@ -85,6 +87,14 @@ public class CounterNames
return SORT_PROGRESS;
}
+ /**
+ * Standard name for a storage counter created by {@link
CounterTracker#storage(ByteTracker)}.
+ */
+ public static String storage()
+ {
+ return STORAGE;
+ }
+
/**
* Standard name for a warnings counter created by {@link
CounterTracker#warnings()}.
*/
diff --git
a/multi-stage-query/src/main/java/org/apache/druid/msq/counters/CounterTracker.java
b/multi-stage-query/src/main/java/org/apache/druid/msq/counters/CounterTracker.java
index dedc649215f..b626d024368 100644
---
a/multi-stage-query/src/main/java/org/apache/druid/msq/counters/CounterTracker.java
+++
b/multi-stage-query/src/main/java/org/apache/druid/msq/counters/CounterTracker.java
@@ -19,6 +19,8 @@
package org.apache.druid.msq.counters;
+import com.google.common.base.Preconditions;
+import org.apache.druid.frame.channel.ByteTracker;
import org.apache.druid.frame.processor.FrameProcessor;
import org.apache.druid.frame.processor.SuperSorterProgressTracker;
import org.apache.druid.frame.processor.manager.ProcessorManager;
@@ -101,6 +103,16 @@ public class CounterTracker
return counter(CounterNames.getSegmentGenerationProgress(),
SegmentGenerationProgressCounter::new);
}
+ public StorageCounters storage(final ByteTracker localByteTracker)
+ {
+ final StorageCounters storageCounters = counter(CounterNames.storage(), ()
-> new StorageCounters(localByteTracker));
+ Preconditions.checkState(
+ storageCounters.getLocalByteTracker() == localByteTracker,
+ "StorageCounters already exists with a different ByteTracker"
+ );
+ return storageCounters;
+ }
+
public WarningCounters warnings()
{
return counter(CounterNames.warnings(), WarningCounters::new);
diff --git
a/multi-stage-query/src/main/java/org/apache/druid/msq/counters/StorageCounters.java
b/multi-stage-query/src/main/java/org/apache/druid/msq/counters/StorageCounters.java
new file mode 100644
index 00000000000..f1acd142778
--- /dev/null
+++
b/multi-stage-query/src/main/java/org/apache/druid/msq/counters/StorageCounters.java
@@ -0,0 +1,232 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.druid.msq.counters;
+
+import com.fasterxml.jackson.annotation.JsonCreator;
+import com.fasterxml.jackson.annotation.JsonInclude;
+import com.fasterxml.jackson.annotation.JsonProperty;
+import com.fasterxml.jackson.annotation.JsonTypeName;
+import org.apache.druid.frame.channel.ByteTracker;
+
+import javax.annotation.Nullable;
+import java.util.Objects;
+import java.util.concurrent.atomic.AtomicLong;
+
+/**
+ * Counters for storage usage during MSQ query execution. Created by {@link
CounterTracker#storage(ByteTracker)}.
+ *
+ * Tracks:
+ * <ul>
+ * <li>Local byte tracker max and reserved size</li>
+ * <li>Total files and bytes written to local storage</li>
+ * <li>Total files and bytes written to durable storage</li>
+ * </ul>
+ */
+public class StorageCounters implements QueryCounter
+{
+ @Nullable
+ private final ByteTracker localByteTracker;
+
+ private final AtomicLong localFilesWritten = new AtomicLong();
+ private final AtomicLong localBytesWritten = new AtomicLong();
+ private final AtomicLong durableFileCount = new AtomicLong();
+ private final AtomicLong durableBytesWritten = new AtomicLong();
+
+ public StorageCounters(@Nullable final ByteTracker localByteTracker)
+ {
+ this.localByteTracker = localByteTracker;
+ }
+
+ @Nullable
+ public ByteTracker getLocalByteTracker()
+ {
+ return localByteTracker;
+ }
+
+ /**
+ * Increments the local storage file counter by one.
+ */
+ public void incrementLocalFiles()
+ {
+ localFilesWritten.incrementAndGet();
+ }
+
+ /**
+ * Adds to the local storage bytes-written counter.
+ */
+ public void incrementLocalBytes(final long bytes)
+ {
+ localBytesWritten.addAndGet(bytes);
+ }
+
+ /**
+ * Increments the durable storage file counter by one.
+ */
+ public void incrementDurableFiles()
+ {
+ durableFileCount.incrementAndGet();
+ }
+
+ /**
+ * Adds to the durable storage bytes-written counter.
+ */
+ public void incrementDurableBytes(final long bytes)
+ {
+ durableBytesWritten.addAndGet(bytes);
+ }
+
+ @Override
+ @Nullable
+ public QueryCounterSnapshot snapshot()
+ {
+ final Long localBytesMax;
+ final long localBytesReserved;
+
+ if (localByteTracker != null) {
+ final long maxBytes = localByteTracker.getMaxBytes();
+ localBytesMax = maxBytes == Long.MAX_VALUE ? null : maxBytes;
+ localBytesReserved = localByteTracker.getCurrentBytes();
+ } else {
+ localBytesMax = null;
+ localBytesReserved = 0;
+ }
+
+ return new Snapshot(
+ localBytesMax,
+ localBytesReserved,
+ localFilesWritten.get(),
+ localBytesWritten.get(),
+ durableFileCount.get(),
+ durableBytesWritten.get()
+ );
+ }
+
+ @JsonTypeName("storage")
+ public static class Snapshot implements QueryCounterSnapshot
+ {
+ @Nullable
+ private final Long localBytesMax;
+ private final long localBytesReserved;
+ private final long localFilesWritten;
+ private final long localBytesWritten;
+ private final long durableFileCount;
+ private final long durableBytesWritten;
+
+ @JsonCreator
+ public Snapshot(
+ @JsonProperty("localBytesMax") @Nullable final Long localBytesMax,
+ @JsonProperty("localBytesReserved") final long localBytesReserved,
+ @JsonProperty("localFilesWritten") final long localFilesWritten,
+ @JsonProperty("localBytesWritten") final long localBytesWritten,
+ @JsonProperty("durableFileCount") final long durableFileCount,
+ @JsonProperty("durableBytesWritten") final long durableBytesWritten
+ )
+ {
+ this.localBytesMax = localBytesMax;
+ this.localBytesReserved = localBytesReserved;
+ this.localFilesWritten = localFilesWritten;
+ this.localBytesWritten = localBytesWritten;
+ this.durableFileCount = durableFileCount;
+ this.durableBytesWritten = durableBytesWritten;
+ }
+
+ @Nullable
+ @JsonProperty("localBytesMax")
+ @JsonInclude(JsonInclude.Include.NON_NULL)
+ public Long getLocalBytesMax()
+ {
+ return localBytesMax;
+ }
+
+ @JsonProperty("localBytesReserved")
+ public long getLocalBytesReserved()
+ {
+ return localBytesReserved;
+ }
+
+ @JsonProperty("localFilesWritten")
+ public long getLocalFilesWritten()
+ {
+ return localFilesWritten;
+ }
+
+ @JsonProperty("localBytesWritten")
+ public long getLocalBytesWritten()
+ {
+ return localBytesWritten;
+ }
+
+ @JsonProperty("durableFileCount")
+ public long getDurableFileCount()
+ {
+ return durableFileCount;
+ }
+
+ @JsonProperty("durableBytesWritten")
+ public long getDurableBytesWritten()
+ {
+ return durableBytesWritten;
+ }
+
+ @Override
+ public boolean equals(final Object o)
+ {
+ if (this == o) {
+ return true;
+ }
+ if (o == null || getClass() != o.getClass()) {
+ return false;
+ }
+ final Snapshot snapshot = (Snapshot) o;
+ return Objects.equals(localBytesMax, snapshot.localBytesMax)
+ && localBytesReserved == snapshot.localBytesReserved
+ && localFilesWritten == snapshot.localFilesWritten
+ && localBytesWritten == snapshot.localBytesWritten
+ && durableFileCount == snapshot.durableFileCount
+ && durableBytesWritten == snapshot.durableBytesWritten;
+ }
+
+ @Override
+ public int hashCode()
+ {
+ return Objects.hash(
+ localBytesMax,
+ localBytesReserved,
+ localFilesWritten,
+ localBytesWritten,
+ durableFileCount,
+ durableBytesWritten
+ );
+ }
+
+ @Override
+ public String toString()
+ {
+ return "Snapshot{" +
+ "localBytesMax=" + localBytesMax +
+ ", localBytesReserved=" + localBytesReserved +
+ ", localFilesWritten=" + localFilesWritten +
+ ", localBytesWritten=" + localBytesWritten +
+ ", durableFileCount=" + durableFileCount +
+ ", durableBytesWritten=" + durableBytesWritten +
+ '}';
+ }
+ }
+}
diff --git
a/multi-stage-query/src/main/java/org/apache/druid/msq/exec/RunWorkOrder.java
b/multi-stage-query/src/main/java/org/apache/druid/msq/exec/RunWorkOrder.java
index f73ffa010fc..ba613768781 100644
---
a/multi-stage-query/src/main/java/org/apache/druid/msq/exec/RunWorkOrder.java
+++
b/multi-stage-query/src/main/java/org/apache/druid/msq/exec/RunWorkOrder.java
@@ -43,6 +43,7 @@ import org.apache.druid.java.util.common.ISE;
import org.apache.druid.java.util.common.StringUtils;
import org.apache.druid.java.util.common.concurrent.Execs;
import org.apache.druid.msq.counters.CounterTracker;
+import org.apache.druid.msq.indexing.StorageCountingOutputChannelFactory;
import org.apache.druid.msq.indexing.error.CanceledFault;
import org.apache.druid.msq.indexing.error.MSQException;
import org.apache.druid.msq.input.InputSlice;
@@ -431,20 +432,28 @@ public class RunWorkOrder
case LOCAL_STORAGE:
final File fileChannelDirectory =
frameContext.tempDir(StringUtils.format("output_stage_%06d",
workOrder.getStageNumber()));
- outputChannelFactory = new FileOutputChannelFactory(
- fileChannelDirectory,
- frameSize,
- null,
- frameContext.wireTransferableContext()
+ outputChannelFactory = new StorageCountingOutputChannelFactory(
+ new FileOutputChannelFactory(
+ fileChannelDirectory,
+ frameSize,
+ null,
+ frameContext.wireTransferableContext()
+ ),
+ counterTracker.storage(intermediateByteTracker),
+ false
);
break;
case DURABLE_STORAGE_INTERMEDIATE:
case DURABLE_STORAGE_QUERY_RESULTS:
- outputChannelFactory = makeDurableStorageOutputChannelFactory(
- frameContext.tempDir("durable"),
- frameSize,
- outputChannelMode ==
OutputChannelMode.DURABLE_STORAGE_QUERY_RESULTS
+ outputChannelFactory = new StorageCountingOutputChannelFactory(
+ makeDurableStorageOutputChannelFactory(
+ frameContext.tempDir("durable"),
+ frameSize,
+ outputChannelMode ==
OutputChannelMode.DURABLE_STORAGE_QUERY_RESULTS
+ ),
+ counterTracker.storage(intermediateByteTracker),
+ true
);
break;
@@ -469,11 +478,15 @@ public class RunWorkOrder
final int frameSize = frameContext.memoryParameters().getFrameSize();
final File fileChannelDirectory =
new File(tempDir, StringUtils.format("intermediate-stage-%06d",
workOrder.getStageNumber()));
- final FileOutputChannelFactory fileOutputChannelFactory = new
FileOutputChannelFactory(
- fileChannelDirectory,
- frameSize,
- intermediateByteTracker,
- frameContext.wireTransferableContext()
+ final OutputChannelFactory fileOutputChannelFactory = new
StorageCountingOutputChannelFactory(
+ new FileOutputChannelFactory(
+ fileChannelDirectory,
+ frameSize,
+ intermediateByteTracker,
+ frameContext.wireTransferableContext()
+ ),
+ counterTracker.storage(intermediateByteTracker),
+ false
);
if (workOrder.getOutputChannelMode().isDurable()
@@ -483,7 +496,11 @@ public class RunWorkOrder
return new ComposingOutputChannelFactory(
ImmutableList.of(
fileOutputChannelFactory,
- makeDurableStorageOutputChannelFactory(tempDir, frameSize,
isQueryResults)
+ new StorageCountingOutputChannelFactory(
+ makeDurableStorageOutputChannelFactory(tempDir, frameSize,
isQueryResults),
+ counterTracker.storage(intermediateByteTracker),
+ true
+ )
),
frameSize
);
diff --git
a/multi-stage-query/src/main/java/org/apache/druid/msq/exec/std/StandardStageRunner.java
b/multi-stage-query/src/main/java/org/apache/druid/msq/exec/std/StandardStageRunner.java
index 4f2024bd8df..fd7da3c88cd 100644
---
a/multi-stage-query/src/main/java/org/apache/druid/msq/exec/std/StandardStageRunner.java
+++
b/multi-stage-query/src/main/java/org/apache/druid/msq/exec/std/StandardStageRunner.java
@@ -33,7 +33,7 @@ import org.apache.druid.msq.counters.CpuCounters;
import org.apache.druid.msq.exec.ExecutionContext;
import org.apache.druid.msq.exec.FrameContext;
import org.apache.druid.msq.exec.StageProcessor;
-import org.apache.druid.msq.indexing.CountingOutputChannelFactory;
+import org.apache.druid.msq.indexing.ChannelCountingOutputChannelFactory;
import org.apache.druid.msq.kernel.ShuffleSpec;
import org.apache.druid.msq.kernel.StageDefinition;
import org.checkerframework.checker.nullness.qual.MonotonicNonNull;
@@ -128,7 +128,7 @@ public class StandardStageRunner<T, R>
baseOutputChannelFactory = executionContext.outputChannelFactory();
}
- workOutputChannelFactory = new CountingOutputChannelFactory(
+ workOutputChannelFactory = new ChannelCountingOutputChannelFactory(
baseOutputChannelFactory,
executionContext.counters().channel(CounterNames.outputChannel())
);
@@ -181,7 +181,7 @@ public class StandardStageRunner<T, R>
pipelineFuture =
stageOperations.gatherResultKeyStatisticsIfNeeded(pipelineFuture);
- final OutputChannelFactory stageOutputChannelFactory = new
CountingOutputChannelFactory(
+ final OutputChannelFactory stageOutputChannelFactory = new
ChannelCountingOutputChannelFactory(
executionContext.outputChannelFactory(),
executionContext.counters().channel(CounterNames.shuffleChannel())
);
diff --git
a/multi-stage-query/src/main/java/org/apache/druid/msq/guice/MSQIndexingModule.java
b/multi-stage-query/src/main/java/org/apache/druid/msq/guice/MSQIndexingModule.java
index 6b100e078b4..629754f2a9a 100644
---
a/multi-stage-query/src/main/java/org/apache/druid/msq/guice/MSQIndexingModule.java
+++
b/multi-stage-query/src/main/java/org/apache/druid/msq/guice/MSQIndexingModule.java
@@ -36,6 +36,7 @@ import org.apache.druid.msq.counters.CpuCounter;
import org.apache.druid.msq.counters.CpuCounters;
import org.apache.druid.msq.counters.NilQueryCounterSnapshot;
import org.apache.druid.msq.counters.SegmentGenerationProgressCounter;
+import org.apache.druid.msq.counters.StorageCounters;
import org.apache.druid.msq.counters.SuperSorterProgressTrackerCounter;
import org.apache.druid.msq.counters.WarningCounters;
import org.apache.druid.msq.indexing.IndexerControllerContextFactory;
@@ -198,6 +199,7 @@ public class MSQIndexingModule implements DruidModule
SuperSorterProgressTrackerCounter.Snapshot.class,
WarningCounters.Snapshot.class,
SegmentGenerationProgressCounter.Snapshot.class,
+ StorageCounters.Snapshot.class,
CpuCounters.Snapshot.class,
CpuCounter.Snapshot.class,
NilQueryCounterSnapshot.class,
diff --git
a/multi-stage-query/src/main/java/org/apache/druid/msq/indexing/CountingOutputChannelFactory.java
b/multi-stage-query/src/main/java/org/apache/druid/msq/indexing/ChannelCountingOutputChannelFactory.java
similarity index 95%
copy from
multi-stage-query/src/main/java/org/apache/druid/msq/indexing/CountingOutputChannelFactory.java
copy to
multi-stage-query/src/main/java/org/apache/druid/msq/indexing/ChannelCountingOutputChannelFactory.java
index 667d99cc198..5dc0e6cf9ec 100644
---
a/multi-stage-query/src/main/java/org/apache/druid/msq/indexing/CountingOutputChannelFactory.java
+++
b/multi-stage-query/src/main/java/org/apache/druid/msq/indexing/ChannelCountingOutputChannelFactory.java
@@ -27,12 +27,12 @@ import org.apache.druid.msq.counters.ChannelCounters;
import java.io.IOException;
-public class CountingOutputChannelFactory implements OutputChannelFactory
+public class ChannelCountingOutputChannelFactory implements
OutputChannelFactory
{
private final OutputChannelFactory baseFactory;
private final ChannelCounters channelCounters;
- public CountingOutputChannelFactory(
+ public ChannelCountingOutputChannelFactory(
final OutputChannelFactory baseFactory,
final ChannelCounters channelCounters
)
diff --git
a/multi-stage-query/src/main/java/org/apache/druid/msq/indexing/CountingOutputChannelFactory.java
b/multi-stage-query/src/main/java/org/apache/druid/msq/indexing/StorageCountingOutputChannelFactory.java
similarity index 59%
rename from
multi-stage-query/src/main/java/org/apache/druid/msq/indexing/CountingOutputChannelFactory.java
rename to
multi-stage-query/src/main/java/org/apache/druid/msq/indexing/StorageCountingOutputChannelFactory.java
index 667d99cc198..6d0c770f2d9 100644
---
a/multi-stage-query/src/main/java/org/apache/druid/msq/indexing/CountingOutputChannelFactory.java
+++
b/multi-stage-query/src/main/java/org/apache/druid/msq/indexing/StorageCountingOutputChannelFactory.java
@@ -23,50 +23,61 @@ import com.google.common.base.Preconditions;
import org.apache.druid.frame.processor.OutputChannel;
import org.apache.druid.frame.processor.OutputChannelFactory;
import org.apache.druid.frame.processor.PartitionedOutputChannel;
-import org.apache.druid.msq.counters.ChannelCounters;
+import org.apache.druid.msq.counters.StorageCounters;
import java.io.IOException;
-public class CountingOutputChannelFactory implements OutputChannelFactory
+/**
+ * Wraps an {@link OutputChannelFactory} to track storage file and byte counts
in {@link StorageCounters}.
+ * Similar to {@link ChannelCountingOutputChannelFactory} but for
storage-level tracking rather than
+ * per-partition channel tracking.
+ */
+public class StorageCountingOutputChannelFactory implements
OutputChannelFactory
{
private final OutputChannelFactory baseFactory;
- private final ChannelCounters channelCounters;
+ private final StorageCounters storageCounters;
+ private final boolean isDurable;
- public CountingOutputChannelFactory(
+ public StorageCountingOutputChannelFactory(
final OutputChannelFactory baseFactory,
- final ChannelCounters channelCounters
+ final StorageCounters storageCounters,
+ final boolean isDurable
)
{
this.baseFactory = Preconditions.checkNotNull(baseFactory, "baseFactory");
- this.channelCounters = Preconditions.checkNotNull(channelCounters,
"channelCounter");
+ this.storageCounters = Preconditions.checkNotNull(storageCounters,
"storageCounters");
+ this.isDurable = isDurable;
}
@Override
- public OutputChannel openChannel(int partitionNumber) throws IOException
+ public OutputChannel openChannel(final int partitionNumber) throws
IOException
{
final OutputChannel baseChannel = baseFactory.openChannel(partitionNumber);
return baseChannel.mapWritableChannel(
baseWritableChannel ->
- new CountingWritableFrameChannel(
- baseChannel.getWritableChannel(),
- channelCounters,
- partitionNumber
+ new StorageCountingWritableFrameChannel(
+ baseWritableChannel,
+ storageCounters,
+ isDurable
)
);
}
@Override
- public PartitionedOutputChannel openPartitionedChannel(String name, boolean
deleteAfterRead) throws IOException
+ public PartitionedOutputChannel openPartitionedChannel(
+ final String name,
+ final boolean deleteAfterRead
+ ) throws IOException
{
final PartitionedOutputChannel baseChannel =
baseFactory.openPartitionedChannel(name, deleteAfterRead);
return baseChannel.mapWritableChannel(
baseWritableChannel ->
- new CountingWritableFrameChannel(
- baseChannel.getWritableChannel(),
- channelCounters,
- null
+ new StorageCountingWritableFrameChannel(
+ baseWritableChannel,
+ storageCounters,
+ isDurable
)
);
}
@@ -74,7 +85,7 @@ public class CountingOutputChannelFactory implements
OutputChannelFactory
@Override
public OutputChannel openNilChannel(final int partitionNumber)
{
- // No need for counters on nil channels: they never receive input.
+ // No need for storage counters on nil channels: they never receive input.
return baseFactory.openNilChannel(partitionNumber);
}
diff --git
a/multi-stage-query/src/main/java/org/apache/druid/msq/indexing/StorageCountingWritableFrameChannel.java
b/multi-stage-query/src/main/java/org/apache/druid/msq/indexing/StorageCountingWritableFrameChannel.java
new file mode 100644
index 00000000000..15c5ef17fde
--- /dev/null
+++
b/multi-stage-query/src/main/java/org/apache/druid/msq/indexing/StorageCountingWritableFrameChannel.java
@@ -0,0 +1,102 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.druid.msq.indexing;
+
+import com.google.common.util.concurrent.ListenableFuture;
+import org.apache.druid.frame.Frame;
+import org.apache.druid.frame.channel.WritableFrameChannel;
+import org.apache.druid.msq.counters.StorageCounters;
+import org.apache.druid.query.rowsandcols.RowsAndColumns;
+import org.apache.druid.query.rowsandcols.concrete.FrameRowsAndColumns;
+
+import javax.annotation.Nullable;
+import java.io.IOException;
+
+/**
+ * Wraps a {@link WritableFrameChannel} to track storage usage in {@link
StorageCounters}.
+ * Updates counters on each {@link #write} call: the first write increments
the file count by one,
+ * and every write increments bytes written.
+ */
+public class StorageCountingWritableFrameChannel implements
WritableFrameChannel
+{
+ private final WritableFrameChannel baseChannel;
+ private final StorageCounters storageCounters;
+ private final boolean isDurable;
+ private boolean firstWrite = true;
+
+ public StorageCountingWritableFrameChannel(
+ final WritableFrameChannel baseChannel,
+ final StorageCounters storageCounters,
+ final boolean isDurable
+ )
+ {
+ this.baseChannel = baseChannel;
+ this.storageCounters = storageCounters;
+ this.isDurable = isDurable;
+ }
+
+ @Override
+ public void write(final RowsAndColumns rac, final int partitionNum) throws
IOException
+ {
+ baseChannel.write(rac, partitionNum);
+
+ if (firstWrite) {
+ firstWrite = false;
+ if (isDurable) {
+ storageCounters.incrementDurableFiles();
+ } else {
+ storageCounters.incrementLocalFiles();
+ }
+ }
+
+ if (rac instanceof FrameRowsAndColumns) {
+ final Frame frame = ((FrameRowsAndColumns) rac).getFrame();
+ if (isDurable) {
+ storageCounters.incrementDurableBytes(frame.numBytes());
+ } else {
+ storageCounters.incrementLocalBytes(frame.numBytes());
+ }
+ }
+ }
+
+ @Override
+ public void fail(@Nullable final Throwable cause) throws IOException
+ {
+ baseChannel.fail(cause);
+ }
+
+ @Override
+ public void close() throws IOException
+ {
+ baseChannel.close();
+ }
+
+ @Override
+ public boolean isClosed()
+ {
+ return baseChannel.isClosed();
+ }
+
+ @Override
+ public ListenableFuture<?> writabilityFuture()
+ {
+ return baseChannel.writabilityFuture();
+ }
+}
diff --git
a/multi-stage-query/src/test/java/org/apache/druid/msq/counters/StorageCountersTest.java
b/multi-stage-query/src/test/java/org/apache/druid/msq/counters/StorageCountersTest.java
new file mode 100644
index 00000000000..f0e449f35e7
--- /dev/null
+++
b/multi-stage-query/src/test/java/org/apache/druid/msq/counters/StorageCountersTest.java
@@ -0,0 +1,171 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.druid.msq.counters;
+
+import com.fasterxml.jackson.databind.ObjectMapper;
+import com.google.common.collect.ImmutableMap;
+import nl.jqno.equalsverifier.EqualsVerifier;
+import org.apache.druid.frame.channel.ByteTracker;
+import org.apache.druid.msq.guice.MSQIndexingModule;
+import org.apache.druid.segment.TestHelper;
+import org.junit.Assert;
+import org.junit.Test;
+
+public class StorageCountersTest
+{
+ @Test
+ public void testSnapshotWithNoByteTracker()
+ {
+ final StorageCounters counters = new StorageCounters(null);
+ final StorageCounters.Snapshot snapshot = (StorageCounters.Snapshot)
counters.snapshot();
+
+ Assert.assertNull(snapshot.getLocalBytesMax());
+ Assert.assertEquals(0, snapshot.getLocalBytesReserved());
+ Assert.assertEquals(0, snapshot.getLocalFilesWritten());
+ Assert.assertEquals(0, snapshot.getLocalBytesWritten());
+ Assert.assertEquals(0, snapshot.getDurableFileCount());
+ Assert.assertEquals(0, snapshot.getDurableBytesWritten());
+ }
+
+ @Test
+ public void testSnapshotWithByteTracker()
+ {
+ final ByteTracker tracker = new ByteTracker(1000);
+ tracker.reserve(300);
+
+ final StorageCounters counters = new StorageCounters(tracker);
+
+ final StorageCounters.Snapshot snapshot = (StorageCounters.Snapshot)
counters.snapshot();
+ Assert.assertEquals(1000L, (long) snapshot.getLocalBytesMax());
+ Assert.assertEquals(300, snapshot.getLocalBytesReserved());
+ }
+
+ @Test
+ public void testSnapshotWithByteTrackerReflectsCurrentState()
+ {
+ final ByteTracker tracker = new ByteTracker(1000);
+ tracker.reserve(300);
+
+ final StorageCounters counters = new StorageCounters(tracker);
+
+ // Take first snapshot
+ final StorageCounters.Snapshot snapshot1 = (StorageCounters.Snapshot)
counters.snapshot();
+ Assert.assertEquals(300, snapshot1.getLocalBytesReserved());
+
+ // Change tracker state
+ tracker.reserve(200);
+
+ // Second snapshot reflects new state
+ final StorageCounters.Snapshot snapshot2 = (StorageCounters.Snapshot)
counters.snapshot();
+ Assert.assertEquals(500, snapshot2.getLocalBytesReserved());
+ }
+
+ @Test
+ public void testIncrementLocalFiles()
+ {
+ final StorageCounters counters = new StorageCounters(null);
+ counters.incrementLocalFiles();
+ counters.incrementLocalBytes(500);
+ counters.incrementLocalFiles();
+ counters.incrementLocalBytes(300);
+
+ final StorageCounters.Snapshot snapshot = (StorageCounters.Snapshot)
counters.snapshot();
+ Assert.assertEquals(2, snapshot.getLocalFilesWritten());
+ Assert.assertEquals(800, snapshot.getLocalBytesWritten());
+ Assert.assertEquals(0, snapshot.getDurableFileCount());
+ Assert.assertEquals(0, snapshot.getDurableBytesWritten());
+ }
+
+ @Test
+ public void testIncrementDurableFiles()
+ {
+ final StorageCounters counters = new StorageCounters(null);
+ counters.incrementDurableFiles();
+ counters.incrementDurableBytes(1000);
+ counters.incrementDurableFiles();
+ counters.incrementDurableBytes(2000);
+
+ final StorageCounters.Snapshot snapshot = (StorageCounters.Snapshot)
counters.snapshot();
+ Assert.assertEquals(0, snapshot.getLocalFilesWritten());
+ Assert.assertEquals(0, snapshot.getLocalBytesWritten());
+ Assert.assertEquals(2, snapshot.getDurableFileCount());
+ Assert.assertEquals(3000, snapshot.getDurableBytesWritten());
+ }
+
+ @Test
+ public void testSnapshotSerde() throws Exception
+ {
+ final ObjectMapper mapper =
+ TestHelper.makeJsonMapper().registerModules(new
MSQIndexingModule().getJacksonModules());
+
+ final StorageCounters.Snapshot snapshot = new
StorageCounters.Snapshot(1000L, 300, 5, 2500, 3, 1500);
+
+ final String json = mapper.writeValueAsString(snapshot);
+ final StorageCounters.Snapshot deserialized = mapper.readValue(json,
StorageCounters.Snapshot.class);
+
+ Assert.assertEquals(snapshot, deserialized);
+ }
+
+ @Test
+ public void testSnapshotSerdeViaCounterSnapshots() throws Exception
+ {
+ final ObjectMapper mapper =
+ TestHelper.makeJsonMapper().registerModules(new
MSQIndexingModule().getJacksonModules());
+
+ final StorageCounters.Snapshot snapshot = new
StorageCounters.Snapshot(1000L, 300, 5, 2500, 3, 1500);
+ final CounterSnapshotsTree tree = new CounterSnapshotsTree();
+ tree.put(0, 0, new
CounterSnapshots(ImmutableMap.of(CounterNames.storage(), snapshot)));
+
+ final String json = mapper.writeValueAsString(tree);
+ final CounterSnapshotsTree deserialized = mapper.readValue(json,
CounterSnapshotsTree.class);
+
+ Assert.assertEquals(
+ snapshot,
+
deserialized.copyMap().get(0).get(0).getMap().get(CounterNames.storage())
+ );
+ }
+
+ @Test
+ public void testCounterTrackerDefensiveCheck()
+ {
+ final ByteTracker tracker1 = new ByteTracker(1000);
+ final ByteTracker tracker2 = new ByteTracker(2000);
+
+ final CounterTracker counterTracker = new CounterTracker(true);
+
+ // First call creates the counter
+ final StorageCounters counters = counterTracker.storage(tracker1);
+ Assert.assertSame(tracker1, counters.getLocalByteTracker());
+
+ // Second call with same tracker succeeds
+ Assert.assertSame(counters, counterTracker.storage(tracker1));
+
+ // Call with different tracker fails
+ Assert.assertThrows(IllegalStateException.class, () ->
counterTracker.storage(tracker2));
+ }
+
+ @Test
+ public void testEquals()
+ {
+ EqualsVerifier.forClass(StorageCounters.Snapshot.class)
+ .usingGetClass()
+ .verify();
+ }
+}
diff --git
a/processing/src/main/java/org/apache/druid/frame/channel/ByteTracker.java
b/processing/src/main/java/org/apache/druid/frame/channel/ByteTracker.java
index 9dbe2e024bb..092da7cc07b 100644
--- a/processing/src/main/java/org/apache/druid/frame/channel/ByteTracker.java
+++ b/processing/src/main/java/org/apache/druid/frame/channel/ByteTracker.java
@@ -71,6 +71,16 @@ public class ByteTracker
currentBytes = Math.subtractExact(currentBytes, byteCount);
}
+ public synchronized long getMaxBytes()
+ {
+ return maxBytes;
+ }
+
+ public synchronized long getCurrentBytes()
+ {
+ return currentBytes;
+ }
+
public static ByteTracker unboundedTracker()
{
return new ByteTracker(Long.MAX_VALUE);
diff --git a/web-console/src/druid-models/stages/stages.mock.ts
b/web-console/src/druid-models/stages/stages.mock.ts
index c10c5794571..9820ac315f5 100644
--- a/web-console/src/druid-models/stages/stages.mock.ts
+++ b/web-console/src/druid-models/stages/stages.mock.ts
@@ -1144,6 +1144,15 @@ export const STAGES = new Stages(
totalMergersForUltimateLevel: 1,
progressDigest: 1,
},
+ storage: {
+ type: 'storage',
+ localBytesMax: 1073741824,
+ localBytesReserved: 5242880,
+ localFilesWritten: 2,
+ localBytesWritten: 4120000,
+ durableFileCount: 1,
+ durableBytesWritten: 3900000,
+ },
},
},
'1': {
@@ -1189,6 +1198,15 @@ export const STAGES = new Stages(
totalMergersForUltimateLevel: 1,
progressDigest: 1,
},
+ storage: {
+ type: 'storage',
+ localBytesMax: 1073741824,
+ localBytesReserved: 52428800,
+ localFilesWritten: 24,
+ localBytesWritten: 48234567,
+ durableFileCount: 12,
+ durableBytesWritten: 35123456,
+ },
},
},
'2': {
@@ -1234,6 +1252,15 @@ export const STAGES = new Stages(
totalMergersForUltimateLevel: 24,
progressDigest: 1,
},
+ storage: {
+ type: 'storage',
+ localBytesMax: 1073741824,
+ localBytesReserved: 104857600,
+ localFilesWritten: 48,
+ localBytesWritten: 95432100,
+ durableFileCount: 24,
+ durableBytesWritten: 72345678,
+ },
},
},
'3': {
diff --git a/web-console/src/druid-models/stages/stages.ts
b/web-console/src/druid-models/stages/stages.ts
index 8ad539a109f..07014cfd793 100644
--- a/web-console/src/druid-models/stages/stages.ts
+++ b/web-console/src/druid-models/stages/stages.ts
@@ -147,6 +147,7 @@ export interface StageWorkerCounter {
segmentGenerationProgress?: SegmentGenerationProgressCounter;
warnings?: WarningCounter;
cpu?: CpusCounter;
+ storage?: StorageCounter;
}
export type ChannelCounterName = `input${number}` | 'output' | 'shuffle';
@@ -301,6 +302,29 @@ export interface CpuCounter {
wall: number;
}
+export interface StorageCounter {
+ type: 'storage';
+ localBytesMax?: number;
+ localBytesReserved: number;
+ localFilesWritten: number;
+ localBytesWritten: number;
+ durableFileCount: number;
+ durableBytesWritten: number;
+}
+
+function normalizeStorageCounter(s: StorageCounter | undefined):
StorageCounter | undefined {
+ if (!s) return;
+ return {
+ type: 'storage',
+ localBytesMax: s.localBytesMax != null ? Number(s.localBytesMax) :
undefined,
+ localBytesReserved: Number(s.localBytesReserved),
+ localFilesWritten: Number(s.localFilesWritten),
+ localBytesWritten: Number(s.localBytesWritten),
+ durableFileCount: Number(s.durableFileCount),
+ durableBytesWritten: Number(s.durableBytesWritten),
+ };
+}
+
function sumCpuCounters(cs: CpuCounter[]): CpuCounter {
return aggregateThings(cs, {
type: 'cpu',
@@ -316,6 +340,7 @@ export interface SimpleWideCounter {
shuffle?: Record<ChannelFields, number>;
segmentGenerationProgress?: SegmentGenerationProgressCounter;
cpu?: CpusCounter;
+ storage?: StorageCounter;
}
function zeroChannelFields(): Record<ChannelFields, number> {
@@ -640,6 +665,7 @@ export class Stages {
}
newWideCounter.segmentGenerationProgress =
stageCounters.segmentGenerationProgress;
newWideCounter.cpu = stageCounters.cpu;
+ newWideCounter.storage = normalizeStorageCounter(stageCounters.storage);
return newWideCounter;
});
}
diff --git
a/web-console/src/views/workbench-view/execution-stages-pane/execution-stages-pane.scss
b/web-console/src/views/workbench-view/execution-stages-pane/execution-stages-pane.scss
index c0a0553fdad..a64e9062d8d 100644
---
a/web-console/src/views/workbench-view/execution-stages-pane/execution-stages-pane.scss
+++
b/web-console/src/views/workbench-view/execution-stages-pane/execution-stages-pane.scss
@@ -144,6 +144,24 @@
width: 80px;
}
+ .rows-label {
+ display: inline-block;
+ width: 140px;
+ overflow: hidden;
+ text-overflow: ellipsis;
+ white-space: nowrap;
+ vertical-align: bottom;
+ }
+
+ .storage-label {
+ display: inline-block;
+ width: 55px;
+ }
+
+ .storage-used {
+ opacity: 0.7;
+ }
+
.timing-value {
position: relative;
height: 100%;
diff --git
a/web-console/src/views/workbench-view/execution-stages-pane/execution-stages-pane.tsx
b/web-console/src/views/workbench-view/execution-stages-pane/execution-stages-pane.tsx
index c6fe9a73574..4b74879f625 100644
---
a/web-console/src/views/workbench-view/execution-stages-pane/execution-stages-pane.tsx
+++
b/web-console/src/views/workbench-view/execution-stages-pane/execution-stages-pane.tsx
@@ -222,30 +222,25 @@ export const ExecutionStagesPane = React.memo(function
ExecutionStagesPane(
const counterNames: ChannelCounterName[] =
stages.getChannelCounterNamesForStage(stage);
- const bracesRows: Record<ChannelCounterName, string[]> = {} as any;
- const bracesExtra: Record<ChannelCounterName, string[]> = {} as any;
- for (const counterName of counterNames) {
- bracesRows[counterName] = wideCounters.map(wideCounter =>
- formatRows(wideCounter[counterName]!.rows),
- );
- bracesExtra[counterName] = filterMap(wideCounters, wideCounter => {
- const totalFiles = wideCounter[counterName]!.totalFiles;
- if (!totalFiles) return;
- return formatFileOfTotalForBrace(totalFiles, totalFiles);
- });
- }
-
const isSegmentGenerator = Stages.stageType(stage) === 'segmentGenerator';
- let bracesSegmentRowsMerged: string[] = [];
- let bracesSegmentRowsPushed: string[] = [];
+
+ // Unified braces for the combined rows column
+ const allBracesRows: string[] = counterNames.flatMap(counterName =>
+ wideCounters.map(wideCounter =>
formatRows(wideCounter[counterName]!.rows)),
+ );
if (isSegmentGenerator) {
- bracesSegmentRowsMerged = wideCounters.map(wideCounter =>
- formatRows(wideCounter.segmentGenerationProgress?.rowsMerged || 0),
- );
- bracesSegmentRowsPushed = wideCounters.map(wideCounter =>
- formatRows(wideCounter.segmentGenerationProgress?.rowsPushed || 0),
+ allBracesRows.push(
+ ...wideCounters.map(wc =>
formatRows(wc.segmentGenerationProgress?.rowsMerged || 0)),
+ ...wideCounters.map(wc =>
formatRows(wc.segmentGenerationProgress?.rowsPushed || 0)),
);
}
+ const allBracesFiles: string[] = filterMap(
+ counterNames.flatMap(counterName =>
+ wideCounters.map(wideCounter => wideCounter[counterName]!),
+ ),
+ c => (c.totalFiles ? formatFileOfTotalForBrace(c.totalFiles,
c.totalFiles) : undefined),
+ );
+ const firstNonInputIndex = counterNames.findIndex(cn =>
!cn.startsWith('input'));
return (
<ReactTable
@@ -327,103 +322,157 @@ export const ExecutionStagesPane = React.memo(function
ExecutionStagesPane(
);
},
} as Column<SimpleWideCounter>,
- ].concat(
- counterNames.map((counterName, i) => {
- const isInput = counterName.startsWith('input');
- return {
- Header: twoLines(
- isInput ? (
- <span>{inputLabelContent(stage, i)}</span>
- ) : (
- stages.getStageCounterTitle(stage, counterName)
- ),
- isInput ? <i>rows (input files)</i> : <i>rows</i>,
- ),
- id: counterName,
- accessor: d => d[counterName]!.rows,
- className: 'padded',
- width: 200,
- Cell({ value, original }) {
- const c = (original as SimpleWideCounter)[counterName]!;
- return (
- <>
- <BracedText
- text={formatRows(value)}
- braces={bracesRows[counterName]}
- data-tooltip={
- c.bytes
- ? `Uncompressed size: ${formatBytesCompact(c.bytes)}
${NOT_SIZE_ON_DISK}`
- : NO_SIZE_INFO
- }
- />
- {Boolean(c.totalFiles) && (
- <>
- {' '}
- {' '}
- <BracedText
- text={formatFileOfTotal(c.files, c.totalFiles)}
- braces={bracesExtra[counterName]}
- />
- </>
- )}
- {Boolean(c.loadFiles) && (
- <>
- {' '}
- {' '}
- <Icon
- className="load-indicator"
- icon={IconNames.IMPORT}
- data-tooltip={formatLoadTooltip(
- c.loadFiles,
- c.loadBytes,
- c.loadTime,
- c.loadWait,
+ ].concat([
+ {
+ Header: twoLines('Rows processed', <i>rows (input
files)</i>),
+ id: 'rows_processed',
+ accessor: (d: SimpleWideCounter) =>
+ counterNames.reduce((acc, cn) => acc + (d[cn]?.rows || 0), 0),
+ className: 'padded',
+ width: 300,
+ Cell({ original }: { original: SimpleWideCounter }) {
+ return (
+ <>
+ {counterNames.map((counterName, idx) => {
+ const c = original[counterName]!;
+ const isInput = counterName.startsWith('input');
+ const inputIndex = isInput ?
Number(counterName.replace('input', '')) : -1;
+ const showSpacer = idx === firstNonInputIndex &&
firstNonInputIndex > 0;
+ const label = isInput
+ ? formatInputLabel(stage, inputIndex)
+ : stages.getStageCounterTitle(stage, counterName);
+ const tooltipParts: string[] = [];
+ if (c.bytes) {
+ tooltipParts.push(
+ `Uncompressed size: ${formatBytesCompact(c.bytes)}
${NOT_SIZE_ON_DISK}`,
+ );
+ }
+ if (c.loadFiles) {
+ tooltipParts.push(
+ formatLoadTooltip(c.loadFiles, c.loadBytes,
c.loadTime, c.loadWait),
+ );
+ }
+ if (c.queries || c.totalQueries) {
+ tooltipParts.push(
+ `Realtime queries: ${formatInteger(c.queries || 0)} /
${formatInteger(
+ c.totalQueries || 0,
+ )}`,
+ );
+ }
+ return (
+ <React.Fragment key={counterName}>
+ {showSpacer && <div className="counter-spacer
extend-right" />}
+ <div
+ data-tooltip={
+ tooltipParts.length ? tooltipParts.join('\n') :
NO_SIZE_INFO
+ }
+ >
+ <span className="rows-label">{label}</span>
+ <BracedText text={formatRows(c.rows)}
braces={allBracesRows} />
+ {Boolean(c.totalFiles) && (
+ <>
+ {' '}
+ {' '}
+ <BracedText
+ text={formatFileOfTotal(c.files, c.totalFiles)}
+ braces={allBracesFiles}
+ />
+ </>
)}
+ </div>
+ </React.Fragment>
+ );
+ })}
+ {isSegmentGenerator && (
+ <>
+ <div className="counter-spacer extend-right" />
+ <div>
+ <span className="rows-label">Merged</span>
+ <BracedText
+
text={formatRows(original.segmentGenerationProgress?.rowsMerged || 0)}
+ braces={allBracesRows}
/>
- </>
- )}
- {Boolean(c.queries || c.totalQueries) && (
- <>
- {' '}
- {' '}
- <Icon
- icon={IconNames.ARROW_BOTTOM_LEFT}
- data-tooltip={`Realtime queries (${formatInteger(
- c.queries || 0,
- )} / ${formatInteger(c.totalQueries || 0)})`}
+ </div>
+ <div>
+ <span className="rows-label">Pushed</span>
+ <BracedText
+
text={formatRows(original.segmentGenerationProgress?.rowsPushed || 0)}
+ braces={allBracesRows}
/>
- </>
- )}
- </>
+ </div>
+ </>
+ )}
+ </>
+ );
+ },
+ } as Column<SimpleWideCounter>,
+ {
+ Header: 'Storage utilization',
+ id: 'storage',
+ accessor: (d: SimpleWideCounter) => {
+ const s = d.storage;
+ if (!s) return 0;
+ return s.localBytesWritten + s.durableBytesWritten;
+ },
+ className: 'padded',
+ width: 250,
+ show: stages.hasCounterForStage(stage, 'storage'),
+ Cell({ original }: { original: SimpleWideCounter }) {
+ const s = original.storage;
+ if (!s) return <i>none</i>;
+
+ const hasLocal = s.localBytesWritten > 0 || s.localBytesReserved
> 0;
+ const hasDurable = s.durableBytesWritten > 0;
+
+ if (!hasLocal && !hasDurable) return <i>none</i>;
+
+ const tooltipParts: string[] = [];
+ if (hasLocal) {
+ const usedPart =
+ s.localBytesMax != null
+ ? `(${formatBytesCompact(s.localBytesReserved)} /
${formatBytesCompact(
+ s.localBytesMax,
+ )} used)`
+ : `(${formatBytesCompact(s.localBytesReserved)} used)`;
+ tooltipParts.push(
+ `Local: ${formatBytesCompact(s.localBytesWritten)} written
in ${pluralIfNeeded(
+ s.localFilesWritten,
+ 'file',
+ )} ${usedPart}`,
);
- },
- };
- }),
- Stages.stageType(stage) === 'segmentGenerator'
- ? [
- {
- Header: twoLines('Merged', <i>rows</i>),
- id: 'segmentGeneration_rowsMerged',
- accessor: d => d.segmentGenerationProgress?.rowsMerged || 0,
- className: 'padded',
- width: 180,
- Cell({ value }) {
- return <BracedText text={formatRows(value)}
braces={bracesSegmentRowsMerged} />;
- },
- },
- {
- Header: twoLines('Pushed', <i>rows</i>),
- id: 'segmentGeneration_rowsPushed',
- accessor: d => d.segmentGenerationProgress?.rowsPushed || 0,
- className: 'padded',
- width: 180,
- Cell({ value }) {
- return <BracedText text={formatRows(value)}
braces={bracesSegmentRowsPushed} />;
- },
- },
- ]
- : [],
- )}
+ }
+ if (hasDurable) {
+ tooltipParts.push(
+ `Durable: ${formatBytesCompact(
+ s.durableBytesWritten,
+ )} written in ${pluralIfNeeded(s.durableFileCount, 'file')}`,
+ );
+ }
+
+ return (
+ <div data-tooltip={tooltipParts.join('\n')}>
+ {hasLocal && (
+ <div>
+ <span className="storage-label">Local</span>
+ {formatBytesCompact(s.localBytesWritten)}
+ {s.localBytesReserved > 0 && (
+ <span className="storage-used">
+ {` (${formatBytesCompact(s.localBytesReserved)}
used)`}
+ </span>
+ )}
+ </div>
+ )}
+ {hasDurable && (
+ <div>
+ <span className="storage-label">Durable</span>
+ {formatBytesCompact(s.durableBytesWritten)}
+ </div>
+ )}
+ </div>
+ );
+ },
+ } as Column<SimpleWideCounter>,
+ ])}
/>
);
}
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]