Hi Hironori,

[1] and [2] describes the process of measuring Kafka performance. I think
the perf test code is under org.apache.kafka.tools package in 0.9, so you
may have to change commands in [2] to reflect that.

Thanks
Milinda

[1]
https://engineering.linkedin.com/kafka/benchmarking-apache-kafka-2-million-writes-second-three-cheap-machines
[2] https://gist.github.com/jkreps/c7ddb4041ef62a900e6c

On Tue, Mar 15, 2016 at 11:35 AM, おぎばやしひろのり <ogibaya...@gmail.com> wrote:

> Robert,
>
> Thank you for your response.
> I would like to try  kafka-console-consumer but I have no idea about
> how to measure the consuming throughput.
> Are there any standard way?
> I would also try Kafka broker on physical servers.
>
> Regarding version, I have upgraded to Flink 1.0.0 and replaced
> FlinkKafkaConsumer 082 with 09, but did not see
> any difference in performance.
>
> Regards,
> Hironori
>
>
>
> 2016-03-11 23:25 GMT+09:00 Robert Metzger <rmetz...@apache.org>:
> > Hi Hironori,
> >
> > can you try with the kafka-console-consumer how many messages you can
> read
> > in one minute?
> > Maybe the broker's disk I/O is limited because everything is running in
> > virtual machines (potentially sharing one hard disk?)
> > I'm also not sure if running a Kafka 0.8 consumer against a 0.9 broker is
> > working as expected.
> >
> > Our Kafka 0.8 consumer has been tested in environments where its reading
> > with more than 100 MB/s per from a broker.
> >
> >
> > On Fri, Mar 11, 2016 at 9:33 AM, おぎばやしひろのり <ogibaya...@gmail.com> wrote:
> >>
> >> Aljoscha,
> >>
> >> Thank you for your response.
> >>
> >> I tried no JSON parsing and no sink (DiscardingSink) case. The
> >> throughput was 8228msg/sec.
> >> Slightly better than JSON + Elasticsearch case.
> >> I also tried using socketTextStream instead of FlinkKafkaConsumer, in
> >> that case, the result was
> >> 60,000 msg/sec with just 40% flink TaskManager CPU Usage. (socket
> >> server was the bottleneck)
> >> That was amazing, although Flink's fault tolerance feature is not
> >> available with socketTextStream.
> >>
> >> Regards,
> >> Hironori
> >>
> >> 2016-03-08 21:36 GMT+09:00 Aljoscha Krettek <aljos...@apache.org>:
> >> > Hi,
> >> > Another interesting test would be a combination of 3) and 2). I.e. no
> >> > JSON parsing and no sink. This would show what the raw throughput can
> be
> >> > before being slowed down by writing to Elasticsearch.
> >> >
> >> > Also .print() is also not feasible for production since it just prints
> >> > every element to the stdout log on the TaskManagers, which itself can
> cause
> >> > quite a slowdown. You could try:
> >> >
> >> > datastream.addSink(new DiscardingSink())
> >> >
> >> > which is a dummy sink that does nothing.
> >> >
> >> > Cheers,
> >> > Aljoscha
> >> >> On 08 Mar 2016, at 13:31, おぎばやしひろのり <ogibaya...@gmail.com> wrote:
> >> >>
> >> >> Stephan,
> >> >>
> >> >> Sorry for the delay in my response.
> >> >> I tried 3 cases you suggested.
> >> >>
> >> >> This time, I set parallelism to 1 for simpicity.
> >> >>
> >> >> 0) base performance (same as the first e-mail): 1,480msg/sec
> >> >> 1) Disable checkpointing : almost same as 0)
> >> >> 2) No ES sink. just print() : 1,510msg/sec
> >> >> 3) JSON to TSV : 8,000msg/sec
> >> >>
> >> >> So, as you can see, the bottleneck was JSON parsing. I also want to
> >> >> try eliminating Kafka to see
> >> >> if there is a room to improve performance.(Currently, I am using
> >> >> FlinkKafkaConsumer082 with Kafka 0.9
> >> >> I think I should try Flink 1.0 and FlinkKafkaConsumer09).
> >> >> Anyway, I think 8,000msg/sec with 1 CPU is not so bad thinking of
> >> >> Flink's scalability and fault tolerance.
> >> >> Thank you for your advice.
> >> >>
> >> >> Regards,
> >> >> Hironori Ogibayashi
> >> >>
> >> >> 2016-02-26 21:46 GMT+09:00 おぎばやしひろのり <ogibaya...@gmail.com>:
> >> >>> Stephan,
> >> >>>
> >> >>> Thank you for your quick response.
> >> >>> I will try and post the result later.
> >> >>>
> >> >>> Regards,
> >> >>> Hironori
> >> >>>
> >> >>> 2016-02-26 19:45 GMT+09:00 Stephan Ewen <se...@apache.org>:
> >> >>>> Hi!
> >> >>>>
> >> >>>> I would try and dig bit by bit into what the bottleneck is:
> >> >>>>
> >> >>>> 1) Disable the checkpointing, see what difference that makes
> >> >>>> 2) Use a dummy sink (discarding) rather than elastic search, to see
> >> >>>> if that
> >> >>>> is limiting
> >> >>>> 3) Check the JSON parsing. Many JSON libraries are very CPU
> intensive
> >> >>>> and
> >> >>>> easily dominate the entire pipeline.
> >> >>>>
> >> >>>> Greetings,
> >> >>>> Stephan
> >> >>>>
> >> >>>>
> >> >>>> On Fri, Feb 26, 2016 at 11:23 AM, おぎばやしひろのり <ogibaya...@gmail.com>
> >> >>>> wrote:
> >> >>>>>
> >> >>>>> Hello,
> >> >>>>>
> >> >>>>> I started evaluating Flink and tried simple performance test.
> >> >>>>> The result was just about 4000 messages/sec with 300% CPU usage. I
> >> >>>>> think this is quite low and wondering if it is a reasonable
> result.
> >> >>>>> If someone could check it, it would be great.
> >> >>>>>
> >> >>>>> Here is the detail:
> >> >>>>>
> >> >>>>> [servers]
> >> >>>>> - 3 Kafka broker with 3 partitions
> >> >>>>> - 3 Flink TaskManager + 1 JobManager
> >> >>>>> - 1 Elasticsearch
> >> >>>>> All of them are separate VM with 8vCPU, 8GB memory
> >> >>>>>
> >> >>>>> [test case]
> >> >>>>> The application counts access log by URI with in 1 minute window
> and
> >> >>>>> send the result to Elasticsearch. The actual code is below.
> >> >>>>> I used '-p 3' option to flink run command, so the task was
> >> >>>>> distributed
> >> >>>>> to 3 TaskManagers.
> >> >>>>> In the test, I sent about 5000 logs/sec to Kafka.
> >> >>>>>
> >> >>>>> [result]
> >> >>>>> - From Elasticsearch records, the total access count for all URI
> was
> >> >>>>> about 260,000/min = 4300/sec. This is the entire throughput.
> >> >>>>> - Kafka consumer lag was keep growing.
> >> >>>>> - The CPU usage of each TaskManager machine was about 13-14%. From
> >> >>>>> top
> >> >>>>> command output, Flink java process was using 100%(1 CPU full)
> >> >>>>>
> >> >>>>> So I thought the bottleneck here was CPU used by Flink Tasks.
> >> >>>>>
> >> >>>>> Here is the application code.
> >> >>>>> ---
> >> >>>>>    val env = StreamExecutionEnvironment.getExecutionEnvironment
> >> >>>>>    env.enableCheckpointing(1000)
> >> >>>>> ...
> >> >>>>>    val stream = env
> >> >>>>>      .addSource(new FlinkKafkaConsumer082[String]("kafka.dummy",
> new
> >> >>>>> SimpleStringSchema(), properties))
> >> >>>>>      .map{ json =>
> JSON.parseFull(json).get.asInstanceOf[Map[String,
> >> >>>>> AnyRef]] }
> >> >>>>>      .map{ x => x.get("uri") match {
> >> >>>>>        case Some(y) => (y.asInstanceOf[String],1)
> >> >>>>>        case None => ("", 1)
> >> >>>>>      }}
> >> >>>>>      .keyBy(0)
> >> >>>>>      .timeWindow(Time.of(1, TimeUnit.MINUTES))
> >> >>>>>      .sum(1)
> >> >>>>>      .map{ x => (System.currentTimeMillis(), x)}
> >> >>>>>      .addSink(new ElasticsearchSink(config, transports, new
> >> >>>>> IndexRequestBuilder[Tuple2[Long, Tuple2[String, Int]]]  {
> >> >>>>>        override def createIndexRequest(element: Tuple2[Long,
> >> >>>>> Tuple2[String, Int]], ctx: RuntimeContext): IndexRequest = {
> >> >>>>>          val json = new HashMap[String, AnyRef]
> >> >>>>>          json.put("@timestamp", new Timestamp(element._1))
> >> >>>>>          json.put("uri", element._2._1)
> >> >>>>>          json.put("count", element._2._2: java.lang.Integer)
> >> >>>>>          println("SENDING: " + element)
> >> >>>>>
> >> >>>>>
> Requests.indexRequest.index("dummy2").`type`("my-type").source(json)
> >> >>>>>        }
> >> >>>>>      }))
> >> >>>>> ---
> >> >>>>>
> >> >>>>> Regards,
> >> >>>>> Hironori Ogibayashi
> >> >>>>
> >> >>>>
> >> >
> >
> >
>



-- 
Milinda Pathirage

PhD Student | Research Assistant
School of Informatics and Computing | Data to Insight Center
Indiana University

twitter: milindalakmal
skype: milinda.pathirage
blog: http://milinda.pathirage.org

Reply via email to