Hi George,

How is the incoming traffic to your source topic? Is it more than 130K?
Guozhang

On Thu, May 21, 2015 at 11:44 AM, George Li <g...@ca.ibm.com> wrote:

> Hi Roger,
>
> These parameters don't seem to affect throughput much, probably because
> my test job just reads from Kafka and doesn't write to it?
>
> Thanks,
>
> George
>
> From: Roger Hoover <roger.hoo...@gmail.com>
> To: "dev@samza.apache.org" <dev@samza.apache.org>,
> Date: 21/05/2015 12:04 PM
> Subject: Re: Samza job throughput much lower than Kafka throughput
>
> Oops. Sent too soon. I mean:
>
> producer.batch.size=262144
> producer.linger.ms=5
> producer.compression.type=lz4
>
> On Thu, May 21, 2015 at 9:00 AM, Roger Hoover <roger.hoo...@gmail.com>
> wrote:
>
> > Hi George,
> >
> > You might also try tweaking the producer settings.
> >
> > producer.batch.size=262144
> > producer.linger.ms=5
> > producer.compression.type=lz4
> >
> > On Wed, May 20, 2015 at 9:30 PM, Guozhang Wang <wangg...@gmail.com>
> > wrote:
> >
> > > Hi George,
> > >
> > > Is there any reason you need to set the following config?
> > >
> > > systems.kafka.consumer.fetch.wait.max.ms=1
> > >
> > > This setting will basically disable long polling of the consumer,
> > > which will then busy-fetch data from the broker. That has a large
> > > impact on network latency, especially when the consumer is already
> > > caught up with the Kafka broker.
> > >
> > > Also, when you say it is "slower than a program reading directly
> > > from Kafka", which consumer did your program use to read data from
> > > Kafka?
> > >
> > > Guozhang
> > >
> > > On Wed, May 20, 2015 at 5:01 PM, George Li <g...@ca.ibm.com> wrote:
> > >
> > > > Hi Yi,
> > > >
> > > > Thanks for the reply. Below is my job config and code.
> > > >
> > > > When we run this job inside our dev docker container, which has
> > > > zookeeper, broker, and yarn installed locally, its throughput is
> > > > at least 50% higher than our cluster run's.
> > > >
> > > > Thanks,
> > > >
> > > > George
> > > >
> > > > Configuration:
> > > >
> > > > job.factory.class=org.apache.samza.job.yarn.YarnJobFactory
> > > > job.name=container-performance
> > > >
> > > > # YARN
> > > > yarn.container.count=1
> > > > yarn.container.memory.mb=2548
> > > > yarn.package.path={my package on hdfs}
> > > > yarn.container.retry.count=0
> > > > yarn.am.container.memory.mb=2048
> > > > yarn.am.jmx.enabled=false
> > > >
> > > > # Task
> > > > task.opts=-server -Xmx1024m -XX:+UseParNewGC -XX:+UseConcMarkSweepGC
> > > > -XX:+CMSClassUnloadingEnabled -XX:+CMSScavengeBeforeRemark
> > > > -XX:+DisableExplicitGC -Djava.awt.headless=true
> > > >
> > > > task.class=samza.TestPerformanceTask
> > > > task.inputs=kafka.throughput-test2
> > > > task.log.interval=1000000
> > > > task.checkpoint.factory=org.apache.samza.checkpoint.kafka.KafkaCheckpointManagerFactory
> > > > task.checkpoint.system=kafka
> > > > task.checkpoint.replication.factor=1
> > > >
> > > > # Kafka System (only used for coordinator stream in this test)
> > > > systems.kafka.samza.factory=org.apache.samza.system.kafka.KafkaSystemFactory
> > > > systems.kafka.samza.fetch.threshold=50000
> > > >
> > > > systems.kafka.consumer.zookeeper.connect={zookeeper}
> > > > systems.kafka.producer.bootstrap.servers={broker node}
> > > > systems.kafka.consumer.auto.offset.reset=smallest
> > > > systems.kafka.consumer.socket.receive.buffer.bytes=2200000
> > > > systems.kafka.consumer.fetch.message.max.bytes=1100000
> > > > systems.kafka.consumer.fetch.min.bytes=1
> > > > systems.kafka.consumer.fetch.wait.max.ms=1
> > > >
> > > > # define coordinator system
> > > > job.coordinator.system=kafka
> > > > job.coordinator.replication.factor=1
> > > >
> > > > systems.kafka.streams.throughput-test2.samza.reset.offset=true
> > > > systems.kafka.streams.throughput-test2.samza.offset.default=oldest
> > > >
> > > > Job's code.
> > > > This is mostly a copy-paste of the one in the repository:
> > > >
> > > > object TestPerformanceTask {
> > > >   // No thread safety is needed for these variables because they're
> > > >   // mutated in the process method, which is single threaded.
> > > >   var messagesProcessed = 0
> > > >   var startTime = 0L
> > > > }
> > > >
> > > > class TestPerformanceTask extends StreamTask with InitableTask
> > > >     with Logging {
> > > >   import TestPerformanceTask._
> > > >
> > > >   /**
> > > >    * How many messages to process before a log message is printed.
> > > >    */
> > > >   var logInterval = 10000
> > > >
> > > >   /**
> > > >    * How many messages to process before shutting down.
> > > >    */
> > > >   var maxMessages = 10000000
> > > >
> > > >   var outputSystemStream: Option[SystemStream] = None
> > > >
> > > >   def init(config: Config, context: TaskContext) {
> > > >     logInterval = config.getInt("task.log.interval", 10000)
> > > >     maxMessages = config.getInt("task.max.messages", 10000000)
> > > >     outputSystemStream = Option(config.get("task.outputs",
> > > >       null)).map(Util.getSystemStreamFromNames(_))
> > > >     println("init!!")
> > > >   }
> > > >
> > > >   def process(envelope: IncomingMessageEnvelope, collector:
> > > >       MessageCollector, coordinator: TaskCoordinator) {
> > > >     if (startTime == 0) {
> > > >       startTime = System.currentTimeMillis
> > > >     }
> > > >
> > > >     if (outputSystemStream.isDefined) {
> > > >       collector.send(new OutgoingMessageEnvelope(
> > > >         outputSystemStream.get, envelope.getKey, envelope.getMessage))
> > > >     }
> > > >
> > > >     messagesProcessed += 1
> > > >
> > > >     if (messagesProcessed % logInterval == 0) {
> > > >       val seconds = (System.currentTimeMillis - startTime) / 1000
> > > >       println("Processed %s messages in %s seconds."
> > > >         format (messagesProcessed, seconds))
> > > >     }
> > > >
> > > >     if (messagesProcessed >= maxMessages) {
> > > >       coordinator.shutdown(RequestScope.ALL_TASKS_IN_CONTAINER)
> > > >     }
> > > >   }
> > > > }
> > > >
> > > > From: Yi Pan <nickpa...@gmail.com>
> > > > To: dev@samza.apache.org,
> > > > Date: 20/05/2015 05:03 PM
> > > > Subject: Re: Samza job throughput much lower than Kafka throughput
> > > >
> > > > Hi, George,
> > > >
> > > > Could you share w/ us the code and configuration of your sample
> > > > test job? Thanks!
> > > >
> > > > -Yi
> > > >
> > > > On Wed, May 20, 2015 at 1:19 PM, George Li <g...@ca.ibm.com> wrote:
> > > >
> > > > > Hi,
> > > > >
> > > > > We are evaluating Samza's performance, and our sample job with
> > > > > TestPerformanceTask is much slower than a program reading
> > > > > directly from Kafka.
> > > > >
> > > > > Scenario:
> > > > >
> > > > > * Cluster:
> > > > > 1 master node for Zookeeper and YARN.
> > > > > 3 Kafka broker nodes
> > > > > 3 YARN worker nodes
> > > > >
> > > > > * Kafka:
> > > > > Topic has only 1 partition. Average message size is around 100
> > > > > bytes. On a YARN worker node, run the performance test program
> > > > > from the Kafka repository to read the topic. The throughput is
> > > > > about 400k messages/sec.
> > > > >
> > > > > * Samza:
> > > > > Run TestPerformanceTask from the Samza repository with no output
> > > > > stream defined, and the throughput is no more than 130k
> > > > > messages/sec.
> > > > >
> > > > > How can I explain/fix this performance difference?
> > > > >
> > > > > What I have done so far:
> > > > >
> > > > > 1. Monitor YARN worker node resource usage.
> > > > > When the job is running, cpu and memory usage are never more
> > > > > than 5% except at the beginning of the run. No significant
> > > > > network or disk IO either.
> > > > >
> > > > > 2. Monitor worker node network traffic.
> > > > > Tcpdump shows an interesting pattern.
> > > > > The YARN worker node will fetch a block of data from the Kafka
> > > > > broker, and after that, it will handshake with the same Kafka
> > > > > broker once every 100 ms for 300 ms before fetching the next
> > > > > block.
> > > > >
> > > > > If I increase systems.kafka.samza.fetch.threshold to 500k, i.e.,
> > > > > 10x the default setting, this handshake lasts about 3 seconds.
> > > > > If I set fetch.threshold to 250k, this idle period becomes
> > > > > 1.5 sec. It seems the Kafka consumer greatly outpaces the
> > > > > process() calls.
> > > > >
> > > > > 3. Check Samza metrics.
> > > > > I do not see any excessive network calls to the master or Kafka
> > > > > broker, i.e., no more than 30 calls/sec. However, process-ms and
> > > > > choose-ms are approaching 2 ms.
> > > > >
> > > > > Any input would be greatly appreciated.
> > > > >
> > > > > Thanks,
> > > > >
> > > > > George
> > >
> > > --
> > > -- Guozhang

--
-- Guozhang
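The idle periods George reports are internally consistent, and they can be sanity-checked with a quick back-of-the-envelope model. This is an editorial sketch, not part of the original thread: it assumes the idle period is the time process() needs to drain the messages buffered up to fetch.threshold, and it assumes the 0.8-era Kafka consumer default of fetch.wait.max.ms=100 when comparing against the fetch.wait.max.ms=1 setting Guozhang flags.

```python
# Reported in the thread: threshold 50k -> ~0.3 s idle,
# 250k -> ~1.5 s, 500k -> ~3 s.
observations = {50_000: 0.3, 250_000: 1.5, 500_000: 3.0}

# If the idle period is simply process() draining the buffered messages,
# the per-message cost should be constant across all three runs.
per_message_s = {t: idle / t for t, idle in observations.items()}
for t, cost in per_message_s.items():
    print(f"threshold={t}: {cost * 1e6:.1f} us/message")
# All three give ~6 us/message, i.e. a drain rate of roughly
# 1/6e-6 ~= 167k messages/sec -- the same order as the observed
# 130k msg/sec, supporting George's reading that the consumer
# outpaces process(), so per-message overhead is the bottleneck.

# Separately: with fetch.wait.max.ms=1, a caught-up consumer completes
# an empty fetch roughly every millisecond instead of every 100 ms
# (the assumed default long-poll wait) -- ~100x more fetch requests
# that return no data.
busy_fetches_per_sec = 1000 / 1
long_poll_fetches_per_sec = 1000 / 100
print(busy_fetches_per_sec / long_poll_fetches_per_sec)  # -> 100.0
```

Under this model the fetch path is not the limiting factor; raising fetch.threshold only lengthens the buffer-drain phase without changing the ~6 us/message processing cost.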