I believe you have an issue with performance? Have you checked the Spark UI (default port 4040) for details, including shuffles etc.?
HTH

Dr Mich Talebzadeh

LinkedIn: https://www.linkedin.com/profile/view?id=AAEAAAAWh2gBxianrbJd6zP6AcPCCdOABUrV8Pw

http://talebzadehmich.wordpress.com


On 18 June 2016 at 20:59, Colin Kincaid Williams <disc...@uw.edu> wrote:
> There are 25 nodes in the Spark cluster.
>
> On Sat, Jun 18, 2016 at 7:53 PM, Mich Talebzadeh
> <mich.talebza...@gmail.com> wrote:
> > How many nodes are in your cluster?
> >
> > --num-executors 6 \
> > --driver-memory 4G \
> > --executor-memory 2G \
> > --total-executor-cores 12 \
> >
> > Dr Mich Talebzadeh
> >
> > LinkedIn:
> > https://www.linkedin.com/profile/view?id=AAEAAAAWh2gBxianrbJd6zP6AcPCCdOABUrV8Pw
> >
> > http://talebzadehmich.wordpress.com
> >
> > On 18 June 2016 at 20:40, Colin Kincaid Williams <disc...@uw.edu> wrote:
> >>
> >> I updated my app to Spark 1.5.2 Streaming so that it consumes from
> >> Kafka using the direct API and inserts content into an HBase cluster,
> >> as described in this thread. I was away from this project for a while
> >> due to events in my family.
> >>
> >> Currently my scheduling delay is high, but the processing time is
> >> stable at around a second. I changed my setup to use 6 Kafka
> >> partitions on a set of smaller Kafka brokers, with fewer disks. I've
> >> included some details below, including the script I use to launch the
> >> application. I'm using a Spark-on-HBase library whose version matches
> >> my HBase cluster. Is it apparent that there is something wrong with my
> >> launch method that could be causing the delay, related to the
> >> included jars?
> >>
> >> Or is there something wrong with the very simple approach I'm taking
> >> in the application?
> >>
> >> Any advice is appreciated.
> >>
> >>
> >> The application:
> >>
> >> https://gist.github.com/drocsid/b0efa4ff6ff4a7c3c8bb56767d0b6877
> >>
> >>
> >> From the streaming UI I get something like the following (Completed
> >> Batches, last 1000 out of 27136):
> >>
> >> Batch Time           Input Size   Scheduling Delay  Processing Time  Total Delay  Output Ops: Succeeded/Total
> >> 2016/06/18 11:21:32  3000 events  1.2 h             1 s              1.2 h        1/1
> >> 2016/06/18 11:21:31  3000 events  1.2 h             1 s              1.2 h        1/1
> >> 2016/06/18 11:21:30  3000 events  1.2 h             1 s              1.2 h        1/1
> >>
> >>
> >> Here's how I'm launching the Spark application.
> >>
> >> #!/usr/bin/env bash
> >>
> >> export SPARK_CONF_DIR=/home/colin.williams/spark
> >> export HADOOP_CONF_DIR=/etc/hadoop/conf
> >> export HADOOP_CLASSPATH=/home/colin.williams/hbase/conf/:/home/colin.williams/CDH-5.3.0-1.cdh5.3.0.p0.30/lib/hbase/lib/*:/home/colin.williams/CDH-5.3.0-1.cdh5.3.0.p0.30/lib/hbase/lib/hbase-protocol-0.98.6-cdh5.3.0.jar
> >>
> >> /opt/spark-1.5.2-bin-hadoop2.4/bin/spark-submit \
> >>   --class com.example.KafkaToHbase \
> >>   --master spark://spark_master:7077 \
> >>   --deploy-mode client \
> >>   --num-executors 6 \
> >>   --driver-memory 4G \
> >>   --executor-memory 2G \
> >>   --total-executor-cores 12 \
> >>   --jars /home/colin.williams/CDH-5.3.0-1.cdh5.3.0.p0.30/lib/zookeeper/zookeeper-3.4.5-cdh5.3.0.jar,/home/colin.williams/CDH-5.3.0-1.cdh5.3.0.p0.30/lib/hbase/lib/guava-12.0.1.jar,/home/colin.williams/CDH-5.3.0-1.cdh5.3.0.p0.30/lib/hbase/lib/protobuf-java-2.5.0.jar,/home/colin.williams/CDH-5.3.0-1.cdh5.3.0.p0.30/lib/hbase/hbase-protocol.jar,/home/colin.williams/CDH-5.3.0-1.cdh5.3.0.p0.30/lib/hbase/hbase-client.jar,/home/colin.williams/CDH-5.3.0-1.cdh5.3.0.p0.30/lib/hbase/hbase-common.jar,/home/colin.williams/CDH-5.3.0-1.cdh5.3.0.p0.30/lib/hbase/hbase-hadoop2-compat.jar,/home/colin.williams/CDH-5.3.0-1.cdh5.3.0.p0.30/lib/hbase/hbase-hadoop-compat.jar,/home/colin.williams/CDH-5.3.0-1.cdh5.3.0.p0.30/lib/hbase/hbase-server.jar,/home/colin.williams/CDH-5.3.0-1.cdh5.3.0.p0.30/lib/hbase/lib/htrace-core.jar \
> >>   --conf spark.app.name="Kafka To Hbase" \
> >>   --conf spark.eventLog.dir="hdfs:///user/spark/applicationHistory" \
> >>   --conf spark.eventLog.enabled=false \
> >>   --conf spark.eventLog.overwrite=true \
> >>   --conf spark.serializer=org.apache.spark.serializer.KryoSerializer \
> >>   --conf spark.streaming.backpressure.enabled=false \
> >>   --conf spark.streaming.kafka.maxRatePerPartition=500 \
> >>   --driver-class-path /home/colin.williams/kafka-hbase.jar \
> >>   --driver-java-options -Dspark.executor.extraClassPath=/home/colin.williams/CDH-5.3.0-1.cdh5.3.0.p0.30/lib/hbase/lib/* \
> >>   /home/colin.williams/kafka-hbase.jar "FromTable" "ToTable" "broker1:9092,broker2:9092"
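> >>
> >> In case the gist is unreachable, the app is shaped roughly like the
> >> minimal sketch below: a direct stream of raw byte arrays, written to
> >> HBase once per partition. This is not the exact gist code; the row
> >> key, column family, and qualifier are placeholders, and the real app
> >> goes through the Spark-on-HBase library rather than raw HConnection
> >> calls.
> >>
> >> import java.util.Arrays;
> >> import java.util.HashMap;
> >> import java.util.HashSet;
> >> import java.util.Map;
> >> import java.util.Set;
> >>
> >> import kafka.serializer.DefaultDecoder;
> >> import org.apache.hadoop.conf.Configuration;
> >> import org.apache.hadoop.hbase.HBaseConfiguration;
> >> import org.apache.hadoop.hbase.client.HConnection;
> >> import org.apache.hadoop.hbase.client.HConnectionManager;
> >> import org.apache.hadoop.hbase.client.HTableInterface;
> >> import org.apache.hadoop.hbase.client.Put;
> >> import org.apache.hadoop.hbase.util.Bytes;
> >> import org.apache.spark.SparkConf;
> >> import org.apache.spark.streaming.Durations;
> >> import org.apache.spark.streaming.api.java.JavaPairInputDStream;
> >> import org.apache.spark.streaming.api.java.JavaStreamingContext;
> >> import org.apache.spark.streaming.kafka.KafkaUtils;
> >> import scala.Tuple2;
> >>
> >> public class KafkaToHbase {
> >>   public static void main(String[] args) throws Exception {
> >>     final String fromTopic = args[0];  // "FromTable"
> >>     final String toTable = args[1];    // "ToTable"
> >>     final String brokers = args[2];    // "broker1:9092,broker2:9092"
> >>
> >>     // 1 s batches, matching the batch times in the streaming UI above.
> >>     JavaStreamingContext jssc = new JavaStreamingContext(
> >>         new SparkConf(), Durations.seconds(1));
> >>
> >>     Map<String, String> kafkaParams = new HashMap<String, String>();
> >>     kafkaParams.put("metadata.broker.list", brokers);
> >>     Set<String> topics = new HashSet<String>(Arrays.asList(fromTopic));
> >>
> >>     // Direct stream: no receivers; one RDD partition per Kafka partition.
> >>     JavaPairInputDStream<byte[], byte[]> dstream = KafkaUtils.createDirectStream(
> >>         jssc, byte[].class, byte[].class,
> >>         DefaultDecoder.class, DefaultDecoder.class, kafkaParams, topics);
> >>
> >>     dstream.foreachRDD(rdd -> {
> >>       rdd.foreachPartition(records -> {
> >>         // One HBase connection per partition, reused for all its records.
> >>         Configuration conf = HBaseConfiguration.create();
> >>         HConnection conn = HConnectionManager.createConnection(conf);
> >>         HTableInterface table = conn.getTable(toTable);
> >>         while (records.hasNext()) {
> >>           Tuple2<byte[], byte[]> kv = records.next();
> >>           // Placeholder row key and column; the real app derives these
> >>           // from the protobuf payload.
> >>           Put put = new Put(kv._1());
> >>           put.add(Bytes.toBytes("cf"), Bytes.toBytes("payload"), kv._2());
> >>           table.put(put);
> >>         }
> >>         table.close();
> >>         conn.close();
> >>       });
> >>       return null; // Spark 1.5's foreachRDD takes Function<R, Void>
> >>     });
> >>
> >>     jssc.start();
> >>     jssc.awaitTermination();
> >>   }
> >> }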
> >>
> >> On Tue, May 3, 2016 at 8:20 PM, Colin Kincaid Williams <disc...@uw.edu> wrote:
> >> > Thanks Cody, I can see that the partitions are well distributed...
> >> > Then I'm in the process of using the direct API.
> >> >
> >> > On Tue, May 3, 2016 at 6:51 PM, Cody Koeninger <c...@koeninger.org> wrote:
> >> >> 60 partitions in and of itself shouldn't be a big performance issue
> >> >> (as long as producers are distributing across partitions evenly).
> >> >>
> >> >> On Tue, May 3, 2016 at 1:44 PM, Colin Kincaid Williams <disc...@uw.edu> wrote:
> >> >>> Thanks again Cody. Regarding the details: 66 Kafka partitions on 3
> >> >>> Kafka servers, likely 8-core systems with 10 disks each. Maybe the
> >> >>> issue with the receiver was the large number of partitions. I had
> >> >>> miscounted the disks, and so 11*3*2 is how I decided to partition
> >> >>> my topic on insertion (by my own, unjustified reasoning, on a first
> >> >>> attempt). This worked well enough for me; I put 1.7 billion entries
> >> >>> into Kafka from a MapReduce job in five and a half hours.
> >> >>>
> >> >>> I was concerned about using Spark 1.5.2 because I'm currently
> >> >>> putting my data into a CDH 5.3 HDFS cluster, using hbase-spark 0.98
> >> >>> library jars built for Spark 1.2 on CDH 5.3. But after debugging
> >> >>> quite a bit yesterday, I tried building against 1.5.2. So far it's
> >> >>> running without issue on a Spark 1.5.2 cluster. I'm not sure there
> >> >>> was much improvement using the same code, but I'll see how the
> >> >>> direct API handles it. In the end I can reduce the number of
> >> >>> partitions in Kafka if it causes big performance issues.
> >> >>>
> >> >>> On Tue, May 3, 2016 at 4:08 AM, Cody Koeninger <c...@koeninger.org> wrote:
> >> >>>> print() isn't really the best way to benchmark things, since it
> >> >>>> calls take(10) under the covers, but 380 records / second for a
> >> >>>> single receiver doesn't sound right in any case.
> >> >>>>
> >> >>>> Am I understanding correctly that you're trying to process a large
> >> >>>> number of already-existing Kafka messages, not keep up with an
> >> >>>> incoming stream? Can you give any details (e.g. hardware, number
> >> >>>> of topic-partitions, etc.)?
> >> >>>>
> >> >>>> Really though, I'd try to start with Spark 1.6 and direct streams,
> >> >>>> or even just kafkacat, as a baseline.
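> >> >>>>
> >> >>>> If you want an actual records-per-second number, count each batch
> >> >>>> instead of printing it. A minimal sketch against the dstream from
> >> >>>> your snippet quoted below (the measurement code is new, not from
> >> >>>> your app):
> >> >>>>
> >> >>>> // DStream.count() turns each batch into a single-element RDD
> >> >>>> // holding the record count, so print() now reports one number per
> >> >>>> // batch after a full pass, instead of stopping at 10 records:
> >> >>>> dstream.count().print();
> >> >>>>
> >> >>>> // Or time the pass over each batch explicitly:
> >> >>>> dstream.foreachRDD(rdd -> {
> >> >>>>   long start = System.currentTimeMillis();
> >> >>>>   long n = rdd.count();  // materializes every record in the batch
> >> >>>>   long ms = System.currentTimeMillis() - start;
> >> >>>>   System.out.println(n + " records in " + ms + " ms");
> >> >>>>   return null;  // foreachRDD takes Function<R, Void> on the 1.x Java API
> >> >>>> });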
> >> >>>>
> >> >>>> On Mon, May 2, 2016 at 7:01 PM, Colin Kincaid Williams <disc...@uw.edu> wrote:
> >> >>>>> Hello again. I searched for "backport kafka" in the list archives
> >> >>>>> but couldn't find anything but a post from Spark 0.7.2. I was
> >> >>>>> going to use accumulators to make a counter, but then saw the
> >> >>>>> Receiver Statistics on the Streaming tab. Then I removed all
> >> >>>>> other "functionality" except:
> >> >>>>>
> >> >>>>> // createStream(JavaStreamingContext jssc,
> >> >>>>> //     Class<K> keyTypeClass, Class<V> valueTypeClass,
> >> >>>>> //     Class<U> keyDecoderClass, Class<T> valueDecoderClass,
> >> >>>>> //     java.util.Map<String,String> kafkaParams,
> >> >>>>> //     java.util.Map<String,Integer> topics,
> >> >>>>> //     StorageLevel storageLevel)
> >> >>>>> JavaPairReceiverInputDStream<byte[], byte[]> dstream = KafkaUtils
> >> >>>>>     .createStream(jssc, byte[].class, byte[].class,
> >> >>>>>         kafka.serializer.DefaultDecoder.class,
> >> >>>>>         kafka.serializer.DefaultDecoder.class,
> >> >>>>>         kafkaParamsMap, topicMap,
> >> >>>>>         StorageLevel.MEMORY_AND_DISK_SER());
> >> >>>>>
> >> >>>>> dstream.print();
> >> >>>>>
> >> >>>>> Then in the Receiver Stats for the single receiver, I'm seeing
> >> >>>>> around 380 records / second. To get anywhere near my 10% target
> >> >>>>> mentioned above, I'd need to run around 21 receivers, assuming
> >> >>>>> 380 records / second, just using the print output. This seems
> >> >>>>> awfully high to me, considering that I wrote 80,000+ records a
> >> >>>>> second to Kafka from a MapReduce job, and that my bottleneck was
> >> >>>>> likely HBase. Again using the 380 estimate, I would need 200+
> >> >>>>> receivers to reach a similar amount of reads.
> >> >>>>>
> >> >>>>> Even given the issues with the 1.2 receivers, is this the
> >> >>>>> expected way to use the Kafka streaming API, or am I doing
> >> >>>>> something terribly wrong?
> >> >>>>>
> >> >>>>> My application looks like
> >> >>>>> https://gist.github.com/drocsid/b0efa4ff6ff4a7c3c8bb56767d0b6877
> >> >>>>>
> >> >>>>> On Mon, May 2, 2016 at 6:09 PM, Cody Koeninger <c...@koeninger.org> wrote:
> >> >>>>>> Have you tested for read throughput (without writing to HBase,
> >> >>>>>> just deserialize)?
> >> >>>>>>
> >> >>>>>> Are you limited to using Spark 1.2, or is upgrading possible?
> >> >>>>>> The Kafka direct stream is available starting with 1.3. If
> >> >>>>>> you're stuck on 1.2, I believe there have been some attempts to
> >> >>>>>> backport it; search the mailing list archives.
> >> >>>>>>
> >> >>>>>> On Mon, May 2, 2016 at 12:54 PM, Colin Kincaid Williams <disc...@uw.edu> wrote:
> >> >>>>>>> I've written an application to get content from a Kafka topic
> >> >>>>>>> with 1.7 billion entries, take the protobuf-serialized entries,
> >> >>>>>>> and insert them into HBase. Currently the environment I'm
> >> >>>>>>> running in is Spark 1.2.
> >> >>>>>>>
> >> >>>>>>> With 8 executors and 2 cores, and 2 jobs, I'm only getting
> >> >>>>>>> between 0-2500 writes / second. This will take much too long to
> >> >>>>>>> consume the entries.
> >> >>>>>>>
> >> >>>>>>> I currently believe that the Spark Kafka receiver is the
> >> >>>>>>> bottleneck. I've tried both 1.2 receivers, with the WAL and
> >> >>>>>>> without, and didn't notice any large performance difference.
> >> >>>>>>> I've tried many different Spark configuration options, but
> >> >>>>>>> can't seem to get better performance.
> >> >>>>>>>
> >> >>>>>>> I saw 80,000 requests / second inserting these records into
> >> >>>>>>> Kafka using yarn / hbase / protobuf / kafka in a bulk fashion.
> >> >>>>>>>
> >> >>>>>>> While HBase inserts might not deliver the same throughput, I'd
> >> >>>>>>> like to at least get 10%.
> >> >>>>>>>
> >> >>>>>>> My application looks like
> >> >>>>>>> https://gist.github.com/drocsid/b0efa4ff6ff4a7c3c8bb56767d0b6877
> >> >>>>>>>
> >> >>>>>>> This is my first Spark application. I'd appreciate any
> >> >>>>>>> assistance.