Stephan,

Sorry for the delay in my response.
I tried the 3 cases you suggested.

This time, I set parallelism to 1 for simplicity.

0) Base performance (same as in my first e-mail): 1,480 msg/sec
1) Disable checkpointing: almost the same as 0)
2) No ES sink, just print(): 1,510 msg/sec
3) JSON to TSV: 8,000 msg/sec

So, as you can see, the bottleneck was JSON parsing. I also want to
try eliminating Kafka to see if there is room to improve performance.
(Currently I am using FlinkKafkaConsumer082 with Kafka 0.9;
I think I should try Flink 1.0 and FlinkKafkaConsumer09.)
Anyway, I think 8,000 msg/sec on a single CPU is not bad, considering
Flink's scalability and fault tolerance.
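
For reference, the TSV version of the parsing step was roughly like the
sketch below (simplified; the tab-separated field layout and the position
of the uri field here are assumptions for illustration, not our exact log
format):

```scala
// Sketch of the TSV parsing step (assumed layout: timestamp \t method \t uri \t status).
// A plain split is far cheaper per record than a general-purpose JSON parser,
// which is consistent with the jump from ~1,480 to ~8,000 msg/sec.
def parseUri(line: String): (String, Int) = {
  val fields = line.split('\t')
  if (fields.length > 2) (fields(2), 1) else ("", 1)
}

// In the job this would replace the two JSON-related map steps:
//   .map(parseUri _)
```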
Thank you for your advice.

Regards,
Hironori Ogibayashi

2016-02-26 21:46 GMT+09:00 おぎばやしひろのり <ogibaya...@gmail.com>:
> Stephan,
>
> Thank you for your quick response.
> I will try and post the result later.
>
> Regards,
> Hironori
>
> 2016-02-26 19:45 GMT+09:00 Stephan Ewen <se...@apache.org>:
>> Hi!
>>
>> I would try and dig bit by bit into what the bottleneck is:
>>
>>  1) Disable the checkpointing, see what difference that makes
>>  2) Use a dummy sink (discarding) rather than Elasticsearch, to see if that
>> is the limiting factor
>>  3) Check the JSON parsing. Many JSON libraries are very CPU intensive and
>> easily dominate the entire pipeline.
>>
>> Greetings,
>> Stephan
>>
>>
>> On Fri, Feb 26, 2016 at 11:23 AM, おぎばやしひろのり <ogibaya...@gmail.com> wrote:
>>>
>>> Hello,
>>>
>>> I started evaluating Flink and tried a simple performance test.
>>> The result was just about 4,000 messages/sec with 300% CPU usage. I
>>> think this is quite low and am wondering whether it is a reasonable result.
>>> If someone could check it, that would be great.
>>>
>>> Here is the detail:
>>>
>>> [servers]
>>> - 3 Kafka broker with 3 partitions
>>> - 3 Flink TaskManager + 1 JobManager
>>> - 1 Elasticsearch
>>> All of them are separate VMs, each with 8 vCPUs and 8 GB of memory
>>>
>>> [test case]
>>> The application counts access log entries by URI within a 1-minute window
>>> and sends the result to Elasticsearch. The actual code is below.
>>> I used the '-p 3' option with the flink run command, so the tasks were
>>> distributed across 3 TaskManagers.
>>> In the test, I sent about 5,000 logs/sec to Kafka.
>>>
>>> [result]
>>> - From Elasticsearch records, the total access count for all URIs was
>>> about 260,000/min = ~4,300/sec. This is the entire throughput.
>>> - Kafka consumer lag kept growing.
>>> - The CPU usage of each TaskManager machine was about 13-14%. From the top
>>> command output, the Flink java process was using 100% (1 CPU fully)
>>>
>>> So I thought the bottleneck here was CPU used by Flink Tasks.
>>>
>>> Here is the application code.
>>> ---
>>>     val env = StreamExecutionEnvironment.getExecutionEnvironment
>>>     env.enableCheckpointing(1000)
>>> ...
>>>     val stream = env
>>>       .addSource(new FlinkKafkaConsumer082[String]("kafka.dummy", new SimpleStringSchema(), properties))
>>>       .map{ json => JSON.parseFull(json).get.asInstanceOf[Map[String, AnyRef]] }
>>>       .map{ x => x.get("uri") match {
>>>         case Some(y) => (y.asInstanceOf[String], 1)
>>>         case None => ("", 1)
>>>       }}
>>>       .keyBy(0)
>>>       .timeWindow(Time.of(1, TimeUnit.MINUTES))
>>>       .sum(1)
>>>       .map{ x => (System.currentTimeMillis(), x) }
>>>       .addSink(new ElasticsearchSink(config, transports, new IndexRequestBuilder[Tuple2[Long, Tuple2[String, Int]]] {
>>>         override def createIndexRequest(element: Tuple2[Long, Tuple2[String, Int]], ctx: RuntimeContext): IndexRequest = {
>>>           val json = new HashMap[String, AnyRef]
>>>           json.put("@timestamp", new Timestamp(element._1))
>>>           json.put("uri", element._2._1)
>>>           json.put("count", element._2._2: java.lang.Integer)
>>>           println("SENDING: " + element)
>>>           Requests.indexRequest.index("dummy2").`type`("my-type").source(json)
>>>         }
>>>       }))
>>> ---
>>>
>>> Regards,
>>> Hironori Ogibayashi
>>
>>