Hi,

We use GCS as storage, and have the following function to list files in
a GCS path for Flink batch mode to build state:


  import org.apache.hadoop.conf.Configuration
  import org.apache.hadoop.fs.Path

  def listPath(p: String): Seq[String] = {
    val path = new Path(p)
    val fs = path.getFileSystem(new Configuration())
    // listStatus can return null for some FileSystem implementations
    fs.listStatus(path) match {
      case null => Seq()
      case xs => xs.map(_.getPath.toString)
    }
  }

This function works fine in Flink 1.14. However, in Flink 1.15, it
fails with the following exception:

  Caused by: java.lang.ClassCastException: class com.google.cloud.hadoop.fs.gcs.GoogleHadoopFileSystem cannot be cast to class org.apache.hadoop.fs.FileSystem (com.google.cloud.hadoop.fs.gcs.GoogleHadoopFileSystem and org.apache.hadoop.fs.FileSystem are in unnamed module of loader 'app')
          at org.apache.hadoop.fs.FileSystem.createFileSystem(FileSystem.java:3374) ~[flink-s3-fs-hadoop-1.15.0.jar:1.15.0]
          at org.apache.hadoop.fs.FileSystem.access$200(FileSystem.java:125) ~[flink-s3-fs-hadoop-1.15.0.jar:1.15.0]
          at org.apache.hadoop.fs.FileSystem$Cache.getInternal(FileSystem.java:3424) ~[flink-s3-fs-hadoop-1.15.0.jar:1.15.0]
          at org.apache.hadoop.fs.FileSystem$Cache.get(FileSystem.java:3392) ~[flink-s3-fs-hadoop-1.15.0.jar:1.15.0]
          at org.apache.hadoop.fs.FileSystem.get(FileSystem.java:485) ~[flink-s3-fs-hadoop-1.15.0.jar:1.15.0]
          at org.apache.hadoop.fs.Path.getFileSystem(Path.java:365) ~[flink-s3-fs-hadoop-1.15.0.jar:1.15.0]
          at <redacted>.listPath(<redacted>) ~[?:?]

We found a similar issue in Spark [0]. However, we are not sure whether
it is related, and if it is, how we can apply that fix in our setup. Any
help is welcome.
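For context, one workaround we are considering (untested, and assuming
the standard Hadoop fs.<scheme>.impl.disable.cache convention applies
to the gs scheme) is to bypass the shared FileSystem cache, so the
instance is not resolved through the wrong classloader's cache entry.
A minimal sketch:

```scala
import org.apache.hadoop.conf.Configuration
import org.apache.hadoop.fs.Path

// Sketch only: force a fresh FileSystem for gs:// paths instead of the
// cached instance, via the fs.<scheme>.impl.disable.cache convention.
// The property name for GCS is an assumption on our side.
def listPathUncached(p: String): Seq[String] = {
  val conf = new Configuration()
  conf.setBoolean("fs.gs.impl.disable.cache", true)
  val path = new Path(p)
  val fs = path.getFileSystem(conf)
  Option(fs.listStatus(path)).toSeq.flatten.map(_.getPath.toString)
}
```

We have not verified whether this avoids the ClassCastException under
Flink 1.15's classloading, so corrections are welcome.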


[0] https://issues.apache.org/jira/browse/SPARK-9206


-- 
ChangZhuo Chen (陳昌倬) czchen@{czchen,debian}.org
http://czchen.info/
Key fingerprint = BA04 346D C2E1 FE63 C790  8793 CC65 B0CD EC27 5D5B
