Stephan,

Thank you for your quick response.
I will try these and post the results later.

Regards,
Hironori

2016-02-26 19:45 GMT+09:00 Stephan Ewen <se...@apache.org>:
> Hi!
>
> I would try and dig bit by bit into what the bottleneck is:
>
>  1) Disable the checkpointing, see what difference that makes
>  2) Use a dummy (discarding) sink rather than Elasticsearch, to see if that
> is the limiting factor
>  3) Check the JSON parsing. Many JSON libraries are very CPU intensive and
> easily dominate the entire pipeline.
>
> Greetings,
> Stephan
>
>
> On Fri, Feb 26, 2016 at 11:23 AM, おぎばやしひろのり <ogibaya...@gmail.com> wrote:
>>
>> Hello,
>>
>> I started evaluating Flink and tried a simple performance test.
>> The result was only about 4000 messages/sec with 300% CPU usage. I
>> think this is quite low and am wondering whether it is a reasonable result.
>> It would be great if someone could check it.
>>
>> Here are the details:
>>
>> [servers]
>> - 3 Kafka brokers with 3 partitions
>> - 3 Flink TaskManager + 1 JobManager
>> - 1 Elasticsearch
>> All of them are separate VMs with 8 vCPUs and 8 GB of memory each.
>>
>> [test case]
>> The application counts access logs by URI within a 1-minute window and
>> sends the result to Elasticsearch. The actual code is below.
>> I passed the '-p 3' option to the flink run command, so the job was
>> distributed across the 3 TaskManagers.
>> In the test, I sent about 5000 logs/sec to Kafka.
>>
>> [result]
>> - From the Elasticsearch records, the total access count for all URIs was
>> about 260,000/min = 4300/sec. This is the entire throughput.
>> - Kafka consumer lag kept growing.
>> - The CPU usage of each TaskManager machine was about 13-14%. In the top
>> command output, the Flink Java process was using 100% (one full CPU).
>>
>> So I think the bottleneck here is the CPU used by the Flink tasks.
>>
>> Here is the application code.
>> ---
>>     val env = StreamExecutionEnvironment.getExecutionEnvironment
>>     env.enableCheckpointing(1000)
>> ...
>>     val stream = env
>>       .addSource(new FlinkKafkaConsumer082[String]("kafka.dummy",
>>         new SimpleStringSchema(), properties))
>>       .map{ json => JSON.parseFull(json).get.asInstanceOf[Map[String, AnyRef]] }
>>       .map{ x => x.get("uri") match {
>>         case Some(y) => (y.asInstanceOf[String],1)
>>         case None => ("", 1)
>>       }}
>>       .keyBy(0)
>>       .timeWindow(Time.of(1, TimeUnit.MINUTES))
>>       .sum(1)
>>       .map{ x => (System.currentTimeMillis(), x)}
>>       .addSink(new ElasticsearchSink(config, transports,
>>         new IndexRequestBuilder[Tuple2[Long, Tuple2[String, Int]]] {
>>         override def createIndexRequest(
>>             element: Tuple2[Long, Tuple2[String, Int]],
>>             ctx: RuntimeContext): IndexRequest = {
>>           val json = new HashMap[String, AnyRef]
>>           json.put("@timestamp", new Timestamp(element._1))
>>           json.put("uri", element._2._1)
>>           json.put("count", element._2._2: java.lang.Integer)
>>           println("SENDING: " + element)
>>           Requests.indexRequest.index("dummy2").`type`("my-type").source(json)
>>         }
>>       }))
>> ---
>>
>> Regards,
>> Hironori Ogibayashi
>
>
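
One possible way to approach step 3 of Stephan's list (checking the JSON
parsing) is to time the per-record function on its own, outside Flink. The
following is only a minimal sketch in plain Scala. The names ParseThroughput,
recordsPerSecond, and extractUri are hypothetical, the sample record is made
up, and extractUri is a naive stand-in rather than the JSON.parseFull call
from the original code. Substituting the real parsing lambda for extractUri
would show how many records per second one core can parse.

```scala
object ParseThroughput {
  // Written once per run so the JIT cannot discard the measured calls.
  @volatile private var blackhole: Int = 0

  // Applies `f` to every record and returns the observed records per second.
  def recordsPerSecond[A](records: Seq[String])(f: String => A): Double = {
    // Warm-up pass so that JIT compilation does not dominate the timing.
    records.foreach(r => f(r))

    var checksum = 0
    val start = System.nanoTime()
    records.foreach { r => checksum += f(r).## }
    // Guard against a zero interval on very small inputs.
    val elapsedNanos = math.max(System.nanoTime() - start, 1L)
    blackhole = checksum

    records.size / (elapsedNanos / 1e9)
  }

  // Hypothetical stand-in for the real parser: pulls the "uri" value out of a
  // flat JSON string without any escaping support. Returns "" if absent.
  def extractUri(json: String): String = {
    val key = "\"uri\":\""
    val from = json.indexOf(key)
    if (from < 0) ""
    else {
      val begin = from + key.length
      val end = json.indexOf('"', begin)
      if (end < 0) "" else json.substring(begin, end)
    }
  }

  def main(args: Array[String]): Unit = {
    // Made-up sample record; real access-log messages will differ.
    val sample = """{"uri":"/index.html","status":200}"""
    val records = Seq.fill(100000)(sample)
    val rate = recordsPerSecond(records)(extractUri)
    println(f"stand-in parser: $rate%.0f records/sec on one thread")
  }
}
```

If the rate measured for the real parser comes out close to the per-task
throughput seen in the job (roughly 4300/sec divided over 3 parallel tasks),
that would point at parsing as the dominant cost. A much higher rate would
suggest looking at checkpointing or the sink first.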
