Hi Mohammed, something is definitely wrong in your setup. As a rule of thumb, you can process about 1k records per second per core with Kafka and light processing, so you shouldn't even need to go distributed in your case.
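Just to put numbers on that (using the figures from your last mail): 20K records in 14 minutes is roughly 24 records per second in total, i.e. about 0.5 records per second per subtask at parallelism 45. That is three orders of magnitude below the rule of thumb above, so the bottleneck is almost certainly in the job itself and not in the cluster size.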
Do you perform any heavy computation? What is your flatMap doing? Are you emitting lots of small records from one big record? Can you please remove all rebalance() calls and report back? Rebalance is counter-productive unless you know exactly why you need it: it forces a network shuffle between operators and prevents Flink from chaining them. See the sketch below for what I mean.
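Not your actual job, of course, but here is a minimal sketch of the shape I'd expect, assuming the 1.13-era FlinkKafkaConsumer (which matches the FlinkKafkaException in your stack trace); topic name, group id, and the flatMap body are placeholders:

    import java.util.Properties;

    import org.apache.flink.api.common.functions.FlatMapFunction;
    import org.apache.flink.api.common.serialization.SimpleStringSchema;
    import org.apache.flink.streaming.api.datastream.DataStream;
    import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
    import org.apache.flink.streaming.api.functions.sink.DiscardingSink;
    import org.apache.flink.streaming.connectors.kafka.FlinkKafkaConsumer;
    import org.apache.flink.util.Collector;

    public class ChainedPipelineSketch {

        public static void main(String[] args) throws Exception {
            StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();

            Properties props = new Properties();
            props.setProperty("bootstrap.servers", "broker:9092"); // placeholder
            props.setProperty("group.id", "my-consumer-group");    // placeholder

            DataStream<String> source = env.addSource(
                    new FlinkKafkaConsumer<>("input-topic", new SimpleStringSchema(), props));

            // Counter-productive unless you really need it:
            //   source.rebalance().flatMap(...)
            // forces a round-robin network shuffle after the source and breaks
            // operator chaining, so every record gets serialized and shipped.

            // Without rebalance(), source and flatMap share the same parallelism,
            // are chained into a single task, and hand records over locally.
            source
                    .flatMap(new FlatMapFunction<String, String>() {
                        @Override
                        public void flatMap(String value, Collector<String> out) {
                            out.collect(value); // your real per-record logic goes here
                        }
                    })
                    .addSink(new DiscardingSink<>()); // placeholder for your real sink

            env.execute("chained-pipeline-sketch");
        }
    }

If your flatMap fans one big record out into many small ones, chaining matters even more, because every extra shuffle multiplies the serialization cost.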
On Thu, Sep 2, 2021 at 1:36 PM Mohammed Kamaal <mohammed.kamaa...@gmail.com> wrote:
> Hi Fabian,
>
> Just an update,
>
> Problem 2:-
> ----------------
> Caused by: org.apache.kafka.common.errors.NetworkException
> It is resolved. It was because we exceeded the number of allowed
> partitions for the Kafka cluster (AWS MSK cluster). We deleted
> unused topics and partitions to resolve the issue.
>
> Problem 1:-
> ----------------
> I increased the Kafka partitions and Flink parallelism to 45 and the
> throughput has improved from 20 minutes to 14 minutes (20K records).
> Can you check the Flink graph and let me know if there is anything
> else that can be done here to improve the throughput further?
>
> Thanks
>
> On Wed, Sep 1, 2021 at 10:55 PM Mohammed Kamaal
> <mohammed.kamaa...@gmail.com> wrote:
> >
> > Hi Fabian,
> >
> > Problem 1:-
> > ---------------------
> > I have removed the print-out sinks and ran the test again. This time
> > the throughput is 17 minutes for 20K records (200 records every
> > second). Earlier it was 20 minutes for 20K records (parallelism 15
> > and Kafka partitions 15).
> >
> > Please find the attached application graph. Can you suggest what else
> > can be done to improve the throughput further?
> >
> > Problem 2:-
> > ---------------------
> > Also, I tried to increase the parallelism to 45 from 15 (also
> > increasing the Kafka partitions to 45 from 15) to see if this helps
> > in getting better throughput.
> >
> > After increasing the partitions, I am facing a network issue with the
> > Kafka cluster (AWS Managed Streaming for Kafka). I am not getting this
> > issue with 15 partitions for the Kafka topic. Could this be an issue
> > with the Kafka cluster?
> >
> > Kafka Cluster Configuration:-
> > ---------------------------------------
> > auto.create.topics.enable=true
> > log.retention.hours=24
> > default.replication.factor=3
> > min.insync.replicas=2
> > num.io.threads=45
> > num.network.threads=60
> > num.partitions=45
> > num.replica.fetchers=2
> > unclean.leader.election.enable=true
> > replica.lag.time.max.ms=30000
> > zookeeper.session.timeout.ms=18000
> > log.retention.ms=172800000
> > log.cleanup.policy=delete
> > group.max.session.timeout.ms=1200000
> >
> > Exception:-
> > ----------------
> > "locationInformation":
> > "org.apache.flink.streaming.runtime.tasks.StreamTask.disposeAllOperators(StreamTask.java:500)",
> > "logger": "org.apache.flink.streaming.runtime.tasks.StreamTask",
> > "message": "Error during disposal of stream operator.",
> > "throwableInformation": [
> > "org.apache.flink.streaming.connectors.kafka.FlinkKafkaException:
> > Failed to send data to Kafka: Failed to send data to Kafka: The server
> > disconnected"
> >
> > "Caused by: org.apache.kafka.common.errors.NetworkException: The
> > server disconnected before a response was received."
> >
> > Thanks
> >
> > On Wed, Aug 25, 2021 at 12:11 PM Fabian Paul <fabianp...@ververica.com> wrote:
> > >
> > > Hi Mohammed,
> > >
> > > 200 records/s should definitely be doable. The first thing you can do
> > > is remove the print-out sinks, because they increase the load on your
> > > cluster due to the additional IO and prevent Flink from fusing
> > > (chaining) operators.
> > >
> > > I am interested to see the updated job graph after the removal of the
> > > print sinks.
> > >
> > > Best,
> > > Fabian