Are you writing from an Amazon instance or from an on-premise install to S3? How many partitions are you writing from? Maybe you can try to “play” with repartitioning to see how it behaves?
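For instance, a rough sketch of that experiment, assuming a DataFrame named df and an s3_location like the ones in the quoted thread below; the partition count is an arbitrary starting point, not a recommendation:

// Check how many partitions the write would currently produce.
println(df.rdd.getNumPartitions)

// Repartition before writing so the output is spread across more (or fewer) tasks and files.
// 200 is just a number to experiment with against the ~400 million rows mentioned below.
df.repartition(200).write.csv(s3_location)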
> On Jan 23, 2018, at 17:09, Vasyl Harasymiv <vasyl.harasy...@gmail.com> wrote:
>
> It is about 400 million rows. S3 automatically chunks the file on their end while writing, so that's fine, i.e. it creates the same file name with alphanumeric suffixes.
> However, the write session expires due to token expiration.
>
> On Tue, Jan 23, 2018 at 5:03 PM, Jörn Franke <jornfra...@gmail.com> wrote:
> How large is the file?
>
> If it is very large then you should have several partitions for the output anyway. This is also important in case you need to read again from S3 - having several files there enables parallel reading.
>
> On 23. Jan 2018, at 23:58, Vasyl Harasymiv <vasyl.harasy...@gmail.com> wrote:
>
>> Hi Spark Community,
>>
>> Saving a data frame into a file on S3 using:
>>
>> df.write.csv(s3_location)
>>
>> If it runs for longer than 30 minutes, the following error persists:
>>
>> The provided token has expired. (Service: Amazon S3; Status Code: 400; Error Code: ExpiredToken;)
>>
>> Potentially this is because there is a hardcoded session limit in the temporary S3 connection from Spark.
>>
>> One can specify the duration as per here:
>>
>> https://docs.aws.amazon.com/IAM/latest/UserGuide/id_credentials_temp_request.html
>>
>> One can, of course, chunk the data into sub-30-minute writes. However, is there a way to change the token expiry parameter directly in Spark before using "write.csv"?
>>
>> Thanks a lot for any help!
>> Vasyl
>>
>> On Tue, Jan 23, 2018 at 2:46 PM, Toy <noppani...@gmail.com> wrote:
>> Thanks, I get this error when I switched to s3a://
>>
>> Exception in thread "streaming-job-executor-0" java.lang.NoSuchMethodError: com.amazonaws.services.s3.transfer.TransferManager.<init>(Lcom/amazonaws/services/s3/AmazonS3;Ljava/util/concurrent/ThreadPoolExecutor;)V
>>   at org.apache.hadoop.fs.s3a.S3AFileSystem.initialize(S3AFileSystem.java:287)
>>   at org.apache.hadoop.fs.FileSystem.createFileSystem(FileSystem.java:2669)
>>
>> On Tue, 23 Jan 2018 at 15:05 Patrick Alwell <palw...@hortonworks.com> wrote:
>> Spark cannot read locally from S3 without the s3a protocol; you’ll more than likely need a local copy of the data, or you’ll need the proper jars to enable S3 communication from the edge to the datacenter.
>>
>> https://stackoverflow.com/questions/30385981/how-to-access-s3a-files-from-apache-spark
>>
>> Here are the jars:
>> https://mvnrepository.com/artifact/org.apache.hadoop/hadoop-aws
>>
>> Looks like you already have them, in which case you’ll have to make small configuration changes, e.g. s3 -> s3a.
>>
>> Keep in mind: the Amazon JARs have proven very brittle; the version of the Amazon libraries must match the versions against which the Hadoop binaries were built.
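To make that version coupling concrete (a sketch, assuming the build stays on the Hadoop 2.7.3 artifacts shown further down in this thread): hadoop-aws 2.7.x was built against the monolithic aws-java-sdk 1.7.4, so the sbt dependencies would be pinned roughly like this rather than mixing in a 1.11.x SDK:

// Sketch only: keep the AWS SDK at the version hadoop-aws 2.7.x was compiled against.
// A 1.11.x SDK has different TransferManager constructors, which is consistent with
// the NoSuchMethodError quoted above.
val hadoopVersion = "2.7.3"
val awsVersion    = "1.7.4"  // assumption: the SDK version matching hadoop-aws 2.7.x

libraryDependencies ++= Seq(
  "org.apache.hadoop" % "hadoop-aws"    % hadoopVersion,
  "org.apache.hadoop" % "hadoop-common" % hadoopVersion,
  "com.amazonaws"     % "aws-java-sdk"  % awsVersion
)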
>> https://hortonworks.github.io/hdp-aws/s3-s3aclient/index.html#using-the-s3a-filesystem-client
>>
>> From: Toy <noppani...@gmail.com>
>> Date: Tuesday, January 23, 2018 at 11:33 AM
>> To: "user@spark.apache.org" <user@spark.apache.org>
>> Subject: I can't save DataFrame from running Spark locally
>>
>> Hi,
>>
>> First of all, my Spark application runs fine in AWS EMR. However, I'm trying to run it locally to debug some issue. My application just parses log files, converts them to a DataFrame, then converts to ORC and saves to S3. However, when I run locally I get this error:
>>
>> java.io.IOException: /orc/dt=2018-01-23 doesn't exist
>>   at org.apache.hadoop.fs.s3.Jets3tFileSystemStore.get(Jets3tFileSystemStore.java:170)
>>   at org.apache.hadoop.fs.s3.Jets3tFileSystemStore.retrieveINode(Jets3tFileSystemStore.java:221)
>>   at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
>>   at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
>>   at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
>>   at java.lang.reflect.Method.invoke(Method.java:497)
>>   at org.apache.hadoop.io.retry.RetryInvocationHandler.invokeMethod(RetryInvocationHandler.java:191)
>>   at org.apache.hadoop.io.retry.RetryInvocationHandler.invoke(RetryInvocationHandler.java:102)
>>   at com.sun.proxy.$Proxy22.retrieveINode(Unknown Source)
>>   at org.apache.hadoop.fs.s3.S3FileSystem.getFileStatus(S3FileSystem.java:340)
>>   at org.apache.hadoop.fs.FileSystem.exists(FileSystem.java:1426)
>>   at org.apache.spark.sql.execution.datasources.InsertIntoHadoopFsRelationCommand.run(InsertIntoHadoopFsRelationCommand.scala:77)
>>   at org.apache.spark.sql.execution.command.ExecutedCommandExec.sideEffectResult$lzycompute(commands.scala:58)
>>   at org.apache.spark.sql.execution.command.ExecutedCommandExec.sideEffectResult(commands.scala:56)
>>   at org.apache.spark.sql.execution.command.ExecutedCommandExec.doExecute(commands.scala:74)
>>   at org.apache.spark.sql.execution.SparkPlan$$anonfun$execute$1.apply(SparkPlan.scala:114)
>>   at org.apache.spark.sql.execution.SparkPlan$$anonfun$execute$1.apply(SparkPlan.scala:114)
>>   at org.apache.spark.sql.execution.SparkPlan$$anonfun$executeQuery$1.apply(SparkPlan.scala:135)
>>   at org.apache.spark.rdd.RDDOperationScope$.withScope(RDDOperationScope.scala:151)
>>   at org.apache.spark.sql.execution.SparkPlan.executeQuery(SparkPlan.scala:132)
>>   at org.apache.spark.sql.execution.SparkPlan.execute(SparkPlan.scala:113)
>>   at org.apache.spark.sql.execution.QueryExecution.toRdd$lzycompute(QueryExecution.scala:87)
>>   at org.apache.spark.sql.execution.QueryExecution.toRdd(QueryExecution.scala:87)
>>   at org.apache.spark.sql.execution.datasources.DataSource.write(DataSource.scala:492)
>>   at org.apache.spark.sql.DataFrameWriter.save(DataFrameWriter.scala:215)
>>   at org.apache.spark.sql.DataFrameWriter.save(DataFrameWriter.scala:198)
>>   at Vivace$$anonfun$processStream$1.apply(vivace.scala:193)
>>   at Vivace$$anonfun$processStream$1.apply(vivace.scala:170)
>>
>> Here's what I have in sbt:
>>
>> scalaVersion := "2.11.8"
>>
>> val sparkVersion = "2.1.0"
>> val hadoopVersion = "2.7.3"
>> val awsVersion = "1.11.155"
>>
>> lazy val sparkAndDependencies = Seq(
>>   "org.apache.spark" %% "spark-core" % sparkVersion,
>>   "org.apache.spark" %% "spark-sql" % sparkVersion,
>>   "org.apache.spark" %% "spark-hive" % sparkVersion,
>>   "org.apache.spark" %% "spark-streaming" % sparkVersion,
>>
>>   "org.apache.hadoop" % "hadoop-aws" % hadoopVersion,
>>   "org.apache.hadoop" % "hadoop-common" % hadoopVersion
>> )
>>
>> And this is where the code failed:
>>
>> val sparrowWriter = sparrowCastedDf.write.mode("append").format("orc").option("compression", "zlib")
>> sparrowWriter.save(sparrowOutputPath)
>>
>> sparrowOutputPath is something like s3://bucket/folder, and it exists; I checked it with the aws command line.
>>
>> I put a breakpoint there and the full path looks like s3://bucket/orc/dt=2018-01-23, which exists.
>>
>> I have also set up the credentials like this:
>>
>> sc.hadoopConfiguration.set("fs.s3.awsAccessKeyId", "key")
>> sc.hadoopConfiguration.set("fs.s3.awsSecretAccessKey", "secret")
>>
>> My confusion is that this code runs fine in the cluster, but I get this error running locally.
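Finally, a minimal sketch of the s3 -> s3a switch suggested earlier in the thread, assuming the hadoop-aws jar and a matching AWS SDK are on the local classpath; the bucket, keys, and DataFrame names are just the placeholders from the quoted mail. Locally, plain Hadoop maps s3:// to the legacy Jets3t-based S3FileSystem that appears in the stack trace, while EMR maps s3:// to EMRFS, which would explain why the same code behaves differently on the cluster.

// Sketch: switch from the s3:// scheme to s3a:// and use the s3a credential keys.
// "bucket", "key", and "secret" are placeholders, not real values.
sc.hadoopConfiguration.set("fs.s3a.access.key", "key")
sc.hadoopConfiguration.set("fs.s3a.secret.key", "secret")

// Same write as in the quoted mail, pointed at an s3a:// path.
val sparrowOutputPath = "s3a://bucket/orc"
sparrowCastedDf.write
  .mode("append")
  .format("orc")
  .option("compression", "zlib")
  .save(sparrowOutputPath)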