Milinda,

Thanks. I will try.

Regards,
Hironori

2016/03/16 1:31 "Milinda Pathirage" <mpath...@umail.iu.edu>:
> Hi Hironori,
>
> [1] and [2] describe the process of measuring Kafka performance. I think
> the perf test code is under the org.apache.kafka.tools package in 0.9, so
> you may have to change the commands in [2] to reflect that.
>
> Thanks
> Milinda
>
> [1]
> https://engineering.linkedin.com/kafka/benchmarking-apache-kafka-2-million-writes-second-three-cheap-machines
> [2] https://gist.github.com/jkreps/c7ddb4041ef62a900e6c
>
> On Tue, Mar 15, 2016 at 11:35 AM, おぎばやしひろのり <ogibaya...@gmail.com> wrote:
>
>> Robert,
>>
>> Thank you for your response.
>> I would like to try kafka-console-consumer, but I have no idea how to
>> measure the consuming throughput.
>> Is there any standard way?
>> I will also try running the Kafka brokers on physical servers.
>>
>> Regarding versions, I have upgraded to Flink 1.0.0 and replaced
>> FlinkKafkaConsumer082 with FlinkKafkaConsumer09, but did not see
>> any difference in performance.
>>
>> Regards,
>> Hironori
>>
>> 2016-03-11 23:25 GMT+09:00 Robert Metzger <rmetz...@apache.org>:
>> > Hi Hironori,
>> >
>> > can you check with the kafka-console-consumer how many messages you can
>> > read in one minute?
>> > Maybe the broker's disk I/O is limited because everything is running in
>> > virtual machines (potentially sharing one hard disk?)
>> > I'm also not sure if running a Kafka 0.8 consumer against a 0.9 broker
>> > is working as expected.
>> >
>> > Our Kafka 0.8 consumer has been tested in environments where it is
>> > reading more than 100 MB/s from a broker.
>> >
>> >
>> > On Fri, Mar 11, 2016 at 9:33 AM, おぎばやしひろのり <ogibaya...@gmail.com> wrote:
>> >>
>> >> Aljoscha,
>> >>
>> >> Thank you for your response.
>> >>
>> >> I tried the case with no JSON parsing and no sink (DiscardingSink). The
>> >> throughput was 8,228 msg/sec.
>> >> Slightly better than the JSON + Elasticsearch case.
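Robert's suggestion boils down to counting messages over a fixed interval and dividing by its length. The Scala sketch below wraps that idea in a hypothetical `ConsumeRate.measure` helper (not a Kafka or Flink API); the `Iterator` stands in for whatever consumer loop is being measured, and the clock is injectable so the arithmetic can be checked. It only looks at the deadline between messages, so a source that blocks will overrun the interval.

```scala
// Hypothetical helper, not part of Kafka or Flink: drains a stream of
// messages for a fixed duration and reports the observed rate.
object ConsumeRate {
  /** Reads from `messages` until `durationMillis` have elapsed (or the input
    * ends) and returns (messages read, messages per second).
    * `nowMillis` is injectable so the measurement can be tested with a fake clock. */
  def measure[A](
      messages: Iterator[A],
      durationMillis: Long,
      nowMillis: () => Long = () => System.currentTimeMillis()
  ): (Long, Double) = {
    val start = nowMillis()
    var count = 0L
    var elapsed = 0L
    // The deadline is checked only after each message has been consumed.
    while (messages.hasNext && elapsed < durationMillis) {
      messages.next()
      count += 1
      elapsed = nowMillis() - start
    }
    // Guard against division by zero when nothing was read.
    val seconds = math.max(elapsed, 1L) / 1000.0
    (count, count / seconds)
  }
}
```

One possible use, assuming kafka-console-consumer output is piped into a Scala script, is `ConsumeRate.measure(scala.io.Source.stdin.getLines(), 60000L)`, which returns the line count and lines per second for roughly one minute.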
>> >>
>> >> I also tried using socketTextStream instead of FlinkKafkaConsumer. In
>> >> that case, the result was 60,000 msg/sec with just 40% Flink
>> >> TaskManager CPU usage (the socket server was the bottleneck).
>> >> That was amazing, although Flink's fault tolerance feature is not
>> >> available with socketTextStream.
>> >>
>> >> Regards,
>> >> Hironori
>> >>
>> >> 2016-03-08 21:36 GMT+09:00 Aljoscha Krettek <aljos...@apache.org>:
>> >> > Hi,
>> >> > Another interesting test would be a combination of 3) and 2), i.e. no
>> >> > JSON parsing and no sink. This would show what the raw throughput can
>> >> > be before being slowed down by writing to Elasticsearch.
>> >> >
>> >> > Also, .print() is not feasible for production since it just prints
>> >> > every element to the stdout log on the TaskManagers, which itself can
>> >> > cause quite a slowdown. You could try:
>> >> >
>> >> > datastream.addSink(new DiscardingSink())
>> >> >
>> >> > which is a dummy sink that does nothing.
>> >> >
>> >> > Cheers,
>> >> > Aljoscha
>> >> >> On 08 Mar 2016, at 13:31, おぎばやしひろのり <ogibaya...@gmail.com> wrote:
>> >> >>
>> >> >> Stephan,
>> >> >>
>> >> >> Sorry for the delay in my response.
>> >> >> I tried the 3 cases you suggested.
>> >> >>
>> >> >> This time, I set parallelism to 1 for simplicity.
>> >> >>
>> >> >> 0) base performance (same as the first e-mail): 1,480 msg/sec
>> >> >> 1) Disable checkpointing: almost the same as 0)
>> >> >> 2) No ES sink, just print(): 1,510 msg/sec
>> >> >> 3) JSON to TSV: 8,000 msg/sec
>> >> >>
>> >> >> So, as you can see, the bottleneck was JSON parsing. I also want to
>> >> >> try eliminating Kafka to see if there is room to improve performance.
>> >> >> (Currently, I am using FlinkKafkaConsumer082 with Kafka 0.9; I think
>> >> >> I should try Flink 1.0 and FlinkKafkaConsumer09.)
>> >> >> Anyway, I think 8,000 msg/sec with 1 CPU is not so bad considering
>> >> >> Flink's scalability and fault tolerance.
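A rough way to read the parallelism-1 numbers above is to convert throughput into time per message. This sketch assumes the job is CPU-bound on about one core and that runs 0) and 3) differ only in how each record is parsed; under those assumptions, JSON parsing costs on the order of 550 microseconds per message, while replacing the Elasticsearch sink with print() saves only about 13. The `PerMessageCost` name is illustrative, and the additive model ignores that Flink may run operators in separate threads.

```scala
// Rough cost model: at parallelism 1 the job saturated about one CPU core,
// so treat 1 / throughput as the CPU time spent per message.
object PerMessageCost {
  /** Average time per message, in microseconds, at a given throughput. */
  def microsPerMessage(msgPerSec: Double): Double = 1e6 / msgPerSec

  /** Extra time per message attributable to a stage, given the throughput
    * measured with the stage and with the stage replaced by something cheaper. */
  def stageCostMicros(withStage: Double, withoutStage: Double): Double =
    microsPerMessage(withStage) - microsPerMessage(withoutStage)

  def main(args: Array[String]): Unit = {
    // Figures from the parallelism-1 runs: 0) base 1,480, 2) print() 1,510, 3) TSV 8,000 msg/sec.
    val json = stageCostMicros(withStage = 1480, withoutStage = 8000)
    val esSink = stageCostMicros(withStage = 1480, withoutStage = 1510)
    println(f"JSON parsing: ~$json%.0f us/msg; ES sink vs print(): ~$esSink%.0f us/msg")
  }
}
```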
>> >> >>
>> >> >> Thank you for your advice.
>> >> >>
>> >> >> Regards,
>> >> >> Hironori Ogibayashi
>> >> >>
>> >> >> 2016-02-26 21:46 GMT+09:00 おぎばやしひろのり <ogibaya...@gmail.com>:
>> >> >>> Stephan,
>> >> >>>
>> >> >>> Thank you for your quick response.
>> >> >>> I will try and post the result later.
>> >> >>>
>> >> >>> Regards,
>> >> >>> Hironori
>> >> >>>
>> >> >>> 2016-02-26 19:45 GMT+09:00 Stephan Ewen <se...@apache.org>:
>> >> >>>> Hi!
>> >> >>>>
>> >> >>>> I would try and dig bit by bit into what the bottleneck is:
>> >> >>>>
>> >> >>>> 1) Disable the checkpointing, see what difference that makes
>> >> >>>> 2) Use a dummy sink (discarding) rather than Elasticsearch, to see
>> >> >>>> if that is limiting
>> >> >>>> 3) Check the JSON parsing. Many JSON libraries are very CPU
>> >> >>>> intensive and easily dominate the entire pipeline.
>> >> >>>>
>> >> >>>> Greetings,
>> >> >>>> Stephan
>> >> >>>>
>> >> >>>>
>> >> >>>> On Fri, Feb 26, 2016 at 11:23 AM, おぎばやしひろのり <ogibaya...@gmail.com> wrote:
>> >> >>>>>
>> >> >>>>> Hello,
>> >> >>>>>
>> >> >>>>> I started evaluating Flink and tried a simple performance test.
>> >> >>>>> The result was just about 4,000 messages/sec with 300% CPU usage.
>> >> >>>>> I think this is quite low and am wondering if it is a reasonable
>> >> >>>>> result. If someone could check it, it would be great.
>> >> >>>>>
>> >> >>>>> Here are the details:
>> >> >>>>>
>> >> >>>>> [servers]
>> >> >>>>> - 3 Kafka brokers with 3 partitions
>> >> >>>>> - 3 Flink TaskManagers + 1 JobManager
>> >> >>>>> - 1 Elasticsearch node
>> >> >>>>> All of them are separate VMs with 8 vCPUs and 8 GB memory.
>> >> >>>>>
>> >> >>>>> [test case]
>> >> >>>>> The application counts access logs by URI within a 1-minute window
>> >> >>>>> and sends the result to Elasticsearch. The actual code is below.
>> >> >>>>> I used the '-p 3' option with the flink run command, so the task
>> >> >>>>> was distributed to 3 TaskManagers.
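To check what the job should emit, the windowed count described in [test case] can be reproduced on plain Scala collections. In this sketch, the `LogEvent` record and its timestamp field are assumptions standing in for whatever clock drives the Flink windows, and grouping by (window start, URI) mirrors `keyBy(0)`, the 1-minute `timeWindow`, and `sum(1)`. It is a test oracle, not a Flink API.

```scala
object WindowedUriCount {
  /** Hypothetical access-log record: event time in epoch millis plus the request URI. */
  final case class LogEvent(timestampMillis: Long, uri: String)

  val WindowMillis: Long = 60 * 1000L

  /** Start of the 1-minute tumbling window that contains `ts`. */
  def windowStart(ts: Long): Long = ts - Math.floorMod(ts, WindowMillis)

  /** Count events per (window start, URI), mirroring keyBy(uri) + 1-minute window + sum(1). */
  def countByWindowAndUri(events: Seq[LogEvent]): Map[(Long, String), Int] =
    events
      .groupBy(e => (windowStart(e.timestampMillis), e.uri))
      .map { case (key, group) => key -> group.size }
}
```

Tumbling windows never overlap, so every event lands in exactly one (window, URI) bucket; an event at 59,999 ms counts toward the first minute and one at 60,000 ms toward the second.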
>> >> >>>>> In the test, I sent about 5,000 logs/sec to Kafka.
>> >> >>>>>
>> >> >>>>> [result]
>> >> >>>>> - From the Elasticsearch records, the total access count for all
>> >> >>>>> URIs was about 260,000/min = 4,300/sec. This is the entire
>> >> >>>>> throughput.
>> >> >>>>> - Kafka consumer lag kept growing.
>> >> >>>>> - The CPU usage of each TaskManager machine was about 13-14%. From
>> >> >>>>> the top command output, the Flink Java process was using 100%
>> >> >>>>> (1 CPU fully used).
>> >> >>>>>
>> >> >>>>> So I thought the bottleneck here was the CPU used by the Flink tasks.
>> >> >>>>>
>> >> >>>>> Here is the application code.
>> >> >>>>> ---
>> >> >>>>> val env = StreamExecutionEnvironment.getExecutionEnvironment
>> >> >>>>> env.enableCheckpointing(1000)
>> >> >>>>> ...
>> >> >>>>> val stream = env
>> >> >>>>>   .addSource(new FlinkKafkaConsumer082[String]("kafka.dummy", new SimpleStringSchema(), properties))
>> >> >>>>>   .map { json => JSON.parseFull(json).get.asInstanceOf[Map[String, AnyRef]] }
>> >> >>>>>   .map { x => x.get("uri") match {
>> >> >>>>>     case Some(y) => (y.asInstanceOf[String], 1)
>> >> >>>>>     case None => ("", 1)
>> >> >>>>>   }}
>> >> >>>>>   .keyBy(0)
>> >> >>>>>   .timeWindow(Time.of(1, TimeUnit.MINUTES))
>> >> >>>>>   .sum(1)
>> >> >>>>>   .map { x => (System.currentTimeMillis(), x) }
>> >> >>>>>   .addSink(new ElasticsearchSink(config, transports,
>> >> >>>>>     new IndexRequestBuilder[Tuple2[Long, Tuple2[String, Int]]] {
>> >> >>>>>       override def createIndexRequest(element: Tuple2[Long, Tuple2[String, Int]],
>> >> >>>>>           ctx: RuntimeContext): IndexRequest = {
>> >> >>>>>         val json = new HashMap[String, AnyRef]
>> >> >>>>>         json.put("@timestamp", new Timestamp(element._1))
>> >> >>>>>         json.put("uri", element._2._1)
>> >> >>>>>         json.put("count", element._2._2: java.lang.Integer)
>> >> >>>>>         println("SENDING: " + element)
>> >> >>>>>         Requests.indexRequest.index("dummy2").`type`("my-type").source(json)
>> >> >>>>>       }
>> >> >>>>>     }))
>> >> >>>>> ---
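The figures in [result] can be cross-checked with simple arithmetic. This sketch assumes the input rate held steady at about 5,000 logs/sec: 260,000/min is about 4,333 msg/sec, so the consumer falls behind by roughly 667 msg/sec (about 40,000 messages per minute), and 13-14% of an 8-vCPU machine is just over one fully busy core, which is consistent with the 100% shown for the Flink process in top. The `ResultCheck` name is illustrative.

```scala
object ResultCheck {
  /** Convert a per-minute count to a per-second rate. */
  def perSecond(perMinute: Double): Double = perMinute / 60.0

  /** How fast consumer lag grows (msg/sec) when input outpaces processing. */
  def lagGrowthPerSecond(inputPerSec: Double, processedPerSec: Double): Double =
    math.max(0.0, inputPerSec - processedPerSec)

  /** Number of fully busy cores implied by a machine-wide CPU percentage. */
  def busyCores(machineCpuPercent: Double, vcpus: Int): Double =
    machineCpuPercent / 100.0 * vcpus

  def main(args: Array[String]): Unit = {
    val processed = perSecond(260000)             // ~4,333 msg/sec
    val lag = lagGrowthPerSecond(5000, processed) // ~667 msg/sec, assuming a steady 5,000/sec input
    println(f"processed ~$processed%.0f msg/sec; lag grows ~${lag * 60}%.0f msg/min")
    println(f"13-14%% of 8 vCPUs = ${busyCores(13, 8)}%.2f-${busyCores(14, 8)}%.2f busy cores")
  }
}
```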
>> >> >>>>>
>> >> >>>>> Regards,
>> >> >>>>> Hironori Ogibayashi
>
> --
> Milinda Pathirage
>
> PhD Student | Research Assistant
> School of Informatics and Computing | Data to Insight Center
> Indiana University
>
> twitter: milindalakmal
> skype: milinda.pathirage
> blog: http://milinda.pathirage.org