Since I am not familiar with your use case, I assume that you want to avoid
the cost of parsing the comma-separated string when writing to the HBase table.

This schema is unfriendly to queries that involve only a subset of the
values. HBase does not limit the number of columns in a given column
family, so you may consider breaking the comma-separated string into
multiple columns.
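
To make the idea concrete, here is a minimal sketch of the splitting step in
plain Scala. The object name, the "v" prefix and the zero-padded qualifier
scheme are my assumptions, not something from your schema. Each resulting
pair would become one column, e.g. one Put.addColumn(family, qualifier, value)
call with the strings converted via Bytes.toBytes.

```scala
// Sketch only: turn one comma-separated cell into (qualifier, value) pairs,
// one pair per future HBase column. All names here are hypothetical.
object SplitColumns {
  /** Zero-padded qualifiers ("v000000", "v000001", ...) sort in positional order. */
  def toColumns(csv: String, prefix: String = "v"): Seq[(String, String)] =
    // limit -1 keeps trailing empty fields, so positions are preserved
    csv.split(",", -1).toSeq.zipWithIndex.map { case (value, i) =>
      (f"$prefix$i%06d", value.trim)
    }
}
```

For example, SplitColumns.toColumns("10,20,30") yields the pairs
(v000000, 10), (v000001, 20), (v000002, 30). With one value per column, a
query can ask for only the qualifiers it needs instead of fetching and
parsing the whole string.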

From your first paragraph, a caching value of 10000 was indeed high.
You can use binary search to arrive at a reasonable value for caching.
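
A rough sketch of that search, in plain Scala. The `ok` function is a
placeholder I made up: in practice it would be a trial run of the job with
hbase.client.scanner.caching set to the candidate value, returning true if
the run completes without losing executors. The sketch assumes the lowest
value works and that once a value fails, every larger value fails too.

```scala
// Sketch only: binary search for the largest caching value that still works.
// `ok` is a hypothetical probe (e.g. "did a trial run with this value succeed?").
object CachingSearch {
  /** Largest value in [lo, hi] for which `ok` holds, assuming ok(lo) is true
    * and that failures are monotonic (a failing value implies larger ones fail). */
  def largestWorking(lo: Int, hi: Int)(ok: Int => Boolean): Int = {
    var good = lo     // assumed to work
    var bad = hi + 1  // sentinel just above the range, treated as failing
    while (bad - good > 1) {
      val mid = good + (bad - good) / 2
      if (ok(mid)) good = mid else bad = mid
    }
    good
  }
}
```

With a range of 1 to 10000 this needs about 14 trial runs. If the trials are
expensive, a coarser search (say, halving from 10000 until the job is stable)
may be good enough.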

Thanks

On Thu, Mar 3, 2016 at 7:52 AM, Nirav Patel <npa...@xactlycorp.com> wrote:

> Hi Ted,
>
> I'd say keys around the 70th percentile have 2 columns, each holding a
> string of 20k comma-separated values. The top few hundred row keys have
> about 100-700k comma-separated values each. I know that is an extra-fat
> table.
>
> Yes, I can remove "hConf.setBoolean("hbase.cluster.distributed", true)".
>
> PS: any suggestion on breaking down HBase column values is appreciated.
> The design has two column families in the table. Both share a common
> rowkey (the filter criteria). However, the column qualifiers in one column
> family hold raw values in comma-separated string form that we cannot
> pre-aggregate.
>
> Thanks
> Nirav
>
> On Thu, Mar 3, 2016 at 8:16 PM, Ted Yu <yuzhih...@gmail.com> wrote:
>
>> bq.  hConf.setBoolean("hbase.cluster.distributed", true)
>>
>> Not sure why the above is needed. If hbase-site.xml is on the classpath,
>> it should contain the above setting already.
>>
>> FYI
>>
>> On Thu, Mar 3, 2016 at 6:08 AM, Ted Yu <yuzhih...@gmail.com> wrote:
>>
>>> From the log snippet you posted, it was not clear why the connection got
>>> lost. You can lower the value for caching and see if GC activity drops.
>>>
>>> How wide are the rows in the HBase table?
>>>
>>> Thanks
>>>
>>> On Mar 3, 2016, at 1:01 AM, Nirav Patel <npa...@xactlycorp.com> wrote:
>>>
>>> So why does 'saveAsHadoopDataset' incur so much memory pressure?
>>> Should I try to reduce the HBase caching value?
>>>
>>> On Wed, Mar 2, 2016 at 7:51 AM, Nirav Patel <npa...@xactlycorp.com>
>>> wrote:
>>>
>>>> Hi,
>>>>
>>>> I have a Spark job that runs on YARN and keeps failing at the line
>>>> where I do:
>>>>
>>>>
>>>> val hConf = HBaseConfiguration.create
>>>> hConf.setInt("hbase.client.scanner.caching", 10000)
>>>> hConf.setBoolean("hbase.cluster.distributed", true)
>>>>
>>>> new PairRDDFunctions(hbaseRdd).saveAsHadoopDataset(jobConfig)
>>>>
>>>>
>>>> Basically, at this stage multiple executors fail after high GC activity.
>>>> However, none of the executor logs, driver logs, or node manager logs
>>>> indicate any OutOfMemory errors, GC Overhead Exceeded errors, or memory
>>>> limit exceeded errors. I don't see any other reason for the executor
>>>> failures either.
>>>>
>>>>
>>>> Driver Logs:
>>>>
>>>> Failing Oozie Launcher, Main class 
>>>> [org.apache.oozie.action.hadoop.SparkMain], main() threw exception, Job 
>>>> aborted due to stage failure: Task 388 in stage 22.0 failed 4 times, most 
>>>> recent failure: Lost task 388.3 in stage 22.0 (TID 32141, maprnode5): 
>>>> ExecutorLostFailure (executor 5 lost)
>>>> Driver stacktrace:
>>>> org.apache.spark.SparkException: Job aborted due to stage failure: Task 
>>>> 388 in stage 22.0 failed 4 times, most recent failure: Lost task 388.3 in 
>>>> stage 22.0 (TID 32141, maprnode5): ExecutorLostFailure (executor 5 lost)
>>>> Driver stacktrace:
>>>>    at 
>>>> org.apache.spark.scheduler.DAGScheduler.org$apache$spark$scheduler$DAGScheduler$$failJobAndIndependentStages(DAGScheduler.scala:1283)
>>>>    at 
>>>> org.apache.spark.scheduler.DAGScheduler$$anonfun$abortStage$1.apply(DAGScheduler.scala:1271)
>>>>    at 
>>>> org.apache.spark.scheduler.DAGScheduler$$anonfun$abortStage$1.apply(DAGScheduler.scala:1270)
>>>>    at 
>>>> scala.collection.mutable.ResizableArray$class.foreach(ResizableArray.scala:59)
>>>>    at scala.collection.mutable.ArrayBuffer.foreach(ArrayBuffer.scala:47)
>>>>    at 
>>>> org.apache.spark.scheduler.DAGScheduler.abortStage(DAGScheduler.scala:1270)
>>>>    at 
>>>> org.apache.spark.scheduler.DAGScheduler$$anonfun$handleTaskSetFailed$1.apply(DAGScheduler.scala:697)
>>>>    at 
>>>> org.apache.spark.scheduler.DAGScheduler$$anonfun$handleTaskSetFailed$1.apply(DAGScheduler.scala:697)
>>>>    at scala.Option.foreach(Option.scala:236)
>>>>    at 
>>>> org.apache.spark.scheduler.DAGScheduler.handleTaskSetFailed(DAGScheduler.scala:697)
>>>>    at 
>>>> org.apache.spark.scheduler.DAGSchedulerEventProcessLoop.doOnReceive(DAGScheduler.scala:1496)
>>>>    at 
>>>> org.apache.spark.scheduler.DAGSchedulerEventProcessLoop.onReceive(DAGScheduler.scala:1458)
>>>>    at 
>>>> org.apache.spark.scheduler.DAGSchedulerEventProcessLoop.onReceive(DAGScheduler.scala:1447)
>>>>    at org.apache.spark.util.EventLoop$$anon$1.run(EventLoop.scala:48)
>>>>    at 
>>>> org.apache.spark.scheduler.DAGScheduler.runJob(DAGScheduler.scala:567)
>>>>    at org.apache.spark.SparkContext.runJob(SparkContext.scala:1824)
>>>>    at org.apache.spark.SparkContext.runJob(SparkContext.scala:1837)
>>>>    at org.apache.spark.SparkContext.runJob(SparkContext.scala:1914)
>>>>    at 
>>>> org.apache.spark.rdd.PairRDDFunctions$$anonfun$saveAsHadoopDataset$1.apply$mcV$sp(PairRDDFunctions.scala:1124)
>>>>    at 
>>>> org.apache.spark.rdd.PairRDDFunctions$$anonfun$saveAsHadoopDataset$1.apply(PairRDDFunctions.scala:1065)
>>>>    at 
>>>> org.apache.spark.rdd.PairRDDFunctions$$anonfun$saveAsHadoopDataset$1.apply(PairRDDFunctions.scala:1065)
>>>>    at 
>>>> org.apache.spark.rdd.RDDOperationScope$.withScope(RDDOperationScope.scala:147)
>>>>    at 
>>>> org.apache.spark.rdd.RDDOperationScope$.withScope(RDDOperationScope.scala:108)
>>>>    at org.apache.spark.rdd.RDD.withScope(RDD.scala:310)
>>>>    at 
>>>> org.apache.spark.rdd.PairRDDFunctions.saveAsHadoopDataset(PairRDDFunctions.scala:1065)
>>>>
>>>>
>>>>
>>>> Executor logs:
>>>>
>>>>
>>>> 16/02/24 11:09:47 INFO executor.Executor: Finished task 224.0 in stage 8.0 
>>>> (TID 15318). 2099 bytes result sent to driver
>>>> 16/02/24 11:09:47 INFO executor.CoarseGrainedExecutorBackend: Got assigned 
>>>> task 15333
>>>> 16/02/24 11:09:47 INFO executor.Executor: Running task 239.0 in stage 8.0 
>>>> (TID 15333)
>>>> 16/02/24 11:09:47 INFO storage.ShuffleBlockFetcherIterator: Getting 125 
>>>> non-empty blocks out of 3007 blocks
>>>> 16/02/24 11:09:47 INFO storage.ShuffleBlockFetcherIterator: Started 14 
>>>> remote fetches in 10 ms
>>>> 16/02/24 11:11:47 ERROR server.TransportChannelHandler: Connection to 
>>>> maprnode5 has been quiet for 120000 ms while there are outstanding 
>>>> requests. Assuming connection is dead; please adjust spark.network.timeout 
>>>> if this is wrong.
>>>> 16/02/24 11:11:47 ERROR client.TransportResponseHandler: Still have 1 
>>>> requests outstanding when connection from maprnode5 is closed
>>>> 16/02/24 11:11:47 ERROR shuffle.OneForOneBlockFetcher: Failed while 
>>>> starting block fetches
>>>> java.io.IOException: Connection from maprnode5 closed
>>>>         at 
>>>> org.apache.spark.network.client.TransportResponseHandler.channelUnregistered(TransportResponseHandler.java:104)
>>>>         at 
>>>> org.apache.spark.network.server.TransportChannelHandler.channelUnregistered(TransportChannelHandler.java:91)
>>>>         at 
>>>> io.netty.channel.AbstractChannelHandlerContext.invokeChannelUnregistered(AbstractChannelHandlerContext.java:158)
>>>>         at 
>>>> io.netty.channel.AbstractChannelHandlerContext.fireChannelUnregistered(AbstractChannelHandlerContext.java:144)
>>>>         at 
>>>> io.netty.channel.ChannelInboundHandlerAdapter.channelUnregistered(ChannelInboundHandlerAdapter.java:53)
>>>>         at 
>>>> io.netty.channel.AbstractChannelHandlerContext.invokeChannelUnregistered(AbstractChannelHandlerContext.java:158)
>>>>         at 
>>>> io.netty.channel.AbstractChannelHandlerContext.fireChannelUnregistered(AbstractChannelHandlerContext.java:144)
>>>>         at 
>>>> io.netty.channel.ChannelInboundHandlerAdapter.channelUnregistered(ChannelInboundHandlerAdapter.java:53)
>>>>         at 
>>>> io.netty.channel.AbstractChannelHandlerContext.invokeChannelUnregistered(AbstractChannelHandlerContext.java:158)
>>>>         at 
>>>> io.netty.channel.AbstractChannelHandlerContext.fireChannelUnregistered(AbstractChannelHandlerContext.java:144)
>>>>         at 
>>>> io.netty.channel.ChannelInboundHandlerAdapter.channelUnregistered(ChannelInboundHandlerAdapter.java:53)
>>>>         at 
>>>> io.netty.channel.AbstractChannelHandlerContext.invokeChannelUnregistered(AbstractChannelHandlerContext.java:158)
>>>>         at 
>>>> io.netty.channel.AbstractChannelHandlerContext.fireChannelUnregistered(AbstractChannelHandlerContext.java:144)
>>>>         at 
>>>> io.netty.channel.DefaultChannelPipeline.fireChannelUnregistered(DefaultChannelPipeline.java:739)
>>>>         at 
>>>> io.netty.channel.AbstractChannel$AbstractUnsafe$8.run(AbstractChannel.java:659)
>>>>         at 
>>>> io.netty.util.concurrent.SingleThreadEventExecutor.runAllTasks(SingleThreadEventExecutor.java:357)
>>>>         at io.netty.channel.nio.NioEventLoop.run(NioEventLoop.java:357)
>>>>         at 
>>>> io.netty.util.concurrent.SingleThreadEventExecutor$2.run(SingleThreadEventExecutor.java:111)
>>>>         at java.lang.Thread.run(Thread.java:744)
>>>> 16/02/24 11:11:47 INFO shuffle.RetryingBlockFetcher: Retrying fetch (1/3) 
>>>> for 6 outstanding blocks after 5000 ms
>>>> 16/02/24 11:11:52 INFO client.TransportClientFactory: Found inactive 
>>>> connection to maprnode5, creating a new one.
>>>> 16/02/24 11:12:16 WARN server.TransportChannelHandler: Exception in 
>>>> connection from maprnode5
>>>> java.io.IOException: Connection reset by peer
>>>>         at sun.nio.ch.FileDispatcherImpl.read0(Native Method)
>>>>         at sun.nio.ch.SocketDispatcher.read(SocketDispatcher.java:39)
>>>>         at sun.nio.ch.IOUtil.readIntoNativeBuffer(IOUtil.java:223)
>>>>         at sun.nio.ch.IOUtil.read(IOUtil.java:192)
>>>>         at sun.nio.ch.SocketChannelImpl.read(SocketChannelImpl.java:379)
>>>>         at 
>>>> io.netty.buffer.PooledUnsafeDirectByteBuf.setBytes(PooledUnsafeDirectByteBuf.java:313)
>>>>         at 
>>>> io.netty.buffer.AbstractByteBuf.writeBytes(AbstractByteBuf.java:881)
>>>>         at 
>>>> io.netty.channel.socket.nio.NioSocketChannel.doReadBytes(NioSocketChannel.java:242)
>>>>         at 
>>>> io.netty.channel.nio.AbstractNioByteChannel$NioByteUnsafe.read(AbstractNioByteChannel.java:119)
>>>>         at 
>>>> io.netty.channel.nio.NioEventLoop.processSelectedKey(NioEventLoop.java:511)
>>>>         at 
>>>> io.netty.channel.nio.NioEventLoop.processSelectedKeysOptimized(NioEventLoop.java:468)
>>>>         at 
>>>> io.netty.channel.nio.NioEventLoop.processSelectedKeys(NioEventLoop.java:382)
>>>>         at io.netty.channel.nio.NioEventLoop.run(NioEventLoop.java:354)
>>>>         at 
>>>> io.netty.util.concurrent.SingleThreadEventExecutor$2.run(SingleThreadEventExecutor.java:111)
>>>>         at java.lang.Thread.run(Thread.java:744)
>>>> 16/02/24 11:12:16 ERROR client.TransportResponseHandler: Still have 1 
>>>> requests outstanding when connection from maprnode5 is closed
>>>> 16/02/24 11:12:16 ERROR shuffle.OneForOneBlockFetcher: Failed while 
>>>> starting block fetches
>>>>
>>>>
>>>
>>
>
