Hello everyone.

In fact, the problem was coming from FileSystem.get():

###
val fs = FileSystem.get(hadoopConfig)
###
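With only a Configuration argument, Hadoop's FileSystem.get() resolves fs.defaultFS, which defaults to file:///, so it hands back the local filesystem and rejects s3a:// paths; that is what the "Wrong FS: s3a://…, expected: file:///" error below is complaining about. A minimal way to check this, for example in a Scala REPL with hadoop-common on the classpath:

###
import org.apache.hadoop.conf.Configuration
import org.apache.hadoop.fs.FileSystem

val conf = new Configuration()
// No URI given: Hadoop falls back to fs.defaultFS (file:/// by default),
// so this returns the local filesystem, which rejects s3a:// paths.
val fs = FileSystem.get(conf)
println(fs.getUri) // file:///
###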
When you want to interact with S3, you need to add a first parameter, before the Hadoop config, to specify the filesystem. Something like this:

###
val s3uri = URI.create("s3a://mybucket")
val fs = FileSystem.get(s3uri, hadoopConfig)
###

Best regards.

Gwenael Le Barzic

From: LE BARZIC Gwenael DTSI/SI
Sent: Thursday, July 11, 2024 16:24
To: user@flink.apache.org
Subject: Trying to read a file from S3 with flink on kubernetes

Hey guys.

I'm trying to read a file from an internal S3 with Flink on Kubernetes, but I get a strange blocking error.

Here is the code, MyFlinkJob.scala:

###
package com.example.flink

import org.apache.flink.api.common.serialization.SimpleStringSchema
import org.apache.flink.streaming.api.scala._
import org.apache.hadoop.conf.Configuration
import org.apache.hadoop.fs.{FileSystem, Path}

import scala.io.Source

object MyFlinkJob {
  def main(args: Array[String]): Unit = {
    try {
      val env = StreamExecutionEnvironment.getExecutionEnvironment

      val hadoopConfig = new Configuration()
      hadoopConfig.set("fs.s3a.access.key", "###")
      hadoopConfig.set("fs.s3a.secret.key", "###")
      hadoopConfig.set("fs.s3a.endpoint", "internal endpoint")

      val fs = FileSystem.get(hadoopConfig)

      val s3Path = new Path("s3a://mybucket/myfolder/myfile.txt")
      val inputStream = fs.open(s3Path)
      val referenceData = Source.fromInputStream(inputStream).getLines().toSet
      inputStream.close()

      println("Reference Data:")
      referenceData.foreach(println)

      env.execute("Flink S3 Simple Example")
    } catch {
      case e: Exception =>
        e.printStackTrace()
        println(s"Error: ${e.getMessage}")
    }
  }
}
###

And my build.sbt file:

###
import Dependencies._

name := "MyFlinkJob"
version := "0.1"
scalaVersion := "2.12.19"

ThisBuild / scalaVersion := "2.12.19"
ThisBuild / version := "0.1.0-SNAPSHOT"
ThisBuild / organization := "com.example"
ThisBuild / organizationName := "example"

libraryDependencies ++= Seq(
  "org.apache.flink" %% "flink-scala" % "1.18.1",
  "org.apache.flink" %% "flink-streaming-scala" % "1.18.1",
  "org.apache.flink" % "flink-s3-fs-hadoop" % "1.18.1"
)

assembly / assemblyOption ~= { _.withIncludeScala(false) }
assembly / mainClass := Some(s"com.example.flink.MyFlinkJob")
assembly / assemblyJarName := s"myflinkjob_2.12-0.1.jar"

assembly / assemblyMergeStrategy := {
  case path if path.contains("services") => MergeStrategy.concat
  case PathList("META-INF", _*) => MergeStrategy.discard
  case _ => MergeStrategy.first
}
###

I'm using the following docker image:

###
FROM flink:1.18-scala_2.12

USER root

RUN mkdir -p /opt/flink/plugins/s3-fs-hadoop && \
    cp -p /opt/flink/opt/flink-s3-fs-hadoop-1.18.1.jar /opt/flink/plugins/s3-fs-hadoop/

RUN mkdir -p /opt/flink/log/ /opt/flink/conf /opt/flink/userlib

WORKDIR /opt/flink/userlib

COPY target/scala-2.12/myflinkjob_2.12-0.1.jar myflinkjob.jar

RUN chown -R flink:flink /opt/flink && \
    chmod -R 755 /opt/flink

RUN chown -R flink:flink /opt/flink/userlib && \
    chmod -R 755 /opt/flink/userlib
###

And the following Kubernetes deployment:

###
---
apiVersion: flink.apache.org/v1beta1
kind: FlinkDeployment
metadata:
  name: flink-s3
spec:
  image: flink-s3:0.1
  flinkVersion: v1_18
  flinkConfiguration:
    taskmanager.numberOfTaskSlots: "2"
    classloader.resolve-order: parent-first
  serviceAccount: flink
  jobManager:
    resource:
      memory: 2048m
      cpu: 0.5
  taskManager:
    replicas: 2
    resource:
      memory: 2048m
      cpu: 0.5
  job:
    jarURI: "local:///opt/flink/userlib/myflinkjob.jar"
    parallelism: 2
    #upgradeMode: stateless # stateless or savepoint or last-state
    entryClass: "com.example.flink.MyFlinkJob"
    args: []
  podTemplate:
    apiVersion: v1
    kind: Pod
    metadata:
      name: flink-s3
    spec:
      containers:
        - name: flink-main-container
          securityContext:
            runAsUser: 9999 # UID of a non-root user
            runAsNonRoot: true
          env: []
          volumeMounts: []
      volumes: []
###

I launch the Flink job like this:

###
kubectl apply -f kubernetes/FlinkDeployment.yml
###

I am using the Flink operator on Kubernetes, and I get this error in the logs:

###
java.lang.IllegalArgumentException: Wrong FS: s3a://mybucket/myfolder/myfile.txt, expected: file:///
    at org.apache.hadoop.fs.FileSystem.checkPath(FileSystem.java:807)
    at org.apache.hadoop.fs.RawLocalFileSystem.pathToFile(RawLocalFileSystem.java:105)
    at org.apache.hadoop.fs.RawLocalFileSystem.deprecatedGetFileStatus(RawLocalFileSystem.java:774)
    at org.apache.hadoop.fs.RawLocalFileSystem.getFileLinkStatusInternal(RawLocalFileSystem.java:1100)
    at org.apache.hadoop.fs.RawLocalFileSystem.getFileStatus(RawLocalFileSystem.java:769)
    at org.apache.hadoop.fs.FilterFileSystem.getFileStatus(FilterFileSystem.java:462)
    at org.apache.hadoop.fs.ChecksumFileSystem$ChecksumFSInputChecker.<init>(ChecksumFileSystem.java:160)
    at org.apache.hadoop.fs.ChecksumFileSystem.open(ChecksumFileSystem.java:372)
    at org.apache.hadoop.fs.FileSystem.open(FileSystem.java:976)
    at com.orange.flink.MyFlinkJob$.main(MyFlinkJob.scala:28)
    at com.orange.flink.MyFlinkJob.main(MyFlinkJob.scala)
###

I don't really understand why Flink is telling me this. I followed the documentation here:
https://nightlies.apache.org/flink/flink-docs-master/docs/deployment/filesystems/s3/#hadooppresto-s3-file-systems-plugins

Could someone help me understand what I am missing?

Best regards.

Gwenael Le Barzic
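For reference, a minimal sketch of the read path with the URI-based FileSystem.get() from the reply above applied. The object name S3ReadSketch is made up here; the bucket, key, credentials and endpoint are the same placeholders as in the job, and the S3A classes are assumed to be available on the job classpath as in the setup above:

###
package com.example.flink

import java.net.URI

import org.apache.hadoop.conf.Configuration
import org.apache.hadoop.fs.{FileSystem, Path}

import scala.io.Source

object S3ReadSketch {
  def main(args: Array[String]): Unit = {
    val hadoopConfig = new Configuration()
    hadoopConfig.set("fs.s3a.access.key", "###")
    hadoopConfig.set("fs.s3a.secret.key", "###")
    hadoopConfig.set("fs.s3a.endpoint", "internal endpoint")

    // Passing the bucket URI makes Hadoop pick the S3A filesystem instead of
    // the default (local) filesystem resolved from fs.defaultFS.
    val s3uri = URI.create("s3a://mybucket")
    val fs = FileSystem.get(s3uri, hadoopConfig)

    val s3Path = new Path("s3a://mybucket/myfolder/myfile.txt")
    val inputStream = fs.open(s3Path)
    val referenceData =
      try Source.fromInputStream(inputStream).getLines().toSet
      finally inputStream.close()

    println("Reference Data:")
    referenceData.foreach(println)
  }
}
###

The try/finally only makes sure the stream is closed even if the read fails; otherwise the logic is the same as in the original job.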