Hello,

I started evaluating Flink and tried a simple performance test. The result was only about 4000 messages/sec with 300% CPU usage in total (one full core on each of the three TaskManagers). I think this is quite low, and I am wondering whether it is a reasonable result. It would be great if someone could check it.
Here are the details:

[servers]
- 3 Kafka brokers, with 3 partitions
- 3 Flink TaskManagers + 1 JobManager
- 1 Elasticsearch

All of them are separate VMs with 8 vCPUs and 8 GB of memory.

[test case]
The application counts access logs by URI within a 1-minute window and sends the result to Elasticsearch. The actual code is below.
I passed the '-p 3' option to the flink run command, so the job was distributed across the 3 TaskManagers.
In the test, I sent about 5000 logs/sec to Kafka.

[result]
- According to the Elasticsearch records, the total access count over all URIs was about 260,000/min = 4300/sec. This is the entire throughput.
- Kafka consumer lag kept growing.
- The CPU usage of each TaskManager machine was about 13-14%. In the top command output, the Flink java process was using 100% (one full CPU).

So I thought the bottleneck here was the CPU used by the Flink tasks.

Here is the application code.
---
val env = StreamExecutionEnvironment.getExecutionEnvironment
env.enableCheckpointing(1000)
...
val stream = env
  .addSource(new FlinkKafkaConsumer082[String]("kafka.dummy", new SimpleStringSchema(), properties))
  // Parse each JSON log line into a Map
  .map{ json => JSON.parseFull(json).get.asInstanceOf[Map[String, AnyRef]] }
  // Extract the URI and pair it with a count of 1
  .map{ x => x.get("uri") match {
    case Some(y) => (y.asInstanceOf[String], 1)
    case None => ("", 1)
  }}
  // Count per URI over a 1-minute window
  .keyBy(0)
  .timeWindow(Time.of(1, TimeUnit.MINUTES))
  .sum(1)
  .map{ x => (System.currentTimeMillis(), x) }
  .addSink(new ElasticsearchSink(config, transports,
    new IndexRequestBuilder[Tuple2[Long, Tuple2[String, Int]]] {
      override def createIndexRequest(element: Tuple2[Long, Tuple2[String, Int]], ctx: RuntimeContext): IndexRequest = {
        val json = new HashMap[String, AnyRef]
        json.put("@timestamp", new Timestamp(element._1))
        json.put("uri", element._2._1)
        json.put("count", element._2._2: java.lang.Integer)
        println("SENDING: " + element)
        Requests.indexRequest.index("dummy2").`type`("my-type").source(json)
      }
    }))
---

Regards,
Hironori Ogibayashi
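
P.S. As a rough sanity check of the figures above, here is a small Scala sketch of the arithmetic. It is only a back-of-the-envelope calculation: the object name ThroughputCheck and the value names are made up for illustration, the input rate is the approximate 5000 logs/sec quoted above, and it assumes the counts in Elasticsearch reflect every message Flink processed.

```scala
// Back-of-the-envelope check of the reported figures (illustrative names only).
object ThroughputCheck {
  val countedPerMinute: Long = 260000L // total count per 1-minute window, from Elasticsearch
  val inputPerSecond: Long   = 5000L   // approximate rate sent to Kafka
  val vCpusPerVm: Int        = 8       // each VM has 8 vCPUs
  val taskManagers: Int      = 3       // job ran with -p 3

  // Messages processed per second across the whole job.
  val processedPerSecond: Double = countedPerMinute / 60.0

  // Messages per minute that are produced but not consumed,
  // i.e. the rate at which Kafka consumer lag should grow.
  val backlogGrowthPerMinute: Long = inputPerSecond * 60 - countedPerMinute

  // One fully busy core, expressed as a percentage of an 8-vCPU machine.
  val oneCoreShare: Double = 100.0 / vCpusPerVm

  // Summed process-level CPU if each TaskManager uses one full core.
  val totalCpuPercent: Int = taskManagers * 100

  def main(args: Array[String]): Unit = {
    println(f"processed/sec:      $processedPerSecond%.0f")
    println(s"lag growth/min:     $backlogGrowthPerMinute")
    println(f"one core, % of VM:  $oneCoreShare%.1f")
    println(s"total CPU, %:       $totalCpuPercent")
  }
}
```

Under these assumptions, one busy core is 12.5% of an 8-vCPU machine, which is close to the 13-14% machine-level usage I observed, and a shortfall of roughly 40,000 messages per minute would explain the steadily growing consumer lag.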