Hey Avi, do you use the 'Hadoop S3 plugin' to read from S3?
If yes, what is its version? If not, try to read from S3 as follows (ref: https://ci.apache.org/projects/flink/flink-docs-release-1.12/deployment/filesystems/s3.html#amazon-s3):

1. Set an environment variable so that the built-in Hadoop plugin is enabled (it is part of the Flink image):
   key = ENABLE_BUILT_IN_PLUGINS
   value = flink-s3-fs-hadoop-<flink version>.jar (e.g. flink-s3-fs-hadoop-1.11.1.jar for Flink 1.11.1)
2. Read the file from S3:
   DataSource<String> lines = env.readTextFile("s3://<location>");

(A minimal sketch of this approach is included at the end of this thread.)

Tamir

________________________________
From: Avi Levi <a...@theneura.com>
Sent: Saturday, March 6, 2021 6:59 AM
To: Chesnay Schepler <ches...@apache.org>
Cc: user@flink.apache.org <user@flink.apache.org>
Subject: Re: reading file from s3

Does anyone by any chance have a working example (of course without the credentials etc.) that can be shared on GitHub? Simply reading/writing a file from/to S3. I keep struggling with this one and getting weird exceptions.

Thanks

On Thu, Mar 4, 2021 at 7:30 PM Avi Levi <a...@theneura.com> wrote:

Sure, this is the full exception stacktrace:

Exception in thread "main" org.apache.flink.runtime.client.JobExecutionException: Job execution failed.
    at org.apache.flink.runtime.jobmaster.JobResult.toJobExecutionResult(JobResult.java:144)
    at org.apache.flink.runtime.minicluster.MiniClusterJobClient.lambda$getJobExecutionResult$2(MiniClusterJobClient.java:117)
    at java.base/java.util.concurrent.CompletableFuture$UniApply.tryFire(CompletableFuture.java:642)
    at java.base/java.util.concurrent.CompletableFuture.postComplete(CompletableFuture.java:506)
    at java.base/java.util.concurrent.CompletableFuture.complete(CompletableFuture.java:2137)
    at org.apache.flink.runtime.rpc.akka.AkkaInvocationHandler.lambda$invokeRpc$0(AkkaInvocationHandler.java:238)
    at java.base/java.util.concurrent.CompletableFuture.uniWhenComplete(CompletableFuture.java:859)
    at java.base/java.util.concurrent.CompletableFuture$UniWhenComplete.tryFire(CompletableFuture.java:837)
    at java.base/java.util.concurrent.CompletableFuture.postComplete(CompletableFuture.java:506)
    at java.base/java.util.concurrent.CompletableFuture.complete(CompletableFuture.java:2137)
    at org.apache.flink.runtime.concurrent.FutureUtils$1.onComplete(FutureUtils.java:1046)
    at akka.dispatch.OnComplete.internal(Future.scala:264)
    at akka.dispatch.OnComplete.internal(Future.scala:261)
    at akka.dispatch.japi$CallbackBridge.apply(Future.scala:191)
    at akka.dispatch.japi$CallbackBridge.apply(Future.scala:188)
    at scala.concurrent.impl.CallbackRunnable.run(Promise.scala:60)
    at org.apache.flink.runtime.concurrent.Executors$DirectExecutionContext.execute(Executors.java:73)
    at scala.concurrent.impl.CallbackRunnable.executeWithValue(Promise.scala:68)
    at scala.concurrent.impl.Promise$DefaultPromise.$anonfun$tryComplete$1(Promise.scala:284)
    at scala.concurrent.impl.Promise$DefaultPromise.$anonfun$tryComplete$1$adapted(Promise.scala:284)
    at scala.concurrent.impl.Promise$DefaultPromise.tryComplete(Promise.scala:284)
    at akka.pattern.PromiseActorRef.$bang(AskSupport.scala:573)
    at akka.pattern.PipeToSupport$PipeableFuture$$anonfun$pipeTo$1.applyOrElse(PipeToSupport.scala:22)
    at akka.pattern.PipeToSupport$PipeableFuture$$anonfun$pipeTo$1.applyOrElse(PipeToSupport.scala:21)
    at scala.concurrent.Future.$anonfun$andThen$1(Future.scala:531)
    at scala.concurrent.impl.Promise.liftedTree1$1(Promise.scala:29)
    at scala.concurrent.impl.Promise.$anonfun$transform$1(Promise.scala:29)
    at scala.concurrent.impl.CallbackRunnable.run(Promise.scala:60)
    at akka.dispatch.BatchingExecutor$AbstractBatch.processBatch(BatchingExecutor.scala:55)
    at akka.dispatch.BatchingExecutor$BlockableBatch.$anonfun$run$1(BatchingExecutor.scala:91)
    at scala.runtime.java8.JFunction0$mcV$sp.apply(JFunction0$mcV$sp.java:12)
    at scala.concurrent.BlockContext$.withBlockContext(BlockContext.scala:81)
    at akka.dispatch.BatchingExecutor$BlockableBatch.run(BatchingExecutor.scala:91)
    at akka.dispatch.TaskInvocation.run(AbstractDispatcher.scala:40)
    at akka.dispatch.ForkJoinExecutorConfigurator$AkkaForkJoinTask.exec(ForkJoinExecutorConfigurator.scala:44)
    at akka.dispatch.forkjoin.ForkJoinTask.doExec(ForkJoinTask.java:260)
    at akka.dispatch.forkjoin.ForkJoinPool$WorkQueue.runTask(ForkJoinPool.java:1339)
    at akka.dispatch.forkjoin.ForkJoinPool.runWorker(ForkJoinPool.java:1979)
    at akka.dispatch.forkjoin.ForkJoinWorkerThread.run(ForkJoinWorkerThread.java:107)
Caused by: org.apache.flink.runtime.JobException: Recovery is suppressed by NoRestartBackoffTimeStrategy
    at org.apache.flink.runtime.executiongraph.failover.flip1.ExecutionFailureHandler.handleFailure(ExecutionFailureHandler.java:118)
    at org.apache.flink.runtime.executiongraph.failover.flip1.ExecutionFailureHandler.getFailureHandlingResult(ExecutionFailureHandler.java:80)
    at org.apache.flink.runtime.scheduler.DefaultScheduler.handleTaskFailure(DefaultScheduler.java:233)
    at org.apache.flink.runtime.scheduler.DefaultScheduler.maybeHandleTaskFailure(DefaultScheduler.java:224)
    at org.apache.flink.runtime.scheduler.DefaultScheduler.updateTaskExecutionStateInternal(DefaultScheduler.java:215)
    at org.apache.flink.runtime.scheduler.SchedulerBase.updateTaskExecutionState(SchedulerBase.java:665)
    at org.apache.flink.runtime.scheduler.SchedulerNG.updateTaskExecutionState(SchedulerNG.java:89)
    at org.apache.flink.runtime.jobmaster.JobMaster.updateTaskExecutionState(JobMaster.java:447)
    at jdk.internal.reflect.GeneratedMethodAccessor11.invoke(Unknown Source)
    at java.base/jdk.internal.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
    at java.base/java.lang.reflect.Method.invoke(Method.java:564)
    at org.apache.flink.runtime.rpc.akka.AkkaRpcActor.handleRpcInvocation(AkkaRpcActor.java:306)
    at org.apache.flink.runtime.rpc.akka.AkkaRpcActor.handleRpcMessage(AkkaRpcActor.java:213)
    at org.apache.flink.runtime.rpc.akka.FencedAkkaRpcActor.handleRpcMessage(FencedAkkaRpcActor.java:77)
    at org.apache.flink.runtime.rpc.akka.AkkaRpcActor.handleMessage(AkkaRpcActor.java:159)
    at akka.japi.pf.UnitCaseStatement.apply(CaseStatements.scala:26)
    at akka.japi.pf.UnitCaseStatement.apply(CaseStatements.scala:21)
    at scala.PartialFunction.applyOrElse(PartialFunction.scala:123)
    at scala.PartialFunction.applyOrElse$(PartialFunction.scala:122)
    at akka.japi.pf.UnitCaseStatement.applyOrElse(CaseStatements.scala:21)
    at scala.PartialFunction$OrElse.applyOrElse(PartialFunction.scala:171)
    at scala.PartialFunction$OrElse.applyOrElse(PartialFunction.scala:172)
    at scala.PartialFunction$OrElse.applyOrElse(PartialFunction.scala:172)
    at akka.actor.Actor.aroundReceive(Actor.scala:517)
    at akka.actor.Actor.aroundReceive$(Actor.scala:515)
    at akka.actor.AbstractActor.aroundReceive(AbstractActor.scala:225)
    at akka.actor.ActorCell.receiveMessage(ActorCell.scala:592)
    at akka.actor.ActorCell.invoke(ActorCell.scala:561)
    at akka.dispatch.Mailbox.processMailbox(Mailbox.scala:258)
    at akka.dispatch.Mailbox.run(Mailbox.scala:225)
    at akka.dispatch.Mailbox.exec(Mailbox.scala:235)
    ... 4 more
Caused by: java.lang.NumberFormatException: For input string: "64M"
    at java.base/java.lang.NumberFormatException.forInputString(NumberFormatException.java:68)
    at java.base/java.lang.Long.parseLong(Long.java:707)
    at java.base/java.lang.Long.parseLong(Long.java:832)
    at org.apache.hadoop.conf.Configuration.getLong(Configuration.java:1563)
    at org.apache.hadoop.fs.s3a.S3AFileSystem.initialize(S3AFileSystem.java:248)
    at org.apache.hadoop.fs.FileSystem.createFileSystem(FileSystem.java:3414)
    at org.apache.hadoop.fs.FileSystem.access$200(FileSystem.java:158)
    at org.apache.hadoop.fs.FileSystem$Cache.getInternal(FileSystem.java:3474)
    at org.apache.hadoop.fs.FileSystem$Cache.get(FileSystem.java:3442)
    at org.apache.hadoop.fs.FileSystem.get(FileSystem.java:524)
    at org.apache.hadoop.fs.Path.getFileSystem(Path.java:365)
    at org.apache.parquet.hadoop.util.HadoopInputFile.fromPath(HadoopInputFile.java:38)
    at com.neura.ParquetSourceFunction.run(Job.scala:45)
    at org.apache.flink.streaming.api.operators.StreamSource.run(StreamSource.java:110)
    at org.apache.flink.streaming.api.operators.StreamSource.run(StreamSource.java:66)
    at org.apache.flink.streaming.runtime.tasks.SourceStreamTask$LegacySourceFunctionThread.run(SourceStreamTask.java:241)

On Thu, Mar 4, 2021 at 6:02 PM Chesnay Schepler <ches...@apache.org> wrote:

Can you show us the full exception stacktrace?
Intuitively I would think your cluster configuration contains an invalid value for some memory configuration option.

On 3/4/2021 4:45 PM, Avi Levi wrote:

Hi, I am pretty new. I keep struggling to read a file from S3 but getting this weird exception: Caused by: java.lang.NumberFormatException: For input string: "64M" (if anyone can link me to a working GitHub example, that would be awesome). What am I doing wrong? This is what my code looks like:

import org.apache.flink.api.scala.createTypeInformation
import org.apache.flink.streaming.api.functions.source.SourceFunction
import org.apache.flink.streaming.api.scala.StreamExecutionEnvironment
import org.apache.hadoop.conf.Configuration
import org.apache.hadoop.fs.Path
import org.apache.parquet.column.page.PageReadStore
import org.apache.parquet.example.data.simple.convert.GroupRecordConverter
import org.apache.parquet.hadoop.ParquetFileReader
import org.apache.parquet.hadoop.util.HadoopInputFile
import org.apache.parquet.io.ColumnIOFactory

class ParquetSourceFunction extends SourceFunction[String] {
  override def run(ctx: SourceFunction.SourceContext[String]): Unit = {
    val inputPath = "s3a://foo/year=2000/month=02/"
    val conf = new Configuration()
    conf.set("fs.s3a.impl", "org.apache.hadoop.fs.s3a.S3AFileSystem")
    val hadoopFile = HadoopInputFile.fromPath(new Path(inputPath), conf)
    val readFooter = ParquetFileReader.open(hadoopFile)
    val metadata = readFooter.getFileMetaData
    val schema = metadata.getSchema
    val parquetFileReader = new ParquetFileReader(conf, metadata, new Path(inputPath), readFooter.getRowGroups, schema.getColumns)
    var pages: PageReadStore = null
    try {
      while ({ pages = parquetFileReader.readNextRowGroup; pages != null }) {
        val rows = pages.getRowCount
        val columnIO = new ColumnIOFactory().getColumnIO(schema)
        val recordReader = columnIO.getRecordReader(pages, new GroupRecordConverter(schema))
        (0L until rows).foreach { _ =>
          val group = recordReader.read()
          val myString = group.getString("field_name", 0)
          ctx.collect(myString)
        }
      }
    }
  }

  override def cancel(): Unit = ???
}

object Job {
  def main(args: Array[String]): Unit = {
    // set up the execution environment
    lazy val env = StreamExecutionEnvironment.getExecutionEnvironment
    lazy val stream = env.addSource(new ParquetSourceFunction)
    stream.print()
    env.execute()
  }
}

sbt dependencies:

ThisBuild / scalaVersion := "2.12.1"
val flinkVersion = "1.12.1"
val awsSdkVersion = "1.7.4"
val hadoopVersion = "2.7.3"
val flinkDependencies = Seq(
  "org.apache.flink" %% "flink-clients" % flinkVersion, // % "provided",
  "org.apache.flink" %% "flink-scala" % flinkVersion, // % "provided",
  "org.apache.flink" %% "flink-streaming-scala" % flinkVersion, // % "provided")
  "org.apache.flink" %% "flink-parquet" % flinkVersion,
  "org.apache.flink" %% "flink-hadoop-compatibility" % flinkVersion)
val s3Dependencies = Seq(
  ("com.amazonaws" % "aws-java-sdk" % awsSdkVersion),
  ("org.apache.hadoop" % "hadoop-aws" % hadoopVersion)
)
val serializationDependencies = Seq(
  ("org.apache.avro" % "avro" % "1.7.7"),
  ("org.apache.avro" % "avro-mapred" % "1.7.7").classifier("hadoop2"),
  ("org.apache.parquet" % "parquet-avro" % "1.8.1"))
lazy val root = (project in file(".")).
  settings(
    libraryDependencies ++= flinkDependencies,
    libraryDependencies ++= s3Dependencies,
    libraryDependencies ++= serializationDependencies,
    libraryDependencies += "org.apache.hadoop" % "hadoop-common" % "3.3.0",
    libraryDependencies += "org.apache.parquet" % "parquet-hadoop" % "1.11.1",
    libraryDependencies += "org.apache.flink" %% "flink-table-planner-blink" % "1.12.1" //% "provided"
  )
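
For reference, below is a minimal, untested sketch of the plugin-based approach described in steps 1 and 2 at the top of this thread. It assumes Flink 1.12.1, that the job runs on the official Flink image started with ENABLE_BUILT_IN_PLUGINS=flink-s3-fs-hadoop-1.12.1.jar, that credentials are provided outside the code (e.g. an IAM role, or s3.access-key / s3.secret-key in flink-conf.yaml), and that the bucket name and paths are placeholders.

import org.apache.flink.streaming.api.scala._

// Sketch only: read text lines from S3 and write them back, relying on the
// built-in flink-s3-fs-hadoop plugin to serve the s3:// scheme.
// Bucket name and paths below are placeholders.
object S3ReadWriteSketch {
  def main(args: Array[String]): Unit = {
    val env = StreamExecutionEnvironment.getExecutionEnvironment

    // No Hadoop Configuration or fs.s3a.impl setting here; the plugin
    // registers the filesystem implementation for s3:// paths.
    val lines: DataStream[String] = env.readTextFile("s3://my-bucket/input/")

    lines
      .map(_.toUpperCase)
      .writeAsText("s3://my-bucket/output/")

    env.execute("s3-read-write-sketch")
  }
}

The same kind of s3:// path should also work with the DataSet-API snippet shown above (DataSource<String> lines = env.readTextFile("s3://<location>")); the point of the sketch is that the S3 filesystem comes from the plugin rather than from Hadoop dependencies bundled in the job jar.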