Hi George,

You might also try tweaking the producer settings.

        producer.batch.size=262144
        producer.linger.ms=5
        producer.compression.type=lz4
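
In a Samza job config these would presumably need the systems.<system-name>.
prefix, the same way your bootstrap.servers line does. A rough sketch, assuming
the system is named "kafka" as in your config below (the values are only
starting points to experiment with, not tuned recommendations):

```properties
# Assumes the Kafka system is called "kafka", as in the quoted job config.
# Producer settings only affect messages the job writes back to Kafka.
systems.kafka.producer.batch.size=262144
systems.kafka.producer.linger.ms=5
systems.kafka.producer.compression.type=lz4
```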

On Wed, May 20, 2015 at 9:30 PM, Guozhang Wang <wangg...@gmail.com> wrote:

> Hi George,
>
> Is there any reason you need to set the following configs?
>
> systems.kafka.consumer.fetch.wait.max.ms= 1
>
> This setting basically disables long polling in the consumer, which will
> then busy-fetch data from the broker. That has a large impact on network
> latency, especially when the consumer is already caught up with the Kafka
> broker.
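>
> For comparison, a sketch of the same two settings at what I believe are the
> old consumer's defaults (100 ms wait, 1 byte minimum); please check them
> against the docs for your Kafka version before relying on them:
>
> ```properties
> # Assumed defaults: the broker may hold a fetch request for up to 100 ms
> # when no data is available, instead of answering immediately.
> systems.kafka.consumer.fetch.wait.max.ms=100
> systems.kafka.consumer.fetch.min.bytes=1
> ```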
>
> Also, when you say it is "slower than a program reading directly from
> Kafka", which consumer did your program use to read data from Kafka?
>
> Guozhang
>
>
> On Wed, May 20, 2015 at 5:01 PM, George Li <g...@ca.ibm.com> wrote:
>
> > Hi Yi,
> >
> > Thanks for the reply. Below is my job config and code.
> >
> > When we run this job inside our dev docker container, which has zookeeper,
> > broker, and yarn installed locally, its throughput is at least 50% higher
> > than our cluster run's.
> >
> > Thanks,
> >
> > George
> >
> > Configuration:
> >
> > job.factory.class=org.apache.samza.job.yarn.YarnJobFactory
> > job.name=container-performance
> >
> > # YARN
> > yarn.container.count=1
> > yarn.container.memory.mb=2548
> > yarn.package.path={my package on hdfs}
> > yarn.container.retry.count=0
> > yarn.am.container.memory.mb=2048
> > yarn.am.jmx.enabled=false
> >
> > # Task
> > task.opts=-server -Xmx1024m -XX:+UseParNewGC -XX:+UseConcMarkSweepGC -XX:+CMSClassUnloadingEnabled -XX:+CMSScavengeBeforeRemark -XX:+DisableExplicitGC -Djava.awt.headless=true
> >
> > task.class=samza.TestPerformanceTask
> > task.inputs=kafka.throughput-test2
> > task.log.interval=1000000
> > task.checkpoint.factory=org.apache.samza.checkpoint.kafka.KafkaCheckpointManagerFactory
> > task.checkpoint.system=kafka
> > task.checkpoint.replication.factor=1
> >
> > # Kafka System (only used for coordinator stream in this test)
> >
> > systems.kafka.samza.factory=org.apache.samza.system.kafka.KafkaSystemFactory
> > systems.kafka.samza.fetch.threshold=50000
> >
> > systems.kafka.consumer.zookeeper.connect= {zookeeper}
> > systems.kafka.producer.bootstrap.servers={broker node}
> > systems.kafka.consumer.auto.offset.reset=smallest
> > systems.kafka.consumer.socket.receive.buffer.bytes= 2200000
> > systems.kafka.consumer.fetch.message.max.bytes= 1100000
> > systems.kafka.consumer.fetch.min.bytes= 1
> > systems.kafka.consumer.fetch.wait.max.ms= 1
> >
> > #define coordinator system
> > job.coordinator.system=kafka
> > job.coordinator.replication.factor=1
> >
> > systems.kafka.streams.throughput-test2.samza.reset.offset=true
> > systems.kafka.streams.throughput-test2.samza.offset.default=oldest
> >
> > Job's code. This is mostly a copy-paste of the one in the repository
> >
> > object TestPerformanceTask {
> >   // No thread safety is needed for these variables because they're mutated in
> >   // the process method, which is single threaded.
> >   var messagesProcessed = 0
> >   var startTime = 0L
> > }
> >
> > class TestPerformanceTask extends StreamTask with InitableTask with Logging {
> >   import TestPerformanceTask._
> >
> >   /**
> >    * How many messages to process before a log message is printed.
> >    */
> >   var logInterval = 10000
> >
> >   /**
> >    * How many messages to process before shutting down.
> >    */
> >   var maxMessages = 10000000
> >
> >
> >   var outputSystemStream: Option[SystemStream] = None
> >
> >   def init(config: Config, context: TaskContext) {
> >     logInterval = config.getInt("task.log.interval", 10000)
> >     maxMessages = config.getInt("task.max.messages", 10000000)
> >     outputSystemStream = Option(config.get("task.outputs", null))
> >       .map(Util.getSystemStreamFromNames(_))
> >     println("init!!")
> >   }
> >
> >   def process(envelope: IncomingMessageEnvelope, collector: MessageCollector,
> >       coordinator: TaskCoordinator) {
> >     if (startTime == 0) {
> >       startTime = System.currentTimeMillis
> >     }
> >
> >     if (outputSystemStream.isDefined) {
> >       collector.send(new OutgoingMessageEnvelope(outputSystemStream.get,
> >         envelope.getKey, envelope.getMessage))
> >     }
> >
> >     messagesProcessed += 1
> >
> >     if (messagesProcessed % logInterval == 0) {
> >       val seconds = (System.currentTimeMillis - startTime) / 1000
> >       println("Processed %s messages in %s seconds." format
> >         (messagesProcessed, seconds))
> >     }
> >
> >
> >     if (messagesProcessed >= maxMessages) {
> >       coordinator.shutdown(RequestScope.ALL_TASKS_IN_CONTAINER)
> >     }
> >   }
> > }
> >
> >
> >
> > From:   Yi Pan <nickpa...@gmail.com>
> > To:     dev@samza.apache.org,
> > Date:   20/05/2015 05:03 PM
> > Subject:        Re: Samza job throughput much lower than Kafka throughput
> >
> >
> >
> > Hi, George,
> >
> > Could you share w/ us the code and configuration of your sample test job?
> > Thanks!
> >
> > -Yi
> >
> > On Wed, May 20, 2015 at 1:19 PM, George Li <g...@ca.ibm.com> wrote:
> >
> > > Hi,
> > >
> > > We are evaluating Samza's performance, and our sample job with
> > > TestPerformanceTask is much slower than a program reading directly from
> > > Kafka.
> > >
> > > Scenario:
> > >
> > > * Cluster:
> > > 1 master node for Zookeeper and yarn.
> > > 3 Kafka broker nodes
> > > 3 yarn worker nodes
> > >
> > > * Kafka:
> > > Topic has only 1 partition. Average message size is around 100 bytes.
> > > On a yarn worker node, run the performance test program from Kafka
> > > repository to read the topic. The throughput is about 400k messages/sec
> > >
> > > * Samza:
> > > Run TestPerformanceTask from Samza repository with no output stream
> > > defined, and the throughput is no more than 130k messages/sec
> > >
> > >
> > > How can I explain/fix this performance difference?
> > >
> > > What I have done so far:
> > >
> > > 1. Monitor yarn worker node resource usage.
> > > When the job is running, cpu and memory usage are never more than 5%
> > > except at the beginning of the run. No significant network and disk IO
> > > either
> > >
> > > 2. Monitor worker node network traffic
> > > Tcpdump shows an interesting pattern. The yarn worker node will fetch a
> > > block of data from the kafka broker, and after that, it will handshake
> > > with the same kafka broker once every 100 ms for 300 ms before fetching
> > > the next block.
> > >
> > > If I increase systems.kafka.samza.fetch.threshold to 500k, i.e., 10x the
> > > default settings, this handshake lasts about 3 seconds. If I set
> > > fetch.threshold to 250k, this idle period then becomes 1.5 sec. It seems
> > > the kafka consumer greatly outpaces the process() call.
> > >
> > > 3. Check Samza metrics
> > > I do not see any excessive network calls to master or kafka broker, i.e.,
> > > no more than 30 calls/sec. However, process-ms and choose-ms are
> > > approaching 2ms.
> > >
> > > Any input would be greatly appreciated.
> > >
> > > Thanks,
> > >
> > > George
> > >
> > >
> > >
> > >
> > >
> >
> >
>
>
> --
> -- Guozhang
>
