[ https://issues.apache.org/jira/browse/FLINK-11859?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ]
Robert Metzger updated FLINK-11859:
-----------------------------------
    Component/s: Runtime / Network

> Improve SpanningRecordSerializer performance by serializing record length to
> serialization buffer directly
> ----------------------------------------------------------------------------------------------------------
>
>                 Key: FLINK-11859
>                 URL: https://issues.apache.org/jira/browse/FLINK-11859
>             Project: Flink
>          Issue Type: Improvement
>          Components: Runtime / Network
>            Reporter: Yingjie Cao
>            Assignee: Yingjie Cao
>            Priority: Major
>
> In the current implementation of SpanningRecordSerializer, the length of a
> record is serialized to an intermediate length buffer and then copied to the
> target buffer. The length field can instead be serialized directly to the
> data buffer (serializationBuffer), which avoids the copy of the length buffer.
> Although the total number of bytes copied remains unchanged, this saves one
> copy, and for a small record that extra copy incurs high overhead. The
> flink-benchmarks results show that the change improves performance; the test
> results are as follows.
>
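The idea in the description can be illustrated with a minimal sketch using plain `java.nio.ByteBuffer`. This is not Flink's actual code: the class `LengthPrefixSketch` and both method names are hypothetical, and the real serializer works on Flink's own buffer types. The sketch only shows that reserving 4 bytes at the front of the data buffer and back-filling the length produces the same bytes in the target with one copy call instead of two.

```java
import java.nio.ByteBuffer;
import java.util.Arrays;

public class LengthPrefixSketch {

    /** Before: the length lives in its own 4-byte buffer, so two copies reach the target. */
    static byte[] viaIntermediateLengthBuffer(byte[] payload) {
        ByteBuffer dataBuffer = ByteBuffer.wrap(payload);
        ByteBuffer lengthBuffer = ByteBuffer.allocate(4);
        lengthBuffer.putInt(payload.length);
        lengthBuffer.flip();

        ByteBuffer target = ByteBuffer.allocate(4 + payload.length);
        target.put(lengthBuffer); // extra copy of only 4 bytes
        target.put(dataBuffer);   // copy of the record bytes
        return target.array();
    }

    /** After: the length is written into the data buffer itself, so one copy suffices. */
    static byte[] viaInlineLength(byte[] payload) {
        ByteBuffer dataBuffer = ByteBuffer.allocate(4 + payload.length);
        dataBuffer.position(4);                          // reserve the length slot
        dataBuffer.put(payload);                         // serialize the record
        dataBuffer.putInt(0, dataBuffer.position() - 4); // back-fill the length
        dataBuffer.flip();

        ByteBuffer target = ByteBuffer.allocate(dataBuffer.remaining());
        target.put(dataBuffer); // single copy of 4 + n bytes
        return target.array();
    }

    public static void main(String[] args) {
        byte[] record = {1, 2, 3};
        // Both paths must produce identical length-prefixed bytes.
        System.out.println(Arrays.equals(
                viaIntermediateLengthBuffer(record), viaInlineLength(record)));
    }
}
```

Both methods move the same 4 + n bytes into the target; the second simply does it in one bulk operation, which is where the saving for small records would come from.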
> Result with the optimization:
>
> |Benchmark|Mode|Threads|Samples|Score|Score Error (99.9%)|Unit|Param: channelsFlushTimeout|Param: stateBackend|
> |KeyByBenchmarks.arrayKeyBy|thrpt|1|30|2228.049605|77.631804|ops/ms| | |
> |KeyByBenchmarks.tupleKeyBy|thrpt|1|30|3968.361739|193.501755|ops/ms| | |
> |MemoryStateBackendBenchmark.stateBackends|thrpt|1|30|3030.016702|29.272713|ops/ms| |MEMORY|
> |MemoryStateBackendBenchmark.stateBackends|thrpt|1|30|2754.77678|26.215395|ops/ms| |FS|
> |MemoryStateBackendBenchmark.stateBackends|thrpt|1|30|3001.957606|29.288019|ops/ms| |FS_ASYNC|
> |RocksStateBackendBenchmark.stateBackends|thrpt|1|30|123.698984|3.339233|ops/ms| |ROCKS|
> |RocksStateBackendBenchmark.stateBackends|thrpt|1|30|126.252137|1.137735|ops/ms| |ROCKS_INC|
> |SerializationFrameworkMiniBenchmarks.serializerAvro|thrpt|1|30|323.658098|5.855697|ops/ms| | |
> |SerializationFrameworkMiniBenchmarks.serializerKryo|thrpt|1|30|183.34423|3.710787|ops/ms| | |
> |SerializationFrameworkMiniBenchmarks.serializerPojo|thrpt|1|30|404.380233|5.131744|ops/ms| | |
> |SerializationFrameworkMiniBenchmarks.serializerRow|thrpt|1|30|527.193369|10.176726|ops/ms| | |
> |SerializationFrameworkMiniBenchmarks.serializerTuple|thrpt|1|30|550.073024|11.724412|ops/ms| | |
> |StreamNetworkBroadcastThroughputBenchmarkExecutor.networkBroadcastThroughput|thrpt|1|30|564.690627|13.766809|ops/ms| | |
> |StreamNetworkThroughputBenchmarkExecutor.networkThroughput|thrpt|1|30|49918.11806|2324.234776|ops/ms|100,100ms| |
> |StreamNetworkThroughputBenchmarkExecutor.networkThroughput|thrpt|1|30|10443.63491|315.835962|ops/ms|100,100ms,SSL| |
> |StreamNetworkThroughputBenchmarkExecutor.networkThroughput|thrpt|1|30|21387.47608|2779.832704|ops/ms|1000,1ms| |
> |StreamNetworkThroughputBenchmarkExecutor.networkThroughput|thrpt|1|30|26585.85453|860.243347|ops/ms|1000,100ms| |
> |StreamNetworkThroughputBenchmarkExecutor.networkThroughput|thrpt|1|30|8252.563405|947.129028|ops/ms|1000,100ms,SSL| |
> |SumLongsBenchmark.benchmarkCount|thrpt|1|30|8806.021402|263.995836|ops/ms| | |
> |WindowBenchmarks.globalWindow|thrpt|1|30|4573.620126|112.099391|ops/ms| | |
> |WindowBenchmarks.sessionWindow|thrpt|1|30|585.246412|7.026569|ops/ms| | |
> |WindowBenchmarks.slidingWindow|thrpt|1|30|449.302134|4.123669|ops/ms| | |
> |WindowBenchmarks.tumblingWindow|thrpt|1|30|2979.806858|33.818909|ops/ms| | |
> |StreamNetworkLatencyBenchmarkExecutor.networkLatency1to1|avgt|1|30|12.842865|0.13796|ms/op| | |
>
> Result without the optimization:
>
> |Benchmark|Mode|Threads|Samples|Score|Score Error (99.9%)|Unit|Param: channelsFlushTimeout|Param: stateBackend|
> |KeyByBenchmarks.arrayKeyBy|thrpt|1|30|2060.241715|59.898485|ops/ms| | |
> |KeyByBenchmarks.tupleKeyBy|thrpt|1|30|3645.306819|223.821719|ops/ms| | |
> |MemoryStateBackendBenchmark.stateBackends|thrpt|1|30|2992.698822|36.978115|ops/ms| |MEMORY|
> |MemoryStateBackendBenchmark.stateBackends|thrpt|1|30|2756.10949|27.798937|ops/ms| |FS|
> |MemoryStateBackendBenchmark.stateBackends|thrpt|1|30|2965.969876|44.159793|ops/ms| |FS_ASYNC|
> |RocksStateBackendBenchmark.stateBackends|thrpt|1|30|125.506942|1.245978|ops/ms| |ROCKS|
> |RocksStateBackendBenchmark.stateBackends|thrpt|1|30|127.258737|1.190588|ops/ms| |ROCKS_INC|
> |SerializationFrameworkMiniBenchmarks.serializerAvro|thrpt|1|30|316.497954|8.309241|ops/ms| | |
> |SerializationFrameworkMiniBenchmarks.serializerKryo|thrpt|1|30|189.065149|6.302073|ops/ms| | |
> |SerializationFrameworkMiniBenchmarks.serializerPojo|thrpt|1|30|391.51305|7.750728|ops/ms| | |
> |SerializationFrameworkMiniBenchmarks.serializerRow|thrpt|1|30|513.611151|10.640899|ops/ms| | |
> |SerializationFrameworkMiniBenchmarks.serializerTuple|thrpt|1|30|534.184947|14.370082|ops/ms| | |
> |StreamNetworkBroadcastThroughputBenchmarkExecutor.networkBroadcastThroughput|thrpt|1|30|483.388618|19.506723|ops/ms| | |
> |StreamNetworkThroughputBenchmarkExecutor.networkThroughput|thrpt|1|30|42777.70615|4981.87539|ops/ms|100,100ms| |
> |StreamNetworkThroughputBenchmarkExecutor.networkThroughput|thrpt|1|30|10201.48525|286.248845|ops/ms|100,100ms,SSL| |
> |StreamNetworkThroughputBenchmarkExecutor.networkThroughput|thrpt|1|30|20788.34364|1146.470652|ops/ms|1000,1ms| |
> |StreamNetworkThroughputBenchmarkExecutor.networkThroughput|thrpt|1|30|24412.00941|981.98882|ops/ms|1000,100ms| |
> |StreamNetworkThroughputBenchmarkExecutor.networkThroughput|thrpt|1|30|8284.336114|177.482373|ops/ms|1000,100ms,SSL| |
> |SumLongsBenchmark.benchmarkCount|thrpt|1|30|7846.800667|127.321584|ops/ms| | |
> |WindowBenchmarks.globalWindow|thrpt|1|30|4837.270101|94.519852|ops/ms| | |
> |WindowBenchmarks.sessionWindow|thrpt|1|30|591.304589|5.324132|ops/ms| | |
> |WindowBenchmarks.slidingWindow|thrpt|1|30|446.605784|2.53677|ops/ms| | |
> |WindowBenchmarks.tumblingWindow|thrpt|1|30|2878.885056|64.035709|ops/ms| | |
> |StreamNetworkLatencyBenchmarkExecutor.networkLatency1to1|avgt|1|30|12.705601|0.164959|ms/op| | |
>
> The optimization is especially useful for small records.

--
This message was sent by Atlassian JIRA
(v7.6.3#76005)