hi,

In order to solve this, you would need to do one of the following:

- compile Flink from source (as you mentioned), since Flink 1.18.2 has not been released yet
- change `state.storage.fs.memory-threshold` (this could work as a temporary fix in your case until 1.18.2 is released):

```
# The minimum size of state data files.
# All state chunks smaller than that are stored inline in the root checkpoint metadata file.
# Changing this value is a trade-off between writing data in the metadata file or in the data files,
# and requires careful testing under production load to check that checkpointing and savepointing are not impacted.
# Default 20kb. The max memory threshold for this configuration is 1MB.
state.storage.fs.memory-threshold: 500kb # you can set this to a higher value (up to 1MB)
```

- enable state compression (what we did) by using:

```
# Tells Flink to use compression for the state snapshot data.
# Default is false.
execution.checkpointing.snapshot-compression: true
```

Hope this helps.
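To make the effect of `state.storage.fs.memory-threshold` more concrete, here is a small Java model of the rule quoted in the config comment: state chunks below the threshold are inlined into the root checkpoint metadata file, and the remaining chunks become separate files that have to be read back from S3 on restore. The class `MemoryThresholdSketch`, its methods, and the sample chunk sizes are hypothetical and are not part of Flink's API. The parser assumes binary units (1kb = 1024 bytes), and Flink's exact boundary handling may differ from this sketch.

```java
import java.util.List;
import java.util.Locale;

// Hypothetical model, NOT Flink's API: chunks smaller than the threshold are
// inlined into the checkpoint metadata file; all other chunks become separate
// files that must be fetched individually (e.g. from S3) on restore.
final class MemoryThresholdSketch {

    // Upper bound mentioned in the config comment (1MB).
    static final long MAX_THRESHOLD_BYTES = 1024L * 1024L;

    // Parses sizes such as "20kb", "500kb" or "1mb" into bytes (simplified; binary units).
    static long parseSize(String text) {
        String s = text.trim().toLowerCase(Locale.ROOT);
        long multiplier = 1L;
        String digits = s;
        if (s.endsWith("kb")) {
            multiplier = 1024L;
            digits = s.substring(0, s.length() - 2);
        } else if (s.endsWith("mb")) {
            multiplier = 1024L * 1024L;
            digits = s.substring(0, s.length() - 2);
        } else if (s.endsWith("b")) {
            digits = s.substring(0, s.length() - 1);
        }
        return Long.parseLong(digits.trim()) * multiplier;
    }

    // Rejects thresholds above the documented 1MB maximum.
    static long validatedThreshold(String text) {
        long bytes = parseSize(text);
        if (bytes < 0 || bytes > MAX_THRESHOLD_BYTES) {
            throw new IllegalArgumentException("threshold out of range: " + text);
        }
        return bytes;
    }

    // Counts chunks that would be written, and later read back, as separate files.
    static long separateFiles(List<Long> chunkSizes, long thresholdBytes) {
        return chunkSizes.stream().filter(size -> size >= thresholdBytes).count();
    }

    public static void main(String[] args) {
        // Hypothetical chunk sizes in bytes.
        List<Long> chunks = List.of(4_096L, 30_000L, 200_000L, 700_000L, 5_000_000L);
        for (String setting : List.of("20kb", "500kb")) {
            long threshold = validatedThreshold(setting);
            System.out.println(setting + " -> " + separateFiles(chunks, threshold) + " separate files");
        }
    }
}
```

With these sample sizes, raising the threshold from the 20kb default to 500kb drops the number of separate files from 4 to 2 in this model; how much it helps in practice depends on the real sizes of your state chunks.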
On Thu, Nov 28, 2024 at 3:38 AM 王业楼 <wangye...@yeah.net> wrote:
> I saw the related fix commit on GitHub, but now I want to use a fixed 1.18
> version. Where can I find it? The 1.18 release provided on the official
> website is still from 2023. Or can I only compile it from the source code?
>
> On Nov 27, 2024, at 17:56, William Wallace <theanonymous31...@gmail.com> wrote:
>
> hi,
> It seems similar to the issue described here:
> https://lists.apache.org/thread/g8yb4rlj0mlf1vgjl71815nts8r1w51p
> where we were not able to restore state because of the high number of S3
> reads (in your case, it might hit the connection limit first).
> Have a look at https://issues.apache.org/jira/browse/FLINK-36530 and see
> if you are affected.
> Best regards,
> W
>
>
> On Tue, Nov 26, 2024 at 3:56 PM wangye...@yeah.net <wangye...@yeah.net>
> wrote:
>
>> hi
>> My Flink cluster uses S3 as the storage for the state backend. However, an
>> exception occurs after the task has been running for a long time. The
>> exception message is "Timeout waiting for connection from pool". What could
>> be the reason for this? The full error message follows.
>>
>> java.lang.Exception: Exception while creating StreamOperatorStateContext.
>>     at org.apache.flink.streaming.api.operators.StreamTaskStateInitializerImpl.streamOperatorStateContext(StreamTaskStateInitializerImpl.java:258)
>>     at org.apache.flink.streaming.api.operators.AbstractStreamOperator.initializeState(AbstractStreamOperator.java:256)
>>     at org.apache.flink.streaming.runtime.tasks.RegularOperatorChain.initializeStateAndOpenOperators(RegularOperatorChain.java:106)
>>     at org.apache.flink.streaming.runtime.tasks.StreamTask.restoreGates(StreamTask.java:753)
>>     at org.apache.flink.streaming.runtime.tasks.StreamTaskActionExecutor$1.call(StreamTaskActionExecutor.java:55)
>>     at org.apache.flink.streaming.runtime.tasks.StreamTask.restoreInternal(StreamTask.java:728)
>>     at org.apache.flink.streaming.runtime.tasks.StreamTask.restore(StreamTask.java:693)
>>     at org.apache.flink.runtime.taskmanager.Task.runWithSystemExitMonitoring(Task.java:953)
>>     at org.apache.flink.runtime.taskmanager.Task.restoreAndInvoke(Task.java:922)
>>     at org.apache.flink.runtime.taskmanager.Task.doRun(Task.java:746)
>>     at org.apache.flink.runtime.taskmanager.Task.run(Task.java:562)
>>     at java.base/java.lang.Thread.run(Thread.java:829)
>> Caused by: org.apache.flink.util.FlinkException: Could not restore keyed state backend for KeyedProcessOperator_c82105323f1a61713019c6d1da10336c_(1/1) from any of the 1 provided restore options.
>>     at org.apache.flink.streaming.api.operators.BackendRestorerProcedure.createAndRestore(BackendRestorerProcedure.java:160)
>>     at org.apache.flink.streaming.api.operators.StreamTaskStateInitializerImpl.keyedStatedBackend(StreamTaskStateInitializerImpl.java:355)
>>     at org.apache.flink.streaming.api.operators.StreamTaskStateInitializerImpl.streamOperatorStateContext(StreamTaskStateInitializerImpl.java:166)
>>     ... 11 more
>> Caused by: org.apache.flink.runtime.state.BackendBuildingException: Caught unexpected exception.
>>     at org.apache.flink.contrib.streaming.state.RocksDBKeyedStateBackendBuilder.build(RocksDBKeyedStateBackendBuilder.java:407)
>>     at org.apache.flink.contrib.streaming.state.EmbeddedRocksDBStateBackend.createKeyedStateBackend(EmbeddedRocksDBStateBackend.java:513)
>>     at org.apache.flink.contrib.streaming.state.EmbeddedRocksDBStateBackend.createKeyedStateBackend(EmbeddedRocksDBStateBackend.java:100)
>>     at org.apache.flink.streaming.api.operators.StreamTaskStateInitializerImpl.lambda$keyedStatedBackend$1(StreamTaskStateInitializerImpl.java:338)
>>     at org.apache.flink.streaming.api.operators.BackendRestorerProcedure.attemptCreateAndRestore(BackendRestorerProcedure.java:168)
>>     at org.apache.flink.streaming.api.operators.BackendRestorerProcedure.createAndRestore(BackendRestorerProcedure.java:135)
>>     ... 13 more
>> Caused by: java.io.IOException: com.amazonaws.SdkClientException: Unable to execute HTTP request: Timeout waiting for connection from pool
>>     at org.apache.flink.contrib.streaming.state.RocksDBStateDownloader.downloadDataForStateHandle(RocksDBStateDownloader.java:156)
>>     at org.apache.flink.contrib.streaming.state.RocksDBStateDownloader.lambda$null$0(RocksDBStateDownloader.java:115)
>>     at org.apache.flink.util.function.ThrowingRunnable.lambda$unchecked$0(ThrowingRunnable.java:49)
>>     at java.base/java.util.concurrent.CompletableFuture$AsyncRun.run(CompletableFuture.java:1736)
>>     at java.base/java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1128)
>>     at java.base/java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:628)
>>     ... 1 more
>> Caused by: com.amazonaws.SdkClientException: Unable to execute HTTP request: Timeout waiting for connection from pool
>>     at com.amazonaws.http.AmazonHttpClient$RequestExecutor.handleRetryableException(AmazonHttpClient.java:1219)
>>     at com.amazonaws.http.AmazonHttpClient$RequestExecutor.executeHelper(AmazonHttpClient.java:1165)
>>     at com.amazonaws.http.AmazonHttpClient$RequestExecutor.doExecute(AmazonHttpClient.java:814)
>>     at com.amazonaws.http.AmazonHttpClient$RequestExecutor.executeWithTimer(AmazonHttpClient.java:781)
>>     at com.amazonaws.http.AmazonHttpClient$RequestExecutor.execute(AmazonHttpClient.java:755)
>>     at com.amazonaws.http.AmazonHttpClient$RequestExecutor.access$500(AmazonHttpClient.java:715)
>>     at com.amazonaws.http.AmazonHttpClient$RequestExecutionBuilderImpl.execute(AmazonHttpClient.java:697)
>>     at com.amazonaws.http.AmazonHttpClient.execute(AmazonHttpClient.java:561)
>>     at com.amazonaws.http.AmazonHttpClient.execute(AmazonHttpClient.java:541)
>>     at com.amazonaws.services.s3.AmazonS3Client.invoke(AmazonS3Client.java:5456)
>>     at com.amazonaws.services.s3.AmazonS3Client.invoke(AmazonS3Client.java:5403)
>>     at com.amazonaws.services.s3.AmazonS3Client.getObject(AmazonS3Client.java:1524)
>>     at com.facebook.presto.hive.s3.PrestoS3FileSystem$PrestoS3InputStream.lambda$openStream$2(PrestoS3FileSystem.java:1102)
>>     at com.facebook.presto.hive.RetryDriver.run(RetryDriver.java:139)
>>     at com.facebook.presto.hive.s3.PrestoS3FileSystem$PrestoS3InputStream.openStream(PrestoS3FileSystem.java:1099)
>>     at com.facebook.presto.hive.s3.PrestoS3FileSystem$PrestoS3InputStream.openStream(PrestoS3FileSystem.java:1084)
>>     at com.facebook.presto.hive.s3.PrestoS3FileSystem$PrestoS3InputStream.seekStream(PrestoS3FileSystem.java:1077)
>>     at com.facebook.presto.hive.s3.PrestoS3FileSystem$PrestoS3InputStream.lambda$read$1(PrestoS3FileSystem.java:1021)
>>     at com.facebook.presto.hive.RetryDriver.run(RetryDriver.java:139)
>>     at com.facebook.presto.hive.s3.PrestoS3FileSystem$PrestoS3InputStream.read(PrestoS3FileSystem.java:1020)
>>     at java.base/java.io.BufferedInputStream.read1(BufferedInputStream.java:290)
>>     at java.base/java.io.BufferedInputStream.read(BufferedInputStream.java:351)
>>     at java.base/java.io.DataInputStream.read(DataInputStream.java:149)
>>     at org.apache.flink.fs.s3presto.common.HadoopDataInputStream.read(HadoopDataInputStream.java:96)
>>     at java.base/java.io.InputStream.read(InputStream.java:205)
>>     at org.apache.flink.contrib.streaming.state.RocksDBStateDownloader.downloadDataForStateHandle(RocksDBStateDownloader.java:144)
>>     ... 6 more
>>     Suppressed: com.amazonaws.SdkClientException: Unable to execute HTTP request: Timeout waiting for connection from pool
>>         ... 32 more
>>     Caused by: org.apache.http.conn.ConnectionPoolTimeoutException: Timeout waiting for connection from pool
>>         at org.apache.http.impl.conn.PoolingHttpClientConnectionManager.leaseConnection(PoolingHttpClientConnectionManager.java:316)
>>         at org.apache.http.impl.conn.PoolingHttpClientConnectionManager$1.get(PoolingHttpClientConnectionManager.java:282)
>>         at jdk.internal.reflect.GeneratedMethodAccessor58.invoke(Unknown Source)
>>         at java.base/jdk.internal.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
>>         at java.base/java.lang.reflect.Method.invoke(Method.java:566)
>>         at com.amazonaws.http.conn.ClientConnectionRequestFactory$Handler.invoke(ClientConnectionRequestFactory.java:70)
>>         at com.amazonaws.http.conn.$Proxy30.get(Unknown Source)
>>         at org.apache.http.impl.execchain.MainClientExec.execute(MainClientExec.java:190)
>>         at org.apache.http.impl.execchain.ProtocolExec.execute(ProtocolExec.java:186)
>>         at org.apache.http.impl.client.InternalHttpClient.doExecute(InternalHttpClient.java:185)
>>         at org.apache.http.impl.client.CloseableHttpClient.execute(CloseableHttpClient.java:83)
>>         at org.apache.http.impl.client.CloseableHttpClient.execute(CloseableHttpClient.java:56)
>>         at com.amazonaws.http.apache.client.impl.SdkHttpClient.execute(SdkHttpClient.java:72)
>>         at com.amazonaws.http.AmazonHttpClient$RequestExecutor.executeOneRequest(AmazonHttpClient.java:1346)
>>         at com.amazonaws.http.AmazonHttpClient$RequestExecutor.executeHelper(AmazonHttpClient.java:1157)
>>         ... 30 more
>>     Suppressed: com.amazonaws.SdkClientException: Unable to execute HTTP request: Timeout waiting for connection from pool
>>         ... 32 more
>>     Caused by: org.apache.http.conn.ConnectionPoolTimeoutException: Timeout waiting for connection from pool
>>         at org.apache.http.impl.conn.PoolingHttpClientConnectionManager.leaseConnection(PoolingHttpClientConnectionManager.java:316)
>>         at org.apache.http.impl.conn.PoolingHttpClientConnectionManager$1.get(PoolingHttpClientConnectionManager.java:282)
>>         at jdk.internal.reflect.GeneratedMethodAccessor58.invoke(Unknown Source)
>>         at java.base/jdk.internal.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
>>         at java.base/java.lang.reflect.Method.invoke(Method.java:566)
>>         at com.amazonaws.http.conn.ClientConnectionRequestFactory$Handler.invoke(ClientConnectionRequestFactory.java:70)
>>         at com.amazonaws.http.conn.$Proxy30.get(Unknown Source)
>>         at org.apache.http.impl.execchain.MainClientExec.execute(MainClientExec.java:190)
>>         at org.apache.http.impl.execchain.ProtocolExec.execute(ProtocolExec.java:186)
>>         at org.apache.http.impl.client.InternalHttpClient.doExecute(InternalHttpClient.java:185)
>>         at org.apache.http.impl.client.CloseableHttpClient.execute(CloseableHttpClient.java:83)
>>         at org.apache.http.impl.client.CloseableHttpClient.execute(CloseableHttpClient.java:56)
>>         at com.amazonaws.http.apache.client.impl.SdkHttpClient.execute(SdkHttpClient.java:72)
>>         at com.amazonaws.http.AmazonHttpClient$RequestExecutor.executeOneRequest(AmazonHttpClient.java:1346)
>>         at com.amazonaws.http.AmazonHttpClient$RequestExecutor.executeHelper(AmazonHttpClient.java:1157)
>>         ... 30 more
>>     Suppressed: com.amazonaws.SdkClientException: Unable to execute HTTP request: Timeout waiting for connection from pool
>>         ... 32 more
>>     Caused by: org.apache.http.conn.ConnectionPoolTimeoutException: Timeout waiting for connection from pool
>>         at org.apache.http.impl.conn.PoolingHttpClientConnectionManager.leaseConnection(PoolingHttpClientConnectionManager.java:316)
>>         at org.apache.http.impl.conn.PoolingHttpClientConnectionManager$1.get(PoolingHttpClientConnectionManager.java:282)
>>         at jdk.internal.reflect.GeneratedMethodAccessor58.invoke(Unknown Source)
>>         at java.base/jdk.internal.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
>>         at java.base/java.lang.reflect.M
>>
>> ------------------------------
>> wangye...@yeah.net
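The stack trace shows `RocksDBStateDownloader` fetching state files on a thread pool while the AWS SDK waits to lease a connection from `PoolingHttpClientConnectionManager`. The Java sketch below is a simplified model of that situation, not the SDK's or HttpClient's actual implementation: a fair `Semaphore` stands in for the bounded connection pool, each simulated download holds one connection while it reads, and a task gives up if it cannot lease a connection before the timeout. The class name, pool size, thread counts, and timings are all hypothetical.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.Callable;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Future;
import java.util.concurrent.Semaphore;
import java.util.concurrent.TimeUnit;

// Simplified model, NOT the AWS SDK or Apache HttpClient: a fair Semaphore plays
// the role of a fixed-size HTTP connection pool, each simulated download holds one
// connection while it "reads", and a task gives up if it cannot lease a
// connection within the lease timeout.
final class PoolTimeoutSketch {

    // Runs `downloads` simulated downloads on `threads` worker threads against a pool
    // of `poolSize` connections and returns how many failed to lease a connection in time.
    static int countLeaseTimeouts(int poolSize, int threads, int downloads,
                                  long holdMillis, long leaseTimeoutMillis) {
        Semaphore pool = new Semaphore(poolSize, true);
        ExecutorService executor = Executors.newFixedThreadPool(threads);
        try {
            List<Future<Boolean>> results = new ArrayList<>();
            for (int i = 0; i < downloads; i++) {
                Callable<Boolean> download = () -> {
                    // Analogous to leasing a connection from the client's pool.
                    if (!pool.tryAcquire(leaseTimeoutMillis, TimeUnit.MILLISECONDS)) {
                        return false; // the "Timeout waiting for connection from pool" case
                    }
                    try {
                        Thread.sleep(holdMillis); // connection stays leased while data is read
                        return true;
                    } finally {
                        pool.release();
                    }
                };
                results.add(executor.submit(download));
            }
            int timeouts = 0;
            for (Future<Boolean> result : results) {
                if (!result.get()) {
                    timeouts++;
                }
            }
            return timeouts;
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            throw new IllegalStateException("interrupted while waiting for downloads", e);
        } catch (ExecutionException e) {
            throw new IllegalStateException("simulated download failed", e.getCause());
        } finally {
            executor.shutdownNow();
        }
    }

    public static void main(String[] args) {
        // Few concurrent readers, short reads, generous lease timeout.
        System.out.println("relaxed:    " + countLeaseTimeouts(2, 4, 8, 50, 1_000) + " timeouts");
        // Many concurrent readers holding connections longer than the lease timeout.
        System.out.println("overloaded: " + countLeaseTimeouts(2, 16, 16, 500, 100) + " timeouts");
    }
}
```

In this model, lease timeouts appear only when many concurrent readers hold connections for longer than the lease timeout, which is consistent with William's explanation that a high number of S3 reads during restore may hit the connection limit first.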