Hi Roger,

These parameters don't seem to affect throughput much, probably because my test job only reads from Kafka and doesn't write to it?
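[Editor's note: the settings discussed here (batch.size, linger.ms, compression.type) are producer-side Kafka configs, so a consume-only job would indeed not exercise them. A minimal sketch of the properties they map to — class and method names are illustrative, values taken from the thread:]

```java
import java.util.Properties;

public class ProducerTuning {
    // Kafka producer properties corresponding to the Samza
    // systems.kafka.producer.* keys suggested in this thread.
    static Properties producerProps() {
        Properties p = new Properties();
        p.setProperty("batch.size", "262144");     // 256 KiB batches
        p.setProperty("linger.ms", "5");           // wait up to 5 ms to fill a batch
        p.setProperty("compression.type", "lz4");  // cheap CPU cost, decent ratio
        return p;
    }

    public static void main(String[] args) {
        Properties p = producerProps();
        System.out.println(p.getProperty("batch.size"));
        System.out.println(p.getProperty("compression.type"));
    }
}
```

These only matter on the write path (batching and compressing outgoing messages), which is why a read-only benchmark is insensitive to them.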
Thanks,
George

From: Roger Hoover <roger.hoo...@gmail.com>
To: "dev@samza.apache.org" <dev@samza.apache.org>
Date: 21/05/2015 12:04 PM
Subject: Re: Samza job throughput much lower than Kafka throughput

Oops. Sent too soon. I mean:

producer.batch.size=262144
producer.linger.ms=5
producer.compression.type=lz4

On Thu, May 21, 2015 at 9:00 AM, Roger Hoover <roger.hoo...@gmail.com> wrote:
> Hi George,
>
> You might also try tweaking the producer settings.
>
> producer.batch.size=262144
> producer.linger.ms=5
> producer.compression.type: lz4
>
> On Wed, May 20, 2015 at 9:30 PM, Guozhang Wang <wangg...@gmail.com> wrote:
>
>> Hi George,
>>
>> Is there any reason you need to set the following config?
>>
>> systems.kafka.consumer.fetch.wait.max.ms=1
>>
>> This setting basically disables long polling of the consumer, which will
>> then busy-fetch data from the broker. That has a large impact on network
>> latency, especially when the consumer has already caught up with the
>> Kafka broker.
>>
>> Also, when you say it is "slower than a program reading directly from
>> Kafka", which consumer did your program use to read data from Kafka?
>>
>> Guozhang
>>
>> On Wed, May 20, 2015 at 5:01 PM, George Li <g...@ca.ibm.com> wrote:
>>
>> > Hi Yi,
>> >
>> > Thanks for the reply. Below is my job config and code.
>> >
>> > When we run this job inside our dev docker container, which has
>> > zookeeper, broker, and yarn installed locally, its throughput is at
>> > least 50% higher than our cluster run's.
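[Editor's note: Guozhang's point about fetch.wait.max.ms can be put in numbers. An empty fetch blocks on the broker for at most fetch.wait.max.ms before returning, so a caught-up consumer issues roughly 1000 / fetch.wait.max.ms requests per second. A rough sketch, ignoring network round-trip time; the helper name is made up for illustration:]

```java
public class FetchRate {
    // Upper bound on fetch requests per second from an idle (caught-up)
    // consumer: each empty fetch blocks for fetchWaitMaxMs on the broker
    // before returning empty-handed. Network round-trip time is ignored.
    static long maxIdleFetchesPerSec(long fetchWaitMaxMs) {
        return 1000L / fetchWaitMaxMs;
    }

    public static void main(String[] args) {
        System.out.println(maxIdleFetchesPerSec(1));   // wait.max.ms=1 -> ~1000 req/s (busy polling)
        System.out.println(maxIdleFetchesPerSec(500)); // a 500 ms wait -> ~2 req/s
    }
}
```

This is why fetch.wait.max.ms=1 effectively turns long polling into busy polling whenever the consumer has caught up.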
>> >
>> > Thanks,
>> >
>> > George
>> >
>> > Configuration:
>> >
>> > job.factory.class=org.apache.samza.job.yarn.YarnJobFactory
>> > job.name=container-performance
>> >
>> > # YARN
>> > yarn.container.count=1
>> > yarn.container.memory.mb=2548
>> > yarn.package.path={my package on hdfs}
>> > yarn.container.retry.count=0
>> > yarn.am.container.memory.mb=2048
>> > yarn.am.jmx.enabled=false
>> >
>> > # Task
>> > task.opts=-server -Xmx1024m -XX:+UseParNewGC -XX:+UseConcMarkSweepGC
>> > -XX:+CMSClassUnloadingEnabled -XX:+CMSScavengeBeforeRemark
>> > -XX:+DisableExplicitGC -Djava.awt.headless=true
>> >
>> > task.class=samza.TestPerformanceTask
>> > task.inputs=kafka.throughput-test2
>> > task.log.interval=1000000
>> > task.checkpoint.factory=org.apache.samza.checkpoint.kafka.KafkaCheckpointManagerFactory
>> > task.checkpoint.system=kafka
>> > task.checkpoint.replication.factor=1
>> >
>> > # Kafka System (only used for coordinator stream in this test)
>> > systems.kafka.samza.factory=org.apache.samza.system.kafka.KafkaSystemFactory
>> > systems.kafka.samza.fetch.threshold=50000
>> >
>> > systems.kafka.consumer.zookeeper.connect={zookeeper}
>> > systems.kafka.producer.bootstrap.servers={broker node}
>> > systems.kafka.consumer.auto.offset.reset=smallest
>> > systems.kafka.consumer.socket.receive.buffer.bytes=2200000
>> > systems.kafka.consumer.fetch.message.max.bytes=1100000
>> > systems.kafka.consumer.fetch.min.bytes=1
>> > systems.kafka.consumer.fetch.wait.max.ms=1
>> >
>> > # Define coordinator system
>> > job.coordinator.system=kafka
>> > job.coordinator.replication.factor=1
>> >
>> > systems.kafka.streams.throughput-test2.samza.reset.offset=true
>> > systems.kafka.streams.throughput-test2.samza.offset.default=oldest
>> >
>> > Job's code.
>> > This is mostly a copy-paste of the one in the repository:
>> >
>> > object TestPerformanceTask {
>> >   // No thread safety is needed for these variables because they're
>> >   // mutated in the process method, which is single threaded.
>> >   var messagesProcessed = 0
>> >   var startTime = 0L
>> > }
>> >
>> > class TestPerformanceTask extends StreamTask with InitableTask with
>> > Logging {
>> >   import TestPerformanceTask._
>> >
>> >   /**
>> >    * How many messages to process before a log message is printed.
>> >    */
>> >   var logInterval = 10000
>> >
>> >   /**
>> >    * How many messages to process before shutting down.
>> >    */
>> >   var maxMessages = 10000000
>> >
>> >   var outputSystemStream: Option[SystemStream] = None
>> >
>> >   def init(config: Config, context: TaskContext) {
>> >     logInterval = config.getInt("task.log.interval", 10000)
>> >     maxMessages = config.getInt("task.max.messages", 10000000)
>> >     outputSystemStream = Option(config.get("task.outputs",
>> >       null)).map(Util.getSystemStreamFromNames(_))
>> >     println("init!!")
>> >   }
>> >
>> >   def process(envelope: IncomingMessageEnvelope, collector:
>> >     MessageCollector, coordinator: TaskCoordinator) {
>> >     if (startTime == 0) {
>> >       startTime = System.currentTimeMillis
>> >     }
>> >
>> >     if (outputSystemStream.isDefined) {
>> >       collector.send(new OutgoingMessageEnvelope(outputSystemStream.get,
>> >         envelope.getKey, envelope.getMessage))
>> >     }
>> >
>> >     messagesProcessed += 1
>> >
>> >     if (messagesProcessed % logInterval == 0) {
>> >       val seconds = (System.currentTimeMillis - startTime) / 1000
>> >       println("Processed %s messages in %s seconds." format
>> >         (messagesProcessed, seconds))
>> >     }
>> >
>> >     if (messagesProcessed >= maxMessages) {
>> >       coordinator.shutdown(RequestScope.ALL_TASKS_IN_CONTAINER)
>> >     }
>> >   }
>> > }
>> >
>> >
>> > From: Yi Pan <nickpa...@gmail.com>
>> > To: dev@samza.apache.org
>> > Date: 20/05/2015 05:03 PM
>> > Subject: Re: Samza job throughput much lower than Kafka throughput
>> >
>> > Hi, George,
>> >
>> > Could you share w/ us the code and configuration of your sample test
>> > job? Thanks!
>> >
>> > -Yi
>> >
>> > On Wed, May 20, 2015 at 1:19 PM, George Li <g...@ca.ibm.com> wrote:
>> >
>> > > Hi,
>> > >
>> > > We are evaluating Samza's performance, and our sample job with
>> > > TestPerformanceTask is much slower than a program reading directly
>> > > from Kafka.
>> > >
>> > > Scenario:
>> > >
>> > > * Cluster:
>> > > 1 master node for Zookeeper and YARN
>> > > 3 Kafka broker nodes
>> > > 3 YARN worker nodes
>> > >
>> > > * Kafka:
>> > > The topic has only 1 partition. Average message size is around 100
>> > > bytes. On a YARN worker node, the performance test program from the
>> > > Kafka repository reads the topic at about 400k messages/sec.
>> > >
>> > > * Samza:
>> > > TestPerformanceTask from the Samza repository, with no output stream
>> > > defined, reaches no more than 130k messages/sec.
>> > >
>> > > How can I explain/fix this performance difference?
>> > >
>> > > What I have done so far:
>> > >
>> > > 1. Monitor YARN worker node resource usage.
>> > > While the job is running, CPU and memory usage never exceed 5%
>> > > except at the beginning of the run. There is no significant network
>> > > or disk IO either.
>> > >
>> > > 2. Monitor worker node network traffic.
>> > > Tcpdump shows an interesting pattern.
>> > > The YARN worker node fetches a
>> > > block of data from the Kafka broker, and after that it handshakes
>> > > with the same Kafka broker once every 100 ms for 300 ms before
>> > > fetching the next block.
>> > >
>> > > If I increase systems.kafka.samza.fetch.threshold to 500k, i.e., 10x
>> > > the default setting, this handshake lasts about 3 seconds. If I set
>> > > fetch.threshold to 250k, the idle period becomes 1.5 sec. It seems
>> > > the Kafka consumer greatly outpaces the process() calls.
>> > >
>> > > 3. Check Samza metrics.
>> > > I do not see any excessive network calls to the master or the Kafka
>> > > brokers, i.e., no more than 30 calls/sec. However, process-ms and
>> > > choose-ms are approaching 2 ms.
>> > >
>> > > Any input would be greatly appreciated.
>> > >
>> > > Thanks,
>> > >
>> > > George
>> >
>>
>> --
>> -- Guozhang
>
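[Editor's note: the idle windows George observed line up with a simple back-of-envelope model. fetch.threshold caps how many messages the container buffers; once the buffer is full, the fetcher pauses while the single-threaded event loop drains it at the observed ~130k messages/sec. A rough sketch under that assumption — the helper name is made up:]

```java
public class DrainTime {
    // Rough time (ms) for the event loop to drain a full fetch buffer:
    // fetchThreshold buffered messages consumed at msgsPerSec.
    static long drainMs(long fetchThreshold, long msgsPerSec) {
        return fetchThreshold * 1000L / msgsPerSec;
    }

    public static void main(String[] args) {
        long rate = 130_000L; // observed Samza process rate, msgs/sec
        System.out.println(drainMs(50_000L, rate));  // default threshold -> ~384 ms
        System.out.println(drainMs(250_000L, rate)); // 250k threshold    -> ~1.9 s
        System.out.println(drainMs(500_000L, rate)); // 500k threshold    -> ~3.8 s
    }
}
```

These estimates roughly match the reported ~300 ms, ~1.5 s, and ~3 s idle periods, which supports the reading that the consumer fetches faster than process() can drain, so the single-threaded event loop (not the fetch path) is the bottleneck.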