I believe one relevant change between Flink 1.14 and Flink 1.15 is the addition of a RecoverableWriter for GCS [1].
Perhaps this is the reason for this failure?

Best regards,

Martijn

[1] https://issues.apache.org/jira/browse/FLINK-11838

On Thu, 2 Jun 2022 at 12:24, Qingsheng Ren <renqs...@gmail.com> wrote:
> Thanks for the input, ChangZhuo.
>
> Could you check if the configuration "classloader.resolve-order" is
> set to "parent-first" in your Flink 1.14 cluster? I didn't notice any
> changes related to the user code classloader in Flink 1.15. If my
> assumption is correct, you package the gcs-connector into your job JAR
> but the Hadoop FS dependencies are not included, so
> org.apache.hadoop.fs.FileSystem is loaded by the app classloader from
> flink-s3-fs-hadoop.jar under the lib directory of Flink, but
> GoogleHadoopFileSystem is loaded by the user code classloader from the
> job JAR. Setting the resolve order to "parent-first" could bypass the
> issue [1], so I assume you have this config in 1.14 but not in 1.15.
> Please forgive me if I have misunderstood!
>
> [1] https://nightlies.apache.org/flink/flink-docs-master/docs/ops/debugging/debugging_classloading/
>
> On Thu, Jun 2, 2022 at 11:22 AM ChangZhuo Chen (陳昌倬) <czc...@czchen.org> wrote:
> >
> > On Thu, Jun 02, 2022 at 11:17:19AM +0800, Qingsheng Ren wrote:
> > > Hi ChangZhuo,
> > >
> > > I assume it's a classloading issue, but I can't track down the root
> > > cause in the code. Would you mind sharing the entire exception stack
> > > and some JM/TM logs related to the file system?
> >
> > The following is the exception log we have. Please let us know if you
> > need other logs.
> >
> > ps. <redacted>.listPath(<redacted>) is the function I mentioned earlier.
> >
> >
> > 2022-06-02 00:25:57,825 WARN  org.apache.flink.client.deployment.application.ApplicationDispatcherBootstrap [] - Application failed unexpectedly:
> > java.util.concurrent.CompletionException: org.apache.flink.client.deployment.application.ApplicationExecutionException: Could not execute application.
> >     at java.util.concurrent.CompletableFuture.encodeThrowable(Unknown Source) ~[?:?]
> >     at java.util.concurrent.CompletableFuture.completeThrowable(Unknown Source) ~[?:?]
> >     at java.util.concurrent.CompletableFuture$UniCompose.tryFire(Unknown Source) ~[?:?]
> >     at java.util.concurrent.CompletableFuture.postComplete(Unknown Source) ~[?:?]
> >     at java.util.concurrent.CompletableFuture.completeExceptionally(Unknown Source) ~[?:?]
> >     at org.apache.flink.client.deployment.application.ApplicationDispatcherBootstrap.runApplicationEntryPoint(ApplicationDispatcherBootstrap.java:323) ~[flink-dist-1.15.0.jar:1.15.0]
> >     at org.apache.flink.client.deployment.application.ApplicationDispatcherBootstrap.lambda$runApplicationAsync$2(ApplicationDispatcherBootstrap.java:244) ~[flink-dist-1.15.0.jar:1.15.0]
> >     at java.util.concurrent.Executors$RunnableAdapter.call(Unknown Source) ~[?:?]
> >     at java.util.concurrent.FutureTask.run(Unknown Source) ~[?:?]
> >     at org.apache.flink.runtime.concurrent.akka.ActorSystemScheduledExecutorAdapter$ScheduledFutureTask.run(ActorSystemScheduledExecutorAdapter.java:171) ~[flink-rpc-akka_a0bd817c-414f-468f-88c1-4b3d8be7f4c7.jar:1.15.0]
> >     at org.apache.flink.runtime.concurrent.akka.ClassLoadingUtils.runWithContextClassLoader(ClassLoadingUtils.java:68) ~[flink-rpc-akka_a0bd817c-414f-468f-88c1-4b3d8be7f4c7.jar:1.15.0]
> >     at org.apache.flink.runtime.concurrent.akka.ClassLoadingUtils.lambda$withContextClassLoader$0(ClassLoadingUtils.java:41) ~[flink-rpc-akka_a0bd817c-414f-468f-88c1-4b3d8be7f4c7.jar:1.15.0]
> >     at akka.dispatch.TaskInvocation.run(AbstractDispatcher.scala:49) [flink-rpc-akka_a0bd817c-414f-468f-88c1-4b3d8be7f4c7.jar:1.15.0]
> >     at akka.dispatch.ForkJoinExecutorConfigurator$AkkaForkJoinTask.exec(ForkJoinExecutorConfigurator.scala:48) [flink-rpc-akka_a0bd817c-414f-468f-88c1-4b3d8be7f4c7.jar:1.15.0]
> >     at java.util.concurrent.ForkJoinTask.doExec(Unknown Source) [?:?]
> >     at java.util.concurrent.ForkJoinPool$WorkQueue.topLevelExec(Unknown Source) [?:?]
> >     at java.util.concurrent.ForkJoinPool.scan(Unknown Source) [?:?]
> >     at java.util.concurrent.ForkJoinPool.runWorker(Unknown Source) [?:?]
> >     at java.util.concurrent.ForkJoinWorkerThread.run(Unknown Source) [?:?]
> > Caused by: org.apache.flink.client.deployment.application.ApplicationExecutionException: Could not execute application.
> >     ... 14 more
> > Caused by: org.apache.flink.client.program.ProgramInvocationException: The main method caused an error: class com.google.cloud.hadoop.fs.gcs.GoogleHadoopFileSystem cannot be cast to class org.apache.hadoop.fs.FileSystem (com.google.cloud.hadoop.fs.gcs.GoogleHadoopFileSystem and org.apache.hadoop.fs.FileSystem are in unnamed module of loader 'app')
> >     at org.apache.flink.client.program.PackagedProgram.callMainMethod(PackagedProgram.java:372) ~[flink-dist-1.15.0.jar:1.15.0]
> >     at org.apache.flink.client.program.PackagedProgram.invokeInteractiveModeForExecution(PackagedProgram.java:222) ~[flink-dist-1.15.0.jar:1.15.0]
> >     at org.apache.flink.client.ClientUtils.executeProgram(ClientUtils.java:114) ~[flink-dist-1.15.0.jar:1.15.0]
> >     at org.apache.flink.client.deployment.application.ApplicationDispatcherBootstrap.runApplicationEntryPoint(ApplicationDispatcherBootstrap.java:291) ~[flink-dist-1.15.0.jar:1.15.0]
> >     ... 13 more
> > Caused by: java.lang.ClassCastException: class com.google.cloud.hadoop.fs.gcs.GoogleHadoopFileSystem cannot be cast to class org.apache.hadoop.fs.FileSystem (com.google.cloud.hadoop.fs.gcs.GoogleHadoopFileSystem and org.apache.hadoop.fs.FileSystem are in unnamed module of loader 'app')
> >     at org.apache.hadoop.fs.FileSystem.createFileSystem(FileSystem.java:3374) ~[flink-s3-fs-hadoop-1.15.0.jar:1.15.0]
> >     at org.apache.hadoop.fs.FileSystem.access$200(FileSystem.java:125) ~[flink-s3-fs-hadoop-1.15.0.jar:1.15.0]
> >     at org.apache.hadoop.fs.FileSystem$Cache.getInternal(FileSystem.java:3424) ~[flink-s3-fs-hadoop-1.15.0.jar:1.15.0]
> >     at org.apache.hadoop.fs.FileSystem$Cache.get(FileSystem.java:3392) ~[flink-s3-fs-hadoop-1.15.0.jar:1.15.0]
> >     at org.apache.hadoop.fs.FileSystem.get(FileSystem.java:485) ~[flink-s3-fs-hadoop-1.15.0.jar:1.15.0]
> >     at org.apache.hadoop.fs.Path.getFileSystem(Path.java:365) ~[flink-s3-fs-hadoop-1.15.0.jar:1.15.0]
> >     at <redacted>.listPath(<redacted>) ~[?:?]
> >     at jdk.internal.reflect.NativeMethodAccessorImpl.invoke0(Native Method) ~[?:?]
> >     at jdk.internal.reflect.NativeMethodAccessorImpl.invoke(Unknown Source) ~[?:?]
> >     at jdk.internal.reflect.DelegatingMethodAccessorImpl.invoke(Unknown Source) ~[?:?]
> >     at java.lang.reflect.Method.invoke(Unknown Source) ~[?:?]
> >     at org.apache.flink.client.program.PackagedProgram.callMainMethod(PackagedProgram.java:355) ~[flink-dist-1.15.0.jar:1.15.0]
> >     at org.apache.flink.client.program.PackagedProgram.invokeInteractiveModeForExecution(PackagedProgram.java:222) ~[flink-dist-1.15.0.jar:1.15.0]
> >     at org.apache.flink.client.ClientUtils.executeProgram(ClientUtils.java:114) ~[flink-dist-1.15.0.jar:1.15.0]
> >     at org.apache.flink.client.deployment.application.ApplicationDispatcherBootstrap.runApplicationEntryPoint(ApplicationDispatcherBootstrap.java:291) ~[flink-dist-1.15.0.jar:1.15.0]
> >     ... 13 more
> > 2022-06-02 00:25:57,828 ERROR org.apache.flink.runtime.entrypoint.ClusterEntrypoint [] - Fatal error occurred in the cluster entrypoint.
> > java.util.concurrent.CompletionException: org.apache.flink.client.deployment.application.ApplicationExecutionException: Could not execute application.
> >     at java.util.concurrent.CompletableFuture.encodeThrowable(Unknown Source) ~[?:?]
> >     at java.util.concurrent.CompletableFuture.completeThrowable(Unknown Source) ~[?:?]
> >     at java.util.concurrent.CompletableFuture$UniCompose.tryFire(Unknown Source) ~[?:?]
> >     at java.util.concurrent.CompletableFuture.postComplete(Unknown Source) ~[?:?]
> >     at java.util.concurrent.CompletableFuture.completeExceptionally(Unknown Source) ~[?:?]
> >     at org.apache.flink.client.deployment.application.ApplicationDispatcherBootstrap.runApplicationEntryPoint(ApplicationDispatcherBootstrap.java:323) ~[flink-dist-1.15.0.jar:1.15.0]
> >     at org.apache.flink.client.deployment.application.ApplicationDispatcherBootstrap.lambda$runApplicationAsync$2(ApplicationDispatcherBootstrap.java:244) ~[flink-dist-1.15.0.jar:1.15.0]
> >     at java.util.concurrent.Executors$RunnableAdapter.call(Unknown Source) ~[?:?]
> >     at java.util.concurrent.FutureTask.run(Unknown Source) ~[?:?]
> >     at org.apache.flink.runtime.concurrent.akka.ActorSystemScheduledExecutorAdapter$ScheduledFutureTask.run(ActorSystemScheduledExecutorAdapter.java:171) ~[flink-rpc-akka_a0bd817c-414f-468f-88c1-4b3d8be7f4c7.jar:1.15.0]
> >     at org.apache.flink.runtime.concurrent.akka.ClassLoadingUtils.runWithContextClassLoader(ClassLoadingUtils.java:68) ~[flink-rpc-akka_a0bd817c-414f-468f-88c1-4b3d8be7f4c7.jar:1.15.0]
> >     at org.apache.flink.runtime.concurrent.akka.ClassLoadingUtils.lambda$withContextClassLoader$0(ClassLoadingUtils.java:41) ~[flink-rpc-akka_a0bd817c-414f-468f-88c1-4b3d8be7f4c7.jar:1.15.0]
> >     at akka.dispatch.TaskInvocation.run(AbstractDispatcher.scala:49) [flink-rpc-akka_a0bd817c-414f-468f-88c1-4b3d8be7f4c7.jar:1.15.0]
> >     at akka.dispatch.ForkJoinExecutorConfigurator$AkkaForkJoinTask.exec(ForkJoinExecutorConfigurator.scala:48) [flink-rpc-akka_a0bd817c-414f-468f-88c1-4b3d8be7f4c7.jar:1.15.0]
> >     at java.util.concurrent.ForkJoinTask.doExec(Unknown Source) [?:?]
> >     at java.util.concurrent.ForkJoinPool$WorkQueue.topLevelExec(Unknown Source) [?:?]
> >     at java.util.concurrent.ForkJoinPool.scan(Unknown Source) [?:?]
> >     at java.util.concurrent.ForkJoinPool.runWorker(Unknown Source) [?:?]
> >     at java.util.concurrent.ForkJoinWorkerThread.run(Unknown Source) [?:?]
> > Caused by: org.apache.flink.client.deployment.application.ApplicationExecutionException: Could not execute application.
> >     ... 14 more
> > Caused by: org.apache.flink.client.program.ProgramInvocationException: The main method caused an error: class com.google.cloud.hadoop.fs.gcs.GoogleHadoopFileSystem cannot be cast to class org.apache.hadoop.fs.FileSystem (com.google.cloud.hadoop.fs.gcs.GoogleHadoopFileSystem and org.apache.hadoop.fs.FileSystem are in unnamed module of loader 'app')
> >     at org.apache.flink.client.program.PackagedProgram.callMainMethod(PackagedProgram.java:372) ~[flink-dist-1.15.0.jar:1.15.0]
> >     at org.apache.flink.client.program.PackagedProgram.invokeInteractiveModeForExecution(PackagedProgram.java:222) ~[flink-dist-1.15.0.jar:1.15.0]
> >     at org.apache.flink.client.ClientUtils.executeProgram(ClientUtils.java:114) ~[flink-dist-1.15.0.jar:1.15.0]
> >     at org.apache.flink.client.deployment.application.ApplicationDispatcherBootstrap.runApplicationEntryPoint(ApplicationDispatcherBootstrap.java:291) ~[flink-dist-1.15.0.jar:1.15.0]
> >     ... 13 more
> > Caused by: java.lang.ClassCastException: class com.google.cloud.hadoop.fs.gcs.GoogleHadoopFileSystem cannot be cast to class org.apache.hadoop.fs.FileSystem (com.google.cloud.hadoop.fs.gcs.GoogleHadoopFileSystem and org.apache.hadoop.fs.FileSystem are in unnamed module of loader 'app')
> >     at org.apache.hadoop.fs.FileSystem.createFileSystem(FileSystem.java:3374) ~[flink-s3-fs-hadoop-1.15.0.jar:1.15.0]
> >     at org.apache.hadoop.fs.FileSystem.access$200(FileSystem.java:125) ~[flink-s3-fs-hadoop-1.15.0.jar:1.15.0]
> >     at org.apache.hadoop.fs.FileSystem$Cache.getInternal(FileSystem.java:3424) ~[flink-s3-fs-hadoop-1.15.0.jar:1.15.0]
> >     at org.apache.hadoop.fs.FileSystem$Cache.get(FileSystem.java:3392) ~[flink-s3-fs-hadoop-1.15.0.jar:1.15.0]
> >     at org.apache.hadoop.fs.FileSystem.get(FileSystem.java:485) ~[flink-s3-fs-hadoop-1.15.0.jar:1.15.0]
> >     at org.apache.hadoop.fs.Path.getFileSystem(Path.java:365) ~[flink-s3-fs-hadoop-1.15.0.jar:1.15.0]
> >     at <redacted>.listPath(<redacted>) ~[?:?]
> >     at jdk.internal.reflect.NativeMethodAccessorImpl.invoke0(Native Method) ~[?:?]
> >     at jdk.internal.reflect.NativeMethodAccessorImpl.invoke(Unknown Source) ~[?:?]
> >     at jdk.internal.reflect.DelegatingMethodAccessorImpl.invoke(Unknown Source) ~[?:?]
> >     at java.lang.reflect.Method.invoke(Unknown Source) ~[?:?]
> >     at org.apache.flink.client.program.PackagedProgram.callMainMethod(PackagedProgram.java:355) ~[flink-dist-1.15.0.jar:1.15.0]
> >     at org.apache.flink.client.program.PackagedProgram.invokeInteractiveModeForExecution(PackagedProgram.java:222) ~[flink-dist-1.15.0.jar:1.15.0]
> >     at org.apache.flink.client.ClientUtils.executeProgram(ClientUtils.java:114) ~[flink-dist-1.15.0.jar:1.15.0]
> >     at org.apache.flink.client.deployment.application.ApplicationDispatcherBootstrap.runApplicationEntryPoint(ApplicationDispatcherBootstrap.java:291) ~[flink-dist-1.15.0.jar:1.15.0]
> >     ... 13 more
> > 2022-06-02 00:25:57,830 INFO  org.apache.flink.runtime.entrypoint.ClusterEntrypoint [] - Shutting KubernetesApplicationClusterEntrypoint down with application status UNKNOWN. Diagnostics Cluster entrypoint has been closed externally..
> >
> > > Best regards,
> > >
> > > Qingsheng
> > >
> > > > On Jun 2, 2022, at 09:08, ChangZhuo Chen (陳昌倬) <czc...@czchen.org> wrote:
> > > >
> > > > Hi,
> > > >
> > > > We use GCS as storage, and have the following function to list files
> > > > in a GCS path for Flink batch mode to build states:
> > > >
> > > > def listPath(p: String): Seq[String] = {
> > > >   val path = new Path(p)
> > > >   val fs = path.getFileSystem(new Configuration())
> > > >   fs.listStatus(path) match {
> > > >     case null => Seq()
> > > >     case xs => xs.map(_.getPath.toString)
> > > >   }
> > > > }
> > > >
> > > > This function works fine in Flink 1.14.
> > > > However, in Flink 1.15, we get the following exception:
> > > >
> > > > Caused by: java.lang.ClassCastException: class com.google.cloud.hadoop.fs.gcs.GoogleHadoopFileSystem cannot be cast to class org.apache.hadoop.fs.FileSystem (com.google.cloud.hadoop.fs.gcs.GoogleHadoopFileSystem and org.apache.hadoop.fs.FileSystem are in unnamed module of loader 'app')
> > > >     at org.apache.hadoop.fs.FileSystem.createFileSystem(FileSystem.java:3374) ~[flink-s3-fs-hadoop-1.15.0.jar:1.15.0]
> > > >     at org.apache.hadoop.fs.FileSystem.access$200(FileSystem.java:125) ~[flink-s3-fs-hadoop-1.15.0.jar:1.15.0]
> > > >     at org.apache.hadoop.fs.FileSystem$Cache.getInternal(FileSystem.java:3424) ~[flink-s3-fs-hadoop-1.15.0.jar:1.15.0]
> > > >     at org.apache.hadoop.fs.FileSystem$Cache.get(FileSystem.java:3392) ~[flink-s3-fs-hadoop-1.15.0.jar:1.15.0]
> > > >     at org.apache.hadoop.fs.FileSystem.get(FileSystem.java:485) ~[flink-s3-fs-hadoop-1.15.0.jar:1.15.0]
> > > >     at org.apache.hadoop.fs.Path.getFileSystem(Path.java:365) ~[flink-s3-fs-hadoop-1.15.0.jar:1.15.0]
> > > >     at <redacted>.listPath(<redacted>) ~[?:?]
> > > >
> > > > We found a similar issue in Spark [0]. However, we are not sure if it
> > > > is related, and if it is, how we can apply this fix. Any help is
> > > > welcome.
> > > >
> > > > [0] https://issues.apache.org/jira/browse/SPARK-9206
> > > >
> > > > --
> > > > ChangZhuo Chen (陳昌倬) czchen@{czchen,debian}.org
> > > > http://czchen.info/
> > > > Key fingerprint = BA04 346D C2E1 FE63 C790 8793 CC65 B0CD EC27 5D5B
> >
> > --
> > ChangZhuo Chen (陳昌倬) czchen@{czchen,debian}.org
> > http://czchen.info/
> > Key fingerprint = BA04 346D C2E1 FE63 C790 8793 CC65 B0CD EC27 5D5B
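For what it's worth, the two-classloaders hypothesis in this thread can be checked empirically: a class's identity is (name, defining classloader), so `GoogleHadoopFileSystem` loaded by the user-code classloader is not assignable to an `org.apache.hadoop.fs.FileSystem` loaded by the app classloader, even though the names line up. A minimal standalone sketch one could drop into the job's `main()` to see where each class really comes from (the `ClassLoaderProbe` object is illustrative, not part of Flink or the original thread; the Hadoop class names will report "not on this classpath" when run outside the cluster):

```scala
// Report, for a fully qualified class name, which classloader defined it and
// which jar/path it was loaded from. Comparing the output for
// org.apache.hadoop.fs.FileSystem and
// com.google.cloud.hadoop.fs.gcs.GoogleHadoopFileSystem shows whether they
// come from different loaders (the condition behind the ClassCastException).
object ClassLoaderProbe {
  def describe(className: String): String =
    try {
      val cls = Class.forName(className)
      // A null classloader means the JVM bootstrap loader (e.g. java.lang.*).
      val loader = Option(cls.getClassLoader).map(_.toString).getOrElse("bootstrap loader")
      // The code source points at the jar or directory the class came from.
      val source = Option(cls.getProtectionDomain.getCodeSource)
        .map(_.getLocation.toString)
        .getOrElse("(no code source)")
      s"$className <- [$loader] from [$source]"
    } catch {
      case _: ClassNotFoundException => s"$className: not on this classpath"
    }

  def main(args: Array[String]): Unit = {
    println(describe("java.lang.String"))
    println(describe("org.apache.hadoop.fs.FileSystem"))
    println(describe("com.google.cloud.hadoop.fs.gcs.GoogleHadoopFileSystem"))
  }
}
```

If the two Hadoop classes print different loaders (app classloader vs. Flink's user-code classloader), that confirms the packaging/resolve-order explanation rather than a GCS connector bug.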