Hi,

 

I wrote a small Flink program on a YARN cluster (128 GB RAM and an 8-core Xeon
CPU per node) that essentially reads messages from Kafka and applies a
simple CEP rule to them. The input is expected to run with a parallelism of 1,
as my test Kafka topic has only 1 partition.

 

The program looks pretty much like this:

 

FlinkKafkaConsumer<Databean> flinkKafkaConsumer =
        new FlinkKafkaConsumer<>(sourceTopicName, deserializer, properties);
flinkKafkaConsumer.setStartFromGroupOffsets();
flinkKafkaConsumer.setCommitOffsetsOnCheckpoints(true);
flinkKafkaConsumer.assignTimestampsAndWatermarks(
        new BoundedOutOfOrdernessTimestampExtractor<Databean>(Time.minutes(30)) {
            @Override
            public long extractTimestamp(Databean element) {
                return element.getTs();
            }
        });

SingleOutputStreamOperator<Databean> kafkaSource = env.addSource(flinkKafkaConsumer);

// Keying here is very slow???
KeyedStream<Databean, String> keyedStream = kafkaSource.keyBy(bean -> bean.getUser());

Pattern<Databean, Databean> cepPattern = Pattern.<Databean>
        begin("firstStep", AfterMatchSkipStrategy.skipPastLastEvent()).where(new FirstFilter())
        .followedBy("secondStep").where(new SecondFilter())
        .within(Time.minutes(15));

PatternStream<Databean> patternMatchStream = CEP.pattern(keyedStream, cepPattern);

SingleOutputStreamOperator<Alarmbean> beanAlerts =
        patternMatchStream.select(new MatchToAlarmConverter());

beanAlerts.addSink(new FlinkKafkaProducer<>(config.kafkaAlertsTopic,
        new AlarmBeanSerializeSchema(), properties));

 

The applied CEP filters "FirstFilter" and "SecondFilter" are very simple
conditions like

return "humidity".equals(bean.getRecordedMetricType());

 

My Databean has roughly 50 fields containing numbers and small strings (up to
50 characters).

Currently, I write 200,000 elements into my Kafka topic every 5 minutes.
100,000 of those have the same username, i.e. they are all named "empty",
while the other half are almost unique (a random number between 1 and
100,000,000). Each element's timestamp randomly varies by +/- 7.5 minutes
around the generation time (generation time = time it is pushed into Kafka).
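
For reference, the test data generation looks roughly like this (a simplified
sketch; the producer setup, serialization and the remaining ~50 fields are
omitted, and the setter names are only illustrative):

// Sketch: 200,000 Databeans per batch, half with user "empty",
// half with an almost unique user, event time jittered by +-7.5 minutes.
Random random = new Random();
long now = System.currentTimeMillis();
for (int i = 0; i < 200_000; i++) {
    Databean bean = new Databean();
    bean.setUser(i % 2 == 0
            ? "empty"
            : String.valueOf(1 + random.nextInt(100_000_000)));
    long jitterMs = (long) ((random.nextDouble() - 0.5) * 15 * 60 * 1000);
    bean.setTs(now + jitterMs);
    // ... ~50 more small numeric/string fields ...
    producer.send(new ProducerRecord<>(sourceTopicName, serialize(bean)));
}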

My CEP rule is written with conditions that never match, so the Kafka sink as
well as the pattern select function can be ruled out as causes of the slow
processing speed.

 

I start the application via YARN with:

"${FLINK_HOME}/bin/yarn-session.sh" -n 4 -jm 4096m -tm 65536m --name
"TESTJOB" -d
${FLINK_HOME}/bin/flink run -m ${FLINK_HOST} -d -n "${JOB_JAR}" $*
 

So the job has plenty of RAM available, but I didn't notice any difference in
speed when assigning 16 GB or 64 GB of RAM. As expected, I have a single task
manager and parallelism 1.

 

Now about my problem:

Currently, the pipeline processes roughly 150-300 elements per second.
On startup, it peaks at 3,000-4,000 elements per second but slows down to
150-300 elements per second within a minute.

My first thought was that CEP is simply that slow (this is my first CEP
experiment), but then I observed the following:

1. Even though CEP has quite some overhead (elements must be sorted by time),
my rule is very simple and should, from my perspective, perform much better on
that machine. My shot in the dark was something like 10,000 - 100,000
elements/s.
2. None of my machine resources are fully utilized, i.e. no CPU in the cluster
runs at 100% utilization (according to htop). The memory is virtually
available, but the RES column in htop states the process uses 5499 MB.
3. According to the Flink GUI, the job is split into two tasks: first there is
the source task, then a hash arrow to the second task (the keyBy?!), and the
second task is the CEP pattern apply, convert and write to sink. From the UI,
I know that task 1 reports HIGH backpressure whereas task 2 is OK in terms of
the backpressure measurement. Even more interesting: the metrics show that in
the first task "0.buffers.outputQueueLength" is constantly at 9 and
"0.buffers.outPoolUsage" is constantly at 1. This fits the backpressure
concept (the task is stuck writing to the buffer for the next stage because
the next stage is consuming too slowly). However, the second task's metrics
show that "0.buffers.inputQueueLength" is constantly at 0 and
"0.buffers.inPoolUsage" is constantly at 0 as well, while
"0.numRecordsInPerSecond" is about 150-300 elements/s. (To see the operators
inside the second task separately, I could disable chaining, see the sketch
after this list.)

 

This led me to suspect that I don't have a CEP problem but rather a problem
with the "keyBy" exchange on the local machine, as the second task seems to
immediately consume any message that arrives in its input queue. My questions:

1. Is my observation correct that I indeed don't have a CEP problem, but that
the keyBy causes the issue here? (I sketched a small isolation test after this
list.)
2. Why is the queue limited to size 9? That seems really small compared to the
memory available; I would have expected something like 10,000 at least.
3. What happens internally here? In distributed mode, I understand that I have
a distributed queue where sender queue size and receiver queue size can
differ. But on the same machine, in the same JVM, I would have expected
something like a BlockingQueue where both tasks' metrics report the same queue
size (i.e. task1.outputQueue == task2.inputQueue). I suspect something happens
in between "task1 writes" and "task2 receives" which slows down the entire
pipeline, but I have no idea what that could be.
4. Can I do something to boost performance by orders of magnitude here?
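
To double-check question 1 myself, I could replace the CEP part with a trivial
keyed no-op consumer, roughly like this (a sketch; DiscardingSink comes from
flink-streaming-java). If this variant also sticks at 150-300 elements/s, the
keyBy exchange itself would be the bottleneck; if it runs fast, CEP is:

// Sketch: consume the keyed stream without CEP to isolate the keyBy.
keyedStream
        .map(bean -> bean)              // no-op keyed consumer
        .returns(Databean.class)        // explicit type info for the lambda
        .addSink(new DiscardingSink<>());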

 

Best regards

Theo Diefenthal
