Hello everyone.

In fact, the problem was coming from FileSystem.get():

###
val fs = FileSystem.get(hadoopConfig)
###
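A simplified model of the two `FileSystem.get` variants may make the failure easier to see. This is a plain-Scala sketch, not Hadoop's code: the object `FsResolutionModel` and its methods are hypothetical, and `checkPath` here compares only the URI scheme (Hadoop's real `FileSystem.checkPath` also looks at the authority). In the model, `FileSystem.get(conf)` takes the filesystem from `fs.defaultFS` (default `file:///`), while `FileSystem.get(uri, conf)` takes it from the URI passed in.

```scala
import java.net.URI

// Hypothetical, simplified model of Hadoop's filesystem selection (not Hadoop code).
object FsResolutionModel {
  // Default value of fs.defaultFS in Hadoop's core-default.xml.
  val DefaultFs: String = "file:///"

  // Models FileSystem.get(conf): only fs.defaultFS matters, not the path opened later.
  def getFromConf(conf: Map[String, String]): URI =
    URI.create(conf.getOrElse("fs.defaultFS", DefaultFs))

  // Models FileSystem.get(uri, conf): scheme and authority come from the given URI.
  def getFromUri(uri: URI): URI =
    new URI(uri.getScheme, uri.getAuthority, null, null, null)

  // Models only the scheme comparison of FileSystem.checkPath,
  // the check that produced the "Wrong FS" error in the log.
  def checkPath(fsUri: URI, path: String): Either[String, String] = {
    val scheme = URI.create(path).getScheme
    if (scheme == null || scheme.equalsIgnoreCase(fsUri.getScheme)) Right(path)
    else Left(s"Wrong FS: $path, expected: $fsUri")
  }
}
```

With an empty configuration, checking `s3a://mybucket/myfolder/myfile.txt` against `getFromConf(Map.empty)` yields the same `Wrong FS: ..., expected: file:///` message as the log, whereas the filesystem built from `URI.create("s3a://mybucket")` accepts the path. In the model, setting `fs.defaultFS` to an `s3a://` URI would also change the result, mirroring the role that key plays for `FileSystem.get(conf)`.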
To interact with S3, you need to pass a first argument, before the Hadoop configuration, that specifies the filesystem. Without it, Hadoop returns the default filesystem (fs.defaultFS, which is file:/// unless configured otherwise), which is why the original error says "expected: file:///". Something like this:

###
import java.net.URI

val s3uri = URI.create("s3a://mybucket")
val fs = FileSystem.get(s3uri, hadoopConfig)
###

Best regards,
Gwenael Le Barzic

From: LE BARZIC Gwenael DTSI/SI
Sent: Thursday, July 11, 2024 16:24
To: user@flink.apache.org
Subject: Trying to read a file from S3 with flink on kubernetes

Hey guys.

I'm trying to read a file from an internal S3 with Flink on Kubernetes, but I'm stuck on a strange error.

Here is the code, MyFlinkJob.scala:

###
package com.example.flink

import org.apache.flink.api.common.serialization.SimpleStringSchema
import org.apache.flink.streaming.api.scala._
import org.apache.hadoop.conf.Configuration
import org.apache.hadoop.fs.{FileSystem, Path}
import scala.io.Source

object MyFlinkJob {
  def main(args: Array[String]): Unit = {
    try {
      val env = StreamExecutionEnvironment.getExecutionEnvironment

      val hadoopConfig = new Configuration()
      hadoopConfig.set("fs.s3a.access.key", "###")
      hadoopConfig.set("fs.s3a.secret.key", "###")
      hadoopConfig.set("fs.s3a.endpoint", "internal endpoint")

      val fs = FileSystem.get(hadoopConfig)
      val s3Path = new Path("s3a://mybucket/myfolder/myfile.txt")
      val inputStream = fs.open(s3Path)
      val referenceData = Source.fromInputStream(inputStream).getLines().toSet
      inputStream.close()

      println("Reference Data:")
      referenceData.foreach(println)

      env.execute("Flink S3 Simple Example")
    } catch {
      case e: Exception =>
        e.printStackTrace()
        println(s"Error: ${e.getMessage}")
    }
  }
}
###

And my build.sbt file:

###
import Dependencies._

name := "MyFlinkJob"
version := "0.1"
scalaVersion := "2.12.19"

ThisBuild / scalaVersion := "2.12.19"
ThisBuild / version := "0.1.0-SNAPSHOT"
ThisBuild / organization := "com.example"
ThisBuild / organizationName := "example"

libraryDependencies ++= Seq(
  "org.apache.flink" %% "flink-scala" % "1.18.1",
  "org.apache.flink" %% "flink-streaming-scala" % "1.18.1",
  "org.apache.flink" % "flink-s3-fs-hadoop" % "1.18.1"
)

assembly / assemblyOption ~= { _.withIncludeScala(false) }
assembly / mainClass := Some(s"com.example.flink.MyFlinkJob")
assembly / assemblyJarName := s"myflinkjob_2.12-0.1.jar"
assembly / assemblyMergeStrategy := {
  case path if path.contains("services") => MergeStrategy.concat
  case PathList("META-INF", _*) => MergeStrategy.discard
  case _ => MergeStrategy.first
}
###

I'm using the following Docker image:

###
FROM flink:1.18-scala_2.12

USER root

RUN mkdir -p /opt/flink/plugins/s3-fs-hadoop && \
    cp -p /opt/flink/opt/flink-s3-fs-hadoop-1.18.1.jar /opt/flink/plugins/s3-fs-hadoop/

RUN mkdir -p /opt/flink/log/ /opt/flink/conf /opt/flink/userlib

WORKDIR /opt/flink/userlib
COPY target/scala-2.12/myflinkjob_2.12-0.1.jar myflinkjob.jar

RUN chown -R flink:flink /opt/flink && \
    chmod -R 755 /opt/flink
RUN chown -R flink:flink /opt/flink/userlib && \
    chmod -R 755 /opt/flink/userlib
###

And the following Kubernetes deployment:

###
---
apiVersion: flink.apache.org/v1beta1
kind: FlinkDeployment
metadata:
  name: flink-s3
spec:
  image: flink-s3:0.1
  flinkVersion: v1_18
  flinkConfiguration:
    taskmanager.numberOfTaskSlots: "2"
    classloader.resolve-order: parent-first
  serviceAccount: flink
  jobManager:
    resource:
      memory: 2048m
      cpu: 0.5
  taskManager:
    replicas: 2
    resource:
      memory: 2048m
      cpu: 0.5
  job:
    jarURI: "local:///opt/flink/userlib/myflinkjob.jar"
    parallelism: 2
    #upgradeMode: stateless # stateless or savepoint or last-state
    entryClass: "com.example.flink.MyFlinkJob"
    args: []
  podTemplate:
    apiVersion: v1
    kind: Pod
    metadata:
      name: flink-s3
    spec:
      containers:
        - name: flink-main-container
          securityContext:
            runAsUser: 9999 # UID of a non-root user
            runAsNonRoot: true
          env: []
          volumeMounts: []
      volumes: []
###

I launch the Flink job like this:

###
kubectl apply -f kubernetes/FlinkDeployment.yml
###

I am using the Flink operator on Kubernetes.
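In the build.sbt above, the assemblyMergeStrategy cases are tried from top to bottom and the first match wins, so files under META-INF/services are concatenated instead of being discarded by the META-INF rule. That matters because Java's ServiceLoader, which Hadoop uses to discover FileSystem implementations through META-INF/services/org.apache.hadoop.fs.FileSystem, needs those registration files to survive in the fat jar. The plain-Scala model below only illustrates that ordering; the `Strategy` values and the `PathList` extractor are hypothetical stand-ins for sbt-assembly's `MergeStrategy` and `PathList`, not the plugin's code.

```scala
// Hypothetical model of the merge-strategy ordering (not sbt-assembly code).
object MergeOrderModel {
  sealed trait Strategy
  case object Concat extends Strategy
  case object Discard extends Strategy
  case object First extends Strategy

  // Splits "META-INF/services/x" into Seq("META-INF", "services", "x"),
  // similar in spirit to sbt-assembly's PathList extractor.
  object PathList {
    def unapplySeq(path: String): Option[Seq[String]] = Some(path.split("/").toSeq)
  }

  // Same case order as the build.sbt: the first matching case wins.
  def strategyFor(path: String): Strategy = path match {
    case p if p.contains("services") => Concat
    case PathList("META-INF", _*)    => Discard
    case _                           => First
  }
}
```

Swapping the first two cases in this model would send META-INF/services files to `Discard`, which is the situation the original ordering avoids.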
And I get this error in the logs:

###
java.lang.IllegalArgumentException: Wrong FS: s3a://mybucket/myfolder/myfile.txt, expected: file:///
    at org.apache.hadoop.fs.FileSystem.checkPath(FileSystem.java:807)
    at org.apache.hadoop.fs.RawLocalFileSystem.pathToFile(RawLocalFileSystem.java:105)
    at org.apache.hadoop.fs.RawLocalFileSystem.deprecatedGetFileStatus(RawLocalFileSystem.java:774)
    at org.apache.hadoop.fs.RawLocalFileSystem.getFileLinkStatusInternal(RawLocalFileSystem.java:1100)
    at org.apache.hadoop.fs.RawLocalFileSystem.getFileStatus(RawLocalFileSystem.java:769)
    at org.apache.hadoop.fs.FilterFileSystem.getFileStatus(FilterFileSystem.java:462)
    at org.apache.hadoop.fs.ChecksumFileSystem$ChecksumFSInputChecker.<init>(ChecksumFileSystem.java:160)
    at org.apache.hadoop.fs.ChecksumFileSystem.open(ChecksumFileSystem.java:372)
    at org.apache.hadoop.fs.FileSystem.open(FileSystem.java:976)
    at com.orange.flink.MyFlinkJob$.main(MyFlinkJob.scala:28)
    at com.orange.flink.MyFlinkJob.main(MyFlinkJob.scala)
###

I don't really understand why Flink is telling me this. I followed the documentation here: https://nightlies.apache.org/flink/flink-docs-master/docs/deployment/filesystems/s3/#hadooppresto-s3-file-systems-plugins

Could someone help me understand what I am missing?

Regards,
Gwenael Le Barzic
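Building on the fix in the reply, the read logic of MyFlinkJob could derive the filesystem URI from the object path instead of hard-coding "s3a://mybucket", and close the stream even if reading fails. The sketch below is plain Scala with no Hadoop dependency so it can be tried on its own; the object `S3ReadHelpers` and its methods `fsUri` and `readLineSet` are hypothetical names. In the job, the result of `fsUri` would be passed as `FileSystem.get(fsUri(path), hadoopConfig)`; Hadoop's `Path.getFileSystem(conf)` performs a similar lookup from the path itself.

```scala
import java.io.InputStream
import java.net.URI
import scala.io.Source

// Hypothetical helpers, shown without Hadoop so they can run standalone.
object S3ReadHelpers {
  // Keep only scheme + authority:
  // "s3a://mybucket/myfolder/myfile.txt" -> "s3a://mybucket".
  // In the job this would feed FileSystem.get(fsUri(path), hadoopConfig).
  def fsUri(path: String): URI = {
    val u = URI.create(path)
    new URI(u.getScheme, u.getAuthority, null, null, null)
  }

  // Read every line into a Set; closing the Source also closes the
  // underlying stream, even when reading throws.
  // scala.util.Using is not available on Scala 2.12, hence try/finally.
  def readLineSet(in: InputStream): Set[String] = {
    val source = Source.fromInputStream(in, "UTF-8")
    try source.getLines().toSet // toSet is strict, so reading finishes before close
    finally source.close()
  }
}
```

Deriving the URI from the same string as the `Path` keeps the bucket name in one place, so the filesystem and the file it opens cannot drift apart.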