sqlContext.registerDataFrameAsTable is not working properly in pyspark 2.0

2016-09-14 Thread sririshindra
Hi,

I have a production job that registers four different dataframes as tables in
PySpark 1.6.2. When we upgraded to Spark 2.0, only three of the four dataframes
are getting registered; the fourth one is not. There are no code changes
whatsoever; the only change is the Spark version. When I revert to Spark 1.6.2,
the dataframe is registered properly. Did anyone face a similar issue? Is this
a bug in Spark 2.0 or is it just a compatibility issue?



--
View this message in context: 
http://apache-spark-developers-list.1001551.n3.nabble.com/sqlContext-registerDataFrameAsTable-is-not-working-properly-in-pyspark-2-0-tp18938.html



Re: Output Committers for S3

2017-03-27 Thread sririshindra
Hi 

I have a job which saves a dataframe as a Parquet file to S3.

I built a jar using your repository https://github.com/rdblue/s3committer
and added the following config to the Spark Session:

config("spark.hadoop.spark.sql.parquet.output.committer.class",
  "com.netflix.bdp.s3.S3PartitionedOutputCommitter")


I submitted the job to Spark 2.0.2 as follows:

./bin/spark-submit --master local[*] --driver-memory 4G \
  --jars /home/rishi/Downloads/hadoop-aws-2.7.3.jar,/home/rishi/Downloads/aws-java-sdk-1.7.4.jar,/home/user/Documents/s3committer/build/libs/s3committer-0.5.5.jar \
  --driver-library-path /home/user/Downloads/hadoop-aws-2.7.3.jar,/home/user/Downloads/aws-java-sdk-1.7.4.jar,/home/user/Documents/s3committer/build/libs/s3committer-0.5.5.jar \
  --class main.streaming.scala.backupdatatos3.backupdatatos3Processorr \
  --packages joda-time:joda-time:2.9.7,org.mongodb.mongo-hadoop:mongo-hadoop-core:1.5.2,org.mongodb:mongo-java-driver:3.3.0 \
  /home/user/projects/backupjob/target/Backup-1.0-SNAPSHOT.jar


I am getting the following runtime exception:

Exception in thread "main" java.lang.RuntimeException: java.lang.RuntimeException:
class com.netflix.bdp.s3.S3PartitionedOutputCommitter not org.apache.parquet.hadoop.ParquetOutputCommitter
  at org.apache.hadoop.conf.Configuration.getClass(Configuration.java:2227)
  at org.apache.spark.sql.execution.datasources.parquet.ParquetFileFormat.prepareWrite(ParquetFileFormat.scala:81)
  at org.apache.spark.sql.execution.datasources.FileFormatWriter$.write(FileFormatWriter.scala:108)
  at org.apache.spark.sql.execution.datasources.InsertIntoHadoopFsRelationCommand.run(InsertIntoHadoopFsRelationCommand.scala:101)
  at org.apache.spark.sql.execution.command.ExecutedCommandExec.sideEffectResult$lzycompute(commands.scala:58)
  at org.apache.spark.sql.execution.command.ExecutedCommandExec.sideEffectResult(commands.scala:56)
  at org.apache.spark.sql.execution.command.ExecutedCommandExec.doExecute(commands.scala:74)
  at org.apache.spark.sql.execution.SparkPlan$$anonfun$execute$1.apply(SparkPlan.scala:114)
  at org.apache.spark.sql.execution.SparkPlan$$anonfun$execute$1.apply(SparkPlan.scala:114)
  at org.apache.spark.sql.execution.SparkPlan$$anonfun$executeQuery$1.apply(SparkPlan.scala:135)
  at org.apache.spark.rdd.RDDOperationScope$.withScope(RDDOperationScope.scala:151)
  at org.apache.spark.sql.execution.SparkPlan.executeQuery(SparkPlan.scala:132)
  at org.apache.spark.sql.execution.SparkPlan.execute(SparkPlan.scala:113)
  at org.apache.spark.sql.execution.QueryExecution.toRdd$lzycompute(QueryExecution.scala:87)
  at org.apache.spark.sql.execution.QueryExecution.toRdd(QueryExecution.scala:87)
  at org.apache.spark.sql.execution.datasources.DataSource.write(DataSource.scala:492)
  at org.apache.spark.sql.DataFrameWriter.save(DataFrameWriter.scala:215)
  at org.apache.spark.sql.DataFrameWriter.save(DataFrameWriter.scala:198)
  at main.streaming.scala.backupdatatos3.backupdatatos3Processorr$.main(backupdatatos3Processorr.scala:229)
  at main.streaming.scala.backupdatatos3.backupdatatos3Processorr.main(backupdatatos3Processorr.scala)
  at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
  at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
  at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
  at java.lang.reflect.Method.invoke(Method.java:498)
  at org.apache.spark.deploy.SparkSubmit$.org$apache$spark$deploy$SparkSubmit$$runMain(SparkSubmit.scala:738)
  at org.apache.spark.deploy.SparkSubmit$.doRunMain$1(SparkSubmit.scala:187)
  at org.apache.spark.deploy.SparkSubmit$.submit(SparkSubmit.scala:212)
  at org.apache.spark.deploy.SparkSubmit$.main(SparkSubmit.scala:126)
  at org.apache.spark.deploy.SparkSubmit.main(SparkSubmit.scala)
Caused by: java.lang.RuntimeException: class com.netflix.bdp.s3.S3PartitionedOutputCommitter
not org.apache.parquet.hadoop.ParquetOutputCommitter
  at org.apache.hadoop.conf.Configuration.getClass(Configuration.java:2221)
  ... 28 more

Can you please point out my mistake?

If possible, could you give a working example of saving a dataframe as a
Parquet file to S3?

--
View this message in context: 
http://apache-spark-developers-list.1001551.n3.nabble.com/Output-Committers-for-S3-tp21033p21246.html



Re: Output Committers for S3

2017-06-16 Thread sririshindra
Hi Ryan and Steve,

Thanks very much for your reply.

I was finally able to get Ryan's repo working for me by changing the output
committer to FileOutputFormat instead of ParquetOutputCommitter in Spark, as
Steve suggested.
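
For anyone following along, a minimal sketch of the kind of wiring I mean is
below. I am assuming here that the committer is picked up through the generic
spark.sql.sources.outputCommitterClass hook rather than the Parquet-specific
spark.hadoop.spark.sql.parquet.output.committer.class setting; treat this as an
illustration of the idea, not an exact recipe.

import org.apache.spark.sql.SparkSession

// Sketch: route the S3 committer through the generic file-source hook.
val spark = SparkSession.builder()
  .appName("s3-committer-sketch")  // hypothetical app name
  .config("spark.sql.sources.outputCommitterClass",
          "com.netflix.bdp.s3.S3PartitionedOutputCommitter")
  .getOrCreate()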

However, it is not working for append mode while saving the data frame.

val hf = spark.read.parquet(
  "/home/user/softwares/spark-2.1.0-bin-hadoop2.7/examples/src/main/resources/users.parquet")

hf.persist(StorageLevel.DISK_ONLY)
hf.show()

// Append-mode write to S3, partitioned by the "name" column.
hf.write
  .partitionBy("name").mode("append")
  .save(S3Location + "data" + ".parquet")



The above code successfully saves the Parquet file when I run it for the first
time. But when I rerun the code, the new Parquet files are not getting added
to S3.

I have put a print statement in the constructors of PartitionedOutputCommitter
in Ryan's repo and realized that the partitioned output committer is not even
getting called the second time I run the code; it is only called the first
time. Is there anything I can do to make Spark call the
PartitionedOutputCommitter even when the file already exists in S3?






--
View this message in context: 
http://apache-spark-developers-list.1001551.n3.nabble.com/Output-Committers-for-S3-tp21033p21776.html



Re: Output Committers for S3

2017-06-17 Thread sririshindra
Hi,

As @Venkata krishnan pointed out, Spark does not allow DFOC when append mode
is enabled.

In the following class in Spark, there is a small check:

org.apache.spark.sql.execution.datasources.SQLHadoopMapReduceCommitProtocol


if (isAppend) {
  // If we are appending data to an existing dir, we will only use the output
  // committer associated with the file output format since it is not safe to
  // use a custom committer for appending. For example, in S3, direct parquet
  // output committer may leave partial data in the destination dir when the
  // appending job fails.
  // See SPARK-8578 for more details.

However, the reasoning mentioned in the above comment is probably (maybe Ryan
or Steve can confirm this assumption) not applicable to the Netflix committer
uploaded by Ryan Blue, because Ryan's committer uses multipart upload. So
either the whole file is live or nothing is; partial data will not be
available for read. Whatever partial data might have been uploaded to S3 by a
failed job will be removed after one day (I think that is the default in
Ryan's code; it can be changed via the fs.s3a.multipart.purge.age setting,
which defaults to 86400 seconds).
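
For reference, a sketch of how these cleanup settings can be passed through
the Spark session, assuming the standard s3a keys:

import org.apache.spark.sql.SparkSession

val spark = SparkSession.builder()
  .appName("multipart-purge-sketch")  // hypothetical app name
  // Purge incomplete multipart uploads older than one day (86400 seconds).
  .config("spark.hadoop.fs.s3a.multipart.purge", "true")
  .config("spark.hadoop.fs.s3a.multipart.purge.age", "86400")
  .getOrCreate()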


So I simply changed the check to

 if (true) {

and rebuilt Spark from scratch. Everything is working well for me in my
initial tests.


There is one more problem I wanted to mention. For some reason, I am getting
an authentication issue while using Ryan's code, so I made the following
change inside it.

I changed the findClient method in S3MultipartOutputCommitter.java (Ryan's
repo) to the following:

protected Object findClient(Path path, Configuration conf) {
  System.out.println("findClient in S3MultipartOutputCommitter");
  // Originally tried a profile-based provider:
  //   new AmazonS3Client(new ProfileCredentialsProvider("/home/user/.aws/credentials", "default"));
  AmazonS3Client cli = new AmazonS3Client(
      new com.amazonaws.auth.EnvironmentVariableCredentialsProvider());
  System.out.println(cli);
  return cli;
}


We just have to set the S3 credentials in the ~/.bashrc file.
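
Since EnvironmentVariableCredentialsProvider reads the standard
AWS_ACCESS_KEY_ID and AWS_SECRET_ACCESS_KEY environment variables, a quick
sanity check from the driver can look like this small sketch:

// Sketch: confirm the credentials exported in ~/.bashrc are visible to the JVM.
Seq("AWS_ACCESS_KEY_ID", "AWS_SECRET_ACCESS_KEY").foreach { name =>
  println(s"$name set: ${sys.env.contains(name)}")
}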

Please add anything that I might have missed.

Also, please look at Ryan's talk at Spark Summit a few days ago
("Improving Apache Spark with S3" by Ryan Blue).

--
View this message in context: 
http://apache-spark-developers-list.1001551.n3.nabble.com/Output-Committers-for-S3-tp21033p21779.html



Re: Output Committers for S3

2017-06-19 Thread sririshindra
Is there anything similar to the S3 committer for Google Cloud Storage? Since
Google Cloud Storage is also an object store rather than a file system, I
imagine the same problem that the S3 committer is trying to solve arises with
Google Cloud Storage as well.

Thanks,
rishi



--
View this message in context: 
http://apache-spark-developers-list.1001551.n3.nabble.com/Output-Committers-for-S3-tp21033p21803.html