MV-GH opened a new issue, #37905: URL: https://github.com/apache/beam/issues/37905
### What needs to happen?

## Context

- Java 25
- Dataflow Runner V2
- Protos
- Streaming

I have a relatively simple pipeline: it receives proto messages from Pub/Sub, stores them in a user bag state, and emits an aggregation when certain conditions are met. While profiling it (using the GCP profiler, since that seems to be the only avenue available), I saw that around 30% of CPU time is spent in `org.github.jamm.MemoryMeter.measureDeep`.

<img width="2548" height="951" alt="Image" src="https://github.com/user-attachments/assets/c98e6010-36ff-4a69-85e8-df217d4eb1f1" />

[profiler_project-tempest_CPU_2026-03-20T16_51_30Z_2026-03-20T17_51_30Z_2026-03-20_10_29_37-14862633681033515490.pb.tar.gz](https://github.com/user-attachments/files/26165661/profiler_project-tempest_CPU_2026-03-20T16_51_30Z_2026-03-20T17_51_30Z_2026-03-20_10_29_37-14862633681033515490.pb.tar.gz)

So I looked into it and saw that the reflection cost can be avoided if the stored object implements `Weighted`:

```java
/** Returns the amount of memory in bytes the provided object consumes. */
public static long weigh(Object o) {
  if (o == null) {
    return REFERENCE_SIZE;
  }
  if (o instanceof Weighted) {
    return ((Weighted) o).getWeight() + REFERENCE_SIZE + 8;
  }
  try {
    return MEMORY_METER.measureDeep(o);
  } catch (RuntimeException e) {
    // Checking for RuntimeException since java.lang.reflect.InaccessibleObjectException is only
    // available starting Java 9
    LOG.warn("JVM prevents jamm from accessing subgraph - cache sizes may be underestimated", e);
    return MEMORY_METER.measure(o);
  }
}
```

I did this by creating a wrapper object for my proto that implements `Weighted`, then profiled the same workload again.
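For reference, a minimal sketch of such a wrapper is below. The `Weighted` interface is stubbed out here so the snippet is self-contained (the real one is Beam's, taking no arguments and returning a `long`), the wrapper holds serialized bytes rather than an actual proto `Message` to avoid the protobuf dependency, and the weight formula (serialized size plus a fixed per-object overhead) is my own rough estimate, not something Beam computes:

```java
// Stand-in for Beam's Weighted interface (assumption: a single long getWeight() method).
interface Weighted {
  long getWeight();
}

// Hypothetical wrapper: pre-computes an approximate in-memory weight for a proto
// message so that Caches.weigh takes the cheap instanceof branch instead of the
// jamm reflection path.
final class WeightedProto implements Weighted {
  private final byte[] serialized; // stand-in for the wrapped proto message
  private final long weight;

  WeightedProto(byte[] serialized) {
    this.serialized = serialized;
    // Assumption: serialized size plus a fixed per-object overhead estimate is
    // close enough for cache accounting; tune the constant for your messages.
    this.weight = serialized.length + 64L;
  }

  byte[] get() {
    return serialized;
  }

  @Override
  public long getWeight() {
    return weight;
  }
}

public class Main {
  public static void main(String[] args) {
    WeightedProto p = new WeightedProto(new byte[128]);
    System.out.println(p.getWeight()); // prints 192 (128 + 64)
  }
}
```

The only cost at write time is one addition per element, instead of a reflective walk of the whole object graph.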
<img width="2548" height="951" alt="Image" src="https://github.com/user-attachments/assets/7b00fd63-9dc6-4105-8a15-e42bb4936bef" />

[profiler_project-tempest_CPU_2026-03-20T18_15_48Z_2026-03-20T18_45_48Z_2026-03-20_11_25_08-9097137418379310132.pb.tar.gz](https://github.com/user-attachments/files/26165692/profiler_project-tempest_CPU_2026-03-20T18_15_48Z_2026-03-20T18_45_48Z_2026-03-20_11_25_08-9097137418379310132.pb.tar.gz)

This improved performance, but it did not eliminate the `measureDeep` calls. The reason is that when multiple events are processed within the same bundle, the harness weighs a list of elements, and that list does not implement `Weighted`, so the reflection path is taken again. See this code section:

```java
/**
 * Appends the newValues to the cached iterable with newWeight weight. If newWeight is negative,
 * the weight will be calculated using Caches.weigh.
 */
private void appendHelper(List<T> newValues, long newWeight) {
  if (newValues.isEmpty()) {
    return;
  }
  Blocks<T> existing = cache.peek(IterableCacheKey.INSTANCE);
  if (existing == null) {
    return;
  }
  // Check to see if we have cached the whole iterable, if not then we must remove it to prevent
  // returning invalid results as part of a future request.
  if (existing.getBlocks().get(existing.getBlocks().size() - 1).getNextToken() != null) {
    cache.remove(IterableCacheKey.INSTANCE);
  }
  // Combine all the individual blocks into one block containing all the values since
  // they were mutated, and we must evict all or none of the blocks. When consuming the blocks,
  // we must have a reference to all or none of the blocks (which forces a load).
  List<Block<T>> blocks = existing.getBlocks();
  int totalSize = newValues.size();
  for (Block<T> block : blocks) {
    totalSize += block.getValues().size();
  }
  WeightedList<T> allValues = WeightedList.of(new ArrayList<>(totalSize), 0L);
  for (Block<T> block : blocks) {
    allValues.addAll(block.getValues(), block.getWeight());
  }
  if (newWeight < 0) {
    if (newValues.size() == 1) {
      // Optimize weighing of the common value state as single single-element bag state.
      newWeight = Caches.weigh(newValues.get(0));
    } else {
      newWeight = Caches.weigh(newValues);
    }
  }
  allValues.addAll(newValues, newWeight);
  cache.put(IterableCacheKey.INSTANCE, new MutatedBlocks<>(Block.mutatedBlock(allValues)));
}
```

I tried a few other things, but they were not fruitful in eliminating this cost. So my questions are: can I avoid this cost, and could fixes be applied? For example, could the list be weighed element by element, so that elements implementing `Weighted` are honored? Or could a weighing "helper" be supplied to the `StateSpec`, to be used in place of reflection? Or could this weighing be disabled altogether?

### Issue Priority

Priority: 2 (default / most normal work should be filed as P2)

### Issue Components

- [ ] Component: Python SDK
- [x] Component: Java SDK
- [ ] Component: Go SDK
- [ ] Component: Typescript SDK
- [ ] Component: IO connector
- [ ] Component: Beam YAML
- [ ] Component: Beam examples
- [ ] Component: Beam playground
- [ ] Component: Beam katas
- [ ] Component: Website
- [ ] Component: Infrastructure
- [ ] Component: Spark Runner
- [ ] Component: Flink Runner
- [ ] Component: Samza Runner
- [ ] Component: Twister2 Runner
- [ ] Component: Hazelcast Jet Runner
- [x] Component: Google Cloud Dataflow Runner

--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the URL above to go to the specific comment.

To unsubscribe, e-mail: [email protected]
For queries about this service, please contact Infrastructure at: [email protected]
