Aljoscha,

Thank you for your response.
I tried the no-JSON-parsing, no-sink (DiscardingSink) case. The
throughput was 8,228 msg/sec, slightly better than the JSON +
Elasticsearch case.

I also tried using socketTextStream instead of FlinkKafkaConsumer. In
that case, the result was 60,000 msg/sec with just 40% Flink
TaskManager CPU usage (the socket server was the bottleneck). That was
amazing, although Flink's fault-tolerance feature is not available
with socketTextStream. (A sketch of this raw-throughput setup follows
the quoted thread below.)

Regards,
Hironori

2016-03-08 21:36 GMT+09:00 Aljoscha Krettek <aljos...@apache.org>:
> Hi,
> Another interesting test would be a combination of 3) and 2), i.e. no JSON
> parsing and no sink. This would show what the raw throughput can be before
> being slowed down by writing to Elasticsearch.
>
> Also, .print() is not feasible for production, since it just prints every
> element to the stdout log on the TaskManagers, which itself can cause quite
> a slowdown. You could try:
>
> datastream.addSink(new DiscardingSink())
>
> which is a dummy sink that does nothing.
>
> Cheers,
> Aljoscha
>
>> On 08 Mar 2016, at 13:31, おぎばやしひろのり <ogibaya...@gmail.com> wrote:
>>
>> Stephan,
>>
>> Sorry for the delay in my response.
>> I tried the 3 cases you suggested.
>>
>> This time, I set parallelism to 1 for simplicity.
>>
>> 0) Base performance (same as the first e-mail): 1,480 msg/sec
>> 1) Disabled checkpointing: almost the same as 0)
>> 2) No ES sink, just print(): 1,510 msg/sec
>> 3) JSON to TSV: 8,000 msg/sec
>>
>> So, as you can see, the bottleneck was JSON parsing. I also want to
>> try eliminating Kafka to see if there is room to improve performance.
>> (Currently, I am using FlinkKafkaConsumer082 with Kafka 0.9;
>> I think I should try Flink 1.0 and FlinkKafkaConsumer09.)
>> Anyway, I think 8,000 msg/sec with 1 CPU is not so bad, considering
>> Flink's scalability and fault tolerance.
>> Thank you for your advice.
>>
>> Regards,
>> Hironori Ogibayashi
>>
>> 2016-02-26 21:46 GMT+09:00 おぎばやしひろのり <ogibaya...@gmail.com>:
>>> Stephan,
>>>
>>> Thank you for your quick response.
>>> I will try it and post the result later.
>>>
>>> Regards,
>>> Hironori
>>>
>>> 2016-02-26 19:45 GMT+09:00 Stephan Ewen <se...@apache.org>:
>>>> Hi!
>>>>
>>>> I would try to dig bit by bit into what the bottleneck is:
>>>>
>>>> 1) Disable the checkpointing and see what difference that makes.
>>>> 2) Use a dummy (discarding) sink rather than Elasticsearch, to see if
>>>> that is the limiting factor.
>>>> 3) Check the JSON parsing. Many JSON libraries are very CPU intensive
>>>> and easily dominate the entire pipeline.
>>>>
>>>> Greetings,
>>>> Stephan
>>>>
>>>>
>>>> On Fri, Feb 26, 2016 at 11:23 AM, おぎばやしひろのり <ogibaya...@gmail.com> wrote:
>>>>>
>>>>> Hello,
>>>>>
>>>>> I started evaluating Flink and tried a simple performance test.
>>>>> The result was just about 4,000 messages/sec with 300% CPU usage. I
>>>>> think this is quite low and am wondering whether it is a reasonable
>>>>> result. If someone could check it, that would be great.
>>>>>
>>>>> Here are the details:
>>>>>
>>>>> [servers]
>>>>> - 3 Kafka brokers with 3 partitions
>>>>> - 3 Flink TaskManagers + 1 JobManager
>>>>> - 1 Elasticsearch
>>>>> All of them are separate VMs with 8 vCPUs and 8 GB of memory.
>>>>>
>>>>> [test case]
>>>>> The application counts access logs by URI within a 1-minute window and
>>>>> sends the result to Elasticsearch. The actual code is below.
>>>>> I used the '-p 3' option with the flink run command, so the task was
>>>>> distributed across 3 TaskManagers.
>>>>> In the test, I sent about 5,000 logs/sec to Kafka.
>>>>>
>>>>> [result]
>>>>> - From the Elasticsearch records, the total access count for all URIs
>>>>> was about 260,000/min = 4,300/sec. This is the overall throughput.
>>>>> - The Kafka consumer lag kept growing.
>>>>> - The CPU usage of each TaskManager machine was about 13-14%. From the
>>>>> top command output, the Flink java process was using 100% (1 full CPU).
>>>>>
>>>>> So I thought the bottleneck here was the CPU used by the Flink tasks.
>>>>>
>>>>> Here is the application code:
>>>>> ---
>>>>> val env = StreamExecutionEnvironment.getExecutionEnvironment
>>>>> env.enableCheckpointing(1000)
>>>>> ...
>>>>> val stream = env
>>>>>   .addSource(new FlinkKafkaConsumer082[String]("kafka.dummy",
>>>>>     new SimpleStringSchema(), properties))
>>>>>   .map { json => JSON.parseFull(json).get.asInstanceOf[Map[String, AnyRef]] }
>>>>>   .map { x => x.get("uri") match {
>>>>>     case Some(y) => (y.asInstanceOf[String], 1)
>>>>>     case None => ("", 1)
>>>>>   }}
>>>>>   .keyBy(0)
>>>>>   .timeWindow(Time.of(1, TimeUnit.MINUTES))
>>>>>   .sum(1)
>>>>>   .map { x => (System.currentTimeMillis(), x) }
>>>>>   .addSink(new ElasticsearchSink(config, transports,
>>>>>     new IndexRequestBuilder[Tuple2[Long, Tuple2[String, Int]]] {
>>>>>       override def createIndexRequest(element: Tuple2[Long, Tuple2[String, Int]],
>>>>>           ctx: RuntimeContext): IndexRequest = {
>>>>>         val json = new HashMap[String, AnyRef]
>>>>>         json.put("@timestamp", new Timestamp(element._1))
>>>>>         json.put("uri", element._2._1)
>>>>>         json.put("count", element._2._2: java.lang.Integer)
>>>>>         println("SENDING: " + element)
>>>>>         Requests.indexRequest.index("dummy2").`type`("my-type").source(json)
>>>>>       }
>>>>>     }))
>>>>> ---
>>>>>
>>>>> Regards,
>>>>> Hironori Ogibayashi
>>>>
>>>>
>
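Below is a minimal sketch of the "no JSON parsing, no sink" raw-throughput
test discussed above, assuming the Flink 1.0-era streaming Scala API and the
Kafka 0.8 connector used in the thread. The broker/ZooKeeper addresses,
group id, and socket host/port are placeholders; the topic name mirrors the
original job.

---
import java.util.Properties

import org.apache.flink.streaming.api.scala._
import org.apache.flink.streaming.api.functions.sink.DiscardingSink
import org.apache.flink.streaming.connectors.kafka.FlinkKafkaConsumer082
import org.apache.flink.streaming.util.serialization.SimpleStringSchema

object RawThroughputTest {
  def main(args: Array[String]): Unit = {
    val env = StreamExecutionEnvironment.getExecutionEnvironment

    val properties = new Properties()
    properties.setProperty("bootstrap.servers", "kafka1:9092") // placeholder
    properties.setProperty("zookeeper.connect", "zk1:2181")    // placeholder
    properties.setProperty("group.id", "flink-bench")          // placeholder

    // Read raw strings from Kafka and drop them: no parsing, no real sink.
    env
      .addSource(new FlinkKafkaConsumer082[String]("kafka.dummy",
        new SimpleStringSchema(), properties))
      .addSink(new DiscardingSink[String]())

    // To take Kafka out of the picture as well (losing fault tolerance, as
    // noted above), swap the source for:
    //   env.socketTextStream("loggen-host", 9999)  // hypothetical host/port

    env.execute("raw-throughput-test")
  }
}
---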
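Separately, one common way to attack the JSON-parsing bottleneck identified
in 3) above, not something tried in the thread, is to replace
scala.util.parsing.json.JSON with Jackson and extract only the needed field
instead of materializing a full Map. A sketch, assuming jackson-databind is
on the classpath and the field name "uri" from the original job:

---
import com.fasterxml.jackson.databind.{JsonNode, ObjectMapper}
import org.apache.flink.api.common.functions.RichMapFunction
import org.apache.flink.configuration.Configuration

// Parse each log line with Jackson and emit (uri, 1), matching the output
// of the two map steps in the original pipeline.
class ExtractUri extends RichMapFunction[String, (String, Int)] {
  @transient private var mapper: ObjectMapper = _

  // Create the ObjectMapper once per task, not once per record.
  override def open(parameters: Configuration): Unit = {
    mapper = new ObjectMapper()
  }

  override def map(line: String): (String, Int) = {
    val node: JsonNode = mapper.readTree(line)
    val uri = if (node.hasNonNull("uri")) node.get("uri").asText() else ""
    (uri, 1)
  }
}
---

The two map steps in the original job would then collapse into
.map(new ExtractUri), leaving the rest of the pipeline unchanged.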