Stephan,

Sorry for the delay in my response. I tried the 3 cases you suggested.
This time, I set parallelism to 1 for simplicity.

0) Base performance (same as the first e-mail): 1,480 msg/sec
1) Disable checkpointing: almost the same as 0)
2) No ES sink, just print(): 1,510 msg/sec
3) JSON to TSV: 8,000 msg/sec

So, as you can see, the bottleneck was JSON parsing.

I also want to try eliminating Kafka to see if there is room to improve
performance. (Currently, I am using FlinkKafkaConsumer082 with Kafka 0.9;
I think I should try Flink 1.0 and FlinkKafkaConsumer09.)

Anyway, I think 8,000 msg/sec with 1 CPU is not so bad, given Flink's
scalability and fault tolerance.

Thank you for your advice.

Regards,
Hironori Ogibayashi

2016-02-26 21:46 GMT+09:00 おぎばやしひろのり <ogibaya...@gmail.com>:
> Stephan,
>
> Thank you for your quick response.
> I will try and post the result later.
>
> Regards,
> Hironori
>
> 2016-02-26 19:45 GMT+09:00 Stephan Ewen <se...@apache.org>:
>> Hi!
>>
>> I would try and dig bit by bit into what the bottleneck is:
>>
>> 1) Disable the checkpointing, see what difference that makes
>> 2) Use a dummy sink (discarding) rather than Elasticsearch, to see if
>> that is limiting
>> 3) Check the JSON parsing. Many JSON libraries are very CPU intensive
>> and easily dominate the entire pipeline.
>>
>> Greetings,
>> Stephan
>>
>>
>> On Fri, Feb 26, 2016 at 11:23 AM, おぎばやしひろのり <ogibaya...@gmail.com> wrote:
>>>
>>> Hello,
>>>
>>> I started evaluating Flink and tried a simple performance test.
>>> The result was just about 4,000 messages/sec with 300% CPU usage. I
>>> think this is quite low and am wondering if it is a reasonable result.
>>> If someone could check it, that would be great.
>>>
>>> Here is the detail:
>>>
>>> [servers]
>>> - 3 Kafka brokers with 3 partitions
>>> - 3 Flink TaskManagers + 1 JobManager
>>> - 1 Elasticsearch
>>> All of them are separate VMs with 8 vCPU and 8 GB memory.
>>>
>>> [test case]
>>> The application counts access logs by URI within a 1-minute window and
>>> sends the result to Elasticsearch. The actual code is below.
>>> I used the '-p 3' option with the flink run command, so the task was
>>> distributed across the 3 TaskManagers.
>>> In the test, I sent about 5,000 logs/sec to Kafka.
>>>
>>> [result]
>>> - From the Elasticsearch records, the total access count for all URIs
>>> was about 260,000/min = 4,300/sec. This is the overall throughput.
>>> - The Kafka consumer lag kept growing.
>>> - The CPU usage of each TaskManager machine was about 13-14%. From the
>>> top command output, the Flink java process was using 100% (1 full CPU).
>>>
>>> So I thought the bottleneck here was the CPU used by the Flink tasks.
>>>
>>> Here is the application code:
>>> ---
>>> val env = StreamExecutionEnvironment.getExecutionEnvironment
>>> env.enableCheckpointing(1000)
>>> ...
>>> val stream = env
>>>   .addSource(new FlinkKafkaConsumer082[String]("kafka.dummy",
>>>     new SimpleStringSchema(), properties))
>>>   .map{ json => JSON.parseFull(json).get.asInstanceOf[Map[String, AnyRef]] }
>>>   .map{ x => x.get("uri") match {
>>>     case Some(y) => (y.asInstanceOf[String], 1)
>>>     case None => ("", 1)
>>>   }}
>>>   .keyBy(0)
>>>   .timeWindow(Time.of(1, TimeUnit.MINUTES))
>>>   .sum(1)
>>>   .map{ x => (System.currentTimeMillis(), x) }
>>>   .addSink(new ElasticsearchSink(config, transports,
>>>     new IndexRequestBuilder[Tuple2[Long, Tuple2[String, Int]]] {
>>>       override def createIndexRequest(element: Tuple2[Long,
>>>           Tuple2[String, Int]], ctx: RuntimeContext): IndexRequest = {
>>>         val json = new HashMap[String, AnyRef]
>>>         json.put("@timestamp", new Timestamp(element._1))
>>>         json.put("uri", element._2._1)
>>>         json.put("count", element._2._2: java.lang.Integer)
>>>         println("SENDING: " + element)
>>>         Requests.indexRequest.index("dummy2").`type`("my-type").source(json)
>>>       }
>>>     }))
>>> ---
>>>
>>> Regards,
>>> Hironori Ogibayashi
>>
>>
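[For reference: the `JSON.parseFull` call in the pipeline above uses Scala's built-in `scala.util.parsing.json` parser, which is what test case 3 avoided by switching the input to TSV. A minimal sketch of that kind of TSV field extraction is below; the field order (timestamp, uri, status) is a hypothetical assumption for illustration, not taken from this thread.]

```scala
// Sketch of extracting the "uri" field from a TSV line instead of running
// a full JSON parse per record, as in test case 3 of the benchmark.
// The assumed field order (timestamp, uri, status) is hypothetical.
object TsvUriExtract {
  def extractUri(line: String): String = {
    // A plain split avoids the per-record object churn of a JSON parser.
    val fields = line.split('\t')
    if (fields.length > 1) fields(1) else ""
  }

  def main(args: Array[String]): Unit = {
    println(extractUri("2016-02-26T12:00:00\t/index.html\t200"))
    // prints "/index.html"
  }
}
```

[In the pipeline above, the first two `.map` steps would collapse into a single `.map{ line => (TsvUriExtract.extractUri(line), 1) }`.]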