Hi Hironori,

[1] and [2] describe the process of measuring Kafka performance. I think
the perf test code is under the org.apache.kafka.tools package in 0.9, so
you may have to change the commands in [2] to reflect that.
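For reference, the 0.9-era wrappers for those tools would look something
like this (a sketch only -- topic name, counts, and addresses are
placeholders; check [2] and your Kafka distribution's bin/ directory for
the exact flags):

---
# Producer side: drives org.apache.kafka.tools.ProducerPerformance in 0.9.
# A throughput of -1 means "as fast as possible".
bin/kafka-producer-perf-test.sh --topic perf-test --num-records 50000000 \
  --record-size 100 --throughput -1 \
  --producer-props bootstrap.servers=broker1:9092 acks=1

# Consumer side: reports consumed MB/sec and msg/sec when done.
bin/kafka-consumer-perf-test.sh --zookeeper zk1:2181 --topic perf-test \
  --messages 50000000 --threads 1
---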
Thanks,
Milinda

[1] https://engineering.linkedin.com/kafka/benchmarking-apache-kafka-2-million-writes-second-three-cheap-machines
[2] https://gist.github.com/jkreps/c7ddb4041ef62a900e6c

On Tue, Mar 15, 2016 at 11:35 AM, おぎばやしひろのり <ogibaya...@gmail.com> wrote:
> Robert,
>
> Thank you for your response.
> I would like to try kafka-console-consumer, but I have no idea how to
> measure the consuming throughput. Is there a standard way?
> I would also try running the Kafka broker on physical servers.
>
> Regarding the version, I have upgraded to Flink 1.0.0 and replaced
> FlinkKafkaConsumer082 with FlinkKafkaConsumer09, but did not see any
> difference in performance.
>
> Regards,
> Hironori
>
> 2016-03-11 23:25 GMT+09:00 Robert Metzger <rmetz...@apache.org>:
> > Hi Hironori,
> >
> > Can you check with the kafka-console-consumer how many messages you can
> > read in one minute?
> > Maybe the brokers' disk I/O is limited because everything is running on
> > virtual machines (potentially sharing one hard disk?).
> > I'm also not sure whether running a Kafka 0.8 consumer against a 0.9
> > broker works as expected.
> >
> > Our Kafka 0.8 consumer has been tested in environments where it reads at
> > more than 100 MB/s from a broker.
> >
> > On Fri, Mar 11, 2016 at 9:33 AM, おぎばやしひろのり <ogibaya...@gmail.com> wrote:
> >>
> >> Aljoscha,
> >>
> >> Thank you for your response.
> >>
> >> I tried the no-JSON-parsing, no-sink (DiscardingSink) case. The
> >> throughput was 8,228 msg/sec, slightly better than the JSON +
> >> Elasticsearch case.
> >> I also tried using socketTextStream instead of FlinkKafkaConsumer; in
> >> that case the result was 60,000 msg/sec with just 40% Flink TaskManager
> >> CPU usage (the socket server was the bottleneck).
> >> That was amazing, although Flink's fault-tolerance feature is not
> >> available with socketTextStream.
> >>
> >> Regards,
> >> Hironori
> >>
> >> 2016-03-08 21:36 GMT+09:00 Aljoscha Krettek <aljos...@apache.org>:
> >> > Hi,
> >> > Another interesting test would be a combination of 3) and 2), i.e. no
> >> > JSON parsing and no sink. This would show what the raw throughput can
> >> > be before being slowed down by writing to Elasticsearch.
> >> >
> >> > Also, .print() is not feasible for production since it just prints
> >> > every element to the stdout log on the TaskManagers, which itself can
> >> > cause quite a slowdown. You could try:
> >> >
> >> > datastream.addSink(new DiscardingSink())
> >> >
> >> > which is a dummy sink that does nothing.
> >> >
> >> > Cheers,
> >> > Aljoscha
> >> >
> >> >> On 08 Mar 2016, at 13:31, おぎばやしひろのり <ogibaya...@gmail.com> wrote:
> >> >>
> >> >> Stephan,
> >> >>
> >> >> Sorry for the delay in my response.
> >> >> I tried the 3 cases you suggested.
> >> >>
> >> >> This time, I set parallelism to 1 for simplicity.
> >> >>
> >> >> 0) Base performance (same as the first e-mail): 1,480 msg/sec
> >> >> 1) Checkpointing disabled: almost the same as 0)
> >> >> 2) No ES sink, just print(): 1,510 msg/sec
> >> >> 3) JSON to TSV: 8,000 msg/sec
> >> >>
> >> >> So, as you can see, the bottleneck was JSON parsing. I also want to
> >> >> try eliminating Kafka to see if there is room to improve performance.
> >> >> (Currently I am using FlinkKafkaConsumer082 with Kafka 0.9; I think I
> >> >> should try Flink 1.0 and FlinkKafkaConsumer09.)
> >> >> Anyway, I think 8,000 msg/sec with 1 CPU is not so bad, considering
> >> >> Flink's scalability and fault tolerance.
> >> >> Thank you for your advice.
> >> >>
> >> >> Regards,
> >> >> Hironori Ogibayashi
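Since the numbers above point at JSON parsing as the bottleneck, one
commonly used faster alternative to scala.util.parsing.json is Jackson. A
minimal sketch of how the parse and extract steps could be collapsed (the
UriExtractor helper is illustrative, not part of the original job):

---
import com.fasterxml.jackson.databind.ObjectMapper

object UriExtractor {
  // ObjectMapper is thread-safe for parsing; create it once, not per record.
  val mapper = new ObjectMapper()

  def uriOf(json: String): String = {
    val node = mapper.readTree(json) // single parse of the record
    val uri = node.get("uri")        // null when the field is absent
    if (uri == null) "" else uri.asText()
  }
}

// Would replace the two map steps in the job:
// stream.map(json => (UriExtractor.uriOf(json), 1))
---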
> >> >>
> >> >> 2016-02-26 21:46 GMT+09:00 おぎばやしひろのり <ogibaya...@gmail.com>:
> >> >>> Stephan,
> >> >>>
> >> >>> Thank you for your quick response.
> >> >>> I will try and post the result later.
> >> >>>
> >> >>> Regards,
> >> >>> Hironori
> >> >>>
> >> >>> 2016-02-26 19:45 GMT+09:00 Stephan Ewen <se...@apache.org>:
> >> >>>> Hi!
> >> >>>>
> >> >>>> I would try to dig bit by bit into what the bottleneck is:
> >> >>>>
> >> >>>> 1) Disable the checkpointing and see what difference that makes.
> >> >>>> 2) Use a dummy (discarding) sink rather than Elasticsearch, to see
> >> >>>> if that is the limit.
> >> >>>> 3) Check the JSON parsing. Many JSON libraries are very CPU
> >> >>>> intensive and easily dominate the entire pipeline.
> >> >>>>
> >> >>>> Greetings,
> >> >>>> Stephan
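For point 3, a rough way to size the parse cost in isolation, outside
Flink, is a standalone loop over one sample record (a sketch; the record
and iteration count are arbitrary):

---
import scala.util.parsing.json.JSON

object ParseBench extends App {
  val sample = """{"uri":"/index.html","status":"200"}"""
  val n = 100000

  val start = System.nanoTime()
  var i = 0
  while (i < n) { JSON.parseFull(sample); i += 1 }
  val elapsedSec = (System.nanoTime() - start) / 1e9

  // Parses/sec is an upper bound on the whole pipeline's msg/sec
  // for a single task.
  println(f"${n / elapsedSec}%.0f parses/sec")
}
---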
> >> >>>>
> >> >>>> On Fri, Feb 26, 2016 at 11:23 AM, おぎばやしひろのり <ogibaya...@gmail.com>
> >> >>>> wrote:
> >> >>>>>
> >> >>>>> Hello,
> >> >>>>>
> >> >>>>> I started evaluating Flink and tried a simple performance test.
> >> >>>>> The result was just about 4,000 messages/sec with 300% CPU usage.
> >> >>>>> I think this is quite low and I am wondering whether it is a
> >> >>>>> reasonable result. If someone could check it, that would be great.
> >> >>>>>
> >> >>>>> Here is the detail:
> >> >>>>>
> >> >>>>> [servers]
> >> >>>>> - 3 Kafka brokers with 3 partitions
> >> >>>>> - 3 Flink TaskManagers + 1 JobManager
> >> >>>>> - 1 Elasticsearch
> >> >>>>> All of them are separate VMs with 8 vCPU and 8GB memory.
> >> >>>>>
> >> >>>>> [test case]
> >> >>>>> The application counts access logs by URI within a 1-minute window
> >> >>>>> and sends the result to Elasticsearch. The actual code is below.
> >> >>>>> I used the '-p 3' option with the flink run command, so the task
> >> >>>>> was distributed to 3 TaskManagers.
> >> >>>>> In the test, I sent about 5,000 logs/sec to Kafka.
> >> >>>>>
> >> >>>>> [result]
> >> >>>>> - From the Elasticsearch records, the total access count for all
> >> >>>>> URIs was about 260,000/min = 4,300/sec. This is the entire
> >> >>>>> throughput.
> >> >>>>> - Kafka consumer lag kept growing.
> >> >>>>> - The CPU usage of each TaskManager machine was about 13-14%. From
> >> >>>>> the top command output, the Flink java process was using 100%
> >> >>>>> (one full CPU).
> >> >>>>>
> >> >>>>> So I thought the bottleneck here was the CPU used by the Flink
> >> >>>>> tasks.
> >> >>>>>
> >> >>>>> Here is the application code.
> >> >>>>> ---
> >> >>>>> val env = StreamExecutionEnvironment.getExecutionEnvironment
> >> >>>>> env.enableCheckpointing(1000)
> >> >>>>> ...
> >> >>>>> val stream = env
> >> >>>>>   .addSource(new FlinkKafkaConsumer082[String]("kafka.dummy",
> >> >>>>>     new SimpleStringSchema(), properties))
> >> >>>>>   .map{ json => JSON.parseFull(json).get.asInstanceOf[Map[String, AnyRef]] }
> >> >>>>>   .map{ x => x.get("uri") match {
> >> >>>>>     case Some(y) => (y.asInstanceOf[String], 1)
> >> >>>>>     case None => ("", 1)
> >> >>>>>   }}
> >> >>>>>   .keyBy(0)
> >> >>>>>   .timeWindow(Time.of(1, TimeUnit.MINUTES))
> >> >>>>>   .sum(1)
> >> >>>>>   .map{ x => (System.currentTimeMillis(), x) }
> >> >>>>>   .addSink(new ElasticsearchSink(config, transports,
> >> >>>>>     new IndexRequestBuilder[Tuple2[Long, Tuple2[String, Int]]] {
> >> >>>>>       override def createIndexRequest(element: Tuple2[Long,
> >> >>>>>         Tuple2[String, Int]], ctx: RuntimeContext): IndexRequest = {
> >> >>>>>         val json = new HashMap[String, AnyRef]
> >> >>>>>         json.put("@timestamp", new Timestamp(element._1))
> >> >>>>>         json.put("uri", element._2._1)
> >> >>>>>         json.put("count", element._2._2: java.lang.Integer)
> >> >>>>>         println("SENDING: " + element)
> >> >>>>>         Requests.indexRequest.index("dummy2").`type`("my-type").source(json)
> >> >>>>>       }
> >> >>>>>     }))
> >> >>>>> ---
> >> >>>>>
> >> >>>>> Regards,
> >> >>>>> Hironori Ogibayashi

--
Milinda Pathirage

PhD Student | Research Assistant
School of Informatics and Computing | Data to Insight Center
Indiana University

twitter: milindalakmal
skype: milinda.pathirage
blog: http://milinda.pathirage.org