Hi George,

How is the incoming traffic to your source topic? Is it more than 130K?
Guozhang


On Thu, May 21, 2015 at 11:44 AM, George Li <g...@ca.ibm.com> wrote:

> Hi Roger,
>
> These parameters dont seem to affect throughput much, probably because my
> test job just reads from kafka and doesnt write to it?
>
> Thanks,
>
> George
>
>
>
> From:   Roger Hoover <roger.hoo...@gmail.com>
> To:     "dev@samza.apache.org" <dev@samza.apache.org>,
> Date:   21/05/2015 12:04 PM
> Subject:        Re: Samza job throughput much lower than Kafka throughput
>
>
>
> Oops.  Sent too soon.  I mean:
>
> producer.batch.size=262144
> producer.linger.ms=5
> producer.compression.type=lz4
>
>
> On Thu, May 21, 2015 at 9:00 AM, Roger Hoover <roger.hoo...@gmail.com>
> wrote:
>
> > Hi George,
> >
> > You might also try tweaking the producer settings.
> >
> >         producer.batch.size=262144
> >         producer.linger.ms=5
> > producer.compression.type: lz4
> >
> > On Wed, May 20, 2015 at 9:30 PM, Guozhang Wang <wangg...@gmail.com>
> wrote:
> >
> >> Hi George,
> >>
> >> Is there any reason you need to set the following configs?
> >>
> >> systems.kafka.consumer.fetch.wait.max.ms= 1
> >>
> >> This setting will basically disable long pooling of the consumer which
> >> will
> >> then busy fetching data from broker, which has a large impact on
> network
> >> latency especially when the consumer is already caught up with the
> Kafka
> >> broker.
> >>
> >> Also when you say it is "slower than a program reading directly from
> >> Kafka." which consumer did your program use to read data from Kafka?
> >>
> >> Guozhang
> >>
> >>
> >> On Wed, May 20, 2015 at 5:01 PM, George Li <g...@ca.ibm.com> wrote:
> >>
> >> > Hi Yi,
> >> >
> >> > Thanks for the reply. Below is my job config and code.
> >> >
> >> > When we run this job inside our dev docker container, which has
> >> zookeeper,
> >> > broker, and yarn installed locally,  its throughput is at least 50%
> >> higher
> >> > than our cluster run's.
> >> >
> >> > Thanks,
> >> >
> >> > George
> >> >
> >> > Configuration:
> >> >
> >> > job.factory.class=org.apache.samza.job.yarn.YarnJobFactory
> >> > job.name=container-performance
> >> >
> >> > # YARN
> >> > yarn.container.count=1
> >> > yarn.container.memory.mb=2548
> >> > yarn.package.path={my package on hdfs}
> >> > yarn.container.retry.count=0
> >> > yarn.am.container.memory.mb=2048
> >> > yarn.am.jmx.enabled=false
> >> >
> >> > # Task
> >> > task.opts=-server -Xmx1024m -XX:+UseParNewGC -XX:+UseConcMarkSweepGC
> >> > -XX:+CMSClassUnloadingEnabled -XX:+CMSScavengeBeforeRemark
> >> > -XX:+DisableExplicitGC -Djava.awt.headless=true
> >> >
> >> > task.class=samza.TestPerformanceTask
> >> > task.inputs=kafka.throughput-test2
> >> > task.log.interval=1000000
> >> > task.checkpoint.factory =
> >> > org.apache.samza.checkpoint.kafka.KafkaCheckpointManagerFactory
> >> > task.checkpoint.system=kafka
> >> > task.checkpoint.replication.factor=1
> >> >
> >> > # Kafka System (only used for coordinator stream in this test)
> >> >
> >> >
> >>
>
> systems.kafka.samza.factory=org.apache.samza.system.kafka.KafkaSystemFactory
> >> > systems.kafka.samza.fetch.threshold=50000
> >> >
> >> > systems.kafka.consumer.zookeeper.connect= {zookeeper}
> >> > systems.kafka.producer.bootstrap.servers={broker node}
> >> > systems.kafka.consumer.auto.offset.reset=smallest
> >> > systems.kafka.consumer.socket.receive.buffer.bytes= 2200000
> >> > systems.kafka.consumer.fetch.message.max.bytes= 1100000
> >> > systems.kafka.consumer.fetch.min.bytes= 1
> >> > systems.kafka.consumer.fetch.wait.max.ms= 1
> >> >
> >> > #define coordinator system
> >> > job.coordinator.system=kafka
> >> > job.coordinator.replication.factor=1
> >> >
> >> > systems.kafka.streams.throughput-test2.samza.reset.offset=true
> >> > systems.kafka.streams.throughput-test2.samza.offset.default=oldest
> >> > ~
> >> >
> >> > Job's code. This is mostly a copy-paste of the one in the repository
> >> >
> >> > object TestPerformanceTask {
> >> >   // No thread safety is needed for these variables because they're
> >> > mutated in
> >> >   //   // the process method, which is single threaded.
> >> >   var messagesProcessed = 0
> >> >   var startTime = 0L
> >> > }
> >> >
> >> > class TestPerformanceTask extends StreamTask with InitableTask with
> >> > Logging {
> >> >   import TestPerformanceTask._
> >> >
> >> >   /**
> >> >    *    * How many messages to process before a log message is
> printed.
> >> >    *       */
> >> >   var logInterval = 10000
> >> >
> >> >   /**
> >> >    *    * How many messages to process before shutting down.
> >> >    *       */
> >> >   var maxMessages = 10000000
> >> >
> >> >
> >> >   var outputSystemStream: Option[SystemStream] = None
> >> >
> >> >   def init(config: Config, context: TaskContext) {
> >> >     logInterval = config.getInt("task.log.interval", 10000)
> >> >     maxMessages = config.getInt("task.max.messages", 10000000)
> >> >     outputSystemStream = Option(config.get("task.outputs",
> >> > null)).map(Util.getSystemStreamFromNames(_))
> >> >     println("init!!")
> >> >   }
> >> >
> >> >   def process(envelope: IncomingMessageEnvelope, collector:
> >> > MessageCollector, coordinator: TaskCoordinator) {
> >> >     if (startTime == 0) {
> >> >       startTime = System.currentTimeMillis
> >> >     }
> >> >
> >> >     if (outputSystemStream.isDefined) {
> >> >       collector.send(new
> OutgoingMessageEnvelope(outputSystemStream.get,
> >> > envelope.getKey, envelope.getMessage))
> >> >     }
> >> >
> >> >     messagesProcessed += 1
> >> >
> >> >     if (messagesProcessed % logInterval == 0) {
> >> >       val seconds = (System.currentTimeMillis - startTime) / 1000
> >> >       println("Processed %s messages in %s seconds." format
> >> > (messagesProcessed, seconds))
> >> >     }
> >> >
> >> >
> >> >     if (messagesProcessed >= maxMessages) {
> >> >       coordinator.shutdown(RequestScope.ALL_TASKS_IN_CONTAINER)
> >> >     }
> >> >   }
> >> > }
> >> >
> >> >
> >> >
> >> > From:   Yi Pan <nickpa...@gmail.com>
> >> > To:     dev@samza.apache.org,
> >> > Date:   20/05/2015 05:03 PM
> >> > Subject:        Re: Samza job throughput much lower than Kafka
> >> throughput
> >> >
> >> >
> >> >
> >> > Hi, George,
> >> >
> >> > Could you share w/ us the code and configuration of your sample test
> >> job?
> >> > Thanks!
> >> >
> >> > -Yi
> >> >
> >> > On Wed, May 20, 2015 at 1:19 PM, George Li <g...@ca.ibm.com> wrote:
> >> >
> >> > > Hi,
> >> > >
> >> > > We are evaluating Samza's performance, and our sample job with
> >> > > TestPerformanceTask is much slower than a program reading directly
> >> from
> >> > > Kafka.
> >> > >
> >> > > Scenario:
> >> > >
> >> > > * Cluster:
> >> > > 1 master node for Zookeeper and yarn.
> >> > > 3 Kafka broker nodes
> >> > > 3 yarn worker nodes
> >> > >
> >> > > * Kafka:
> >> > > Topic has only 1 partition. Average message size is around 100
> byte.
> >> > > On a yarn worker node, run the performance test program from Kafka
> >> > > repository to read the topic. The throughput is about 400k
> >> messages/sec
> >> > >
> >> > > *Samza
> >> > > Run TestPerformanceTask from Samza repository with no output stream
> >> > > defined, and the throughput no more than 130k messages/sec
> >> > >
> >> > >
> >> > > How can I explain/fix this performance difference?
> >> > >
> >> > > What I have done so far:
> >> > >
> >> > > 1. Monitor yarn worker node resource usage.
> >> > > When the job is running, cpu and memory usage are never more than
> 5%
> >> > > except at the beginning of the run. No significant network and disk
> IO
> >> > > either
> >> > >
> >> > > 2. Monitor worker node network traffic
> >> > > Tcpdump shows an interesting pattern. The yarn worker node will
> fetch
> >> a
> >> > > block of data from the kafka broker, and after that, it will
> handshake
> >> > > with the same kafka broker once every 100 ms for 300 ms before
> >> fetching
> >> > > the next block.
> >> > >
> >> > > If I increase systems.kafka.samza.fetch.threshold to 500k, i.e.,
> 10x
> >> the
> >> > > default settings, this handshake lasts about 3 seconds. If I set
> >> > > fetch.threshold to 250k, this idle period then becomes 1.5 sec. It
> >> seems
> >> > > kafka consumer greatly outpaced process() call
> >> > >
> >> > > 3. Check Samza metrics
> >> > > I do not see any excessive network calls to master or kafka broker,
> >> > i.e.,
> >> > > no more than 30 calls/sec. However, process-ms and choose-ms are
> >> > > approaching 2ms
> >> > >
> >> > > Any input would be greatly appreciated.
> >> > >
> >> > > Thanks,
> >> > >
> >> > > George
> >> > >
> >> > >
> >> > >
> >> > >
> >> > >
> >> >
> >> >
> >>
> >>
> >> --
> >> -- Guozhang
> >>
> >
> >
>
>


-- 
-- Guozhang

Reply via email to