Hi Hironori,

could you check with the kafka-console-consumer how many messages you can read in one minute? The broker's disk I/O may be limited because everything is running in virtual machines (potentially sharing one physical disk?). I'm also not sure whether running a Kafka 0.8 consumer against a 0.9 broker works as expected.
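A one-minute measurement with the console consumer could look roughly like the sketch below. The ZooKeeper host, Kafka install path, and topic name are assumptions; `yes` stands in for the real consumer here so the pipeline itself is runnable anywhere:

```shell
# Sketch: count how many lines a consumer prints within a fixed time window.
# Against the real cluster, the command would be something like (paths/hosts assumed):
#   timeout 60 bin/kafka-console-consumer.sh \
#     --zookeeper <zk-host>:2181 --topic kafka.dummy | wc -l
# Below, `yes dummy` stands in for the consumer so the sketch runs as-is.
count=$(timeout 1 yes dummy | wc -l)
echo "lines read in the window: $count"
```

Dividing the line count by the window length gives a rough messages-per-second figure for the broker side alone, independent of Flink.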
Our Kafka 0.8 consumer has been tested in environments where it reads at more than 100 MB/s from a single broker.

On Fri, Mar 11, 2016 at 9:33 AM, おぎばやしひろのり <ogibaya...@gmail.com> wrote:
> Aljoscha,
>
> Thank you for your response.
>
> I tried the case with no JSON parsing and no sink (DiscardingSink). The
> throughput was 8,228 msg/sec, slightly better than the JSON + Elasticsearch case.
> I also tried using socketTextStream instead of FlinkKafkaConsumer; in that
> case, the result was 60,000 msg/sec with just 40% Flink TaskManager CPU
> usage (the socket server was the bottleneck).
> That was amazing, although Flink's fault-tolerance feature is not
> available with socketTextStream.
>
> Regards,
> Hironori
>
> 2016-03-08 21:36 GMT+09:00 Aljoscha Krettek <aljos...@apache.org>:
> > Hi,
> > Another interesting test would be a combination of 3) and 2), i.e. no
> > JSON parsing and no sink. This would show what the raw throughput can be
> > before being slowed down by writing to Elasticsearch.
> >
> > Also, .print() is not feasible for production since it just prints every
> > element to the stdout log on the TaskManagers, which itself can cause
> > quite a slowdown. You could try:
> >
> > datastream.addSink(new DiscardingSink())
> >
> > which is a dummy sink that does nothing.
> >
> > Cheers,
> > Aljoscha
> >> On 08 Mar 2016, at 13:31, おぎばやしひろのり <ogibaya...@gmail.com> wrote:
> >>
> >> Stephan,
> >>
> >> Sorry for the delay in my response.
> >> I tried the 3 cases you suggested.
> >>
> >> This time, I set parallelism to 1 for simplicity.
> >>
> >> 0) Base performance (same as the first e-mail): 1,480 msg/sec
> >> 1) Checkpointing disabled: almost the same as 0)
> >> 2) No ES sink, just print(): 1,510 msg/sec
> >> 3) JSON to TSV: 8,000 msg/sec
> >>
> >> So, as you can see, the bottleneck was JSON parsing.
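One common way to cut that JSON-parsing cost, since the job only ever looks at one field, is to extract just that field instead of parsing the whole document. This is a minimal sketch, not the code from the thread; the field name "uri" and the log shape are taken from the quoted job, and a production pipeline would more likely use a streaming parser such as Jackson:

```scala
// Sketch: pull only the "uri" field out of the raw JSON string with a regex,
// instead of building a full Map via JSON.parseFull as the original job does.
object UriExtract {
  // Matches `"uri": "<value>"`; assumes the value contains no escaped quotes.
  private val UriPattern = """"uri"\s*:\s*"([^"]*)"""".r

  def extractUri(json: String): String =
    UriPattern.findFirstMatchIn(json).map(_.group(1)).getOrElse("")
}
```

In the pipeline this would collapse the two map steps into a single `.map(line => (UriExtract.extractUri(line), 1))`. It trades generality for speed, which matches the thread's observation that switching from JSON to TSV raised throughput from ~1,500 to ~8,000 msg/sec.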
> >> I also want to try eliminating Kafka to see if there is room to
> >> improve performance. (Currently, I am using FlinkKafkaConsumer082
> >> with Kafka 0.9; I think I should try Flink 1.0 and FlinkKafkaConsumer09.)
> >> Anyway, I think 8,000 msg/sec with 1 CPU is not so bad considering
> >> Flink's scalability and fault tolerance.
> >> Thank you for your advice.
> >>
> >> Regards,
> >> Hironori Ogibayashi
> >>
> >> 2016-02-26 21:46 GMT+09:00 おぎばやしひろのり <ogibaya...@gmail.com>:
> >>> Stephan,
> >>>
> >>> Thank you for your quick response.
> >>> I will try and post the result later.
> >>>
> >>> Regards,
> >>> Hironori
> >>>
> >>> 2016-02-26 19:45 GMT+09:00 Stephan Ewen <se...@apache.org>:
> >>>> Hi!
> >>>>
> >>>> I would try to dig bit by bit into what the bottleneck is:
> >>>>
> >>>> 1) Disable checkpointing and see what difference that makes.
> >>>> 2) Use a dummy (discarding) sink rather than Elasticsearch, to see
> >>>> if that is limiting.
> >>>> 3) Check the JSON parsing. Many JSON libraries are very CPU intensive
> >>>> and easily dominate the entire pipeline.
> >>>>
> >>>> Greetings,
> >>>> Stephan
> >>>>
> >>>> On Fri, Feb 26, 2016 at 11:23 AM, おぎばやしひろのり <ogibaya...@gmail.com> wrote:
> >>>>>
> >>>>> Hello,
> >>>>>
> >>>>> I started evaluating Flink and tried a simple performance test.
> >>>>> The result was just about 4,000 messages/sec with 300% CPU usage.
> >>>>> I think this is quite low and I am wondering if it is a reasonable
> >>>>> result. If someone could check it, that would be great.
> >>>>>
> >>>>> Here are the details:
> >>>>>
> >>>>> [servers]
> >>>>> - 3 Kafka brokers with 3 partitions
> >>>>> - 3 Flink TaskManagers + 1 JobManager
> >>>>> - 1 Elasticsearch
> >>>>> All of them are separate VMs with 8 vCPUs and 8 GB of memory.
> >>>>>
> >>>>> [test case]
> >>>>> The application counts access logs by URI within a 1-minute window
> >>>>> and sends the result to Elasticsearch. The actual code is below.
> >>>>> I used the '-p 3' option with the flink run command, so the task
> >>>>> was distributed to 3 TaskManagers.
> >>>>> In the test, I sent about 5,000 logs/sec to Kafka.
> >>>>>
> >>>>> [result]
> >>>>> - From the Elasticsearch records, the total access count for all
> >>>>> URIs was about 260,000/min = 4,300/sec. This is the overall throughput.
> >>>>> - The Kafka consumer lag kept growing.
> >>>>> - The CPU usage of each TaskManager machine was about 13-14%. From
> >>>>> the top command output, the Flink Java process was using 100% (1 full CPU).
> >>>>>
> >>>>> So I thought the bottleneck here was the CPU used by the Flink tasks.
> >>>>>
> >>>>> Here is the application code:
> >>>>> ---
> >>>>> val env = StreamExecutionEnvironment.getExecutionEnvironment
> >>>>> env.enableCheckpointing(1000)
> >>>>> ...
> >>>>> val stream = env
> >>>>>   .addSource(new FlinkKafkaConsumer082[String]("kafka.dummy",
> >>>>>     new SimpleStringSchema(), properties))
> >>>>>   .map{ json => JSON.parseFull(json).get.asInstanceOf[Map[String, AnyRef]] }
> >>>>>   .map{ x => x.get("uri") match {
> >>>>>     case Some(y) => (y.asInstanceOf[String], 1)
> >>>>>     case None => ("", 1)
> >>>>>   }}
> >>>>>   .keyBy(0)
> >>>>>   .timeWindow(Time.of(1, TimeUnit.MINUTES))
> >>>>>   .sum(1)
> >>>>>   .map{ x => (System.currentTimeMillis(), x) }
> >>>>>   .addSink(new ElasticsearchSink(config, transports,
> >>>>>     new IndexRequestBuilder[Tuple2[Long, Tuple2[String, Int]]] {
> >>>>>       override def createIndexRequest(element: Tuple2[Long,
> >>>>>           Tuple2[String, Int]], ctx: RuntimeContext): IndexRequest = {
> >>>>>         val json = new HashMap[String, AnyRef]
> >>>>>         json.put("@timestamp", new Timestamp(element._1))
> >>>>>         json.put("uri", element._2._1)
> >>>>>         json.put("count", element._2._2: java.lang.Integer)
> >>>>>         println("SENDING: " + element)
> >>>>>         Requests.indexRequest.index("dummy2").`type`("my-type").source(json)
> >>>>>       }
> >>>>>     }))
> >>>>> ---
> >>>>>
> >>>>> Regards,
> >>>>> Hironori Ogibayashi
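The keyBy/timeWindow/sum part of the quoted job computes a per-URI hit count over each window. A plain-Scala sketch of that aggregation (on a fixed batch of events standing in for one window's contents) makes the intended result concrete:

```scala
// Sketch: what keyBy(0).timeWindow(...).sum(1) produces for one window:
// the number of occurrences of each URI among the events in that window.
object WindowCount {
  def countByUri(uris: Seq[String]): Map[String, Int] =
    uris.groupBy(identity).map { case (uri, hits) => (uri, hits.size) }
}
```

For example, a window containing the events /a, /b, /a yields the counts (/a, 2) and (/b, 1), which is what the job then indexes into Elasticsearch per minute.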
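For the FlinkKafkaConsumer09 experiment Hironori mentions, the switch would roughly look like the fragment below. This is a sketch only; the artifact name and version are assumptions based on Flink 1.0's connector naming:

```scala
// build.sbt fragment (sketch): pull in the Kafka 0.9 connector for Flink 1.0.
libraryDependencies += "org.apache.flink" %% "flink-connector-kafka-0.9" % "1.0.0"

// In the job, the source then becomes:
//   .addSource(new FlinkKafkaConsumer09[String](
//     "kafka.dummy", new SimpleStringSchema(), properties))
```

The constructor arguments (topic, deserialization schema, consumer properties) keep the same shape as FlinkKafkaConsumer082, so the rest of the pipeline should not need changes.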