It's a write-once table, mainly used for read/query-intensive applications.
We actually generate a comma-separated string from an array and store it in
a single column qualifier. I will look into the approach you suggested.
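
Just to make sure I understand the suggestion, the write path would look
roughly like this (only a sketch; the column family and qualifier names
below are made up, and addColumn assumes the HBase 1.x client API):

import org.apache.hadoop.hbase.client.Put
import org.apache.hadoop.hbase.util.Bytes

// Instead of one comma-separated string, write each value under its own
// qualifier ("v0", "v1", ...) so reads can fetch a subset of the values
// without parsing the whole string.
def buildPut(rowKey: Array[Byte], values: Array[Double]): Put = {
  val cf = Bytes.toBytes("d")                  // assumed column family name
  val put = new Put(rowKey)
  values.zipWithIndex.foreach { case (v, i) =>
    put.addColumn(cf, Bytes.toBytes(s"v$i"), Bytes.toBytes(v))
  }
  put
}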

Reading of this table is done via Spark. It's an analytics application which
loads the HBase table as a Spark RDD. When a request comes in we run Spark
transformations and collect actions. Can you think of a better way to load
such an HBase table as an RDD? Currently we have an RDD of Scala case
objects where each field is populated from the value of an HBase column
qualifier, and the comma-separated string is loaded as an Array[Double].
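
Roughly, the load looks like this (a simplified sketch; the table, family,
and qualifier names and the case class are placeholders):

import org.apache.hadoop.hbase.HBaseConfiguration
import org.apache.hadoop.hbase.client.Result
import org.apache.hadoop.hbase.io.ImmutableBytesWritable
import org.apache.hadoop.hbase.mapreduce.TableInputFormat
import org.apache.hadoop.hbase.util.Bytes

case class Record(key: String, values: Array[Double])  // simplified case class

val hConf = HBaseConfiguration.create()
hConf.set(TableInputFormat.INPUT_TABLE, "my_table")     // hypothetical table name

// sc is the SparkContext; each HBase row becomes one Record.
// Assumes the "values" qualifier is present on every row.
val recordRdd = sc.newAPIHadoopRDD(hConf, classOf[TableInputFormat],
    classOf[ImmutableBytesWritable], classOf[Result])
  .map { case (rowKey, result) =>
    val raw = Bytes.toString(
      result.getValue(Bytes.toBytes("cf"), Bytes.toBytes("values")))
    // parse the comma-separated string into Array[Double]
    Record(Bytes.toString(rowKey.copyBytes()), raw.split(",").map(_.toDouble))
  }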

Thanks


On Thu, Mar 3, 2016 at 9:30 PM, Ted Yu <yuzhih...@gmail.com> wrote:

> Since I am not familiar with your use case, I assume that you want to
> avoid the cost of parsing the comma-separated string when writing to the
> hbase table.
>
> This schema is not friendly to queries which involve only a subset of the
> values. HBase doesn't have a limit on the number of columns in a given
> column family, so you may consider breaking the comma-separated string
> into multiple columns.
>
> From your first paragraph, the caching value of 10000 was indeed high.
> You can use binary search to get to a reasonable value for caching.
>
> Thanks
>
> On Thu, Mar 3, 2016 at 7:52 AM, Nirav Patel <npa...@xactlycorp.com> wrote:
>
>> Hi Ted,
>>
>> I'd say keys around the 70th percentile have 2 columns, each holding a
>> string of ~20k comma-separated values. The top few hundred row keys have
>> about 100-700k comma-separated values. I know that's an extra-FAT table.
>>
>> Yes, I can remove "hConf.setBoolean("hbase.cluster.distributed", true)".
>>
>> PS - any suggestion on breaking down the hbase column values is
>> appreciated. The design has two column families in the table. They both
>> share a common rowkey (the filter criteria). However, one column family's
>> qualifiers hold raw values in comma-separated string form that we cannot
>> pre-aggregate.
>>
>> Thanks
>> Nirav
>>
>> On Thu, Mar 3, 2016 at 8:16 PM, Ted Yu <yuzhih...@gmail.com> wrote:
>>
>>> bq.  hConf.setBoolean("hbase.cluster.distributed", true)
>>>
>>> Not sure why the above is needed. If hbase-site.xml is on the classpath,
>>> it should contain the above setting already.
>>>
>>> FYI
>>>
>>> On Thu, Mar 3, 2016 at 6:08 AM, Ted Yu <yuzhih...@gmail.com> wrote:
>>>
>>>> From the log snippet you posted, it was not clear why the connection
>>>> was lost. You can lower the value for caching and see if GC activity
>>>> goes down.
>>>>
>>>> How wide are the rows in the hbase table?
>>>>
>>>> Thanks
>>>>
>>>> On Mar 3, 2016, at 1:01 AM, Nirav Patel <npa...@xactlycorp.com> wrote:
>>>>
>>>> So why does 'saveAsHadoopDataset' incur so much memory pressure?
>>>> Should I try to reduce the hbase caching value?
>>>>
>>>> On Wed, Mar 2, 2016 at 7:51 AM, Nirav Patel <npa...@xactlycorp.com>
>>>> wrote:
>>>>
>>>>> Hi,
>>>>>
>>>>> I have a spark job that runs on yarn and keeps failing at the line
>>>>> where I do:
>>>>>
>>>>>
>>>>> val hConf = HBaseConfiguration.create
>>>>> hConf.setInt("hbase.client.scanner.caching", 10000)
>>>>> hConf.setBoolean("hbase.cluster.distributed", true)
>>>>>
>>>>> new PairRDDFunctions(hbaseRdd).saveAsHadoopDataset(jobConfig)
>>>>>
>>>>>
>>>>> Basically at this stage multiple executors fail after high GC
>>>>> activity. However, none of the executor logs, driver logs, or node
>>>>> manager logs indicate any OutOfMemory errors, GC overhead exceeded
>>>>> errors, or memory-limit-exceeded errors. I don't see any other reason
>>>>> for the executor failures either.
>>>>>
>>>>>
>>>>> Driver Logs:
>>>>>
>>>>> Failing Oozie Launcher, Main class 
>>>>> [org.apache.oozie.action.hadoop.SparkMain], main() threw exception, Job 
>>>>> aborted due to stage failure: Task 388 in stage 22.0 failed 4 times, most 
>>>>> recent failure: Lost task 388.3 in stage 22.0 (TID 32141, maprnode5): 
>>>>> ExecutorLostFailure (executor 5 lost)
>>>>> Driver stacktrace:
>>>>> org.apache.spark.SparkException: Job aborted due to stage failure: Task 
>>>>> 388 in stage 22.0 failed 4 times, most recent failure: Lost task 388.3 in 
>>>>> stage 22.0 (TID 32141, maprnode5): ExecutorLostFailure (executor 5 lost)
>>>>> Driver stacktrace:
>>>>>   at 
>>>>> org.apache.spark.scheduler.DAGScheduler.org$apache$spark$scheduler$DAGScheduler$$failJobAndIndependentStages(DAGScheduler.scala:1283)
>>>>>   at 
>>>>> org.apache.spark.scheduler.DAGScheduler$$anonfun$abortStage$1.apply(DAGScheduler.scala:1271)
>>>>>   at 
>>>>> org.apache.spark.scheduler.DAGScheduler$$anonfun$abortStage$1.apply(DAGScheduler.scala:1270)
>>>>>   at 
>>>>> scala.collection.mutable.ResizableArray$class.foreach(ResizableArray.scala:59)
>>>>>   at scala.collection.mutable.ArrayBuffer.foreach(ArrayBuffer.scala:47)
>>>>>   at 
>>>>> org.apache.spark.scheduler.DAGScheduler.abortStage(DAGScheduler.scala:1270)
>>>>>   at 
>>>>> org.apache.spark.scheduler.DAGScheduler$$anonfun$handleTaskSetFailed$1.apply(DAGScheduler.scala:697)
>>>>>   at 
>>>>> org.apache.spark.scheduler.DAGScheduler$$anonfun$handleTaskSetFailed$1.apply(DAGScheduler.scala:697)
>>>>>   at scala.Option.foreach(Option.scala:236)
>>>>>   at 
>>>>> org.apache.spark.scheduler.DAGScheduler.handleTaskSetFailed(DAGScheduler.scala:697)
>>>>>   at 
>>>>> org.apache.spark.scheduler.DAGSchedulerEventProcessLoop.doOnReceive(DAGScheduler.scala:1496)
>>>>>   at 
>>>>> org.apache.spark.scheduler.DAGSchedulerEventProcessLoop.onReceive(DAGScheduler.scala:1458)
>>>>>   at 
>>>>> org.apache.spark.scheduler.DAGSchedulerEventProcessLoop.onReceive(DAGScheduler.scala:1447)
>>>>>   at org.apache.spark.util.EventLoop$$anon$1.run(EventLoop.scala:48)
>>>>>   at 
>>>>> org.apache.spark.scheduler.DAGScheduler.runJob(DAGScheduler.scala:567)
>>>>>   at org.apache.spark.SparkContext.runJob(SparkContext.scala:1824)
>>>>>   at org.apache.spark.SparkContext.runJob(SparkContext.scala:1837)
>>>>>   at org.apache.spark.SparkContext.runJob(SparkContext.scala:1914)
>>>>>   at 
>>>>> org.apache.spark.rdd.PairRDDFunctions$$anonfun$saveAsHadoopDataset$1.apply$mcV$sp(PairRDDFunctions.scala:1124)
>>>>>   at 
>>>>> org.apache.spark.rdd.PairRDDFunctions$$anonfun$saveAsHadoopDataset$1.apply(PairRDDFunctions.scala:1065)
>>>>>   at 
>>>>> org.apache.spark.rdd.PairRDDFunctions$$anonfun$saveAsHadoopDataset$1.apply(PairRDDFunctions.scala:1065)
>>>>>   at 
>>>>> org.apache.spark.rdd.RDDOperationScope$.withScope(RDDOperationScope.scala:147)
>>>>>   at 
>>>>> org.apache.spark.rdd.RDDOperationScope$.withScope(RDDOperationScope.scala:108)
>>>>>   at org.apache.spark.rdd.RDD.withScope(RDD.scala:310)
>>>>>   at 
>>>>> org.apache.spark.rdd.PairRDDFunctions.saveAsHadoopDataset(PairRDDFunctions.scala:1065)
>>>>>
>>>>>
>>>>>
>>>>> Executor logs:
>>>>>
>>>>>
>>>>> 16/02/24 11:09:47 INFO executor.Executor: Finished task 224.0 in stage 
>>>>> 8.0 (TID 15318). 2099 bytes result sent to driver
>>>>> 16/02/24 11:09:47 INFO executor.CoarseGrainedExecutorBackend: Got 
>>>>> assigned task 15333
>>>>> 16/02/24 11:09:47 INFO executor.Executor: Running task 239.0 in stage 8.0 
>>>>> (TID 15333)
>>>>> 16/02/24 11:09:47 INFO storage.ShuffleBlockFetcherIterator: Getting 125 
>>>>> non-empty blocks out of 3007 blocks
>>>>> 16/02/24 11:09:47 INFO storage.ShuffleBlockFetcherIterator: Started 14 
>>>>> remote fetches in 10 ms
>>>>> 16/02/24 11:11:47 ERROR server.TransportChannelHandler: Connection to 
>>>>> maprnode5 has been quiet for 120000 ms while there are outstanding 
>>>>> requests. Assuming connection is dead; please adjust 
>>>>> spark.network.timeout if this is wrong.
>>>>> 16/02/24 11:11:47 ERROR client.TransportResponseHandler: Still have 1 
>>>>> requests outstanding when connection from maprnode5 is closed
>>>>> 16/02/24 11:11:47 ERROR shuffle.OneForOneBlockFetcher: Failed while 
>>>>> starting block fetches
>>>>> java.io.IOException: Connection from maprnode5 closed
>>>>>         at 
>>>>> org.apache.spark.network.client.TransportResponseHandler.channelUnregistered(TransportResponseHandler.java:104)
>>>>>         at 
>>>>> org.apache.spark.network.server.TransportChannelHandler.channelUnregistered(TransportChannelHandler.java:91)
>>>>>         at 
>>>>> io.netty.channel.AbstractChannelHandlerContext.invokeChannelUnregistered(AbstractChannelHandlerContext.java:158)
>>>>>         at 
>>>>> io.netty.channel.AbstractChannelHandlerContext.fireChannelUnregistered(AbstractChannelHandlerContext.java:144)
>>>>>         at 
>>>>> io.netty.channel.ChannelInboundHandlerAdapter.channelUnregistered(ChannelInboundHandlerAdapter.java:53)
>>>>>         at 
>>>>> io.netty.channel.AbstractChannelHandlerContext.invokeChannelUnregistered(AbstractChannelHandlerContext.java:158)
>>>>>         at 
>>>>> io.netty.channel.AbstractChannelHandlerContext.fireChannelUnregistered(AbstractChannelHandlerContext.java:144)
>>>>>         at 
>>>>> io.netty.channel.ChannelInboundHandlerAdapter.channelUnregistered(ChannelInboundHandlerAdapter.java:53)
>>>>>         at 
>>>>> io.netty.channel.AbstractChannelHandlerContext.invokeChannelUnregistered(AbstractChannelHandlerContext.java:158)
>>>>>         at 
>>>>> io.netty.channel.AbstractChannelHandlerContext.fireChannelUnregistered(AbstractChannelHandlerContext.java:144)
>>>>>         at 
>>>>> io.netty.channel.ChannelInboundHandlerAdapter.channelUnregistered(ChannelInboundHandlerAdapter.java:53)
>>>>>         at 
>>>>> io.netty.channel.AbstractChannelHandlerContext.invokeChannelUnregistered(AbstractChannelHandlerContext.java:158)
>>>>>         at 
>>>>> io.netty.channel.AbstractChannelHandlerContext.fireChannelUnregistered(AbstractChannelHandlerContext.java:144)
>>>>>         at 
>>>>> io.netty.channel.DefaultChannelPipeline.fireChannelUnregistered(DefaultChannelPipeline.java:739)
>>>>>         at 
>>>>> io.netty.channel.AbstractChannel$AbstractUnsafe$8.run(AbstractChannel.java:659)
>>>>>         at 
>>>>> io.netty.util.concurrent.SingleThreadEventExecutor.runAllTasks(SingleThreadEventExecutor.java:357)
>>>>>         at io.netty.channel.nio.NioEventLoop.run(NioEventLoop.java:357)
>>>>>         at 
>>>>> io.netty.util.concurrent.SingleThreadEventExecutor$2.run(SingleThreadEventExecutor.java:111)
>>>>>         at java.lang.Thread.run(Thread.java:744)
>>>>> 16/02/24 11:11:47 INFO shuffle.RetryingBlockFetcher: Retrying fetch (1/3) 
>>>>> for 6 outstanding blocks after 5000 ms
>>>>> 16/02/24 11:11:52 INFO client.TransportClientFactory: Found inactive 
>>>>> connection to maprnode5, creating a new one.
>>>>> 16/02/24 11:12:16 WARN server.TransportChannelHandler: Exception in 
>>>>> connection from maprnode5
>>>>> java.io.IOException: Connection reset by peer
>>>>>         at sun.nio.ch.FileDispatcherImpl.read0(Native Method)
>>>>>         at sun.nio.ch.SocketDispatcher.read(SocketDispatcher.java:39)
>>>>>         at sun.nio.ch.IOUtil.readIntoNativeBuffer(IOUtil.java:223)
>>>>>         at sun.nio.ch.IOUtil.read(IOUtil.java:192)
>>>>>         at sun.nio.ch.SocketChannelImpl.read(SocketChannelImpl.java:379)
>>>>>         at 
>>>>> io.netty.buffer.PooledUnsafeDirectByteBuf.setBytes(PooledUnsafeDirectByteBuf.java:313)
>>>>>         at 
>>>>> io.netty.buffer.AbstractByteBuf.writeBytes(AbstractByteBuf.java:881)
>>>>>         at 
>>>>> io.netty.channel.socket.nio.NioSocketChannel.doReadBytes(NioSocketChannel.java:242)
>>>>>         at 
>>>>> io.netty.channel.nio.AbstractNioByteChannel$NioByteUnsafe.read(AbstractNioByteChannel.java:119)
>>>>>         at 
>>>>> io.netty.channel.nio.NioEventLoop.processSelectedKey(NioEventLoop.java:511)
>>>>>         at 
>>>>> io.netty.channel.nio.NioEventLoop.processSelectedKeysOptimized(NioEventLoop.java:468)
>>>>>         at 
>>>>> io.netty.channel.nio.NioEventLoop.processSelectedKeys(NioEventLoop.java:382)
>>>>>         at io.netty.channel.nio.NioEventLoop.run(NioEventLoop.java:354)
>>>>>         at 
>>>>> io.netty.util.concurrent.SingleThreadEventExecutor$2.run(SingleThreadEventExecutor.java:111)
>>>>>         at java.lang.Thread.run(Thread.java:744)
>>>>> 16/02/24 11:12:16 ERROR client.TransportResponseHandler: Still have 1 
>>>>> requests outstanding when connection from maprnode5 is closed
>>>>> 16/02/24 11:12:16 ERROR shuffle.OneForOneBlockFetcher: Failed while 
>>>>> starting block fetches
>>>>>
>>>>>
>>>>
>>>>
>>>>
>>>>
>>>>
>>>
>>
>>
>>
>>
>
>
