Hi Kamaal,

I did a quick test with a local Kafka in Docker. With parallelism 1, I can
process 20K messages of size 4 KB in about 1 minute. So with a parallelism
of 15, I'd expect it to take well below 10 s even with bigger data skew.

What I recommend is to start from scratch and work with just a simple
source -> sink job, as sketched below. That should be much, much faster.
If it is, add complexity back step by step until you find the bottleneck.
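
For the baseline, something like this would do (a sketch only; the topic
names, bootstrap servers, and use of SimpleStringSchema are placeholder
assumptions, not taken from your job):

import java.util.Properties;
import org.apache.flink.api.common.serialization.SimpleStringSchema;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.connectors.kafka.FlinkKafkaConsumer;
import org.apache.flink.streaming.connectors.kafka.FlinkKafkaProducer;

public class BaselineJob {
    public static void main(String[] args) throws Exception {
        Properties kafkaProps = new Properties();
        kafkaProps.setProperty("bootstrap.servers", "localhost:9092"); // adjust to your brokers

        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        // Read from the input topic and write straight back out, no processing at all.
        env.addSource(new FlinkKafkaConsumer<>("in-topic", new SimpleStringSchema(), kafkaProps))
           .addSink(new FlinkKafkaProducer<>("out-topic", new SimpleStringSchema(), kafkaProps));
        env.execute("kafka-baseline");
    }
}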

If the baseline is also slow, I suspect your ObjectNodeJsonDeSerializerSchema
to be the issue. For example, are you creating a new ObjectMapper with each
invocation? That's a typical mistake.
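
A minimal sketch of a schema that reuses one ObjectMapper per parallel
instance (assuming the message body is the JSON payload; keep your own
parsing and error handling):

import java.io.IOException;
import com.fasterxml.jackson.databind.ObjectMapper;
import com.fasterxml.jackson.databind.node.ObjectNode;
import org.apache.flink.api.common.typeinfo.TypeInformation;
import org.apache.flink.streaming.util.serialization.KeyedDeserializationSchema;

public class ObjectNodeJsonDeSerializerSchema implements KeyedDeserializationSchema<ObjectNode> {

    // ObjectMapper is expensive to create and not Serializable, so keep it
    // transient and initialize it lazily, once per parallel instance.
    private transient ObjectMapper mapper;

    @Override
    public ObjectNode deserialize(byte[] messageKey, byte[] message,
            String topic, int partition, long offset) throws IOException {
        if (mapper == null) {
            mapper = new ObjectMapper();
        }
        return (ObjectNode) mapper.readTree(message);
    }

    @Override
    public boolean isEndOfStream(ObjectNode nextElement) {
        return false; // unbounded stream
    }

    @Override
    public TypeInformation<ObjectNode> getProducedType() {
        return TypeInformation.of(ObjectNode.class);
    }
}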

Best,

Arvid

On Mon, Sep 27, 2021 at 2:38 PM Mohammed Kamaal <mohammed.kamaa...@gmail.com>
wrote:

> Hi Robert,
>
> I have removed all the business logic (keyBy and window) operator code and
> kept just a source and sink to test it.
> The throughput is 20K messages in 2 minutes. It is a simple read from
> source (Kafka topic) and write to sink (Kafka topic). Don't you think 2
> minutes is still poor throughput for a simple read/write
> application? Each message is 4 KB.
>
> As I had mentioned in the previous email(s), I am using keyBy() and
> window() to handle business logic. Do you think these operators would have
> a huge impact on the performance? Or is it something to do with my Kafka
> cluster configuration, or the older Flink version (1.8) that I am using
> in my application? I am not sure whether Flink 1.8 has a performance issue.
>
> Please let me know.
> Below is my kafka cluster configuration.
>
> auto.create.topics.enable=true
> log.retention.hours=24
> default.replication.factor=3
> min.insync.replicas=2
> num.io.threads=45
> num.network.threads=60
> num.partitions=45
> num.replica.fetchers=2
> unclean.leader.election.enable=true
> replica.lag.time.max.ms=30000
> zookeeper.session.timeout.ms=18000
> log.retention.ms=172800000
> log.cleanup.policy=delete
> group.max.session.timeout.ms=1200000
>
>
>
> Thanks
>
> On Wed, Sep 22, 2021 at 9:06 PM Robert Metzger <rmetz...@apache.org>
> wrote:
>
>> Hi Kamaal,
>>
>> I would first suggest understanding the performance bottleneck, before
>> applying any optimizations.
>>
>> Idea 1: Are your CPUs fully utilized?
>> If yes, good, then scaling up will probably help.
>> If not, then there's another inefficiency.
>>
>> Idea 2: How fast can you get the data into your job, without any
>> processing?
>> You can measure this by submitting a simple Flink job that just reads the
>> data and writes it to a discarding sink. Either disable operator
>> chaining to get records-per-second metrics per operator, or add a custom
>> mapper in between that measures the throughput (see the sketch below).
>> Ideally you see here that you can read all your data in a few seconds;
>> if not, then there's a problem getting your data in.
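>>
>> A rough sketch of such a probe (the class and its logging are my own
>> illustration, not an existing Flink utility; adjust the element type
>> to whatever your source produces):
>>
>> import org.apache.flink.api.common.functions.RichMapFunction;
>> import org.apache.flink.configuration.Configuration;
>> import org.apache.flink.streaming.api.functions.sink.DiscardingSink;
>>
>> // Pass-through mapper that prints records/second per parallel instance.
>> public class ThroughputMeter<T> extends RichMapFunction<T, T> {
>>     private transient long count;
>>     private transient long lastLogMs;
>>
>>     @Override
>>     public void open(Configuration parameters) {
>>         lastLogMs = System.currentTimeMillis();
>>     }
>>
>>     @Override
>>     public T map(T value) {
>>         count++;
>>         long now = System.currentTimeMillis();
>>         if (now - lastLogMs >= 1000) {
>>             System.out.println("records/s: " + (count * 1000L) / (now - lastLogMs));
>>             count = 0;
>>             lastLogMs = now;
>>         }
>>         return value;
>>     }
>> }
>>
>> // Wire it up: source -> probe -> discard, no business logic in between
>> // (sourceStream stands for your Kafka source).
>> sourceStream.map(new ThroughputMeter<>()).addSink(new DiscardingSink<>());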
>>
>> Idea 3: Is your IO fully utilized? (If you are checkpointing to RocksDB,
>> the disk can dramatically slow you down.)
>> Idea 4: Are you under high memory pressure, and your JVMs are spending
>> most of their cycles garbage collecting?
>>
>> My bet is that you are not getting data into your cluster as fast as you
>> think (Idea 2).
>>
>>
>> On Wed, Sep 22, 2021 at 12:05 PM Mohammed Kamaal <
>> mohammed.kamaa...@gmail.com> wrote:
>>
>>> Hi Arvid,
>>>
>>> The throughput has decreased further after I removed all the
>>> rebalance() calls: 20K messages now take 20 minutes instead of the
>>> earlier 14 minutes.
>>>
>>> Below are the tasks that the Flink application performs. I am using
>>> keyBy and window operations. Do you think I am making a mistake here, or
>>> does the way I perform the keyBy or window operations need to be
>>> corrected?
>>>
>>> //Add Source
>>> StreamExecutionEnvironment streamenv = StreamExecutionEnvironment.getExecutionEnvironment();
>>> initialStreamData = streamenv.addSource(new FlinkKafkaConsumer<>(
>>>         topicsProperties.getProperty(Common.CGM_STREAM_TOPIC),
>>>         new ObjectNodeJsonDeSerializerSchema(),
>>>         kafkaConnectProperties)).setParallelism(Common.FORTY_FIVE);
>>>
>>> DataStream<CGM> cgmStreamData = initialStreamData
>>>         .keyBy(value -> value.findValue("PERSON_ID").asText())
>>>         .flatMap(new SgStreamingTask()).setParallelism(Common.FORTY_FIVE);
>>>
>>> DataStream<CGM> artfctOverlapStream = cgmStreamData
>>>         .keyBy(new CGMKeySelector()).countWindow(2, 1)
>>>         .apply(new ArtifactOverlapProvider())
>>>         .setParallelism(Common.FORTY_FIVE).rebalance();
>>>
>>> DataStream<CGM> streamWithSgRoc = artfctOverlapStream
>>>         .keyBy(new CGMKeySelector()).countWindow(7, 1)
>>>         .apply(new SgRocProvider())
>>>         .setParallelism(Common.FORTY_FIVE).rebalance();
>>>
>>> DataStream<CGMDataCollector> cgmExcursionStream = streamWithSgRoc
>>>         .keyBy(new CGMKeySelector())
>>>         .countWindow(Common.THREE, Common.ONE)
>>>         .apply(new CGMExcursionProviderStream())
>>>         .setParallelism(Common.FORTY_FIVE).rebalance();
>>>
>>> //Add Sink
>>> cgmExcursionStream.addSink(new FlinkKafkaProducer<CGMDataCollector>(
>>>         topicsProperties.getProperty(Common.CGM_EVENT_TOPIC),
>>>         new CGMDataCollectorSchema(),
>>>         kafkaConnectProperties)).setParallelism(Common.FORTY_FIVE);
>>>
>>> Implementation classes:-
>>>
>>> //deserialize the received JSON message
>>> public class ObjectNodeJsonDeSerializerSchema
>>>         implements KeyedDeserializationSchema<ObjectNode> {
>>>     public ObjectNode deserialize(byte[] messageKey, byte[] message,
>>>             String topic, int partition, long offset);
>>> }
>>>
>>> //flatMap to check each message and apply validation
>>> public class SgStreamingTask extends RichFlatMapFunction<ObjectNode, CGM> {
>>>     void flatMap(ObjectNode streamData, Collector<CGM> out);
>>> }
>>>
>>> //persist three state variables and apply business logic
>>> public class ArtifactOverlapProvider
>>>         extends RichFlatMapFunction<CGM, Tuple2<Long, Long>>
>>>         implements WindowFunction<CGM, CGM, String, GlobalWindow> {
>>>     public void apply(String key, GlobalWindow window, Iterable<CGM> values,
>>>             Collector<CGM> out);
>>> }
>>>
>>> //apply business logic
>>> public class SgRocProvider
>>>         implements WindowFunction<CGM, CGM, String, GlobalWindow> {
>>>     public void apply(String key, GlobalWindow window, Iterable<CGM> values,
>>>             Collector<CGM> out);
>>> }
>>>
>>> //persist three state variables and apply business logic
>>> public class CGMExcursionProviderStream
>>>         extends RichFlatMapFunction<CGM, Tuple2<Long, Long>>
>>>         implements WindowFunction<CGM, CGMDataCollector, String, GlobalWindow> {
>>>     public void apply(String key, GlobalWindow window, Iterable<CGM> values,
>>>             Collector<CGMDataCollector> out);
>>> }
>>>
>>> Thanks
>>> Kamaal
>>>
>>>
>>> On Mon, Sep 6, 2021 at 9:57 PM Arvid Heise <ar...@apache.org> wrote:
>>>
>>>> Hi Mohammed,
>>>>
>>>> something is definitely wrong in your setup. You can safely say that
>>>> you can process 1K records per second per core with Kafka and light
>>>> processing, so you shouldn't even need to go distributed in your case.
>>>>
>>>> Do you perform any heavy computation? What is your flatMap doing? Are
>>>> you emitting lots of small records from one big record?
>>>>
>>>> Can you please remove all rebalance() calls and report back? Rebalance
>>>> is counter-productive unless you know exactly that you need it; see the
>>>> sketch below.
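>>>>
>>>> For illustration, a keyed count-window stage without a trailing
>>>> rebalance() (a sketch with placeholder names, not your actual code):
>>>>
>>>> DataStream<CGM> windowed = input
>>>>         .keyBy(new CGMKeySelector())   // keyBy already redistributes by key
>>>>         .countWindow(2, 1)
>>>>         .apply(new ArtifactOverlapProvider()); // no .rebalance() afterwards
>>>>
>>>> A following keyBy() shuffles the data anyway, so an extra rebalance()
>>>> between keyed stages only adds network and serialization cost.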
>>>>
>>>> On Thu, Sep 2, 2021 at 1:36 PM Mohammed Kamaal <
>>>> mohammed.kamaa...@gmail.com> wrote:
>>>>
>>>>> Hi Fabian,
>>>>>
>>>>> Just an update,
>>>>>
>>>>> Problem 2:-
>>>>> ----------------
>>>>> Caused by: org.apache.kafka.common.errors.NetworkException
>>>>> It is resolved. It was because we exceeded the number of allowed
>>>>> partitions for the Kafka cluster (AWS MSK cluster). We have deleted
>>>>> unused topics and partitions to resolve the issue.
>>>>>
>>>>> Problem 1:-
>>>>> ----------------
>>>>> I increased the Kafka partitions and Flink parallelism to 45, and the
>>>>> processing time has improved from 20 minutes to 14 minutes (20K records).
>>>>> Can you check the Flink graph and let me know if there is anything
>>>>> else that can be done here to improve the throughput further.
>>>>>
>>>>> Thanks
>>>>>
>>>>> On Wed, Sep 1, 2021 at 10:55 PM Mohammed Kamaal
>>>>> <mohammed.kamaa...@gmail.com> wrote:
>>>>> >
>>>>> > Hi Fabian,
>>>>> >
>>>>> > Problem 1:-
>>>>> > ---------------------
>>>>> > I have removed the print sinks and ran the test again. This time
>>>>> > 20K records (200 records per second) took 17 minutes; earlier it
>>>>> > was 20 minutes (with parallelism 15 and 15 Kafka partitions).
>>>>> >
>>>>> > Please find the attached application graph. Can you suggest what
>>>>> > else can be done to improve the throughput further?
>>>>> >
>>>>> > Problem 2:-
>>>>> > ---------------------
>>>>> > Also, I tried to increase the parallelism from 15 to 45 (also
>>>>> > increasing the Kafka partitions from 15 to 45) to see if this helps
>>>>> > in getting better throughput.
>>>>> >
>>>>> > After increasing the partitions, I am facing a network issue with
>>>>> > the Kafka cluster (AWS Managed Streaming for Kafka). I do not get
>>>>> > this issue with 15 partitions for the topic. Could this be an issue
>>>>> > with the Kafka cluster?
>>>>> >
>>>>> > Kafka Cluster Configuration:-
>>>>> > ---------------------------------------
>>>>> > auto.create.topics.enable=true
>>>>> > log.retention.hours=24
>>>>> > default.replication.factor=3
>>>>> > min.insync.replicas=2
>>>>> > num.io.threads=45
>>>>> > num.network.threads=60
>>>>> > num.partitions=45
>>>>> > num.replica.fetchers=2
>>>>> > unclean.leader.election.enable=true
>>>>> > replica.lag.time.max.ms=30000
>>>>> > zookeeper.session.timeout.ms=18000
>>>>> > log.retention.ms=172800000
>>>>> > log.cleanup.policy=delete
>>>>> > group.max.session.timeout.ms=1200000
>>>>> >
>>>>> > Exception:-
>>>>> > ----------------
>>>>> >  "locationInformation":
>>>>> >
>>>>> "org.apache.flink.streaming.runtime.tasks.StreamTask.disposeAllOperators(StreamTask.java:500)",
>>>>> >     "logger": "org.apache.flink.streaming.runtime.tasks.StreamTask",
>>>>> >     "message": "Error during disposal of stream operator.",
>>>>> >     "throwableInformation": [
>>>>> >
>>>>>  "org.apache.flink.streaming.connectors.kafka.FlinkKafkaException:
>>>>> > Failed to send data to Kafka: Failed to send data to Kafka: The
>>>>> server
>>>>> > disconnected
>>>>> >
>>>>> > "Caused by: org.apache.kafka.common.errors.NetworkException: The
>>>>> > server disconnected before a response was received."
>>>>> >
>>>>> >
>>>>> > Thanks
>>>>> >
>>>>> >
>>>>> > On Wed, Aug 25, 2021 at 12:11 PM Fabian Paul <
>>>>> fabianp...@ververica.com> wrote:
>>>>> > >
>>>>> > > Hi Mohammed,
>>>>> > >
>>>>> > > 200 records/s should definitely be doable. The first thing you can
>>>>> > > do is remove the print sinks, because they increase the load on
>>>>> > > your cluster due to the additional IO operations and also prevent
>>>>> > > Flink from fusing operators.
>>>>> > > I am interested to see the updated job graph after the removal of
>>>>> > > the print sinks.
>>>>> > >
>>>>> > > Best,
>>>>> > > Fabian
>>>>>
>>>>
