Thanks, I get this error when I switched to s3a://

Exception in thread "streaming-job-executor-0" java.lang.NoSuchMethodError: com.amazonaws.services.s3.transfer.TransferManager.<init>(Lcom/amazonaws/services/s3/AmazonS3;Ljava/util/concurrent/ThreadPoolExecutor;)V
    at org.apache.hadoop.fs.s3a.S3AFileSystem.initialize(S3AFileSystem.java:287)
    at org.apache.hadoop.fs.FileSystem.createFileSystem(FileSystem.java:2669)
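From what I can tell, hadoop-aws 2.7.3 was built against aws-java-sdk 1.7.4, so the awsVersion = "1.11.155" in my build is the likely cause of that TransferManager constructor mismatch. A rough, untested sketch of the dependency block with the SDK pinned to match (assuming I stay on Hadoop 2.7.3):

// Sketch only: pin the AWS SDK to the version hadoop-aws 2.7.3 was compiled
// against; newer 1.11.x SDKs are not binary-compatible with the 2.7 s3a client.
val sparkVersion  = "2.1.0"
val hadoopVersion = "2.7.3"
val awsSdkVersion = "1.7.4"

lazy val sparkAndDependencies = Seq(
  "org.apache.spark"  %% "spark-core"      % sparkVersion,
  "org.apache.spark"  %% "spark-sql"       % sparkVersion,
  "org.apache.spark"  %% "spark-hive"      % sparkVersion,
  "org.apache.spark"  %% "spark-streaming" % sparkVersion,

  "org.apache.hadoop" %  "hadoop-aws"      % hadoopVersion,
  "org.apache.hadoop" %  "hadoop-common"   % hadoopVersion,

  // Explicit pin: hadoop-aws already pulls this in transitively, but declaring
  // it guards against a newer SDK arriving from another dependency.
  "com.amazonaws"     %  "aws-java-sdk"    % awsSdkVersion
)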
On Tue, 23 Jan 2018 at 15:05 Patrick Alwell <palw...@hortonworks.com> wrote:

> Spark cannot read from S3 locally without the s3a protocol; you'll more
> than likely need a local copy of the data, or you'll need to use the
> proper jars to enable S3 communication from the edge to the datacenter.
>
> https://stackoverflow.com/questions/30385981/how-to-access-s3a-files-from-apache-spark
>
> Here are the jars:
> https://mvnrepository.com/artifact/org.apache.hadoop/hadoop-aws
>
> Looks like you already have them, in which case you'll have to make small
> configuration changes, e.g. s3 -> s3a
>
> Keep in mind: *The Amazon JARs have proven very brittle: the version of
> the Amazon libraries must match the versions against which the Hadoop
> binaries were built.*
>
> https://hortonworks.github.io/hdp-aws/s3-s3aclient/index.html#using-the-s3a-filesystem-client
>
> *From:* Toy <noppani...@gmail.com>
> *Date:* Tuesday, January 23, 2018 at 11:33 AM
> *To:* "user@spark.apache.org" <user@spark.apache.org>
> *Subject:* I can't save DataFrame from running Spark locally
>
> Hi,
>
> First of all, my Spark application runs fine in AWS EMR. However, I'm
> trying to run it locally to debug an issue. My application just parses
> log files, converts them to a DataFrame, then converts that to ORC and
> saves it to S3. However, when I run locally I get this error:
>
> java.io.IOException: /orc/dt=2018-01-23 doesn't exist
>     at org.apache.hadoop.fs.s3.Jets3tFileSystemStore.get(Jets3tFileSystemStore.java:170)
>     at org.apache.hadoop.fs.s3.Jets3tFileSystemStore.retrieveINode(Jets3tFileSystemStore.java:221)
>     at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
>     at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
>     at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
>     at java.lang.reflect.Method.invoke(Method.java:497)
>     at org.apache.hadoop.io.retry.RetryInvocationHandler.invokeMethod(RetryInvocationHandler.java:191)
>     at org.apache.hadoop.io.retry.RetryInvocationHandler.invoke(RetryInvocationHandler.java:102)
>     at com.sun.proxy.$Proxy22.retrieveINode(Unknown Source)
>     at org.apache.hadoop.fs.s3.S3FileSystem.getFileStatus(S3FileSystem.java:340)
>     at org.apache.hadoop.fs.FileSystem.exists(FileSystem.java:1426)
>     at org.apache.spark.sql.execution.datasources.InsertIntoHadoopFsRelationCommand.run(InsertIntoHadoopFsRelationCommand.scala:77)
>     at org.apache.spark.sql.execution.command.ExecutedCommandExec.sideEffectResult$lzycompute(commands.scala:58)
>     at org.apache.spark.sql.execution.command.ExecutedCommandExec.sideEffectResult(commands.scala:56)
>     at org.apache.spark.sql.execution.command.ExecutedCommandExec.doExecute(commands.scala:74)
>     at org.apache.spark.sql.execution.SparkPlan$$anonfun$execute$1.apply(SparkPlan.scala:114)
>     at org.apache.spark.sql.execution.SparkPlan$$anonfun$execute$1.apply(SparkPlan.scala:114)
>     at org.apache.spark.sql.execution.SparkPlan$$anonfun$executeQuery$1.apply(SparkPlan.scala:135)
>     at org.apache.spark.rdd.RDDOperationScope$.withScope(RDDOperationScope.scala:151)
>     at org.apache.spark.sql.execution.SparkPlan.executeQuery(SparkPlan.scala:132)
>     at org.apache.spark.sql.execution.SparkPlan.execute(SparkPlan.scala:113)
>     at org.apache.spark.sql.execution.QueryExecution.toRdd$lzycompute(QueryExecution.scala:87)
>     at org.apache.spark.sql.execution.QueryExecution.toRdd(QueryExecution.scala:87)
>     at org.apache.spark.sql.execution.datasources.DataSource.write(DataSource.scala:492)
>     at org.apache.spark.sql.DataFrameWriter.save(DataFrameWriter.scala:215)
>     at org.apache.spark.sql.DataFrameWriter.save(DataFrameWriter.scala:198)
>     at Vivace$$anonfun$processStream$1.apply(vivace.scala:193)
>     at Vivace$$anonfun$processStream$1.apply(vivace.scala:170)
>
> Here's what I have in sbt:
>
> scalaVersion := "2.11.8"
>
> val sparkVersion = "2.1.0"
> val hadoopVersion = "2.7.3"
> val awsVersion = "1.11.155"
>
> lazy val sparkAndDependencies = Seq(
>   "org.apache.spark" %% "spark-core" % sparkVersion,
>   "org.apache.spark" %% "spark-sql" % sparkVersion,
>   "org.apache.spark" %% "spark-hive" % sparkVersion,
>   "org.apache.spark" %% "spark-streaming" % sparkVersion,
>
>   "org.apache.hadoop" % "hadoop-aws" % hadoopVersion,
>   "org.apache.hadoop" % "hadoop-common" % hadoopVersion
> )
>
> And this is where the code failed:
>
> val sparrowWriter = sparrowCastedDf.write.mode("append").format("orc").option("compression", "zlib")
> sparrowWriter.save(sparrowOutputPath)
>
> sparrowOutputPath is something like s3://bucket/folder, and it exists; I
> checked it with the AWS command line.
>
> I put a breakpoint there and the full path looks like
> s3://bucket/orc/dt=2018-01-23, which exists.
>
> I have also set up the credentials like this:
>
> sc.hadoopConfiguration.set("fs.s3.awsAccessKeyId", "key")
> sc.hadoopConfiguration.set("fs.s3.awsSecretAccessKey", "secret")
>
> My confusion is that this code runs fine in the cluster, but I get this
> error when running locally.
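For reference, the s3a-based setup the quoted advice points at would look roughly like this. This is only a sketch: fs.s3a.access.key and fs.s3a.secret.key are the standard Hadoop s3a credential properties, and the bucket path is a placeholder for the real output location.

// Sketch: switch both the credentials and the output path from the legacy
// s3:// (Jets3t) filesystem to the s3a client.
sc.hadoopConfiguration.set("fs.s3a.access.key", "key")
sc.hadoopConfiguration.set("fs.s3a.secret.key", "secret")

// Placeholder path; only the scheme changes compared to the original code.
val sparrowOutputPath = "s3a://bucket/orc"

val sparrowWriter = sparrowCastedDf
  .write
  .mode("append")
  .format("orc")
  .option("compression", "zlib")

sparrowWriter.save(sparrowOutputPath)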