Stephan,

Thank you for your quick response. I will try your suggestions and post
the results later.
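For 1), I will simply remove the env.enableCheckpointing(1000) call and
compare.

For 2), I will replace the ElasticsearchSink with a no-op sink so that
only the source, parsing, and windowing are measured. A minimal sketch
of what I have in mind (this DiscardingSink is my own trivial
SinkFunction, not a library class):

---
import org.apache.flink.streaming.api.functions.sink.SinkFunction

// Sink that drops every record, so the sink itself cannot be the bottleneck.
class DiscardingSink[T] extends SinkFunction[T] {
  override def invoke(value: T): Unit = {} // discard
}
---

and then in the job, instead of the ElasticsearchSink:
  .addSink(new DiscardingSink[(Long, (String, Int))])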
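For 3), I suspect scala.util.parsing.json.JSON, which as far as I know
is quite slow. I will try swapping in Jackson for the parsing step,
roughly like this (a sketch, assuming jackson-databind is on the
classpath and the log lines are flat JSON objects with a "uri" field):

---
import com.fasterxml.jackson.databind.ObjectMapper

// One shared mapper per JVM; ObjectMapper#readTree is thread-safe.
object Json {
  val mapper = new ObjectMapper()
}

def extractUri(line: String): (String, Int) = {
  val node = Json.mapper.readTree(line)   // parse the line once
  val uri = Option(node.get("uri")).map(_.asText).getOrElse("")
  (uri, 1)                                // same (uri, 1) shape as before
}
---

The two map steps in the pipeline would then collapse into a single
.map(extractUri _).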
Regards,
Hironori

2016-02-26 19:45 GMT+09:00 Stephan Ewen <se...@apache.org>:
> Hi!
>
> I would try to dig bit by bit into what the bottleneck is:
>
> 1) Disable the checkpointing and see what difference that makes.
> 2) Use a dummy sink (discarding) rather than Elasticsearch, to see if
>    that is the limiting factor.
> 3) Check the JSON parsing. Many JSON libraries are very CPU intensive
>    and easily dominate the entire pipeline.
>
> Greetings,
> Stephan
>
>
> On Fri, Feb 26, 2016 at 11:23 AM, おぎばやしひろのり <ogibaya...@gmail.com> wrote:
>>
>> Hello,
>>
>> I started evaluating Flink and ran a simple performance test.
>> The result was about 4,000 messages/sec at 300% CPU usage. I think
>> this is quite low and am wondering whether it is a reasonable result.
>> It would be great if someone could check it.
>>
>> Here are the details:
>>
>> [servers]
>> - 3 Kafka brokers with 3 partitions
>> - 3 Flink TaskManagers + 1 JobManager
>> - 1 Elasticsearch node
>> All of them are separate VMs with 8 vCPUs and 8 GB of memory.
>>
>> [test case]
>> The application counts access log entries by URI within a 1-minute
>> window and sends the result to Elasticsearch. The actual code is
>> below. I passed '-p 3' to the flink run command, so the task was
>> distributed across the 3 TaskManagers.
>> In the test, I sent about 5,000 logs/sec to Kafka.
>>
>> [result]
>> - From the Elasticsearch records, the total access count over all
>>   URIs was about 260,000/min = 4,300/sec. This is the end-to-end
>>   throughput.
>> - The Kafka consumer lag kept growing.
>> - The CPU usage of each TaskManager machine was about 13-14%. From
>>   the top command output, the Flink java process was using 100%
>>   (one full CPU).
>>
>> So I thought the bottleneck here was the CPU used by the Flink tasks.
>>
>> Here is the application code:
>> ---
>> val env = StreamExecutionEnvironment.getExecutionEnvironment
>> env.enableCheckpointing(1000)
>> ...
>> val stream = env
>>   .addSource(new FlinkKafkaConsumer082[String]("kafka.dummy",
>>     new SimpleStringSchema(), properties))
>>   .map { json => JSON.parseFull(json).get.asInstanceOf[Map[String, AnyRef]] }
>>   .map { x => x.get("uri") match {
>>     case Some(y) => (y.asInstanceOf[String], 1)
>>     case None => ("", 1)
>>   }}
>>   .keyBy(0)
>>   .timeWindow(Time.of(1, TimeUnit.MINUTES))
>>   .sum(1)
>>   .map { x => (System.currentTimeMillis(), x) }
>>   .addSink(new ElasticsearchSink(config, transports,
>>     new IndexRequestBuilder[Tuple2[Long, Tuple2[String, Int]]] {
>>       override def createIndexRequest(element: Tuple2[Long, Tuple2[String, Int]],
>>                                       ctx: RuntimeContext): IndexRequest = {
>>         val json = new HashMap[String, AnyRef]
>>         json.put("@timestamp", new Timestamp(element._1))
>>         json.put("uri", element._2._1)
>>         json.put("count", element._2._2: java.lang.Integer)
>>         println("SENDING: " + element)
>>         Requests.indexRequest.index("dummy2").`type`("my-type").source(json)
>>       }
>>     }))
>> ---
>>
>> Regards,
>> Hironori Ogibayashi