schlosna commented on PR #24473:
URL: https://github.com/apache/flink/pull/24473#issuecomment-1992844641
Thanks for taking a look at this PR.

> 1. What's your setup for the path?

We have checkpoints writing to a variety of file systems depending on the infrastructure. It might be cloud blob storage (e.g. S3 or S3-like), or a local Linux/POSIX filesystem when running on bare metal or on a persistent volume claim in Kubernetes.

> 2. Could you also share the JFR after your optimization?

I do not have a JFR from a modified Flink build that I can share. However, I created a simple [JMH benchmark to compare the old vs. new implementations](https://github.com/apache/flink/files/14580149/NormalizeBenchmark.java.txt). It shows a ~5x allocation reduction (from ~1,600 B/op to ~317 B/op), as well as a ~4x speedup on Intel and a ~3.5x speedup on Apple M1 Pro.

```
Intel(R) Xeon(R) Platinum 8259CL CPU @ 2.50GHz
VM version: JDK 17.0.10, OpenJDK 64-Bit Server VM, 17.0.10+8-LTS

Benchmark                                           Mode  Cnt     Score    Error  Units
NormalizeBenchmark.newNormalize                     avgt    5   269.649 ± 23.957  ns/op
NormalizeBenchmark.newNormalize:gc.alloc.rate.norm  avgt    5   316.800 ±  0.001   B/op
NormalizeBenchmark.oldNormalize                     avgt    5  1119.999 ± 57.073  ns/op
NormalizeBenchmark.oldNormalize:gc.alloc.rate.norm  avgt    5  1603.200 ±  0.001   B/op
```

```
2021 Apple MacBook Pro M1 Pro
VM version: JDK 17.0.10, OpenJDK 64-Bit Server VM, 17.0.10+7-LTS

Benchmark                                           Mode  Cnt     Score    Error  Units
NormalizeBenchmark.newNormalize                     avgt    5   167.362 ±  1.396  ns/op
NormalizeBenchmark.newNormalize:gc.alloc.rate.norm  avgt    5   316.800 ±  0.001   B/op
NormalizeBenchmark.oldNormalize                     avgt    5   598.058 ±  9.701  ns/op
NormalizeBenchmark.oldNormalize:gc.alloc.rate.norm  avgt    5  1579.200 ±  0.001   B/op
```

Below are the textual details from a JFR recording of a test Flink pipeline. In that pipeline, ~1% of all allocations were due to `java.util.regex.Pattern`, which was compiled in `org.apache.flink.core.fs.Path.normalizePath(String):243`. That method was reached via `org.apache.flink.core.fs.Path.initialize(String, String, String)` and the `org.apache.flink.core.fs.Path.<init>(String)` constructor:

```
Class  Alloc Total  Total Allocation (%)
-----  -----------  --------------------
int[]  2.468 GiB    4.43237405980632 %

Stack Trace  Count  Percentage
-----------  -----  ----------
java.util.regex.Pattern.compile()  18  21.2 %
java.util.regex.Pattern.<init>(String, int)  18  21.2 %
java.util.regex.Pattern.compile(String)  17  20 %
java.lang.String.replaceAll(String, String)  17  20 %
org.apache.flink.core.fs.Path.normalizePath(String)  10  11.8 %
org.apache.flink.core.fs.Path.initialize(String, String, String)  10  11.8 %
org.apache.flink.core.fs.Path.<init>(String)  5  5.88 %
org.apache.flink.core.fs.Path.<init>(Path, String)  5  5.88 %
org.apache.flink.runtime.state.filesystem.FsCheckpointStreamFactory$FsCheckpointStateOutputStream.createStatePath()  4  4.71 %
org.apache.flink.runtime.state.filesystem.FsCheckpointStreamFactory$FsCheckpointStateOutputStream.closeAndGetHandle()  4  4.71 %
org.apache.flink.runtime.state.DefaultOperatorStateBackendSnapshotStrategy.lambda$asyncSnapshot$1(CheckpointStreamFactory, Map, Map, CloseableRegistry)  4  4.71 %
org.apache.flink.runtime.state.DefaultOperatorStateBackendSnapshotStrategy$$Lambda$2047+0x000000080117e810.1796791918.get(CloseableRegistry)  4  4.71 %
org.apache.flink.runtime.state.SnapshotStrategyRunner$1.callInternal()  4  4.71 %
org.apache.flink.runtime.state.SnapshotStrategyRunner$1.callInternal()  4  4.71 %
org.apache.flink.runtime.state.AsyncSnapshotCallable.call()  4  4.71 %
java.util.concurrent.FutureTask.run()  4  4.71 %
org.apache.flink.util.concurrent.FutureUtils.runIfNotDoneAndGet(RunnableFuture)  4  4.71 %
org.apache.flink.streaming.api.operators.OperatorSnapshotFinalizer.<init>(OperatorSnapshotFutures)  4  4.71 %
org.apache.flink.streaming.runtime.tasks.AsyncCheckpointRunnable.finalizeNonFinishedSnapshots()  4  4.71 %
org.apache.flink.streaming.runtime.tasks.AsyncCheckpointRunnable.run()  4  4.71 %
java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor$Worker)  4  4.71 %
java.util.concurrent.ThreadPoolExecutor$Worker.run()  4  4.71 %
java.lang.Thread.run()  4  4.71 %
org.apache.flink.runtime.state.filesystem.AbstractFsCheckpointStorageAccess.createCheckpointDirectory(Path, long)  1  1.18 %
org.apache.flink.runtime.state.filesystem.FsCheckpointStorageAccess.resolveCheckpointStorageLocation(long, CheckpointStorageLocationReference)  1  1.18 %
org.apache.flink.streaming.runtime.tasks.SubtaskCheckpointCoordinatorImpl$CachingCheckpointStorageWorkerView.lambda$resolveCheckpointStorageLocation$0(long, CheckpointStorageLocationReference, Long)  1  1.18 %
org.apache.flink.streaming.runtime.tasks.SubtaskCheckpointCoordinatorImpl$CachingCheckpointStorageWorkerView$$Lambda$2040+0x000000080117c250.901311142.apply(Object)  1  1.18 %
java.util.concurrent.ConcurrentHashMap.computeIfAbsent(Object, Function)  1  1.18 %
org.apache.flink.streaming.runtime.tasks.SubtaskCheckpointCoordinatorImpl$CachingCheckpointStorageWorkerView.resolveCheckpointStorageLocation(long, CheckpointStorageLocationReference)  1  1.18 %
org.apache.flink.streaming.runtime.tasks.SubtaskCheckpointCoordinatorImpl.takeSnapshotSync(Map, CheckpointMetaData, CheckpointMetricsBuilder, CheckpointOptions, OperatorChain, Supplier)  1  1.18 %
org.apache.flink.streaming.runtime.tasks.SubtaskCheckpointCoordinatorImpl.checkpointState(CheckpointMetaData, CheckpointOptions, CheckpointMetricsBuilder, OperatorChain, boolean, Supplier)  1  1.18 %
org.apache.flink.streaming.runtime.tasks.StreamTask.lambda$performCheckpoint$14(CheckpointType, CheckpointMetaData, CheckpointOptions, CheckpointMetricsBuilder)  1  1.18 %
org.apache.flink.streaming.runtime.tasks.StreamTask$$Lambda$2038+0x000000080116b830.1225235430.run()  1  1.18 %
org.apache.flink.streaming.runtime.tasks.StreamTaskActionExecutor$1.runThrowing(ThrowingRunnable)  1  1.18 %
org.apache.flink.streaming.runtime.tasks.StreamTask.performCheckpoint(CheckpointMetaData, CheckpointOptions, CheckpointMetricsBuilder)  1  1.18 %
org.apache.flink.streaming.runtime.tasks.StreamTask.triggerCheckpointOnBarrier(CheckpointMetaData, CheckpointOptions, CheckpointMetricsBuilder)  1  1.18 %
org.apache.flink.streaming.runtime.io.checkpointing.CheckpointBarrierHandler.notifyCheckpoint(CheckpointBarrier)  1  1.18 %
org.apache.flink.streaming.runtime.io.checkpointing.SingleCheckpointBarrierHandler.triggerCheckpoint(CheckpointBarrier)  1  1.18 %
org.apache.flink.streaming.runtime.io.checkpointing.SingleCheckpointBarrierHandler.access$100(SingleCheckpointBarrierHandler, CheckpointBarrier)  1  1.18 %
org.apache.flink.streaming.runtime.io.checkpointing.SingleCheckpointBarrierHandler$ControllerImpl.triggerGlobalCheckpoint(CheckpointBarrier)  1  1.18 %
org.apache.flink.streaming.runtime.io.checkpointing.AbstractAlignedBarrierHandlerState.triggerGlobalCheckpoint(BarrierHandlerState$Controller, CheckpointBarrier)  1  1.18 %
org.apache.flink.streaming.runtime.io.checkpointing.AbstractAlignedBarrierHandlerState.barrierReceived(BarrierHandlerState$Controller, InputChannelInfo, CheckpointBarrier, boolean)  1  1.18 %
org.apache.flink.streaming.runtime.io.checkpointing.SingleCheckpointBarrierHandler.lambda$processBarrier$2(InputChannelInfo, CheckpointBarrier, boolean, BarrierHandlerState)  1  1.18 %
org.apache.flink.streaming.runtime.io.checkpointing.SingleCheckpointBarrierHandler$$Lambda$1883+0x0000000800f0f320.1851091270.apply(Object)  1  1.18 %
org.apache.flink.streaming.runtime.io.checkpointing.SingleCheckpointBarrierHandler.markCheckpointAlignedAndTransformState(InputChannelInfo, CheckpointBarrier, FunctionWithException)  1  1.18 %
org.apache.flink.streaming.runtime.io.checkpointing.SingleCheckpointBarrierHandler.processBarrier(CheckpointBarrier, InputChannelInfo, boolean)  1  1.18 %
org.apache.flink.streaming.runtime.io.checkpointing.CheckpointedInputGate.handleEvent(BufferOrEvent)  1  1.18 %
org.apache.flink.streaming.runtime.io.checkpointing.CheckpointedInputGate.pollNext()  1  1.18 %
org.apache.flink.streaming.runtime.io.AbstractStreamTaskNetworkInput.emitNext(PushingAsyncDataInput$DataOutput)  1  1.18 %
org.apache.flink.streaming.runtime.io.StreamOneInputProcessor.processInput()  1  1.18 %
org.apache.flink.streaming.runtime.tasks.StreamTask.processInput(MailboxDefaultAction$Controller)  1  1.18 %
org.apache.flink.streaming.runtime.tasks.StreamTask$$Lambda$1201+0x0000000800a8e830.819729682.runDefaultAction(MailboxDefaultAction$Controller)  1  1.18 %
org.apache.flink.streaming.runtime.tasks.mailbox.MailboxProcessor.runMailboxLoop()  1  1.18 %
org.apache.flink.streaming.runtime.tasks.StreamTask.runMailboxLoop()  1  1.18 %
org.apache.flink.streaming.runtime.tasks.StreamTask.invoke()  1  1.18 %
```
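The `java.util.regex.Pattern` frames in the JFR trace above come from `String.replaceAll`, which compiles a new `Pattern` on every call. The standalone sketch below contrasts three ways to collapse repeated `/` separators. It is illustrative only. The class and method names are hypothetical, and the `"/+"` regex is an assumption about the kind of normalization involved, not code copied from Flink's `Path.normalizePath` or from this PR.

```java
import java.util.regex.Pattern;

/**
 * Hypothetical sketch, not Flink's actual code: three ways to collapse runs of '/'
 * in a path string, illustrating where the regex allocations come from.
 * The "/+" regex is an assumed example of a normalization step.
 */
final class SlashCollapseSketch {

    // Compiled once when the class is initialized, then reused by every call.
    private static final Pattern REPEATED_SLASHES = Pattern.compile("/+");

    // String.replaceAll delegates to Pattern.compile(regex).matcher(this).replaceAll(...),
    // so every call compiles a fresh Pattern (the Pattern.<init> frames in the JFR trace).
    static String collapseWithReplaceAll(String path) {
        return path.replaceAll("/+", "/");
    }

    // Skips recompilation, but still allocates a Matcher per call. It also builds a new
    // String whenever any '/' is present, because "/+" also matches a single slash.
    static String collapseWithCachedPattern(String path) {
        return REPEATED_SLASHES.matcher(path).replaceAll("/");
    }

    // Regex-free: only builds a new String if a doubled slash actually exists.
    static String collapseWithLoop(String path) {
        int first = path.indexOf("//");
        if (first < 0) {
            return path; // already normalized: return the input, no allocation
        }
        StringBuilder sb = new StringBuilder(path.length());
        sb.append(path, 0, first + 1); // copy up to and including the first '/' of the run
        char prev = '/';
        for (int i = first + 1; i < path.length(); i++) {
            char c = path.charAt(i);
            if (c == '/' && prev == '/') {
                continue; // drop repeated separators
            }
            sb.append(c);
            prev = c;
        }
        return sb.toString();
    }

    public static void main(String[] args) {
        String[] samples = {"/a//b///c/", "/flink/checkpoints/chk-42", "//", "a/b"};
        for (String s : samples) {
            String expected = collapseWithReplaceAll(s);
            if (!expected.equals(collapseWithCachedPattern(s))
                    || !expected.equals(collapseWithLoop(s))) {
                throw new AssertionError("variants disagree for " + s);
            }
            System.out.println(s + " -> " + expected);
        }
    }
}
```

In this sketch, the `indexOf("//")` fast path returns the input unchanged for an already-normalized path, so the common case allocates nothing. The cached `Pattern` variant removes only the per-call compilation cost. Running `main` checks that all three variants agree on a few sample paths.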