Hi Fabian,

Just an update,

Problem 2:-
----------------
Caused by: org.apache.kafka.common.errors.NetworkException
It is resolved. It was because we exceeded the number of allowed
partitions for the kafka cluster (AWS MSK cluster). Have deleted
unused topics and partitions to resolve the issue.

Problem 1:-
----------------
I increased the kafka partition and flink parallelism to 45 and the
throughput has improved from 20 minutes to 14 minutes (20K records).
Can you check the flink graph and let me know if there is anything
else that can be done here to improve the throughput further.

Thanks

On Wed, Sep 1, 2021 at 10:55 PM Mohammed Kamaal
<mohammed.kamaa...@gmail.com> wrote:
>
> Hi Fabian,
>
> Problem 1:-
> ---------------------
> I have removed the print out sink's and ran the test again. This time
> the throughput is 17 minutes for 20K records (200 records every
> second). Earlier it was 20 minutes for 20K records. (parallelism 15
> and kafka partition of 15)
>
> Please find the attached application graph. Can you suggest what else
> is required further to improve the throughput.
>
> Problem 2:-
> ---------------------
> Also, I tried to increase the parallelism to 45 from 15 (also
> increasing the kafka partition to 45 from 15) to see if this helps in
> getting a better throughput.
>
> After increasing the partition, I am facing the Network issue with
> Kafka Cluster (AWS Managed Stream Kafka). I am not getting this issue
> with 15 partitions for the kafka topic. This could be an issue with
> the Kafka cluster?
>
> Kafka Cluster Configuration:-
> ---------------------------------------
> auto.create.topics.enable=true
> log.retention.hours=24
> default.replication.factor=3
> min.insync.replicas=2
> num.io.threads=45
> num.network.threads=60
> num.partitions=45
> num.replica.fetchers=2
> unclean.leader.election.enable=true
> replica.lag.time.max.ms=30000
> zookeeper.session.timeout.ms=18000
> log.retention.ms=172800000
> log.cleanup.policy=delete
> group.max.session.timeout.ms=1200000
>
> Exception:-
> ----------------
>  "locationInformation":
> "org.apache.flink.streaming.runtime.tasks.StreamTask.disposeAllOperators(StreamTask.java:500)",
>     "logger": "org.apache.flink.streaming.runtime.tasks.StreamTask",
>     "message": "Error during disposal of stream operator.",
>     "throwableInformation": [
>         "org.apache.flink.streaming.connectors.kafka.FlinkKafkaException:
> Failed to send data to Kafka: Failed to send data to Kafka: The server
> disconnected
>
> "Caused by: org.apache.kafka.common.errors.NetworkException: The
> server disconnected before a response was received."
>
>
> Thanks
>
>
> On Wed, Aug 25, 2021 at 12:11 PM Fabian Paul <fabianp...@ververica.com> wrote:
> >
> > Hi Mohammed,
> >
> > 200records should definitely be doable. The first you can do is remove the 
> > print out Sink because they are increasing the load on your cluster due to 
> > the additional IO
> > operation and secondly preventing Flink from fusing operators.
> > I am interested to see the updated job graph after the removal of the print 
> > sinks.
> >
> > Best,
> > Fabian

Reply via email to