https://github.com/python/cpython/commit/76f22853410d3ded872cbfe1430852cf8c048962
commit: 76f22853410d3ded872cbfe1430852cf8c048962
branch: main
author: Maurycy Pawłowski-Wieroński <[email protected]>
committer: pablogsal <[email protected]>
date: 2026-05-12T23:46:21Z
summary:
gh-149718: Aggregate same stack frames in Tachyon in some collectors (#149719)
files:
A Misc/NEWS.d/next/Library/2026-05-12-13-03-45.gh-issue-149718.SaM1NJ.rst
M Lib/profiling/sampling/collector.py
M Lib/profiling/sampling/gecko_collector.py
M Lib/profiling/sampling/heatmap_collector.py
M Lib/profiling/sampling/pstats_collector.py
M Lib/profiling/sampling/sample.py
M Lib/profiling/sampling/stack_collector.py
M Lib/test/test_profiling/test_heatmap.py
M Lib/test/test_profiling/test_sampling_profiler/test_profiler.py
diff --git a/Lib/profiling/sampling/collector.py
b/Lib/profiling/sampling/collector.py
index 81ec6344ebdea4..8e0f0c44c4f8f3 100644
--- a/Lib/profiling/sampling/collector.py
+++ b/Lib/profiling/sampling/collector.py
@@ -143,6 +143,8 @@ def iter_async_frames(awaited_info_list):
class Collector(ABC):
+ aggregating = False
+
@abstractmethod
def collect(self, stack_frames, timestamps_us=None):
"""Collect profiling data from stack frames.
diff --git a/Lib/profiling/sampling/gecko_collector.py
b/Lib/profiling/sampling/gecko_collector.py
index 8986194268b3ce..54392af9500008 100644
--- a/Lib/profiling/sampling/gecko_collector.py
+++ b/Lib/profiling/sampling/gecko_collector.py
@@ -63,6 +63,8 @@
class GeckoCollector(Collector):
+ aggregating = True
+
def __init__(self, sample_interval_usec, *, skip_idle=False,
opcodes=False):
self.sample_interval_usec = sample_interval_usec
self.skip_idle = skip_idle
diff --git a/Lib/profiling/sampling/heatmap_collector.py
b/Lib/profiling/sampling/heatmap_collector.py
index 5c36d78f5535e7..6e650ec08f410b 100644
--- a/Lib/profiling/sampling/heatmap_collector.py
+++ b/Lib/profiling/sampling/heatmap_collector.py
@@ -452,7 +452,8 @@ def process_frames(self, frames, thread_id, weight=1):
next_lineno = extract_lineno(next_frame[1])
self._record_call_relationship(
(filename, lineno, funcname),
- (next_frame[0], next_lineno, next_frame[2])
+ (next_frame[0], next_lineno, next_frame[2]),
+ weight=weight,
)
def _is_valid_frame(self, filename, lineno):
@@ -561,7 +562,7 @@ def _get_bytecode_data_for_line(self, filename, lineno):
result.sort(key=lambda x: (-x['samples'], x['opcode']))
return result
- def _record_call_relationship(self, callee_frame, caller_frame):
+ def _record_call_relationship(self, callee_frame, caller_frame, weight=1):
"""Record caller/callee relationship between adjacent frames."""
callee_filename, callee_lineno, callee_funcname = callee_frame
caller_filename, caller_lineno, caller_funcname = caller_frame
@@ -587,7 +588,7 @@ def _record_call_relationship(self, callee_frame,
caller_frame):
# Count this call edge for path analysis
edge_key = (caller_key, callee_key)
- self.edge_samples[edge_key] += 1
+ self.edge_samples[edge_key] += weight
def export(self, output_path):
"""Export heatmap data as HTML files in a directory.
diff --git a/Lib/profiling/sampling/pstats_collector.py
b/Lib/profiling/sampling/pstats_collector.py
index 50500296c15acc..43b1daf2a119d4 100644
--- a/Lib/profiling/sampling/pstats_collector.py
+++ b/Lib/profiling/sampling/pstats_collector.py
@@ -8,6 +8,8 @@
class PstatsCollector(Collector):
+ aggregating = True
+
def __init__(self, sample_interval_usec, *, skip_idle=False):
self.result = collections.defaultdict(
lambda: dict(total_rec_calls=0, direct_calls=0, cumulative_calls=0)
diff --git a/Lib/profiling/sampling/sample.py b/Lib/profiling/sampling/sample.py
index 5bbe2483581333..b9e7e2625d09e4 100644
--- a/Lib/profiling/sampling/sample.py
+++ b/Lib/profiling/sampling/sample.py
@@ -47,6 +47,9 @@ def _pause_threads(unwinder, blocking):
# If fewer samples are collected, we skip the TUI and just print a message
MIN_SAMPLES_FOR_TUI = 200
+# Maximum number of consecutive identical samples to keep before flushing.
+MAX_PENDING_SAMPLES = 8192
+
class SampleProfiler:
def __init__(self, pid, sample_interval_usec, all_threads, *,
mode=PROFILING_MODE_WALL, native=False, gc=True, opcodes=False,
skip_non_matching_threads=True, collect_stats=False, blocking=False):
self.pid = pid
@@ -109,6 +112,20 @@ def sample(self, collector, duration_sec=None, *,
async_aware=False):
last_sample_time = start_time
realtime_update_interval = 1.0 # Update every second
last_realtime_update = start_time
+ aggregating = getattr(collector, 'aggregating', False) is True
+ prev_stack = None
+ pending_count = 0
+ pending_timestamps = [] if aggregating else None
+
+ def flush_pending():
+ nonlocal pending_count, pending_timestamps
+ if pending_count == 0:
+ return
+ pending_count = 0
+ ts = pending_timestamps
+ pending_timestamps = []
+ collector.collect(prev_stack, timestamps_us=ts)
+
try:
while duration_sec is None or running_time_sec < duration_sec:
# Check if live collector wants to stop
@@ -116,6 +133,7 @@ def sample(self, collector, duration_sec=None, *,
async_aware=False):
break
current_time = time.perf_counter()
+ current_time_us = int(current_time * 1_000_000)
if next_time > current_time:
sleep_time = (next_time - current_time) * 0.9
if sleep_time > 0.0001:
@@ -125,13 +143,24 @@ def sample(self, collector, duration_sec=None, *,
async_aware=False):
stack_frames = self._get_stack_trace(
async_aware=async_aware
)
- collector.collect(stack_frames)
+ if aggregating:
+ if stack_frames != prev_stack:
+ flush_pending()
+ prev_stack = stack_frames
+ pending_count += 1
+ pending_timestamps.append(current_time_us)
+ if pending_count >= MAX_PENDING_SAMPLES:
+ flush_pending()
+ else:
+ collector.collect(stack_frames)
except ProcessLookupError as e:
running_time_sec = current_time - start_time
break
except (RuntimeError, UnicodeDecodeError, MemoryError,
OSError):
+ flush_pending()
collector.collect_failed_sample()
errors += 1
+ prev_stack = None
except Exception as e:
if not _is_process_running(self.pid):
break
@@ -163,6 +192,8 @@ def sample(self, collector, duration_sec=None, *,
async_aware=False):
interrupted = True
running_time_sec = time.perf_counter() - start_time
print("Interrupted by user.")
+ finally:
+ flush_pending()
# Clear real-time stats line if it was being displayed
if self.realtime_stats and len(self.sample_intervals) > 0:
diff --git a/Lib/profiling/sampling/stack_collector.py
b/Lib/profiling/sampling/stack_collector.py
index 60df026ed76a6c..42281dc6454c83 100644
--- a/Lib/profiling/sampling/stack_collector.py
+++ b/Lib/profiling/sampling/stack_collector.py
@@ -16,6 +16,8 @@
class StackTraceCollector(Collector):
+ aggregating = True
+
def __init__(self, sample_interval_usec, *, skip_idle=False):
self.sample_interval_usec = sample_interval_usec
self.skip_idle = skip_idle
diff --git a/Lib/test/test_profiling/test_heatmap.py
b/Lib/test/test_profiling/test_heatmap.py
index b2acb1cf577341..ee27fdd3fa3053 100644
--- a/Lib/test/test_profiling/test_heatmap.py
+++ b/Lib/test/test_profiling/test_heatmap.py
@@ -345,6 +345,21 @@ def test_process_frames_tracks_edge_samples(self):
# Check that edge count is tracked
self.assertGreater(len(collector.edge_samples), 0)
+ def test_process_frames_weight_applies_to_identical_samples(self):
+ collector = HeatmapCollector(sample_interval_usec=100)
+
+ frames = [
+ ('callee.py', (5, 5, -1, -1), 'callee', None),
+ ('caller.py', (10, 10, -1, -1), 'caller', None),
+ ]
+
+ collector.process_frames(frames, thread_id=1, weight=5)
+
+ edge_key = (('caller.py', 10), ('callee.py', 5))
+ self.assertEqual(collector.edge_samples[edge_key], 5)
+ self.assertEqual(collector.line_samples[('callee.py', 5)], 5)
+ self.assertEqual(collector.line_samples[('caller.py', 10)], 5)
+
def test_process_frames_handles_empty_frames(self):
"""Test that process_frames handles empty frame list."""
collector = HeatmapCollector(sample_interval_usec=100)
diff --git a/Lib/test/test_profiling/test_sampling_profiler/test_profiler.py
b/Lib/test/test_profiling/test_sampling_profiler/test_profiler.py
index 68bc59a5414a05..2f5a5e27328659 100644
--- a/Lib/test/test_profiling/test_sampling_profiler/test_profiler.py
+++ b/Lib/test/test_profiling/test_sampling_profiler/test_profiler.py
@@ -198,8 +198,83 @@ def test_sample_profiler_sample_method_timing(self):
self.assertIn("samples", result)
# Verify collector was called multiple times
- self.assertGreaterEqual(mock_collector.collect.call_count, 5)
- self.assertLessEqual(mock_collector.collect.call_count, 11)
+ total_weight = sum(
+ len(c.kwargs.get("timestamps_us") or [None])
+ for c in mock_collector.collect.call_args_list
+ )
+ self.assertGreaterEqual(total_weight, 5)
+ self.assertLessEqual(total_weight, 11)
+
+ def test_sample_profiler_does_not_buffer_non_aggregating_collectors(self):
+ """Test that non-aggregating collectors get each sample immediately."""
+
+ stack_frames = [mock.sentinel.stack_frames]
+ mock_collector = mock.MagicMock()
+ mock_collector.aggregating = False
+
+ with self._patched_unwinder() as u:
+ u.instance.get_stack_trace.return_value = stack_frames
+
+ manager = mock.Mock()
+ manager.attach_mock(u.instance.get_stack_trace, "unwind")
+ manager.attach_mock(mock_collector.collect, "collect")
+
+ profiler = SampleProfiler(
+ pid=12345, sample_interval_usec=10000, all_threads=False
+ )
+
+ times = [0.0, 0.01, 0.011, 0.02, 0.03]
+ with mock.patch("time.perf_counter", side_effect=times):
+ with io.StringIO() as output:
+ with mock.patch("sys.stdout", output):
+ profiler.sample(mock_collector, duration_sec=0.025)
+
+ self.assertEqual(
+ manager.mock_calls,
+ [
+ mock.call.unwind(),
+ mock.call.collect(stack_frames),
+ mock.call.unwind(),
+ mock.call.collect(stack_frames),
+ ],
+ )
+
+ def test_sample_profiler_flushes_aggregated_batches_at_limit(self):
+ """Test that aggregating collectors flush after MAX_PENDING_SAMPLES
samples."""
+
+ stack_frames = [mock.sentinel.stack_frames]
+ mock_collector = mock.MagicMock()
+ mock_collector.aggregating = True
+
+ with self._patched_unwinder() as u:
+ u.instance.get_stack_trace.return_value = stack_frames
+
+ profiler = SampleProfiler(
+ pid=12345, sample_interval_usec=10000, all_threads=False
+ )
+
+ times = [
+ 0.0,
+ 0.01, 0.011,
+ 0.02, 0.021,
+ 0.03, 0.031,
+ 0.04, 0.041,
+ 0.05, 0.051,
+ ]
+ with mock.patch("profiling.sampling.sample.MAX_PENDING_SAMPLES",
2):
+ with mock.patch("time.perf_counter", side_effect=times):
+ with io.StringIO() as output:
+ with mock.patch("sys.stdout", output):
+ profiler.sample(mock_collector, duration_sec=0.045)
+
+ batches = [
+ (c.args[0], len(c.kwargs["timestamps_us"]))
+ for c in mock_collector.collect.call_args_list
+ ]
+ self.assertEqual(
+ batches,
+ [(stack_frames, 2), (stack_frames, 2), (stack_frames, 1)],
+ )
def test_sample_profiler_error_handling(self):
"""Test that the sample method handles errors gracefully."""
diff --git
a/Misc/NEWS.d/next/Library/2026-05-12-13-03-45.gh-issue-149718.SaM1NJ.rst
b/Misc/NEWS.d/next/Library/2026-05-12-13-03-45.gh-issue-149718.SaM1NJ.rst
new file mode 100644
index 00000000000000..25344e5a90f022
--- /dev/null
+++ b/Misc/NEWS.d/next/Library/2026-05-12-13-03-45.gh-issue-149718.SaM1NJ.rst
@@ -0,0 +1,4 @@
+Coalesce consecutive identical stack frames in Tachyon, so aggregating
+collectors (pstats, collapsed, flamegraph, gecko) receive one ``collect``
+call per batch. This improves the sample rate 3x and reduces the error
+and missed rates by 70%. Patch by Maurycy Pawłowski-Wieroński.
_______________________________________________
Python-checkins mailing list -- [email protected]
To unsubscribe send an email to [email protected]
https://mail.python.org/mailman3//lists/python-checkins.python.org
Member address: [email protected]