Hi,
(flink version 1.14.2, kafka version 2.6.1)
I have a flink job consuming kafka and simply sinking the data into s3.
The Kafka consumer is sometimes delayed on a few partitions. The partitions
are evenly assigned across the Flink subtasks.
I found there was a correlation between kafka consumer fet
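(The job itself isn't shown in this thread; a minimal sketch of that kind of pipeline on Flink 1.14, a Kafka source feeding a row-format file sink on S3, might look like the following. Topic, bucket, group id, and checkpoint interval are placeholders, not the poster's actual setup.)

import org.apache.flink.api.common.eventtime.WatermarkStrategy;
import org.apache.flink.api.common.serialization.SimpleStringEncoder;
import org.apache.flink.api.common.serialization.SimpleStringSchema;
import org.apache.flink.connector.file.sink.FileSink;
import org.apache.flink.connector.kafka.source.KafkaSource;
import org.apache.flink.connector.kafka.source.enumerator.initializer.OffsetsInitializer;
import org.apache.flink.core.fs.Path;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.api.functions.sink.filesystem.rollingpolicies.OnCheckpointRollingPolicy;

public class KafkaToS3Job {
    public static void main(String[] args) throws Exception {
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        env.enableCheckpointing(60_000); // the FileSink commits files on checkpoints

        // Flink 1.14 KafkaSource; one source subtask can own several partitions.
        KafkaSource<String> source = KafkaSource.<String>builder()
                .setBootstrapServers("broker-1:9092")
                .setTopics("input-topic")
                .setGroupId("s3-archiver")
                .setStartingOffsets(OffsetsInitializer.earliest())
                .setValueOnlyDeserializer(new SimpleStringSchema())
                .build();

        FileSink<String> sink = FileSink
                .forRowFormat(new Path("s3://my-bucket/raw/"), new SimpleStringEncoder<String>("UTF-8"))
                .withRollingPolicy(OnCheckpointRollingPolicy.build())
                .build();

        env.fromSource(source, WatermarkStrategy.noWatermarks(), "kafka-source")
           .sinkTo(sink);

        env.execute("kafka-to-s3");
    }
}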
business logic. Do you think these operators would have a huge impact on the
performance? Or is it something to do with my Kafka cluster configuration or the
older version of Flink (1.8) that I am using in my application. Not sure if
Flink version 1.8 has a performance issue.
Please let me know.
Below is my kafka cluster configuration.
auto.create.topics.enable=true
log.retention.hours=24
default.replication.fac
Hi Kamaal,
I would first suggest understanding the performance bottleneck, before
applying any optimizations.
Idea 1: Are your CPUs fully utilized?
If yes, good, then scaling up will probably help.
If not, then there's another inefficiency
Idea 2: How fast can you get the data into your job, with
Hi Arvid,
The throughput has decreased further after I removed all the rebalance() calls.
The time to process 20K messages has gone up from 14 minutes to 20 minutes.
Below are the tasks that the flink application is performing. I am using keyBy
and Window operation. Do you think a
Hi Mohammed,
something is definitely wrong in your setup. You can safely say that you
can process 1k records per second per core with Kafka and light processing,
so you shouldn't even need to go distributed in your case.
Do you perform any heavy computation? What is your flatMap doing? Are you
em
Hi Fabian,
Just an update,
Problem 2:-
Caused by: org.apache.kafka.common.errors.NetworkException
It is resolved. It was because we exceeded the number of allowed
partitions for the kafka cluster (AWS MSK cluster). Have deleted
unused topics and partitions to resolve the issue.
Hi Mohammed,
200 records should definitely be doable. The first thing you can do is remove
the print sinks, because they increase the load on your cluster due to the
additional IO operation and, secondly, prevent Flink from fusing operators.
I am interested to see the updated job graph after
Hi Mohammed,
Without diving too much into your business logic, a thing which catches my eye
is the partitioning you are using. In general, all calls to `keyBy` or
`rebalance` are very expensive because all the data is shuffled across
downstream tasks. Flink tries to fuse operators with the same keyG
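(To make the shuffle point concrete, here is a small assumed example rather than Mohammed's job: the first pipeline chains into a single task and hands records over as plain method calls, while the keyBy in the second forces every record through serialization and a network exchange.)

import org.apache.flink.api.common.typeinfo.Types;
import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.api.windowing.assigners.TumblingProcessingTimeWindows;
import org.apache.flink.streaming.api.windowing.time.Time;

public class ChainingVsShuffle {
    public static void main(String[] args) throws Exception {
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        DataStream<String> lines = env.socketTextStream("localhost", 9999);

        // Chained: map and the sink run in the same task, no serialization, no network.
        lines.map(String::toLowerCase).returns(Types.STRING).print("chained");

        // Shuffled: keyBy breaks the chain; every record is serialized and sent
        // over the network to the subtask that owns its key group.
        lines.map(l -> Tuple2.of(l, 1)).returns(Types.TUPLE(Types.STRING, Types.INT))
             .keyBy(t -> t.f0, Types.STRING)
             .window(TumblingProcessingTimeWindows.of(Time.seconds(10)))
             .sum(1)
             .print("shuffled");

        env.execute("chaining-vs-shuffle");
    }
}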
Hi,
Apologies for the long message; it explains the issue in detail.
We have a Flink (version 1.8) application running on AWS Kinesis Analytics. The
application has a source which is a kafka topic with 15 partitions (AWS Managed
Streaming Kafka) and the sink is again a kafka topic with 15 partiti
Hey
I created this one https://issues.apache.org/jira/browse/FLINK-19238.
Regards,
Juha
From: Yun Tang
Sent: Tuesday, September 15, 2020 8:06 AM
To: Juha Mynttinen ; Stephan Ewen
Cc: user@flink.apache.org
Subject: Re: Performance issue associated with managed
@flink.apache.org
Subject: Re: Performance issue associated with managed RocksDB memory
Hey
I've fixed the code
(https://github.com/juha-mynttinen-king/flink/commits/arena_block_sanity_check)
slightly. Now it WARNs if there is the memory configuration issue. Also, I
think there was a b
Subject: Re: Performance issue associated with managed RocksDB memory
Hey Juha!
I agree that we cannot reasonably expect the majority of users to understand
block sizes, arena sizes, etc. to get their application running.
So the default should be "inform when there is a problem and s
It's hard/impossible to know what kind of performance one can expect from a
Flink application. Thus, it's hard to know if one is suffering e.g. from
this performance issue, or if the system is performing normally (and inherently
being slow).
2) even if one suspects a performance issue, it'
From: Juha Mynttinen
Sent: Tuesday, September 8, 2020 20:56
To: Yun Tang ; user@flink.apache.org
Subject: Re: Performance issue associated with managed RocksDB memory
Hey Yun,
Thanks for the detailed answer. It clarified how things work. Especially what
is the role of RocksDB arena, and arena
ecrease parallelism (if possible), 3) increase managed memory"
Regards,
Juha
From: Yun Tang
Sent: Friday, August 28, 2020 6:58 AM
To: Juha Mynttinen ; user@flink.apache.org
Subject: Re: Performance issue associated with managed RocksDB memory
Hi Juha
Thank
15:56
To: user@flink.apache.org
Subject: Re: Performance issue associated with managed RocksDB memory
The issue can be reproduced by using a certain combinations of the value of
RocksDBOptions.WRITE_BUFFER_RATIO (default 0.5) and the Flink job
parallelism.
Examples that break:
* Parallelism 1 and WRITE_BUFFER_RATIO 0.1
* Parallelism 5 and the default WRITE_BUFFER_RATIO 0.5
Examples that work:
* P
To clarify, my questions were all against the very original issue
instead of the WordCount job. The timers come from the window operator you
mentioned as the source of the original issue:
===
bq. If I create a Flink job that has a single "heavy" operator
Hi Juha,
> I can also submit the more complex test with the bigger operator and a
> window operator. There's just gonna be more code to read. Can I attach a
> file here or how should I submit a larger chunk of code?
You can just attach the file with the code.
> 2. I'm not sure what would / s
Andrey,
A small clarification. The tweaked WordCount I posted earlier doesn't
illustrate the issue I originally explained, i.e. the one where there's a
bigger operator and a smallest-possible window operator. Instead, the
modified WordCount illustrates the degraded performance of a very simple
Fl
PM
To: Andrey Zagrebin
Cc: Juha Mynttinen ; Yun Tang ; user
Subject: Re: Performance issue associated with managed RocksDB memory
Thanks for the ping Andrey.
Hi Juha,
Thanks for reporting the issue. I'd like to check the below things before
further digging into it:
1. Could you let us know your configurations (especially memory related
ones) when running the tests?
2. Did you watch the memory consumption before / after tu
Hi Juha,
Thanks for sharing the testing program to expose the problem.
This indeed looks suboptimal if X does not leave space for the window
operator.
I am adding Yu and Yun who might have a better idea about what could be
improved about sharing the RocksDB memory among operators.
Best,
Andrey
O
Hey,
Here's a simple test. It's basically the WordCount example from Flink, but
using RocksDB as the state backend and having a stateful operator. The
javadocs explain how to use it.
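(The attached file is truncated in this archive. Below is a minimal sketch of that kind of test, a RocksDB state backend plus a keyed operator holding ValueState, written against the Flink 1.10-era API; it is not Juha's original code.)

import org.apache.flink.api.common.functions.RichFlatMapFunction;
import org.apache.flink.api.common.state.ValueState;
import org.apache.flink.api.common.state.ValueStateDescriptor;
import org.apache.flink.api.common.typeinfo.Types;
import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.contrib.streaming.state.RocksDBStateBackend;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.util.Collector;

public class RocksDBWordCount {
    public static void main(String[] args) throws Exception {
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        // RocksDB keyed state; its memory comes from managed memory when
        // state.backend.rocksdb.memory.managed is true.
        env.setStateBackend(new RocksDBStateBackend("file:///tmp/rocksdb-checkpoints"));

        env.fromElements("to be or not to be that is the question")
           .flatMap((String line, Collector<String> out) -> {
               for (String w : line.split("\\s+")) out.collect(w);
           }).returns(Types.STRING)
           .keyBy(w -> w, Types.STRING)
           .flatMap(new Counter())
           .print();

        env.execute("rocksdb-wordcount");
    }

    // Keyed counter backed by RocksDB ValueState.
    static class Counter extends RichFlatMapFunction<String, Tuple2<String, Long>> {
        private transient ValueState<Long> count;

        @Override
        public void open(Configuration parameters) {
            count = getRuntimeContext().getState(new ValueStateDescriptor<>("count", Types.LONG));
        }

        @Override
        public void flatMap(String word, Collector<Tuple2<String, Long>> out) throws Exception {
            long c = count.value() == null ? 0L : count.value();
            count.update(c + 1);
            out.collect(Tuple2.of(word, c + 1));
        }
    }
}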
Hello there,
In Flink 1.10 the configuration parameter state.backend.rocksdb.memory.managed
defaults to true. This is great, since it makes it simpler to stay within the
memory budget e.g. when running in a container environment. However, I've
noticed performance issues when the switch is enabl
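(For reference, the knobs discussed in this thread normally live in flink-conf.yaml; the sketch below only spells out the option objects programmatically. Option names are as of Flink 1.10/1.11 and the values are illustrative, not recommendations.)

import org.apache.flink.configuration.Configuration;
import org.apache.flink.contrib.streaming.state.RocksDBOptions;

public class RocksDBMemoryConfig {
    public static void main(String[] args) {
        Configuration conf = new Configuration();
        // flink-conf.yaml equivalent: state.backend.rocksdb.memory.managed: true (the 1.10 default)
        conf.setBoolean(RocksDBOptions.USE_MANAGED_MEMORY, true);
        // flink-conf.yaml equivalent: state.backend.rocksdb.memory.write-buffer-ratio: 0.5 (default)
        conf.setDouble(RocksDBOptions.WRITE_BUFFER_RATIO, 0.5);
        System.out.println(conf);
        // How this Configuration reaches the cluster depends on the deployment;
        // in practice these keys simply go into flink-conf.yaml.
    }
}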
-- Original Message --
From: Mu Kong
Sent: Fri May 22 11:16:32 2020
To: user
Subject: Performance issue when writing to HDFS
Hi all,
I have a Flink application consuming from Kafka and writing the data to HDFS,
bucketed by event time with BucketingSink.
Sometimes the traffic gets high and from the prome
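(A minimal shape of such a sink with the legacy BucketingSink API is sketched below; paths and sizes are placeholders, and the socket source only keeps the sketch self-contained where the real job reads from Kafka. Note the stock DateTimeBucketer buckets by processing time, so bucketing by event time as described above needs a custom Bucketer.)

import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.connectors.fs.bucketing.BucketingSink;
import org.apache.flink.streaming.connectors.fs.bucketing.DateTimeBucketer;

public class HdfsBucketingJob {
    public static void main(String[] args) throws Exception {
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        DataStream<String> events = env.socketTextStream("localhost", 9999);

        BucketingSink<String> sink = new BucketingSink<>("hdfs:///data/events");
        // One directory per hour; swap in a custom Bucketer to bucket by event time.
        sink.setBucketer(new DateTimeBucketer<>("yyyy-MM-dd--HH"));
        sink.setBatchSize(128L * 1024L * 1024L); // roll part files at roughly 128 MB

        events.addSink(sink);
        env.execute("bucketing-sink-demo");
    }
}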
Steve,
thanks a lot for looking into this closer!
Let's discuss the resolution of the issue in the ticket Dawid has created:
https://issues.apache.org/jira/browse/FLINK-15941
Best,
Robert
On Thu, Feb 6, 2020 at 6:59 PM Steve Whelan wrote:
> Robert,
>
> You are correct that it is using a *Cache
Robert,
You are correct that it is using a `CachedSchemaRegistryClient` object.
Therefore, `schemaRegistryClient.register()` should be checking the cache
first before sending a request to the Registry. However, turning on debug
logging of my Registry, I can see a request being sent for every seria
Hi Steve,
I think your observation is correct. If I am not mistaken we should use
`schemaRegistryClient.getId(subject, schema)` instead of
`schemaRegistryClient.register(subject, schema)`. The former should
perform an HTTP request only if the schema is not in the cache.
I created an issue t
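(A sketch of the distinction being discussed, using the Confluent 4.x/5.x-era client API with Avro schemas; the registry URL and subject are placeholders.)

import io.confluent.kafka.schemaregistry.client.CachedSchemaRegistryClient;
import org.apache.avro.Schema;

public class RegistryCacheCheck {
    public static void main(String[] args) throws Exception {
        CachedSchemaRegistryClient client =
                new CachedSchemaRegistryClient("http://schema-registry:8081", 100);

        Schema schema = new Schema.Parser().parse(
                "{\"type\":\"record\",\"name\":\"Event\",\"fields\":[{\"name\":\"id\",\"type\":\"string\"}]}");

        // register() writes (or re-registers) the schema under the subject.
        int registeredId = client.register("events-value", schema);

        // getId() only looks the schema up; it should hit the HTTP endpoint just
        // when the (subject, schema) pair is not already in the local cache.
        int cachedId = client.getId("events-value", schema);

        System.out.println(registeredId + " / " + cachedId);
    }
}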
Hi,
thanks a lot for your message. It's certainly not intentional to do an HTTP
request for every single message :)
Isn't the `schemaRegistryClient` an instance of CachedSchemaRegistryClient,
which, as the name says, caches?
Can you check with a debugger at runtime what registry client is used, and
Hi,
I'm running Flink v1.9. I backported the commit adding serialization
support for Confluent's schema registry[1]. Using the code as is, I saw a
nearly 50% drop in peak throughput for my job compared to using
`AvroRowSerializationSchema`.
Looking at the code, RegistryAvroSerializationSchema.se
Hi Peter,
The performance drop is probably due to de/serialization.
When tasks are chained, records are simply forwarded as Java objects via
method calls.
When a task chain is broken into multiple operators, the records (Java
objects) are serialized by the sending task, possibly shipped over the
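(For illustration, an assumed snippet rather than the job under discussion: chain boundaries can also be forced explicitly, which makes the serialization cost described above easy to observe.)

import org.apache.flink.api.common.typeinfo.Types;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;

public class ChainBoundaries {
    public static void main(String[] args) throws Exception {
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        DataStream<String> lines = env.socketTextStream("localhost", 9999);

        // Fused into one task: the map result is handed to filter as a plain
        // Java object reference, with no serialization in between.
        lines.map(String::trim).returns(Types.STRING)
             .filter(s -> !s.isEmpty())
             .print("chained");

        // startNewChain() on the filter puts map and filter into separate tasks,
        // so records crossing that boundary are serialized even without a keyBy.
        lines.map(String::trim).returns(Types.STRING)
             .filter(s -> !s.isEmpty())
             .startNewChain()
             .print("broken-chain");

        env.execute("chain-boundaries");
    }
}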
Hi all
We have a pipeline (runs on YARN, Flink v1.7.1) which consumes a union of
Kafka and
HDFS sources. We noticed that the throughput is 10 times higher if only
one of these sources is consumed. While trying to identify the problem I
implemented a no-op source which was unioned with one of the
Also, I note that none of the operations show any back pressure issues, and the
records out from the kafka connector slow down to a crawl. Are there any known
issues with kafka throughput that could be the issue rather than flink? I have
a java program that monitors the test that reads all the
Also, I note some messages in the log about my java class not being a valid
POJO because it is missing accessors for a field. Would this impact
performance significantly?
Michael
> On Apr 17, 2018, at 12:54 PM, TechnoMage wrote:
>
> No checkpoints are active.
> I will try that back end.
> Ye
No checkpoints are active.
I will try that back end.
Yes, using JSONObject subclass for most of the intermediate state, with JSON
strings in and out of Kafka. I will look at the config page for how to enable
that.
Thank you,
Michael
> On Apr 17, 2018, at 12:51 PM, Stephan Ewen wrote:
>
> A f
A few ideas on how to start debugging this:
- Try deactivating checkpoints. Without them, no work goes into
persisting RocksDB data to the checkpoint store.
- Try to swap RocksDB for the FsStateBackend - that reduces serialization
cost for moving data between heap and offheap (rocksdb).
- Do yo
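(The second suggestion above, sketched with the legacy FsStateBackend API from the Flink versions this thread is about; the checkpoint path is a placeholder.)

import org.apache.flink.runtime.state.filesystem.FsStateBackend;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;

public class HeapBackendSwap {
    public static void main(String[] args) throws Exception {
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();

        // Keep working state on the JVM heap and write snapshots to a file system
        // path; this removes the heap <-> RocksDB (de)serialization on every access.
        env.setStateBackend(new FsStateBackend("file:///tmp/flink-checkpoints"));

        // Checkpointing stays off unless enableCheckpointing(...) is called, which
        // matches the first suggestion (no work spent persisting state).

        // ... build the rest of the job as usual ...
        env.fromElements(1, 2, 3).print();
        env.execute("heap-backend-test");
    }
}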
Memory use is steady throughout the job, but the CPU utilization drops off a
cliff. I assume this is because it becomes I/O bound shuffling managed state.
Are there any metrics on managed state that can help in evaluating what to do
next?
Michael
> On Apr 17, 2018, at 7:11 AM, Michael Latta
Thanks for the suggestion. The task manager is configured for 8GB of heap, and
gets to about 8.3GB total. Other Java processes (job manager and Kafka) add a
few more. I will check it again, but the instances have 16GB, same as my laptop
that completes the test in <90 min.
Michael
Have you checked memory usage? It could be as simple as either having
memory leaks, or aggregating more than you think (sometimes not obvious how
much is kept around in memory for longer than one first thinks). If
possible, connect FlightRecorder or similar tool and keep an eye on memory.
Additiona
I am doing a short Proof of Concept for using Flink and Kafka in our product.
On my laptop I can process 10M inputs in about 90 min. On 2 different EC2
instances (m4.xlarge and m5.xlarge both 4core 16GB ram and ssd storage) I see
the process hit a wall around 50min into the test and short of 7
Makes sense. The generation process seems to be inherently faster than the
consumption process (Flink program).
Without backpressure, these two will run out of sync, and Kafka does not do
any backpressure (by design).
On Thu, Sep 24, 2015 at 4:51 PM, Rico Bergmann wrote:
> The test data is gene
The test data is generated in a flink program running in a separate jvm. The
generated data is then written to a Kafka topic from which my program reads
the data ...
> On 24.09.2015 at 14:53, Aljoscha Krettek wrote:
>
> Hi Rico,
> are you generating the data directly in your flink program
Hi Rico!
When you say that the program falls behind the unlimited generating source,
I assume you have some unbounded buffering channel (like Kafka) between the
generator and the Flink job. Is that correct? Flink itself backpressures to
the sources, but if the source is Kafka, this does of course
Hi Rico,
are you generating the data directly in your flink program or in some external
queue, such as Kafka?
Cheers,
Aljoscha
On Thu, 24 Sep 2015 at 13:47 Rico Bergmann wrote:
> And as side note:
>
> The problem with duplicates seems also to be solved!
>
> Cheers Rico.
>
>
>
> Am 24.09.2015 um 12
And as side note:
The problem with duplicates seems also to be solved!
Cheers Rico.
> On 24.09.2015 at 12:21, Rico Bergmann wrote:
>
> I took a first glance.
>
> I ran 2 test setups. One with a limited test data generator, the outputs
> around 200 events per second. In this setting the
I took a first glance.
I ran 2 test setups. One with a limited test data generator that outputs around
200 events per second. In this setting the new implementation keeps up with the
incoming message rate.
The other setup had an unlimited generation (at highest possible rate). There
the same
Hi Rico,
you should be able to get it with these steps:
git clone https://github.com/StephanEwen/incubator-flink.git flink
cd flink
git checkout -t origin/windows
This will get you on Stephan's windowing branch. Then you can do a
mvn clean install -DskipTests
to build it.
I will merge his stuf
Hi!
Sounds great. How can I get the source code before it's merged to the master
branch? Unfortunately I only have 2 days left for trying this out ...
Greets. Rico.
> On 24.09.2015 at 00:57, Stephan Ewen wrote:
>
> Hi Rico!
>
> We have finished the first part of the Window API reworks. Y
Hi Rico!
We have finished the first part of the Window API reworks. You can find the
code here: https://github.com/apache/flink/pull/1175
It should fix the issues and offer vastly improved performance (up to 50x
faster). For now, it supports time windows, but we will support the other
cases in th
Btw, there was a discussion about this problem a while back:
https://mail-archives.apache.org/mod_mbox/flink-dev/201506.mbox/%3ccadxjeyci9_opro4oqtzhvi-gifek6_66ybtjz_pb0aqop_n...@mail.gmail.com%3E
And here is the jira:
https://issues.apache.org/jira/browse/FLINK-2181
Best,
Gabor
2015-09-09 10:0
Aljoscha and I are currently working on an alternative Windowing
implementation. That new implementation will support out-of-order event
time and release keys properly. We will hopefully have a first version to
try out in a week or so...
Greetings,
Stephan
On Wed, Sep 9, 2015 at 9:08 AM, Aljosc
Ok, that's a special case but the system still shouldn't behave that way.
The problem is that the grouped discretizer that is responsible for
grouping the elements into grouped windows is keeping state for every key
that it encounters. And that state is never released, ever. That's the
reason for t
Yes. The keys are constantly changing. Indeed each unique event has its own key
(the event itself). The purpose was to do an event deduplication ...
> On 08.09.2015 at 20:05, Aljoscha Krettek wrote:
>
> Hi Rico,
> I have a suspicion. What is the distribution of your keys? That is, are there
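(For readers hitting the same problem on current Flink versions: the unbounded per-key state described here is what state TTL addresses. Below is a sketch against the Flink 1.6+ API, an assumed usage, not the 2015 code under discussion.)

import org.apache.flink.api.common.functions.RichFilterFunction;
import org.apache.flink.api.common.state.StateTtlConfig;
import org.apache.flink.api.common.state.ValueState;
import org.apache.flink.api.common.state.ValueStateDescriptor;
import org.apache.flink.api.common.time.Time;
import org.apache.flink.api.common.typeinfo.Types;
import org.apache.flink.configuration.Configuration;

// Keyed deduplication filter whose per-key "seen" flag expires instead of living forever.
// Usage: events.keyBy(e -> e).filter(new DedupFilter())
public class DedupFilter extends RichFilterFunction<String> {
    private transient ValueState<Boolean> seen;

    @Override
    public void open(Configuration parameters) {
        StateTtlConfig ttl = StateTtlConfig.newBuilder(Time.hours(1))
                .setUpdateType(StateTtlConfig.UpdateType.OnCreateAndWrite)
                .cleanupFullSnapshot()
                .build();
        ValueStateDescriptor<Boolean> desc = new ValueStateDescriptor<>("seen", Types.BOOLEAN);
        desc.enableTimeToLive(ttl);
        seen = getRuntimeContext().getState(desc);
    }

    @Override
    public boolean filter(String event) throws Exception {
        if (seen.value() != null) {
            return false;          // duplicate within the TTL window: drop it
        }
        seen.update(Boolean.TRUE); // first occurrence of this key: keep it
        return true;
    }
}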
Hi Rico,
I have a suspicion. What is the distribution of your keys? That is, are
there many unique keys, do the keys keep evolving, i.e. are there always new
and different keys?
Cheers,
Aljoscha
On Tue, 8 Sep 2015 at 13:44 Rico Bergmann wrote:
> I also see in the TM overview the CPU load is still a
I also see in the TM overview that the CPU load is still around 25% although
there has been no input to the program for minutes. The CPU load is decreasing
very slowly.
The memory consumption is still fluctuating at a high level. It does not
go down.
In my test I generated test input for 1 minute. Now
The marksweep value is very high, the scavenge very low. If this helps ;-)
> On 08.09.2015 at 11:27, Robert Metzger wrote:
>
> It is in the "Information" column: http://i.imgur.com/rzxxURR.png
> In the screenshot, the two GCs only spend 84 and 25 ms.
>
>> On Tue, Sep 8, 2015 at 10:34 AM, Ri
It is in the "Information" column: http://i.imgur.com/rzxxURR.png
In the screenshot, the two GCs only spend 84 and 25 ms.
On Tue, Sep 8, 2015 at 10:34 AM, Rico Bergmann wrote:
> Where can I find these information? I can see the memory usage and cpu
> load. But where are the information on the GC
Where can I find this information? I can see the memory usage and CPU load.
But where is the information on the GC?
> On 08.09.2015 at 09:34, Robert Metzger wrote:
>
> The webinterface of Flink has a tab for the TaskManagers. There, you can also
> see how much time the JVM spend with garb
The web interface of Flink has a tab for the TaskManagers. There, you can
also see how much time the JVM spends on garbage collection.
Can you check whether the number of GC calls + the time spent goes up after
30 minutes?
On Tue, Sep 8, 2015 at 8:37 AM, Rico Bergmann wrote:
> Hi!
>
> I also thi
Hi!
I also think it's a GC problem. In the KeySelector I don't instantiate any
object. It's a simple toString method call.
In the mapWindow I create new objects. But I'm doing the same in other map
operators, too. They don't slow down the execution. Only with this construct
the execution is sl
Hej,
This sounds like it could be a garbage collection problem. Do you
instantiate any classes inside any of the operators (e.g. in the
KeySelector)? You can also try to run it locally and use something like
jstat to rule this out.
cheers Martin
On Mon, Sep 7, 2015 at 12:00 PM, Rico Bergmann wr
Hi!
While working with grouping and windowing I encountered a strange behavior. I'm
doing:
> dataStream.groupBy(KeySelector).window(Time.of(x,
> TimeUnit.SECONDS)).mapWindow(toString).flatten()
When I run the program containing this snippet it initially outputs data at a
rate around 150 events