Hi Mich, again.

Regarding the batch window etc.: I have provided the sources, but I'm not
currently calling the window function. Did you see the program source? It's
only 100 lines:
https://gist.github.com/drocsid/b0efa4ff6ff4a7c3c8bb56767d0b6877

Then I would expect I'm using the defaults, other than what is shown in the
configuration. For example, in the launcher configuration I set

  --conf spark.streaming.kafka.maxRatePerPartition=500 \

and I believe that allows 500 messages per partition for the batch duration
set in the application:

  JavaStreamingContext jssc = new JavaStreamingContext(jsc, new Duration(1000));

Then with the --num-executors 6 \ submit flag and
spark.streaming.kafka.maxRatePerPartition=500, I think that's how we arrive at
the 3000 events per batch shown in the UI, pasted above. Feel free to correct
me if I'm wrong.

Then are you suggesting that I set a window? Maybe following this as a
reference (a rough sketch of what that could look like is inlined further
down):

https://databricks.gitbooks.io/databricks-spark-reference-applications/content/logs_analyzer/chapter1/windows.html

On Sat, Jun 18, 2016 at 8:08 PM, Mich Talebzadeh <mich.talebza...@gmail.com> wrote:
> Ok
>
> What is the set up for these please?
>
> batch window
> window length
> sliding interval
>
> And also in each batch window how much data do you get in (no of messages in
> the topic whatever)?
>
>
> Dr Mich Talebzadeh
>
> LinkedIn
> https://www.linkedin.com/profile/view?id=AAEAAAAWh2gBxianrbJd6zP6AcPCCdOABUrV8Pw
>
> http://talebzadehmich.wordpress.com
>
>
> On 18 June 2016 at 21:01, Mich Talebzadeh <mich.talebza...@gmail.com> wrote:
>>
>> I believe you have an issue with performance?
>>
>> Have you checked the Spark GUI (default 4040) for details, including
>> shuffles etc.?
>>
>> HTH
>>
>> Dr Mich Talebzadeh
>>
>> LinkedIn
>> https://www.linkedin.com/profile/view?id=AAEAAAAWh2gBxianrbJd6zP6AcPCCdOABUrV8Pw
>>
>> http://talebzadehmich.wordpress.com
>>
>>
>> On 18 June 2016 at 20:59, Colin Kincaid Williams <disc...@uw.edu> wrote:
>>>
>>> There are 25 nodes in the spark cluster.
>>>
>>> On Sat, Jun 18, 2016 at 7:53 PM, Mich Talebzadeh
>>> <mich.talebza...@gmail.com> wrote:
>>> > How many nodes are in your cluster?
>>> >
>>> > --num-executors 6 \
>>> > --driver-memory 4G \
>>> > --executor-memory 2G \
>>> > --total-executor-cores 12 \
>>> >
>>> > Dr Mich Talebzadeh
>>> >
>>> > LinkedIn
>>> > https://www.linkedin.com/profile/view?id=AAEAAAAWh2gBxianrbJd6zP6AcPCCdOABUrV8Pw
>>> >
>>> > http://talebzadehmich.wordpress.com
>>> >
>>> >
>>> > On 18 June 2016 at 20:40, Colin Kincaid Williams <disc...@uw.edu> wrote:
>>> >>
>>> >> I updated my app to Spark 1.5.2 streaming so that it consumes from
>>> >> Kafka using the direct API and inserts the content into an HBase
>>> >> cluster, as described in this thread. I was away from this project for
>>> >> a while due to events in my family.
>>> >>
>>> >> Currently my scheduling delay is high, but the processing time is
>>> >> stable at around a second. I changed my setup to use 6 Kafka partitions
>>> >> on a set of smaller Kafka brokers with fewer disks. I've included some
>>> >> details below, including the script I use to launch the application.
>>> >> I'm using a Spark on HBase library whose version matches my HBase
>>> >> cluster. Is it apparent that there is something wrong with my launch
>>> >> method that could be causing the delay, related to the included jars?
>>> >>
>>> >> Or is there something wrong with the very simple approach I'm taking
>>> >> for the application?
>>> >>
>>> >> Any advice is appreciated.
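For reference, a minimal sketch (not the actual application) of how the
per-partition rate cap, the six Kafka partitions, and the 1 s batch interval
combine, and of what an optional window on top of the direct stream could look
like. The topic name, broker list, and the 30 s / 10 s window and slide values
are placeholders, not taken from the gist:

  // Minimal sketch, not the actual application: how the rate cap and batch
  // interval bound the batch size, plus an optional window on the direct stream.
  import java.util.Collections;
  import java.util.HashMap;
  import java.util.Map;
  import java.util.Set;

  import kafka.serializer.DefaultDecoder;

  import org.apache.spark.SparkConf;
  import org.apache.spark.streaming.Duration;
  import org.apache.spark.streaming.Durations;
  import org.apache.spark.streaming.api.java.JavaPairDStream;
  import org.apache.spark.streaming.api.java.JavaPairInputDStream;
  import org.apache.spark.streaming.api.java.JavaStreamingContext;
  import org.apache.spark.streaming.kafka.KafkaUtils;

  public class WindowSketch {
    public static void main(String[] args) throws Exception {
      SparkConf conf = new SparkConf().setAppName("Kafka To Hbase (window sketch)");
      // Normally passed via spark-submit, as in the launch script below.
      conf.set("spark.streaming.kafka.maxRatePerPartition", "500");

      // 1 s batch interval, as in the gist.
      JavaStreamingContext jssc = new JavaStreamingContext(conf, new Duration(1000));

      Map<String, String> kafkaParams = new HashMap<String, String>();
      kafkaParams.put("metadata.broker.list", "broker1:9092,broker2:9092");
      Set<String> topics = Collections.singleton("events"); // placeholder topic name

      // Direct stream: one Spark partition per Kafka partition. With a cap of
      // 500 msg/s per partition, 6 Kafka partitions, and a 1 s batch, each batch
      // holds at most about 500 * 6 * 1 = 3000 events, matching the UI figure.
      JavaPairInputDStream<byte[], byte[]> stream = KafkaUtils.createDirectStream(
          jssc, byte[].class, byte[].class,
          DefaultDecoder.class, DefaultDecoder.class,
          kafkaParams, topics);

      // Optional window: keeps the 1 s batches but lets a computation see the
      // last 30 s of data, re-evaluated every 10 s (both values illustrative).
      JavaPairDStream<byte[], byte[]> windowed =
          stream.window(Durations.seconds(30), Durations.seconds(10));

      windowed.count().print();

      jssc.start();
      jssc.awaitTermination();
    }
  }

A window only changes what each computation sees (the last 30 s of batches,
re-evaluated every 10 s here); it does not change the 1 s batch interval or the
per-batch rate cap.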
>>> >>
>>> >> The application:
>>> >>
>>> >> https://gist.github.com/drocsid/b0efa4ff6ff4a7c3c8bb56767d0b6877
>>> >>
>>> >> From the streaming UI I get something like this in the Completed
>>> >> Batches table (last 1000 out of 27136):
>>> >>
>>> >> Batch Time           Input Size   Scheduling Delay  Processing Time  Total Delay  Output Ops: Succeeded/Total
>>> >> 2016/06/18 11:21:32  3000 events  1.2 h             1 s              1.2 h        1/1
>>> >> 2016/06/18 11:21:31  3000 events  1.2 h             1 s              1.2 h        1/1
>>> >> 2016/06/18 11:21:30  3000 events  1.2 h             1 s              1.2 h        1/1
>>> >>
>>> >> Here's how I'm launching the spark application:
>>> >>
>>> >> #!/usr/bin/env bash
>>> >>
>>> >> export SPARK_CONF_DIR=/home/colin.williams/spark
>>> >> export HADOOP_CONF_DIR=/etc/hadoop/conf
>>> >> export HADOOP_CLASSPATH=/home/colin.williams/hbase/conf/:/home/colin.williams/CDH-5.3.0-1.cdh5.3.0.p0.30/lib/hbase/lib/*:/home/colin.williams/CDH-5.3.0-1.cdh5.3.0.p0.30/lib/hbase/lib/hbase-protocol-0.98.6-cdh5.3.0.jar
>>> >>
>>> >> /opt/spark-1.5.2-bin-hadoop2.4/bin/spark-submit \
>>> >> --class com.example.KafkaToHbase \
>>> >> --master spark://spark_master:7077 \
>>> >> --deploy-mode client \
>>> >> --num-executors 6 \
>>> >> --driver-memory 4G \
>>> >> --executor-memory 2G \
>>> >> --total-executor-cores 12 \
>>> >> --jars /home/colin.williams/CDH-5.3.0-1.cdh5.3.0.p0.30/lib/zookeeper/zookeeper-3.4.5-cdh5.3.0.jar,/home/colin.williams/CDH-5.3.0-1.cdh5.3.0.p0.30/lib/hbase/lib/guava-12.0.1.jar,/home/colin.williams/CDH-5.3.0-1.cdh5.3.0.p0.30/lib/hbase/lib/protobuf-java-2.5.0.jar,/home/colin.williams/CDH-5.3.0-1.cdh5.3.0.p0.30/lib/hbase/hbase-protocol.jar,/home/colin.williams/CDH-5.3.0-1.cdh5.3.0.p0.30/lib/hbase/hbase-client.jar,/home/colin.williams/CDH-5.3.0-1.cdh5.3.0.p0.30/lib/hbase/hbase-common.jar,/home/colin.williams/CDH-5.3.0-1.cdh5.3.0.p0.30/lib/hbase/hbase-hadoop2-compat.jar,/home/colin.williams/CDH-5.3.0-1.cdh5.3.0.p0.30/lib/hbase/hbase-hadoop-compat.jar,/home/colin.williams/CDH-5.3.0-1.cdh5.3.0.p0.30/lib/hbase/hbase-server.jar,/home/colin.williams/CDH-5.3.0-1.cdh5.3.0.p0.30/lib/hbase/lib/htrace-core.jar \
>>> >> --conf spark.app.name="Kafka To Hbase" \
>>> >> --conf spark.eventLog.dir="hdfs:///user/spark/applicationHistory" \
>>> >> --conf spark.eventLog.enabled=false \
>>> >> --conf spark.eventLog.overwrite=true \
>>> >> --conf spark.serializer=org.apache.spark.serializer.KryoSerializer \
>>> >> --conf spark.streaming.backpressure.enabled=false \
>>> >> --conf spark.streaming.kafka.maxRatePerPartition=500 \
>>> >> --driver-class-path /home/colin.williams/kafka-hbase.jar \
>>> >> --driver-java-options -Dspark.executor.extraClassPath=/home/colin.williams/CDH-5.3.0-1.cdh5.3.0.p0.30/lib/hbase/lib/* \
>>> >> /home/colin.williams/kafka-hbase.jar "FromTable" "ToTable" "broker1:9092,broker2:9092"
>>> >>
>>> >> On Tue, May 3, 2016 at 8:20 PM, Colin Kincaid Williams <disc...@uw.edu>
>>> >> wrote:
>>> >> > Thanks Cody, I can see that the partitions are well distributed...
>>> >> > So I'm now in the process of moving to the direct API.
>>> >> >
>>> >> > On Tue, May 3, 2016 at 6:51 PM, Cody Koeninger <c...@koeninger.org>
>>> >> > wrote:
>>> >> >> 60 partitions in and of itself shouldn't be a big performance issue
>>> >> >> (as long as producers are distributing across partitions evenly).
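On the question of whether the partitions really are evenly distributed: a
minimal sketch of the usual direct-stream pattern for logging how many records
each Kafka partition contributed to a batch, using the offset ranges the direct
stream carries. The class and method names are illustrative; "stream" is
assumed to be the createDirectStream result from the application:

  import org.apache.spark.api.java.JavaPairRDD;
  import org.apache.spark.api.java.function.Function;
  import org.apache.spark.streaming.api.java.JavaPairInputDStream;
  import org.apache.spark.streaming.kafka.HasOffsetRanges;
  import org.apache.spark.streaming.kafka.OffsetRange;

  public class PartitionSpreadCheck {
    // Logs per-partition record counts for each batch. Must be called on the
    // stream itself (before any transformation), so the underlying KafkaRDD
    // still exposes its offset ranges.
    public static void attach(JavaPairInputDStream<byte[], byte[]> stream) {
      stream.foreachRDD(new Function<JavaPairRDD<byte[], byte[]>, Void>() {
        @Override
        public Void call(JavaPairRDD<byte[], byte[]> rdd) {
          OffsetRange[] ranges = ((HasOffsetRanges) rdd.rdd()).offsetRanges();
          for (OffsetRange r : ranges) {
            long count = r.untilOffset() - r.fromOffset();
            System.out.println(r.topic() + " partition " + r.partition()
                + ": " + count + " records");
          }
          return null;
        }
      });
    }
  }

If the producers are spreading writes evenly, the per-partition counts should
be roughly equal in every batch.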
>>> >> >>
>>> >> >> On Tue, May 3, 2016 at 1:44 PM, Colin Kincaid Williams
>>> >> >> <disc...@uw.edu> wrote:
>>> >> >>> Thanks again Cody. Regarding the details: 66 Kafka partitions on 3
>>> >> >>> Kafka servers, likely 8-core systems with 10 disks each. Maybe the
>>> >> >>> issue with the receiver was the large number of partitions. I had
>>> >> >>> miscounted the disks, so 11*3*2 is how I decided to partition my
>>> >> >>> topic on insertion (by my own, unjustified reasoning, on a first
>>> >> >>> attempt). This worked well enough for me: I put 1.7 billion entries
>>> >> >>> into Kafka from a MapReduce job in five and a half hours.
>>> >> >>>
>>> >> >>> I was concerned about using Spark 1.5.2 because I'm currently
>>> >> >>> putting my data into a CDH 5.3 HDFS cluster, using hbase-spark .98
>>> >> >>> library jars built for Spark 1.2 on CDH 5.3. But after debugging
>>> >> >>> quite a bit yesterday, I tried building against 1.5.2. So far it's
>>> >> >>> running without issue on a Spark 1.5.2 cluster. I'm not sure there
>>> >> >>> was much improvement using the same code, but I'll see how the
>>> >> >>> direct API handles it. In the end I can reduce the number of
>>> >> >>> partitions in Kafka if it causes big performance issues.
>>> >> >>>
>>> >> >>> On Tue, May 3, 2016 at 4:08 AM, Cody Koeninger <c...@koeninger.org>
>>> >> >>> wrote:
>>> >> >>>> print() isn't really the best way to benchmark things, since it
>>> >> >>>> calls take(10) under the covers, but 380 records / second for a
>>> >> >>>> single receiver doesn't sound right in any case.
>>> >> >>>>
>>> >> >>>> Am I understanding correctly that you're trying to process a large
>>> >> >>>> number of already-existing Kafka messages, not keep up with an
>>> >> >>>> incoming stream? Can you give any details (e.g. hardware, number of
>>> >> >>>> topic-partitions, etc.)?
>>> >> >>>>
>>> >> >>>> Really though, I'd try to start with Spark 1.6 and direct streams,
>>> >> >>>> or even just kafkacat, as a baseline.
>>> >> >>>>
>>> >> >>>> On Mon, May 2, 2016 at 7:01 PM, Colin Kincaid Williams
>>> >> >>>> <disc...@uw.edu> wrote:
>>> >> >>>>> Hello again. I searched for "backport kafka" in the list archives
>>> >> >>>>> but couldn't find anything but a post from Spark 0.7.2. I was
>>> >> >>>>> going to use accumulators to make a counter, but then saw the
>>> >> >>>>> Receiver Statistics on the Streaming tab. So I removed all other
>>> >> >>>>> "functionality" except:
>>> >> >>>>>
>>> >> >>>>> JavaPairReceiverInputDStream<byte[], byte[]> dstream = KafkaUtils
>>> >> >>>>>     // createStream(JavaStreamingContext jssc, Class<K> keyTypeClass,
>>> >> >>>>>     //   Class<V> valueTypeClass, Class<U> keyDecoderClass,
>>> >> >>>>>     //   Class<T> valueDecoderClass, java.util.Map<String,String> kafkaParams,
>>> >> >>>>>     //   java.util.Map<String,Integer> topics, StorageLevel storageLevel)
>>> >> >>>>>     .createStream(jssc, byte[].class, byte[].class,
>>> >> >>>>>         kafka.serializer.DefaultDecoder.class,
>>> >> >>>>>         kafka.serializer.DefaultDecoder.class,
>>> >> >>>>>         kafkaParamsMap, topicMap, StorageLevel.MEMORY_AND_DISK_SER());
>>> >> >>>>>
>>> >> >>>>> dstream.print();
>>> >> >>>>>
>>> >> >>>>> Then in the Receiver Stats for the single receiver, I'm seeing
>>> >> >>>>> around 380 records / second.
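Following up on the note that print() only pulls ten records per batch: a
minimal sketch of forcing and counting whole batches instead, with an
accumulator keeping a running total. jssc and dstream are assumed to be the
streaming context and the receiver stream from the snippet above; the class and
method names are illustrative:

  import org.apache.spark.Accumulator;
  import org.apache.spark.api.java.JavaPairRDD;
  import org.apache.spark.api.java.function.Function;
  import org.apache.spark.streaming.api.java.JavaPairReceiverInputDStream;
  import org.apache.spark.streaming.api.java.JavaStreamingContext;

  public class ThroughputProbe {
    // Counts every record in each batch (no HBase writes), so the numbers
    // reflect raw read throughput rather than the ten records print() samples.
    public static void attach(JavaStreamingContext jssc,
                              JavaPairReceiverInputDStream<byte[], byte[]> dstream) {
      final Accumulator<Integer> total =
          jssc.sparkContext().accumulator(0, "records consumed");
      dstream.foreachRDD(new Function<JavaPairRDD<byte[], byte[]>, Void>() {
        @Override
        public Void call(JavaPairRDD<byte[], byte[]> rdd) {
          long n = rdd.count();   // forces the whole batch, unlike print()'s take(10)
          total.add((int) n);
          System.out.println("batch: " + n + " records, running total: " + total.value());
          return null;
        }
      });
    }
  }

The named accumulator also shows up in the application UI, which gives a
running count without any extra instrumentation.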
>>> >> >>>>> Then to get anywhere near my 10% mentioned above, I'd need to run
>>> >> >>>>> around 21 receivers, assuming 380 records / second, just using the
>>> >> >>>>> print output. This seems awfully high to me, considering that I
>>> >> >>>>> wrote 80000+ records a second to Kafka from a mapreduce job, and
>>> >> >>>>> that my bottleneck was likely Hbase. Again using the 380 estimate,
>>> >> >>>>> I would need 200+ receivers to reach a similar amount of reads.
>>> >> >>>>>
>>> >> >>>>> Even given the issues with the 1.2 receivers, is this the expected
>>> >> >>>>> way to use the Kafka streaming API, or am I doing something
>>> >> >>>>> terribly wrong?
>>> >> >>>>>
>>> >> >>>>> My application looks like
>>> >> >>>>> https://gist.github.com/drocsid/b0efa4ff6ff4a7c3c8bb56767d0b6877
>>> >> >>>>>
>>> >> >>>>> On Mon, May 2, 2016 at 6:09 PM, Cody Koeninger <c...@koeninger.org>
>>> >> >>>>> wrote:
>>> >> >>>>>> Have you tested for read throughput (without writing to hbase,
>>> >> >>>>>> just deserialize)?
>>> >> >>>>>>
>>> >> >>>>>> Are you limited to using spark 1.2, or is upgrading possible? The
>>> >> >>>>>> kafka direct stream is available starting with 1.3. If you're
>>> >> >>>>>> stuck on 1.2, I believe there have been some attempts to backport
>>> >> >>>>>> it, search the mailing list archives.
>>> >> >>>>>>
>>> >> >>>>>> On Mon, May 2, 2016 at 12:54 PM, Colin Kincaid Williams
>>> >> >>>>>> <disc...@uw.edu> wrote:
>>> >> >>>>>>> I've written an application to get content from a kafka topic
>>> >> >>>>>>> with 1.7 billion entries, get the protobuf serialized entries,
>>> >> >>>>>>> and insert into hbase. Currently the environment that I'm
>>> >> >>>>>>> running in is Spark 1.2.
>>> >> >>>>>>>
>>> >> >>>>>>> With 8 executors and 2 cores, and 2 jobs, I'm only getting
>>> >> >>>>>>> between 0-2500 writes / second. This will take much too long to
>>> >> >>>>>>> consume the entries.
>>> >> >>>>>>>
>>> >> >>>>>>> I currently believe that the spark kafka receiver is the
>>> >> >>>>>>> bottleneck. I've tried both 1.2 receivers, with the WAL and
>>> >> >>>>>>> without, and didn't notice any large performance difference.
>>> >> >>>>>>> I've tried many different spark configuration options, but can't
>>> >> >>>>>>> seem to get better performance.
>>> >> >>>>>>>
>>> >> >>>>>>> I saw 80000 requests / second inserting these records into kafka
>>> >> >>>>>>> using yarn / hbase / protobuf / kafka in a bulk fashion.
>>> >> >>>>>>>
>>> >> >>>>>>> While hbase inserts might not deliver the same throughput, I'd
>>> >> >>>>>>> like to at least get 10%.
>>> >> >>>>>>>
>>> >> >>>>>>> My application looks like
>>> >> >>>>>>> https://gist.github.com/drocsid/b0efa4ff6ff4a7c3c8bb56767d0b6877
>>> >> >>>>>>>
>>> >> >>>>>>> This is my first spark application. I'd appreciate any
>>> >> >>>>>>> assistance.
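For completeness, a minimal sketch of what running several receiver-based
streams in parallel (for example the ~21 receivers estimated above) and
unioning them would look like. kafkaParams and topicMap are assumed to mirror
the ones used in the gist, the class and method names are illustrative, and the
direct API the application later moved to removes the need for this entirely:

  import java.util.ArrayList;
  import java.util.List;
  import java.util.Map;

  import kafka.serializer.DefaultDecoder;

  import org.apache.spark.storage.StorageLevel;
  import org.apache.spark.streaming.api.java.JavaPairDStream;
  import org.apache.spark.streaming.api.java.JavaStreamingContext;
  import org.apache.spark.streaming.kafka.KafkaUtils;

  public class MultiReceiverSketch {
    // Builds numReceivers receiver-based streams and unions them so that
    // consumption is spread across executors instead of a single receiver.
    public static JavaPairDStream<byte[], byte[]> build(JavaStreamingContext jssc,
                                                        Map<String, String> kafkaParams,
                                                        Map<String, Integer> topicMap,
                                                        int numReceivers) {
      List<JavaPairDStream<byte[], byte[]>> streams =
          new ArrayList<JavaPairDStream<byte[], byte[]>>();
      for (int i = 0; i < numReceivers; i++) {
        streams.add(KafkaUtils.createStream(jssc, byte[].class, byte[].class,
            DefaultDecoder.class, DefaultDecoder.class, kafkaParams, topicMap,
            StorageLevel.MEMORY_AND_DISK_SER()));
      }
      // Union into a single DStream so downstream processing sees one stream.
      return jssc.union(streams.get(0), streams.subList(1, streams.size()));
    }
  }

Each receiver occupies an executor core, so the executor and core counts in the
launch script would have to grow with the number of receivers.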