It sounds like you have a performance problem.

Have you checked the Spark web UI (port 4040 by default) for details,
including shuffles?
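
As a rough sanity check on the numbers quoted below, here is a minimal Java sketch. It is not taken from your application, and the class and method names are made up for illustration. It assumes a 1 second batch interval, which I am inferring from the batch times in your table being one second apart, and it assumes the scheduling delay started near zero and grew evenly over the 27136 completed batches. With spark.streaming.kafka.maxRatePerPartition=500 and 6 partitions it gives a cap of 3000 records per batch, which matches the input size you see, and a 1.2 h delay works out to each batch overrunning the interval by roughly 0.16 s on average.

```java
// Hypothetical helper for back-of-the-envelope streaming arithmetic.
class BatchBudget {

    // Upper bound on records per batch for the Kafka direct stream:
    // maxRatePerPartition (records/s per partition) * partitions * interval (s).
    static long maxRecordsPerBatch(long maxRatePerPartition, int partitions,
                                   double batchIntervalSec) {
        return Math.round(maxRatePerPartition * partitions * batchIntervalSec);
    }

    // Average time by which each batch overran the batch interval,
    // ASSUMING the delay started at zero and accumulated evenly.
    static double avgOverrunSec(double schedulingDelaySec, long completedBatches) {
        return schedulingDelaySec / completedBatches;
    }

    public static void main(String[] args) {
        // 500 and 6 come from the launch script and the message below;
        // the 1.0 s batch interval is an assumption.
        long cap = maxRecordsPerBatch(500, 6, 1.0);
        // 1.2 h scheduling delay and 27136 completed batches, from the UI table.
        double overrun = avgOverrunSec(1.2 * 3600, 27136);
        System.out.println("records per batch cap: " + cap);
        System.out.println("average overrun per batch (s): " + overrun);
    }
}
```

If that reading is right, the job is running at the rate cap with a processing time slightly above the batch interval, so the delay can only grow. The per-stage timings in the UI should show where that second is going.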

HTH

Dr Mich Talebzadeh

LinkedIn: https://www.linkedin.com/profile/view?id=AAEAAAAWh2gBxianrbJd6zP6AcPCCdOABUrV8Pw

http://talebzadehmich.wordpress.com

On 18 June 2016 at 20:59, Colin Kincaid Williams <disc...@uw.edu> wrote:

> There are 25 nodes in the spark cluster.
>
> On Sat, Jun 18, 2016 at 7:53 PM, Mich Talebzadeh
> <mich.talebza...@gmail.com> wrote:
> > how many nodes are in your cluster?
> >
> > --num-executors 6 \
> >  --driver-memory 4G \
> >  --executor-memory 2G \
> >  --total-executor-cores 12 \
> >
> >
> >
> > On 18 June 2016 at 20:40, Colin Kincaid Williams <disc...@uw.edu> wrote:
> >>
> >> I updated my app to Spark 1.5.2 streaming so that it consumes from
> >> Kafka using the direct api and inserts content into an hbase cluster,
> >> as described in this thread. I was away from this project for a while
> >> due to events in my family.
> >>
> >> Currently my scheduling delay is high, but the processing time is
> >> stable around a second. I changed my setup to use 6 kafka partitions
> >> on a set of smaller kafka brokers, with fewer disks. I've included
> >> some details below, including the script I use to launch the
> >> application. I'm using a Spark on Hbase library, whose version is
> >> relevant to my Hbase cluster. Is it apparent there is something wrong
> >> with my launch method that could be causing the delay, related to the
> >> included jars?
> >>
> >> Or is there something wrong with the very simple approach I'm taking
> >> for the application?
> >>
> >> Any advice is appreciated.
> >>
> >>
> >> The application:
> >>
> >> https://gist.github.com/drocsid/b0efa4ff6ff4a7c3c8bb56767d0b6877
> >>
> >>
> >> From the streaming UI I get something like:
> >>
> >> Completed Batches (last 1000 out of 27136)
> >>
> >> Batch Time           Input Size   Scheduling Delay  Processing Time  Total Delay  Output Ops: Succeeded/Total
> >> 2016/06/18 11:21:32  3000 events  1.2 h             1 s              1.2 h        1/1
> >> 2016/06/18 11:21:31  3000 events  1.2 h             1 s              1.2 h        1/1
> >> 2016/06/18 11:21:30  3000 events  1.2 h             1 s              1.2 h        1/1
> >>
> >>
> >> Here's how I'm launching the spark application.
> >>
> >>
> >> #!/usr/bin/env bash
> >>
> >> export SPARK_CONF_DIR=/home/colin.williams/spark
> >> export HADOOP_CONF_DIR=/etc/hadoop/conf
> >> export HADOOP_CLASSPATH=/home/colin.williams/hbase/conf/:/home/colin.williams/CDH-5.3.0-1.cdh5.3.0.p0.30/lib/hbase/lib/*:/home/colin.williams/CDH-5.3.0-1.cdh5.3.0.p0.30/lib/hbase/lib/hbase-protocol-0.98.6-cdh5.3.0.jar
> >>
> >> /opt/spark-1.5.2-bin-hadoop2.4/bin/spark-submit \
> >>   --class com.example.KafkaToHbase \
> >>   --master spark://spark_master:7077 \
> >>   --deploy-mode client \
> >>   --num-executors 6 \
> >>   --driver-memory 4G \
> >>   --executor-memory 2G \
> >>   --total-executor-cores 12 \
> >>   --jars /home/colin.williams/CDH-5.3.0-1.cdh5.3.0.p0.30/lib/zookeeper/zookeeper-3.4.5-cdh5.3.0.jar,/home/colin.williams/CDH-5.3.0-1.cdh5.3.0.p0.30/lib/hbase/lib/guava-12.0.1.jar,/home/colin.williams/CDH-5.3.0-1.cdh5.3.0.p0.30/lib/hbase/lib/protobuf-java-2.5.0.jar,/home/colin.williams/CDH-5.3.0-1.cdh5.3.0.p0.30/lib/hbase/hbase-protocol.jar,/home/colin.williams/CDH-5.3.0-1.cdh5.3.0.p0.30/lib/hbase/hbase-client.jar,/home/colin.williams/CDH-5.3.0-1.cdh5.3.0.p0.30/lib/hbase/hbase-common.jar,/home/colin.williams/CDH-5.3.0-1.cdh5.3.0.p0.30/lib/hbase/hbase-hadoop2-compat.jar,/home/colin.williams/CDH-5.3.0-1.cdh5.3.0.p0.30/lib/hbase/hbase-hadoop-compat.jar,/home/colin.williams/CDH-5.3.0-1.cdh5.3.0.p0.30/lib/hbase/hbase-server.jar,/home/colin.williams/CDH-5.3.0-1.cdh5.3.0.p0.30/lib/hbase/lib/htrace-core.jar \
> >>   --conf spark.app.name="Kafka To Hbase" \
> >>   --conf spark.eventLog.dir="hdfs:///user/spark/applicationHistory" \
> >>   --conf spark.eventLog.enabled=false \
> >>   --conf spark.eventLog.overwrite=true \
> >>   --conf spark.serializer=org.apache.spark.serializer.KryoSerializer \
> >>   --conf spark.streaming.backpressure.enabled=false \
> >>   --conf spark.streaming.kafka.maxRatePerPartition=500 \
> >>   --driver-class-path /home/colin.williams/kafka-hbase.jar \
> >>   --driver-java-options -Dspark.executor.extraClassPath=/home/colin.williams/CDH-5.3.0-1.cdh5.3.0.p0.30/lib/hbase/lib/* \
> >>   /home/colin.williams/kafka-hbase.jar "FromTable" "ToTable" "broker1:9092,broker2:9092"
> >>
> >> On Tue, May 3, 2016 at 8:20 PM, Colin Kincaid Williams <disc...@uw.edu>
> >> wrote:
> >> > Thanks Cody, I can see that the partitions are well distributed...
> >> > Then I'm in the process of using the direct api.
> >> >
> >> > On Tue, May 3, 2016 at 6:51 PM, Cody Koeninger <c...@koeninger.org>
> >> > wrote:
> >> >> 60 partitions in and of itself shouldn't be a big performance issue
> >> >> (as long as producers are distributing across partitions evenly).
> >> >>
> >> >> On Tue, May 3, 2016 at 1:44 PM, Colin Kincaid Williams <disc...@uw.edu>
> >> >> wrote:
> >> >>> Thanks again Cody. Regarding the details: 66 Kafka partitions on 3
> >> >>> Kafka servers, likely 8-core systems with 10 disks each. Maybe the
> >> >>> issue with the receiver was the large number of partitions. I had
> >> >>> miscounted the disks, so 11*3*2 is how I decided to partition my
> >> >>> topic on insertion (by my own unjustified reasoning, on a first
> >> >>> attempt). This worked well enough for me: I put 1.7 billion entries
> >> >>> into Kafka with a MapReduce job in five and a half hours.
> >> >>>
> >> >>> I was concerned about using Spark 1.5.2 because I'm currently putting
> >> >>> my data into a CDH 5.3 HDFS cluster, using hbase-spark .98 library
> >> >>> jars built for Spark 1.2 on CDH 5.3. But after debugging quite a bit
> >> >>> yesterday, I tried building against 1.5.2. So far it's running
> >> >>> without issue on a Spark 1.5.2 cluster. I'm not sure there was much
> >> >>> improvement using the same code, but I'll see how the direct API
> >> >>> handles it. In the end I can reduce the number of partitions in
> >> >>> Kafka if it causes big performance issues.
> >> >>>
> >> >>> On Tue, May 3, 2016 at 4:08 AM, Cody Koeninger <c...@koeninger.org>
> >> >>> wrote:
> >> >>>> print() isn't really the best way to benchmark things, since it
> >> >>>> calls take(10) under the covers, but 380 records / second for a
> >> >>>> single receiver doesn't sound right in any case.
> >> >>>>
> >> >>>> Am I understanding correctly that you're trying to process a large
> >> >>>> number of already-existing kafka messages, not keep up with an
> >> >>>> incoming stream?  Can you give any details (e.g. hardware, number
> >> >>>> of topicpartitions, etc)?
> >> >>>>
> >> >>>> Really though, I'd try to start with spark 1.6 and direct streams,
> >> >>>> or even just kafkacat, as a baseline.
> >> >>>>
> >> >>>>
> >> >>>>
> >> >>>> On Mon, May 2, 2016 at 7:01 PM, Colin Kincaid Williams
> >> >>>> <disc...@uw.edu> wrote:
> >> >>>>> Hello again. I searched for "backport kafka" in the list archives
> >> >>>>> but couldn't find anything but a post from Spark 0.7.2. I was
> >> >>>>> going to use accumulators to make a counter, but then saw the
> >> >>>>> Receiver Statistics on the Streaming tab. Then I removed all
> >> >>>>> other "functionality" except:
> >> >>>>>
> >> >>>>>
> >> >>>>>     JavaPairReceiverInputDStream<byte[], byte[]> dstream = KafkaUtils
> >> >>>>>       // createStream(JavaStreamingContext jssc, Class<K> keyTypeClass,
> >> >>>>>       //   Class<V> valueTypeClass, Class<U> keyDecoderClass,
> >> >>>>>       //   Class<T> valueDecoderClass,
> >> >>>>>       //   java.util.Map<String,String> kafkaParams,
> >> >>>>>       //   java.util.Map<String,Integer> topics,
> >> >>>>>       //   StorageLevel storageLevel)
> >> >>>>>       .createStream(jssc, byte[].class, byte[].class,
> >> >>>>>           kafka.serializer.DefaultDecoder.class,
> >> >>>>>           kafka.serializer.DefaultDecoder.class,
> >> >>>>>           kafkaParamsMap, topicMap,
> >> >>>>>           StorageLevel.MEMORY_AND_DISK_SER());
> >> >>>>>
> >> >>>>>     dstream.print();
> >> >>>>>
> >> >>>>> Then in the Receiver Stats for the single receiver, I'm seeing
> >> >>>>> around 380 records / second. To get anywhere near the 10% I
> >> >>>>> mentioned above, I'd need to run around 21 receivers, assuming 380
> >> >>>>> records / second, just using the print output. This seems awfully
> >> >>>>> high to me, considering that I wrote 80000+ records a second to
> >> >>>>> Kafka from a mapreduce job, and that my bottleneck was likely
> >> >>>>> Hbase. Again using the 380 estimate, I would need 200+ receivers
> >> >>>>> to reach a similar amount of reads.
> >> >>>>>
> >> >>>>> Even given the issues with the 1.2 receivers, is this the expected
> >> >>>>> way to use the Kafka streaming API, or am I doing something
> >> >>>>> terribly wrong?
> >> >>>>>
> >> >>>>> My application looks like
> >> >>>>> https://gist.github.com/drocsid/b0efa4ff6ff4a7c3c8bb56767d0b6877
> >> >>>>>
> >> >>>>> On Mon, May 2, 2016 at 6:09 PM, Cody Koeninger <c...@koeninger.org>
> >> >>>>> wrote:
> >> >>>>>> Have you tested for read throughput (without writing to hbase,
> >> >>>>>> just deserialize)?
> >> >>>>>>
> >> >>>>>> Are you limited to using spark 1.2, or is upgrading possible?  The
> >> >>>>>> kafka direct stream is available starting with 1.3.  If you're
> >> >>>>>> stuck on 1.2, I believe there have been some attempts to backport
> >> >>>>>> it, search the mailing list archives.
> >> >>>>>>
> >> >>>>>> On Mon, May 2, 2016 at 12:54 PM, Colin Kincaid Williams
> >> >>>>>> <disc...@uw.edu> wrote:
> >> >>>>>>> I've written an application to get content from a kafka topic
> >> >>>>>>> with 1.7 billion entries, get the protobuf serialized entries,
> >> >>>>>>> and insert into hbase. Currently the environment that I'm
> >> >>>>>>> running in is Spark 1.2.
> >> >>>>>>>
> >> >>>>>>> With 8 executors and 2 cores, and 2 jobs, I'm only getting
> >> >>>>>>> between 0-2500 writes / second. This will take much too long to
> >> >>>>>>> consume the entries.
> >> >>>>>>>
> >> >>>>>>> I currently believe that the spark kafka receiver is the
> >> >>>>>>> bottleneck. I've tried both 1.2 receivers, with the WAL and
> >> >>>>>>> without, and didn't notice any large performance difference.
> >> >>>>>>> I've tried many different spark configuration options, but
> >> >>>>>>> can't seem to get better performance.
> >> >>>>>>>
> >> >>>>>>> I saw 80000 requests / second inserting these records into kafka
> >> >>>>>>> using yarn / hbase / protobuf / kafka in a bulk fashion.
> >> >>>>>>>
> >> >>>>>>> While hbase inserts might not deliver the same throughput, I'd
> >> >>>>>>> like to at least get 10%.
> >> >>>>>>>
> >> >>>>>>> My application looks like
> >> >>>>>>> https://gist.github.com/drocsid/b0efa4ff6ff4a7c3c8bb56767d0b6877
> >> >>>>>>>
> >> >>>>>>> This is my first spark application. I'd appreciate any
> >> >>>>>>> assistance.
> >> >>>>>>>
> >> >>>>>>>
> >> >>>>>>>
> >> >>>>>>> ---------------------------------------------------------------------
> >> >>>>>>> To unsubscribe, e-mail: user-unsubscr...@spark.apache.org
> >> >>>>>>> For additional commands, e-mail: user-h...@spark.apache.org
> >> >>>>>>>
> >
>
