Hi Kamaal,

I did a quick test with a local Kafka in Docker. With parallelism 1, I can process 20k messages of size 4KB in about 1 min. So with a parallelism of 15, I'd expect the time to drop below 10s even with bigger data skew.
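For reference, a bare Kafka source -> sink job of the kind used for such a test can look roughly like this (a sketch only, assuming the universal Kafka connector; topic names, serialization, and connection properties are placeholders, not your actual config):

import java.util.Properties;
import org.apache.flink.api.common.serialization.SimpleStringSchema;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.connectors.kafka.FlinkKafkaConsumer;
import org.apache.flink.streaming.connectors.kafka.FlinkKafkaProducer;

public class SourceToSinkBaseline {
    public static void main(String[] args) throws Exception {
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        env.setParallelism(1); // start with 1, then scale up and compare

        Properties props = new Properties();
        props.setProperty("bootstrap.servers", "localhost:9092"); // placeholder
        props.setProperty("group.id", "baseline-test");           // placeholder

        // no keyBy, no windows, no custom (de)serialization logic:
        // just read raw strings from one topic and write them to another
        env.addSource(new FlinkKafkaConsumer<>("input-topic", new SimpleStringSchema(), props))
           .addSink(new FlinkKafkaProducer<>("output-topic", new SimpleStringSchema(), props));

        env.execute("source-to-sink baseline");
    }
}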
What I recommend is to start from scratch and work with just a simple source -> sink. That should be much, much faster. If it is, add complexity back until you find the bottleneck. If not, I suspect your ObjectNodeJsonDeSerializerSchema is the issue. For example, are you creating an ObjectMapper with each invocation? That's a typical mistake (a sketch of the fix appears further down in this thread).

Best,

Arvid

On Mon, Sep 27, 2021 at 2:38 PM Mohammed Kamaal <mohammed.kamaa...@gmail.com> wrote:

> Hi Robert,
>
> I have removed all the business logic (keyBy and window) operator code and
> just had a source and sink to test it.
> The throughput is 20K messages in 2 minutes. It is a simple read from
> source (Kafka topic) and write to sink (Kafka topic). Don't you think 2
> minutes is also poor throughput for a simple read/write application?
> Each message is 4 KB.
>
> As I mentioned in previous email(s), I am using keyBy() and
> window() to handle the business logic. Do you think these operators have
> a huge impact on performance? Or is it something to do with my Kafka
> cluster configuration, or the older version of Flink (1.8) that I am using
> in my application? I am not sure if Flink 1.8 has a performance issue.
>
> Please let me know.
> Below is my Kafka cluster configuration.
>
> auto.create.topics.enable=true
> log.retention.hours=24
> default.replication.factor=3
> min.insync.replicas=2
> num.io.threads=45
> num.network.threads=60
> num.partitions=45
> num.replica.fetchers=2
> unclean.leader.election.enable=true
> replica.lag.time.max.ms=30000
> zookeeper.session.timeout.ms=18000
> log.retention.ms=172800000
> log.cleanup.policy=delete
> group.max.session.timeout.ms=1200000
>
> Thanks
>
> On Wed, Sep 22, 2021 at 9:06 PM Robert Metzger <rmetz...@apache.org> wrote:
>
>> Hi Kamaal,
>>
>> I would first suggest understanding the performance bottleneck before
>> applying any optimizations.
>>
>> Idea 1: Are your CPUs fully utilized?
>> If yes, good, then scaling up will probably help.
>> If not, then there's another inefficiency.
>>
>> Idea 2: How fast can you get the data into your job, without any
>> processing?
>> You can measure this by submitting a simple Flink job that just reads the
>> data and writes it to a discarding sink. Either disable operator
>> chaining to get metrics for the records per second, or add a custom mapper
>> in between that measures the throughput (see the sketch at the bottom of
>> this thread).
>> Ideally you will see that you can read all your data in a few seconds; if
>> not, then there is a problem getting your data in.
>>
>> Idea 3: Is your IO fully utilized? (If you are checkpointing to RocksDB,
>> the disk can dramatically slow you down.)
>> Idea 4: Are you under high memory pressure, so that your JVMs are spending
>> most of their cycles garbage collecting?
>>
>> My bet is that you are not getting data into your cluster as fast as you
>> think (Idea 2).
>>
>> On Wed, Sep 22, 2021 at 12:05 PM Mohammed Kamaal <mohammed.kamaa...@gmail.com> wrote:
>>
>>> Hi Arvid,
>>>
>>> The throughput has decreased further after I removed all the
>>> rebalance() calls: from 14 minutes for 20K messages to 20 minutes for
>>> 20K messages.
>>>
>>> Below are the tasks that the Flink application is performing. I am using
>>> keyBy and window operations. Do you think I am making a mistake here, or
>>> does the way I am performing the keyBy or window operations need to be
>>> corrected?
>>>
>>> //Add Source
>>> StreamExecutionEnvironment streamenv = StreamExecutionEnvironment.getExecutionEnvironment();
>>> initialStreamData = streamenv.addSource(new FlinkKafkaConsumer<>(
>>>         topicsProperties.getProperty(Common.CGM_STREAM_TOPIC),
>>>         new ObjectNodeJsonDeSerializerSchema(),
>>>         kafkaConnectProperties)).setParallelism(Common.FORTY_FIVE);
>>>
>>> DataStream<CGM> cgmStreamData = initialStreamData
>>>         .keyBy(value -> value.findValue("PERSON_ID").asText())
>>>         .flatMap(new SgStreamingTask()).setParallelism(Common.FORTY_FIVE);
>>>
>>> DataStream<CGM> artfctOverlapStream = cgmStreamData
>>>         .keyBy(new CGMKeySelector()).countWindow(2, 1)
>>>         .apply(new ArtifactOverlapProvider())
>>>         .setParallelism(Common.FORTY_FIVE).rebalance();
>>>
>>> DataStream<CGM> streamWithSgRoc = artfctOverlapStream
>>>         .keyBy(new CGMKeySelector()).countWindow(7, 1)
>>>         .apply(new SgRocProvider())
>>>         .setParallelism(Common.FORTY_FIVE).rebalance();
>>>
>>> DataStream<CGMDataCollector> cgmExcursionStream = streamWithSgRoc
>>>         .keyBy(new CGMKeySelector()).countWindow(Common.THREE, Common.ONE)
>>>         .apply(new CGMExcursionProviderStream())
>>>         .setParallelism(Common.FORTY_FIVE).rebalance();
>>>
>>> //Add Sink
>>> cgmExcursionStream.addSink(new FlinkKafkaProducer<CGMDataCollector>(
>>>         topicsProperties.getProperty(Common.CGM_EVENT_TOPIC),
>>>         new CGMDataCollectorSchema(),
>>>         kafkaConnectProperties)).setParallelism(Common.FORTY_FIVE);
>>>
>>> Implementation classes:
>>>
>>> //deserialize the JSON message received
>>> public class ObjectNodeJsonDeSerializerSchema implements KeyedDeserializationSchema<ObjectNode> {
>>>     public ObjectNode deserialize(byte[] messageKey, byte[] message, String topic, int partition, long offset);
>>> }
>>>
>>> //flatMap to check each message and apply validation
>>> public class SgStreamingTask extends RichFlatMapFunction<ObjectNode, CGM> {
>>>     void flatMap(ObjectNode streamData, Collector<CGM> out);
>>> }
>>>
>>> //persist three state variables and apply business logic
>>> public class ArtifactOverlapProvider extends RichFlatMapFunction<CGM, Tuple2<Long, Long>>
>>>         implements WindowFunction<CGM, CGM, String, GlobalWindow> {
>>>     public void apply(String key, GlobalWindow window, Iterable<CGM> values, Collector<CGM> out);
>>> }
>>>
>>> //apply business logic
>>> public class SgRocProvider implements WindowFunction<CGM, CGM, String, GlobalWindow> {
>>>     public void apply(String key, GlobalWindow window, Iterable<CGM> values, Collector<CGM> out);
>>> }
>>>
>>> //persist three state variables and apply business logic
>>> public class CGMExcursionProviderStream extends RichFlatMapFunction<CGM, Tuple2<Long, Long>>
>>>         implements WindowFunction<CGM, CGMDataCollector, String, GlobalWindow> {
>>>     public void apply(String key, GlobalWindow window, Iterable<CGM> values, Collector<CGMDataCollector> out);
>>> }
>>>
>>> Thanks
>>> Kamaal
>>>
>>> On Mon, Sep 6, 2021 at 9:57 PM Arvid Heise <ar...@apache.org> wrote:
>>>
>>>> Hi Mohammed,
>>>>
>>>> Something is definitely wrong in your setup. You can safely assume that
>>>> you can process 1k records per second and core with Kafka and light
>>>> processing, so you shouldn't even need to go distributed in your case.
>>>>
>>>> Do you perform any heavy computation? What is your flatMap doing? Are
>>>> you emitting lots of small records from one big record?
>>>>
>>>> Can you please remove all rebalance() calls and report back? Rebalance is
>>>> counter-productive if you don't know for sure that you need it.
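To make Arvid's ObjectMapper point from the top of this thread concrete, here is a minimal sketch of a deserialization schema that creates the mapper once per task and reuses it for every record, instead of constructing one per invocation. It assumes the Flink 1.8 KeyedDeserializationSchema interface shown above; the class body is illustrative, not Kamaal's actual implementation:

import java.io.IOException;
import com.fasterxml.jackson.databind.ObjectMapper;
import com.fasterxml.jackson.databind.node.ObjectNode;
import org.apache.flink.api.common.typeinfo.TypeInformation;
import org.apache.flink.streaming.util.serialization.KeyedDeserializationSchema;

public class ObjectNodeJsonDeSerializerSchema implements KeyedDeserializationSchema<ObjectNode> {

    // transient: ObjectMapper is not serializable, so it must not be shipped
    // with the schema; it is created lazily on the task manager instead
    private transient ObjectMapper mapper;

    @Override
    public ObjectNode deserialize(byte[] messageKey, byte[] message,
                                  String topic, int partition, long offset) throws IOException {
        if (mapper == null) {
            mapper = new ObjectMapper(); // created once per task, then reused
        }
        return (ObjectNode) mapper.readTree(message);
    }

    @Override
    public boolean isEndOfStream(ObjectNode nextElement) {
        return false; // the Kafka stream is unbounded
    }

    @Override
    public TypeInformation<ObjectNode> getProducedType() {
        return TypeInformation.of(ObjectNode.class);
    }
}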
>>>> On Thu, Sep 2, 2021 at 1:36 PM Mohammed Kamaal <mohammed.kamaa...@gmail.com> wrote:
>>>>
>>>>> Hi Fabian,
>>>>>
>>>>> Just an update:
>>>>>
>>>>> Problem 2:
>>>>> ----------------
>>>>> Caused by: org.apache.kafka.common.errors.NetworkException
>>>>> It is resolved. It was because we exceeded the number of allowed
>>>>> partitions for the Kafka cluster (AWS MSK cluster). I have deleted
>>>>> unused topics and partitions to resolve the issue.
>>>>>
>>>>> Problem 1:
>>>>> ----------------
>>>>> I increased the Kafka partitions and Flink parallelism to 45, and the
>>>>> throughput has improved from 20 minutes to 14 minutes (20K records).
>>>>> Can you check the Flink graph and let me know if there is anything
>>>>> else that can be done here to improve the throughput further?
>>>>>
>>>>> Thanks
>>>>>
>>>>> On Wed, Sep 1, 2021 at 10:55 PM Mohammed Kamaal <mohammed.kamaa...@gmail.com> wrote:
>>>>> >
>>>>> > Hi Fabian,
>>>>> >
>>>>> > Problem 1:
>>>>> > ---------------------
>>>>> > I have removed the print sinks and ran the test again. This time
>>>>> > the throughput is 17 minutes for 20K records (200 records every
>>>>> > second). Earlier it was 20 minutes for 20K records (parallelism 15
>>>>> > and a Kafka partition count of 15).
>>>>> >
>>>>> > Please find the attached application graph. Can you suggest what else
>>>>> > is required to improve the throughput further?
>>>>> >
>>>>> > Problem 2:
>>>>> > ---------------------
>>>>> > Also, I tried to increase the parallelism from 15 to 45 (also
>>>>> > increasing the Kafka partitions from 15 to 45) to see if this helps
>>>>> > get better throughput.
>>>>> >
>>>>> > After increasing the partitions, I am facing a network issue with the
>>>>> > Kafka cluster (AWS MSK). I am not getting this issue
>>>>> > with 15 partitions for the Kafka topic. Could this be an issue with
>>>>> > the Kafka cluster?
>>>>> >
>>>>> > Kafka Cluster Configuration:
>>>>> > ---------------------------------------
>>>>> > auto.create.topics.enable=true
>>>>> > log.retention.hours=24
>>>>> > default.replication.factor=3
>>>>> > min.insync.replicas=2
>>>>> > num.io.threads=45
>>>>> > num.network.threads=60
>>>>> > num.partitions=45
>>>>> > num.replica.fetchers=2
>>>>> > unclean.leader.election.enable=true
>>>>> > replica.lag.time.max.ms=30000
>>>>> > zookeeper.session.timeout.ms=18000
>>>>> > log.retention.ms=172800000
>>>>> > log.cleanup.policy=delete
>>>>> > group.max.session.timeout.ms=1200000
>>>>> >
>>>>> > Exception:
>>>>> > ----------------
>>>>> > "locationInformation":
>>>>> > "org.apache.flink.streaming.runtime.tasks.StreamTask.disposeAllOperators(StreamTask.java:500)",
>>>>> > "logger": "org.apache.flink.streaming.runtime.tasks.StreamTask",
>>>>> > "message": "Error during disposal of stream operator.",
>>>>> > "throwableInformation": [
>>>>> > "org.apache.flink.streaming.connectors.kafka.FlinkKafkaException:
>>>>> > Failed to send data to Kafka: Failed to send data to Kafka: The server
>>>>> > disconnected"
>>>>> >
>>>>> > "Caused by: org.apache.kafka.common.errors.NetworkException: The
>>>>> > server disconnected before a response was received."
>>>>> >
>>>>> > Thanks
>>>>> >
>>>>> > On Wed, Aug 25, 2021 at 12:11 PM Fabian Paul <fabianp...@ververica.com> wrote:
>>>>> > >
>>>>> > > Hi Mohammed,
>>>>> > >
>>>>> > > 200 records per second should definitely be doable.
>>>>> > > The first thing you can do is remove the print sinks, because they
>>>>> > > increase the load on your cluster due to the additional IO and also
>>>>> > > prevent Flink from fusing operators.
>>>>> > > I am interested to see the updated job graph after the removal of
>>>>> > > the print sinks.
>>>>> > >
>>>>> > > Best,
>>>>> > > Fabian
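Finally, a sketch of Robert's "Idea 2" from earlier in the thread: a job that reads from Kafka, counts records per second in a pass-through mapper, and discards the output, so the measured rate reflects ingestion alone. Class names, topic names, and connection properties are placeholders:

import java.util.Properties;
import org.apache.flink.api.common.functions.RichMapFunction;
import org.apache.flink.api.common.serialization.SimpleStringSchema;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.api.functions.sink.DiscardingSink;
import org.apache.flink.streaming.connectors.kafka.FlinkKafkaConsumer;

public class IngestRateCheck {

    /** Pass-through mapper that logs how many records it sees per second. */
    static class ThroughputMeter extends RichMapFunction<String, String> {
        private transient long count;
        private transient long windowStart;

        @Override
        public void open(Configuration parameters) {
            windowStart = System.currentTimeMillis();
        }

        @Override
        public String map(String value) {
            count++;
            long now = System.currentTimeMillis();
            if (now - windowStart >= 1000) {
                System.out.println("records/s in subtask "
                        + getRuntimeContext().getIndexOfThisSubtask() + ": " + count);
                count = 0;
                windowStart = now;
            }
            return value;
        }
    }

    public static void main(String[] args) throws Exception {
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        // Alternative to the mapper: call env.disableOperatorChaining() and read
        // the per-operator records/sec metrics from the web UI instead.

        Properties props = new Properties();
        props.setProperty("bootstrap.servers", "localhost:9092"); // placeholder
        props.setProperty("group.id", "ingest-rate-check");       // placeholder

        env.addSource(new FlinkKafkaConsumer<>("input-topic", new SimpleStringSchema(), props))
           .map(new ThroughputMeter())
           .addSink(new DiscardingSink<>()); // drop everything after counting

        env.execute("ingest rate check");
    }
}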