[https://issues.apache.org/jira/browse/FLINK-22483?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17407055#comment-17407055]

ming li commented on FLINK-22483:
---------------------------------

Hi, [~trohrmann], thanks for your reply. It seems the main purpose is to 
ensure isolation. I checked the previous issue: we re-create a new 
{{SharedStateRegistry}} so that pending asynchronous cleanups cannot drive a 
reference counter below 1. But if we keep using the same 
{{SharedStateRegistry}} and do not clear it, this problem does not seem to 
arise.
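
To illustrate the reference-counting concern, here is a simplified sketch. {{RefCountingRegistry}} and its methods are hypothetical and are not Flink's {{SharedStateRegistry}} API; the scenario assumes two checkpoints that share one file, where the newer checkpoint is retained and a stale asynchronous cleanup of the older one arrives after recovery. If the registry is cleared and only the retained checkpoint is re-registered, the stale cleanup releases the last reference to a file that is still in use; if the registry is kept as is, the same cleanup only releases the older checkpoint's reference.

```java
import java.util.HashMap;
import java.util.Map;

/**
 * Hypothetical, simplified reference-counting registry (NOT Flink's
 * SharedStateRegistry API). A shared file may be deleted once no
 * registered checkpoint references it any more.
 */
public class RefCountingRegistry {
    private final Map<String, Integer> refCounts = new HashMap<>();

    /** A checkpoint referencing {@code key} is registered. */
    public synchronized void register(String key) {
        refCounts.merge(key, 1, Integer::sum);
    }

    /** A checkpoint referencing {@code key} is discarded; true means the file may be deleted. */
    public synchronized boolean unregister(String key) {
        int count = refCounts.getOrDefault(key, 0);
        if (count == 0) {
            throw new IllegalStateException("Unregistering unknown shared state: " + key);
        }
        if (count == 1) {
            refCounts.remove(key);
            return true;
        }
        refCounts.put(key, count - 1);
        return false;
    }

    public synchronized int count(String key) {
        return refCounts.getOrDefault(key, 0);
    }

    public synchronized void clear() {
        refCounts.clear();
    }

    public static void main(String[] args) {
        // Checkpoints 1 and 2 both reference "sst-42"; checkpoint 2 is retained.
        RefCountingRegistry cleared = new RefCountingRegistry();
        cleared.register("sst-42"); // checkpoint 1
        cleared.register("sst-42"); // checkpoint 2
        // Recovery clears the registry and re-registers only checkpoint 2 ...
        cleared.clear();
        cleared.register("sst-42");
        // ... then a stale async cleanup of checkpoint 1 releases the last reference.
        System.out.println("clear + re-register: deletable=" + cleared.unregister("sst-42")); // true: file still needed

        RefCountingRegistry kept = new RefCountingRegistry();
        kept.register("sst-42"); // checkpoint 1
        kept.register("sst-42"); // checkpoint 2
        // Recovery keeps the registry; the same stale cleanup only releases checkpoint 1.
        System.out.println("kept registry: deletable=" + kept.unregister("sst-42")
                + ", count=" + kept.count("sst-42")); // false, count=1
    }
}
```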


In fact, in our production environment we discard part of the data and state 
so that only the failed tasks are restarted, but we found that registering 
state with the {{SharedStateRegistry}} can take several seconds (thousands of 
tasks and dozens of TB of state). When many tasks fail at the same time, this 
can add up to several minutes (number of tasks * several seconds).
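
As a rough worked example of this estimate, assuming failed tasks are restored one after another and using illustrative numbers (not measurements from this report):

```latex
T_{\text{recovery}} \approx N_{\text{failed}} \cdot t_{\text{register}},
\qquad \text{e.g. } 50 \cdot 5\,\text{s} = 250\,\text{s} \approx 4\,\text{min}
```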


Therefore, we are considering whether recovery time can be reduced by neither 
clearing nor re-registering the {{SharedStateRegistry}}.
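
One way to read this proposal is a long-lived registry that skips work it has already done. A hedged sketch, assuming shared state can be registered idempotently per checkpoint ID; {{IncrementalRegistration}} and {{registerCheckpoint}} are hypothetical names, not Flink APIs:

```java
import java.util.Arrays;
import java.util.HashMap;
import java.util.HashSet;
import java.util.List;
import java.util.Map;
import java.util.Set;

/**
 * Hypothetical sketch (not Flink's API): a long-lived registry that remembers
 * which checkpoints it has already registered, so restarting a few failed
 * tasks does not clear and re-register every shared handle of the job.
 */
public class IncrementalRegistration {
    private final Set<Long> registeredCheckpoints = new HashSet<>();
    private final Map<String, Integer> refCounts = new HashMap<>();

    /** Registers a checkpoint's shared handles once; returns how many were newly registered. */
    public synchronized int registerCheckpoint(long checkpointId, List<String> sharedHandles) {
        if (!registeredCheckpoints.add(checkpointId)) {
            return 0; // already registered during an earlier recovery: no work
        }
        for (String handle : sharedHandles) {
            refCounts.merge(handle, 1, Integer::sum);
        }
        return sharedHandles.size();
    }

    public synchronized int refCount(String handle) {
        return refCounts.getOrDefault(handle, 0);
    }

    public static void main(String[] args) {
        IncrementalRegistration registry = new IncrementalRegistration();
        List<String> handles = Arrays.asList("sst-1", "sst-2", "sst-3");
        System.out.println(registry.registerCheckpoint(42L, handles)); // prints 3 (first recovery)
        System.out.println(registry.registerCheckpoint(42L, handles)); // prints 0 (partial restart)
        System.out.println(registry.refCount("sst-1"));                // prints 1 (count unchanged)
    }
}
```

This only addresses the registration cost; on its own it does not answer the isolation concern about stale asynchronous cleanups raised at the start of this comment.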

> Recover checkpoints when JobMaster gains leadership
> ---------------------------------------------------
>
>                 Key: FLINK-22483
>                 URL: https://issues.apache.org/jira/browse/FLINK-22483
>             Project: Flink
>          Issue Type: Improvement
>          Components: Runtime / Coordination
>    Affects Versions: 1.13.0
>            Reporter: Robert Metzger
>            Assignee: David Morávek
>            Priority: Critical
>              Labels: pull-request-available
>             Fix For: 1.14.0
>
>
> Recovering checkpoints (from the CompletedCheckpointStore) is a potentially 
> long-lasting/blocking operation, for example if the file system 
> implementation is retrying to connect to an unavailable storage backend.
> Currently, we are calling the CompletedCheckpointStore.recover() method from 
> the main thread of the JobManager, making it unresponsive to any RPC call 
> while the recover method is blocked:
> {code}
> 2021-04-02 20:33:31,384 INFO  
> org.apache.flink.runtime.executiongraph.ExecutionGraph       [] - Job XXX 
> switched from state RUNNING to RESTARTING.
> com.amazonaws.SdkClientException: Unable to execute HTTP request: Connect to 
> minio.minio.svc:9000 [minio.minio.svc/XXXX] failed: Connection refused 
> (Connection refused)
>       at 
> com.amazonaws.http.AmazonHttpClient$RequestExecutor.handleRetryableException(AmazonHttpClient.java:1207)
>  ~[?:?]
>       at 
> com.amazonaws.http.AmazonHttpClient$RequestExecutor.executeHelper(AmazonHttpClient.java:1153)
>  ~[?:?]
>       at 
> com.amazonaws.http.AmazonHttpClient$RequestExecutor.doExecute(AmazonHttpClient.java:802)
>  ~[?:?]
>       at 
> com.amazonaws.http.AmazonHttpClient$RequestExecutor.executeWithTimer(AmazonHttpClient.java:770)
>  ~[?:?]
>       at 
> com.amazonaws.http.AmazonHttpClient$RequestExecutor.execute(AmazonHttpClient.java:744)
>  ~[?:?]
>       at 
> com.amazonaws.http.AmazonHttpClient$RequestExecutor.access$500(AmazonHttpClient.java:704)
>  ~[?:?]
>       at 
> com.amazonaws.http.AmazonHttpClient$RequestExecutionBuilderImpl.execute(AmazonHttpClient.java:686)
>  ~[?:?]
>       at 
> com.amazonaws.http.AmazonHttpClient.execute(AmazonHttpClient.java:550) ~[?:?]
>       at 
> com.amazonaws.http.AmazonHttpClient.execute(AmazonHttpClient.java:530) ~[?:?]
>       at 
> com.amazonaws.services.s3.AmazonS3Client.invoke(AmazonS3Client.java:5062) 
> ~[?:?]
>       at 
> com.amazonaws.services.s3.AmazonS3Client.invoke(AmazonS3Client.java:5008) 
> ~[?:?]
>       at 
> com.amazonaws.services.s3.AmazonS3Client.getObject(AmazonS3Client.java:1490) 
> ~[?:?]
>       at 
> com.facebook.presto.hive.s3.PrestoS3FileSystem$PrestoS3InputStream.lambda$openStream$1(PrestoS3FileSystem.java:905)
>  ~[?:?]
>       at com.facebook.presto.hive.RetryDriver.run(RetryDriver.java:138) ~[?:?]
>       at 
> com.facebook.presto.hive.s3.PrestoS3FileSystem$PrestoS3InputStream.openStream(PrestoS3FileSystem.java:902)
>  ~[?:?]
>       at 
> com.facebook.presto.hive.s3.PrestoS3FileSystem$PrestoS3InputStream.openStream(PrestoS3FileSystem.java:887)
>  ~[?:?]
>       at 
> com.facebook.presto.hive.s3.PrestoS3FileSystem$PrestoS3InputStream.seekStream(PrestoS3FileSystem.java:880)
>  ~[?:?]
>       at 
> com.facebook.presto.hive.s3.PrestoS3FileSystem$PrestoS3InputStream.lambda$read$0(PrestoS3FileSystem.java:819)
>  ~[?:?]
>       at com.facebook.presto.hive.RetryDriver.run(RetryDriver.java:138) ~[?:?]
>       at 
> com.facebook.presto.hive.s3.PrestoS3FileSystem$PrestoS3InputStream.read(PrestoS3FileSystem.java:818)
>  ~[?:?]
>       at java.io.BufferedInputStream.read1(BufferedInputStream.java:284) 
> ~[?:1.8.0_282]
>       at XXX.recover(KubernetesHaCheckpointStore.java:69) 
> ~[vvp-flink-ha-kubernetes-flink112-1.1.0.jar:?]
>       at 
> org.apache.flink.runtime.checkpoint.CheckpointCoordinator.restoreLatestCheckpointedStateInternal(CheckpointCoordinator.java:1511)
>  ~[flink-dist_2.12-1.12.2-stream1.jar:1.12.2-stream1]
>       at 
> org.apache.flink.runtime.checkpoint.CheckpointCoordinator.restoreLatestCheckpointedStateToAll(CheckpointCoordinator.java:1451)
>  ~[flink-dist_2.12-1.12.2-stream1.jar:1.12.2-stream1]
>       at 
> org.apache.flink.runtime.scheduler.SchedulerBase.restoreState(SchedulerBase.java:421)
>  ~[flink-dist_2.12-1.12.2-stream1.jar:1.12.2-stream1]
>       at 
> org.apache.flink.runtime.scheduler.DefaultScheduler.lambda$restartTasks$2(DefaultScheduler.java:314)
>  ~[flink-dist_2.12-1.12.2-stream1.jar:1.12.2-stream1]
>       at 
> java.util.concurrent.CompletableFuture.uniRun(CompletableFuture.java:719) 
> ~[?:1.8.0_282]
>       at 
> java.util.concurrent.CompletableFuture$UniRun.tryFire(CompletableFuture.java:701)
>  ~[?:1.8.0_282]
>       at 
> java.util.concurrent.CompletableFuture$Completion.run(CompletableFuture.java:456)
>  ~[?:1.8.0_282]
>       at 
> org.apache.flink.runtime.rpc.akka.AkkaRpcActor.handleRunAsync(AkkaRpcActor.java:440)
>  ~[flink-dist_2.12-1.12.2-stream1.jar:1.12.2-stream1]
>       at 
> org.apache.flink.runtime.rpc.akka.AkkaRpcActor.handleRpcMessage(AkkaRpcActor.java:208)
>  ~[flink-dist_2.12-1.12.2-stream1.jar:1.12.2-stream1]
>       at 
> org.apache.flink.runtime.rpc.akka.FencedAkkaRpcActor.handleRpcMessage(FencedAkkaRpcActor.java:77)
>  ~[flink-dist_2.12-1.12.2-stream1.jar:1.12.2-stream1]
>       at 
> org.apache.flink.runtime.rpc.akka.AkkaRpcActor.handleMessage(AkkaRpcActor.java:158)
>  ~[flink-dist_2.12-1.12.2-stream1.jar:1.12.2-stream1]
>       at akka.japi.pf.UnitCaseStatement.apply(CaseStatements.scala:26) 
> [flink-dist_2.12-1.12.2-stream1.jar:1.12.2-stream1]
>       at akka.japi.pf.UnitCaseStatement.apply(CaseStatements.scala:21) 
> [flink-dist_2.12-1.12.2-stream1.jar:1.12.2-stream1]
>       at scala.PartialFunction.applyOrElse(PartialFunction.scala:123) 
> [flink-dist_2.12-1.12.2-stream1.jar:1.12.2-stream1]
>       at scala.PartialFunction.applyOrElse$(PartialFunction.scala:122) 
> [flink-dist_2.12-1.12.2-stream1.jar:1.12.2-stream1]
>       at akka.japi.pf.UnitCaseStatement.applyOrElse(CaseStatements.scala:21) 
> [flink-dist_2.12-1.12.2-stream1.jar:1.12.2-stream1]
>       at scala.PartialFunction$OrElse.applyOrElse(PartialFunction.scala:171) 
> [flink-dist_2.12-1.12.2-stream1.jar:1.12.2-stream1]
>       at scala.PartialFunction$OrElse.applyOrElse(PartialFunction.scala:172) 
> [flink-dist_2.12-1.12.2-stream1.jar:1.12.2-stream1]
>       at scala.PartialFunction$OrElse.applyOrElse(PartialFunction.scala:172) 
> [flink-dist_2.12-1.12.2-stream1.jar:1.12.2-stream1]
>       at akka.actor.Actor.aroundReceive(Actor.scala:517) 
> [flink-dist_2.12-1.12.2-stream1.jar:1.12.2-stream1]
>       at akka.actor.Actor.aroundReceive$(Actor.scala:515) 
> [flink-dist_2.12-1.12.2-stream1.jar:1.12.2-stream1]
>       at akka.actor.AbstractActor.aroundReceive(AbstractActor.scala:225) 
> [flink-dist_2.12-1.12.2-stream1.jar:1.12.2-stream1]
>       at akka.actor.ActorCell.receiveMessage(ActorCell.scala:592) 
> [flink-dist_2.12-1.12.2-stream1.jar:1.12.2-stream1]
>       at akka.actor.ActorCell.invoke(ActorCell.scala:561) 
> [flink-dist_2.12-1.12.2-stream1.jar:1.12.2-stream1]
>       at akka.dispatch.Mailbox.processMailbox(Mailbox.scala:258) 
> [flink-dist_2.12-1.12.2-stream1.jar:1.12.2-stream1]
>       at akka.dispatch.Mailbox.run(Mailbox.scala:225) 
> [flink-dist_2.12-1.12.2-stream1.jar:1.12.2-stream1]
>       at akka.dispatch.Mailbox.exec(Mailbox.scala:235) 
> [flink-dist_2.12-1.12.2-stream1.jar:1.12.2-stream1]
>       at akka.dispatch.forkjoin.ForkJoinTask.doExec(ForkJoinTask.java:260) 
> [flink-dist_2.12-1.12.2-stream1.jar:1.12.2-stream1]
>       at 
> akka.dispatch.forkjoin.ForkJoinPool$WorkQueue.runTask(ForkJoinPool.java:1339) 
> [flink-dist_2.12-1.12.2-stream1.jar:1.12.2-stream1]
>       at 
> akka.dispatch.forkjoin.ForkJoinPool.runWorker(ForkJoinPool.java:1979) 
> [flink-dist_2.12-1.12.2-stream1.jar:1.12.2-stream1]
>       at 
> akka.dispatch.forkjoin.ForkJoinWorkerThread.run(ForkJoinWorkerThread.java:107)
>  [flink-dist_2.12-1.12.2-stream1.jar:1.12.2-stream1]
> Caused by: org.apache.http.conn.HttpHostConnectException: Connect to 
> minio.minio.svc:9000 [minio.minio.svc/10.115.246.236] failed: Connection 
> refused (Connection refused)
>       at 
> org.apache.http.impl.conn.DefaultHttpClientConnectionOperator.connect(DefaultHttpClientConnectionOperator.java:156)
>  ~[?:?]
>       at 
> org.apache.http.impl.conn.PoolingHttpClientConnectionManager.connect(PoolingHttpClientConnectionManager.java:374)
>  ~[?:?]
>       at sun.reflect.GeneratedMethodAccessor116.invoke(Unknown Source) ~[?:?]
>       at 
> sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
>  ~[?:1.8.0_282]
>       at java.lang.reflect.Method.invoke(Method.java:498) ~[?:1.8.0_282]
>       at 
> com.amazonaws.http.conn.ClientConnectionManagerFactory$Handler.invoke(ClientConnectionManagerFactory.java:76)
>  ~[?:?]
>       at com.amazonaws.http.conn.$Proxy18.connect(Unknown Source) ~[?:?]
>       at 
> org.apache.http.impl.execchain.MainClientExec.establishRoute(MainClientExec.java:393)
>  ~[?:?]
>       at 
> org.apache.http.impl.execchain.MainClientExec.execute(MainClientExec.java:236)
>  ~[?:?]
>       at 
> org.apache.http.impl.execchain.ProtocolExec.execute(ProtocolExec.java:186) 
> ~[?:?]
>       at 
> org.apache.http.impl.client.InternalHttpClient.doExecute(InternalHttpClient.java:185)
>  ~[?:?]
>       at 
> org.apache.http.impl.client.CloseableHttpClient.execute(CloseableHttpClient.java:83)
>  ~[?:?]
>       at 
> org.apache.http.impl.client.CloseableHttpClient.execute(CloseableHttpClient.java:56)
>  ~[?:?]
>       at 
> com.amazonaws.http.apache.client.impl.SdkHttpClient.execute(SdkHttpClient.java:72)
>  ~[?:?]
>       at 
> com.amazonaws.http.AmazonHttpClient$RequestExecutor.executeOneRequest(AmazonHttpClient.java:1330)
>  ~[?:?]
>       at 
> com.amazonaws.http.AmazonHttpClient$RequestExecutor.executeHelper(AmazonHttpClient.java:1145)
>  ~[?:?]
>       ... 67 more
> Caused by: java.net.ConnectException: Connection refused (Connection refused)
>       at java.net.PlainSocketImpl.socketConnect(Native Method) ~[?:1.8.0_282]
>       at 
> java.net.AbstractPlainSocketImpl.doConnect(AbstractPlainSocketImpl.java:350) 
> ~[?:1.8.0_282]
>       at 
> java.net.AbstractPlainSocketImpl.connectToAddress(AbstractPlainSocketImpl.java:206)
>  ~[?:1.8.0_282]
>       at 
> java.net.AbstractPlainSocketImpl.connect(AbstractPlainSocketImpl.java:188) 
> ~[?:1.8.0_282]
>       at java.net.SocksSocketImpl.connect(SocksSocketImpl.java:392) 
> ~[?:1.8.0_282]
>       at java.net.Socket.connect(Socket.java:607) ~[?:1.8.0_282]
>       at 
> org.apache.http.conn.socket.PlainConnectionSocketFactory.connectSocket(PlainConnectionSocketFactory.java:75)
>  ~[?:?]
>       at 
> org.apache.http.impl.conn.DefaultHttpClientConnectionOperator.connect(DefaultHttpClientConnectionOperator.java:142)
>  ~[?:?]
>       at 
> org.apache.http.impl.conn.PoolingHttpClientConnectionManager.connect(PoolingHttpClientConnectionManager.java:374)
>  ~[?:?]
>       at sun.reflect.GeneratedMethodAccessor116.invoke(Unknown Source) ~[?:?]
>       at 
> sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
>  ~[?:1.8.0_282]
>       at java.lang.reflect.Method.invoke(Method.java:498) ~[?:1.8.0_282]
>       at 
> com.amazonaws.http.conn.ClientConnectionManagerFactory$Handler.invoke(ClientConnectionManagerFactory.java:76)
>  ~[?:?]
>       at com.amazonaws.http.conn.$Proxy18.connect(Unknown Source) ~[?:?]
>       at 
> org.apache.http.impl.execchain.MainClientExec.establishRoute(MainClientExec.java:393)
>  ~[?:?]
>       at 
> org.apache.http.impl.execchain.MainClientExec.execute(MainClientExec.java:236)
>  ~[?:?]
>       at 
> org.apache.http.impl.execchain.ProtocolExec.execute(ProtocolExec.java:186) 
> ~[?:?]
>       at 
> org.apache.http.impl.client.InternalHttpClient.doExecute(InternalHttpClient.java:185)
>  ~[?:?]
>       at 
> org.apache.http.impl.client.CloseableHttpClient.execute(CloseableHttpClient.java:83)
>  ~[?:?]
>       at 
> org.apache.http.impl.client.CloseableHttpClient.execute(CloseableHttpClient.java:56)
>  ~[?:?]
>       at 
> com.amazonaws.http.apache.client.impl.SdkHttpClient.execute(SdkHttpClient.java:72)
>  ~[?:?]
>       at 
> com.amazonaws.http.AmazonHttpClient$RequestExecutor.executeOneRequest(AmazonHttpClient.java:1330)
>  ~[?:?]
>       at 
> com.amazonaws.http.AmazonHttpClient$RequestExecutor.executeHelper(AmazonHttpClient.java:1145)
>  ~[?:?]
>       ... 67 more
> {code}
> By moving the recovery to the start of the JobManager (which happens 
> asynchronously after the JobMaster has gained leadership), Flink will remain 
> responsive (reporting the job in INITIALIZING state).
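
The proposed fix moves the blocking call off the RPC main thread. A minimal sketch of that pattern with plain {{java.util.concurrent}}, assuming a single-thread executor stands in for the JobMaster main thread and {{blockingRecover()}} is a hypothetical stand-in for {{CompletedCheckpointStore.recover()}} (this is not Flink's actual code): the slow call runs on an I/O executor, and only its result is handed back to the main thread, which stays free to answer requests in the meantime.

```java
import java.util.Arrays;
import java.util.List;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.CompletionException;
import java.util.concurrent.Executor;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.TimeUnit;

public class AsyncRecoverySketch {

    /** Hypothetical stand-in for a slow CompletedCheckpointStore.recover() call. */
    static List<Long> blockingRecover() {
        try {
            Thread.sleep(500); // e.g. retrying against unavailable object storage
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            throw new CompletionException(e);
        }
        return Arrays.asList(41L, 42L); // recovered checkpoint IDs
    }

    /** Runs recovery on ioExecutor and hands the latest checkpoint ID to mainThread. */
    static CompletableFuture<Long> recoverLatest(Executor ioExecutor, Executor mainThread) {
        return CompletableFuture.supplyAsync(AsyncRecoverySketch::blockingRecover, ioExecutor)
                .thenApplyAsync(ids -> ids.get(ids.size() - 1), mainThread);
    }

    public static void main(String[] args) throws Exception {
        ExecutorService mainThread = Executors.newSingleThreadExecutor(); // stands in for the RPC main thread
        ExecutorService ioExecutor = Executors.newCachedThreadPool();
        try {
            CompletableFuture<Long> latest = recoverLatest(ioExecutor, mainThread);
            // The main thread is not blocked while recovery is in flight, so it can answer status queries.
            String status = mainThread.submit(() -> latest.isDone() ? "RUNNING" : "INITIALIZING")
                    .get(100, TimeUnit.MILLISECONDS);
            System.out.println("status: " + status);
            System.out.println("latest checkpoint: " + latest.get());
        } finally {
            mainThread.shutdown();
            ioExecutor.shutdown();
        }
    }
}
```

Handing the result back via {{thenApplyAsync(..., mainThread)}} keeps any follow-up state changes on the main thread, while the thread itself is never held up by storage retries.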



--
This message was sent by Atlassian Jira
(v8.3.4#803005)