Hi Robert,

I have removed all the business logic (the keyBy and window operators) and kept just a source and a sink to test it. The throughput is 20K messages in 2 minutes for a simple read from a source Kafka topic and write to a sink Kafka topic; each message is 4 KB. That is only about 167 messages per second (roughly 0.65 MB/s), which still does not look like good throughput for a plain read/write application, does it?
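For reference, the stripped-down test job has roughly the following shape (a simplified sketch only, not my exact code: the topic names and SimpleStringSchema are placeholders, it reuses the same kafkaConnectProperties and Flink Kafka connector classes as the full job, and the counting map is the kind of throughput probe you had suggested):

// Passthrough job: Kafka source -> optional rate-logging map -> Kafka sink, no keyBy/window.
StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();

DataStream<String> raw = env.addSource(new FlinkKafkaConsumer<>(
        "input-topic", new SimpleStringSchema(), kafkaConnectProperties)).setParallelism(45);

// Logs the per-subtask rate so records/second are visible even with operator chaining enabled.
DataStream<String> counted = raw.map(new RichMapFunction<String, String>() {
    private transient long count;
    private transient long startMillis;

    @Override
    public void open(Configuration parameters) {
        startMillis = System.currentTimeMillis();
    }

    @Override
    public String map(String value) {
        count++;
        if (count % 1000 == 0) {
            long elapsedMillis = System.currentTimeMillis() - startMillis;
            System.out.println(count + " records in " + elapsedMillis + " ms (~"
                    + (count * 1000 / Math.max(elapsedMillis, 1)) + " records/s)");
        }
        return value;
    }
});

counted.addSink(new FlinkKafkaProducer<>(
        "output-topic", new SimpleStringSchema(), kafkaConnectProperties)).setParallelism(45);

env.execute("kafka-passthrough-throughput-test");

The idea is just to see whether the raw Kafka read/write path alone already accounts for the minutes, before any keyBy or window is involved.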
As I mentioned in the previous email(s), I am using keyBy() and window() to handle the business logic. Do you think these operators would have a large impact on performance? Or is it something to do with my Kafka cluster configuration, or the older Flink version (1.8) that I am using in the application? I am not sure whether Flink 1.8 has a known performance issue. Please let me know.

Below is my Kafka cluster configuration:

auto.create.topics.enable=true
log.retention.hours=24
default.replication.factor=3
min.insync.replicas=2
num.io.threads=45
num.network.threads=60
num.partitions=45
num.replica.fetchers=2
unclean.leader.election.enable=true
replica.lag.time.max.ms=30000
zookeeper.session.timeout.ms=18000
log.retention.ms=172800000
log.cleanup.policy=delete
group.max.session.timeout.ms=1200000

Thanks

> On Wed, Sep 22, 2021 at 9:06 PM Robert Metzger <rmetz...@apache.org> wrote:
> Hi Kamaal,
>
> I would first suggest understanding the performance bottleneck before applying any optimizations.
>
> Idea 1: Are your CPUs fully utilized?
> If yes, good, then scaling up will probably help.
> If not, then there's another inefficiency.
>
> Idea 2: How fast can you get the data into your job, without any processing?
> You can measure this by submitting a simple Flink job that just reads the data and writes it to a discarding sink. Either disable operator chaining to get metrics for the records per second, or add a custom mapper in between that measures the throughput.
> Ideally you see here that you can read all your data in a few seconds; if not, then there's a problem getting your data in.
>
> Idea 3: Is your IO fully utilized? (If you are checkpointing to RocksDB, the disk can dramatically slow you down.)
>
> Idea 4: Are you under high memory pressure, so that your JVMs are spending most of their cycles garbage collecting?
>
> My bet is that you are not getting data into your cluster as fast as you think (Idea 2).
>
>
>> On Wed, Sep 22, 2021 at 12:05 PM Mohammed Kamaal <mohammed.kamaa...@gmail.com> wrote:
>> Hi Arvid,
>>
>> The throughput has decreased further after I removed all the rebalance() calls. The performance has gone from 14 minutes for 20K messages to 20 minutes for 20K messages.
>>
>> Below are the tasks that the Flink application performs. I am using keyBy and window operations. Do you think I am making a mistake here, or does the way I perform the keyBy or window operation need to be corrected?
>>
>> //Add Source
>> StreamExecutionEnvironment streamenv = StreamExecutionEnvironment.getExecutionEnvironment();
>> initialStreamData = streamenv.addSource(new FlinkKafkaConsumer<>(
>>         topicsProperties.getProperty(Common.CGM_STREAM_TOPIC),
>>         new ObjectNodeJsonDeSerializerSchema(),
>>         kafkaConnectProperties)).setParallelism(Common.FORTY_FIVE);
>>
>> DataStream<CGM> cgmStreamData = initialStreamData
>>         .keyBy(value -> value.findValue("PERSON_ID").asText())
>>         .flatMap(new SgStreamingTask()).setParallelism(Common.FORTY_FIVE);
>>
>> DataStream<CGM> artfctOverlapStream = cgmStreamData
>>         .keyBy(new CGMKeySelector()).countWindow(2, 1)
>>         .apply(new ArtifactOverlapProvider()).setParallelism(Common.FORTY_FIVE).rebalance();
>>
>> DataStream<CGM> streamWithSgRoc = artfctOverlapStream
>>         .keyBy(new CGMKeySelector()).countWindow(7, 1)
>>         .apply(new SgRocProvider()).setParallelism(Common.FORTY_FIVE).rebalance();
>>
>> DataStream<CGMDataCollector> cgmExcursionStream = streamWithSgRoc
>>         .keyBy(new CGMKeySelector()).countWindow(Common.THREE, Common.ONE)
>>         .apply(new CGMExcursionProviderStream()).setParallelism(Common.FORTY_FIVE).rebalance();
>>
>> //Add Sink
>> cgmExcursionStream.addSink(new FlinkKafkaProducer<CGMDataCollector>(
>>         topicsProperties.getProperty(Common.CGM_EVENT_TOPIC),
>>         new CGMDataCollectorSchema(),
>>         kafkaConnectProperties)).setParallelism(Common.FORTY_FIVE);
>>
>> Implementation classes:
>>
>> //Deserializes each JSON message received from Kafka
>> public class ObjectNodeJsonDeSerializerSchema implements KeyedDeserializationSchema<ObjectNode> {
>>     public ObjectNode deserialize(byte[] messageKey, byte[] message, String topic, int partition, long offset);
>> }
>>
>> //FlatMap to check each message and apply validation
>> public class SgStreamingTask extends RichFlatMapFunction<ObjectNode, CGM> {
>>     void flatMap(ObjectNode streamData, Collector<CGM> out);
>> }
>>
>> //Persists three state variables and applies business logic
>> public class ArtifactOverlapProvider extends RichFlatMapFunction<CGM, Tuple2<Long, Long>>
>>         implements WindowFunction<CGM, CGM, String, GlobalWindow> {
>>     public void apply(String key, GlobalWindow window, Iterable<CGM> values, Collector<CGM> out);
>> }
>>
>> //Applies business logic
>> public class SgRocProvider implements WindowFunction<CGM, CGM, String, GlobalWindow> {
>>     public void apply(String key, GlobalWindow window, Iterable<CGM> values, Collector<CGM> out);
>> }
>>
>> //Persists three state variables and applies business logic
>> public class CGMExcursionProviderStream extends RichFlatMapFunction<CGM, Tuple2<Long, Long>>
>>         implements WindowFunction<CGM, CGMDataCollector, String, GlobalWindow> {
>>     public void apply(String key, GlobalWindow window, Iterable<CGM> values, Collector<CGMDataCollector> out);
>> }
>>
>> Thanks
>> Kamaal
>>
>>
>>> On Mon, Sep 6, 2021 at 9:57 PM Arvid Heise <ar...@apache.org> wrote:
>>> Hi Mohammed,
>>>
>>> Something is definitely wrong in your setup. You can safely assume that you can process 1k records per second and core with Kafka and light processing, so you shouldn't even need to go distributed in your case.
>>>
>>> Do you perform any heavy computation? What is your flatMap doing? Are you emitting lots of small records from one big record?
>>>
>>> Can you please remove all rebalance() calls and report back? Rebalance is counter-productive if you don't know for sure that you need it.
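>>> For example, dropping just the rebalance() calls between the keyed count windows would look roughly like this (a sketch that reuses the class names from your pipeline purely for illustration; everything else stays as it is):
>>>
>>> DataStream<CGM> artfctOverlapStream = cgmStreamData
>>>         .keyBy(new CGMKeySelector()).countWindow(2, 1)
>>>         .apply(new ArtifactOverlapProvider()).setParallelism(Common.FORTY_FIVE);   // no rebalance()
>>>
>>> DataStream<CGM> streamWithSgRoc = artfctOverlapStream
>>>         .keyBy(new CGMKeySelector()).countWindow(7, 1)
>>>         .apply(new SgRocProvider()).setParallelism(Common.FORTY_FIVE);             // no rebalance()
>>>
>>> Then measure again with this plain keyBy-to-keyBy hand-off as the baseline before adding any manual repartitioning back.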
>>>
>>>> On Thu, Sep 2, 2021 at 1:36 PM Mohammed Kamaal <mohammed.kamaa...@gmail.com> wrote:
>>>> Hi Fabian,
>>>>
>>>> Just an update,
>>>>
>>>> Problem 2:-
>>>> ----------------
>>>> Caused by: org.apache.kafka.common.errors.NetworkException
>>>> It is resolved. It happened because we exceeded the number of allowed partitions for the Kafka cluster (AWS MSK). We have deleted unused topics and partitions to resolve the issue.
>>>>
>>>> Problem 1:-
>>>> ----------------
>>>> I increased the Kafka partitions and the Flink parallelism to 45, and the throughput has improved from 20 minutes to 14 minutes (20K records). Can you check the Flink graph and let me know if there is anything else that can be done to improve the throughput further?
>>>>
>>>> Thanks
>>>>
>>>> On Wed, Sep 1, 2021 at 10:55 PM Mohammed Kamaal <mohammed.kamaa...@gmail.com> wrote:
>>>> >
>>>> > Hi Fabian,
>>>> >
>>>> > Problem 1:-
>>>> > ---------------------
>>>> > I have removed the print sinks and ran the test again. This time the throughput is 17 minutes for 20K records (200 records every second). Earlier it was 20 minutes for 20K records (parallelism of 15 and 15 Kafka partitions).
>>>> >
>>>> > Please find the attached application graph. Can you suggest what else can be done to improve the throughput further?
>>>> >
>>>> > Problem 2:-
>>>> > ---------------------
>>>> > Also, I tried increasing the parallelism from 15 to 45 (and the Kafka partitions from 15 to 45) to see if this helps to get better throughput.
>>>> >
>>>> > After increasing the partitions, I am facing a network issue with the Kafka cluster (AWS Managed Streaming for Kafka). I do not get this issue with 15 partitions for the Kafka topic. Could this be an issue with the Kafka cluster?
>>>> >
>>>> > Kafka cluster configuration:-
>>>> > ---------------------------------------
>>>> > auto.create.topics.enable=true
>>>> > log.retention.hours=24
>>>> > default.replication.factor=3
>>>> > min.insync.replicas=2
>>>> > num.io.threads=45
>>>> > num.network.threads=60
>>>> > num.partitions=45
>>>> > num.replica.fetchers=2
>>>> > unclean.leader.election.enable=true
>>>> > replica.lag.time.max.ms=30000
>>>> > zookeeper.session.timeout.ms=18000
>>>> > log.retention.ms=172800000
>>>> > log.cleanup.policy=delete
>>>> > group.max.session.timeout.ms=1200000
>>>> >
>>>> > Exception:-
>>>> > ----------------
>>>> > "locationInformation": "org.apache.flink.streaming.runtime.tasks.StreamTask.disposeAllOperators(StreamTask.java:500)",
>>>> > "logger": "org.apache.flink.streaming.runtime.tasks.StreamTask",
>>>> > "message": "Error during disposal of stream operator.",
>>>> > "throwableInformation": [
>>>> > "org.apache.flink.streaming.connectors.kafka.FlinkKafkaException: Failed to send data to Kafka: Failed to send data to Kafka: The server disconnected
>>>> >
>>>> > "Caused by: org.apache.kafka.common.errors.NetworkException: The server disconnected before a response was received."
>>>> >
>>>> >
>>>> > Thanks
>>>> >
>>>> >
>>>> > On Wed, Aug 25, 2021 at 12:11 PM Fabian Paul <fabianp...@ververica.com> wrote:
>>>> > >
>>>> > > Hi Mohammed,
>>>> > >
>>>> > > 200 records/s should definitely be doable. The first thing you can do is remove the print sinks, because they increase the load on your cluster due to the additional IO operation and, secondly, they prevent Flink from fusing (chaining) operators.
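>>>> > > For illustration, this is the kind of debug output to drop for the performance runs (a sketch only; the stream names below are placeholders, since the print sinks are not shown in the thread):
>>>> > >
>>>> > > // Each print() writes every record to the TaskManager stdout and attaches an
>>>> > > // extra consumer to the stream, so remove these when measuring throughput.
>>>> > > validatedStream.print();
>>>> > > resultStream.print();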
>>>> > > I am interested to see the updated job graph after the removal of the print sinks.
>>>> > >
>>>> > > Best,
>>>> > > Fabian