[ https://issues.apache.org/jira/browse/FLINK-29314?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ]
Satyasheel Satyasheel updated FLINK-29314:
------------------------------------------
    Description:

While using the *FileSource.forRecordStreamFormat* API with the build option *monitorContinuously(Duration.ofMillis(10000))* for Avro files on S3, we get the following error when there are more than two partitions in the S3 bucket:
{code:java}
akka.pattern.AskTimeoutException: Ask timed out on Actor[akka://flink/user/rpc/dispatcher_2#-1751288098] after [10000 ms]. Message of type [org.apache.flink.runtime.rpc.messages.LocalFencedMessage]. A typical reason for `AskTimeoutException` is that the recipient actor didn't send a reply.
{code}
More context: for the S3 URI `s3://-export-staging/data-export/dataexport.prod-10.S3.integration.dd33/event_type=users.messages.email.Send/` there are two date-based partitions of Avro files:
{code:java}
date=2022-09-12-10/
date=2022-09-12-11/
{code}
Reading the Avro files works perfectly fine. It also keeps working fine when another partition is added under the URI while the job is running. However, when I stop the job and re-run it, it fails with the exception below:
{code:java}
Exception in thread "main" org.apache.flink.util.FlinkException: Failed to execute job 'data-export'.
	at org.apache.flink.streaming.api.environment.StreamExecutionEnvironment.executeAsync(StreamExecutionEnvironment.java:1970)
	at org.apache.flink.streaming.api.environment.StreamExecutionEnvironment.execute(StreamExecutionEnvironment.java:1848)
	at org.apache.flink.streaming.api.environment.LocalStreamEnvironment.execute(LocalStreamEnvironment.java:69)
	at org.apache.flink.streaming.api.environment.StreamExecutionEnvironment.execute(StreamExecutionEnvironment.java:1834)
	at org.apache.flink.streaming.api.scala.StreamExecutionEnvironment.execute(StreamExecutionEnvironment.scala:801)
	at com.lightricks.sigma.topologies.EntryPoint$.delayedEndpoint$com$lightricks$sigma$topologies$EntryPoint$1(EntryPoint.scala:75)
	at com.lightricks.sigma.topologies.EntryPoint$delayedInit$body.apply(EntryPoint.scala:31)
	at scala.Function0.apply$mcV$sp(Function0.scala:39)
	at scala.Function0.apply$mcV$sp$(Function0.scala:39)
	at scala.runtime.AbstractFunction0.apply$mcV$sp(AbstractFunction0.scala:17)
	at scala.App.$anonfun$main$1$adapted(App.scala:80)
	at scala.collection.immutable.List.foreach(List.scala:392)
	at scala.App.main(App.scala:80)
	at scala.App.main$(App.scala:78)
	at com.lightricks.sigma.topologies.EntryPoint$.main(EntryPoint.scala:31)
	at com.lightricks.sigma.topologies.EntryPoint.main(EntryPoint.scala)
Caused by: java.lang.RuntimeException: Error while waiting for job to be initialized
	at org.apache.flink.client.ClientUtils.waitUntilJobInitializationFinished(ClientUtils.java:160)
	at org.apache.flink.client.program.PerJobMiniClusterFactory.lambda$submitJob$2(PerJobMiniClusterFactory.java:83)
	at org.apache.flink.util.function.FunctionUtils.lambda$uncheckedFunction$2(FunctionUtils.java:73)
	at java.util.concurrent.CompletableFuture.uniApply(CompletableFuture.java:616)
	at java.util.concurrent.CompletableFuture$UniApply.tryFire(CompletableFuture.java:591)
	at java.util.concurrent.CompletableFuture$Completion.exec(CompletableFuture.java:457)
	at java.util.concurrent.ForkJoinTask.doExec(ForkJoinTask.java:289)
	at java.util.concurrent.ForkJoinPool$WorkQueue.runTask(ForkJoinPool.java:1056)
	at java.util.concurrent.ForkJoinPool.runWorker(ForkJoinPool.java:1692)
	at java.util.concurrent.ForkJoinWorkerThread.run(ForkJoinWorkerThread.java:175)
Caused by: java.util.concurrent.ExecutionException: java.util.concurrent.TimeoutException: Invocation of public default java.util.concurrent.CompletableFuture org.apache.flink.runtime.webmonitor.RestfulGateway.requestJobStatus(org.apache.flink.api.common.JobID,org.apache.flink.api.common.time.Time) timed out.
	at java.util.concurrent.CompletableFuture.reportGet(CompletableFuture.java:357)
	at java.util.concurrent.CompletableFuture.get(CompletableFuture.java:1908)
	at org.apache.flink.client.program.PerJobMiniClusterFactory.lambda$null$0(PerJobMiniClusterFactory.java:89)
	at org.apache.flink.client.ClientUtils.waitUntilJobInitializationFinished(ClientUtils.java:144)
	... 9 more
Caused by: java.util.concurrent.TimeoutException: Invocation of public default java.util.concurrent.CompletableFuture org.apache.flink.runtime.webmonitor.RestfulGateway.requestJobStatus(org.apache.flink.api.common.JobID,org.apache.flink.api.common.time.Time) timed out.
	at com.sun.proxy.$Proxy13.requestJobStatus(Unknown Source)
	at org.apache.flink.runtime.minicluster.MiniCluster.lambda$getJobStatus$6(MiniCluster.java:704)
	at java.util.concurrent.CompletableFuture.uniApply(CompletableFuture.java:616)
	at java.util.concurrent.CompletableFuture.uniApplyStage(CompletableFuture.java:628)
	at java.util.concurrent.CompletableFuture.thenApply(CompletableFuture.java:1996)
	at org.apache.flink.runtime.minicluster.MiniCluster.runDispatcherCommand(MiniCluster.java:751)
	at org.apache.flink.runtime.minicluster.MiniCluster.getJobStatus(MiniCluster.java:703)
	at org.apache.flink.client.program.PerJobMiniClusterFactory.lambda$null$0(PerJobMiniClusterFactory.java:86)
	... 10 more
Caused by: akka.pattern.AskTimeoutException: Ask timed out on [Actor[akka://flink/user/rpc/dispatcher_2#-1751288098]] after [10000 ms]. Message of type [org.apache.flink.runtime.rpc.messages.LocalFencedMessage]. A typical reason for `AskTimeoutException` is that the recipient actor didn't send a reply.
	at akka.pattern.PromiseActorRef$.$anonfun$defaultOnTimeout$1(AskSupport.scala:635)
	at akka.pattern.PromiseActorRef$.$anonfun$apply$1(AskSupport.scala:650)
	at akka.actor.Scheduler$$anon$4.run(Scheduler.scala:205)
	at scala.concurrent.Future$InternalCallbackExecutor$.unbatchedExecute(Future.scala:874)
	at scala.concurrent.BatchingExecutor.execute(BatchingExecutor.scala:113)
	at scala.concurrent.BatchingExecutor.execute$(BatchingExecutor.scala:107)
	at scala.concurrent.Future$InternalCallbackExecutor$.execute(Future.scala:872)
	at akka.actor.LightArrayRevolverScheduler$TaskHolder.executeTask(LightArrayRevolverScheduler.scala:328)
	at akka.actor.LightArrayRevolverScheduler$$anon$3.executeBucket$1(LightArrayRevolverScheduler.scala:279)
	at akka.actor.LightArrayRevolverScheduler$$anon$3.nextTick(LightArrayRevolverScheduler.scala:283)
	at akka.actor.LightArrayRevolverScheduler$$anon$3.run(LightArrayRevolverScheduler.scala:235)
	at java.lang.Thread.run(Thread.java:748)
{code}
The weird part is that it only fails with the Akka ask timeout once the number of partitions increases from two to three.
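If the problem is only that enumerating the S3 partitions does not finish within the dispatcher's default 10 s RPC budget, raising the ask timeout for the local run should let the job initialize, even if slowly. The sketch below is a diagnostic hedge, not a fix: `akka.ask.timeout` is a standard Flink configuration key, while the parallelism value and the explicit wrapping of the Java local environment are illustrative assumptions, not part of the original job.
{code:java}
import org.apache.flink.configuration.Configuration
import org.apache.flink.streaming.api.environment.{StreamExecutionEnvironment => JavaStreamEnv}
import org.apache.flink.streaming.api.scala.StreamExecutionEnvironment

// Raise the RPC ask timeout (default "10 s") so the dispatcher has more time
// to answer requestJobStatus while the FileSource enumerates the S3 splits.
val conf = new Configuration()
conf.setString("akka.ask.timeout", "60 s")

// Assumed local/MiniCluster setup, as the PerJobMiniClusterFactory frames in the
// stack trace suggest; parallelism 4 is an arbitrary example value.
val env = new StreamExecutionEnvironment(JavaStreamEnv.createLocalEnvironment(4, conf))
{code}
If the job still fails with a much larger value, the timeout is likely just a symptom of whatever keeps the dispatcher busy during job initialization rather than the enumeration merely being slow.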
> AskTimeoutException for more than two partitions with FileSource API on S3
> ---------------------------------------------------------------------------
>
>                 Key: FLINK-29314
>                 URL: https://issues.apache.org/jira/browse/FLINK-29314
>             Project: Flink
>          Issue Type: Bug
>          Components: FileSystems
>    Affects Versions: 1.13.5
>         Environment: The code I am using is:
> {code:java}
> object AvroStreamFormat extends SimpleStreamFormat[Send] {
>   override def createReader(config: Configuration, stream: FSDataInputStream): StreamFormat.Reader[Send] = {
>     val schema = AvroSchema[Send]
>     val reader: AvroInputStream[Send] = AvroInputStream.data[Send].from(stream).build(schema)
>
>     new StreamFormat.Reader[Send] {
>       override def read(): Send = {
>         if (reader.iterator.hasNext) {
>           reader.iterator.next()
>         } else {
>           null
>         }
>       }
>
>       override def close(): Unit = reader.close()
>     }
>   }
>
>   override def getProducedType: TypeInformation[Send] = TypeInformation.of(classOf[Send])
> }
>
> val source = FileSource.forRecordStreamFormat(
>     AvroStreamFormat,
>     new Path(s3Path))
>   .monitorContinuously(Duration.ofMillis(10000))
>   .build()
>
> val recordStream: DataStream[Send] = env.fromSource(
>   source,
>   WatermarkStrategy.noWatermarks(),
>   "currents-email-send"
> ){code}
>            Reporter: Satyasheel Satyasheel
>            Priority: Major
>
--
This message was sent by Atlassian Jira
(v8.20.10#820010)