Hi Arvid,

The throughput has decreased further after I removed all the rebalance() calls: processing 20K messages now takes 20 minutes instead of the earlier 14 minutes.
Below are the operations the Flink application performs; I am using keyBy and window operations. Am I making a mistake here, or does the way I perform the keyBy or window operations need to be corrected?

//Add Source
StreamExecutionEnvironment streamenv = StreamExecutionEnvironment.getExecutionEnvironment();
initialStreamData = streamenv.addSource(new FlinkKafkaConsumer<>(
        topicsProperties.getProperty(Common.CGM_STREAM_TOPIC),
        new ObjectNodeJsonDeSerializerSchema(), kafkaConnectProperties))
    .setParallelism(Common.FORTY_FIVE);

DataStream<CGM> cgmStreamData = initialStreamData
    .keyBy(value -> value.findValue("PERSON_ID").asText())
    .flatMap(new SgStreamingTask()).setParallelism(Common.FORTY_FIVE);

DataStream<CGM> artfctOverlapStream = cgmStreamData.keyBy(new CGMKeySelector())
    .countWindow(2, 1)
    .apply(new ArtifactOverlapProvider()).setParallelism(Common.FORTY_FIVE)
    .rebalance();

DataStream<CGM> streamWithSgRoc = artfctOverlapStream.keyBy(new CGMKeySelector())
    .countWindow(7, 1)
    .apply(new SgRocProvider()).setParallelism(Common.FORTY_FIVE)
    .rebalance();

DataStream<CGMDataCollector> cgmExcursionStream = streamWithSgRoc.keyBy(new CGMKeySelector())
    .countWindow(Common.THREE, Common.ONE)
    .apply(new CGMExcursionProviderStream()).setParallelism(Common.FORTY_FIVE)
    .rebalance();

//Add Sink
cgmExcursionStream.addSink(new FlinkKafkaProducer<CGMDataCollector>(
        topicsProperties.getProperty(Common.CGM_EVENT_TOPIC),
        new CGMDataCollectorSchema(), kafkaConnectProperties))
    .setParallelism(Common.FORTY_FIVE);

Implementation classes:-

//Deserialize the JSON message received
public class ObjectNodeJsonDeSerializerSchema implements KeyedDeserializationSchema<ObjectNode> {
    public ObjectNode deserialize(byte[] messageKey, byte[] message, String topic, int partition, long offset);
}

//FlatMap to check each message and apply validation
public class SgStreamingTask extends RichFlatMapFunction<ObjectNode, CGM> {
    void flatMap(ObjectNode streamData, Collector<CGM> out);
}

//Persist three state variables and apply business logic
public class ArtifactOverlapProvider extends RichFlatMapFunction<CGM, Tuple2<Long, Long>>
        implements WindowFunction<CGM, CGM, String, GlobalWindow> {
    public void apply(String key, GlobalWindow window, Iterable<CGM> values, Collector<CGM> out);
}

//Apply business logic
public class SgRocProvider implements WindowFunction<CGM, CGM, String, GlobalWindow> {
    public void apply(String key, GlobalWindow window, Iterable<CGM> values, Collector<CGM> out);
}

//Persist three state variables and apply business logic
public class CGMExcursionProviderStream extends RichFlatMapFunction<CGM, Tuple2<Long, Long>>
        implements WindowFunction<CGM, CGMDataCollector, String, GlobalWindow> {
    public void apply(String key, GlobalWindow window, Iterable<CGM> values, Collector<CGMDataCollector> out);
}

Thanks
Kamaal

> On Mon, Sep 6, 2021 at 9:57 PM Arvid Heise <ar...@apache.org> wrote:
>
> Hi Mohammed,
>
> Something is definitely wrong in your setup. You can safely say that you can
> process 1k records per second and core with Kafka and light processing, so
> you shouldn't even need to go distributed in your case.
>
> Do you perform any heavy computation? What is your flatMap doing? Are you
> emitting lots of small records from one big record?
>
> Can you please remove all rebalance calls and report back? Rebalance is
> counter-productive if you don't know exactly that you need it.
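For reference, a minimal sketch of the same topology with Arvid's suggestion applied: every rebalance() dropped and the parallelism set once on the environment instead of on each operator. It reuses the class and constant names from the snippet above (SgStreamingTask, CGMKeySelector, Common.FORTY_FIVE, etc.) and assumes the same imports, so it illustrates the shape of the job rather than being a tested program.

// Sketch only: same pipeline without rebalance() and with one job-wide parallelism.
StreamExecutionEnvironment streamenv = StreamExecutionEnvironment.getExecutionEnvironment();
streamenv.setParallelism(Common.FORTY_FIVE);  // set once instead of on every operator

DataStream<ObjectNode> initialStreamData = streamenv.addSource(new FlinkKafkaConsumer<>(
        topicsProperties.getProperty(Common.CGM_STREAM_TOPIC),
        new ObjectNodeJsonDeSerializerSchema(), kafkaConnectProperties));

DataStream<CGM> cgmStreamData = initialStreamData
        .keyBy(value -> value.findValue("PERSON_ID").asText())
        .flatMap(new SgStreamingTask());

// No rebalance() between stages: each following keyBy hash-partitions the records
// by PERSON_ID anyway, so an extra rebalance only adds another network shuffle.
DataStream<CGM> artfctOverlapStream = cgmStreamData
        .keyBy(new CGMKeySelector())
        .countWindow(2, 1)
        .apply(new ArtifactOverlapProvider());

DataStream<CGM> streamWithSgRoc = artfctOverlapStream
        .keyBy(new CGMKeySelector())
        .countWindow(7, 1)
        .apply(new SgRocProvider());

DataStream<CGMDataCollector> cgmExcursionStream = streamWithSgRoc
        .keyBy(new CGMKeySelector())
        .countWindow(Common.THREE, Common.ONE)
        .apply(new CGMExcursionProviderStream());

cgmExcursionStream.addSink(new FlinkKafkaProducer<CGMDataCollector>(
        topicsProperties.getProperty(Common.CGM_EVENT_TOPIC),
        new CGMDataCollectorSchema(), kafkaConnectProperties));

With the rebalance() calls gone and all operators at the same parallelism, the last window operator and the Kafka sink should be able to chain into a single task (the operator fusing Fabian mentions below), while the keyBy stages still exchange data by PERSON_ID.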
>
>> On Thu, Sep 2, 2021 at 1:36 PM Mohammed Kamaal <mohammed.kamaa...@gmail.com> wrote:
>>
>> Hi Fabian,
>>
>> Just an update,
>>
>> Problem 2:-
>> ----------------
>> Caused by: org.apache.kafka.common.errors.NetworkException
>> It is resolved. It was because we exceeded the number of allowed
>> partitions for the Kafka cluster (AWS MSK cluster). We have deleted
>> unused topics and partitions to resolve the issue.
>>
>> Problem 1:-
>> ----------------
>> I increased the Kafka partitions and the Flink parallelism to 45, and the
>> throughput has improved from 20 minutes to 14 minutes (20K records).
>> Can you check the Flink graph and let me know if there is anything
>> else that can be done here to improve the throughput further?
>>
>> Thanks
>>
>> On Wed, Sep 1, 2021 at 10:55 PM Mohammed Kamaal
>> <mohammed.kamaa...@gmail.com> wrote:
>> >
>> > Hi Fabian,
>> >
>> > Problem 1:-
>> > ---------------------
>> > I have removed the print-out sinks and ran the test again. This time
>> > the throughput is 17 minutes for 20K records (200 records every
>> > second). Earlier it was 20 minutes for 20K records (parallelism 15
>> > and a Kafka partition count of 15).
>> >
>> > Please find the attached application graph. Can you suggest what else
>> > is required to improve the throughput further?
>> >
>> > Problem 2:-
>> > ---------------------
>> > Also, I tried to increase the parallelism from 15 to 45 (also
>> > increasing the Kafka partitions from 15 to 45) to see if this helps
>> > achieve better throughput.
>> >
>> > After increasing the partitions, I am facing a network issue with the
>> > Kafka cluster (AWS Managed Streaming for Kafka). I do not get this
>> > issue with 15 partitions for the Kafka topic. Could this be an issue
>> > with the Kafka cluster?
>> >
>> > Kafka cluster configuration:-
>> > ---------------------------------------
>> > auto.create.topics.enable=true
>> > log.retention.hours=24
>> > default.replication.factor=3
>> > min.insync.replicas=2
>> > num.io.threads=45
>> > num.network.threads=60
>> > num.partitions=45
>> > num.replica.fetchers=2
>> > unclean.leader.election.enable=true
>> > replica.lag.time.max.ms=30000
>> > zookeeper.session.timeout.ms=18000
>> > log.retention.ms=172800000
>> > log.cleanup.policy=delete
>> > group.max.session.timeout.ms=1200000
>> >
>> > Exception:-
>> > ----------------
>> > "locationInformation":
>> > "org.apache.flink.streaming.runtime.tasks.StreamTask.disposeAllOperators(StreamTask.java:500)",
>> > "logger": "org.apache.flink.streaming.runtime.tasks.StreamTask",
>> > "message": "Error during disposal of stream operator.",
>> > "throwableInformation": [
>> > "org.apache.flink.streaming.connectors.kafka.FlinkKafkaException:
>> > Failed to send data to Kafka: Failed to send data to Kafka: The server
>> > disconnected"
>> >
>> > "Caused by: org.apache.kafka.common.errors.NetworkException: The
>> > server disconnected before a response was received."
>> >
>> > Thanks
>> >
>> > On Wed, Aug 25, 2021 at 12:11 PM Fabian Paul <fabianp...@ververica.com>
>> > wrote:
>> > >
>> > > Hi Mohammed,
>> > >
>> > > 200 records per second should definitely be doable. The first thing you
>> > > can do is remove the print-out sinks, because they increase the load on
>> > > your cluster due to the additional IO operation and also prevent Flink
>> > > from fusing operators.
>> > > I am interested to see the updated job graph after the removal of the
>> > > print sinks.
>> > >
>> > > Best,
>> > > Fabian
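On Fabian's point about the print-out sinks: each print() attaches an additional sink operator to the stream, so every record is also written to stdout on the task managers. Below is a minimal sketch of keeping that output out of a measured run, using the cgmStreamData stream from the job above; the debugEnabled flag is hypothetical and not part of the original job.

// Sketch only: make the debugging print sink optional instead of always on.
boolean debugEnabled = false;  // hypothetical flag, e.g. read from configuration

if (debugEnabled) {
    // Extra sink: every record is additionally written to stdout,
    // adding IO and one more operator to the job graph.
    cgmStreamData.print();
}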