Hi Robert,

I have removed all the business logic (keyBy and window) operator code and just 
had a source and sink to test it.
The throughput is 20K messages in 2 minutes. It is a simple read from source 
(kafka topic) and write to sink (kafka topic). Don't you think 2 minutes is 
also not a better throughput for a simple read/write application?. Each message 
is 4 KB.

As I had mentioned in the previous email(s), I am using keyBy() and Window() to 
handle business logic. Do you think these operators would have a huge impact on 
the performance?. Or is it something to do with my Kafka cluster configuration 
or the older version of flink (1.8) that I am using in my application. Not sure 
if flink version 1.8 has a performance issue.

Please let me know.
Below is my kafka cluster configuration.

auto.create.topics.enable=true
log.retention.hours=24
default.replication.factor=3
min.insync.replicas=2
num.io.threads=45
num.network.threads=60
num.partitions=45
num.replica.fetchers=2
unclean.leader.election.enable=true
replica.lag.time.max.ms=30000
zookeeper.session.timeout.ms=18000
log.retention.ms=172800000
log.cleanup.policy=delete
group.max.session.timeout.ms=1200000



Thanks

> On Wed, Sep 22, 2021 at 9:06 PM Robert Metzger <rmetz...@apache.org> wrote:
> Hi Kamaal,
> 
> I would first suggest understanding the performance bottleneck, before 
> applying any optimizations.
> 
> Idea 1: Are your CPUs fully utilized?
> if yes, good, then scaling up will probably help
> If not, then there's another inefficiency
> 
> Idea 2: How fast can you get the data into your job, without any processing?
> You can measure this by submitting a simple Flink job that just reads the 
> data and writes it to a discarding sink. Either disable the operator chaining 
> to get metrics for the records per second, or add a custom mapper in between 
> that measures the throughput.
> Ideally you see here that you can read all your data in a few seconds, if 
> not, then there's a problem getting your data in.
> 
> Idea 3: is your IO fully utilized ?( if you are checkpointing to RocksDB, the 
> disk can dramatically slow you down)
> Idea 4: Are you under high memory pressure, and your JVMs are spending most 
> of their cycles garbage collecting?
> 
> My bet is you are not getting data into your cluster as fast as you think 
> (Idea 2)
> 
> 
>> On Wed, Sep 22, 2021 at 12:05 PM Mohammed Kamaal 
>> <mohammed.kamaa...@gmail.com> wrote:
>> Hi Arvid,
>> 
>> The throughput has decreased further after I removed all the rebalance(). 
>> The performance has decreased from 14 minutes for 20K messages to 20 minutes 
>> for 20K messages.
>> 
>> Below are the tasks that the flink application is performing. I am using 
>> keyBy and Window operation. Do you think am I making any mistake here or the 
>> way I am performing the keyBy or Window operation needs to be corrected?.
>> 
>> //Add Source
>> StreamExecutionEnvironment streamenv = 
>> StreamExecutionEnvironment.getExecutionEnvironment();
>> initialStreamData = streamenv.addSource(new 
>> FlinkKafkaConsumer<>(topicsProperties.getProperty(Common.CGM_STREAM_TOPIC),
>> new ObjectNodeJsonDeSerializerSchema(), 
>> kafkaConnectProperties)).setParallelism(Common.FORTY_FIVE);
>> 
>> DataStream<CGM> cgmStreamData = initialStreamData.keyBy(value -> 
>> value.findValue("PERSON_ID").asText())
>> .flatMap(new SgStreamingTask()).setParallelism(Common.FORTY_FIVE);
>> 
>> DataStream<CGM> artfctOverlapStream = cgmStreamData.keyBy(new 
>> CGMKeySelector()).countWindow(2, 1)
>> .apply(new 
>> ArtifactOverlapProvider()).setParallelism(Common.FORTY_FIVE).rebalance();
>> 
>> DataStream<CGM> streamWithSgRoc = artfctOverlapStream.keyBy(new 
>> CGMKeySelector()).countWindow(7, 1)
>> .apply(new SgRocProvider()).setParallelism(Common.FORTY_FIVE).rebalance();
>> 
>> DataStream<CGMDataCollector> cgmExcursionStream = streamWithSgRoc.keyBy(new 
>> CGMKeySelector())
>> .countWindow(Common.THREE, Common.ONE).apply(new 
>> CGMExcursionProviderStream()).setParallelism(Common.FORTY_FIVE).rebalance();
>> 
>> //Add Sink
>> cgmExcursionStream.addSink(new FlinkKafkaProducer<CGMDataCollector>(
>> topicsProperties.getProperty(Common.CGM_EVENT_TOPIC), new 
>> CGMDataCollectorSchema(),
>> kafkaConnectProperties)).setParallelism(Common.FORTY_FIVE);
>> 
>> Implementation classes:-
>> 
>> //deserialize the json message received
>> ObjectNodeJsonDeSerializerSchema implements 
>> KeyedDeserializationSchema<ObjectNode>{
>> public ObjectNode deserialize(byte[] messageKey, byte[] message, String 
>> topic, int partition, long offset);
>> }
>> 
>> //Flapmap to check each message and apply validation
>> public class SgStreamingTask extends RichFlatMapFunction<ObjectNode, CGM> {
>> void flatMap(ObjectNode streamData, Collector<CGM> out);
>> }
>> 
>> //persist three state variables and apply business logic
>> public class ArtifactOverlapProvider extends RichFlatMapFunction<CGM, 
>> Tuple2<Long, Long>>
>> implements WindowFunction<CGM, CGM, String, GlobalWindow> {
>> public void apply(String key, GlobalWindow window, Iterable<CGM> values, 
>> Collector<CGM> out);
>> }
>> 
>> //Apply business logic
>> public class SgRocProvider implements WindowFunction<CGM, CGM, String, 
>> GlobalWindow>{
>> public void apply(String key, GlobalWindow window, Iterable<CGM> values, 
>> Collector<CGM> out);
>> }
>> 
>> //persist 3 state variables and apply business logic
>> public class CGMExcursionProviderStream extends RichFlatMapFunction<CGM, 
>> Tuple2<Long, Long>>
>> implements WindowFunction<CGM, CGMDataCollector, String, GlobalWindow>{
>> public void apply(String key, GlobalWindow window, Iterable<CGM> values, 
>> Collector<CGMDataCollector> out);
>> 
>> }
>> 
>> Thanks
>> Kamaal
>> 
>> 
>>> On Mon, Sep 6, 2021 at 9:57 PM Arvid Heise <ar...@apache.org> wrote:
>>> Hi Mohammed,
>>> 
>>> something is definitely wrong in your setup. You can safely say that you 
>>> can process 1k records per second and core with Kafka and light processing, 
>>> so you shouldn't even need to go distributed in your case.
>>> 
>>> Do you perform any heavy computation? What is your flatMap doing? Are you 
>>> emitting lots of small records from one big record?
>>> 
>>> Can you please remove all rebalance and report back? Rebalance is 
>>> counter-productive if you don't exactly know that you need it.
>>> 
>>>> On Thu, Sep 2, 2021 at 1:36 PM Mohammed Kamaal 
>>>> <mohammed.kamaa...@gmail.com> wrote:
>>>> Hi Fabian,
>>>> 
>>>> Just an update,
>>>> 
>>>> Problem 2:-
>>>> ----------------
>>>> Caused by: org.apache.kafka.common.errors.NetworkException
>>>> It is resolved. It was because we exceeded the number of allowed
>>>> partitions for the kafka cluster (AWS MSK cluster). Have deleted
>>>> unused topics and partitions to resolve the issue.
>>>> 
>>>> Problem 1:-
>>>> ----------------
>>>> I increased the kafka partition and flink parallelism to 45 and the
>>>> throughput has improved from 20 minutes to 14 minutes (20K records).
>>>> Can you check the flink graph and let me know if there is anything
>>>> else that can be done here to improve the throughput further.
>>>> 
>>>> Thanks
>>>> 
>>>> On Wed, Sep 1, 2021 at 10:55 PM Mohammed Kamaal
>>>> <mohammed.kamaa...@gmail.com> wrote:
>>>> >
>>>> > Hi Fabian,
>>>> >
>>>> > Problem 1:-
>>>> > ---------------------
>>>> > I have removed the print out sink's and ran the test again. This time
>>>> > the throughput is 17 minutes for 20K records (200 records every
>>>> > second). Earlier it was 20 minutes for 20K records. (parallelism 15
>>>> > and kafka partition of 15)
>>>> >
>>>> > Please find the attached application graph. Can you suggest what else
>>>> > is required further to improve the throughput.
>>>> >
>>>> > Problem 2:-
>>>> > ---------------------
>>>> > Also, I tried to increase the parallelism to 45 from 15 (also
>>>> > increasing the kafka partition to 45 from 15) to see if this helps in
>>>> > getting a better throughput.
>>>> >
>>>> > After increasing the partition, I am facing the Network issue with
>>>> > Kafka Cluster (AWS Managed Stream Kafka). I am not getting this issue
>>>> > with 15 partitions for the kafka topic. This could be an issue with
>>>> > the Kafka cluster?
>>>> >
>>>> > Kafka Cluster Configuration:-
>>>> > ---------------------------------------
>>>> > auto.create.topics.enable=true
>>>> > log.retention.hours=24
>>>> > default.replication.factor=3
>>>> > min.insync.replicas=2
>>>> > num.io.threads=45
>>>> > num.network.threads=60
>>>> > num.partitions=45
>>>> > num.replica.fetchers=2
>>>> > unclean.leader.election.enable=true
>>>> > replica.lag.time.max.ms=30000
>>>> > zookeeper.session.timeout.ms=18000
>>>> > log.retention.ms=172800000
>>>> > log.cleanup.policy=delete
>>>> > group.max.session.timeout.ms=1200000
>>>> >
>>>> > Exception:-
>>>> > ----------------
>>>> >  "locationInformation":
>>>> > "org.apache.flink.streaming.runtime.tasks.StreamTask.disposeAllOperators(StreamTask.java:500)",
>>>> >     "logger": "org.apache.flink.streaming.runtime.tasks.StreamTask",
>>>> >     "message": "Error during disposal of stream operator.",
>>>> >     "throwableInformation": [
>>>> >         "org.apache.flink.streaming.connectors.kafka.FlinkKafkaException:
>>>> > Failed to send data to Kafka: Failed to send data to Kafka: The server
>>>> > disconnected
>>>> >
>>>> > "Caused by: org.apache.kafka.common.errors.NetworkException: The
>>>> > server disconnected before a response was received."
>>>> >
>>>> >
>>>> > Thanks
>>>> >
>>>> >
>>>> > On Wed, Aug 25, 2021 at 12:11 PM Fabian Paul <fabianp...@ververica.com> 
>>>> > wrote:
>>>> > >
>>>> > > Hi Mohammed,
>>>> > >
>>>> > > 200records should definitely be doable. The first you can do is remove 
>>>> > > the print out Sink because they are increasing the load on your 
>>>> > > cluster due to the additional IO
>>>> > > operation and secondly preventing Flink from fusing operators.
>>>> > > I am interested to see the updated job graph after the removal of the 
>>>> > > print sinks.
>>>> > >
>>>> > > Best,
>>>> > > Fabian

Reply via email to