Hi George,

You might also try tweaking the producer settings:

producer.batch.size=262144
producer.linger.ms=5
producer.compression.type=lz4
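
If it helps, in the Samza job config these would go under the system's producer prefix (assuming the Kafka system is named "kafka", as in your config below); they only affect the produce path, so they start to matter once task.outputs is set. Something like:

systems.kafka.producer.batch.size=262144
systems.kafka.producer.linger.ms=5
systems.kafka.producer.compression.type=lz4

I haven't benchmarked these exact values, so treat them as a starting point.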

On Wed, May 20, 2015 at 9:30 PM, Guozhang Wang <wangg...@gmail.com> wrote:

> Hi George,
>
> Is there any reason you need to set the following configs?
>
> systems.kafka.consumer.fetch.wait.max.ms=1
>
> This setting will basically disable long polling of the consumer, which
> will then busy-fetch data from the broker; that has a large impact on
> network latency, especially when the consumer is already caught up with
> the Kafka broker.
>
> Also, when you say it is "slower than a program reading directly from
> Kafka", which consumer did your program use to read data from Kafka?
>
> Guozhang
>
>
> On Wed, May 20, 2015 at 5:01 PM, George Li <g...@ca.ibm.com> wrote:
>
> > Hi Yi,
> >
> > Thanks for the reply. Below is my job config and code.
> >
> > When we run this job inside our dev docker container, which has
> > zookeeper, broker, and yarn installed locally, its throughput is at
> > least 50% higher than our cluster run's.
> >
> > Thanks,
> >
> > George
> >
> > Configuration:
> >
> > job.factory.class=org.apache.samza.job.yarn.YarnJobFactory
> > job.name=container-performance
> >
> > # YARN
> > yarn.container.count=1
> > yarn.container.memory.mb=2548
> > yarn.package.path={my package on hdfs}
> > yarn.container.retry.count=0
> > yarn.am.container.memory.mb=2048
> > yarn.am.jmx.enabled=false
> >
> > # Task
> > task.opts=-server -Xmx1024m -XX:+UseParNewGC -XX:+UseConcMarkSweepGC
> > -XX:+CMSClassUnloadingEnabled -XX:+CMSScavengeBeforeRemark
> > -XX:+DisableExplicitGC -Djava.awt.headless=true
> >
> > task.class=samza.TestPerformanceTask
> > task.inputs=kafka.throughput-test2
> > task.log.interval=1000000
> > task.checkpoint.factory=org.apache.samza.checkpoint.kafka.KafkaCheckpointManagerFactory
> > task.checkpoint.system=kafka
> > task.checkpoint.replication.factor=1
> >
> > # Kafka System (only used for coordinator stream in this test)
> > systems.kafka.samza.factory=org.apache.samza.system.kafka.KafkaSystemFactory
> > systems.kafka.samza.fetch.threshold=50000
> >
> > systems.kafka.consumer.zookeeper.connect={zookeeper}
> > systems.kafka.producer.bootstrap.servers={broker node}
> > systems.kafka.consumer.auto.offset.reset=smallest
> > systems.kafka.consumer.socket.receive.buffer.bytes=2200000
> > systems.kafka.consumer.fetch.message.max.bytes=1100000
> > systems.kafka.consumer.fetch.min.bytes=1
> > systems.kafka.consumer.fetch.wait.max.ms=1
> >
> > #define coordinator system
> > job.coordinator.system=kafka
> > job.coordinator.replication.factor=1
> >
> > systems.kafka.streams.throughput-test2.samza.reset.offset=true
> > systems.kafka.streams.throughput-test2.samza.offset.default=oldest
> >
> > Job's code. This is mostly a copy-paste of the one in the repository:
> >
> > object TestPerformanceTask {
> >   // No thread safety is needed for these variables because they're
> >   // mutated in the process method, which is single threaded.
> >   var messagesProcessed = 0
> >   var startTime = 0L
> > }
> >
> > class TestPerformanceTask extends StreamTask with InitableTask with Logging {
> >   import TestPerformanceTask._
> >
> >   /**
> >    * How many messages to process before a log message is printed.
> >    */
> >   var logInterval = 10000
> >
> >   /**
> >    * How many messages to process before shutting down.
> >    */
> >   var maxMessages = 10000000
> >
> >   var outputSystemStream: Option[SystemStream] = None
> >
> >   def init(config: Config, context: TaskContext) {
> >     logInterval = config.getInt("task.log.interval", 10000)
> >     maxMessages = config.getInt("task.max.messages", 10000000)
> >     outputSystemStream = Option(config.get("task.outputs",
> >       null)).map(Util.getSystemStreamFromNames(_))
> >     println("init!!")
> >   }
> >
> >   def process(envelope: IncomingMessageEnvelope, collector: MessageCollector,
> >     coordinator: TaskCoordinator) {
> >     if (startTime == 0) {
> >       startTime = System.currentTimeMillis
> >     }
> >
> >     if (outputSystemStream.isDefined) {
> >       collector.send(new OutgoingMessageEnvelope(outputSystemStream.get,
> >         envelope.getKey, envelope.getMessage))
> >     }
> >
> >     messagesProcessed += 1
> >
> >     if (messagesProcessed % logInterval == 0) {
> >       val seconds = (System.currentTimeMillis - startTime) / 1000
> >       println("Processed %s messages in %s seconds." format
> >         (messagesProcessed, seconds))
> >     }
> >
> >     if (messagesProcessed >= maxMessages) {
> >       coordinator.shutdown(RequestScope.ALL_TASKS_IN_CONTAINER)
> >     }
> >   }
> > }
> >
> >
> > From: Yi Pan <nickpa...@gmail.com>
> > To: dev@samza.apache.org
> > Date: 20/05/2015 05:03 PM
> > Subject: Re: Samza job throughput much lower than Kafka throughput
> >
> > Hi, George,
> >
> > Could you share w/ us the code and configuration of your sample test
> > job? Thanks!
> >
> > -Yi
> >
> > On Wed, May 20, 2015 at 1:19 PM, George Li <g...@ca.ibm.com> wrote:
> >
> > > Hi,
> > >
> > > We are evaluating Samza's performance, and our sample job with
> > > TestPerformanceTask is much slower than a program reading directly
> > > from Kafka.
> > >
> > > Scenario:
> > >
> > > * Cluster:
> > > 1 master node for Zookeeper and YARN
> > > 3 Kafka broker nodes
> > > 3 YARN worker nodes
> > >
> > > * Kafka:
> > > The topic has only 1 partition. Average message size is around 100
> > > bytes. On a YARN worker node, running the performance test program
> > > from the Kafka repository to read the topic gives a throughput of
> > > about 400k messages/sec.
> > >
> > > * Samza:
> > > Running TestPerformanceTask from the Samza repository with no output
> > > stream defined gives a throughput of no more than 130k messages/sec.
> > >
> > > How can I explain/fix this performance difference?
> > >
> > > What I have done so far:
> > >
> > > 1. Monitor YARN worker node resource usage.
> > > When the job is running, CPU and memory usage are never more than 5%
> > > except at the beginning of the run. No significant network or disk IO
> > > either.
> > >
> > > 2. Monitor worker node network traffic.
> > > Tcpdump shows an interesting pattern: the YARN worker node will fetch
> > > a block of data from the Kafka broker, and after that it will
> > > handshake with the same Kafka broker once every 100 ms for 300 ms
> > > before fetching the next block.
> > >
> > > If I increase systems.kafka.samza.fetch.threshold to 500k, i.e., 10x
> > > the default setting, this handshake lasts about 3 seconds. If I set
> > > fetch.threshold to 250k, this idle period becomes 1.5 sec. It seems
> > > the Kafka consumer greatly outpaces the process() calls.
> > >
> > > 3. Check Samza metrics.
> > > I do not see any excessive network calls to the master or the Kafka
> > > broker, i.e., no more than 30 calls/sec. However, process-ms and
> > > choose-ms are approaching 2 ms.
> > >
> > > Any input would be greatly appreciated.
> > > Thanks,
> > >
> > > George
> >
>
> --
> -- Guozhang