Thanks again Cody. Regarding the details 66 kafka partitions on 3 kafka servers, likely 8 core systems with 10 disks each. Maybe the issue with the receiver was the large number of partitions. I had miscounted the disks and so 11*3*2 is how I decided to partition my topic on insertion, ( by my own, unjustified reasoning, on a first attempt ) . This worked well enough for me, I put 1.7 billion entries into Kafka on a map reduce job in 5 and a half hours.
I was concerned using spark 1.5.2 because I'm currently putting my data into a CDH 5.3 HDFS cluster, using hbase-spark .98 library jars built for spark 1.2 on CDH 5.3. But after debugging quite a bit yesterday, I tried building against 1.5.2. So far it's running without issue on a Spark 1.5.2 cluster. I'm not sure there was too much improvement using the same code, but I'll see how the direct api handles it. In the end I can reduce the number of partitions in Kafka if it causes big performance issues. On Tue, May 3, 2016 at 4:08 AM, Cody Koeninger <c...@koeninger.org> wrote: > print() isn't really the best way to benchmark things, since it calls > take(10) under the covers, but 380 records / second for a single > receiver doesn't sound right in any case. > > Am I understanding correctly that you're trying to process a large > number of already-existing kafka messages, not keep up with an > incoming stream? Can you give any details (e.g. hardware, number of > topicpartitions, etc)? > > Really though, I'd try to start with spark 1.6 and direct streams, or > even just kafkacat, as a baseline. > > > > On Mon, May 2, 2016 at 7:01 PM, Colin Kincaid Williams <disc...@uw.edu> wrote: >> Hello again. I searched for "backport kafka" in the list archives but >> couldn't find anything but a post from Spark 0.7.2 . I was going to >> use accumulators to make a counter, but then saw on the Streaming tab >> the Receiver Statistics. Then I removed all other "functionality" >> except: >> >> >> JavaPairReceiverInputDStream<byte[], byte[]> dstream = KafkaUtils >> //createStream(JavaStreamingContext jssc,Class<K> >> keyTypeClass,Class<V> valueTypeClass, Class<U> keyDecoderClass, >> Class<T> valueDecoderClass, java.util.Map<String,String> kafkaParams, >> java.util.Map<String,Integer> topics, StorageLevel storageLevel) >> .createStream(jssc, byte[].class, byte[].class, >> kafka.serializer.DefaultDecoder.class, >> kafka.serializer.DefaultDecoder.class, kafkaParamsMap, topicMap, >> StorageLevel.MEMORY_AND_DISK_SER()); >> >> dstream.print(); >> >> Then in the Recieiver Stats for the single receiver, I'm seeing around >> 380 records / second. Then to get anywhere near my 10% mentioned >> above, I'd need to run around 21 receivers, assuming 380 records / >> second, just using the print output. This seems awfully high to me, >> considering that I wrote 80000+ records a second to Kafka from a >> mapreduce job, and that my bottleneck was likely Hbase. Again using >> the 380 estimate, I would need 200+ receivers to reach a similar >> amount of reads. >> >> Even given the issues with the 1.2 receivers, is this the expected way >> to use the Kafka streaming API, or am I doing something terribly >> wrong? >> >> My application looks like >> https://gist.github.com/drocsid/b0efa4ff6ff4a7c3c8bb56767d0b6877 >> >> On Mon, May 2, 2016 at 6:09 PM, Cody Koeninger <c...@koeninger.org> wrote: >>> Have you tested for read throughput (without writing to hbase, just >>> deserialize)? >>> >>> Are you limited to using spark 1.2, or is upgrading possible? The >>> kafka direct stream is available starting with 1.3. If you're stuck >>> on 1.2, I believe there have been some attempts to backport it, search >>> the mailing list archives. >>> >>> On Mon, May 2, 2016 at 12:54 PM, Colin Kincaid Williams <disc...@uw.edu> >>> wrote: >>>> I've written an application to get content from a kafka topic with 1.7 >>>> billion entries, get the protobuf serialized entries, and insert into >>>> hbase. Currently the environment that I'm running in is Spark 1.2. >>>> >>>> With 8 executors and 2 cores, and 2 jobs, I'm only getting between >>>> 0-2500 writes / second. This will take much too long to consume the >>>> entries. >>>> >>>> I currently believe that the spark kafka receiver is the bottleneck. >>>> I've tried both 1.2 receivers, with the WAL and without, and didn't >>>> notice any large performance difference. I've tried many different >>>> spark configuration options, but can't seem to get better performance. >>>> >>>> I saw 80000 requests / second inserting these records into kafka using >>>> yarn / hbase / protobuf / kafka in a bulk fashion. >>>> >>>> While hbase inserts might not deliver the same throughput, I'd like to >>>> at least get 10%. >>>> >>>> My application looks like >>>> https://gist.github.com/drocsid/b0efa4ff6ff4a7c3c8bb56767d0b6877 >>>> >>>> This is my first spark application. I'd appreciate any assistance. >>>> >>>> --------------------------------------------------------------------- >>>> To unsubscribe, e-mail: user-unsubscr...@spark.apache.org >>>> For additional commands, e-mail: user-h...@spark.apache.org >>>> > > --------------------------------------------------------------------- > To unsubscribe, e-mail: user-unsubscr...@spark.apache.org > For additional commands, e-mail: user-h...@spark.apache.org > --------------------------------------------------------------------- To unsubscribe, e-mail: user-unsubscr...@spark.apache.org For additional commands, e-mail: user-h...@spark.apache.org