Hi Yi,

Thanks for the reply. Below is my job config and code.

When we run this job inside our dev Docker container, which has ZooKeeper, 
a Kafka broker, and YARN installed locally, its throughput is at least 50% 
higher than in our cluster runs.

Thanks,

George

Configuration:

job.factory.class=org.apache.samza.job.yarn.YarnJobFactory
job.name=container-performance

# YARN
yarn.container.count=1
yarn.container.memory.mb=2548
yarn.package.path={my package on hdfs}
yarn.container.retry.count=0
yarn.am.container.memory.mb=2048
yarn.am.jmx.enabled=false

# Task
task.opts=-server -Xmx1024m -XX:+UseParNewGC -XX:+UseConcMarkSweepGC -XX:+CMSClassUnloadingEnabled -XX:+CMSScavengeBeforeRemark -XX:+DisableExplicitGC -Djava.awt.headless=true

task.class=samza.TestPerformanceTask
task.inputs=kafka.throughput-test2
task.log.interval=1000000
task.checkpoint.factory=org.apache.samza.checkpoint.kafka.KafkaCheckpointManagerFactory
task.checkpoint.system=kafka
task.checkpoint.replication.factor=1

# Kafka system (used for the input stream, checkpoints, and the coordinator stream)
systems.kafka.samza.factory=org.apache.samza.system.kafka.KafkaSystemFactory
systems.kafka.samza.fetch.threshold=50000

systems.kafka.consumer.zookeeper.connect={zookeeper}
systems.kafka.producer.bootstrap.servers={broker node}
systems.kafka.consumer.auto.offset.reset=smallest
systems.kafka.consumer.socket.receive.buffer.bytes=2200000
systems.kafka.consumer.fetch.message.max.bytes=1100000
systems.kafka.consumer.fetch.min.bytes=1
systems.kafka.consumer.fetch.wait.max.ms=1

# Define the coordinator system
job.coordinator.system=kafka
job.coordinator.replication.factor=1

systems.kafka.streams.throughput-test2.samza.reset.offset=true
systems.kafka.streams.throughput-test2.samza.offset.default=oldest

The job's code. This is mostly a copy-paste of TestPerformanceTask from the 
Samza repository:

import org.apache.samza.config.Config
import org.apache.samza.system.{IncomingMessageEnvelope, OutgoingMessageEnvelope, SystemStream}
import org.apache.samza.task.{InitableTask, MessageCollector, StreamTask, TaskContext, TaskCoordinator}
import org.apache.samza.task.TaskCoordinator.RequestScope
import org.apache.samza.util.{Logging, Util}

object TestPerformanceTask {
  // No thread safety is needed for these variables because they're mutated
  // in the process method, which is single threaded.
  var messagesProcessed = 0
  var startTime = 0L
}

class TestPerformanceTask extends StreamTask with InitableTask with Logging {
  import TestPerformanceTask._

  /**
   * How many messages to process before a log message is printed.
   */
  var logInterval = 10000

  /**
   * How many messages to process before shutting down.
   */
  var maxMessages = 10000000

  /**
   * If defined, incoming messages are forwarded to this stream. Undefined
   * in our test, so the task only counts messages.
   */
  var outputSystemStream: Option[SystemStream] = None

  def init(config: Config, context: TaskContext) {
    logInterval = config.getInt("task.log.interval", 10000)
    maxMessages = config.getInt("task.max.messages", 10000000)
    outputSystemStream = Option(config.get("task.outputs", null))
      .map(Util.getSystemStreamFromNames(_))
    println("init!!")
  }

  def process(envelope: IncomingMessageEnvelope, collector: MessageCollector, coordinator: TaskCoordinator) {
    if (startTime == 0) {
      startTime = System.currentTimeMillis
    }

    if (outputSystemStream.isDefined) {
      collector.send(new OutgoingMessageEnvelope(outputSystemStream.get, envelope.getKey, envelope.getMessage))
    }

    messagesProcessed += 1

    if (messagesProcessed % logInterval == 0) {
      val seconds = (System.currentTimeMillis - startTime) / 1000
      println("Processed %s messages in %s seconds." format (messagesProcessed, seconds))
    }

    if (messagesProcessed >= maxMessages) {
      coordinator.shutdown(RequestScope.ALL_TASKS_IN_CONTAINER)
    }
  }
}
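
For completeness: we submit the job with Samza's run-job.sh. A minimal
sketch, assuming the standard distribution layout and a placeholder path
for the properties file:

  deploy/samza/bin/run-job.sh \
    --config-factory=org.apache.samza.config.factories.PropertiesConfigFactory \
    --config-path=file://$PWD/config/container-performance.properties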



From:   Yi Pan <nickpa...@gmail.com>
To:     dev@samza.apache.org, 
Date:   20/05/2015 05:03 PM
Subject:        Re: Samza job throughput much lower than Kafka throughput



Hi, George,

Could you share w/ us the code and configuration of your sample test job?
Thanks!

-Yi

On Wed, May 20, 2015 at 1:19 PM, George Li <g...@ca.ibm.com> wrote:

> Hi,
>
> We are evaluating Samza's performance, and our sample job with
> TestPerformanceTask is much slower than a program reading directly from
> Kafka.
>
> Scenario:
>
> * Cluster:
> 1 master node for ZooKeeper and YARN.
> 3 Kafka broker nodes.
> 3 YARN worker nodes.
>
> * Kafka:
> The topic has only 1 partition. The average message size is around 100 bytes.
> On a YARN worker node, we run the consumer performance test program from
> the Kafka repository to read the topic (sketched below). The throughput
> is about 400k messages/sec.
>
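> A sketch of that run, assuming the 0.8-era kafka-consumer-perf-test.sh
> tool; the hosts and message count are placeholders:
>
>   bin/kafka-consumer-perf-test.sh --zookeeper {zookeeper} \
>     --topic throughput-test2 --messages 10000000 --threads 1
>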
> * Samza:
> Run TestPerformanceTask from the Samza repository with no output stream
> defined, and the throughput is no more than 130k messages/sec.
>
>
> How can I explain/fix this performance difference?
>
> What I have done so far:
>
> 1. Monitored YARN worker node resource usage.
> While the job is running, CPU and memory usage never exceed 5% except at
> the beginning of the run. There is no significant network or disk I/O
> either.
>
> 2. Monitored worker node network traffic.
> Tcpdump shows an interesting pattern: the YARN worker node fetches a
> block of data from the Kafka broker, and after that it handshakes with
> the same Kafka broker once every 100 ms for 300 ms before fetching the
> next block.
>
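> A capture along these lines is enough to see the pattern (a sketch; the
> interface name and the broker port are assumptions):
>
>   tcpdump -i eth0 -nn host {broker node} and port 9092
>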
> If I increase systems.kafka.samza.fetch.threshold to 500k, i.e., 10x the
> default setting, this handshake lasts about 3 seconds. If I set the
> fetch threshold to 250k, the idle period becomes 1.5 sec. It seems the
> Kafka consumer greatly outpaces the process() call.
>
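> Concretely, these are the variants tried against the default of 50000
> buffered messages per container:
>
>   systems.kafka.samza.fetch.threshold=250000
>   systems.kafka.samza.fetch.threshold=500000
>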
> 3. Checked Samza metrics.
> I do not see any excessive network calls to the master or the Kafka
> broker, i.e., no more than 30 calls/sec. However, process-ms and
> choose-ms are approaching 2 ms.
>
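> For reference, a minimal sketch of the reporter config that publishes
> these metrics to a Kafka stream (the stream name "metrics" is a
> placeholder):
>
>   metrics.reporters=snapshot
>   metrics.reporter.snapshot.class=org.apache.samza.metrics.reporter.MetricsSnapshotReporterFactory
>   metrics.reporter.snapshot.stream=kafka.metrics
>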
> Any input would be greatly appreciated.
>
> Thanks,
>
> George
>
