schlosna commented on PR #24473:
URL: https://github.com/apache/flink/pull/24473#issuecomment-1992844641

   Thanks for taking a look at this PR.
   
   > 1. What's your setup for the path ?
   
   We write checkpoints to a variety of file systems depending on the infrastructure, so it might be cloud blob storage (e.g. S3 or S3-like), a local Linux/POSIX filesystem when running on bare metal, or a persistent volume claim in Kubernetes.
   
   > 2. Could you also share the JFR after your optimization ?
   
   I do not have a JFR from a modified Flink build that I can share, but I created a simple [JMH benchmark comparing the old and new implementations](https://github.com/apache/flink/files/14580149/NormalizeBenchmark.java.txt) that shows a ~5x allocation reduction, as well as a ~4x speedup on Intel and a ~3.5x speedup on Apple M1 Pro.
   
   
   ```
   Intel(R) Xeon(R) Platinum 8259CL CPU @ 2.50GHz
   VM version: JDK 17.0.10, OpenJDK 64-Bit Server VM, 17.0.10+8-LTS
   Benchmark                                           Mode  Cnt     Score    Error   Units
   NormalizeBenchmark.newNormalize                     avgt    5   269.649 ± 23.957   ns/op
   NormalizeBenchmark.newNormalize:gc.alloc.rate.norm  avgt    5   316.800 ±  0.001    B/op
   NormalizeBenchmark.oldNormalize                     avgt    5  1119.999 ± 57.073   ns/op
   NormalizeBenchmark.oldNormalize:gc.alloc.rate.norm  avgt    5  1603.200 ±  0.001    B/op
   ```
   
   ```
   2021 Apple MacBookPro M1 Pro
   VM version: JDK 17.0.10, OpenJDK 64-Bit Server VM, 17.0.10+7-LTS
   Benchmark                                           Mode  Cnt     Score    Error   Units
   NormalizeBenchmark.newNormalize                     avgt    5   167.362 ±  1.396   ns/op
   NormalizeBenchmark.newNormalize:gc.alloc.rate.norm  avgt    5   316.800 ±  0.001    B/op
   NormalizeBenchmark.oldNormalize                     avgt    5   598.058 ±  9.701   ns/op
   NormalizeBenchmark.oldNormalize:gc.alloc.rate.norm  avgt    5  1579.200 ±  0.001    B/op
   ```
   
   Below are the textual details from JFR for a test Flink pipeline in which ~1% of all allocations were due to `java.util.regex.Pattern`, created at `org.apache.flink.core.fs.Path.normalizePath(String):243` and reached via `org.apache.flink.core.fs.Path.initialize(String, String, String)` and the `org.apache.flink.core.fs.Path.<init>(String)` constructor:
   
   
   ```
   Class  Alloc Total      Total Allocation (%)
   -----  ---------------  --------------------
   int[]  2.468 GiB        4.43237405980632 %

   Count  Percentage  Stack Trace
   -----  ----------  -----------
   18     21.2 %      java.util.regex.Pattern.compile()
   18     21.2 %         java.util.regex.Pattern.<init>(String, int)
   17     20 %           java.util.regex.Pattern.compile(String)
   17     20 %              java.lang.String.replaceAll(String, String)
   10     11.8 %            org.apache.flink.core.fs.Path.normalizePath(String)
   10     11.8 %               org.apache.flink.core.fs.Path.initialize(String, String, String)
   5      5.88 %               org.apache.flink.core.fs.Path.<init>(String)
   5      5.88 %                  org.apache.flink.core.fs.Path.<init>(Path, String)
   4      4.71 %                  org.apache.flink.runtime.state.filesystem.FsCheckpointStreamFactory$FsCheckpointStateOutputStream.createStatePath()
   4      4.71 %                     org.apache.flink.runtime.state.filesystem.FsCheckpointStreamFactory$FsCheckpointStateOutputStream.closeAndGetHandle()
   4      4.71 %                     org.apache.flink.runtime.state.DefaultOperatorStateBackendSnapshotStrategy.lambda$asyncSnapshot$1(CheckpointStreamFactory, Map, Map, CloseableRegistry)
   4      4.71 %                     org.apache.flink.runtime.state.DefaultOperatorStateBackendSnapshotStrategy$$Lambda$2047+0x000000080117e810.1796791918.get(CloseableRegistry)
   4      4.71 %                     org.apache.flink.runtime.state.SnapshotStrategyRunner$1.callInternal()
   4      4.71 %                     org.apache.flink.runtime.state.SnapshotStrategyRunner$1.callInternal()
   4      4.71 %                     org.apache.flink.runtime.state.AsyncSnapshotCallable.call()
   4      4.71 %                     java.util.concurrent.FutureTask.run()
   4      4.71 %                     org.apache.flink.util.concurrent.FutureUtils.runIfNotDoneAndGet(RunnableFuture)
   4      4.71 %                     org.apache.flink.streaming.api.operators.OperatorSnapshotFinalizer.<init>(OperatorSnapshotFutures)
   4      4.71 %                     org.apache.flink.streaming.runtime.tasks.AsyncCheckpointRunnable.finalizeNonFinishedSnapshots()
   4      4.71 %                     org.apache.flink.streaming.runtime.tasks.AsyncCheckpointRunnable.run()
   4      4.71 %                     java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor$Worker)
   4      4.71 %                     java.util.concurrent.ThreadPoolExecutor$Worker.run()
   4      4.71 %                     java.lang.Thread.run()
   1      1.18 %                  org.apache.flink.runtime.state.filesystem.AbstractFsCheckpointStorageAccess.createCheckpointDirectory(Path, long)
   1      1.18 %                     org.apache.flink.runtime.state.filesystem.FsCheckpointStorageAccess.resolveCheckpointStorageLocation(long, CheckpointStorageLocationReference)
   1      1.18 %                     org.apache.flink.streaming.runtime.tasks.SubtaskCheckpointCoordinatorImpl$CachingCheckpointStorageWorkerView.lambda$resolveCheckpointStorageLocation$0(long, CheckpointStorageLocationReference, Long)
   1      1.18 %                     org.apache.flink.streaming.runtime.tasks.SubtaskCheckpointCoordinatorImpl$CachingCheckpointStorageWorkerView$$Lambda$2040+0x000000080117c250.901311142.apply(Object)
   1      1.18 %                     java.util.concurrent.ConcurrentHashMap.computeIfAbsent(Object, Function)
   1      1.18 %                     org.apache.flink.streaming.runtime.tasks.SubtaskCheckpointCoordinatorImpl$CachingCheckpointStorageWorkerView.resolveCheckpointStorageLocation(long, CheckpointStorageLocationReference)
   1      1.18 %                     org.apache.flink.streaming.runtime.tasks.SubtaskCheckpointCoordinatorImpl.takeSnapshotSync(Map, CheckpointMetaData, CheckpointMetricsBuilder, CheckpointOptions, OperatorChain, Supplier)
   1      1.18 %                     org.apache.flink.streaming.runtime.tasks.SubtaskCheckpointCoordinatorImpl.checkpointState(CheckpointMetaData, CheckpointOptions, CheckpointMetricsBuilder, OperatorChain, boolean, Supplier)
   1      1.18 %                     org.apache.flink.streaming.runtime.tasks.StreamTask.lambda$performCheckpoint$14(CheckpointType, CheckpointMetaData, CheckpointOptions, CheckpointMetricsBuilder)
   1      1.18 %                     org.apache.flink.streaming.runtime.tasks.StreamTask$$Lambda$2038+0x000000080116b830.1225235430.run()
   1      1.18 %                     org.apache.flink.streaming.runtime.tasks.StreamTaskActionExecutor$1.runThrowing(ThrowingRunnable)
   1      1.18 %                     org.apache.flink.streaming.runtime.tasks.StreamTask.performCheckpoint(CheckpointMetaData, CheckpointOptions, CheckpointMetricsBuilder)
   1      1.18 %                     org.apache.flink.streaming.runtime.tasks.StreamTask.triggerCheckpointOnBarrier(CheckpointMetaData, CheckpointOptions, CheckpointMetricsBuilder)
   1      1.18 %                     org.apache.flink.streaming.runtime.io.checkpointing.CheckpointBarrierHandler.notifyCheckpoint(CheckpointBarrier)
   1      1.18 %                     org.apache.flink.streaming.runtime.io.checkpointing.SingleCheckpointBarrierHandler.triggerCheckpoint(CheckpointBarrier)
   1      1.18 %                     org.apache.flink.streaming.runtime.io.checkpointing.SingleCheckpointBarrierHandler.access$100(SingleCheckpointBarrierHandler, CheckpointBarrier)
   1      1.18 %                     org.apache.flink.streaming.runtime.io.checkpointing.SingleCheckpointBarrierHandler$ControllerImpl.triggerGlobalCheckpoint(CheckpointBarrier)
   1      1.18 %                     org.apache.flink.streaming.runtime.io.checkpointing.AbstractAlignedBarrierHandlerState.triggerGlobalCheckpoint(BarrierHandlerState$Controller, CheckpointBarrier)
   1      1.18 %                     org.apache.flink.streaming.runtime.io.checkpointing.AbstractAlignedBarrierHandlerState.barrierReceived(BarrierHandlerState$Controller, InputChannelInfo, CheckpointBarrier, boolean)
   1      1.18 %                     org.apache.flink.streaming.runtime.io.checkpointing.SingleCheckpointBarrierHandler.lambda$processBarrier$2(InputChannelInfo, CheckpointBarrier, boolean, BarrierHandlerState)
   1      1.18 %                     org.apache.flink.streaming.runtime.io.checkpointing.SingleCheckpointBarrierHandler$$Lambda$1883+0x0000000800f0f320.1851091270.apply(Object)
   1      1.18 %                     org.apache.flink.streaming.runtime.io.checkpointing.SingleCheckpointBarrierHandler.markCheckpointAlignedAndTransformState(InputChannelInfo, CheckpointBarrier, FunctionWithException)
   1      1.18 %                     org.apache.flink.streaming.runtime.io.checkpointing.SingleCheckpointBarrierHandler.processBarrier(CheckpointBarrier, InputChannelInfo, boolean)
   1      1.18 %                     org.apache.flink.streaming.runtime.io.checkpointing.CheckpointedInputGate.handleEvent(BufferOrEvent)
   1      1.18 %                     org.apache.flink.streaming.runtime.io.checkpointing.CheckpointedInputGate.pollNext()
   1      1.18 %                     org.apache.flink.streaming.runtime.io.AbstractStreamTaskNetworkInput.emitNext(PushingAsyncDataInput$DataOutput)
   1      1.18 %                     org.apache.flink.streaming.runtime.io.StreamOneInputProcessor.processInput()
   1      1.18 %                     org.apache.flink.streaming.runtime.tasks.StreamTask.processInput(MailboxDefaultAction$Controller)
   1      1.18 %                     org.apache.flink.streaming.runtime.tasks.StreamTask$$Lambda$1201+0x0000000800a8e830.819729682.runDefaultAction(MailboxDefaultAction$Controller)
   1      1.18 %                     org.apache.flink.streaming.runtime.tasks.mailbox.MailboxProcessor.runMailboxLoop()
   1      1.18 %                     org.apache.flink.streaming.runtime.tasks.StreamTask.runMailboxLoop()
   1      1.18 %                     org.apache.flink.streaming.runtime.tasks.StreamTask.invoke()
   ```
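
   For readers wondering why `String.replaceAll` sits directly under `Pattern.compile` in the trace above: `String.replaceAll(regex, replacement)` compiles a fresh `Pattern` on every call. The following is only a minimal sketch of the general idea, not the code from this PR. It assumes the regex in question collapses runs of `/` into a single `/`; the class `NormalizeSketch` and both method names are hypothetical and exist only for illustration.

   ```java
   /** Hypothetical illustration only; not the implementation from the PR. */
   final class NormalizeSketch {

       // Regex variant: String.replaceAll compiles a new Pattern on each call,
       // which is where the Pattern-related allocations come from.
       static String collapseWithRegex(String path) {
           return path.replaceAll("/+", "/");
       }

       // Loop variant: a single pass over the characters, no Pattern involved.
       // Returns the input instance unchanged when there is nothing to collapse.
       static String collapseWithLoop(String path) {
           int first = path.indexOf("//");
           if (first < 0) {
               return path; // fast path: no duplicate slashes, no allocation
           }
           StringBuilder sb = new StringBuilder(path.length());
           sb.append(path, 0, first); // copy the prefix that needs no changes
           boolean previousWasSlash = false;
           for (int i = first; i < path.length(); i++) {
               char c = path.charAt(i);
               if (c == '/' && previousWasSlash) {
                   continue; // skip every slash after the first in a run
               }
               sb.append(c);
               previousWasSlash = (c == '/');
           }
           return sb.toString();
       }

       public static void main(String[] args) {
           String sample = "/checkpoints//job///chk-1";
           // Both variants should agree on the result.
           System.out.println(collapseWithRegex(sample).equals(collapseWithLoop(sample)));
       }
   }
   ```

   The fast path matters for a case like checkpoint paths, which are usually already normalized, so the loop variant would typically return without allocating anything.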
   
   
   

