Are you writing to S3 from an Amazon instance or from an on-premises install?
How many partitions are you writing from? Maybe you can try to “play” with 
repartitioning to see how it behaves?
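
A minimal sketch of that experiment, assuming a DataFrame df and an illustrative bucket path (both placeholders):

// More partitions means more, smaller output files, so each individual S3
// upload finishes sooner; tune the count to your data volume.
val repartitioned = df.repartition(200)
repartitioned.write.csv("s3a://my-bucket/output/")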

> On Jan 23, 2018, at 17:09, Vasyl Harasymiv <vasyl.harasy...@gmail.com> wrote:
> 
> It is about 400 million rows. S3 automatically chunks the file on its end 
> while writing, so that's fine; e.g. it creates the same file name with 
> alphanumeric suffixes. 
> However, the write session expires due to token expiration. 
> 
> On Tue, Jan 23, 2018 at 5:03 PM, Jörn Franke <jornfra...@gmail.com> wrote:
>  How large is the file?
> 
> If it is very large, then you should have several partitions for the output 
> anyway. This is also important in case you need to read it again from S3: 
> having several files there enables parallel reading.
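
A small sketch of that read-back side, with a hypothetical path: Spark treats the whole output directory as one dataset and schedules roughly one task per file split.

// Hypothetical path: reading the directory picks up every part file.
val readBack = spark.read.csv("s3a://my-bucket/output/")
println(readBack.rdd.getNumPartitions)  // shows the read parallelism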
> 
> On 23. Jan 2018, at 23:58, Vasyl Harasymiv <vasyl.harasy...@gmail.com> wrote:
> 
>> Hi Spark Community,
>> 
>> Saving a data frame into a file on S3 using:
>> 
>> df.write.csv(s3_location)
>> 
>> If the write runs for longer than 30 minutes, the following error occurs:
>> 
>> The provided token has expired. (Service: Amazon S3; Status Code: 400; Error 
>> Code: ExpiredToken;)
>> 
>> Potentially this is because there is a hardcoded session limit on the 
>> temporary S3 connection that Spark opens.
>> 
>> One can specify the duration as described here:
>> 
>> https://docs.aws.amazon.com/IAM/latest/UserGuide/id_credentials_temp_request.html
>> 
>> One can, of course, chunk the data into sub-30-minute writes. However, is 
>> there a way to change the token expiry parameter directly in Spark before 
>> calling "write.csv"?
>> 
>> Thanks a lot for any help!
>> Vasyl
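
One hedged sketch of an answer, not a confirmed Spark setting: the expiry is fixed when STS issues the temporary credentials, not inside Spark, so a possible workaround is to request longer-lived credentials up front with the AWS Java SDK and hand them to the S3A connector (this assumes a Hadoop build whose S3A client supports session tokens, roughly 2.8+; the maximum duration is still capped by IAM). All names, paths, and durations below are illustrative.

import com.amazonaws.services.securitytoken.AWSSecurityTokenServiceClientBuilder
import com.amazonaws.services.securitytoken.model.GetSessionTokenRequest

// Ask STS for session credentials that should outlive the long write.
val sts = AWSSecurityTokenServiceClientBuilder.defaultClient()
val creds = sts.getSessionToken(
  new GetSessionTokenRequest().withDurationSeconds(4 * 3600)).getCredentials

// Point the S3A connector at the temporary credentials.
sc.hadoopConfiguration.set("fs.s3a.aws.credentials.provider",
  "org.apache.hadoop.fs.s3a.TemporaryAWSCredentialsProvider")
sc.hadoopConfiguration.set("fs.s3a.access.key", creds.getAccessKeyId)
sc.hadoopConfiguration.set("fs.s3a.secret.key", creds.getSecretAccessKey)
sc.hadoopConfiguration.set("fs.s3a.session.token", creds.getSessionToken)

df.write.csv("s3a://my-bucket/output/")  // hypothetical DataFrame and path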
>> 
>> 
>> 
>> 
>> 
>> On Tue, Jan 23, 2018 at 2:46 PM, Toy <noppani...@gmail.com> wrote:
>> Thanks. After switching to s3a://, I get this error:
>> 
>> Exception in thread "streaming-job-executor-0" java.lang.NoSuchMethodError: 
>> com.amazonaws.services.s3.transfer.TransferManager.<init>(Lcom/amazonaws/services/s3/AmazonS3;Ljava/util/concurrent/ThreadPoolExecutor;)V
>>      at 
>> org.apache.hadoop.fs.s3a.S3AFileSystem.initialize(S3AFileSystem.java:287)
>>      at 
>> org.apache.hadoop.fs.FileSystem.createFileSystem(FileSystem.java:2669)
>> 
>> On Tue, 23 Jan 2018 at 15:05 Patrick Alwell <palw...@hortonworks.com> wrote:
>> Spark cannot read from S3 locally without the s3a protocol; you'll more than 
>> likely need a local copy of the data, or you'll need the proper jars to 
>> enable S3 communication from the edge node to the datacenter.
>> 
>>  
>> 
>> https://stackoverflow.com/questions/30385981/how-to-access-s3a-files-from-apache-spark
>>  
>> 
>> Here are the jars: 
>> https://mvnrepository.com/artifact/org.apache.hadoop/hadoop-aws 
>>  
>> 
>> Looks like you already have them, in which case you'll have to make small 
>> configuration changes, e.g. s3 → s3a.
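
A minimal sketch of that change, using the fs.s3a.* keys from hadoop-aws and placeholder credentials; the DataFrame and path are taken from the code further down the thread:

// Placeholder credentials; note the keys are fs.s3a.*, not fs.s3.*.
sc.hadoopConfiguration.set("fs.s3a.access.key", "key")
sc.hadoopConfiguration.set("fs.s3a.secret.key", "secret")

sparrowCastedDf.write.mode("append").format("orc")
  .option("compression", "zlib")
  .save("s3a://bucket/orc/dt=2018-01-23")  // s3a:// rather than s3://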
>> 
>>  
>> 
>> Keep in mind: The Amazon JARs have proven very brittle: the version of the 
>> Amazon libraries must match the versions against which the Hadoop binaries 
>> were built.
>> 
>>  
>> 
>> https://hortonworks.github.io/hdp-aws/s3-s3aclient/index.html#using-the-s3a-filesystem-client
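
In sbt terms, a hedged sketch of what "matching versions" could look like for the Hadoop 2.7.3 build quoted further down; 1.7.4 is the AWS SDK line that hadoop-aws 2.7.x was compiled against, but verify the exact numbers for your own build:

// Pin the AWS SDK to the release hadoop-aws 2.7.x expects instead of
// mixing in a newer 1.11.x artifact.
libraryDependencies ++= Seq(
  "org.apache.hadoop" % "hadoop-aws"   % "2.7.3",
  "com.amazonaws"     % "aws-java-sdk" % "1.7.4"
)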
>> 
>> From: Toy <noppani...@gmail.com>
>> Date: Tuesday, January 23, 2018 at 11:33 AM
>> To: "user@spark.apache.org <mailto:user@spark.apache.org>" 
>> <user@spark.apache.org <mailto:user@spark.apache.org>>
>> Subject: I can't save DataFrame from running Spark locally
>> 
>>  
>> 
>> Hi,
>> 
>>  
>> 
>> First of all, my Spark application runs fine in AWS EMR. However, I'm trying 
>> to run it locally to debug an issue. The application just parses log files, 
>> converts them to a DataFrame, converts that to ORC, and saves it to S3. When 
>> I run locally, I get this error:
>> 
>>  
>> 
>> java.io.IOException: /orc/dt=2018-01-23 doesn't exist
>>   at org.apache.hadoop.fs.s3.Jets3tFileSystemStore.get(Jets3tFileSystemStore.java:170)
>>   at org.apache.hadoop.fs.s3.Jets3tFileSystemStore.retrieveINode(Jets3tFileSystemStore.java:221)
>>   at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
>>   at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
>>   at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
>>   at java.lang.reflect.Method.invoke(Method.java:497)
>>   at org.apache.hadoop.io.retry.RetryInvocationHandler.invokeMethod(RetryInvocationHandler.java:191)
>>   at org.apache.hadoop.io.retry.RetryInvocationHandler.invoke(RetryInvocationHandler.java:102)
>>   at com.sun.proxy.$Proxy22.retrieveINode(Unknown Source)
>>   at org.apache.hadoop.fs.s3.S3FileSystem.getFileStatus(S3FileSystem.java:340)
>>   at org.apache.hadoop.fs.FileSystem.exists(FileSystem.java:1426)
>>   at org.apache.spark.sql.execution.datasources.InsertIntoHadoopFsRelationCommand.run(InsertIntoHadoopFsRelationCommand.scala:77)
>>   at org.apache.spark.sql.execution.command.ExecutedCommandExec.sideEffectResult$lzycompute(commands.scala:58)
>>   at org.apache.spark.sql.execution.command.ExecutedCommandExec.sideEffectResult(commands.scala:56)
>>   at org.apache.spark.sql.execution.command.ExecutedCommandExec.doExecute(commands.scala:74)
>>   at org.apache.spark.sql.execution.SparkPlan$$anonfun$execute$1.apply(SparkPlan.scala:114)
>>   at org.apache.spark.sql.execution.SparkPlan$$anonfun$execute$1.apply(SparkPlan.scala:114)
>>   at org.apache.spark.sql.execution.SparkPlan$$anonfun$executeQuery$1.apply(SparkPlan.scala:135)
>>   at org.apache.spark.rdd.RDDOperationScope$.withScope(RDDOperationScope.scala:151)
>>   at org.apache.spark.sql.execution.SparkPlan.executeQuery(SparkPlan.scala:132)
>>   at org.apache.spark.sql.execution.SparkPlan.execute(SparkPlan.scala:113)
>>   at org.apache.spark.sql.execution.QueryExecution.toRdd$lzycompute(QueryExecution.scala:87)
>>   at org.apache.spark.sql.execution.QueryExecution.toRdd(QueryExecution.scala:87)
>>   at org.apache.spark.sql.execution.datasources.DataSource.write(DataSource.scala:492)
>>   at org.apache.spark.sql.DataFrameWriter.save(DataFrameWriter.scala:215)
>>   at org.apache.spark.sql.DataFrameWriter.save(DataFrameWriter.scala:198)
>>   at Vivace$$anonfun$processStream$1.apply(vivace.scala:193)
>>   at Vivace$$anonfun$processStream$1.apply(vivace.scala:170)
>> 
>>  
>> 
>> Here's what I have in sbt
>> 
>>  
>> 
>> scalaVersion := "2.11.8"
>> 
>> val sparkVersion = "2.1.0"
>> val hadoopVersion = "2.7.3"
>> val awsVersion = "1.11.155"
>> 
>> lazy val sparkAndDependencies = Seq(
>>   "org.apache.spark" %% "spark-core" % sparkVersion,
>>   "org.apache.spark" %% "spark-sql" % sparkVersion,
>>   "org.apache.spark" %% "spark-hive" % sparkVersion,
>>   "org.apache.spark" %% "spark-streaming" % sparkVersion,
>>   "org.apache.hadoop" % "hadoop-aws" % hadoopVersion,
>>   "org.apache.hadoop" % "hadoop-common" % hadoopVersion
>> )
>> 
>>  
>> 
>> And this is where the code fails:
>> 
>>  
>> 
>> val sparrowWriter = sparrowCastedDf.write.mode("append").format("orc").option("compression", "zlib")
>> 
>> sparrowWriter.save(sparrowOutputPath)
>> 
>>  
>> 
>> sparrowOutputPath is something like s3://bucket/folder, and it exists; I 
>> checked it with the aws command line.
>> 
>>  
>> 
>> I put a breakpoint there, and the full path looks like 
>> s3://bucket/orc/dt=2018-01-23, which exists.
>> 
>>  
>> 
>> I have also set up the credentials like this:
>> 
>>  
>> 
>> sc.hadoopConfiguration.set("fs.s3.awsAccessKeyId", "key")
>> sc.hadoopConfiguration.set("fs.s3.awsSecretAccessKey", "secret")
>> 
>>  
>> 
>> My confusion is that this code runs fine on the cluster, but I get this error 
>> when running locally.
>> 
>>  
>> 
>>  
>> 
>> 
> 
