Hi,

We use GCS as storage and have the following function to list the files under a GCS path, which our Flink batch jobs use to build state:
```scala
def listPath(p: String): Seq[String] = {
  val path = new Path(p)
  val fs   = path.getFileSystem(new Configuration())
  fs.listStatus(path) match {
    case null => Seq()
    case xs   => xs.map(_.getPath.toString)
  }
}
```

This function works fine in Flink 1.14. However, in Flink 1.15 it throws the following exception:

Caused by: java.lang.ClassCastException: class com.google.cloud.hadoop.fs.gcs.GoogleHadoopFileSystem cannot be cast to class org.apache.hadoop.fs.FileSystem (com.google.cloud.hadoop.fs.gcs.GoogleHadoopFileSystem and org.apache.hadoop.fs.FileSystem are in unnamed module of loader 'app')
	at org.apache.hadoop.fs.FileSystem.createFileSystem(FileSystem.java:3374) ~[flink-s3-fs-hadoop-1.15.0.jar:1.15.0]
	at org.apache.hadoop.fs.FileSystem.access$200(FileSystem.java:125) ~[flink-s3-fs-hadoop-1.15.0.jar:1.15.0]
	at org.apache.hadoop.fs.FileSystem$Cache.getInternal(FileSystem.java:3424) ~[flink-s3-fs-hadoop-1.15.0.jar:1.15.0]
	at org.apache.hadoop.fs.FileSystem$Cache.get(FileSystem.java:3392) ~[flink-s3-fs-hadoop-1.15.0.jar:1.15.0]
	at org.apache.hadoop.fs.FileSystem.get(FileSystem.java:485) ~[flink-s3-fs-hadoop-1.15.0.jar:1.15.0]
	at org.apache.hadoop.fs.Path.getFileSystem(Path.java:365) ~[flink-s3-fs-hadoop-1.15.0.jar:1.15.0]
	at <redacted>.listPath(<redacted>) ~[?:?]

We found a similar issue in Spark [0]. However, we are not sure whether it is related, and if it is, how we could apply that fix. Any help is welcome.

[0] https://issues.apache.org/jira/browse/SPARK-9206

-- 
ChangZhuo Chen (陳昌倬) czchen@{czchen,debian}.org
http://czchen.info/
Key fingerprint = BA04 346D C2E1 FE63 C790 8793 CC65 B0CD EC27 5D5B
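P.S. In case it helps frame the question: one workaround we are considering (untested on our side, and the function name below is just a sketch) is to go through Flink's own filesystem abstraction instead of Hadoop's, so that the Hadoop FileSystem cache shown in the stack trace is never consulted:

```scala
// Sketch only: same listing logic, but using Flink's own Path/FileSystem
// (org.apache.flink.core.fs) rather than org.apache.hadoop.fs.
import org.apache.flink.core.fs.{FileSystem, Path}

def listPathFlink(p: String): Seq[String] = {
  val path = new Path(p)
  // Flink resolves the filesystem via its plugin mechanism, bypassing
  // the Hadoop FileSystem cache where the ClassCastException occurs.
  val fs = path.getFileSystem()
  fs.listStatus(path) match {
    case null => Seq()
    case xs   => xs.map(_.getPath.toString).toSeq
  }
}
```

We have not verified that this resolves the classloading conflict; we mention it only to show what we have looked at so far.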