Flink Kafka Consumer performance issue

2022-10-02 Thread Xin Ma
Hi, (Flink version 1.14.2, Kafka version 2.6.1) I have a Flink job consuming Kafka and simply sinking the data into S3. The Kafka consumer is sometimes delayed on a few partitions. The partitions are evenly assigned to Flink subtasks. I found there was a correlation between kafka consumer fet

Re: Flink Performance Issue

2021-09-27 Thread Arvid Heise
business logic. Do you think these operators would have > a huge impact on the performance? Or is it something to do with my Kafka > cluster configuration or the older version of Flink (1.8) that I am using > in my application. Not sure if Flink version 1.8 has a performance issue. >

Re: Flink Performance Issue

2021-09-27 Thread Mohammed Kamaal
Kafka cluster configuration or the older version of flink (1.8) that I am using in my application. Not sure if flink version 1.8 has a performance issue. Please let me know. Below is my kafka cluster configuration. auto.create.topics.enable=true log.retention.hours=24 default.replication.fac

Re: Flink Performance Issue

2021-09-22 Thread Robert Metzger
Hi Kamaal, I would first suggest understanding the performance bottleneck before applying any optimizations. Idea 1: Are your CPUs fully utilized? If yes, good, then scaling up will probably help. If not, then there's another inefficiency. Idea 2: How fast can you get the data into your job, with
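Idea 2 above (measuring how fast data can enter the job) can be approximated with a framework-agnostic counter. The class below is a hypothetical sketch, not Flink code; you would call `record()` from your source or first map function:

```java
// Minimal throughput meter: count records and report records/sec.
// Hypothetical helper for locating a bottleneck, not part of any
// Flink API; wrap it around your ingestion path to see how fast
// data actually arrives before any heavy processing.
public class ThroughputMeter {
    private long count = 0;
    private final long startNanos = System.nanoTime();

    public void record() {
        count++;
    }

    public double recordsPerSecond() {
        double elapsedSeconds = (System.nanoTime() - startNanos) / 1e9;
        return elapsedSeconds > 0 ? count / elapsedSeconds : 0.0;
    }

    public static void main(String[] args) {
        ThroughputMeter meter = new ThroughputMeter();
        for (int i = 0; i < 1_000_000; i++) {
            meter.record();
        }
        System.out.printf("%.0f records/sec%n", meter.recordsPerSecond());
    }
}
```

If this meter already reports far fewer than the expected records per second, the bottleneck is upstream of Flink (e.g. the Kafka fetch), not in the operators.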

Re: Flink Performance Issue

2021-09-22 Thread Mohammed Kamaal
Hi Arvid, The throughput has decreased further after I removed all the rebalance(). The performance has decreased from 14 minutes for 20K messages to 20 minutes for 20K messages. Below are the tasks that the flink application is performing. I am using keyBy and Window operation. Do you think a

Re: Flink Performance Issue

2021-09-06 Thread Arvid Heise
Hi Mohammed, something is definitely wrong in your setup. You can safely say that you can process 1k records per second and core with Kafka and light processing, so you shouldn't even need to go distributed in your case. Do you perform any heavy computation? What is your flatMap doing? Are you em

Re: Flink Performance Issue

2021-09-02 Thread Mohammed Kamaal
Hi Fabian, Just an update, Problem 2:- Caused by: org.apache.kafka.common.errors.NetworkException It is resolved. It was because we exceeded the number of allowed partitions for the kafka cluster (AWS MSK cluster). Have deleted unused topics and partitions to resolve the issue.

Re: Flink Performance Issue

2021-08-24 Thread Fabian Paul
Hi Mohammed, 200 records should definitely be doable. The first thing you can do is remove the print-out sink, because it increases the load on your cluster due to the additional IO operation and secondly prevents Flink from fusing operators. I am interested to see the updated job graph after

Re: Flink Performance Issue

2021-08-24 Thread Fabian Paul
Hi Mohammed, Without diving too much into your business logic, a thing which catches my eye is the partitioning you are using. In general all calls to `keyBy` or `rebalance` are very expensive because all the data is shuffled across downstream tasks. Flink tries to fuse operators with the same keyG
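The shuffle that makes `keyBy` expensive follows from how keyed routing works: every record's key determines which downstream subtask receives it, so records must cross the network instead of staying in a fused operator chain. The sketch below illustrates the routing idea with a plain hash code; Flink's actual implementation murmur-hashes keys into key groups, so treat this as a simplified model, not Flink's code:

```java
import java.util.Objects;

// Simplified model of keyed routing. Records with equal keys must
// always land on the same downstream subtask, which is why keyBy
// (like rebalance) forces a network shuffle and breaks operator
// chaining. Flink really murmur-hashes keys into key groups; a plain
// hashCode is used here purely for illustration.
public class KeyedRouting {
    static int subtaskFor(Object key, int parallelism) {
        // Mask the sign bit so the index is always non-negative.
        return (Objects.hashCode(key) & Integer.MAX_VALUE) % parallelism;
    }

    public static void main(String[] args) {
        int parallelism = 4;
        // The same key always routes to the same subtask:
        System.out.println(subtaskFor("user-42", parallelism));
        System.out.println(subtaskFor("user-42", parallelism));
    }
}
```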

Flink Performance Issue

2021-08-24 Thread Mohammed Kamaal
Hi, Apologize for the big message, to explain the issue in detail. We have a Flink (version 1.8) application running on AWS Kinesis Analytics. The application has a source which is a kafka topic with 15 partitions (AWS Managed Streaming Kafka) and the sink is again a kafka topic with 15 partiti

Re: Performance issue associated with managed RocksDB memory

2020-09-15 Thread Yu Li
> > Regards, > Juha > -- > *From:* Yun Tang > *Sent:* Tuesday, September 15, 2020 8:06 AM > *To:* Juha Mynttinen ; Stephan Ewen < > se...@apache.org> > *Cc:* user@flink.apache.org > *Subject:* Re: Performance issue associated with managed RocksDB memory

Re: Performance issue associated with managed RocksDB memory

2020-09-15 Thread Juha Mynttinen
Hey I created this one https://issues.apache.org/jira/browse/FLINK-19238. Regards, Juha From: Yun Tang Sent: Tuesday, September 15, 2020 8:06 AM To: Juha Mynttinen ; Stephan Ewen Cc: user@flink.apache.org Subject: Re: Performance issue associated with managed

Re: Performance issue associated with managed RocksDB memory

2020-09-14 Thread Yun Tang
@flink.apache.org Subject: Re: Performance issue associated with managed RocksDB memory Hey I've fixed the code (https://github.com/juha-mynttinen-king/flink/commits/arena_block_sanity_check) slightly. Now it WARNs if there is the memory configuration issue. Also, I think there was a b

Re: Performance issue associated with managed RocksDB memory

2020-09-10 Thread Juha Mynttinen
Subject: Re: Performance issue associated with managed RocksDB memory Hey Juha! I agree that we cannot reasonably expect the majority of users to understand block sizes, arena sizes, etc. to get their application running. So the default should be "inform when there is a problem and s

Re: Performance issue associated with managed RocksDB memory

2020-09-09 Thread Stephan Ewen
it's hard/impossible to know what kind of performance one can expect > from a Flink application. Thus, it's hard to know if one is suffering > e.g. from this performance issue, or if the system is performing normally > (and inherently being slow). > 2) even if one susp

Re: Performance issue associated with managed RocksDB memory

2020-09-09 Thread Juha Mynttinen
d/impossible to know what kind of performance one can expect from a Flink application. Thus, it's hard to know if one is suffering e.g. from this performance issue, or if the system is performing normally (and inherently being slow). 2) even if one suspects a performance issue, it'

Re: Performance issue associated with managed RocksDB memory

2020-09-09 Thread Stephan Ewen
Tuesday, September 8, 2020 20:56 > *To:* Yun Tang ; user@flink.apache.org < > user@flink.apache.org> > *Subject:* Re: Performance issue associated with managed RocksDB memory > > Hey Yun, > > Thanks for the detailed answer. It clarified how things work. Especially > what is the rol

Re: Performance issue associated with managed RocksDB memory

2020-09-08 Thread Yun Tang
From: Juha Mynttinen Sent: Tuesday, September 8, 2020 20:56 To: Yun Tang ; user@flink.apache.org Subject: Re: Performance issue associated with managed RocksDB memory Hey Yun, Thanks for the detailed answer. It clarified how things work. Especially what is the role of RocksDB arena, and arena

Re: Performance issue associated with managed RocksDB memory

2020-09-08 Thread Juha Mynttinen
ecrease parallelism (if possible), 3) increase managed memory" Regards, Juha From: Yun Tang Sent: Friday, August 28, 2020 6:58 AM To: Juha Mynttinen ; user@flink.apache.org Subject: Re: Performance issue associated with managed RocksDB memory Hi Juha Thank

Re: Performance issue associated with managed RocksDB memory

2020-08-27 Thread Yun Tang
15:56 To: user@flink.apache.org Subject: Re: Performance issue associated with managed RocksDB memory The issue can be reproduced by using a certain combinations of the value of RocksDBOptions.WRITE_BUFFER_RATIO (default 0.5) and the Flink job parallelism. Examples that break: * Parallelism 1 and WRIT

Re: Performance issue associated with managed RocksDB memory

2020-08-24 Thread Juha Mynttinen
The issue can be reproduced by using certain combinations of the value of RocksDBOptions.WRITE_BUFFER_RATIO (default 0.5) and the Flink job parallelism. Examples that break: * Parallelism 1 and WRITE_BUFFER_RATIO 0.1 * Parallelism 5 and the default WRITE_BUFFER_RATIO 0.5 Examples that work: * P
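A rough sense of why such combinations break can be had by computing the write buffer budget that managed memory leaves to RocksDB: managed memory times WRITE_BUFFER_RATIO, shared across the task manager's slots. The numbers below (512 MiB managed memory, 8 MiB arena blocks) are illustrative assumptions, not values from the reporter's setup:

```java
// Back-of-the-envelope check for the configurations in this thread.
// With managed RocksDB memory, the write buffer budget is roughly
// managedMemory * WRITE_BUFFER_RATIO. If that budget only fits a
// handful of arena blocks, RocksDB ends up flushing tiny memtables
// constantly, which matches the reported slowdown. All constants
// here are illustrative assumptions.
public class WriteBufferBudget {
    public static long writeBufferBytes(long managedMemoryBytes,
                                        double writeBufferRatio) {
        return (long) (managedMemoryBytes * writeBufferRatio);
    }

    public static void main(String[] args) {
        long managed = 512L * 1024 * 1024;   // assume 512 MiB managed memory
        long arenaBlock = 8L * 1024 * 1024;  // assume 8 MiB arena blocks

        // Ratio 0.1 leaves ~51 MiB of write buffer budget, i.e. only a
        // few arena blocks for all column families together.
        long budget = writeBufferBytes(managed, 0.1);
        System.out.println("arena blocks that fit: " + budget / arenaBlock);
    }
}
```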

Re: Performance issue associated with managed RocksDB memory

2020-06-26 Thread Yu Li
To clarify, that my questions were all against the very original issue instead of the WordCount job. The timers come from the window operator you mentioned as the source of the original issue: === bq. If I create a Flink job that has a single "heavy" operator

Re: Performance issue associated with managed RocksDB memory

2020-06-26 Thread Andrey Zagrebin
Hi Juha, > I can also submit the more complex test with the bigger operator and a > window operator. There's just gonna be more code to read. Can I attach a > file here or how should I submit a larger chunk of code? You can just attach the file with the code. > 2. I'm not sure what would / s

Re: Performance issue associated with managed RocksDB memory

2020-06-25 Thread Juha Mynttinen
Andrey, A small clarification. The tweaked WordCount I posted earlier doesn't illustrate the issue I originally explained, i.e. the one where there's a bigger operator and a smallest-possible window operator. Instead, the modified WordCount illustrates the degraded performance of a very simple Fl

Re: Performance issue associated with managed RocksDB memory

2020-06-25 Thread Juha Mynttinen
PM To: Andrey Zagrebin Cc: Juha Mynttinen ; Yun Tang ; user Subject: Re: Performance issue associated with managed RocksDB memory Thanks for the ping Andrey. Hi Juha, Thanks for reporting the issue. I'd like to check the below things before further digging into it: 1. Could you let us

Re: Performance issue associated with managed RocksDB memory

2020-06-25 Thread Yu Li
Thanks for the ping Andrey. Hi Juha, Thanks for reporting the issue. I'd like to check the below things before further digging into it: 1. Could you let us know your configurations (especially memory related ones) when running the tests? 2. Did you watch the memory consumption before / after tu

Re: Performance issue associated with managed RocksDB memory

2020-06-25 Thread Andrey Zagrebin
Hi Juha, Thanks for sharing the testing program to expose the problem. This indeed looks suboptimal if X does not leave space for the window operator. I am adding Yu and Yun who might have a better idea about what could be improved about sharing the RocksDB memory among operators. Best, Andrey O

Re: Performance issue associated with managed RocksDB memory

2020-06-24 Thread Juha Mynttinen
Hey, Here's a simple test. It's basically the WordCount example from Flink, but using RocksDB as the state backend and having a stateful operator. The javadocs explain how to use it. /* * Licensed to the Apache Software Foundation (ASF) under one or more * contributor license agreements. See

Performance issue associated with managed RocksDB memory

2020-06-24 Thread Juha Mynttinen
Hello there, In Flink 1.10 the configuration parameter state.backend.rocksdb.memory.managed defaults to true. This is great, since it makes it simpler to stay within the memory budget e.g. when running in a container environment. However, I've noticed performance issues when the switch is enabl
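The switch discussed here can be flipped in flink-conf.yaml; the fragment below is a minimal sketch for isolating whether managed memory is the cause of the slowdown:

```
# flink-conf.yaml -- the option named in this thread.
# Defaults to true since Flink 1.10. Setting it to false restores the
# pre-1.10 behavior where RocksDB manages its own memory outside the
# managed memory budget, which is useful for A/B-testing the issue.
state.backend.rocksdb.memory.managed: false
```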

Re: Performance issue when writing to HDFS

2020-05-25 Thread Mu Kong
td-p/122046 >[2] > http://apache-flink-user-mailing-list-archive.2336050.n4.nabble.com/quot-Slow-ReadProcessor-quot-warnings-when-using-BucketSink-td9427.html > > > -- Original message -- > *From:* Mu Kong > *Sent:* Fri May 22 11:16:32 2020 > *To:* user

Re: Performance issue when writing to HDFS

2020-05-21 Thread Yun Gao
-- Original message -- From: Mu Kong Sent: Fri May 22 11:16:32 2020 To: user Subject: Performance issue when writing to HDFS Hi all, I have a Flink application consuming from Kafka and writing the data to HDFS bucketed by event time with BucketingSink. Sometimes the traffic gets high and from the prome

Re: Performance issue with RegistryAvroSerializationSchema

2020-02-07 Thread Robert Metzger
Steve, thanks a lot for looking into this closer! Let's discuss the resolution of the issue in the ticket Dawid has created: https://issues.apache.org/jira/browse/FLINK-15941 Best, Robert On Thu, Feb 6, 2020 at 6:59 PM Steve Whelan wrote: > Robert, > > You are correct that it is using a *Cache

Re: Performance issue with RegistryAvroSerializationSchema

2020-02-06 Thread Steve Whelan
Robert, You are correct that it is using a *CachedSchemaRegistryClient* object. Therefore, *schemaRegistryClient.*register() should be checking the cache first before sending a request to the Registry. However, turning on debug logging of my Registry, I can see a request being sent for every seria

Re: Performance issue with RegistryAvroSerializationSchema

2020-02-06 Thread Dawid Wysakowicz
Hi Steve, I think your observation is correct. If I am not mistaken we should use *schemaRegistryClient.getId(subject, schema)* instead of *schemaRegistryClient.register(subject, schema)*. The former should perform an HTTP request only if the schema is not in the cache. I created an issue t
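The cache-first lookup Dawid describes can be sketched independently of the real registry client. `RegistryClient` below is a hypothetical stand-in for Confluent's CachedSchemaRegistryClient, used only to illustrate the access pattern of resolving a schema id locally and hitting HTTP only on a miss:

```java
import java.util.HashMap;
import java.util.Map;

// Sketch of a cache-first schema id lookup: the remote registry is
// contacted only when the (subject, schema) pair has not been seen.
// RegistryClient is a hypothetical interface, not Confluent's API.
public class SchemaIdCache {
    interface RegistryClient {
        int fetchIdOverHttp(String subject, String schema);
    }

    private final Map<String, Integer> cache = new HashMap<>();
    private final RegistryClient client;
    int httpCalls = 0; // exposed so the demo can show caching works

    SchemaIdCache(RegistryClient client) {
        this.client = client;
    }

    int getId(String subject, String schema) {
        return cache.computeIfAbsent(subject + "|" + schema, key -> {
            httpCalls++; // cache miss: one real HTTP round trip
            return client.fetchIdOverHttp(subject, schema);
        });
    }

    public static void main(String[] args) {
        SchemaIdCache ids = new SchemaIdCache((subject, schema) -> 42);
        ids.getId("topic-value", "{\"type\":\"string\"}");
        ids.getId("topic-value", "{\"type\":\"string\"}");
        System.out.println(ids.httpCalls); // prints 1: second call hit the cache
    }
}
```

This is the behavior the thread expects from `getId`; a per-message request, as Steve observed with `register`, is what the fix in FLINK-15941 avoids.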

Re: Performance issue with RegistryAvroSerializationSchema

2020-02-06 Thread Robert Metzger
Hi, thanks a lot for your message. It's certainly not intentional to do an HTTP request for every single message :) Isn't the *schemaRegistryClient* an instance of CachedSchemaRegistryClient, which, as the name says, caches? Can you check with a debugger at runtime what registry client is used, and

Performance issue with RegistryAvroSerializationSchema

2020-02-03 Thread Steve Whelan
Hi, I'm running Flink v1.9. I backported the commit adding serialization support for Confluent's schema registry[1]. Using the code as is, I saw a nearly 50% drop in peak throughput for my job compared to using *AvroRowSerializationSchema*. Looking at the code, *RegistryAvroSerializationSchema.se

Re: Union of streams performance issue (10x)

2019-07-23 Thread Fabian Hueske
Hi Peter, The performance drop is probably due to de/serialization. When tasks are chained, records are simply forwarded as Java objects via method calls. When a task chain is broken into multiple operators, the records (Java objects) are serialized by the sending task, possibly shipped over the

Union of streams performance issue (10x)

2019-07-13 Thread Peter Zende
Hi all We have a pipeline (runs on YARN, Flink v1.7.1) which consumes a union of Kafka and HDFS sources. We noticed that the throughput is 10 times higher if only one of these sources is consumed. While trying to identify the problem I implemented a no-op source which was unioned with one of the

Re: Flink/Kafka POC performance issue

2018-04-17 Thread TechnoMage
Also, I note that none of the operations show any back pressure issues, and the records out from the kafka connector slow down to a crawl. Are there any known issues with kafka throughput that could be the issue rather than flink? I have a java program that monitors the test that reads all the

Re: Flink/Kafka POC performance issue

2018-04-17 Thread TechnoMage
Also, I note some messages in the log about my java class not being a valid POJO because it is missing accessors for a field. Would this impact performance significantly? Michael > On Apr 17, 2018, at 12:54 PM, TechnoMage wrote: > > No checkpoints are active. > I will try that back end. > Ye

Re: Flink/Kafka POC performance issue

2018-04-17 Thread TechnoMage
No checkpoints are active. I will try that back end. Yes, using JSONObject subclass for most of the intermediate state, with JSON strings in and out of Kafka. I will look at the config page for how to enable that. Thank you, Michael > On Apr 17, 2018, at 12:51 PM, Stephan Ewen wrote: > > A f

Re: Flink/Kafka POC performance issue

2018-04-17 Thread Stephan Ewen
A few ideas how to start debugging this: - Try deactivating checkpoints. Without that, no work goes into persisting rocksdb data to the checkpoint store. - Try to swap RocksDB for the FsStateBackend - that reduces serialization cost for moving data between heap and offheap (rocksdb). - Do yo
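The backend swap Stephan suggests can be tried from configuration alone; the fragment below is a sketch, and the checkpoint path is a placeholder:

```
# flink-conf.yaml -- isolating RocksDB's serialization cost.
# 'filesystem' (FsStateBackend) keeps working state as objects on the
# JVM heap and only writes to the checkpoint directory on checkpoints,
# so no per-access (de)serialization between heap and off-heap occurs.
state.backend: filesystem
state.checkpoints.dir: file:///tmp/flink-checkpoints
```

If throughput recovers with the filesystem backend, the bottleneck is RocksDB access cost rather than the job logic.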

Re: Flink/Kafka POC performance issue

2018-04-17 Thread TechnoMage
Memory use is steady throughout the job, but the CPU utilization drops off a cliff. I assume this is because it becomes I/O bound shuffling managed state. Are there any metrics on managed state that can help in evaluating what to do next? Michael > On Apr 17, 2018, at 7:11 AM, Michael Latta

Re: Flink/Kafka POC performance issue

2018-04-17 Thread Michael Latta
Thanks for the suggestion. The task manager is configured for 8GB of heap, and gets to about 8.3 total; other Java processes (job manager and Kafka) add a few more. I will check it again, but the instances have 16GB, same as my laptop that completes the test in <90 min. Michael Sent from my iP

Re: Flink/Kafka POC performance issue

2018-04-16 Thread Niclas Hedhman
Have you checked memory usage? It could be as simple as either having memory leaks, or aggregating more than you think (sometimes not obvious how much is kept around in memory for longer than one first thinks). If possible, connect FlightRecorder or similar tool and keep an eye on memory. Additiona

Flink/Kafka POC performance issue

2018-04-16 Thread TechnoMage
I am doing a short Proof of Concept for using Flink and Kafka in our product. On my laptop I can process 10M inputs in about 90 min. On 2 different EC2 instances (m4.xlarge and m5.xlarge, both 4-core, 16GB RAM, and SSD storage) I see the process hit a wall around 50min into the test and short of 7

Re: Performance Issue

2015-09-24 Thread Stephan Ewen
Makes sense. The generation process seems to be inherently faster than the consumption process (Flink program). Without backpressure, these two will run out of sync, and Kafka does not do any backpressure (by design). On Thu, Sep 24, 2015 at 4:51 PM, Rico Bergmann wrote: > The test data is gene

Re: Performance Issue

2015-09-24 Thread Rico Bergmann
The test data is generated in a Flink program running in a separate JVM. The generated data is then written to a Kafka topic from which my program reads the data ... > On 24.09.2015 at 14:53, Aljoscha Krettek wrote: > > Hi Rico, > are you generating the data directly in your flink program

Re: Performance Issue

2015-09-24 Thread Stephan Ewen
Hi Rico! When you say that the program falls behind the unlimited generating source, I assume you have some unbounded buffering channel (like Kafka) between the generator and the Flink job. Is that correct? Flink itself backpressures to the sources, but if the source is Kafka, this does of course

Re: Performance Issue

2015-09-24 Thread Aljoscha Krettek
Hi Rico, are you generating the data directly in your Flink program or in some external queue, such as Kafka? Cheers, Aljoscha On Thu, 24 Sep 2015 at 13:47 Rico Bergmann wrote: > And as side note: > > The problem with duplicates seems also to be solved! > > Cheers Rico. > > > > On 24.09.2015 at 12

Re: Performance Issue

2015-09-24 Thread Rico Bergmann
And as a side note: The problem with duplicates seems also to be solved! Cheers Rico. > On 24.09.2015 at 12:21, Rico Bergmann wrote: > > I took a first glance. > > I ran 2 test setups. One with a limited test data generator that outputs > around 200 events per second. In this setting the

Re: Performance Issue

2015-09-24 Thread Rico Bergmann
I took a first glance. I ran 2 test setups. One with a limited test data generator that outputs around 200 events per second. In this setting the new implementation keeps up with the incoming message rate. The other setup had an unlimited generation (at highest possible rate). There the same

Re: Performance Issue

2015-09-24 Thread Aljoscha Krettek
Hi Rico, you should be able to get it with these steps: git clone https://github.com/StephanEwen/incubator-flink.git flink cd flink git checkout -t origin/windows This will get you on Stephan's windowing branch. Then you can do a mvn clean install -DskipTests to build it. I will merge his stuf

Re: Performance Issue

2015-09-24 Thread Rico Bergmann
Hi! Sounds great. How can I get the source code before it's merged to the master branch? Unfortunately I only have 2 days left for trying this out ... Greets. Rico. > On 24.09.2015 at 00:57, Stephan Ewen wrote: > > Hi Rico! > > We have finished the first part of the Window API reworks. Y

Re: Performance Issue

2015-09-23 Thread Stephan Ewen
Hi Rico! We have finished the first part of the Window API reworks. You can find the code here: https://github.com/apache/flink/pull/1175 It should fix the issues and offer vastly improved performance (up to 50x faster). For now, it supports time windows, but we will support the other cases in th

Re: Performance Issue

2015-09-09 Thread Gábor Gévay
Btw, there was a discussion about this problem a while back: https://mail-archives.apache.org/mod_mbox/flink-dev/201506.mbox/%3ccadxjeyci9_opro4oqtzhvi-gifek6_66ybtjz_pb0aqop_n...@mail.gmail.com%3E And here is the jira: https://issues.apache.org/jira/browse/FLINK-2181 Best, Gabor 2015-09-09 10:0

Re: Performance Issue

2015-09-09 Thread Stephan Ewen
Aljoscha and me are currently working on an alternative Windowing implementation. That new implementation will support out-of-order event time and release keys properly. We will hopefully have a first version to try out in a week or so... Greetings, Stephan On Wed, Sep 9, 2015 at 9:08 AM, Aljosc

Re: Performance Issue

2015-09-09 Thread Aljoscha Krettek
Ok, that's a special case but the system still shouldn't behave that way. The problem is that the grouped discretizer that is responsible for grouping the elements into grouped windows is keeping state for every key that it encounters. And that state is never released, ever. That's the reason for t

Re: Performance Issue

2015-09-08 Thread Rico Bergmann
Yes. The keys are constantly changing. Indeed each unique event has its own key (the event itself). The purpose was to do event deduplication ... > On 08.09.2015 at 20:05, Aljoscha Krettek wrote: > > Hi Rico, > I have a suspicion. What is the distribution of your keys? That is, are there
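The unbounded state growth Aljoscha suspects follows directly from keying by the event itself: every unique event leaves state behind that is never released. A bounded "seen" set is one way to keep deduplication memory flat; the fixed-capacity eviction policy below is an illustrative assumption, not the approach from the thread:

```java
import java.util.ArrayDeque;
import java.util.Deque;
import java.util.HashSet;
import java.util.Set;

// Sketch of deduplication with bounded state. Instead of keeping
// every event key forever (the unbounded growth described in this
// thread), the oldest entries are evicted once a capacity is hit.
// The capacity-based eviction is an illustrative policy; a real job
// would more likely expire entries by time.
public class BoundedDeduplicator {
    private final Set<String> seen = new HashSet<>();
    private final Deque<String> insertionOrder = new ArrayDeque<>();
    private final int capacity;

    BoundedDeduplicator(int capacity) {
        this.capacity = capacity;
    }

    /** Returns true if the event is new and should be emitted. */
    boolean accept(String event) {
        if (!seen.add(event)) {
            return false; // duplicate within the retained window
        }
        insertionOrder.addLast(event);
        if (insertionOrder.size() > capacity) {
            seen.remove(insertionOrder.removeFirst()); // release oldest state
        }
        return true;
    }

    public static void main(String[] args) {
        BoundedDeduplicator dedup = new BoundedDeduplicator(10_000);
        System.out.println(dedup.accept("event-1")); // true: first occurrence
        System.out.println(dedup.accept("event-1")); // false: duplicate
    }
}
```

The trade-off is that a duplicate arriving after eviction is treated as new, which is why bounding state (by count or by time) must match how far apart duplicates can realistically be.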

Re: Performance Issue

2015-09-08 Thread Aljoscha Krettek
Hi Rico, I have a suspicion. What is the distribution of your keys? That is, are there many unique keys, do the keys keep evolving, i.e. is it always new and different keys? Cheers, Aljoscha On Tue, 8 Sep 2015 at 13:44 Rico Bergmann wrote: > I also see in the TM overview the CPU load is still a

Re: Performance Issue

2015-09-08 Thread Rico Bergmann
I also see in the TM overview that the CPU load is still around 25% although there has been no input to the program for minutes. The CPU load is degrading very slowly. The memory consumption is still fluctuating at a high level. It does not degrade. In my test I generated test input for 1 minute. Now

Re: Performance Issue

2015-09-08 Thread Rico Bergmann
The marksweep value is very high, the scavenge very low, if this helps ;-) > On 08.09.2015 at 11:27, Robert Metzger wrote: > > It is in the "Information" column: http://i.imgur.com/rzxxURR.png > In the screenshot, the two GCs only spend 84 and 25 ms. > >> On Tue, Sep 8, 2015 at 10:34 AM, Ri

Re: Performance Issue

2015-09-08 Thread Robert Metzger
It is in the "Information" column: http://i.imgur.com/rzxxURR.png In the screenshot, the two GCs only spend 84 and 25 ms. On Tue, Sep 8, 2015 at 10:34 AM, Rico Bergmann wrote: > Where can I find these information? I can see the memory usage and cpu > load. But where are the information on the GC

Re: Performance Issue

2015-09-08 Thread Rico Bergmann
Where can I find this information? I can see the memory usage and CPU load. But where is the information on the GC? > On 08.09.2015 at 09:34, Robert Metzger wrote: > > The webinterface of Flink has a tab for the TaskManagers. There, you can also > see how much time the JVM spends on garb

Re: Performance Issue

2015-09-08 Thread Robert Metzger
The webinterface of Flink has a tab for the TaskManagers. There, you can also see how much time the JVM spends on garbage collection. Can you check whether the number of GC calls + the time spent goes up after 30 minutes? On Tue, Sep 8, 2015 at 8:37 AM, Rico Bergmann wrote: > Hi! > > I also thi

Re: Performance Issue

2015-09-07 Thread Rico Bergmann
Hi! I also think it's a GC problem. In the KeySelector I don't instantiate any object; it's a simple toString method call. In the mapWindow I create new objects, but I'm doing the same in other map operators, too, and they don't slow down the execution. Only with this construct the execution is sl

Re: Performance Issue

2015-09-07 Thread Martin Neumann
Hi, This sounds like it could be a garbage collection problem. Do you instantiate any classes inside any of the operators (e.g. in the KeySelector)? You can also try to run it locally and use something like jstat to rule this out. cheers Martin On Mon, Sep 7, 2015 at 12:00 PM, Rico Bergmann wr

Performance Issue

2015-09-07 Thread Rico Bergmann
Hi! While working with grouping and windowing I encountered a strange behavior. I'm doing: > dataStream.groupBy(KeySelector).window(Time.of(x, TimeUnit.SECONDS)).mapWindow(toString).flatten() When I run the program containing this snippet it initially outputs data at a rate around 150 events