I believe the relevant change between Flink 1.14 and Flink 1.15 is the
addition of a RecoverableWriter for GCS [1].

Perhaps this is the reason for this failure?

Best regards, Martijn

[1] https://issues.apache.org/jira/browse/FLINK-11838

On Thu, Jun 2, 2022 at 12:24, Qingsheng Ren <renqs...@gmail.com> wrote:

> Thanks for the input ChangZhuo.
>
> Could you check if the configuration "classloader.resolve-order" is
> set to "parent-first" in your Flink 1.14 cluster? I didn’t notice any
> changes related to the user code classloader in Flink 1.15. If my
> assumption is correct, you package the gcs-connector into your job JAR
> but the Hadoop FS dependencies are not included, so
> org.apache.hadoop.fs.FileSystem is loaded by the app classloader from
> flink-s3-fs-hadoop.jar in Flink's lib directory, while
> GoogleHadoopFileSystem is loaded by the user code classloader from the
> job JAR. Setting the resolve order to "parent-first" could bypass the
> issue [1], so I assume you have this config in 1.14 but not in 1.15.
> Please forgive me if I have misunderstood!
>
> [1]
> https://nightlies.apache.org/flink/flink-docs-master/docs/ops/debugging/debugging_classloading/
>
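One way to check the assumption above is to print, from inside the job's main method, which classloader defined each of the two classes and where each was loaded from. The following is only a minimal sketch using plain JDK reflection: the names ClassLoaderReport and describe are hypothetical, and it assumes the thread context classloader inside the job is the user code classloader.

```scala
// Diagnostic sketch (hypothetical names): report which classloader defined
// a class and which JAR or directory it was loaded from.
object ClassLoaderReport {

  def describe(className: String, loader: ClassLoader): String =
    try {
      // initialize = false: locate the class without running static initializers.
      val cls = Class.forName(className, false, loader)
      // A null classloader means the class came from the JVM bootstrap loader.
      val definedBy = Option(cls.getClassLoader).map(_.toString).getOrElse("bootstrap")
      // The code source is absent for bootstrap classes, so fall back to "unknown".
      val source = Option(cls.getProtectionDomain.getCodeSource)
        .flatMap(cs => Option(cs.getLocation))
        .map(_.toString)
        .getOrElse("unknown")
      s"$className: loader=$definedBy, source=$source"
    } catch {
      case _: ClassNotFoundException => s"$className: not visible from $loader"
    }

  def main(args: Array[String]): Unit = {
    // Assumption: inside the job this is the user code classloader.
    val loader = Thread.currentThread().getContextClassLoader
    Seq(
      "org.apache.hadoop.fs.FileSystem",
      "com.google.cloud.hadoop.fs.gcs.GoogleHadoopFileSystem"
    ).foreach(name => println(describe(name, loader)))
  }
}
```

Comparing the loader and source printed for the two classes, on both the 1.14 and the 1.15 cluster, should show whether they really come from different places.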
> On Thu, Jun 2, 2022 at 11:22 AM ChangZhuo Chen (陳昌倬) <czc...@czchen.org>
> wrote:
> >
> > On Thu, Jun 02, 2022 at 11:17:19AM +0800, Qingsheng Ren wrote:
> > > Hi ChangZhuo,
> > >
> > > I assume it’s a classloading issue, but I can’t track it down to the
> > > root cause in the code. Would you mind sharing the entire exception
> > > stack and some JM/TM logs related to the file system?
> >
> > The following is the exception log we have. Please let us know if you
> > need other logs.
> >
> > P.S. <redacted>.listPath(<redacted>) is the function I mentioned earlier.
> >
> >
> >     2022-06-02 00:25:57,825 WARN
> org.apache.flink.client.deployment.application.ApplicationDispatcherBootstrap
> [] - Application failed unexpectedly:
> >     java.util.concurrent.CompletionException:
> org.apache.flink.client.deployment.application.ApplicationExecutionException:
> Could not execute application.
> >             at
> java.util.concurrent.CompletableFuture.encodeThrowable(Unknown Source)
> ~[?:?]
> >             at
> java.util.concurrent.CompletableFuture.completeThrowable(Unknown Source)
> ~[?:?]
> >             at
> java.util.concurrent.CompletableFuture$UniCompose.tryFire(Unknown Source)
> ~[?:?]
> >             at
> java.util.concurrent.CompletableFuture.postComplete(Unknown Source) ~[?:?]
> >             at
> java.util.concurrent.CompletableFuture.completeExceptionally(Unknown
> Source) ~[?:?]
> >             at
> org.apache.flink.client.deployment.application.ApplicationDispatcherBootstrap.runApplicationEntryPoint(ApplicationDispatcherBootstrap.java:323)
> ~[flink-dist-1.15.0.jar:1.15.0]
> >             at
> org.apache.flink.client.deployment.application.ApplicationDispatcherBootstrap.lambda$runApplicationAsync$2(ApplicationDispatcherBootstrap.java:244)
> ~[flink-dist-1.15.0.jar:1.15.0]
> >             at
> java.util.concurrent.Executors$RunnableAdapter.call(Unknown Source) ~[?:?]
> >             at java.util.concurrent.FutureTask.run(Unknown Source) ~[?:?]
> >             at
> org.apache.flink.runtime.concurrent.akka.ActorSystemScheduledExecutorAdapter$ScheduledFutureTask.run(ActorSystemScheduledExecutorAdapter.java:171)
> ~[flink-rpc-akka_a0bd817c-414f-468f-88c1-4b3d8be7f4c7.jar:1.15.0]
> >             at
> org.apache.flink.runtime.concurrent.akka.ClassLoadingUtils.runWithContextClassLoader(ClassLoadingUtils.java:68)
> ~[flink-rpc-akka_a0bd817c-414f-468f-88c1-4b3d8be7f4c7.jar:1.15.0]
> >             at
> org.apache.flink.runtime.concurrent.akka.ClassLoadingUtils.lambda$withContextClassLoader$0(ClassLoadingUtils.java:41)
> ~[flink-rpc-akka_a0bd817c-414f-468f-88c1-4b3d8be7f4c7.jar:1.15.0]
> >             at
> akka.dispatch.TaskInvocation.run(AbstractDispatcher.scala:49)
> [flink-rpc-akka_a0bd817c-414f-468f-88c1-4b3d8be7f4c7.jar:1.15.0]
> >             at
> akka.dispatch.ForkJoinExecutorConfigurator$AkkaForkJoinTask.exec(ForkJoinExecutorConfigurator.scala:48)
> [flink-rpc-akka_a0bd817c-414f-468f-88c1-4b3d8be7f4c7.jar:1.15.0]
> >             at java.util.concurrent.ForkJoinTask.doExec(Unknown Source)
> [?:?]
> >             at
> java.util.concurrent.ForkJoinPool$WorkQueue.topLevelExec(Unknown Source)
> [?:?]
> >             at java.util.concurrent.ForkJoinPool.scan(Unknown Source)
> [?:?]
> >             at java.util.concurrent.ForkJoinPool.runWorker(Unknown
> Source) [?:?]
> >             at java.util.concurrent.ForkJoinWorkerThread.run(Unknown
> Source) [?:?]
> >     Caused by:
> org.apache.flink.client.deployment.application.ApplicationExecutionException:
> Could not execute application.
> >             ... 14 more
> >     Caused by:
> org.apache.flink.client.program.ProgramInvocationException: The main method
> caused an error: class
> com.google.cloud.hadoop.fs.gcs.GoogleHadoopFileSystem cannot be cast to
> class org.apache.hadoop.fs.FileSystem
> (com.google.cloud.hadoop.fs.gcs.GoogleHadoopFileSystem and
> org.apache.hadoop.fs.FileSystem are in unnamed module of loader 'app')
> >             at
> org.apache.flink.client.program.PackagedProgram.callMainMethod(PackagedProgram.java:372)
> ~[flink-dist-1.15.0.jar:1.15.0]
> >             at
> org.apache.flink.client.program.PackagedProgram.invokeInteractiveModeForExecution(PackagedProgram.java:222)
> ~[flink-dist-1.15.0.jar:1.15.0]
> >             at
> org.apache.flink.client.ClientUtils.executeProgram(ClientUtils.java:114)
> ~[flink-dist-1.15.0.jar:1.15.0]
> >             at
> org.apache.flink.client.deployment.application.ApplicationDispatcherBootstrap.runApplicationEntryPoint(ApplicationDispatcherBootstrap.java:291)
> ~[flink-dist-1.15.0.jar:1.15.0]
> >             ... 13 more
> >     Caused by: java.lang.ClassCastException: class
> com.google.cloud.hadoop.fs.gcs.GoogleHadoopFileSystem cannot be cast to
> class org.apache.hadoop.fs.FileSystem
> (com.google.cloud.hadoop.fs.gcs.GoogleHadoopFileSystem and
> org.apache.hadoop.fs.FileSystem are in unnamed module of loader 'app')
> >             at
> org.apache.hadoop.fs.FileSystem.createFileSystem(FileSystem.java:3374)
> ~[flink-s3-fs-hadoop-1.15.0.jar:1.15.0]
> >             at
> org.apache.hadoop.fs.FileSystem.access$200(FileSystem.java:125)
> ~[flink-s3-fs-hadoop-1.15.0.jar:1.15.0]
> >             at
> org.apache.hadoop.fs.FileSystem$Cache.getInternal(FileSystem.java:3424)
> ~[flink-s3-fs-hadoop-1.15.0.jar:1.15.0]
> >             at
> org.apache.hadoop.fs.FileSystem$Cache.get(FileSystem.java:3392)
> ~[flink-s3-fs-hadoop-1.15.0.jar:1.15.0]
> >             at org.apache.hadoop.fs.FileSystem.get(FileSystem.java:485)
> ~[flink-s3-fs-hadoop-1.15.0.jar:1.15.0]
> >             at org.apache.hadoop.fs.Path.getFileSystem(Path.java:365)
> ~[flink-s3-fs-hadoop-1.15.0.jar:1.15.0]
> >             <redacted>.listPath(<redacted>) ~[?:?]
> >             at
> jdk.internal.reflect.NativeMethodAccessorImpl.invoke0(Native Method) ~[?:?]
> >             at
> jdk.internal.reflect.NativeMethodAccessorImpl.invoke(Unknown Source) ~[?:?]
> >             at
> jdk.internal.reflect.DelegatingMethodAccessorImpl.invoke(Unknown Source)
> ~[?:?]
> >             at java.lang.reflect.Method.invoke(Unknown Source) ~[?:?]
> >             at
> org.apache.flink.client.program.PackagedProgram.callMainMethod(PackagedProgram.java:355)
> ~[flink-dist-1.15.0.jar:1.15.0]
> >             at
> org.apache.flink.client.program.PackagedProgram.invokeInteractiveModeForExecution(PackagedProgram.java:222)
> ~[flink-dist-1.15.0.jar:1.15.0]
> >             at
> org.apache.flink.client.ClientUtils.executeProgram(ClientUtils.java:114)
> ~[flink-dist-1.15.0.jar:1.15.0]
> >             at
> org.apache.flink.client.deployment.application.ApplicationDispatcherBootstrap.runApplicationEntryPoint(ApplicationDispatcherBootstrap.java:291)
> ~[flink-dist-1.15.0.jar:1.15.0]
> >             ... 13 more
> >     2022-06-02 00:25:57,828 ERROR
> org.apache.flink.runtime.entrypoint.ClusterEntrypoint        [] - Fatal
> error occurred in the cluster entrypoint.
> >     java.util.concurrent.CompletionException:
> org.apache.flink.client.deployment.application.ApplicationExecutionException:
> Could not execute application.
> >             at
> java.util.concurrent.CompletableFuture.encodeThrowable(Unknown Source)
> ~[?:?]
> >             at
> java.util.concurrent.CompletableFuture.completeThrowable(Unknown Source)
> ~[?:?]
> >             at
> java.util.concurrent.CompletableFuture$UniCompose.tryFire(Unknown Source)
> ~[?:?]
> >             at
> java.util.concurrent.CompletableFuture.postComplete(Unknown Source) ~[?:?]
> >             at
> java.util.concurrent.CompletableFuture.completeExceptionally(Unknown
> Source) ~[?:?]
> >             at
> org.apache.flink.client.deployment.application.ApplicationDispatcherBootstrap.runApplicationEntryPoint(ApplicationDispatcherBootstrap.java:323)
> ~[flink-dist-1.15.0.jar:1.15.0]
> >             at
> org.apache.flink.client.deployment.application.ApplicationDispatcherBootstrap.lambda$runApplicationAsync$2(ApplicationDispatcherBootstrap.java:244)
> ~[flink-dist-1.15.0.jar:1.15.0]
> >             at
> java.util.concurrent.Executors$RunnableAdapter.call(Unknown Source) ~[?:?]
> >             at java.util.concurrent.FutureTask.run(Unknown Source) ~[?:?]
> >             at
> org.apache.flink.runtime.concurrent.akka.ActorSystemScheduledExecutorAdapter$ScheduledFutureTask.run(ActorSystemScheduledExecutorAdapter.java:171)
> ~[flink-rpc-akka_a0bd817c-414f-468f-88c1-4b3d8be7f4c7.jar:1.15.0]
> >             at
> org.apache.flink.runtime.concurrent.akka.ClassLoadingUtils.runWithContextClassLoader(ClassLoadingUtils.java:68)
> ~[flink-rpc-akka_a0bd817c-414f-468f-88c1-4b3d8be7f4c7.jar:1.15.0]
> >             at
> org.apache.flink.runtime.concurrent.akka.ClassLoadingUtils.lambda$withContextClassLoader$0(ClassLoadingUtils.java:41)
> ~[flink-rpc-akka_a0bd817c-414f-468f-88c1-4b3d8be7f4c7.jar:1.15.0]
> >             at
> akka.dispatch.TaskInvocation.run(AbstractDispatcher.scala:49)
> [flink-rpc-akka_a0bd817c-414f-468f-88c1-4b3d8be7f4c7.jar:1.15.0]
> >             at
> akka.dispatch.ForkJoinExecutorConfigurator$AkkaForkJoinTask.exec(ForkJoinExecutorConfigurator.scala:48)
> [flink-rpc-akka_a0bd817c-414f-468f-88c1-4b3d8be7f4c7.jar:1.15.0]
> >             at java.util.concurrent.ForkJoinTask.doExec(Unknown Source)
> [?:?]
> >             at
> java.util.concurrent.ForkJoinPool$WorkQueue.topLevelExec(Unknown Source)
> [?:?]
> >             at java.util.concurrent.ForkJoinPool.scan(Unknown Source)
> [?:?]
> >             at java.util.concurrent.ForkJoinPool.runWorker(Unknown
> Source) [?:?]
> >             at java.util.concurrent.ForkJoinWorkerThread.run(Unknown
> Source) [?:?]
> >     Caused by:
> org.apache.flink.client.deployment.application.ApplicationExecutionException:
> Could not execute application.
> >             ... 14 more
> >     Caused by:
> org.apache.flink.client.program.ProgramInvocationException: The main method
> caused an error: class
> com.google.cloud.hadoop.fs.gcs.GoogleHadoopFileSystem cannot be cast to
> class org.apache.hadoop.fs.FileSystem
> (com.google.cloud.hadoop.fs.gcs.GoogleHadoopFileSystem and
> org.apache.hadoop.fs.FileSystem are in unnamed module of loader 'app')
> >             at
> org.apache.flink.client.program.PackagedProgram.callMainMethod(PackagedProgram.java:372)
> ~[flink-dist-1.15.0.jar:1.15.0]
> >             at
> org.apache.flink.client.program.PackagedProgram.invokeInteractiveModeForExecution(PackagedProgram.java:222)
> ~[flink-dist-1.15.0.jar:1.15.0]
> >             at
> org.apache.flink.client.ClientUtils.executeProgram(ClientUtils.java:114)
> ~[flink-dist-1.15.0.jar:1.15.0]
> >             at
> org.apache.flink.client.deployment.application.ApplicationDispatcherBootstrap.runApplicationEntryPoint(ApplicationDispatcherBootstrap.java:291)
> ~[flink-dist-1.15.0.jar:1.15.0]
> >             ... 13 more
> >     Caused by: java.lang.ClassCastException: class
> com.google.cloud.hadoop.fs.gcs.GoogleHadoopFileSystem cannot be cast to
> class org.apache.hadoop.fs.FileSystem
> (com.google.cloud.hadoop.fs.gcs.GoogleHadoopFileSystem and
> org.apache.hadoop.fs.FileSystem are in unnamed module of loader 'app')
> >             at
> org.apache.hadoop.fs.FileSystem.createFileSystem(FileSystem.java:3374)
> ~[flink-s3-fs-hadoop-1.15.0.jar:1.15.0]
> >             at
> org.apache.hadoop.fs.FileSystem.access$200(FileSystem.java:125)
> ~[flink-s3-fs-hadoop-1.15.0.jar:1.15.0]
> >             at
> org.apache.hadoop.fs.FileSystem$Cache.getInternal(FileSystem.java:3424)
> ~[flink-s3-fs-hadoop-1.15.0.jar:1.15.0]
> >             at
> org.apache.hadoop.fs.FileSystem$Cache.get(FileSystem.java:3392)
> ~[flink-s3-fs-hadoop-1.15.0.jar:1.15.0]
> >             at org.apache.hadoop.fs.FileSystem.get(FileSystem.java:485)
> ~[flink-s3-fs-hadoop-1.15.0.jar:1.15.0]
> >             at org.apache.hadoop.fs.Path.getFileSystem(Path.java:365)
> ~[flink-s3-fs-hadoop-1.15.0.jar:1.15.0]
> >             <redacted>.listPath(<redacted>) ~[?:?]
> >             at
> jdk.internal.reflect.NativeMethodAccessorImpl.invoke0(Native Method) ~[?:?]
> >             at
> jdk.internal.reflect.NativeMethodAccessorImpl.invoke(Unknown Source) ~[?:?]
> >             at
> jdk.internal.reflect.DelegatingMethodAccessorImpl.invoke(Unknown Source)
> ~[?:?]
> >             at java.lang.reflect.Method.invoke(Unknown Source) ~[?:?]
> >             at
> org.apache.flink.client.program.PackagedProgram.callMainMethod(PackagedProgram.java:355)
> ~[flink-dist-1.15.0.jar:1.15.0]
> >             at
> org.apache.flink.client.program.PackagedProgram.invokeInteractiveModeForExecution(PackagedProgram.java:222)
> ~[flink-dist-1.15.0.jar:1.15.0]
> >             at
> org.apache.flink.client.ClientUtils.executeProgram(ClientUtils.java:114)
> ~[flink-dist-1.15.0.jar:1.15.0]
> >             at
> org.apache.flink.client.deployment.application.ApplicationDispatcherBootstrap.runApplicationEntryPoint(ApplicationDispatcherBootstrap.java:291)
> ~[flink-dist-1.15.0.jar:1.15.0]
> >             ... 13 more
> >     2022-06-02 00:25:57,830 INFO
> org.apache.flink.runtime.entrypoint.ClusterEntrypoint        [] - Shutting
> KubernetesApplicationClusterEntrypoint down with application status
> UNKNOWN. Diagnostics Cluster entrypoint has been closed externally..
> >
> > >
> > > Best regards,
> > >
> > > Qingsheng
> > >
> > > > On Jun 2, 2022, at 09:08, ChangZhuo Chen (陳昌倬) <czc...@czchen.org> wrote:
> > > >
> > > > Hi,
> > > >
> > > > > We use GCS as storage, and have the following function to list
> > > > > files in a GCS path for Flink batch mode to build state:
> > > >
> > > >
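> > > > >  import org.apache.hadoop.conf.Configuration
> > > > >  import org.apache.hadoop.fs.Path
> > > > >
> > > > >  // List the entries directly under the given path.
> > > > >  def listPath(p: String): Seq[String] = {
> > > > >    val path = new Path(p)
> > > > >    // Resolve the Hadoop FileSystem implementation for the path's scheme.
> > > > >    val fs = path.getFileSystem(new Configuration())
> > > > >    fs.listStatus(path) match {
> > > > >      case null => Seq()
> > > > >      case xs => xs.map(_.getPath.toString).toSeq
> > > > >    }
> > > > >  }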
> > > >
> > > > > This function works fine in Flink 1.14. However, in Flink 1.15, we
> > > > > get the following exception:
> > > >
> > > >  Caused by: java.lang.ClassCastException: class
> com.google.cloud.hadoop.fs.gcs.GoogleHadoopFileSystem cannot be cast to
> class org.apache.hadoop.fs.FileSystem
> (com.google.cloud.hadoop.fs.gcs.GoogleHadoopFileSystem and
> org.apache.hadoop.fs.FileSystem are in unnamed module of loader 'app')
> > > >          at
> org.apache.hadoop.fs.FileSystem.createFileSystem(FileSystem.java:3374)
> ~[flink-s3-fs-hadoop-1.15.0.jar:1.15.0]
> > > >          at
> org.apache.hadoop.fs.FileSystem.access$200(FileSystem.java:125)
> ~[flink-s3-fs-hadoop-1.15.0.jar:1.15.0]
> > > >          at
> org.apache.hadoop.fs.FileSystem$Cache.getInternal(FileSystem.java:3424)
> ~[flink-s3-fs-hadoop-1.15.0.jar:1.15.0]
> > > >          at
> org.apache.hadoop.fs.FileSystem$Cache.get(FileSystem.java:3392)
> ~[flink-s3-fs-hadoop-1.15.0.jar:1.15.0]
> > > >          at org.apache.hadoop.fs.FileSystem.get(FileSystem.java:485)
> ~[flink-s3-fs-hadoop-1.15.0.jar:1.15.0]
> > > >          at org.apache.hadoop.fs.Path.getFileSystem(Path.java:365)
> ~[flink-s3-fs-hadoop-1.15.0.jar:1.15.0]
> > > >          at <redacted>.listPath(<redacted>) ~[?:?]
> > > >
> > > > > We found a similar issue in Spark [0]. However, we are not sure if
> > > > > it is related, and if it is, how we can apply this fix. Any help is
> > > > > welcome.
> > > >
> > > >
> > > > [0] https://issues.apache.org/jira/browse/SPARK-9206
> > > >
> > > >
> > > > --
> > > > ChangZhuo Chen (陳昌倬) czchen@{czchen,debian}.org
> > > > http://czchen.info/
> > > > Key fingerprint = BA04 346D C2E1 FE63 C790  8793 CC65 B0CD EC27 5D5B
> > >
> >
> > --
> > ChangZhuo Chen (陳昌倬) czchen@{czchen,debian}.org
> > http://czchen.info/
> > Key fingerprint = BA04 346D C2E1 FE63 C790  8793 CC65 B0CD EC27 5D5B
>
