Re: Spark sql query plan contains all the partitions from hive table even though filtering of partitions is provided

2017-01-17 Thread Raju Bairishetti
Hello,

   Spark SQL is generating the query plan with all partition information even
when we apply filters on partitions in the query. Due to this, the Spark
driver/Hive metastore is hitting OOM, as each table has lots of partitions.

We can confirm from hive audit logs that it tries to *fetch all
partitions* from
hive metastore.

 2016-12-28 07:18:33,749 INFO  [pool-4-thread-184]: HiveMetaStore.audit
(HiveMetaStore.java:logAuditEvent(371)) - ugi=rajubip=/x.x.x.x
cmd=get_partitions : db= tbl=x


Configured the following parameters in the Spark conf to fix the above
issue (source: Spark JIRA & GitHub pull requests):

*spark.sql.hive.convertMetastoreParquet   false*
*spark.sql.hive.metastorePartitionPruning   true*
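For reference, a minimal sketch of how these can be set (assuming Spark 1.x
with a HiveContext; the app name is just an example, not from the original job):

import org.apache.spark.{SparkConf, SparkContext}
import org.apache.spark.sql.hive.HiveContext

val conf = new SparkConf()
  .setAppName("partition-pruning-test")                     // hypothetical app name
  .set("spark.sql.hive.convertMetastoreParquet", "false")
  .set("spark.sql.hive.metastorePartitionPruning", "true")

val sc = new SparkContext(conf)
val sqlContext = new HiveContext(sc)                        // sqlContext used for the plans below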


*   plan:  rdf.explain*
*   == Physical Plan ==*
   HiveTableScan [rejection_reason#626], MetastoreRelation dbname,
tablename, None,   [(year#314 = 2016),(month#315 = 12),(day#316 =
28),(hour#317 = 2),(venture#318 = DEFAULT)]

The *get_partitions_by_filter* method is now called and fetches only the
required partitions.

But we are seeing parquetDecode errors in our applications frequently
after this. Looks like these decoding errors were caused by changing the
serde from spark-builtin to hive serde.

I feel like *fixing query plan generation in spark-sql* is the right
approach instead of forcing users to use the hive serde.

Is there any workaround/way to fix this issue? I would like to hear more
thoughts on this :)


On Tue, Jan 17, 2017 at 4:00 PM, Raju Bairishetti  wrote:

> Had a high level look into the code. Seems getHiveQlPartitions  method
> from HiveMetastoreCatalog is getting called irrespective of 
> metastorePartitionPruning
> conf value.
>
>  It should not fetch all partitions if we set metastorePartitionPruning to
> true (Default value for this is false)
>
> def getHiveQlPartitions(predicates: Seq[Expression] = Nil): Seq[Partition] = {
>   val rawPartitions = if (sqlContext.conf.metastorePartitionPruning) {
> table.getPartitions(predicates)
>   } else {
> allPartitions
>   }
>
> ...
>
> def getPartitions(predicates: Seq[Expression]): Seq[HivePartition] =
>   client.getPartitionsByFilter(this, predicates)
>
> lazy val allPartitions = table.getAllPartitions
>
> But somehow getAllPartitions is getting called eventough after setting 
> metastorePartitionPruning to true.
>
> Am I missing something or looking at wrong place?
>
>
> On Mon, Jan 16, 2017 at 12:53 PM, Raju Bairishetti 
> wrote:
>
>> Waiting for suggestions/help on this...
>>
>> On Wed, Jan 11, 2017 at 12:14 PM, Raju Bairishetti 
>> wrote:
>>
>>> Hello,
>>>
>>>Spark sql is generating query plan with all partitions information
>>> even though if we apply filters on partitions in the query.  Due to this,
>>> spark driver/hive metastore is hitting with OOM as each table is with lots
>>> of partitions.
>>>
>>> We can confirm from hive audit logs that it tries to *fetch all
>>> partitions* from hive metastore.
>>>
>>>  2016-12-28 07:18:33,749 INFO  [pool-4-thread-184]: HiveMetaStore.audit
>>> (HiveMetaStore.java:logAuditEvent(371)) - ugi=rajubip=/x.x.x.x
>>> cmd=get_partitions : db= tbl=x
>>>
>>>
>>> Configured the following parameters in the spark conf to fix the above
>>> issue(source: from spark-jira & github pullreq):
>>>
>>> *spark.sql.hive.convertMetastoreParquet   false*
>>> *spark.sql.hive.metastorePartitionPruning   true*
>>>
>>>
>>> *   plan:  rdf.explain*
>>> *   == Physical Plan ==*
>>>HiveTableScan [rejection_reason#626], MetastoreRelation dbname,
>>> tablename, None,   [(year#314 = 2016),(month#315 = 12),(day#316 =
>>> 28),(hour#317 = 2),(venture#318 = DEFAULT)]
>>>
>>> *get_partitions_by_filter* method is called and fetching only
>>> required partitions.
>>>
>>> But we are seeing parquetDecode errors in our applications
>>> frequently after this. Looks like these decoding errors were because of
>>> changing serde from spark-builtin to hive serde.
>>>
>>> I feel like,* fixing query plan generation in the spark-sql* is the
>>> right approach instead of forcing users to use hive serde.
>>>
>>> Is there any workaround/way to fix this issue? I would like to hear more
>>> thoughts on this :)
>>>
>>> --
>>> Thanks,
>>> Raju Bairishetti,
>>> www.lazada.com
>>>
>>
>>
>>
>> --
>>
>> --
>> Thanks,
>> Raju Bairishetti,
>> www.lazada.com
>>
>
>
>
> --
>
> --
> Thanks,
> Raju Bairishetti,
> www.lazada.com
>



-- 

--
Thanks,
Raju Bairishetti,
www.lazada.com


Re: Spark sql query plan contains all the partitions from hive table even though filtering of partitions is provided

2017-01-17 Thread Raju Bairishetti
Had a high-level look into the code. It seems the getHiveQlPartitions method from
HiveMetastoreCatalog is getting called irrespective of the
metastorePartitionPruning conf value.

 It should not fetch all partitions if we set metastorePartitionPruning to
true (the default value for this is false):

def getHiveQlPartitions(predicates: Seq[Expression] = Nil): Seq[Partition] = {
  val rawPartitions = if (sqlContext.conf.metastorePartitionPruning) {
table.getPartitions(predicates)
  } else {
allPartitions
  }

...

def getPartitions(predicates: Seq[Expression]): Seq[HivePartition] =
  client.getPartitionsByFilter(this, predicates)

lazy val allPartitions = table.getAllPartitions

But somehow getAllPartitions is getting called even after setting
metastorePartitionPruning to true.

Am I missing something or looking at the wrong place?
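A quick sanity check, offered only as a sketch (the table name is hypothetical):
confirm the flag actually reached the running session, since getHiveQlPartitions
only consults sqlContext.conf.metastorePartitionPruning at planning time, and
then look at the plan.

// expect "true" if the conf reached the session
println(sqlContext.getConf("spark.sql.hive.metastorePartitionPruning"))

// hypothetical partitioned table; with pruning, the scan should carry only matching partitions
sqlContext.sql("SELECT count(1) FROM mydb.mytable WHERE year = 2016").explain(true)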


On Tue, Jan 17, 2017 at 4:01 PM, Raju Bairishetti  wrote:

> Hello,
>
>Spark sql is generating query plan with all partitions information
> even though if we apply filters on partitions in the query.  Due to this,
> sparkdriver/hive metastore is hitting with OOM as each table is with lots
> of partitions.
>
> We can confirm from hive audit logs that it tries to
> *fetch all partitions* from hive metastore.
>
>  2016-12-28 07:18:33,749 INFO  [pool-4-thread-184]: HiveMetaStore.audit
> (HiveMetaStore.java:logAuditEvent(371)) - ugi=rajubip=/x.x.x.x
> cmd=get_partitions : db= tbl=x
>
>
> Configured the following parameters in the spark conf to fix the above
> issue(source: from spark-jira & github pullreq):
>
> *spark.sql.hive.convertMetastoreParquet   false*
> *spark.sql.hive.metastorePartitionPruning   true*
>
>
> *   plan:  rdf.explain*
> *   == Physical Plan ==*
>HiveTableScan [rejection_reason#626], MetastoreRelation dbname,
> tablename, None,   [(year#314 = 2016),(month#315 = 12),(day#316 =
> 28),(hour#317 = 2),(venture#318 = DEFAULT)]
>
> *get_partitions_by_filter* method is called and fetching only
> required partitions.
>
> But we are seeing parquetDecode errors in our applications frequently
> after this. Looks like these decoding errors were because of changing
> serde fromspark-builtin to hive serde.
>
> I feel like,* fixing query plan generation in the spark-sql* is the right
> approach instead of forcing users to use hive serde.
>
> Is there any workaround/way to fix this issue? I would like to hear more
> thoughts on this :)
>
>
> On Tue, Jan 17, 2017 at 4:00 PM, Raju Bairishetti  wrote:
>
>> Had a high level look into the code. Seems getHiveQlPartitions  method
>> from HiveMetastoreCatalog is getting called irrespective of 
>> metastorePartitionPruning
>> conf value.
>>
>>  It should not fetch all partitions if we set metastorePartitionPruning to
>> true (Default value for this is false)
>>
>> def getHiveQlPartitions(predicates: Seq[Expression] = Nil): Seq[Partition] = 
>> {
>>   val rawPartitions = if (sqlContext.conf.metastorePartitionPruning) {
>> table.getPartitions(predicates)
>>   } else {
>> allPartitions
>>   }
>>
>> ...
>>
>> def getPartitions(predicates: Seq[Expression]): Seq[HivePartition] =
>>   client.getPartitionsByFilter(this, predicates)
>>
>> lazy val allPartitions = table.getAllPartitions
>>
>> But somehow getAllPartitions is getting called eventough after setting 
>> metastorePartitionPruning to true.
>>
>> Am I missing something or looking at wrong place?
>>
>>
>> On Mon, Jan 16, 2017 at 12:53 PM, Raju Bairishetti 
>> wrote:
>>
>>> Waiting for suggestions/help on this...
>>>
>>> On Wed, Jan 11, 2017 at 12:14 PM, Raju Bairishetti 
>>> wrote:
>>>
 Hello,

Spark sql is generating query plan with all partitions information
 even though if we apply filters on partitions in the query.  Due to this,
 spark driver/hive metastore is hitting with OOM as each table is with lots
 of partitions.

 We can confirm from hive audit logs that it tries to *fetch all
 partitions* from hive metastore.

  2016-12-28 07:18:33,749 INFO  [pool-4-thread-184]: HiveMetaStore.audit
 (HiveMetaStore.java:logAuditEvent(371)) - ugi=rajubip=/x.x.x.x
 cmd=get_partitions : db= tbl=x


 Configured the following parameters in the spark conf to fix the above
 issue(source: from spark-jira & github pullreq):

 *spark.sql.hive.convertMetastoreParquet   false*
 *spark.sql.hive.metastorePartitionPruning   true*


 *   plan:  rdf.explain*
 *   == Physical Plan ==*
HiveTableScan [rejection_reason#626], MetastoreRelation dbname,
 tablename, None,   [(year#314 = 2016),(month#315 = 12),(day#316 =
 28),(hour#317 = 2),(venture#318 = DEFAULT)]

 *get_partitions_by_filter* method is called and fetching only
 required partitions.

 But we are seeing parquetDecode errors in our applications
 frequently after this. Looks like these decoding errors were because of
 changing serde from spark-builtin

Re: Both Spark AM and Client are trying to delete Staging Directory

2017-01-17 Thread Steve Loughran
I think Rostyslav is using a DFS which logs at warn/error if you try to delete 
a directory that isn't there, so is seeing warning messages that nobody else 
does

Rostyslav —like I said, i'd be curious as to which DFS/object store you are 
working with, as it is behaving slightly differently from everyone else's.

It sounds like I may need to have a quick chat with them about the merits of 
running all the Hadoop FS specification tests, if they aren't already. 
Something like a warning printed on delete(nonexistent path) hints that there 
may be other differences, which makes me worry about what their rename() does. 
That's the thing that atomic and speculative work depends on, so everyone needs 
to understand what's expected there.
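For reference, a minimal sketch of the delete() contract in question (assuming
Hadoop's FileSystem API against a local filesystem, essentially what Marcelo
demonstrates below): deleting a path that does not exist should simply return
false, not warn or fail.

import org.apache.hadoop.conf.Configuration
import org.apache.hadoop.fs.{FileSystem, Path}

val fs = FileSystem.getLocal(new Configuration())
// recursive delete of a missing path: the expected outcome is just "false"
val deleted = fs.delete(new Path("/tmp/does-not-exist"), true)
assert(!deleted)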



On 14 Jan 2017, at 19:42, Marcelo Vanzin  wrote:

scala> org.apache.hadoop.fs.FileSystem.getLocal(sc.hadoopConfiguration)
res0: org.apache.hadoop.fs.LocalFileSystem =
org.apache.hadoop.fs.LocalFileSystem@3f84970b

scala> res0.delete(new org.apache.hadoop.fs.Path("/tmp/does-not-exist"), true)
res3: Boolean = false

Does that explain your confusion?


On Sat, Jan 14, 2017 at 11:37 AM, Marcelo Vanzin  wrote:
Are you actually seeing a problem or just questioning the code?

I have never seen a situation where there's a failure because of that
part of the current code.

On Fri, Jan 13, 2017 at 3:24 AM, Rostyslav Sotnychenko  wrote:
Hi all!

I am a bit confused why Spark AM and Client are both trying to delete
Staging Directory.

https://github.com/apache/spark/blob/branch-2.1/yarn/src/main/scala/org/apache/spark/deploy/yarn/Client.scala#L1110
https://github.com/apache/spark/blob/branch-2.1/yarn/src/main/scala/org/apache/spark/deploy/yarn/ApplicationMaster.scala#L233

As you can see, in case if a job was running on YARN in Cluster deployment
mode, both AM and Client will try to delete Staging directory if job
succeeded and eventually one of them will fail to do this, because the other
one already deleted the directory.

Shouldn't we add some check to Client?


Thanks,
Rostyslav



--
Marcelo



--
Marcelo

-
To unsubscribe e-mail: 
dev-unsubscr...@spark.apache.org



GraphX-related "open" issues

2017-01-17 Thread Takeshi Yamamuro
Hi, devs

Sorry to bother you, but please let me check in advance:
in JIRA, there are some open (and inactive) issues about GraphX features.
IIUC, the current GraphX features are almost frozen and
will probably get no modifications except for critical bug fixes.
So, IMO it seems okay to close the tickets about "Improvement" and "New
Feature" for now.
Thoughts?

If there is any problem, please let me know; otherwise, I'll close the tickets
below this weekend:
---
New Feature SPARK-15880
 - PREGEL Based Semi-Clustering Algorithm Implementation using Spark GraphX
API
New Feature SPARK-13733
 - Support initial weight distribution in personalized PageRank
Improvement SPARK-13460
 - Applying Encoding methods to GraphX's Internal storage structure
New Feature SPARK-10758
 - approximation algorithms to speedup triangle count and clustering
coefficient computation in GraphX
Improvement SPARK-10335
 - GraphX Connected Components fail with large number of iterations
New Feature SPARK-8497
 - Graph Clique(Complete Connected Sub-graph) Discovery Algorithm
New Feature SPARK-7258
 - spark.ml API taking Graph instead of DataFrame
New Feature SPARK-7257
 - Find nearest neighbor satisfying predicate
New Feature SPARK-7244
 - Find vertex sequences satisfying predicates
New Feature SPARK-4763
 - All-pairs shortest paths algorithm
Improvement SPARK-3373
 - Filtering operations should optionally rebuild routing tables


-- 
---
Takeshi Yamamuro (maropu)


Re: Weird experience Hive with Spark Transformations

2017-01-17 Thread Dongjoon Hyun
Hi, Chetan.

Did you copy your `hive-site.xml` into Spark conf directory? For example,

cp /usr/local/hive/conf/hive-site.xml /usr/local/spark/conf

If you want to use the existing Hive metastore, you need to provide that 
information to Spark.
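As a rough sketch (assumed setup, the app name is hypothetical): with Spark 2.0,
building the session with Hive support makes it pick up hive-site.xml from the
conf directory instead of creating a local metastore_db in whatever directory
the shell was launched from.

import org.apache.spark.sql.SparkSession

val spark = SparkSession.builder()
  .appName("hbase-to-hive")          // hypothetical app name
  .enableHiveSupport()               // reads warehouse/metastore settings from hive-site.xml
  .getOrCreate()

spark.sql("SHOW DATABASES").show()   // should list the databases from the shared Hive metastore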

Bests,
Dongjoon.

On 2017-01-16 21:36 (-0800), Chetan Khatri  wrote: 
> Hello,
> 
> I have following services are configured and installed successfully:
> 
> Hadoop 2.7.x
> Spark 2.0.x
> HBase 1.2.4
> Hive 1.2.1
> 
> *Installation Directories:*
> 
> /usr/local/hadoop
> /usr/local/spark
> /usr/local/hbase
> 
> *Hive Environment variables:*
> 
> #HIVE VARIABLES START
> export HIVE_HOME=/usr/local/hive
> export PATH=$PATH:$HIVE_HOME/bin
> #HIVE VARIABLES END
> 
> So, I can access Hive from anywhere as the environment variables are
> configured. Now, if I start my spark-shell & hive from the location
> /usr/local/hive, then both work fine with the hive-metastore; otherwise,
> Spark creates its own metastore in whatever directory I start spark-shell from.
> 
> i.e. I am reading from HBase and writing to Hive using Spark. I don't know
> why this weird issue happens.
> 
> 
> 
> 
> Thanks.
> 

-
To unsubscribe e-mail: dev-unsubscr...@spark.apache.org



spark main thread quit, but the Jvm of driver don't crash

2017-01-17 Thread John Fang
My Spark main thread creates some daemon threads. Then the Spark application
throws some exceptions, and the main thread quits. But the JVM of the driver
doesn't crash, so what can I do?
For example:
// Imports added for completeness; WordReceiver is assumed to be the sender's own custom receiver.
import java.util.concurrent.{Executors, ThreadFactory, TimeUnit}

import org.apache.spark.SparkConf
import org.apache.spark.storage.StorageLevel
import org.apache.spark.streaming.{Seconds, StreamingContext}

val sparkConf = new SparkConf().setAppName("NetworkWordCount")
sparkConf.set("spark.streaming.blockInterval", "1000ms")
val ssc = new StreamingContext(sparkConf, Seconds(10))

//daemon thread
val scheduledExecutorService = Executors.newSingleThreadScheduledExecutor(
  new ThreadFactory() {
    def newThread(r: Runnable): Thread = new Thread(r, "Driver-Commit-Thread")
  })

scheduledExecutorService.scheduleAtFixedRate(
  new Runnable() {
    def run() {
      try {
        System.out.println("runable")
      } catch {
        case e: Exception =>
          System.out.println("ScheduledTask persistAllConsumerOffset exception: " + e)
      }
    }
  }, 1000, 1000 * 5, TimeUnit.MILLISECONDS)

Thread.sleep(1005)


val lines = ssc.receiverStream(new WordReceiver(StorageLevel.MEMORY_AND_DISK_2))
val words = lines.flatMap(_.split(" "))

val wordCounts = words.map(x => (x, 1)).reduceByKey((x: Int, y: Int) => x + y, 
10)
wordCounts.foreachRDD{rdd =>
  rdd.collect().foreach(println)
  throw new RuntimeException
}
ssc.start()
try {
  ssc.awaitTermination()
} catch {
  case e: Exception => {
System.out.println("end!")
throw e
  }
}
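One observation, offered as a sketch rather than a confirmed fix: the
"Driver-Commit-Thread" above is created as an ordinary (non-daemon) thread, and
any live non-daemon thread keeps the JVM running after the main thread exits.
Marking it as a daemon would let the driver JVM shut down once the main thread
is gone:

import java.util.concurrent.ThreadFactory

val daemonFactory = new ThreadFactory() {
  def newThread(r: Runnable): Thread = {
    val t = new Thread(r, "Driver-Commit-Thread")
    t.setDaemon(true)   // daemon threads do not prevent JVM shutdown on their own
    t
  }
}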



Re: GraphX-related "open" issues

2017-01-17 Thread Dongjoon Hyun
Hi, Takeshi.

> So, IMO it seems okay to close tickets about "Improvement" and "New Feature" 
> for now.

I'm just wondering about what kind of field value you want to fill in the 
`Resolution` field for those issues.

Maybe, 'Later'? Or, 'Won't Fix'?

Bests,
Dongjoon.

-
To unsubscribe e-mail: dev-unsubscr...@spark.apache.org



spark main thread quit, but the driver don't crash at standalone cluster

2017-01-17 Thread John Fang
My Spark main thread creates some daemon threads, which may be timer threads. Then
the Spark application throws some exceptions, and the main thread quits. But the
JVM of the driver doesn't crash on a standalone cluster. Of course, the problem
doesn't happen on a YARN cluster, because the application master monitors the main
thread of the application, but the standalone cluster can't. For example:

val sparkConf = new SparkConf().setAppName("NetworkWordCount")
sparkConf.set("spark.streaming.blockInterval", "1000ms")
val ssc = new StreamingContext(sparkConf, Seconds(10))

//daemon thread
val scheduledExecutorService = Executors.newSingleThreadScheduledExecutor(
  new ThreadFactory() {
def newThread(r: Runnable): Thread = new Thread(r, "Driver-Commit-Thread")
  })

scheduledExecutorService.scheduleAtFixedRate(
  new Runnable() {
def run() {
  try {
System.out.println("runable")
  } catch {
case e: Exception => {
  System.out.println("ScheduledTask persistAllConsumerOffset exception: " + e)
}
  }
}
  }, 1000, 1000 * 5, TimeUnit.MILLISECONDS)

Thread.sleep(1005)


val lines = ssc.receiverStream(new WordReceiver(StorageLevel.MEMORY_AND_DISK_2))
val words = lines.flatMap(_.split(" "))

val wordCounts = words.map(x => (x, 1)).reduceByKey((x: Int, y: Int) => x + y, 
10)
wordCounts.foreachRDD{rdd =>
  rdd.collect().foreach(println)
  throw new RuntimeException  //exception
}
ssc.start()
try {
  ssc.awaitTermination()
} catch {
  case e: Exception => {
System.out.println("end!")
throw e
  }
}




Re: GraphX-related "open" issues

2017-01-17 Thread Takeshi Yamamuro
Thanks for your comment!
I'm thinking I'll set "Won't Fix", though "Later" is also okay.
But I re-checked "Contributing to JIRA Maintenance" in the contribution
guide (http://spark.apache.org/contributing.html) and
I couldn't find any policy about setting "Later".
So, IMO it's okay to set "Won't Fix" for now, and those who'd like to make
PRs should feel free to (re-)open the tickets.


On Wed, Jan 18, 2017 at 1:48 AM, Dongjoon Hyun  wrote:

> Hi, Takeshi.
>
> > So, IMO it seems okay to close tickets about "Improvement" and "New
> Feature" for now.
>
> I'm just wondering about what kind of field value you want to fill in the
> `Resolution` field for those issues.
>
> Maybe, 'Later'? Or, 'Won't Fix'?
>
> Bests,
> Dongjoon.
>
> -
> To unsubscribe e-mail: dev-unsubscr...@spark.apache.org
>
>


-- 
---
Takeshi Yamamuro


Re: GraphX-related "open" issues

2017-01-17 Thread Sean Owen
WontFix or Later is fine. There's not really any practical distinction. I
figure that if something times out and is closed, it's very unlikely to be
looked at again. Therefore marking it as something to do 'later' seemed
less accurate.

On Tue, Jan 17, 2017 at 5:30 PM Takeshi Yamamuro 
wrote:

> Thank for your comment!
> I'm just thinking I'll set "Won't Fix" though, "Later" is also okay.
> But, I re-checked "Contributing to JIRA Maintenance" in the contribution
> guide (http://spark.apache.org/contributing.html) and
> I couldn't find any setting policy about "Later".
> So, IMO it's okay to set "Won't Fix" for now and those who'd like to make
> prs feel free to (re?-)open tickets.
>
>
> On Wed, Jan 18, 2017 at 1:48 AM, Dongjoon Hyun 
> wrote:
>
> Hi, Takeshi.
>
> > So, IMO it seems okay to close tickets about "Improvement" and "New
> Feature" for now.
>
> I'm just wondering about what kind of field value you want to fill in the
> `Resolution` field for those issues.
>
> Maybe, 'Later'? Or, 'Won't Fix'?
>
> Bests,
> Dongjoon.
>
> -
> To unsubscribe e-mail: dev-unsubscr...@spark.apache.org
>
>
>
>
> --
> ---
> Takeshi Yamamuro
>


Re: Both Spark AM and Client are trying to delete Staging Directory

2017-01-17 Thread Rostyslav Sotnychenko
> I think Rostyslav is using a DFS which logs at warn/error if you try to
delete a directory that isn't there, so is seeing warning messages that
nobody else does

Yep, you are correct.

> Rostyslav —like I said, i'd be curious as to which DFS/object store you
are working with

Unfortunately, I am not able to share this information :(

> It sounds like I may need to have a quick chat with them about the merits
of running all the Hadoop FS specification tests

I already sent the link to the Hadoop docs you mentioned to the appropriate
people, so the difference can be fixed.


Thanks,
Rostyslav


On Tue, Jan 17, 2017 at 1:58 PM, Steve Loughran 
wrote:

> I think Rostyslav is using a DFS which logs at warn/error if you try to
> delete a directory that isn't there, so is seeing warning messages that
> nobody else does
>
> Rostyslav —like I said, i'd be curious as to which DFS/object store you
> are working with, as it is behaving slightly differently from everyone
> else's.
>
> It sounds like I may need to have a quick chat with them about the merits
> of running all the Hadoop FS specification tests, if they aren't already.
> Something like a warning printed on delete(nonexistent path) hints that
> there may be other differences, which makes me worry about what their
> rename() does. That's the thing that atomic and speculative work depends
> on, so everyone needs to understand what's expected there.
>
>
>
> On 14 Jan 2017, at 19:42, Marcelo Vanzin  wrote:
>
> scala> org.apache.hadoop.fs.FileSystem.getLocal(sc.hadoopConfiguration)
> res0: org.apache.hadoop.fs.LocalFileSystem =
> org.apache.hadoop.fs.LocalFileSystem@3f84970b
>
> scala> res0.delete(new org.apache.hadoop.fs.Path("/tmp/does-not-exist"),
> true)
> res3: Boolean = false
>
> Does that explain your confusion?
>
>
> On Sat, Jan 14, 2017 at 11:37 AM, Marcelo Vanzin 
> wrote:
>
> Are you actually seeing a problem or just questioning the code?
>
> I have never seen a situation where there's a failure because of that
> part of the current code.
>
> On Fri, Jan 13, 2017 at 3:24 AM, Rostyslav Sotnychenko
>  wrote:
>
> Hi all!
>
> I am a bit confused why Spark AM and Client are both trying to delete
> Staging Directory.
>
> https://github.com/apache/spark/blob/branch-2.1/yarn/
> src/main/scala/org/apache/spark/deploy/yarn/Client.scala#L1110
> https://github.com/apache/spark/blob/branch-2.1/yarn/
> src/main/scala/org/apache/spark/deploy/yarn/ApplicationMaster.scala#L233
>
> As you can see, in case if a job was running on YARN in Cluster deployment
> mode, both AM and Client will try to delete Staging directory if job
> succeeded and eventually one of them will fail to do this, because the
> other
> one already deleted the directory.
>
> Shouldn't we add some check to Client?
>
>
> Thanks,
> Rostyslav
>
>
>
>
> --
> Marcelo
>
>
>
>
> --
> Marcelo
>
> -
> To unsubscribe e-mail: dev-unsubscr...@spark.apache.org
>
>
>


Re: Spark sql query plan contains all the partitions from hive table even though filtering of partitions is provided

2017-01-17 Thread Michael Allman
Hi Raju,

I'm sorry this isn't working for you. I helped author this functionality and 
will try my best to help.

First, I'm curious why you set spark.sql.hive.convertMetastoreParquet to false? 
Can you link specifically to the jira issue or spark pr you referred to? The 
first thing I would try is setting spark.sql.hive.convertMetastoreParquet to 
true. Setting that to false might also explain why you're getting parquet 
decode errors. If you're writing your table data with Spark's parquet file 
writer and reading with Hive's parquet file reader, there may be an 
incompatibility accounting for the decode errors you're seeing. 

Can you reply with your table's Hive metastore schema, including partition 
schema? Where are the table's files located? If you do a "show partitions
<db>.<table>" in the spark-sql shell, does it show the partitions you
expect to see? If not, run "msck repair table <db>.<table>".
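A sketch of those checks with hypothetical database/table names, run through the
same HiveContext-backed sqlContext used elsewhere in the thread:

sqlContext.sql("SHOW PARTITIONS mydb.mytable").show()   // partitions the metastore knows about
sqlContext.sql("MSCK REPAIR TABLE mydb.mytable")        // register partitions on disk that the metastore is missing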

Cheers,

Michael


> On Jan 17, 2017, at 12:02 AM, Raju Bairishetti  wrote:
> 
> Had a high level look into the code. Seems getHiveQlPartitions  method from 
> HiveMetastoreCatalog is getting called irrespective of 
> metastorePartitionPruning conf value.
> 
>  It should not fetch all partitions if we set metastorePartitionPruning to 
> true (Default value for this is false) 
> def getHiveQlPartitions(predicates: Seq[Expression] = Nil): Seq[Partition] = {
>   val rawPartitions = if (sqlContext.conf.metastorePartitionPruning) {
> table.getPartitions(predicates)
>   } else {
> allPartitions
>   }
> ...
> def getPartitions(predicates: Seq[Expression]): Seq[HivePartition] =
>   client.getPartitionsByFilter(this, predicates)
> lazy val allPartitions = table.getAllPartitions
> But somehow getAllPartitions is getting called eventough after setting 
> metastorePartitionPruning to true.
> Am I missing something or looking at wrong place?
> 
> On Tue, Jan 17, 2017 at 4:01 PM, Raju Bairishetti  > wrote:
> Hello,
>   
>Spark sql is generating query plan with all partitions information even 
> though if we apply filters on partitions in the query.  Due to this, 
> sparkdriver/hive metastore is hitting with OOM as each table is with lots of 
> partitions.
> 
> We can confirm from hive audit logs that it tries to fetch all partitions 
> from hive metastore.
> 
>  2016-12-28 07:18:33,749 INFO  [pool-4-thread-184]: HiveMetaStore.audit 
> (HiveMetaStore.java:logAuditEvent(371)) - ugi=rajubip=/x.x.x.x   
> cmd=get_partitions : db= tbl=x
> 
> 
> Configured the following parameters in the spark conf to fix the above 
> issue(source: from spark-jira & github pullreq):
> spark.sql.hive.convertMetastoreParquet   false
> spark.sql.hive.metastorePartitionPruning   true
> 
>plan:  rdf.explain
>== Physical Plan ==
>HiveTableScan [rejection_reason#626], MetastoreRelation dbname, 
> tablename, None,   [(year#314 = 2016),(month#315 = 12),(day#316 = 
> 28),(hour#317 = 2),(venture#318 = DEFAULT)]
> 
> get_partitions_by_filter method is called and fetching only required 
> partitions.
> 
> But we are seeing parquetDecode errors in our applications frequently 
> after this. Looks like these decoding errors were because of changing serde 
> fromspark-builtin to hive serde.
> 
> I feel like, fixing query plan generation in the spark-sql is the right 
> approach instead of forcing users to use hive serde.
> 
> Is there any workaround/way to fix this issue? I would like to hear more 
> thoughts on this :)
> 
> 
> On Tue, Jan 17, 2017 at 4:00 PM, Raju Bairishetti  > wrote:
> Had a high level look into the code. Seems getHiveQlPartitions  method from 
> HiveMetastoreCatalog is getting called irrespective of 
> metastorePartitionPruning conf value.
> 
>  It should not fetch all partitions if we set metastorePartitionPruning to 
> true (Default value for this is false) 
> def getHiveQlPartitions(predicates: Seq[Expression] = Nil): Seq[Partition] = {
>   val rawPartitions = if (sqlContext.conf.metastorePartitionPruning) {
> table.getPartitions(predicates)
>   } else {
> allPartitions
>   }
> ...
> def getPartitions(predicates: Seq[Expression]): Seq[HivePartition] =
>   client.getPartitionsByFilter(this, predicates)
> lazy val allPartitions = table.getAllPartitions
> But somehow getAllPartitions is getting called eventough after setting 
> metastorePartitionPruning to true.
> Am I missing something or looking at wrong place?
> 
> On Mon, Jan 16, 2017 at 12:53 PM, Raju Bairishetti  > wrote:
> Waiting for suggestions/help on this... 
> 
> On Wed, Jan 11, 2017 at 12:14 PM, Raju Bairishetti  > wrote:
> Hello,
>   
>Spark sql is generating query plan with all partitions information even 
> though if we apply filters on partitions in the query.  Due to this, spark 
> driver/hive metastore is hitting with OOM as each table is with lots of 
> partitions.
> 
> We can confirm from hive audit logs that it trie

Feedback on MLlib roadmap process proposal

2017-01-17 Thread Joseph Bradley
Hi all,

This is a general call for thoughts about the process for the MLlib roadmap
proposed in SPARK-18813.  See the section called "Roadmap process."

Summary:
* This process is about committers indicating intention to shepherd and
review.
* The goal is to improve visibility and communication.
* This is fairly orthogonal to the SIP discussion since this proposal is
more about setting release targets than about proposing future plans.

Thanks!
Joseph

-- 

Joseph Bradley

Software Engineer - Machine Learning

Databricks, Inc.



Re: Limit Query Performance Suggestion

2017-01-17 Thread sujith71955
Dear Liang,

Thanks for your valuable feedback.

There was a mistake in the previous post; I corrected it. As you mentioned,
in `GlobalLimit` we only take the required number of rows from the input
iterator, which really pulls data from local blocks and remote blocks. But if
the limit value is very high (>= 1000), a shuffle exchange happens between
`GlobalLimit` and `LocalLimit` to retrieve data from all partitions into one
partition, and since the limit value is very large the performance bottleneck
still exists.
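For illustration only (hypothetical data, assuming a Spark 2.x SparkSession
named `spark`), the plan shape being discussed can be seen with a LIMIT inside
a larger query, which is planned as per-partition LocalLimits feeding a
single-partition shuffle under GlobalLimit:

val df = spark.range(0L, 100000000L).toDF("id")   // hypothetical wide input
df.createOrReplaceTempView("t")

// GlobalLimit <- Exchange(SinglePartition) <- LocalLimit: all partitions funnel into one task
spark.sql("SELECT * FROM (SELECT * FROM t LIMIT 1000000) q WHERE id > 0").explain()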
 
Soon, in the next post, I will publish a test report with sample data and
also figure out a solution for this problem.

Please let me know for any clarifications or suggestions regarding this
issue.

Regards,
Sujith



--
View this message in context: 
http://apache-spark-developers-list.1001551.n3.nabble.com/Limit-Query-Performance-Suggestion-tp20570p20640.html
Sent from the Apache Spark Developers List mailing list archive at Nabble.com.

-
To unsubscribe e-mail: dev-unsubscr...@spark.apache.org



Re: Spark sql query plan contains all the partitions from hive table even though filtering of partitions is provided

2017-01-17 Thread Raju Bairishetti
Thanks Michael for the response.


On Wed, Jan 18, 2017 at 2:45 AM, Michael Allman 
wrote:

> Hi Raju,
>
> I'm sorry this isn't working for you. I helped author this functionality
> and will try my best to help.
>
> First, I'm curious why you set spark.sql.hive.convertMetastoreParquet to
> false?
>
I had set it as suggested in SPARK-6910 and the corresponding pull requests. It
did not work for me without setting the *spark.sql.hive.convertMetastoreParquet*
property.

Can you link specifically to the jira issue or spark pr you referred to?
> The first thing I would try is setting spark.sql.hive.convertMetastoreParquet
> to true. Setting that to false might also explain why you're getting
> parquet decode errors. If you're writing your table data with Spark's
> parquet file writer and reading with Hive's parquet file reader, there may
> be an incompatibility accounting for the decode errors you're seeing.
>
>  https://issues.apache.org/jira/browse/SPARK-6910 . My main motivation is
to avoid fetching all the partitions. We reverted the
spark.sql.hive.convertMetastoreParquet setting to true because of the
decoding errors. After reverting this, it is fetching all partitions from
the table.

Can you reply with your table's Hive metastore schema, including partition
> schema?
>
 col1 string
 col2 string
 year int
 month int
 day int
 hour int

# Partition Information

# col_name            data_type       comment

year  int

month int

day int

hour int

venture string

>
>
Where are the table's files located?
>
In hadoop. Under some user directory.

> If you do a "show partitions ." in the spark-sql shell,
> does it show the partitions you expect to see? If not, run "msck repair
> table .".
>
Yes. It is listing the partitions

> Cheers,
>
> Michael
>
>
> On Jan 17, 2017, at 12:02 AM, Raju Bairishetti  wrote:
>
> Had a high level look into the code. Seems getHiveQlPartitions  method
> from HiveMetastoreCatalog is getting called irrespective of 
> metastorePartitionPruning
> conf value.
>
>  It should not fetch all partitions if we set metastorePartitionPruning to
> true (Default value for this is false)
>
> def getHiveQlPartitions(predicates: Seq[Expression] = Nil): Seq[Partition] = {
>   val rawPartitions = if (sqlContext.conf.metastorePartitionPruning) {
> table.getPartitions(predicates)
>   } else {
> allPartitions
>   }
>
> ...
>
> def getPartitions(predicates: Seq[Expression]): Seq[HivePartition] =
>   client.getPartitionsByFilter(this, predicates)
>
> lazy val allPartitions = table.getAllPartitions
>
> But somehow getAllPartitions is getting called eventough after setting 
> metastorePartitionPruning to true.
>
> Am I missing something or looking at wrong place?
>
>
> On Tue, Jan 17, 2017 at 4:01 PM, Raju Bairishetti  wrote:
>
>> Hello,
>>
>>Spark sql is generating query plan with all partitions information
>> even though if we apply filters on partitions in the query.  Due to
>> this, sparkdriver/hive metastore is hitting with OOM as each table is
>> with lots of partitions.
>>
>> We can confirm from hive audit logs that it tries to
>> *fetch all partitions* from hive metastore.
>>
>>  2016-12-28 07:18:33,749 INFO  [pool-4-thread-184]: HiveMetaStore.audit
>> (HiveMetaStore.java:logAuditEvent(371)) - ugi=rajubip=/x.x.x.x
>> cmd=get_partitions : db= tbl=x
>>
>>
>> Configured the following parameters in the spark conf to fix the above
>> issue(source: from spark-jira & github pullreq):
>>
>> *spark.sql.hive.convertMetastoreParquet   false*
>> *spark.sql.hive.metastorePartitionPruning   true*
>>
>>
>> *   plan:  rdf.explain*
>> *   == Physical Plan ==*
>>HiveTableScan [rejection_reason#626], MetastoreRelation dbname,
>> tablename, None,   [(year#314 = 2016),(month#315 = 12),(day#316 =
>> 28),(hour#317 = 2),(venture#318 = DEFAULT)]
>>
>> *get_partitions_by_filter* method is called and fetching only
>> required partitions.
>>
>> But we are seeing parquetDecode errors in our applications frequently
>> after this. Looks like these decoding errors were because of changing
>> serde fromspark-builtin to hive serde.
>>
>> I feel like,* fixing query plan generation in the spark-sql* is the
>> right approach instead of forcing users to use hive serde.
>>
>> Is there any workaround/way to fix this issue? I would like to hear more
>> thoughts on this :)
>>
>>
>> On Tue, Jan 17, 2017 at 4:00 PM, Raju Bairishetti 
>> wrote:
>>
>>> Had a high level look into the code. Seems getHiveQlPartitions  method
>>> from HiveMetastoreCatalog is getting called irrespective of 
>>> metastorePartitionPruning
>>> conf value.
>>>
>>>  It should not fetch all partitions if we set metastorePartitionPruning to
>>> true (Default value for this is false)
>>>
>>> def getHiveQlPartitions(predicates: Seq[Expression] = Nil): Seq[Partition] 
>>> = {
>>>   val rawPartitions = if (sqlContext.conf.metastorePartitionPruning) {
>>> table.getPartitions(predicates)
>>>   } else {
>>> allPartitions
>>>   }
>

Re: Spark sql query plan contains all the partitions from hive table even though filtering of partitions is provided

2017-01-17 Thread Michael Allman
What is the physical query plan after you set 
spark.sql.hive.convertMetastoreParquet to true?

Michael

> On Jan 17, 2017, at 6:51 PM, Raju Bairishetti  wrote:
> 
> Thanks Michael for the respopnse.
> 
> 
> On Wed, Jan 18, 2017 at 2:45 AM, Michael Allman  > wrote:
> Hi Raju,
> 
> I'm sorry this isn't working for you. I helped author this functionality and 
> will try my best to help.
> 
> First, I'm curious why you set spark.sql.hive.convertMetastoreParquet to 
> false? 
> I had set as suggested in SPARK-6910 and corresponsing pull reqs. It did not 
> work for me without  setting spark.sql.hive.convertMetastoreParquet property. 
> 
> Can you link specifically to the jira issue or spark pr you referred to? The 
> first thing I would try is setting spark.sql.hive.convertMetastoreParquet to 
> true. Setting that to false might also explain why you're getting parquet 
> decode errors. If you're writing your table data with Spark's parquet file 
> writer and reading with Hive's parquet file reader, there may be an 
> incompatibility accounting for the decode errors you're seeing. 
> 
>  https://issues.apache.org/jira/browse/SPARK-6910 
>  . My main motivation is to 
> avoid fetching all the partitions. We reverted 
> spark.sql.hive.convertMetastoreParquet  setting to true to decoding errors. 
> After reverting this it is fetching all partiitons from the table.
> 
> Can you reply with your table's Hive metastore schema, including partition 
> schema?
>  col1 string
>  col2 string
>  year int
>  month int
>  day int
>  hour int   
> # Partition Information
> 
> # col_namedata_type   comment
> 
> year  int
> 
> month int
> 
> day int
> 
> hour int
> 
> venture string
> 
>  
> Where are the table's files located?
> In hadoop. Under some user directory. 
> If you do a "show partitions ." in the spark-sql shell, 
> does it show the partitions you expect to see? If not, run "msck repair table 
> .".
> Yes. It is listing the partitions
> Cheers,
> 
> Michael
> 
> 
>> On Jan 17, 2017, at 12:02 AM, Raju Bairishetti > > wrote:
>> 
>> Had a high level look into the code. Seems getHiveQlPartitions  method from 
>> HiveMetastoreCatalog is getting called irrespective of 
>> metastorePartitionPruning conf value.
>> 
>>  It should not fetch all partitions if we set metastorePartitionPruning to 
>> true (Default value for this is false) 
>> def getHiveQlPartitions(predicates: Seq[Expression] = Nil): Seq[Partition] = 
>> {
>>   val rawPartitions = if (sqlContext.conf.metastorePartitionPruning) {
>> table.getPartitions(predicates)
>>   } else {
>> allPartitions
>>   }
>> ...
>> def getPartitions(predicates: Seq[Expression]): Seq[HivePartition] =
>>   client.getPartitionsByFilter(this, predicates)
>> lazy val allPartitions = table.getAllPartitions
>> But somehow getAllPartitions is getting called eventough after setting 
>> metastorePartitionPruning to true.
>> Am I missing something or looking at wrong place?
>> 
>> On Tue, Jan 17, 2017 at 4:01 PM, Raju Bairishetti > > wrote:
>> Hello,
>>   
>>Spark sql is generating query plan with all partitions information even 
>> though if we apply filters on partitions in the query.  Due to this, 
>> sparkdriver/hive metastore is hitting with OOM as each table is with lots of 
>> partitions.
>> 
>> We can confirm from hive audit logs that it tries to fetch all partitions 
>> from hive metastore.
>> 
>>  2016-12-28 07:18:33,749 INFO  [pool-4-thread-184]: HiveMetaStore.audit 
>> (HiveMetaStore.java:logAuditEvent(371)) - ugi=rajubip=/x.x.x.x   
>> cmd=get_partitions : db= tbl=x
>> 
>> 
>> Configured the following parameters in the spark conf to fix the above 
>> issue(source: from spark-jira & github pullreq):
>> spark.sql.hive.convertMetastoreParquet   false
>> spark.sql.hive.metastorePartitionPruning   true
>> 
>>plan:  rdf.explain
>>== Physical Plan ==
>>HiveTableScan [rejection_reason#626], MetastoreRelation dbname, 
>> tablename, None,   [(year#314 = 2016),(month#315 = 12),(day#316 = 
>> 28),(hour#317 = 2),(venture#318 = DEFAULT)]
>> 
>> get_partitions_by_filter method is called and fetching only required 
>> partitions.
>> 
>> But we are seeing parquetDecode errors in our applications frequently 
>> after this. Looks like these decoding errors were because of changing serde 
>> fromspark-builtin to hive serde.
>> 
>> I feel like, fixing query plan generation in the spark-sql is the right 
>> approach instead of forcing users to use hive serde.
>> 
>> Is there any workaround/way to fix this issue? I would like to hear more 
>> thoughts on this :)
>> 
>> 
>> On Tue, Jan 17, 2017 at 4:00 PM, Raju Bairishetti > > wrote:
>> Had a high level look into the code. Seems getHiveQlPartitions  method from 
>> HiveMetastoreCatalog is gett

Re: Spark sql query plan contains all the partitions from hive table even though filtering of partitions is provided

2017-01-17 Thread Raju Bairishetti
On Wed, Jan 18, 2017 at 11:13 AM, Michael Allman 
wrote:

> What is the physical query plan after you set 
> spark.sql.hive.convertMetastoreParquet
> to true?
>
Physical plan contains all the partition locations.

>
> Michael
>
> On Jan 17, 2017, at 6:51 PM, Raju Bairishetti  wrote:
>
> Thanks Michael for the respopnse.
>
>
> On Wed, Jan 18, 2017 at 2:45 AM, Michael Allman 
> wrote:
>
>> Hi Raju,
>>
>> I'm sorry this isn't working for you. I helped author this functionality
>> and will try my best to help.
>>
>> First, I'm curious why you set spark.sql.hive.convertMetastoreParquet to
>> false?
>>
> I had set as suggested in SPARK-6910 and corresponsing pull reqs. It did
> not work for me without  setting *spark.sql.hive.convertMetastoreParquet*
> property.
>
> Can you link specifically to the jira issue or spark pr you referred to?
>> The first thing I would try is setting spark.sql.hive.convertMetastoreParquet
>> to true. Setting that to false might also explain why you're getting
>> parquet decode errors. If you're writing your table data with Spark's
>> parquet file writer and reading with Hive's parquet file reader, there may
>> be an incompatibility accounting for the decode errors you're seeing.
>>
>>  https://issues.apache.org/jira/browse/SPARK-6910 . My main motivation
> is to avoid fetching all the partitions. We reverted
> spark.sql.hive.convertMetastoreParquet  setting to true to decoding
> errors. After reverting this it is fetching all partiitons from the table.
>
> Can you reply with your table's Hive metastore schema, including partition
>> schema?
>>
>  col1 string
>  col2 string
>  year int
>  month int
>  day int
>  hour int
>
> # Partition Information
>
> # col_namedata_type   comment
>
> year  int
>
> month int
>
> day int
>
> hour int
>
> venture string
>
>>
>>
> Where are the table's files located?
>>
> In hadoop. Under some user directory.
>
>> If you do a "show partitions ." in the spark-sql
>> shell, does it show the partitions you expect to see? If not, run "msck
>> repair table .".
>>
> Yes. It is listing the partitions
>
>> Cheers,
>>
>> Michael
>>
>>
>> On Jan 17, 2017, at 12:02 AM, Raju Bairishetti  wrote:
>>
>> Had a high level look into the code. Seems getHiveQlPartitions  method
>> from HiveMetastoreCatalog is getting called irrespective of 
>> metastorePartitionPruning
>> conf value.
>>
>>  It should not fetch all partitions if we set metastorePartitionPruning to
>> true (Default value for this is false)
>>
>> def getHiveQlPartitions(predicates: Seq[Expression] = Nil): Seq[Partition] = 
>> {
>>   val rawPartitions = if (sqlContext.conf.metastorePartitionPruning) {
>> table.getPartitions(predicates)
>>   } else {
>> allPartitions
>>   }
>>
>> ...
>>
>> def getPartitions(predicates: Seq[Expression]): Seq[HivePartition] =
>>   client.getPartitionsByFilter(this, predicates)
>>
>> lazy val allPartitions = table.getAllPartitions
>>
>> But somehow getAllPartitions is getting called eventough after setting 
>> metastorePartitionPruning to true.
>>
>> Am I missing something or looking at wrong place?
>>
>>
>> On Tue, Jan 17, 2017 at 4:01 PM, Raju Bairishetti 
>> wrote:
>>
>>> Hello,
>>>
>>>Spark sql is generating query plan with all partitions information
>>> even though if we apply filters on partitions in the query.  Due to
>>> this, sparkdriver/hive metastore is hitting with OOM as each table is
>>> with lots of partitions.
>>>
>>> We can confirm from hive audit logs that it tries to
>>> *fetch all partitions* from hive metastore.
>>>
>>>  2016-12-28 07:18:33,749 INFO  [pool-4-thread-184]: HiveMetaStore.audit
>>> (HiveMetaStore.java:logAuditEvent(371)) - ugi=rajubip=/x.x.x.x
>>> cmd=get_partitions : db= tbl=x
>>>
>>>
>>> Configured the following parameters in the spark conf to fix the above
>>> issue(source: from spark-jira & github pullreq):
>>>
>>> *spark.sql.hive.convertMetastoreParquet   false*
>>> *spark.sql.hive.metastorePartitionPruning   true*
>>>
>>>
>>> *   plan:  rdf.explain*
>>> *   == Physical Plan ==*
>>>HiveTableScan [rejection_reason#626], MetastoreRelation dbname,
>>> tablename, None,   [(year#314 = 2016),(month#315 = 12),(day#316 =
>>> 28),(hour#317 = 2),(venture#318 = DEFAULT)]
>>>
>>> *get_partitions_by_filter* method is called and fetching only
>>> required partitions.
>>>
>>> But we are seeing parquetDecode errors in our applications
>>> frequently after this. Looks like these decoding errors were because of
>>> changing serde fromspark-builtin to hive serde.
>>>
>>> I feel like,* fixing query plan generation in the spark-sql* is the
>>> right approach instead of forcing users to use hive serde.
>>>
>>> Is there any workaround/way to fix this issue? I would like to hear more
>>> thoughts on this :)
>>>
>>>
>>> On Tue, Jan 17, 2017 at 4:00 PM, Raju Bairishetti 
>>> wrote:
>>>
 Had a high level look into the code. Seems getHiveQlPartitions  method
 from HiveMetastor

Re: Spark sql query plan contains all the partitions from hive table even though filtering of partitions is provided

2017-01-17 Thread Michael Allman
Can you paste the actual query plan here, please?

> On Jan 17, 2017, at 7:38 PM, Raju Bairishetti  wrote:
> 
> 
> On Wed, Jan 18, 2017 at 11:13 AM, Michael Allman  > wrote:
> What is the physical query plan after you set 
> spark.sql.hive.convertMetastoreParquet to true?
> Physical plan continas all the partition locations 
> 
> Michael
> 
>> On Jan 17, 2017, at 6:51 PM, Raju Bairishetti > > wrote:
>> 
>> Thanks Michael for the respopnse.
>> 
>> 
>> On Wed, Jan 18, 2017 at 2:45 AM, Michael Allman > > wrote:
>> Hi Raju,
>> 
>> I'm sorry this isn't working for you. I helped author this functionality and 
>> will try my best to help.
>> 
>> First, I'm curious why you set spark.sql.hive.convertMetastoreParquet to 
>> false? 
>> I had set as suggested in SPARK-6910 and corresponsing pull reqs. It did not 
>> work for me without  setting spark.sql.hive.convertMetastoreParquet 
>> property. 
>> 
>> Can you link specifically to the jira issue or spark pr you referred to? The 
>> first thing I would try is setting spark.sql.hive.convertMetastoreParquet to 
>> true. Setting that to false might also explain why you're getting parquet 
>> decode errors. If you're writing your table data with Spark's parquet file 
>> writer and reading with Hive's parquet file reader, there may be an 
>> incompatibility accounting for the decode errors you're seeing. 
>> 
>>  https://issues.apache.org/jira/browse/SPARK-6910 
>>  . My main motivation is 
>> to avoid fetching all the partitions. We reverted 
>> spark.sql.hive.convertMetastoreParquet  setting to true to decoding errors. 
>> After reverting this it is fetching all partiitons from the table.
>> 
>> Can you reply with your table's Hive metastore schema, including partition 
>> schema?
>>  col1 string
>>  col2 string
>>  year int
>>  month int
>>  day int
>>  hour int   
>> # Partition Information   
>> 
>> # col_namedata_type   comment
>> 
>> year  int
>> 
>> month int
>> 
>> day int
>> 
>> hour int
>> 
>> venture string
>> 
>>  
>> Where are the table's files located?
>> In hadoop. Under some user directory. 
>> If you do a "show partitions ." in the spark-sql shell, 
>> does it show the partitions you expect to see? If not, run "msck repair 
>> table .".
>> Yes. It is listing the partitions
>> Cheers,
>> 
>> Michael
>> 
>> 
>>> On Jan 17, 2017, at 12:02 AM, Raju Bairishetti >> > wrote:
>>> 
>>> Had a high level look into the code. Seems getHiveQlPartitions  method from 
>>> HiveMetastoreCatalog is getting called irrespective of 
>>> metastorePartitionPruning conf value.
>>> 
>>>  It should not fetch all partitions if we set metastorePartitionPruning to 
>>> true (Default value for this is false) 
>>> def getHiveQlPartitions(predicates: Seq[Expression] = Nil): Seq[Partition] 
>>> = {
>>>   val rawPartitions = if (sqlContext.conf.metastorePartitionPruning) {
>>> table.getPartitions(predicates)
>>>   } else {
>>> allPartitions
>>>   }
>>> ...
>>> def getPartitions(predicates: Seq[Expression]): Seq[HivePartition] =
>>>   client.getPartitionsByFilter(this, predicates)
>>> lazy val allPartitions = table.getAllPartitions
>>> But somehow getAllPartitions is getting called eventough after setting 
>>> metastorePartitionPruning to true.
>>> Am I missing something or looking at wrong place?
>>> 
>>> On Tue, Jan 17, 2017 at 4:01 PM, Raju Bairishetti >> > wrote:
>>> Hello,
>>>   
>>>Spark sql is generating query plan with all partitions information even 
>>> though if we apply filters on partitions in the query.  Due to this, 
>>> sparkdriver/hive metastore is hitting with OOM as each table is with lots 
>>> of partitions.
>>> 
>>> We can confirm from hive audit logs that it tries to fetch all partitions 
>>> from hive metastore.
>>> 
>>>  2016-12-28 07:18:33,749 INFO  [pool-4-thread-184]: HiveMetaStore.audit 
>>> (HiveMetaStore.java:logAuditEvent(371)) - ugi=rajubip=/x.x.x.x   
>>> cmd=get_partitions : db= tbl=x
>>> 
>>> 
>>> Configured the following parameters in the spark conf to fix the above 
>>> issue(source: from spark-jira & github pullreq):
>>> spark.sql.hive.convertMetastoreParquet   false
>>> spark.sql.hive.metastorePartitionPruning   true
>>> 
>>>plan:  rdf.explain
>>>== Physical Plan ==
>>>HiveTableScan [rejection_reason#626], MetastoreRelation dbname, 
>>> tablename, None,   [(year#314 = 2016),(month#315 = 12),(day#316 = 
>>> 28),(hour#317 = 2),(venture#318 = DEFAULT)]
>>> 
>>> get_partitions_by_filter method is called and fetching only required 
>>> partitions.
>>> 
>>> But we are seeing parquetDecode errors in our applications frequently 
>>> after this. Looks like these decoding errors were because of changing serde 
>>> fromspark-builtin to hive serde.
>>> 
>>> I feel like, fixing

Re: Spark sql query plan contains all the partitions from hive table even though filtering of partitions is provided

2017-01-17 Thread Raju Bairishetti
 describe dummy;

OK
sample      string
year        string
month       string

# Partition Information
# col_name  data_type   comment

year        string
month       string


val df = sqlContext.sql("select count(1) from rajub.dummy where year='*2017*'")

df: org.apache.spark.sql.DataFrame = [_c0: bigint]


*scala> df.explain*

== Physical Plan ==

TungstenAggregate(key=[],
functions=[(count(1),mode=Final,isDistinct=false)], output=[_c0#3070L])

+- TungstenExchange SinglePartition, None

   +- TungstenAggregate(key=[],
functions=[(count(1),mode=Partial,isDistinct=false)], output=[count#3076L])

  +- Scan ParquetRelation: rajub.dummy[] InputPaths:
maprfs:/user/rajub/dummy/sample/year=2016/month=10,
maprfs:/user/rajub/dummy/sample/year=*2016*/month=11,
maprfs:/user/rajub/dummy/sample/year=*2016*/month=9,
maprfs:/user/rajub/dummy/sample/year=2017/month=10,
maprfs:/user/rajub/dummy/sample/year=2017/month=11,
maprfs:/user/rajub/dummy/sample/year=2017/month=9

On Wed, Jan 18, 2017 at 12:25 PM, Michael Allman 
wrote:

> Can you paste the actual query plan here, please?
>
> On Jan 17, 2017, at 7:38 PM, Raju Bairishetti  wrote:
>
>
> On Wed, Jan 18, 2017 at 11:13 AM, Michael Allman 
> wrote:
>
>> What is the physical query plan after you set
>> spark.sql.hive.convertMetastoreParquet to true?
>>
> Physical plan continas all the partition locations
>
>>
>> Michael
>>
>> On Jan 17, 2017, at 6:51 PM, Raju Bairishetti  wrote:
>>
>> Thanks Michael for the respopnse.
>>
>>
>> On Wed, Jan 18, 2017 at 2:45 AM, Michael Allman 
>> wrote:
>>
>>> Hi Raju,
>>>
>>> I'm sorry this isn't working for you. I helped author this functionality
>>> and will try my best to help.
>>>
>>> First, I'm curious why you set spark.sql.hive.convertMetastoreParquet
>>> to false?
>>>
>> I had set as suggested in SPARK-6910 and corresponsing pull reqs. It did
>> not work for me without  setting *spark.sql.hive.convertMetastoreParquet*
>>  property.
>>
>> Can you link specifically to the jira issue or spark pr you referred to?
>>> The first thing I would try is setting 
>>> spark.sql.hive.convertMetastoreParquet
>>> to true. Setting that to false might also explain why you're getting
>>> parquet decode errors. If you're writing your table data with Spark's
>>> parquet file writer and reading with Hive's parquet file reader, there may
>>> be an incompatibility accounting for the decode errors you're seeing.
>>>
>>>  https://issues.apache.org/jira/browse/SPARK-6910 . My main motivation
>> is to avoid fetching all the partitions. We reverted
>> spark.sql.hive.convertMetastoreParquet  setting to true to decoding
>> errors. After reverting this it is fetching all partiitons from the table.
>>
>> Can you reply with your table's Hive metastore schema, including
>>> partition schema?
>>>
>>  col1 string
>>  col2 string
>>  year int
>>  month int
>>  day int
>>  hour int
>>
>> # Partition Information
>>
>> # col_namedata_type   comment
>>
>> year  int
>>
>> month int
>>
>> day int
>>
>> hour int
>>
>> venture string
>>
>>>
>>>
>> Where are the table's files located?
>>>
>> In hadoop. Under some user directory.
>>
>>> If you do a "show partitions ." in the spark-sql
>>> shell, does it show the partitions you expect to see? If not, run "msck
>>> repair table .".
>>>
>> Yes. It is listing the partitions
>>
>>> Cheers,
>>>
>>> Michael
>>>
>>>
>>> On Jan 17, 2017, at 12:02 AM, Raju Bairishetti  wrote:
>>>
>>> Had a high level look into the code. Seems getHiveQlPartitions  method
>>> from HiveMetastoreCatalog is getting called irrespective of 
>>> metastorePartitionPruning
>>> conf value.
>>>
>>>  It should not fetch all partitions if we set metastorePartitionPruning to
>>> true (Default value for this is false)
>>>
>>> def getHiveQlPartitions(predicates: Seq[Expression] = Nil): Seq[Partition] 
>>> = {
>>>   val rawPartitions = if (sqlContext.conf.metastorePartitionPruning) {
>>> table.getPartitions(predicates)
>>>   } else {
>>> allPartitions
>>>   }
>>>
>>> ...
>>>
>>> def getPartitions(predicates: Seq[Expression]): Seq[HivePartition] =
>>>   client.getPartitionsByFilter(this, predicates)
>>>
>>> lazy val allPartitions = table.getAllPartitions
>>>
>>> But somehow getAllPartitions is getting called eventough after setting 
>>> metastorePartitionPruning to true.
>>>
>>> Am I missing something or looking at wrong place?
>>>
>>>
>>> On Tue, Jan 17, 2017 at 4:01 PM, Raju Bairishetti 
>>> wrote:
>>>
 Hello,

Spark sql is generating query plan with all partitions information
 even though if we apply filters on partitions in the query.  Due to
 this, sparkdriver/hive metastore is hitting with OOM as each table is
 with lots of partitions.

 We can confirm from hive audit logs that it tries to
 *fetch all partitions* from hive metastore.

  2016-12-28 07:18:33,749 INFO  [pool-4-thread-184]: HiveMetaStore.au

Re: Spark sql query plan contains all the partitions from hive table even though filtering of partitions is provided

2017-01-17 Thread Michael Allman
What version of Spark are you running?

> On Jan 17, 2017, at 8:42 PM, Raju Bairishetti  wrote:
> 
>  describe dummy;
> 
> OK
> 
> sample  string   
> 
> yearstring   
> 
> month   string
> 
> # Partition Information
> 
> # col_namedata_type   comment
> 
> yearstring   
> 
> 
> month   string 
> 
> 
> 
> val df = sqlContext.sql("select count(1) from rajub.dummy where year='2017'")
> 
> df: org.apache.spark.sql.DataFrame = [_c0: bigint]
> 
> scala> df.explain
> 
> == Physical Plan ==
> 
> TungstenAggregate(key=[], functions=[(count(1),mode=Final,isDistinct=false)], 
> output=[_c0#3070L])
> 
> +- TungstenExchange SinglePartition, None
> 
>+- TungstenAggregate(key=[], 
> functions=[(count(1),mode=Partial,isDistinct=false)], output=[count#3076L])
> 
>   +- Scan ParquetRelation: rajub.dummy[] InputPaths: 
> maprfs:/user/rajub/dummy/sample/year=2016/month=10, 
> maprfs:/user/rajub/dummy/sample/year=2016/month=11, 
> maprfs:/user/rajub/dummy/sample/year=2016/month=9, 
> maprfs:/user/rajub/dummy/sample/year=2017/month=10, 
> maprfs:/user/rajub/dummy/sample/year=2017/month=11, 
> maprfs:/user/rajub/dummy/sample/year=2017/month=9
> 
> 
> On Wed, Jan 18, 2017 at 12:25 PM, Michael Allman  > wrote:
> Can you paste the actual query plan here, please?
> 
>> On Jan 17, 2017, at 7:38 PM, Raju Bairishetti > > wrote:
>> 
>> 
>> On Wed, Jan 18, 2017 at 11:13 AM, Michael Allman > > wrote:
>> What is the physical query plan after you set 
>> spark.sql.hive.convertMetastoreParquet to true?
>> Physical plan continas all the partition locations 
>> 
>> Michael
>> 
>>> On Jan 17, 2017, at 6:51 PM, Raju Bairishetti >> > wrote:
>>> 
>>> Thanks Michael for the respopnse.
>>> 
>>> 
>>> On Wed, Jan 18, 2017 at 2:45 AM, Michael Allman >> > wrote:
>>> Hi Raju,
>>> 
>>> I'm sorry this isn't working for you. I helped author this functionality 
>>> and will try my best to help.
>>> 
>>> First, I'm curious why you set spark.sql.hive.convertMetastoreParquet to 
>>> false? 
>>> I had set it as suggested in SPARK-6910 and the corresponding pull requests. It did 
>>> not work for me without setting the spark.sql.hive.convertMetastoreParquet 
>>> property. 
>>> 
>>> Can you link specifically to the jira issue or spark pr you referred to? 
>>> The first thing I would try is setting 
>>> spark.sql.hive.convertMetastoreParquet to true. Setting that to false might 
>>> also explain why you're getting parquet decode errors. If you're writing 
>>> your table data with Spark's parquet file writer and reading with Hive's 
>>> parquet file reader, there may be an incompatibility accounting for the 
>>> decode errors you're seeing. 
>>> 
>>>  https://issues.apache.org/jira/browse/SPARK-6910 . My main motivation is 
>>> to avoid fetching all the partitions. We reverted the 
>>> spark.sql.hive.convertMetastoreParquet setting to true due to decoding errors. 
>>> After reverting this, it is fetching all partitions from the table.
>>> 
>>> Can you reply with your table's Hive metastore schema, including partition 
>>> schema?
>>>  col1 string
>>>  col2 string
>>>  year int
>>>  month int
>>>  day int
>>>  hour int   
>>> # Partition Information  
>>> 
>>> # col_name    data_type    comment
>>> 
>>> year  int
>>> 
>>> month int
>>> 
>>> day int
>>> 
>>> hour int
>>> 
>>> venture string
>>> 
>>>  
>>> Where are the table's files located?
>>> In hadoop. Under some user directory. 
>>> If you do a "show partitions ." in the spark-sql shell, 
>>> does it show the partitions you expect to see? If not, run "msck repair 
>>> table .".
>>> Yes. It is listing the partitions
>>> Cheers,
>>> 
>>> Michael
>>> 
>>> 
 On Jan 17, 2017, at 12:02 AM, Raju Bairishetti >>> > wrote:
 
 Had a high level look into the code. Seems getHiveQlPartitions  method 
 from HiveMetastoreCatalog is getting called irrespective of 
 metastorePartitionPruning conf value.
 
  It should not fetch all partitions if we set metastorePartitionPruning to 
 true (Default value for this is false) 
 def getHiveQlPartitions(predicates: Seq[Expression] = Nil): Seq[Partition] 
 = {
   val rawPartitions = if (sqlContext.conf.metastorePartitionPruning) {
 table.getPartitions(predicates)
   } else {
 allPartitions
   }
 ...
 def getPartitions(predicates: Seq[Expression]): Seq[HivePartition] =
   client.getPartitionsByFilter(this, predicates)
 lazy val allPartitions = table.getAllPartitions
 But somehow getAllPartitions is getting called even

Re: Spark sql query plan contains all the partitions from hive table even though filtering of partitions is provided

2017-01-17 Thread Raju Bairishetti
Tested on both 1.5.2 and 1.6.1.

On Wed, Jan 18, 2017 at 12:52 PM, Michael Allman 
wrote:

> What version of Spark are you running?
>
> On Jan 17, 2017, at 8:42 PM, Raju Bairishetti  wrote:
>
>  describe dummy;
>
> OK
>
> sample      string
>
> year        string
>
> month       string
>
> # Partition Information
>
> # col_name    data_type    comment
>
> year        string
>
> month       string
>
>
> val df = sqlContext.sql("select count(1) from rajub.dummy where year='
> *2017*'")
>
> df: org.apache.spark.sql.DataFrame = [_c0: bigint]
>
>
> *scala> df.explain*
>
> == Physical Plan ==
>
> TungstenAggregate(key=[], functions=[(count(1),mode=Final,isDistinct=false)],
> output=[_c0#3070L])
>
> +- TungstenExchange SinglePartition, None
>
>+- TungstenAggregate(key=[], 
> functions=[(count(1),mode=Partial,isDistinct=false)],
> output=[count#3076L])
>
>   +- Scan ParquetRelation: rajub.dummy[] InputPaths:
> maprfs:/user/rajub/dummy/sample/year=2016/month=10,
> maprfs:/user/rajub/dummy/sample/year=*2016*/month=11,
> maprfs:/user/rajub/dummy/sample/year=*2016*/month=9,
> maprfs:/user/rajub/dummy/sample/year=2017/month=10,
> maprfs:/user/rajub/dummy/sample/year=2017/month=11,
> maprfs:/user/rajub/dummy/sample/year=2017/month=9
>
> On Wed, Jan 18, 2017 at 12:25 PM, Michael Allman 
> wrote:
>
>> Can you paste the actual query plan here, please?
>>
>> On Jan 17, 2017, at 7:38 PM, Raju Bairishetti  wrote:
>>
>>
>> On Wed, Jan 18, 2017 at 11:13 AM, Michael Allman 
>> wrote:
>>
>>> What is the physical query plan after you set
>>> spark.sql.hive.convertMetastoreParquet to true?
>>>
>> Physical plan contains all the partition locations
>>
>>>
>>> Michael
>>>
>>> On Jan 17, 2017, at 6:51 PM, Raju Bairishetti  wrote:
>>>
>>> Thanks Michael for the response.
>>>
>>>
>>> On Wed, Jan 18, 2017 at 2:45 AM, Michael Allman 
>>> wrote:
>>>
 Hi Raju,

 I'm sorry this isn't working for you. I helped author this
 functionality and will try my best to help.

 First, I'm curious why you set spark.sql.hive.convertMetastoreParquet
 to false?

>>> I had set it as suggested in SPARK-6910 and the corresponding pull requests. It
>>> did not work for me without setting the
>>> *spark.sql.hive.convertMetastoreParquet* property.
>>>
>>> Can you link specifically to the jira issue or spark pr you referred to?
 The first thing I would try is setting 
 spark.sql.hive.convertMetastoreParquet
 to true. Setting that to false might also explain why you're getting
 parquet decode errors. If you're writing your table data with Spark's
 parquet file writer and reading with Hive's parquet file reader, there may
 be an incompatibility accounting for the decode errors you're seeing.

 https://issues.apache.org/jira/browse/SPARK-6910 . My main motivation
>>> is to avoid fetching all the partitions. We reverted the
>>> spark.sql.hive.convertMetastoreParquet setting to true due to decoding
>>> errors. After reverting this, it is fetching all partitions from the table.
>>>
>>> Can you reply with your table's Hive metastore schema, including
 partition schema?

>>>  col1 string
>>>  col2 string
>>>  year int
>>>  month int
>>>  day int
>>>  hour int
>>>
>>> # Partition Information
>>>
>>> # col_name    data_type    comment
>>>
>>> year  int
>>>
>>> month int
>>>
>>> day int
>>>
>>> hour int
>>>
>>> venture string
>>>


>>> Where are the table's files located?

>>> In hadoop. Under some user directory.
>>>
 If you do a "show partitions ." in the spark-sql
 shell, does it show the partitions you expect to see? If not, run "msck
 repair table .".

>>> Yes. It is listing the partitions
>>>
 Cheers,

 Michael


 On Jan 17, 2017, at 12:02 AM, Raju Bairishetti  wrote:

 Had a high level look into the code. Seems getHiveQlPartitions  method
 from HiveMetastoreCatalog is getting called irrespective of 
 metastorePartitionPruning
 conf value.

  It should not fetch all partitions if we set metastorePartitionPruning
  to true (Default value for this is false)

 def getHiveQlPartitions(predicates: Seq[Expression] = Nil): Seq[Partition] 
 = {
   val rawPartitions = if (sqlContext.conf.metastorePartitionPruning) {
 table.getPartitions(predicates)
   } else {
 allPartitions
   }

 ...

 def getPartitions(predicates: Seq[Expression]): Seq[HivePartition] =
   client.getPartitionsByFilter(this, predicates)

 lazy val allPartitions = table.getAllPartitions

 But somehow getAllPartitions is getting called even after setting 
 metastorePartitionPruning to true.

 Am I missing something or looking in the wrong place?


 On Tue, Jan 17, 2017 at 4:01 PM, Raju Bairishetti 
 wrote:

> Hello,
>
>Spark sql is generating query plan wit

Re: Spark sql query plan contains all the partitions from hive table even though filtering of partitions is provided

2017-01-17 Thread Michael Allman
I think I understand. Partition pruning for the case where 
spark.sql.hive.convertMetastoreParquet is true was not added to Spark until 
2.1.0. I think that in previous versions it only worked when 
spark.sql.hive.convertMetastoreParquet is false. Unfortunately, that 
configuration gives you data decoding errors. If it's possible for you to write 
all of your data with Hive, then you should be able to read it without decoding 
errors and with partition pruning turned on. Another possibility is running 
your Spark app with a very large maximum heap configuration, like 8g or even 
16g. However, loading all of that partition metadata can be quite slow for very 
large tables. I'm sorry I can't think of a better solution for you.
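
(For reference, a minimal sketch, assuming Spark 2.1.0+ where pruning works together
with Spark's native Parquet reader, of how the two settings discussed in this thread
can be combined; it reuses the rajub.dummy table shown earlier.)

import org.apache.spark.sql.SparkSession

// Sketch only: keep Spark's Parquet reader (avoiding the Hive SerDe decode issues)
// and push partition predicates down to the metastore.
val spark = SparkSession.builder()
  .appName("partition-pruning-check")
  .config("spark.sql.hive.convertMetastoreParquet", "true")
  .config("spark.sql.hive.metastorePartitionPruning", "true")
  .enableHiveSupport()
  .getOrCreate()

// With pruning in effect, only the year=2017 partitions should be requested from the
// metastore (the HiveMetaStore audit log should show get_partitions_by_filter).
spark.sql("SELECT count(1) FROM rajub.dummy WHERE year = '2017'").explain()

(The other option mentioned above is simply a larger driver heap, e.g. passing
--driver-memory 8g to spark-submit, at the cost of slow metadata loading.)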

Michael



> On Jan 17, 2017, at 8:59 PM, Raju Bairishetti  wrote:
> 
> Tested on both 1.5.2 and 1.6.1.
> 
> On Wed, Jan 18, 2017 at 12:52 PM, Michael Allman  > wrote:
> What version of Spark are you running?
> 
>> On Jan 17, 2017, at 8:42 PM, Raju Bairishetti > > wrote:
>> 
>>  describe dummy;
>> 
>> OK
>> 
>> sample      string
>> 
>> year        string
>> 
>> month       string
>> 
>> # Partition Information
>> 
>> # col_name    data_type    comment
>> 
>> year        string
>> 
>> month       string
>> 
>> 
>> 
>> val df = sqlContext.sql("select count(1) from rajub.dummy where year='2017'")
>> 
>> df: org.apache.spark.sql.DataFrame = [_c0: bigint]
>> 
>> scala> df.explain
>> 
>> == Physical Plan ==
>> 
>> TungstenAggregate(key=[], 
>> functions=[(count(1),mode=Final,isDistinct=false)], output=[_c0#3070L])
>> 
>> +- TungstenExchange SinglePartition, None
>> 
>>+- TungstenAggregate(key=[], 
>> functions=[(count(1),mode=Partial,isDistinct=false)], output=[count#3076L])
>> 
>>   +- Scan ParquetRelation: rajub.dummy[] InputPaths: 
>> maprfs:/user/rajub/dummy/sample/year=2016/month=10, 
>> maprfs:/user/rajub/dummy/sample/year=2016/month=11, 
>> maprfs:/user/rajub/dummy/sample/year=2016/month=9, 
>> maprfs:/user/rajub/dummy/sample/year=2017/month=10, 
>> maprfs:/user/rajub/dummy/sample/year=2017/month=11, 
>> maprfs:/user/rajub/dummy/sample/year=2017/month=9
>> 
>> 
>> On Wed, Jan 18, 2017 at 12:25 PM, Michael Allman > > wrote:
>> Can you paste the actual query plan here, please?
>> 
>>> On Jan 17, 2017, at 7:38 PM, Raju Bairishetti >> > wrote:
>>> 
>>> 
>>> On Wed, Jan 18, 2017 at 11:13 AM, Michael Allman >> > wrote:
>>> What is the physical query plan after you set 
>>> spark.sql.hive.convertMetastoreParquet to true?
>>> Physical plan contains all the partition locations 
>>> 
>>> Michael
>>> 
 On Jan 17, 2017, at 6:51 PM, Raju Bairishetti >>> > wrote:
 
 Thanks Michael for the response.
 
 
 On Wed, Jan 18, 2017 at 2:45 AM, Michael Allman >>> > wrote:
 Hi Raju,
 
 I'm sorry this isn't working for you. I helped author this functionality 
 and will try my best to help.
 
 First, I'm curious why you set spark.sql.hive.convertMetastoreParquet to 
 false? 
 I had set it as suggested in SPARK-6910 and the corresponding pull requests. It did 
 not work for me without setting the spark.sql.hive.convertMetastoreParquet 
 property. 
 
 Can you link specifically to the jira issue or spark pr you referred to? 
 The first thing I would try is setting 
 spark.sql.hive.convertMetastoreParquet to true. Setting that to false 
 might also explain why you're getting parquet decode errors. If you're 
 writing your table data with Spark's parquet file writer and reading with 
 Hive's parquet file reader, there may be an incompatibility accounting for 
 the decode errors you're seeing. 
 
  https://issues.apache.org/jira/browse/SPARK-6910 . My main motivation is 
 to avoid fetching all the partitions. We reverted the 
 spark.sql.hive.convertMetastoreParquet setting to true due to decoding 
 errors. After reverting this, it is fetching all partitions from the table.
 
 Can you reply with your table's Hive metastore schema, including partition 
 schema?
  col1 string
  col2 string
  year int
  month int
  day int
  hour int   
 # Partition Information 
 
 # col_name    data_type    comment
 
 year  int
 
 month int
 
 day int
 
 hour int
 
 venture string
 
  
 Where are the table's files located?
 In hadoop. Under some user directory. 
 If you do a "show partitions ." in the spark-sql shell, 
>>

Re: Weird experience Hive with Spark Transformations

2017-01-17 Thread Chetan Khatri
But Hive 1.2.1 does not come with a hive-site.xml; I tried to add my own, which
caused me several other issues. On the other hand, it works well for me with
Hive 2.0.1, where the hive-site.xml content was as below and was also copied to
spark/conf. It worked.

*5. hive-site.xml configuration setup*


Add the following to conf/hive-site.xml; if it is not there, create it.

<property>
  <name>javax.jdo.option.ConnectionURL</name>
  <value>jdbc:mysql://localhost/metastore?createDatabaseIfNotExist=true</value>
  <description>metadata is stored in a MySQL server</description>
</property>

<property>
  <name>javax.jdo.option.ConnectionDriverName</name>
  <value>com.mysql.jdbc.Driver</value>
  <description>MySQL JDBC driver class</description>
</property>

<property>
  <name>javax.jdo.option.ConnectionUserName</name>
  <value>hiveuser</value>
  <description>user name for connecting to mysql server</description>
</property>

<property>
  <name>javax.jdo.option.ConnectionPassword</name>
  <value>hivepassword</value>
  <description>password for connecting to mysql server</description>
</property>

Replace the three property tags below over whatever already exists by default;
otherwise it will throw an error like:

"java.net.URISyntaxException: Relative path in absolute URI:
${system:java.io.tmpdir}/${system:user.name}"

<property>
  <name>hive.querylog.location</name>
  <value>$HIVE_HOME/iotmp</value>
  <description>Location of Hive run time structured log file</description>
</property>

<property>
  <name>hive.exec.local.scratchdir</name>
  <value>$HIVE_HOME/iotmp</value>
  <description>Local scratch space for Hive jobs</description>
</property>

<property>
  <name>hive.downloaded.resources.dir</name>
  <value>$HIVE_HOME/iotmp</value>
  <description>Temporary local directory for added resources in the remote file system.</description>
</property>

(All of the <property> elements above go inside the <configuration> element of hive-site.xml.)
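
(A minimal sketch, assuming the hive-site.xml above has been copied into spark/conf
and a Spark 2.0.x build with Hive support, of how to confirm from spark-shell that
the shared MySQL-backed metastore is being used.)

// From spark-shell started in any working directory; the built-in `spark` session
// reads hive-site.xml from Spark's conf directory when it is present there.
spark.sql("SHOW DATABASES").show()           // should list the databases from the shared metastore,
spark.sql("SHOW TABLES IN default").show()   // not those of a locally created metastore_db/derby.log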





On Tue, Jan 17, 2017 at 10:01 PM, Dongjoon Hyun  wrote:

> Hi, Chetan.
>
> Did you copy your `hive-site.xml` into Spark conf directory? For example,
>
> cp /usr/local/hive/conf/hive-site.xml /usr/local/spark/conf
>
> If you want to use the existing Hive metastore, you need to provide that
> information to Spark.
>
> Bests,
> Dongjoon.
>
> On 2017-01-16 21:36 (-0800), Chetan Khatri 
> wrote:
> > Hello,
> >
> > I have following services are configured and installed successfully:
> >
> > Hadoop 2.7.x
> > Spark 2.0.x
> > HBase 1.2.4
> > Hive 1.2.1
> >
> > *Installation Directories:*
> >
> > /usr/local/hadoop
> > /usr/local/spark
> > /usr/local/hbase
> >
> > *Hive Environment variables:*
> >
> > #HIVE VARIABLES START
> > export HIVE_HOME=/usr/local/hive
> > export PATH=$PATH:$HIVE_HOME/bin
> > #HIVE VARIABLES END
> >
> > So, I can access Hive from anywhere as environment variables are
> > configured. Now if if i start my spark-shell & hive from location
> > /usr/local/hive then both work good for hive-metastore other wise from
> > where i start spark-shell where spark creates own meta-store.
> >
> > i.e I am reading from HBase and Writing to Hive using Spark. I dont know
> > why this is weird issue is.
> >
> >
> >
> >
> > Thanks.
> >
>
> -
> To unsubscribe e-mail: dev-unsubscr...@spark.apache.org
>
>


Re: Spark sql query plan contains all the partitions from hive table even though filtering of partitions is provided

2017-01-17 Thread Raju Bairishetti
Thanks for the detailed explanation. Is it completely fixed in spark-2.1.0?

  We are giving very high memory to the Spark driver to avoid the OOM (heap
space / GC overhead limit) errors in the Spark app. But when we run two or three
jobs together, they bring down the Hive metastore. We had to forcefully drop
older partitions to avoid frequent outages of the Hive metastore.
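
(A hedged sketch of the kind of forced partition cleanup mentioned above, reusing the
rajub.dummy table from earlier in the thread; the cutoff values are illustrative only.)

// Dropping an old partition range through HiveContext; a partial partition spec
// (year only) drops every month partition underneath it.
sqlContext.sql("ALTER TABLE rajub.dummy DROP IF EXISTS PARTITION (year = '2016')")

// Or month by month, if finer-grained retention is needed:
sqlContext.sql("ALTER TABLE rajub.dummy DROP IF EXISTS PARTITION (year = '2016', month = '9')")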


On Wed, Jan 18, 2017 at 2:09 PM, Michael Allman 
wrote:

> I think I understand. Partition pruning for the case where 
> spark.sql.hive.convertMetastoreParquet
> is true was not added to Spark until 2.1.0. I think that in previous
> versions it only worked when spark.sql.hive.convertMetastoreParquet is
> false. Unfortunately, that configuration gives you data decoding errors. If
> it's possible for you to write all of your data with Hive, then you should
> be able to read it without decoding errors and with partition pruning
> turned on. Another possibility is running your Spark app with a very large
> maximum heap configuration, like 8g or even 16g. However, loading all of
> that partition metadata can be quite slow for very large tables. I'm sorry
> I can't think of a better solution for you.
>
> Michael
>
>
>
>
> On Jan 17, 2017, at 8:59 PM, Raju Bairishetti  wrote:
>
> Tested on both 1.5.2 and 1.6.1.
>
> On Wed, Jan 18, 2017 at 12:52 PM, Michael Allman 
> wrote:
>
>> What version of Spark are you running?
>>
>> On Jan 17, 2017, at 8:42 PM, Raju Bairishetti  wrote:
>>
>>  describe dummy;
>>
>> OK
>>
>> sample      string
>>
>> year        string
>>
>> month       string
>>
>> # Partition Information
>>
>> # col_name    data_type    comment
>>
>> year        string
>>
>> month       string
>>
>>
>> val df = sqlContext.sql("select count(1) from rajub.dummy where year='
>> *2017*'")
>>
>> df: org.apache.spark.sql.DataFrame = [_c0: bigint]
>>
>>
>> *scala> df.explain*
>>
>> == Physical Plan ==
>>
>> TungstenAggregate(key=[], functions=[(count(1),mode=Final,isDistinct=false)],
>> output=[_c0#3070L])
>>
>> +- TungstenExchange SinglePartition, None
>>
>>+- TungstenAggregate(key=[], 
>> functions=[(count(1),mode=Partial,isDistinct=false)],
>> output=[count#3076L])
>>
>>   +- Scan ParquetRelation: rajub.dummy[] InputPaths:
>> maprfs:/user/rajub/dummy/sample/year=2016/month=10,
>> maprfs:/user/rajub/dummy/sample/year=*2016*/month=11,
>> maprfs:/user/rajub/dummy/sample/year=*2016*/month=9,
>> maprfs:/user/rajub/dummy/sample/year=2017/month=10,
>> maprfs:/user/rajub/dummy/sample/year=2017/month=11,
>> maprfs:/user/rajub/dummy/sample/year=2017/month=9
>>
>> On Wed, Jan 18, 2017 at 12:25 PM, Michael Allman 
>> wrote:
>>
>>> Can you paste the actual query plan here, please?
>>>
>>> On Jan 17, 2017, at 7:38 PM, Raju Bairishetti  wrote:
>>>
>>>
>>> On Wed, Jan 18, 2017 at 11:13 AM, Michael Allman 
>>> wrote:
>>>
 What is the physical query plan after you set
 spark.sql.hive.convertMetastoreParquet to true?

>>> Physical plan contains all the partition locations
>>>

 Michael

 On Jan 17, 2017, at 6:51 PM, Raju Bairishetti  wrote:

 Thanks Michael for the response.


 On Wed, Jan 18, 2017 at 2:45 AM, Michael Allman 
 wrote:

> Hi Raju,
>
> I'm sorry this isn't working for you. I helped author this
> functionality and will try my best to help.
>
> First, I'm curious why you set spark.sql.hive.convertMetastoreParquet
> to false?
>
 I had set it as suggested in SPARK-6910 and the corresponding pull requests. It
 did not work for me without setting the
 *spark.sql.hive.convertMetastoreParquet* property.

 Can you link specifically to the jira issue or spark pr you referred
> to? The first thing I would try is setting 
> spark.sql.hive.convertMetastoreParquet
> to true. Setting that to false might also explain why you're getting
> parquet decode errors. If you're writing your table data with Spark's
> parquet file writer and reading with Hive's parquet file reader, there may
> be an incompatibility accounting for the decode errors you're seeing.
>
>  https://issues.apache.org/jira/browse/SPARK-6910 . My main
 motivation is to avoid fetching all the partitions. We reverted the
 spark.sql.hive.convertMetastoreParquet setting to true due to decoding
 errors. After reverting this, it is fetching all partitions from the table.

 Can you reply with your table's Hive metastore schema, including
> partition schema?
>
  col1 string
  col2 string
  year int
  month int
  day int
  hour int

 # Partition Information

 # col_name    data_type    comment

 year  int

 month int

 day int

 hour int

 venture string

>
>
 Where are the table's files located?
>
 In hadoop. Under some user directory.

> If you do a "show partit

Re: Limit Query Performance Suggestion

2017-01-17 Thread Liang-Chi Hsieh

Hi Sujith,

I saw your updated post. It makes sense to me now.

If you use a very big limit number, the shuffling before `GlobalLimit` would
be a performance bottleneck, even though it can eventually shuffle enough
data to the single partition.

Unlike `CollectLimit`, I think there is actually no reason `GlobalLimit` must
shuffle all of the limited data from all partitions to one single machine for
query execution. In other words, I think we can avoid shuffling data in
`GlobalLimit`.

I have an idea to improve this and may update here later if I can make it
work.
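
(A rough sketch, from a Spark 2.x shell, of the plan shape being discussed; the limit
value is illustrative and the exact plan text varies by version.)

// A limit that is not the final operator is planned as LocalLimit, then a
// single-partition Exchange, then GlobalLimit; with a huge limit value that
// Exchange funnels nearly all of the data through one task.
val limited = spark.range(0L, 100000000L).limit(10000000).selectExpr("id * 2 AS doubled")
limited.explain()
// Expected shape (abridged; the position of the Project varies by version):
//   GlobalLimit 10000000
//   +- Exchange SinglePartition
//      +- LocalLimit 10000000
//         +- Range (0, 100000000, ...)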


sujith71955 wrote
> Dear Liang,
> 
> Thanks for your valuable feedback.
> 
> There was a mistake in the previous post; I corrected it. As you mentioned,
> in `GlobalLimit` we will only take the required number of rows from the
> input iterator, which really pulls data from local blocks and remote
> blocks.
> But if the limit value is very high (>= 1000), and a shuffle exchange
> happens between `GlobalLimit` and `LocalLimit` to retrieve data from all
> partitions into one partition, then since the limit value is very large the
> performance bottleneck still exists.
> 
> Soon, in the next post, I will publish a test report with sample data and
> also figure out a solution for this problem.
> 
> Please let me know if you have any clarifications or suggestions regarding
> this issue.
> 
> Regards,
> Sujith





-
Liang-Chi Hsieh | @viirya 
Spark Technology Center 
http://www.spark.tc/ 

-
To unsubscribe e-mail: dev-unsubscr...@spark.apache.org