Hi Riyaz,

There are a number of reasons that you may be getting low performance.
 Here are some questions to get started:

1. How big are your messages?  To meet your throughput requirement you need
a minimum of 10K messages per second continuously.  You specified a
replication factor of 3, so at a message length of 500 bytes (for example)
you would need to write a minimum of 15 MB/second continuously across both
hosts.  That is a small amount or a large amount depending on your storage
configuration.
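
For reference, the arithmetic behind those numbers (taking your 800+
million messages per month processed in one day, and using the 500-byte
example message size):

  800,000,000 msgs / 86,400 sec  ~= 9,300 msgs/sec   (call it 10K/sec)
  10,000 msgs/sec x 500 bytes x 3 replicas ~= 15 MB/sec of sustained writes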

2. How did you determine the throughput rate?  Is the throughput number
end-to-end, including Storm and HBase, or do you see the low throughput for
Kafka itself?  In either case, can you isolate the rates of ingress and
egress to Kafka?
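
One way to isolate Kafka itself is to run the performance test scripts that
ship in Kafka's bin/ directory against a scratch topic, producer side first
and then consumer side.  The exact flags vary between Kafka releases, so
treat this as a sketch (the host and topic names are placeholders) and
check --help on your installation:

  bin/kafka-producer-perf-test.sh --broker-list broker1:9092 \
      --topics perftest --messages 1000000 --message-size 500
  bin/kafka-consumer-perf-test.sh --zookeeper zk1:2181 --topic perftest \
      --messages 1000000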

Assuming the problem is in Kafka, here are some more questions.

3. Are you running VMs?  If so, what kind, and how many CPUs are allocated
to each VM?

4. What kind of storage do you have?  According to your description you
have 11 nodes over two hosts?  At the level you are attempting to reach,
anything less than SSDs or a very performant RAID array may be an issue due
to random I/O.  If you have network-attached storage, this can be a huge
bottleneck.
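
If you are not sure what the disks can actually sustain, a crude
sequential-write baseline on each host gives you a floor to compare against
the 15 MB/second figure above.  The path below is a placeholder; point it
at the same filesystem your log.dirs uses and remove the test file
afterwards:

  dd if=/dev/zero of=/path/to/kafka-logs/ddtest bs=1M count=1024 oflag=direct
  rm /path/to/kafka-logs/ddtest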

5. What kind of network cards do you have?

6. What kind of stats do you see on the hosts when your tests are running?

- What is the I/O wait?  Anything above a few percent indicates problems.
(top gives good numbers.)
- What is the run queue length?  CPU starvation could be a problem,
especially if you have VMs.  (top and uptime give good numbers.)
- How much memory is in the OS page cache?  This has a big impact on I/O
efficiency if you are short of memory.  (free -g gives useful numbers.)
- On a related topic, are you reading from storage, or are your reads
served from memory?  (iostat should ideally show no reads from storage,
only writes, because all reads are served from the OS page cache.)
- Are you swapping?  (A few commands covering these checks are sketched
below.)
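
Concretely, something like the following, run on each broker host while a
test is in flight, covers most of the above (iostat is part of the sysstat
package on most Linux distributions):

  top          # %wa column shows I/O wait; load average hints at run queue
  uptime       # 1/5/15-minute load averages
  free -g      # free memory and the size of the page cache ("cached")
  iostat -x 5  # per-device read/write rates and await times
  vmstat 5     # si/so columns show pages swapped in/out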

7. What is the heap size for your JVMs, and are you using Java 7?  Do you
have G1 GC enabled, as per current Kafka recommendations?
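
As a rough sketch only: the heap size below is a placeholder, and
KAFKA_HEAP_OPTS / KAFKA_JVM_PERFORMANCE_OPTS are the environment variables
the stock start scripts read (names may differ in your version), but broker
JVM settings along these lines are what I would expect:

  export KAFKA_HEAP_OPTS="-Xms4g -Xmx4g"   # modest heap; leave the rest of RAM to the page cache
  export KAFKA_JVM_PERFORMANCE_OPTS="-server -XX:+UseG1GC -XX:MaxGCPauseMillis=20"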

8. Where is ZooKeeper running?  It can be a bottleneck at high transaction
rates.
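
A quick check is ZooKeeper's four-letter "stat" command, which reports
request latencies and connection counts (replace zkhost with wherever your
ensemble runs):

  echo stat | nc zkhost 2181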

9. How many topics do you have?

10. How many producers do you have and where are they running?

11. How many consumers are you running?  I don't know Storm, so it's hard
to tell from the configuration you have listed how many would run or where
they would operate.

It seems possible that you need to spread processing across more
independent hosts, but that is a guess pending further information.  It is
hard to evaluate your Kafka settings without it.

Best regards, Robert



On Sat, Jun 14, 2014 at 8:14 AM, Shaikh Ahmed <rnsr.sha...@gmail.com> wrote:

> Hi,
>
> Daily we download 28 million messages, and monthly this goes up to
> 800+ million.
>
> We want to process this amount of data through our kafka and storm cluster
> and would like to store in HBase cluster.
>
> We are targeting to process one month of data in one day. Is it possible?
>
> We have set up our cluster thinking that we could process millions of
> messages per second, as mentioned on the web. Unfortunately, we have ended
> up processing only 1200-1700 messages per second.  If we continue at this
> speed, it will take a minimum of 10 days to process 30 days of data, which
> is not a workable solution in our case.
>
> I suspect that we have to change some configuration to achieve this goal.
> Looking for help from experts to support me in achieving this task.
>
> *Kafka Cluster:*
> Kafka is running on two dedicated machines with 48 GB of RAM and 2TB of
> storage. We have a total of 11 Kafka broker nodes spread across these two
> servers.
>
> *Kafka Configuration:*
> producer.type=async
> compression.codec=none
> request.required.acks=-1
> serializer.class=kafka.serializer.StringEncoder
> queue.buffering.max.ms=100000
> batch.num.messages=10000
> queue.buffering.max.messages=100000
> default.replication.factor=3
> controlled.shutdown.enable=true
> auto.leader.rebalance.enable=true
> num.network.threads=2
> num.io.threads=8
> num.partitions=4
> log.retention.hours=12
> log.segment.bytes=536870912
> log.retention.check.interval.ms=60000
> log.cleaner.enable=false
>
> *Storm Cluster:*
> Storm is running with 5 supervisors and 1 nimbus on IBM servers with 48 GB
> of RAM and 8TB of storage. These servers are shared with the HBase cluster.
>
> *Kafka spout configuration*
> kafkaConfig.bufferSizeBytes = 1024*1024*8;
> kafkaConfig.fetchSizeBytes = 1024*1024*4;
> kafkaConfig.forceFromStart = true;
>
> *Topology: StormTopology*
> Spout           - Partition: 4
> First Bolt     -  parallelism hint: 6 and Num tasks: 5
> Second Bolt -  parallelism hint: 5
> Third Bolt     -   parallelism hint: 3
> Fourth Bolt   -  parallelism hint: 3 and Num tasks: 4
> Fifth Bolt      -  parallelism hint: 3
> Sixth Bolt     -  parallelism hint: 3
>
> *Supervisor configuration:*
>
> storm.local.dir: "/app/storm"
> storm.zookeeper.port: 2181
> storm.cluster.mode: "distributed"
> storm.local.mode.zmq: false
> supervisor.slots.ports:
>     - 6700
>     - 6701
>     - 6702
>     - 6703
> supervisor.worker.start.timeout.secs: 180
> supervisor.worker.timeout.secs: 30
> supervisor.monitor.frequency.secs: 3
> supervisor.heartbeat.frequency.secs: 5
> supervisor.enable: true
>
> storm.messaging.netty.server_worker_threads: 2
> storm.messaging.netty.client_worker_threads: 2
> storm.messaging.netty.buffer_size: 52428800 #50MB buffer
> storm.messaging.netty.max_retries: 25
> storm.messaging.netty.max_wait_ms: 1000
> storm.messaging.netty.min_wait_ms: 100
>
>
> supervisor.childopts: "-Xmx1024m -Djava.net.preferIPv4Stack=true"
> worker.childopts: "-Xmx2048m -Djava.net.preferIPv4Stack=true"
>
>
> Please let me know if more information is needed.
>
> Thanks in advance.
>
> Regards,
> Riyaz
>
