What is also strange is that this seems to work on external JSON data, but not 
Parquet.  I’ll try to do more verification of that next week.
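
One way to narrow this down might be to rerun the failing query with Spark's built-in Parquet conversion turned off. The sketch below is meant for spark-shell and reuses the table and query from the quoted message. `spark.sql.hive.convertMetastoreParquet` is a real Spark SQL setting, but the idea that disabling it makes HiveContext skip the Parquet footer scan seen in the stack traces is an assumption, not something verified here.

```scala
// Run inside spark-shell, where `sc` is already defined.
val hc = new org.apache.spark.sql.hive.HiveContext(sc)

// Assumption: with conversion disabled, the table is read through the Hive
// SerDe rather than Spark's built-in Parquet relation (the ParquetRelation2
// frames in the stack traces), so only the requested partition is touched.
hc.setConf("spark.sql.hive.convertMetastoreParquet", "false")

// Same small-partition query as in the quoted message.
val query = """SELECT uri_host, uri_path
  FROM otto.webrequest_few_partitions_big_data
  WHERE webrequest_source='misc'
    AND year=2015 AND month=5 AND day=20 AND hour=0
  LIMIT 4"""

hc.sql(query).collect().foreach(println)
```

If the query returns quickly with the setting at `false` and still fails at the default `true`, that would point at the built-in Parquet code path rather than at HiveContext in general.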


> On May 22, 2015, at 16:24, yana <yana.kadiy...@gmail.com> wrote:
> 
> There is an open Jira on Spark not pushing predicates to the metastore. I 
> have a large dataset with many partitions, but doing anything with it is very 
> slow... But I am surprised Spark 1.2 worked for you: it has this problem...
> 
> -------- Original message --------
> From: Andrew Otto
> Date:05/22/2015 3:51 PM (GMT-05:00)
> To: user@spark.apache.org
> Cc: Joseph Allemandou ,Madhumitha Viswanathan
> Subject: HiveContext fails when querying large external Parquet tables
> 
> Hi all,
> 
> (This email was easier to write in markdown, so I’ve created a gist with its 
> contents here: https://gist.github.com/ottomata/f91ea76cece97444e269.  I’ll 
> paste the markdown content in the email body here too.)
> 
> ---
> We’ve recently upgraded to CDH 5.4.0 which comes with Spark 1.3.0 and Hive 
> 1.1.0.  Previously we were on CDH 5.3.x, running Spark 1.2.0 and Hive 0.13.0. 
>  Since upgrading, we can no longer query our large webrequest dataset using 
> HiveContext.  HiveContext + Parquet and other file types work fine with 
> external tables (We have a similarly large JSON external table that works 
> just fine with HiveContext.)
> 
> Our webrequest dataset is stored in hourly partitioned Parquet files.  We 
> mainly interact with this dataset via a Hive external table, but also have 
> been using Spark's HiveContext.
> 
> ```
> # This single hourly directory is only 5.3M
> $ hdfs dfs -du -s -h 
> /wmf/data/wmf/webrequest/webrequest_source=misc/year=2015/month=5/day=20/hour=0
> 5.3 M  15.8 M  
> /wmf/data/wmf/webrequest/webrequest_source=misc/year=2015/month=5/day=20/hour=0
> 
> # This monthly directory is 1.8T.  (There are subdirectories down to hourly 
> level here too.)
> $ hdfs dfs -du -s -h 
> /wmf/data/wmf/webrequest/webrequest_source=bits/year=2015/month=5
> 1.8 T  5.3 T  
> /wmf/data/wmf/webrequest/webrequest_source=bits/year=2015/month=5
> ```
> 
> If I create a Hive table on top of this data, and add the single hourly 
> partition, querying works via both Hive and Spark HiveContext
> 
> ```sql
> hive (otto)> CREATE EXTERNAL TABLE IF NOT EXISTS 
> `otto.webrequest_few_partitions_big_data`(
>     `hostname`          string  COMMENT 'Source node hostname',
>     `sequence`          bigint  COMMENT 'Per host sequence number',
>     `dt`                string  COMMENT 'Timestame at cache in ISO 8601',
>     `time_firstbyte`    double  COMMENT 'Time to first byte',
>     `ip`                string  COMMENT 'IP of packet at cache',
>     `cache_status`      string  COMMENT 'Cache status',
>     `http_status`       string  COMMENT 'HTTP status of response',
>     `response_size`     bigint  COMMENT 'Response size',
>     `http_method`       string  COMMENT 'HTTP method of request',
>     `uri_host`          string  COMMENT 'Host of request',
>     `uri_path`          string  COMMENT 'Path of request',
>     `uri_query`         string  COMMENT 'Query of request',
>     `content_type`      string  COMMENT 'Content-Type header of response',
>     `referer`           string  COMMENT 'Referer header of request',
>     `x_forwarded_for`   string  COMMENT 'X-Forwarded-For header of request',
>     `user_agent`        string  COMMENT 'User-Agent header of request',
>     `accept_language`   string  COMMENT 'Accept-Language header of request',
>     `x_analytics`       string  COMMENT 'X-Analytics header of response',
>     `range`             string  COMMENT 'Range header of response',
>     `is_pageview`       boolean COMMENT 'Indicates if this record was marked 
> as a pageview during refinement',
>     `record_version`    string  COMMENT 'Keeps track of changes in the table 
> content definition - 
> https://wikitech.wikimedia.org/wiki/Analytics/Data/Webrequest',
>     `client_ip`         string  COMMENT 'Client IP computed during refinement 
> using ip and x_forwarded_for',
>     `geocoded_data`     map<string, string>  COMMENT 'Geocoded map with 
> continent, country_code, country, city, subdivision, postal_code, latitude, 
> longitude, timezone keys  and associated values.',
>     `x_cache`           string  COMMENT 'X-Cache header of response',
>     `user_agent_map`    map<string, string>  COMMENT 'User-agent map with 
> browser_name, browser_major, device, os_name, os_minor, os_major keys and 
> associated values',
>     `x_analytics_map`   map<string, string>  COMMENT 'X_analytics map view of 
> the x_analytics field',
>     `ts`                timestamp            COMMENT 'Unix timestamp in 
> milliseconds extracted from dt',
>     `access_method`     string  COMMENT 'Method used to accessing the site 
> (mobile app|mobile web|desktop)',
>     `agent_type`        string  COMMENT 'Categorise the agent making the 
> webrequest as either user or spider (automatas to be added).',
>     `is_zero`           boolean COMMENT 'Indicates if the webrequest is 
> accessed through a zero provider',
>     `referer_class`     string  COMMENT 'Indicates if a referer is internal, 
> external or unknown.'
> )
> PARTITIONED BY (
>     `webrequest_source` string  COMMENT 'Source cluster',
>     `year`              int     COMMENT 'Unpadded year of request',
>     `month`             int     COMMENT 'Unpadded month of request',
>     `day`               int     COMMENT 'Unpadded day of request',
>     `hour`              int     COMMENT 'Unpadded hour of request'
> )
> CLUSTERED BY(hostname, sequence) INTO 64 BUCKETS
> STORED AS PARQUET
> LOCATION '/wmf/data/wmf/webrequest'
> ;
> 
> hive (otto)> alter table otto.webrequest_few_partitions_big_data add 
> partition (webrequest_source='misc', year=2015, month=5, day=20, hour=0) 
> location 
> '/wmf/data/wmf/webrequest/webrequest_source=misc/year=2015/month=5/day=20/hour=0';
> 
> hive (otto)> SELECT uri_host, uri_path from 
> otto.webrequest_few_partitions_big_data where webrequest_source='misc' and 
> year=2015 and month=5 and day=20 and hour=0 limit 4;
> OK
> uri_host      uri_path
> graphite.wikimedia.org       /render
> graphite.wikimedia.org       /render
> stats.wikimedia.org     /EN/PlotTotalArticlesBS.png
> etherpad.wikimedia.org       /socket.io/1/xhr-polling/nuoaggODBKYWJ4sCvfuZ
> ```
> 
> ```scala
> $ spark-shell
> 
> scala> val hc = new org.apache.spark.sql.hive.HiveContext(sc)
> 
> scala> val query = "SELECT uri_host, uri_path from 
> otto.webrequest_few_partitions_big_data where webrequest_source='misc' and 
> year=2015 and month=5 and day=20 and hour=0 limit 4"
> 
> scala> hc.sql(query).collect().foreach(println)
> [graphite.wikimedia.org,/render]
> [graphite.wikimedia.org,/render]
> [stats.wikimedia.org,/EN/PlotTotalArticlesBS.png]
> [etherpad.wikimedia.org,/socket.io/1/xhr-polling/nuoaggODBKYWJ4sCvfuZ]
> 
> ```
> 
> But when I add more data, Spark either OOMs or fails with a Premature EOF 
> error (if I bump up memory).  I've tried this with many hourly partitions, 
> and with just two partitions, one of which holds lots of data.  In either 
> case the same amount of data was present.  For this example I'll just add 
> one large partition to the table.
> 
> ```sql
> -- Of course day=0, hour=25 doesn't make sense, but I'm including all
> -- of May in this one partition, so I just chose dummy partition keys
> hive (otto)> alter table otto.webrequest_few_partitions_big_data add 
> partition (webrequest_source='bits', year=2015, month=5, day=0, hour=25) 
> location '/wmf/data/wmf/webrequest/webrequest_source=bits/year=2015/month=5';
> 
> hive (otto)> show partitions otto.webrequest_few_partitions_big_data;
> OK
> partition
> webrequest_source=bits/year=2015/month=5/day=0/hour=25
> webrequest_source=misc/year=2015/month=5/day=20/hour=0
> 
> hive (otto)> SELECT uri_host, uri_path from 
> otto.webrequest_few_partitions_big_data where webrequest_source='misc' and 
> year=2015 and month=5 and day=20 and hour=0 limit 4;
> OK
> uri_host      uri_path
> graphite.wikimedia.org       /render
> graphite.wikimedia.org       /render
> stats.wikimedia.org     /EN/PlotTotalArticlesBS.png
> etherpad.wikimedia.org       /socket.io/1/xhr-polling/nuoaggODBKYWJ4sCvfuZ
> ```
> 
> Now, let's try the same query we did before in spark-shell.
> 
> ```scala
> $ spark-shell
> 
> Spark context available as sc.
> 15/05/22 19:04:25 INFO SparkILoop: Created sql context (with Hive support)..
> SQL context available as sqlContext.
> 
> scala> val hc = new org.apache.spark.sql.hive.HiveContext(sc)
> hc: org.apache.spark.sql.hive.HiveContext = 
> org.apache.spark.sql.hive.HiveContext@38fe4d15
> 
> scala> val query = "SELECT uri_host, uri_path from 
> otto.webrequest_few_partitions_big_data where webrequest_source='misc' and 
> year=2015 and month=5 and day=20 and hour=0 limit 4"
> query: String = SELECT uri_host, uri_path from 
> otto.webrequest_few_partitions_big_data where webrequest_source='misc' and 
> year=2015 and month=5 and day=20 and hour=0 limit 4
> 
> scala> hc.sql(query).collect().foreach(println)
> 15/05/22 19:04:42 INFO metastore: Trying to connect to metastore with URI 
> thrift://analytics1027.eqiad.wmnet:9083
> 15/05/22 19:04:42 INFO metastore: Connected to metastore.
> 15/05/22 19:04:43 INFO SessionState: Created local directory: 
> /tmp/fcb5ac90-78c7-4368-adac-085d7935fd6b_resources
> 15/05/22 19:04:43 INFO SessionState: Created HDFS directory: 
> /tmp/hive/otto/fcb5ac90-78c7-4368-adac-085d7935fd6b
> 15/05/22 19:04:43 INFO SessionState: Created local directory: 
> /tmp/otto/fcb5ac90-78c7-4368-adac-085d7935fd6b
> 15/05/22 19:04:43 INFO SessionState: Created HDFS directory: 
> /tmp/hive/otto/fcb5ac90-78c7-4368-adac-085d7935fd6b/_tmp_space.db
> 15/05/22 19:04:43 INFO SessionState: No Tez session required at this point. 
> hive.execution.engine=mr.
> 15/05/22 19:04:44 INFO ParseDriver: Parsing command: SELECT uri_host, 
> uri_path from otto.webrequest_few_partitions_big_data where 
> webrequest_source='misc' and year=2015 and month=5 and day=20 and hour=0 
> limit 4
> 15/05/22 19:04:44 INFO ParseDriver: Parse Completed
> 
> ### Many minutes pass... ###
> 
> Exception in thread "qtp454555988-129" Exception in thread "IPC Client 
> (1610773823) connection to analytics1001.eqiad.wmnet/10.64.36.118:8020 from 
> otto" Exception in thread "qtp454555988-52"
> Exception: java.lang.OutOfMemoryError thrown from the 
> UncaughtExceptionHandler in thread "sparkDriver-scheduler-1"
> java.lang.OutOfMemoryError: GC overhead limit exceeded
> java.lang.OutOfMemoryError: GC overhead limit exceeded
> java.lang.OutOfMemoryError: GC overhead limit exceeded
> 15/05/22 18:49:15 INFO RetryInvocationHandler: Exception while invoking 
> getBlockLocations of class ClientNamenodeProtocolTranslatorPB over 
> analytics1001.eqiad.wmnet/10.64.36.118:8020. Trying to fail over immediately.
> java.io.IOException: com.google.protobuf.ServiceException: 
> java.lang.OutOfMemoryError: GC overhead limit exceeded
>       at 
> org.apache.hadoop.ipc.ProtobufHelper.getRemoteException(ProtobufHelper.java:47)
>       at 
> org.apache.hadoop.hdfs.protocolPB.ClientNamenodeProtocolTranslatorPB.getBlockLocations(ClientNamenodeProtocolTranslatorPB.java:259)
>       at sun.reflect.GeneratedMethodAccessor3.invoke(Unknown Source)
>       at 
> sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
>       at java.lang.reflect.Method.invoke(Method.java:606)
>       at 
> org.apache.hadoop.io.retry.RetryInvocationHandler.invokeMethod(RetryInvocationHandler.java:187)
>       at 
> org.apache.hadoop.io.retry.RetryInvocationHandler.invoke(RetryInvocationHandler.java:102)
>       at com.sun.proxy.$Proxy17.getBlockLocations(Unknown Source)
>       at 
> org.apache.hadoop.hdfs.DFSClient.callGetBlockLocations(DFSClient.java:1211)
>       at 
> org.apache.hadoop.hdfs.DFSClient.getLocatedBlocks(DFSClient.java:1201)
>       at 
> org.apache.hadoop.hdfs.DFSClient.getLocatedBlocks(DFSClient.java:1191)
>       at 
> org.apache.hadoop.hdfs.DFSInputStream.fetchLocatedBlocksAndGetLastBlockLength(DFSInputStream.java:299)
>       at 
> org.apache.hadoop.hdfs.DFSInputStream.openInfo(DFSInputStream.java:265)
>       at org.apache.hadoop.hdfs.DFSInputStream.<init>(DFSInputStream.java:257)
>       at org.apache.hadoop.hdfs.DFSClient.open(DFSClient.java:1490)
>       at 
> org.apache.hadoop.hdfs.DistributedFileSystem$3.doCall(DistributedFileSystem.java:302)
>       at 
> org.apache.hadoop.hdfs.DistributedFileSystem$3.doCall(DistributedFileSystem.java:298)
>       at 
> org.apache.hadoop.fs.FileSystemLinkResolver.resolve(FileSystemLinkResolver.java:81)
>       at 
> org.apache.hadoop.hdfs.DistributedFileSystem.open(DistributedFileSystem.java:298)
>       at org.apache.hadoop.fs.FileSystem.open(FileSystem.java:766)
>       at 
> parquet.hadoop.ParquetFileReader.readFooter(ParquetFileReader.java:411)
>       at 
> parquet.hadoop.ParquetFileReader.readFooter(ParquetFileReader.java:398)
>       at 
> org.apache.spark.sql.parquet.ParquetRelation2$MetadataCache$$anonfun$refresh$5.apply(newParquet.scala:276)
>       at 
> org.apache.spark.sql.parquet.ParquetRelation2$MetadataCache$$anonfun$refresh$5.apply(newParquet.scala:275)
>       at 
> scala.collection.parallel.mutable.ParArray$Map.leaf(ParArray.scala:658)
>       at 
> scala.collection.parallel.Task$$anonfun$tryLeaf$1.apply$mcV$sp(Tasks.scala:54)
>       at 
> scala.collection.parallel.Task$$anonfun$tryLeaf$1.apply(Tasks.scala:53)
>       at 
> scala.collection.parallel.Task$$anonfun$tryLeaf$1.apply(Tasks.scala:53)
>       at scala.collection.parallel.Task$class.tryLeaf(Tasks.scala:56)
>       at 
> scala.collection.parallel.mutable.ParArray$Map.tryLeaf(ParArray.scala:650)
>       at 
> scala.collection.parallel.AdaptiveWorkStealingTasks$WrappedTask$class.compute(Tasks.scala:165)
>       at 
> scala.collection.parallel.AdaptiveWorkStealingForkJoinTasks$WrappedTask.compute(Tasks.scala:514)
>       at 
> scala.concurrent.forkjoin.RecursiveAction.exec(RecursiveAction.java:160)
>       at scala.concurrent.forkjoin.ForkJoinTask.doExec(ForkJoinTask.java:260)
>       at 
> scala.concurrent.forkjoin.ForkJoinPool$WorkQueue.popAndExecAll(ForkJoinPool.java:1243)
>       at 
> scala.concurrent.forkjoin.ForkJoinPool$WorkQueue.runTask(ForkJoinPool.java:1344)
>       at 
> scala.concurrent.forkjoin.ForkJoinPool.runWorker(ForkJoinPool.java:1979)
>       at 
> scala.concurrent.forkjoin.ForkJoinWorkerThread.run(ForkJoinWorkerThread.java:107)
> Caused by: com.google.protobuf.ServiceException: java.lang.OutOfMemoryError: 
> GC overhead limit exceeded
>       at 
> org.apache.hadoop.ipc.ProtobufRpcEngine$Invoker.invoke(ProtobufRpcEngine.java:274)
>       at com.sun.proxy.$Proxy16.getBlockLocations(Unknown Source)
>       at 
> org.apache.hadoop.hdfs.protocolPB.ClientNamenodeProtocolTranslatorPB.getBlockLocations(ClientNamenodeProtocolTranslatorPB.java:254)
>       ... 36 more
> Caused by: java.lang.OutOfMemoryError: GC overhead limit exceeded
> java.lang.OutOfMemoryError: GC overhead limit exceeded
> ```
> 
> Note that I have also seen slightly different OOM errors on the CLI.  
> Sometimes I see:
> ```
> 15/05/22 19:09:22 ERROR ActorSystemImpl: exception on LARS’ timer thread
> java.lang.OutOfMemoryError: GC overhead limit exceeded
> 15/05/22 19:09:29 INFO ActorSystemImpl: starting new LARS thread
> 15/05/22 19:09:29 ERROR ActorSystemImpl: Uncaught fatal error from thread 
> [sparkDriver-scheduler-1] shutting down ActorSystem [sparkDriver]
> java.lang.OutOfMemoryError: GC overhead limit exceeded
> Exception in thread "Driver Heartbeater" java.lang.OutOfMemoryError: GC 
> overhead limit exceeded
> 
> ...
> ```
> 
> 
> Ok, so let's increase the driver memory:
> 
> ```scala
> $ spark-shell --driver-memory 1500M
> Spark context available as sc.
> 15/05/22 19:09:07 INFO SparkILoop: Created sql context (with Hive support)..
> SQL context available as sqlContext.
> 
> scala> val hc = new org.apache.spark.sql.hive.HiveContext(sc)
> 
> scala> val query = "SELECT uri_host, uri_path from 
> otto.webrequest_few_partitions_big_data where webrequest_source='misc' and 
> year=2015 and month=5 and day=20 and hour=0 limit 4"
> 
> scala> hc.sql(query).collect().foreach(println)
> 15/05/22 19:11:41 INFO metastore: Trying to connect to metastore with URI 
> thrift://analytics1027.eqiad.wmnet:9083
> 15/05/22 19:11:41 INFO metastore: Connected to metastore.
> 15/05/22 19:11:42 INFO SessionState: Created local directory: 
> /tmp/5f8996fc-ce1a-4954-8c6c-94c291aa8c70_resources
> 15/05/22 19:11:42 INFO SessionState: Created HDFS directory: 
> /tmp/hive/otto/5f8996fc-ce1a-4954-8c6c-94c291aa8c70
> 15/05/22 19:11:42 INFO SessionState: Created local directory: 
> /tmp/otto/5f8996fc-ce1a-4954-8c6c-94c291aa8c70
> 15/05/22 19:11:42 INFO SessionState: Created HDFS directory: 
> /tmp/hive/otto/5f8996fc-ce1a-4954-8c6c-94c291aa8c70/_tmp_space.db
> 15/05/22 19:11:42 INFO SessionState: No Tez session required at this point. 
> hive.execution.engine=mr.
> 15/05/22 19:11:43 INFO ParseDriver: Parsing command: SELECT uri_host, 
> uri_path from otto.webrequest_few_partitions_big_data where 
> webrequest_source='misc' and year=2015 and month=5 and day=20 and hour=0 
> limit 4
> 15/05/22 19:11:44 INFO ParseDriver: Parse Completed
> 
> ### Almost 30 minutes pass... ###
> 
> java.io.EOFException: Premature EOF: no length prefix available
>       at 
> org.apache.hadoop.hdfs.protocolPB.PBHelper.vintPrefixed(PBHelper.java:2212)
>       at 
> org.apache.hadoop.hdfs.RemoteBlockReader2.newBlockReader(RemoteBlockReader2.java:408)
>       at 
> org.apache.hadoop.hdfs.BlockReaderFactory.getRemoteBlockReader(BlockReaderFactory.java:796)
>       at 
> org.apache.hadoop.hdfs.BlockReaderFactory.getRemoteBlockReaderFromTcp(BlockReaderFactory.java:674)
>       at 
> org.apache.hadoop.hdfs.BlockReaderFactory.build(BlockReaderFactory.java:337)
>       at 
> org.apache.hadoop.hdfs.DFSInputStream.blockSeekTo(DFSInputStream.java:621)
>       at 
> org.apache.hadoop.hdfs.DFSInputStream.readWithStrategy(DFSInputStream.java:847)
>       at org.apache.hadoop.hdfs.DFSInputStream.read(DFSInputStream.java:897)
>       at org.apache.hadoop.hdfs.DFSInputStream.read(DFSInputStream.java:700)
>       at java.io.FilterInputStream.read(FilterInputStream.java:83)
>       at parquet.bytes.BytesUtils.readIntLittleEndian(BytesUtils.java:66)
>       at 
> parquet.hadoop.ParquetFileReader.readFooter(ParquetFileReader.java:423)
>       at 
> parquet.hadoop.ParquetFileReader.readFooter(ParquetFileReader.java:398)
>       at 
> org.apache.spark.sql.parquet.ParquetRelation2$MetadataCache$$anonfun$refresh$5.apply(newParquet.scala:276)
>       at 
> org.apache.spark.sql.parquet.ParquetRelation2$MetadataCache$$anonfun$refresh$5.apply(newParquet.scala:275)
>       at 
> scala.collection.parallel.mutable.ParArray$Map.leaf(ParArray.scala:658)
>       at 
> scala.collection.parallel.Task$$anonfun$tryLeaf$1.apply$mcV$sp(Tasks.scala:54)
>       at 
> scala.collection.parallel.Task$$anonfun$tryLeaf$1.apply(Tasks.scala:53)
>       at 
> scala.collection.parallel.Task$$anonfun$tryLeaf$1.apply(Tasks.scala:53)
>       at scala.collection.parallel.Task$class.tryLeaf(Tasks.scala:56)
>       at 
> scala.collection.parallel.mutable.ParArray$Map.tryLeaf(ParArray.scala:650)
>       at 
> scala.collection.parallel.AdaptiveWorkStealingTasks$WrappedTask$class.internal(Tasks.scala:172)
>       at 
> scala.collection.parallel.AdaptiveWorkStealingForkJoinTasks$WrappedTask.internal(Tasks.scala:514)
>       at 
> scala.collection.parallel.AdaptiveWorkStealingTasks$WrappedTask$class.compute(Tasks.scala:162)
>       at 
> scala.collection.parallel.AdaptiveWorkStealingForkJoinTasks$WrappedTask.compute(Tasks.scala:514)
>       at 
> scala.concurrent.forkjoin.RecursiveAction.exec(RecursiveAction.java:160)
>       at scala.concurrent.forkjoin.ForkJoinTask.doExec(ForkJoinTask.java:260)
>       at 
> scala.concurrent.forkjoin.ForkJoinPool$WorkQueue.runSubtask(ForkJoinPool.java:1357)
>       at 
> scala.concurrent.forkjoin.ForkJoinPool.tryHelpStealer(ForkJoinPool.java:2253)
>       at 
> scala.concurrent.forkjoin.ForkJoinPool.awaitJoin(ForkJoinPool.java:2377)
>       at scala.concurrent.forkjoin.ForkJoinTask.doJoin(ForkJoinTask.java:341)
>       at scala.concurrent.forkjoin.ForkJoinTask.join(ForkJoinTask.java:673)
>       at 
> scala.collection.parallel.ForkJoinTasks$WrappedTask$class.sync(Tasks.scala:444)
>       at 
> scala.collection.parallel.AdaptiveWorkStealingForkJoinTasks$WrappedTask.sync(Tasks.scala:514)
>       at 
> scala.collection.parallel.AdaptiveWorkStealingTasks$WrappedTask$class.internal(Tasks.scala:187)
>       at 
> scala.collection.parallel.AdaptiveWorkStealingForkJoinTasks$WrappedTask.internal(Tasks.scala:514)
>       at 
> scala.collection.parallel.AdaptiveWorkStealingTasks$WrappedTask$class.compute(Tasks.scala:162)
>       at 
> scala.collection.parallel.AdaptiveWorkStealingForkJoinTasks$WrappedTask.compute(Tasks.scala:514)
>       at 
> scala.concurrent.forkjoin.RecursiveAction.exec(RecursiveAction.java:160)
>       at scala.concurrent.forkjoin.ForkJoinTask.doExec(ForkJoinTask.java:260)
>       at 
> scala.concurrent.forkjoin.ForkJoinPool$WorkQueue.runTask(ForkJoinPool.java:1339)
>       at 
> scala.concurrent.forkjoin.ForkJoinPool.runWorker(ForkJoinPool.java:1979)
>       at 
> scala.concurrent.forkjoin.ForkJoinWorkerThread.run(ForkJoinWorkerThread.java:107)
> ```
> 
> I've tested this both in local mode and in YARN client mode, and both have 
> similar behaviors.  What's worrisome is that the behavior is different after 
> adding more data to the table, even though I am querying the same very small 
> partition.  The whole point of Hive partitions is to allow jobs to work with 
> only the data that is needed.  I'm not sure what Spark HiveContext is doing 
> here, but it seems to couple the full size of a Hive table to the performance 
> of a query that only needs a very small amount of data.
> 
> I poked around the Spark source, and for a minute thought this might be 
> related: https://github.com/apache/spark/commit/42389b17, but that was 
> included in Spark 1.2.0, and this was working for us fine.
> 
> Is HiveContext somehow trying to scan the whole table in the driver?  Has 
> anyone else had this problem?
> 
> Thanks!
> 
> -Andrew Otto
> 
