This is an automated email from the ASF dual-hosted git repository.

gianm pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/druid.git


The following commit(s) were added to refs/heads/master by this push:
     new a4d49211573 feat: MSQ storage counters. (#19316)
a4d49211573 is described below

commit a4d4921157348d2c344ab723a072a52d42260fec
Author: Gian Merlino <[email protected]>
AuthorDate: Fri Apr 17 15:14:59 2026 -0700

    feat: MSQ storage counters. (#19316)
    
    This patch adds storage counters, providing information on the amount of
    bytes and files written to local and durable storage, as well as the
    current state of the local ByteTracker.
    
    This patch also adds per-worker storage counters to the web console.
    To make room for them, the per-channel counters are consolidated into a
    single column, as they are in the stage summary row.
---
 .../apache/druid/msq/counters/CounterNames.java    |  10 +
 .../apache/druid/msq/counters/CounterTracker.java  |  12 +
 .../apache/druid/msq/counters/StorageCounters.java | 232 +++++++++++++++++
 .../org/apache/druid/msq/exec/RunWorkOrder.java    |  47 ++--
 .../druid/msq/exec/std/StandardStageRunner.java    |   6 +-
 .../apache/druid/msq/guice/MSQIndexingModule.java  |   2 +
 ...va => ChannelCountingOutputChannelFactory.java} |   4 +-
 ...va => StorageCountingOutputChannelFactory.java} |  45 ++--
 .../StorageCountingWritableFrameChannel.java       | 102 ++++++++
 .../druid/msq/counters/StorageCountersTest.java    | 171 +++++++++++++
 .../apache/druid/frame/channel/ByteTracker.java    |  10 +
 web-console/src/druid-models/stages/stages.mock.ts |  27 ++
 web-console/src/druid-models/stages/stages.ts      |  26 ++
 .../execution-stages-pane.scss                     |  18 ++
 .../execution-stages-pane.tsx                      | 275 ++++++++++++---------
 15 files changed, 837 insertions(+), 150 deletions(-)

diff --git 
a/multi-stage-query/src/main/java/org/apache/druid/msq/counters/CounterNames.java
 
b/multi-stage-query/src/main/java/org/apache/druid/msq/counters/CounterNames.java
index 19d5da36efb..2b7579deb72 100644
--- 
a/multi-stage-query/src/main/java/org/apache/druid/msq/counters/CounterNames.java
+++ 
b/multi-stage-query/src/main/java/org/apache/druid/msq/counters/CounterNames.java
@@ -20,6 +20,7 @@
 package org.apache.druid.msq.counters;
 
 import com.google.common.collect.ImmutableMap;
+import org.apache.druid.frame.channel.ByteTracker;
 import org.apache.druid.java.util.common.StringUtils;
 import org.apache.druid.query.ordering.StringComparators;
 
@@ -37,6 +38,7 @@ public class CounterNames
   private static final String CPU = "cpu";
   private static final String SORT_PROGRESS = "sortProgress";
   private static final String SEGMENT_GENERATION_PROGRESS = 
"segmentGenerationProgress";
+  private static final String STORAGE = "storage";
   private static final String WARNINGS = "warnings";
   private static final Comparator<String> COMPARATOR = new NameComparator();
 
@@ -85,6 +87,14 @@ public class CounterNames
     return SORT_PROGRESS;
   }
 
+  /**
+   * Standard name for a storage counter created by {@link 
CounterTracker#storage(ByteTracker)}.
+   */
+  public static String storage()
+  {
+    return STORAGE;
+  }
+
   /**
    * Standard name for a warnings counter created by {@link 
CounterTracker#warnings()}.
    */
diff --git 
a/multi-stage-query/src/main/java/org/apache/druid/msq/counters/CounterTracker.java
 
b/multi-stage-query/src/main/java/org/apache/druid/msq/counters/CounterTracker.java
index dedc649215f..b626d024368 100644
--- 
a/multi-stage-query/src/main/java/org/apache/druid/msq/counters/CounterTracker.java
+++ 
b/multi-stage-query/src/main/java/org/apache/druid/msq/counters/CounterTracker.java
@@ -19,6 +19,8 @@
 
 package org.apache.druid.msq.counters;
 
+import com.google.common.base.Preconditions;
+import org.apache.druid.frame.channel.ByteTracker;
 import org.apache.druid.frame.processor.FrameProcessor;
 import org.apache.druid.frame.processor.SuperSorterProgressTracker;
 import org.apache.druid.frame.processor.manager.ProcessorManager;
@@ -101,6 +103,16 @@ public class CounterTracker
     return counter(CounterNames.getSegmentGenerationProgress(), 
SegmentGenerationProgressCounter::new);
   }
 
+  public StorageCounters storage(final ByteTracker localByteTracker)
+  {
+    final StorageCounters storageCounters = counter(CounterNames.storage(), () 
-> new StorageCounters(localByteTracker));
+    Preconditions.checkState(
+        storageCounters.getLocalByteTracker() == localByteTracker,
+        "StorageCounters already exists with a different ByteTracker"
+    );
+    return storageCounters;
+  }
+
   public WarningCounters warnings()
   {
     return counter(CounterNames.warnings(), WarningCounters::new);
diff --git 
a/multi-stage-query/src/main/java/org/apache/druid/msq/counters/StorageCounters.java
 
b/multi-stage-query/src/main/java/org/apache/druid/msq/counters/StorageCounters.java
new file mode 100644
index 00000000000..f1acd142778
--- /dev/null
+++ 
b/multi-stage-query/src/main/java/org/apache/druid/msq/counters/StorageCounters.java
@@ -0,0 +1,232 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.druid.msq.counters;
+
+import com.fasterxml.jackson.annotation.JsonCreator;
+import com.fasterxml.jackson.annotation.JsonInclude;
+import com.fasterxml.jackson.annotation.JsonProperty;
+import com.fasterxml.jackson.annotation.JsonTypeName;
+import org.apache.druid.frame.channel.ByteTracker;
+
+import javax.annotation.Nullable;
+import java.util.Objects;
+import java.util.concurrent.atomic.AtomicLong;
+
+/**
+ * Counters for storage usage during MSQ query execution. Created by {@link 
CounterTracker#storage(ByteTracker)}.
+ *
+ * Tracks:
+ * <ul>
+ *   <li>Local byte tracker max and reserved size</li>
+ *   <li>Total files and bytes written to local storage</li>
+ *   <li>Total files and bytes written to durable storage</li>
+ * </ul>
+ */
+public class StorageCounters implements QueryCounter
+{
+  @Nullable
+  private final ByteTracker localByteTracker;
+
+  private final AtomicLong localFilesWritten = new AtomicLong();
+  private final AtomicLong localBytesWritten = new AtomicLong();
+  private final AtomicLong durableFileCount = new AtomicLong();
+  private final AtomicLong durableBytesWritten = new AtomicLong();
+
+  public StorageCounters(@Nullable final ByteTracker localByteTracker)
+  {
+    this.localByteTracker = localByteTracker;
+  }
+
+  @Nullable
+  public ByteTracker getLocalByteTracker()
+  {
+    return localByteTracker;
+  }
+
+  /**
+   * Increments the local storage file counter by one.
+   */
+  public void incrementLocalFiles()
+  {
+    localFilesWritten.incrementAndGet();
+  }
+
+  /**
+   * Adds to the local storage bytes-written counter.
+   */
+  public void incrementLocalBytes(final long bytes)
+  {
+    localBytesWritten.addAndGet(bytes);
+  }
+
+  /**
+   * Increments the durable storage file counter by one.
+   */
+  public void incrementDurableFiles()
+  {
+    durableFileCount.incrementAndGet();
+  }
+
+  /**
+   * Adds to the durable storage bytes-written counter.
+   */
+  public void incrementDurableBytes(final long bytes)
+  {
+    durableBytesWritten.addAndGet(bytes);
+  }
+
+  @Override
+  @Nullable
+  public QueryCounterSnapshot snapshot()
+  {
+    final Long localBytesMax;
+    final long localBytesReserved;
+
+    if (localByteTracker != null) {
+      final long maxBytes = localByteTracker.getMaxBytes();
+      localBytesMax = maxBytes == Long.MAX_VALUE ? null : maxBytes;
+      localBytesReserved = localByteTracker.getCurrentBytes();
+    } else {
+      localBytesMax = null;
+      localBytesReserved = 0;
+    }
+
+    return new Snapshot(
+        localBytesMax,
+        localBytesReserved,
+        localFilesWritten.get(),
+        localBytesWritten.get(),
+        durableFileCount.get(),
+        durableBytesWritten.get()
+    );
+  }
+
+  @JsonTypeName("storage")
+  public static class Snapshot implements QueryCounterSnapshot
+  {
+    @Nullable
+    private final Long localBytesMax;
+    private final long localBytesReserved;
+    private final long localFilesWritten;
+    private final long localBytesWritten;
+    private final long durableFileCount;
+    private final long durableBytesWritten;
+
+    @JsonCreator
+    public Snapshot(
+        @JsonProperty("localBytesMax") @Nullable final Long localBytesMax,
+        @JsonProperty("localBytesReserved") final long localBytesReserved,
+        @JsonProperty("localFilesWritten") final long localFilesWritten,
+        @JsonProperty("localBytesWritten") final long localBytesWritten,
+        @JsonProperty("durableFileCount") final long durableFileCount,
+        @JsonProperty("durableBytesWritten") final long durableBytesWritten
+    )
+    {
+      this.localBytesMax = localBytesMax;
+      this.localBytesReserved = localBytesReserved;
+      this.localFilesWritten = localFilesWritten;
+      this.localBytesWritten = localBytesWritten;
+      this.durableFileCount = durableFileCount;
+      this.durableBytesWritten = durableBytesWritten;
+    }
+
+    @Nullable
+    @JsonProperty("localBytesMax")
+    @JsonInclude(JsonInclude.Include.NON_NULL)
+    public Long getLocalBytesMax()
+    {
+      return localBytesMax;
+    }
+
+    @JsonProperty("localBytesReserved")
+    public long getLocalBytesReserved()
+    {
+      return localBytesReserved;
+    }
+
+    @JsonProperty("localFilesWritten")
+    public long getLocalFilesWritten()
+    {
+      return localFilesWritten;
+    }
+
+    @JsonProperty("localBytesWritten")
+    public long getLocalBytesWritten()
+    {
+      return localBytesWritten;
+    }
+
+    @JsonProperty("durableFileCount")
+    public long getDurableFileCount()
+    {
+      return durableFileCount;
+    }
+
+    @JsonProperty("durableBytesWritten")
+    public long getDurableBytesWritten()
+    {
+      return durableBytesWritten;
+    }
+
+    @Override
+    public boolean equals(final Object o)
+    {
+      if (this == o) {
+        return true;
+      }
+      if (o == null || getClass() != o.getClass()) {
+        return false;
+      }
+      final Snapshot snapshot = (Snapshot) o;
+      return Objects.equals(localBytesMax, snapshot.localBytesMax)
+             && localBytesReserved == snapshot.localBytesReserved
+             && localFilesWritten == snapshot.localFilesWritten
+             && localBytesWritten == snapshot.localBytesWritten
+             && durableFileCount == snapshot.durableFileCount
+             && durableBytesWritten == snapshot.durableBytesWritten;
+    }
+
+    @Override
+    public int hashCode()
+    {
+      return Objects.hash(
+          localBytesMax,
+          localBytesReserved,
+          localFilesWritten,
+          localBytesWritten,
+          durableFileCount,
+          durableBytesWritten
+      );
+    }
+
+    @Override
+    public String toString()
+    {
+      return "Snapshot{" +
+             "localBytesMax=" + localBytesMax +
+             ", localBytesReserved=" + localBytesReserved +
+             ", localFilesWritten=" + localFilesWritten +
+             ", localBytesWritten=" + localBytesWritten +
+             ", durableFileCount=" + durableFileCount +
+             ", durableBytesWritten=" + durableBytesWritten +
+             '}';
+    }
+  }
+}
diff --git 
a/multi-stage-query/src/main/java/org/apache/druid/msq/exec/RunWorkOrder.java 
b/multi-stage-query/src/main/java/org/apache/druid/msq/exec/RunWorkOrder.java
index f73ffa010fc..ba613768781 100644
--- 
a/multi-stage-query/src/main/java/org/apache/druid/msq/exec/RunWorkOrder.java
+++ 
b/multi-stage-query/src/main/java/org/apache/druid/msq/exec/RunWorkOrder.java
@@ -43,6 +43,7 @@ import org.apache.druid.java.util.common.ISE;
 import org.apache.druid.java.util.common.StringUtils;
 import org.apache.druid.java.util.common.concurrent.Execs;
 import org.apache.druid.msq.counters.CounterTracker;
+import org.apache.druid.msq.indexing.StorageCountingOutputChannelFactory;
 import org.apache.druid.msq.indexing.error.CanceledFault;
 import org.apache.druid.msq.indexing.error.MSQException;
 import org.apache.druid.msq.input.InputSlice;
@@ -431,20 +432,28 @@ public class RunWorkOrder
       case LOCAL_STORAGE:
         final File fileChannelDirectory =
             frameContext.tempDir(StringUtils.format("output_stage_%06d", 
workOrder.getStageNumber()));
-        outputChannelFactory = new FileOutputChannelFactory(
-            fileChannelDirectory,
-            frameSize,
-            null,
-            frameContext.wireTransferableContext()
+        outputChannelFactory = new StorageCountingOutputChannelFactory(
+            new FileOutputChannelFactory(
+                fileChannelDirectory,
+                frameSize,
+                null,
+                frameContext.wireTransferableContext()
+            ),
+            counterTracker.storage(intermediateByteTracker),
+            false
         );
         break;
 
       case DURABLE_STORAGE_INTERMEDIATE:
       case DURABLE_STORAGE_QUERY_RESULTS:
-        outputChannelFactory = makeDurableStorageOutputChannelFactory(
-            frameContext.tempDir("durable"),
-            frameSize,
-            outputChannelMode == 
OutputChannelMode.DURABLE_STORAGE_QUERY_RESULTS
+        outputChannelFactory = new StorageCountingOutputChannelFactory(
+            makeDurableStorageOutputChannelFactory(
+                frameContext.tempDir("durable"),
+                frameSize,
+                outputChannelMode == 
OutputChannelMode.DURABLE_STORAGE_QUERY_RESULTS
+            ),
+            counterTracker.storage(intermediateByteTracker),
+            true
         );
         break;
 
@@ -469,11 +478,15 @@ public class RunWorkOrder
     final int frameSize = frameContext.memoryParameters().getFrameSize();
     final File fileChannelDirectory =
         new File(tempDir, StringUtils.format("intermediate-stage-%06d", 
workOrder.getStageNumber()));
-    final FileOutputChannelFactory fileOutputChannelFactory = new 
FileOutputChannelFactory(
-        fileChannelDirectory,
-        frameSize,
-        intermediateByteTracker,
-        frameContext.wireTransferableContext()
+    final OutputChannelFactory fileOutputChannelFactory = new 
StorageCountingOutputChannelFactory(
+        new FileOutputChannelFactory(
+            fileChannelDirectory,
+            frameSize,
+            intermediateByteTracker,
+            frameContext.wireTransferableContext()
+        ),
+        counterTracker.storage(intermediateByteTracker),
+        false
     );
 
     if (workOrder.getOutputChannelMode().isDurable()
@@ -483,7 +496,11 @@ public class RunWorkOrder
       return new ComposingOutputChannelFactory(
           ImmutableList.of(
               fileOutputChannelFactory,
-              makeDurableStorageOutputChannelFactory(tempDir, frameSize, 
isQueryResults)
+              new StorageCountingOutputChannelFactory(
+                  makeDurableStorageOutputChannelFactory(tempDir, frameSize, 
isQueryResults),
+                  counterTracker.storage(intermediateByteTracker),
+                  true
+              )
           ),
           frameSize
       );
diff --git 
a/multi-stage-query/src/main/java/org/apache/druid/msq/exec/std/StandardStageRunner.java
 
b/multi-stage-query/src/main/java/org/apache/druid/msq/exec/std/StandardStageRunner.java
index 4f2024bd8df..fd7da3c88cd 100644
--- 
a/multi-stage-query/src/main/java/org/apache/druid/msq/exec/std/StandardStageRunner.java
+++ 
b/multi-stage-query/src/main/java/org/apache/druid/msq/exec/std/StandardStageRunner.java
@@ -33,7 +33,7 @@ import org.apache.druid.msq.counters.CpuCounters;
 import org.apache.druid.msq.exec.ExecutionContext;
 import org.apache.druid.msq.exec.FrameContext;
 import org.apache.druid.msq.exec.StageProcessor;
-import org.apache.druid.msq.indexing.CountingOutputChannelFactory;
+import org.apache.druid.msq.indexing.ChannelCountingOutputChannelFactory;
 import org.apache.druid.msq.kernel.ShuffleSpec;
 import org.apache.druid.msq.kernel.StageDefinition;
 import org.checkerframework.checker.nullness.qual.MonotonicNonNull;
@@ -128,7 +128,7 @@ public class StandardStageRunner<T, R>
       baseOutputChannelFactory = executionContext.outputChannelFactory();
     }
 
-    workOutputChannelFactory = new CountingOutputChannelFactory(
+    workOutputChannelFactory = new ChannelCountingOutputChannelFactory(
         baseOutputChannelFactory,
         executionContext.counters().channel(CounterNames.outputChannel())
     );
@@ -181,7 +181,7 @@ public class StandardStageRunner<T, R>
 
     pipelineFuture = 
stageOperations.gatherResultKeyStatisticsIfNeeded(pipelineFuture);
 
-    final OutputChannelFactory stageOutputChannelFactory = new 
CountingOutputChannelFactory(
+    final OutputChannelFactory stageOutputChannelFactory = new 
ChannelCountingOutputChannelFactory(
         executionContext.outputChannelFactory(),
         executionContext.counters().channel(CounterNames.shuffleChannel())
     );
diff --git 
a/multi-stage-query/src/main/java/org/apache/druid/msq/guice/MSQIndexingModule.java
 
b/multi-stage-query/src/main/java/org/apache/druid/msq/guice/MSQIndexingModule.java
index 6b100e078b4..629754f2a9a 100644
--- 
a/multi-stage-query/src/main/java/org/apache/druid/msq/guice/MSQIndexingModule.java
+++ 
b/multi-stage-query/src/main/java/org/apache/druid/msq/guice/MSQIndexingModule.java
@@ -36,6 +36,7 @@ import org.apache.druid.msq.counters.CpuCounter;
 import org.apache.druid.msq.counters.CpuCounters;
 import org.apache.druid.msq.counters.NilQueryCounterSnapshot;
 import org.apache.druid.msq.counters.SegmentGenerationProgressCounter;
+import org.apache.druid.msq.counters.StorageCounters;
 import org.apache.druid.msq.counters.SuperSorterProgressTrackerCounter;
 import org.apache.druid.msq.counters.WarningCounters;
 import org.apache.druid.msq.indexing.IndexerControllerContextFactory;
@@ -198,6 +199,7 @@ public class MSQIndexingModule implements DruidModule
         SuperSorterProgressTrackerCounter.Snapshot.class,
         WarningCounters.Snapshot.class,
         SegmentGenerationProgressCounter.Snapshot.class,
+        StorageCounters.Snapshot.class,
         CpuCounters.Snapshot.class,
         CpuCounter.Snapshot.class,
         NilQueryCounterSnapshot.class,
diff --git 
a/multi-stage-query/src/main/java/org/apache/druid/msq/indexing/CountingOutputChannelFactory.java
 
b/multi-stage-query/src/main/java/org/apache/druid/msq/indexing/ChannelCountingOutputChannelFactory.java
similarity index 95%
copy from 
multi-stage-query/src/main/java/org/apache/druid/msq/indexing/CountingOutputChannelFactory.java
copy to 
multi-stage-query/src/main/java/org/apache/druid/msq/indexing/ChannelCountingOutputChannelFactory.java
index 667d99cc198..5dc0e6cf9ec 100644
--- 
a/multi-stage-query/src/main/java/org/apache/druid/msq/indexing/CountingOutputChannelFactory.java
+++ 
b/multi-stage-query/src/main/java/org/apache/druid/msq/indexing/ChannelCountingOutputChannelFactory.java
@@ -27,12 +27,12 @@ import org.apache.druid.msq.counters.ChannelCounters;
 
 import java.io.IOException;
 
-public class CountingOutputChannelFactory implements OutputChannelFactory
+public class ChannelCountingOutputChannelFactory implements 
OutputChannelFactory
 {
   private final OutputChannelFactory baseFactory;
   private final ChannelCounters channelCounters;
 
-  public CountingOutputChannelFactory(
+  public ChannelCountingOutputChannelFactory(
       final OutputChannelFactory baseFactory,
       final ChannelCounters channelCounters
   )
diff --git 
a/multi-stage-query/src/main/java/org/apache/druid/msq/indexing/CountingOutputChannelFactory.java
 
b/multi-stage-query/src/main/java/org/apache/druid/msq/indexing/StorageCountingOutputChannelFactory.java
similarity index 59%
rename from 
multi-stage-query/src/main/java/org/apache/druid/msq/indexing/CountingOutputChannelFactory.java
rename to 
multi-stage-query/src/main/java/org/apache/druid/msq/indexing/StorageCountingOutputChannelFactory.java
index 667d99cc198..6d0c770f2d9 100644
--- 
a/multi-stage-query/src/main/java/org/apache/druid/msq/indexing/CountingOutputChannelFactory.java
+++ 
b/multi-stage-query/src/main/java/org/apache/druid/msq/indexing/StorageCountingOutputChannelFactory.java
@@ -23,50 +23,61 @@ import com.google.common.base.Preconditions;
 import org.apache.druid.frame.processor.OutputChannel;
 import org.apache.druid.frame.processor.OutputChannelFactory;
 import org.apache.druid.frame.processor.PartitionedOutputChannel;
-import org.apache.druid.msq.counters.ChannelCounters;
+import org.apache.druid.msq.counters.StorageCounters;
 
 import java.io.IOException;
 
-public class CountingOutputChannelFactory implements OutputChannelFactory
+/**
+ * Wraps an {@link OutputChannelFactory} to track storage file and byte counts 
in {@link StorageCounters}.
+ * Similar to {@link ChannelCountingOutputChannelFactory} but for 
storage-level tracking rather than
+ * per-partition channel tracking.
+ */
+public class StorageCountingOutputChannelFactory implements 
OutputChannelFactory
 {
   private final OutputChannelFactory baseFactory;
-  private final ChannelCounters channelCounters;
+  private final StorageCounters storageCounters;
+  private final boolean isDurable;
 
-  public CountingOutputChannelFactory(
+  public StorageCountingOutputChannelFactory(
       final OutputChannelFactory baseFactory,
-      final ChannelCounters channelCounters
+      final StorageCounters storageCounters,
+      final boolean isDurable
   )
   {
     this.baseFactory = Preconditions.checkNotNull(baseFactory, "baseFactory");
-    this.channelCounters = Preconditions.checkNotNull(channelCounters, 
"channelCounter");
+    this.storageCounters = Preconditions.checkNotNull(storageCounters, 
"storageCounters");
+    this.isDurable = isDurable;
   }
 
   @Override
-  public OutputChannel openChannel(int partitionNumber) throws IOException
+  public OutputChannel openChannel(final int partitionNumber) throws 
IOException
   {
     final OutputChannel baseChannel = baseFactory.openChannel(partitionNumber);
 
     return baseChannel.mapWritableChannel(
         baseWritableChannel ->
-            new CountingWritableFrameChannel(
-                baseChannel.getWritableChannel(),
-                channelCounters,
-                partitionNumber
+            new StorageCountingWritableFrameChannel(
+                baseWritableChannel,
+                storageCounters,
+                isDurable
             )
     );
   }
 
   @Override
-  public PartitionedOutputChannel openPartitionedChannel(String name, boolean 
deleteAfterRead) throws IOException
+  public PartitionedOutputChannel openPartitionedChannel(
+      final String name,
+      final boolean deleteAfterRead
+  ) throws IOException
   {
     final PartitionedOutputChannel baseChannel = 
baseFactory.openPartitionedChannel(name, deleteAfterRead);
 
     return baseChannel.mapWritableChannel(
         baseWritableChannel ->
-            new CountingWritableFrameChannel(
-                baseChannel.getWritableChannel(),
-                channelCounters,
-                null
+            new StorageCountingWritableFrameChannel(
+                baseWritableChannel,
+                storageCounters,
+                isDurable
             )
     );
   }
@@ -74,7 +85,7 @@ public class CountingOutputChannelFactory implements 
OutputChannelFactory
   @Override
   public OutputChannel openNilChannel(final int partitionNumber)
   {
-    // No need for counters on nil channels: they never receive input.
+    // No need for storage counters on nil channels: they never receive input.
     return baseFactory.openNilChannel(partitionNumber);
   }
 
diff --git 
a/multi-stage-query/src/main/java/org/apache/druid/msq/indexing/StorageCountingWritableFrameChannel.java
 
b/multi-stage-query/src/main/java/org/apache/druid/msq/indexing/StorageCountingWritableFrameChannel.java
new file mode 100644
index 00000000000..15c5ef17fde
--- /dev/null
+++ 
b/multi-stage-query/src/main/java/org/apache/druid/msq/indexing/StorageCountingWritableFrameChannel.java
@@ -0,0 +1,102 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.druid.msq.indexing;
+
+import com.google.common.util.concurrent.ListenableFuture;
+import org.apache.druid.frame.Frame;
+import org.apache.druid.frame.channel.WritableFrameChannel;
+import org.apache.druid.msq.counters.StorageCounters;
+import org.apache.druid.query.rowsandcols.RowsAndColumns;
+import org.apache.druid.query.rowsandcols.concrete.FrameRowsAndColumns;
+
+import javax.annotation.Nullable;
+import java.io.IOException;
+
+/**
+ * Wraps a {@link WritableFrameChannel} to track storage usage in {@link 
StorageCounters}.
+ * Updates counters on each {@link #write} call: the first write increments 
the file count by one,
+ * and every write increments bytes written.
+ */
+public class StorageCountingWritableFrameChannel implements 
WritableFrameChannel
+{
+  private final WritableFrameChannel baseChannel;
+  private final StorageCounters storageCounters;
+  private final boolean isDurable;
+  private boolean firstWrite = true;
+
+  public StorageCountingWritableFrameChannel(
+      final WritableFrameChannel baseChannel,
+      final StorageCounters storageCounters,
+      final boolean isDurable
+  )
+  {
+    this.baseChannel = baseChannel;
+    this.storageCounters = storageCounters;
+    this.isDurable = isDurable;
+  }
+
+  @Override
+  public void write(final RowsAndColumns rac, final int partitionNum) throws 
IOException
+  {
+    baseChannel.write(rac, partitionNum);
+
+    if (firstWrite) {
+      firstWrite = false;
+      if (isDurable) {
+        storageCounters.incrementDurableFiles();
+      } else {
+        storageCounters.incrementLocalFiles();
+      }
+    }
+
+    if (rac instanceof FrameRowsAndColumns) {
+      final Frame frame = ((FrameRowsAndColumns) rac).getFrame();
+      if (isDurable) {
+        storageCounters.incrementDurableBytes(frame.numBytes());
+      } else {
+        storageCounters.incrementLocalBytes(frame.numBytes());
+      }
+    }
+  }
+
+  @Override
+  public void fail(@Nullable final Throwable cause) throws IOException
+  {
+    baseChannel.fail(cause);
+  }
+
+  @Override
+  public void close() throws IOException
+  {
+    baseChannel.close();
+  }
+
+  @Override
+  public boolean isClosed()
+  {
+    return baseChannel.isClosed();
+  }
+
+  @Override
+  public ListenableFuture<?> writabilityFuture()
+  {
+    return baseChannel.writabilityFuture();
+  }
+}
diff --git 
a/multi-stage-query/src/test/java/org/apache/druid/msq/counters/StorageCountersTest.java
 
b/multi-stage-query/src/test/java/org/apache/druid/msq/counters/StorageCountersTest.java
new file mode 100644
index 00000000000..f0e449f35e7
--- /dev/null
+++ 
b/multi-stage-query/src/test/java/org/apache/druid/msq/counters/StorageCountersTest.java
@@ -0,0 +1,171 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.druid.msq.counters;
+
+import com.fasterxml.jackson.databind.ObjectMapper;
+import com.google.common.collect.ImmutableMap;
+import nl.jqno.equalsverifier.EqualsVerifier;
+import org.apache.druid.frame.channel.ByteTracker;
+import org.apache.druid.msq.guice.MSQIndexingModule;
+import org.apache.druid.segment.TestHelper;
+import org.junit.Assert;
+import org.junit.Test;
+
+public class StorageCountersTest
+{
+  @Test
+  public void testSnapshotWithNoByteTracker()
+  {
+    final StorageCounters counters = new StorageCounters(null);
+    final StorageCounters.Snapshot snapshot = (StorageCounters.Snapshot) 
counters.snapshot();
+
+    Assert.assertNull(snapshot.getLocalBytesMax());
+    Assert.assertEquals(0, snapshot.getLocalBytesReserved());
+    Assert.assertEquals(0, snapshot.getLocalFilesWritten());
+    Assert.assertEquals(0, snapshot.getLocalBytesWritten());
+    Assert.assertEquals(0, snapshot.getDurableFileCount());
+    Assert.assertEquals(0, snapshot.getDurableBytesWritten());
+  }
+
+  @Test
+  public void testSnapshotWithByteTracker()
+  {
+    final ByteTracker tracker = new ByteTracker(1000);
+    tracker.reserve(300);
+
+    final StorageCounters counters = new StorageCounters(tracker);
+
+    final StorageCounters.Snapshot snapshot = (StorageCounters.Snapshot) 
counters.snapshot();
+    Assert.assertEquals(1000L, (long) snapshot.getLocalBytesMax());
+    Assert.assertEquals(300, snapshot.getLocalBytesReserved());
+  }
+
+  @Test
+  public void testSnapshotWithByteTrackerReflectsCurrentState()
+  {
+    final ByteTracker tracker = new ByteTracker(1000);
+    tracker.reserve(300);
+
+    final StorageCounters counters = new StorageCounters(tracker);
+
+    // Take first snapshot
+    final StorageCounters.Snapshot snapshot1 = (StorageCounters.Snapshot) 
counters.snapshot();
+    Assert.assertEquals(300, snapshot1.getLocalBytesReserved());
+
+    // Change tracker state
+    tracker.reserve(200);
+
+    // Second snapshot reflects new state
+    final StorageCounters.Snapshot snapshot2 = (StorageCounters.Snapshot) 
counters.snapshot();
+    Assert.assertEquals(500, snapshot2.getLocalBytesReserved());
+  }
+
+  @Test
+  public void testIncrementLocalFiles()
+  {
+    final StorageCounters counters = new StorageCounters(null);
+    counters.incrementLocalFiles();
+    counters.incrementLocalBytes(500);
+    counters.incrementLocalFiles();
+    counters.incrementLocalBytes(300);
+
+    final StorageCounters.Snapshot snapshot = (StorageCounters.Snapshot) 
counters.snapshot();
+    Assert.assertEquals(2, snapshot.getLocalFilesWritten());
+    Assert.assertEquals(800, snapshot.getLocalBytesWritten());
+    Assert.assertEquals(0, snapshot.getDurableFileCount());
+    Assert.assertEquals(0, snapshot.getDurableBytesWritten());
+  }
+
+  @Test
+  public void testIncrementDurableFiles()
+  {
+    final StorageCounters counters = new StorageCounters(null);
+    counters.incrementDurableFiles();
+    counters.incrementDurableBytes(1000);
+    counters.incrementDurableFiles();
+    counters.incrementDurableBytes(2000);
+
+    final StorageCounters.Snapshot snapshot = (StorageCounters.Snapshot) 
counters.snapshot();
+    Assert.assertEquals(0, snapshot.getLocalFilesWritten());
+    Assert.assertEquals(0, snapshot.getLocalBytesWritten());
+    Assert.assertEquals(2, snapshot.getDurableFileCount());
+    Assert.assertEquals(3000, snapshot.getDurableBytesWritten());
+  }
+
+  @Test
+  public void testSnapshotSerde() throws Exception
+  {
+    final ObjectMapper mapper =
+        TestHelper.makeJsonMapper().registerModules(new 
MSQIndexingModule().getJacksonModules());
+
+    final StorageCounters.Snapshot snapshot = new 
StorageCounters.Snapshot(1000L, 300, 5, 2500, 3, 1500);
+
+    final String json = mapper.writeValueAsString(snapshot);
+    final StorageCounters.Snapshot deserialized = mapper.readValue(json, 
StorageCounters.Snapshot.class);
+
+    Assert.assertEquals(snapshot, deserialized);
+  }
+
+  @Test
+  public void testSnapshotSerdeViaCounterSnapshots() throws Exception
+  {
+    final ObjectMapper mapper =
+        TestHelper.makeJsonMapper().registerModules(new 
MSQIndexingModule().getJacksonModules());
+
+    final StorageCounters.Snapshot snapshot = new 
StorageCounters.Snapshot(1000L, 300, 5, 2500, 3, 1500);
+    final CounterSnapshotsTree tree = new CounterSnapshotsTree();
+    tree.put(0, 0, new 
CounterSnapshots(ImmutableMap.of(CounterNames.storage(), snapshot)));
+
+    final String json = mapper.writeValueAsString(tree);
+    final CounterSnapshotsTree deserialized = mapper.readValue(json, 
CounterSnapshotsTree.class);
+
+    Assert.assertEquals(
+        snapshot,
+        
deserialized.copyMap().get(0).get(0).getMap().get(CounterNames.storage())
+    );
+  }
+
+  @Test
+  public void testCounterTrackerDefensiveCheck()
+  {
+    final ByteTracker tracker1 = new ByteTracker(1000);
+    final ByteTracker tracker2 = new ByteTracker(2000);
+
+    final CounterTracker counterTracker = new CounterTracker(true);
+
+    // First call creates the counter
+    final StorageCounters counters = counterTracker.storage(tracker1);
+    Assert.assertSame(tracker1, counters.getLocalByteTracker());
+
+    // Second call with same tracker succeeds
+    Assert.assertSame(counters, counterTracker.storage(tracker1));
+
+    // Call with different tracker fails
+    Assert.assertThrows(IllegalStateException.class, () -> 
counterTracker.storage(tracker2));
+  }
+
+  @Test
+  public void testEquals()
+  {
+    EqualsVerifier.forClass(StorageCounters.Snapshot.class)
+                  .usingGetClass()
+                  .verify();
+  }
+}
diff --git 
a/processing/src/main/java/org/apache/druid/frame/channel/ByteTracker.java 
b/processing/src/main/java/org/apache/druid/frame/channel/ByteTracker.java
index 9dbe2e024bb..092da7cc07b 100644
--- a/processing/src/main/java/org/apache/druid/frame/channel/ByteTracker.java
+++ b/processing/src/main/java/org/apache/druid/frame/channel/ByteTracker.java
@@ -71,6 +71,16 @@ public class ByteTracker
     currentBytes = Math.subtractExact(currentBytes, byteCount);
   }
 
+  public synchronized long getMaxBytes()
+  {
+    return maxBytes;
+  }
+
+  public synchronized long getCurrentBytes()
+  {
+    return currentBytes;
+  }
+
   public static ByteTracker unboundedTracker()
   {
     return new ByteTracker(Long.MAX_VALUE);
diff --git a/web-console/src/druid-models/stages/stages.mock.ts 
b/web-console/src/druid-models/stages/stages.mock.ts
index c10c5794571..9820ac315f5 100644
--- a/web-console/src/druid-models/stages/stages.mock.ts
+++ b/web-console/src/druid-models/stages/stages.mock.ts
@@ -1144,6 +1144,15 @@ export const STAGES = new Stages(
           totalMergersForUltimateLevel: 1,
           progressDigest: 1,
         },
+        storage: {
+          type: 'storage',
+          localBytesMax: 1073741824,
+          localBytesReserved: 5242880,
+          localFilesWritten: 2,
+          localBytesWritten: 4120000,
+          durableFileCount: 1,
+          durableBytesWritten: 3900000,
+        },
       },
     },
     '1': {
@@ -1189,6 +1198,15 @@ export const STAGES = new Stages(
           totalMergersForUltimateLevel: 1,
           progressDigest: 1,
         },
+        storage: {
+          type: 'storage',
+          localBytesMax: 1073741824,
+          localBytesReserved: 52428800,
+          localFilesWritten: 24,
+          localBytesWritten: 48234567,
+          durableFileCount: 12,
+          durableBytesWritten: 35123456,
+        },
       },
     },
     '2': {
@@ -1234,6 +1252,15 @@ export const STAGES = new Stages(
           totalMergersForUltimateLevel: 24,
           progressDigest: 1,
         },
+        storage: {
+          type: 'storage',
+          localBytesMax: 1073741824,
+          localBytesReserved: 104857600,
+          localFilesWritten: 48,
+          localBytesWritten: 95432100,
+          durableFileCount: 24,
+          durableBytesWritten: 72345678,
+        },
       },
     },
     '3': {
diff --git a/web-console/src/druid-models/stages/stages.ts 
b/web-console/src/druid-models/stages/stages.ts
index 8ad539a109f..07014cfd793 100644
--- a/web-console/src/druid-models/stages/stages.ts
+++ b/web-console/src/druid-models/stages/stages.ts
@@ -147,6 +147,7 @@ export interface StageWorkerCounter {
   segmentGenerationProgress?: SegmentGenerationProgressCounter;
   warnings?: WarningCounter;
   cpu?: CpusCounter;
+  storage?: StorageCounter;
 }
 
 export type ChannelCounterName = `input${number}` | 'output' | 'shuffle';
@@ -301,6 +302,29 @@ export interface CpuCounter {
   wall: number;
 }
 
+export interface StorageCounter {
+  type: 'storage';
+  localBytesMax?: number;
+  localBytesReserved: number;
+  localFilesWritten: number;
+  localBytesWritten: number;
+  durableFileCount: number;
+  durableBytesWritten: number;
+}
+
+function normalizeStorageCounter(s: StorageCounter | undefined): 
StorageCounter | undefined {
+  if (!s) return;
+  return {
+    type: 'storage',
+    localBytesMax: s.localBytesMax != null ? Number(s.localBytesMax) : 
undefined,
+    localBytesReserved: Number(s.localBytesReserved),
+    localFilesWritten: Number(s.localFilesWritten),
+    localBytesWritten: Number(s.localBytesWritten),
+    durableFileCount: Number(s.durableFileCount),
+    durableBytesWritten: Number(s.durableBytesWritten),
+  };
+}
+
 function sumCpuCounters(cs: CpuCounter[]): CpuCounter {
   return aggregateThings(cs, {
     type: 'cpu',
@@ -316,6 +340,7 @@ export interface SimpleWideCounter {
   shuffle?: Record<ChannelFields, number>;
   segmentGenerationProgress?: SegmentGenerationProgressCounter;
   cpu?: CpusCounter;
+  storage?: StorageCounter;
 }
 
 function zeroChannelFields(): Record<ChannelFields, number> {
@@ -640,6 +665,7 @@ export class Stages {
       }
       newWideCounter.segmentGenerationProgress = 
stageCounters.segmentGenerationProgress;
       newWideCounter.cpu = stageCounters.cpu;
+      newWideCounter.storage = normalizeStorageCounter(stageCounters.storage);
       return newWideCounter;
     });
   }
diff --git 
a/web-console/src/views/workbench-view/execution-stages-pane/execution-stages-pane.scss
 
b/web-console/src/views/workbench-view/execution-stages-pane/execution-stages-pane.scss
index c0a0553fdad..a64e9062d8d 100644
--- 
a/web-console/src/views/workbench-view/execution-stages-pane/execution-stages-pane.scss
+++ 
b/web-console/src/views/workbench-view/execution-stages-pane/execution-stages-pane.scss
@@ -144,6 +144,24 @@
     width: 80px;
   }
 
+  .rows-label {
+    display: inline-block;
+    width: 140px;
+    overflow: hidden;
+    text-overflow: ellipsis;
+    white-space: nowrap;
+    vertical-align: bottom;
+  }
+
+  .storage-label {
+    display: inline-block;
+    width: 55px;
+  }
+
+  .storage-used {
+    opacity: 0.7;
+  }
+
   .timing-value {
     position: relative;
     height: 100%;
diff --git 
a/web-console/src/views/workbench-view/execution-stages-pane/execution-stages-pane.tsx
 
b/web-console/src/views/workbench-view/execution-stages-pane/execution-stages-pane.tsx
index c6fe9a73574..4b74879f625 100644
--- 
a/web-console/src/views/workbench-view/execution-stages-pane/execution-stages-pane.tsx
+++ 
b/web-console/src/views/workbench-view/execution-stages-pane/execution-stages-pane.tsx
@@ -222,30 +222,25 @@ export const ExecutionStagesPane = React.memo(function 
ExecutionStagesPane(
 
     const counterNames: ChannelCounterName[] = 
stages.getChannelCounterNamesForStage(stage);
 
-    const bracesRows: Record<ChannelCounterName, string[]> = {} as any;
-    const bracesExtra: Record<ChannelCounterName, string[]> = {} as any;
-    for (const counterName of counterNames) {
-      bracesRows[counterName] = wideCounters.map(wideCounter =>
-        formatRows(wideCounter[counterName]!.rows),
-      );
-      bracesExtra[counterName] = filterMap(wideCounters, wideCounter => {
-        const totalFiles = wideCounter[counterName]!.totalFiles;
-        if (!totalFiles) return;
-        return formatFileOfTotalForBrace(totalFiles, totalFiles);
-      });
-    }
-
     const isSegmentGenerator = Stages.stageType(stage) === 'segmentGenerator';
-    let bracesSegmentRowsMerged: string[] = [];
-    let bracesSegmentRowsPushed: string[] = [];
+
+    // Unified braces for the combined rows column
+    const allBracesRows: string[] = counterNames.flatMap(counterName =>
+      wideCounters.map(wideCounter => 
formatRows(wideCounter[counterName]!.rows)),
+    );
     if (isSegmentGenerator) {
-      bracesSegmentRowsMerged = wideCounters.map(wideCounter =>
-        formatRows(wideCounter.segmentGenerationProgress?.rowsMerged || 0),
-      );
-      bracesSegmentRowsPushed = wideCounters.map(wideCounter =>
-        formatRows(wideCounter.segmentGenerationProgress?.rowsPushed || 0),
+      allBracesRows.push(
+        ...wideCounters.map(wc => 
formatRows(wc.segmentGenerationProgress?.rowsMerged || 0)),
+        ...wideCounters.map(wc => 
formatRows(wc.segmentGenerationProgress?.rowsPushed || 0)),
       );
     }
+    const allBracesFiles: string[] = filterMap(
+      counterNames.flatMap(counterName =>
+        wideCounters.map(wideCounter => wideCounter[counterName]!),
+      ),
+      c => (c.totalFiles ? formatFileOfTotalForBrace(c.totalFiles, 
c.totalFiles) : undefined),
+    );
+    const firstNonInputIndex = counterNames.findIndex(cn => 
!cn.startsWith('input'));
 
     return (
       <ReactTable
@@ -327,103 +322,157 @@ export const ExecutionStagesPane = React.memo(function 
ExecutionStagesPane(
               );
             },
           } as Column<SimpleWideCounter>,
-        ].concat(
-          counterNames.map((counterName, i) => {
-            const isInput = counterName.startsWith('input');
-            return {
-              Header: twoLines(
-                isInput ? (
-                  <span>{inputLabelContent(stage, i)}</span>
-                ) : (
-                  stages.getStageCounterTitle(stage, counterName)
-                ),
-                isInput ? <i>rows &nbsp; (input files)</i> : <i>rows</i>,
-              ),
-              id: counterName,
-              accessor: d => d[counterName]!.rows,
-              className: 'padded',
-              width: 200,
-              Cell({ value, original }) {
-                const c = (original as SimpleWideCounter)[counterName]!;
-                return (
-                  <>
-                    <BracedText
-                      text={formatRows(value)}
-                      braces={bracesRows[counterName]}
-                      data-tooltip={
-                        c.bytes
-                          ? `Uncompressed size: ${formatBytesCompact(c.bytes)} 
${NOT_SIZE_ON_DISK}`
-                          : NO_SIZE_INFO
-                      }
-                    />
-                    {Boolean(c.totalFiles) && (
-                      <>
-                        {' '}
-                        &nbsp;{' '}
-                        <BracedText
-                          text={formatFileOfTotal(c.files, c.totalFiles)}
-                          braces={bracesExtra[counterName]}
-                        />
-                      </>
-                    )}
-                    {Boolean(c.loadFiles) && (
-                      <>
-                        {' '}
-                        &nbsp;{' '}
-                        <Icon
-                          className="load-indicator"
-                          icon={IconNames.IMPORT}
-                          data-tooltip={formatLoadTooltip(
-                            c.loadFiles,
-                            c.loadBytes,
-                            c.loadTime,
-                            c.loadWait,
+        ].concat([
+          {
+            Header: twoLines('Rows processed', <i>rows &nbsp; (input 
files)</i>),
+            id: 'rows_processed',
+            accessor: (d: SimpleWideCounter) =>
+              counterNames.reduce((acc, cn) => acc + (d[cn]?.rows || 0), 0),
+            className: 'padded',
+            width: 300,
+            Cell({ original }: { original: SimpleWideCounter }) {
+              return (
+                <>
+                  {counterNames.map((counterName, idx) => {
+                    const c = original[counterName]!;
+                    const isInput = counterName.startsWith('input');
+                    const inputIndex = isInput ? 
Number(counterName.replace('input', '')) : -1;
+                    const showSpacer = idx === firstNonInputIndex && 
firstNonInputIndex > 0;
+                    const label = isInput
+                      ? formatInputLabel(stage, inputIndex)
+                      : stages.getStageCounterTitle(stage, counterName);
+                    const tooltipParts: string[] = [];
+                    if (c.bytes) {
+                      tooltipParts.push(
+                        `Uncompressed size: ${formatBytesCompact(c.bytes)} 
${NOT_SIZE_ON_DISK}`,
+                      );
+                    }
+                    if (c.loadFiles) {
+                      tooltipParts.push(
+                        formatLoadTooltip(c.loadFiles, c.loadBytes, 
c.loadTime, c.loadWait),
+                      );
+                    }
+                    if (c.queries || c.totalQueries) {
+                      tooltipParts.push(
+                        `Realtime queries: ${formatInteger(c.queries || 0)} / 
${formatInteger(
+                          c.totalQueries || 0,
+                        )}`,
+                      );
+                    }
+                    return (
+                      <React.Fragment key={counterName}>
+                        {showSpacer && <div className="counter-spacer 
extend-right" />}
+                        <div
+                          data-tooltip={
+                            tooltipParts.length ? tooltipParts.join('\n') : 
NO_SIZE_INFO
+                          }
+                        >
+                          <span className="rows-label">{label}</span>
+                          <BracedText text={formatRows(c.rows)} 
braces={allBracesRows} />
+                          {Boolean(c.totalFiles) && (
+                            <>
+                              {' '}
+                              &nbsp;{' '}
+                              <BracedText
+                                text={formatFileOfTotal(c.files, c.totalFiles)}
+                                braces={allBracesFiles}
+                              />
+                            </>
                           )}
+                        </div>
+                      </React.Fragment>
+                    );
+                  })}
+                  {isSegmentGenerator && (
+                    <>
+                      <div className="counter-spacer extend-right" />
+                      <div>
+                        <span className="rows-label">Merged</span>
+                        <BracedText
+                          
text={formatRows(original.segmentGenerationProgress?.rowsMerged || 0)}
+                          braces={allBracesRows}
                         />
-                      </>
-                    )}
-                    {Boolean(c.queries || c.totalQueries) && (
-                      <>
-                        {' '}
-                        &nbsp;{' '}
-                        <Icon
-                          icon={IconNames.ARROW_BOTTOM_LEFT}
-                          data-tooltip={`Realtime queries (${formatInteger(
-                            c.queries || 0,
-                          )} / ${formatInteger(c.totalQueries || 0)})`}
+                      </div>
+                      <div>
+                        <span className="rows-label">Pushed</span>
+                        <BracedText
+                          
text={formatRows(original.segmentGenerationProgress?.rowsPushed || 0)}
+                          braces={allBracesRows}
                         />
-                      </>
-                    )}
-                  </>
+                      </div>
+                    </>
+                  )}
+                </>
+              );
+            },
+          } as Column<SimpleWideCounter>,
+          {
+            Header: 'Storage utilization',
+            id: 'storage',
+            accessor: (d: SimpleWideCounter) => {
+              const s = d.storage;
+              if (!s) return 0;
+              return s.localBytesWritten + s.durableBytesWritten;
+            },
+            className: 'padded',
+            width: 250,
+            show: stages.hasCounterForStage(stage, 'storage'),
+            Cell({ original }: { original: SimpleWideCounter }) {
+              const s = original.storage;
+              if (!s) return <i>none</i>;
+
+              const hasLocal = s.localBytesWritten > 0 || s.localBytesReserved 
> 0;
+              const hasDurable = s.durableBytesWritten > 0;
+
+              if (!hasLocal && !hasDurable) return <i>none</i>;
+
+              const tooltipParts: string[] = [];
+              if (hasLocal) {
+                const usedPart =
+                  s.localBytesMax != null
+                    ? `(${formatBytesCompact(s.localBytesReserved)} / 
${formatBytesCompact(
+                        s.localBytesMax,
+                      )} used)`
+                    : `(${formatBytesCompact(s.localBytesReserved)} used)`;
+                tooltipParts.push(
+                  `Local: ${formatBytesCompact(s.localBytesWritten)} written 
in ${pluralIfNeeded(
+                    s.localFilesWritten,
+                    'file',
+                  )} ${usedPart}`,
                 );
-              },
-            };
-          }),
-          Stages.stageType(stage) === 'segmentGenerator'
-            ? [
-                {
-                  Header: twoLines('Merged', <i>rows</i>),
-                  id: 'segmentGeneration_rowsMerged',
-                  accessor: d => d.segmentGenerationProgress?.rowsMerged || 0,
-                  className: 'padded',
-                  width: 180,
-                  Cell({ value }) {
-                    return <BracedText text={formatRows(value)} 
braces={bracesSegmentRowsMerged} />;
-                  },
-                },
-                {
-                  Header: twoLines('Pushed', <i>rows</i>),
-                  id: 'segmentGeneration_rowsPushed',
-                  accessor: d => d.segmentGenerationProgress?.rowsPushed || 0,
-                  className: 'padded',
-                  width: 180,
-                  Cell({ value }) {
-                    return <BracedText text={formatRows(value)} 
braces={bracesSegmentRowsPushed} />;
-                  },
-                },
-              ]
-            : [],
-        )}
+              }
+              if (hasDurable) {
+                tooltipParts.push(
+                  `Durable: ${formatBytesCompact(
+                    s.durableBytesWritten,
+                  )} written in ${pluralIfNeeded(s.durableFileCount, 'file')}`,
+                );
+              }
+
+              return (
+                <div data-tooltip={tooltipParts.join('\n')}>
+                  {hasLocal && (
+                    <div>
+                      <span className="storage-label">Local</span>
+                      {formatBytesCompact(s.localBytesWritten)}
+                      {s.localBytesReserved > 0 && (
+                        <span className="storage-used">
+                          {` (${formatBytesCompact(s.localBytesReserved)} 
used)`}
+                        </span>
+                      )}
+                    </div>
+                  )}
+                  {hasDurable && (
+                    <div>
+                      <span className="storage-label">Durable</span>
+                      {formatBytesCompact(s.durableBytesWritten)}
+                    </div>
+                  )}
+                </div>
+              );
+            },
+          } as Column<SimpleWideCounter>,
+        ])}
       />
     );
   }


---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]

Reply via email to