Hello Zain,

Thanks for providing the additional information. Going back to the original
issue:
- You are seeing bursty throughput, but the job is keeping up? There is no
backpressure?
- What is the throughput at the sink?
- On the graph screenshot, what is the period and stat (sum/average/etc)?

Let me shed some light on the log messages. Let's take this example:

LogInputStreamReader ... Stage 1 Triggers ...  { stream:
'flink-kafka-tracer', manual: 0, count: 0, size: 0, matches: 0, timed: 3,
UserRecords: 6, KinesisRecords: 3 }

Flush trigger reasons:
- manual: the flush was triggered manually
- count: the flush was triggered by the number of records in the container
- size: the flush was triggered by the number of bytes in the container
- matches: the flush was triggered because the predicate was matched
- timed: the flush was triggered by the elapsed timer

Input/Output:
- UserRecords: Number of input records KPL flushed (this can be higher than
KinesisRecords when aggregation is enabled)
- KinesisRecords: Number of records shipped to Kinesis Data Streams

The Stage 2 Triggers lines tell us the number of API invocations via the
PutRecords field. (In the Stage 1 example above, 3 timed flushes shipped 6
user records aggregated into 3 Kinesis records.)

I can see from your logs that the majority of flushes are due to the timer,
and it does not look overly bursty. It seems to sit at around 3 records per 15
seconds, or 1 record every 5 seconds. This seems very low; is it expected?

Thanks,
Danny Cranmer

On Mon, May 16, 2022 at 10:57 PM Zain Haider Nemati <zain.hai...@retailo.co>
wrote:

> Hey Danny,
> Thanks for having a look at the issue.
> I am using a custom Flink operator to segregate the data into a consistent
> format of length 100, which is no more than 1 MB. The configurations I
> shared were from when I was exploring tweaking some of them to see if it
> improved the throughput.
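>
> To give an idea, the chunking operator does something roughly like the
> following (a simplified, illustrative sketch rather than the exact code; it
> ignores the normalization into the length-100 format and the names are made
> up):
>
> import org.apache.flink.api.common.functions.FlatMapFunction;
> import org.apache.flink.util.Collector;
>
> import java.nio.charset.StandardCharsets;
>
> public class ChunkingFlatMap implements FlatMapFunction<String, String> {
>     // Keep each emitted chunk safely under the 1 MB Kinesis record limit
>     private static final int MAX_CHUNK_BYTES = 1_000_000;
>
>     @Override
>     public void flatMap(String value, Collector<String> out) {
>         StringBuilder chunk = new StringBuilder();
>         int chunkBytes = 0;
>         int i = 0;
>         while (i < value.length()) {
>             int cp = value.codePointAt(i);
>             int cpLen = Character.charCount(cp);
>             int cpBytes = value.substring(i, i + cpLen)
>                     .getBytes(StandardCharsets.UTF_8).length;
>             // Flush the current chunk before it would exceed the byte budget
>             if (chunkBytes + cpBytes > MAX_CHUNK_BYTES && chunk.length() > 0) {
>                 out.collect(chunk.toString());
>                 chunk.setLength(0);
>                 chunkBytes = 0;
>             }
>             chunk.appendCodePoint(cp);
>             chunkBytes += cpBytes;
>             i += cpLen;
>         }
>         if (chunk.length() > 0) {
>             out.collect(chunk.toString());
>         }
>     }
> }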
>
> Regarding your queries:
> - Which Flink version is this? --> *Version 1.13*
> - Can you see any errors in the Flink logs? --> *No, I'm attaching Flink
> logs after I have set all the configurations to default*
> - Do you see any errors/throttles in the Kinesis Data Stream metrics? --> *I
> was seeing them before segregating into smaller chunks; not anymore*
> - How many shards does your stream have? --> *It has 4 shards*
> - What is your sink operator parallelism? --> *1*
> - What is the general health of your job graph? --> *This is the only job
> running at the moment, it isn't unhealthy*
>   - Are the operators upstream of the sink backpressured? --> *No*
>   - Are you sure the sink is actually the issue here? --> *I have used
> .print() as a sink and I'm seeing all the records in real time; it chokes
> when paired with the Kinesis sink*
>   - Are there any other potential bottlenecks? --> *So data is coming in
> from the source correctly. I have a flatmap transformation which reads
> and segments it into chunks of <=1MB, and this is also tested using the
> .print() sink*
> - When you say you are trying to achieve "1MB chunks", I assume this is
> per Kinesis record, not per PutRecords batch? --> *Correct*
>
> Attaching a small chunk of the log file from when the job was started [it
> goes down to 0 records for some periods of time as well; in the log file it
> shows mostly between 3-6 records]
>
> I really appreciate your response on this, since I have not been able to
> gather much help from other resources online. It would be great if you could
> let me know what the issue here could be; let me know if you need to know
> anything else as well!
>
> Cheers
>
>
> On Tue, May 17, 2022 at 12:34 AM Danny Cranmer <dannycran...@apache.org>
> wrote:
>
>> Hello Zain,
>>
>> When you say "converting them to chunks of <= 1MB", does this mean you
>> are creating these chunks in a custom Flink operator, or are you relying on
>> the connector to do so? If you are generating your own chunks, you can
>> potentially disable Aggregation at the sink.
>>
>> Your throughput is incredibly bursty; I have a few questions:
>> - Which Flink version is this?
>> - Can you see any errors in the Flink logs?
>> - Do you see any errors/throttles in the Kinesis Data Stream metrics?
>> - How many shards does your stream have?
>> - What is your sink operator parallelism?
>> - What is the general health of your job graph?
>>   - Are the operators upstream of the sink backpressured?
>>   - Are you sure the sink is actually the issue here?
>>   - Are there any other potential bottlenecks?
>> - When you say you are trying to achieve "1MB chunks", I assume this is
>> per Kinesis record, not per PutRecords batch?
>>
>> Some comments on your configuration:
>>
>> As previously mentioned, if you are generating the chunks you can
>> potentially remove the aggregation config and disable it.
>> - producerConfig.put("AggregationMaxCount", "3");
>> - producerConfig.put("AggregationMaxSize", "256");
>> + producerConfig.put("AggregationEnabled", "false");
>>
>> This is very low and could conflict with your chunk size. These
>> configurations apply to the PutRecords request, which has a quota of
>> 500 records and 5MiB. You are setting the max size to 100kB, which is less
>> than your largest chunk. I would recommend removing these configurations.
>> - producerConfig.put("CollectionMaxCount", "3");
>> - producerConfig.put("CollectionMaxSize", "100000");
>>
>> This is the default threading model, so it can be removed.
>> - producerConfig.put("ThreadingModel", "POOLED");
>>
>> This config should not have too much impact. The default is 100ms; you
>> are increasing it to 1s. This could increase your end-to-end latency under
>> low throughput scenarios.
>> - producerConfig.put("RecordMaxBufferedTime", "1000");
>>
>> This config controls the sink backpressure and can also impact
>> throughput. Do you see any logs like "Waiting for the queue length to drop
>> below the limit takes unusually long, still not done after <x> attempts"?
>> kinesis.setQueueLimit(1000);
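>>
>> Putting these suggestions together, a trimmed-down configuration could look
>> roughly like this (an untested sketch, assuming you keep building your own
>> <= 1MB chunks upstream and leave the collection/threading settings at their
>> defaults):
>>
>> Properties producerConfig = new Properties();
>> producerConfig.put(AWSConfigConstants.AWS_REGION, "xxx");
>> producerConfig.put(AWSConfigConstants.AWS_ACCESS_KEY_ID, "xxx");
>> producerConfig.put(AWSConfigConstants.AWS_SECRET_ACCESS_KEY, "xxx");
>> // Your operator already emits the chunks, so KPL aggregation is not needed
>> producerConfig.put("AggregationEnabled", "false");
>>
>> FlinkKinesisProducer<String> kinesis =
>>         new FlinkKinesisProducer<>(new SimpleStringSchema(), producerConfig);
>> kinesis.setFailOnError(false);
>> kinesis.setDefaultStream("xxx");
>> kinesis.setDefaultPartition("0");
>> // Left as in your current config; see the queue length question above
>> kinesis.setQueueLimit(1000);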
>>
>> Thanks,
>> Danny
>>
>> On Mon, May 16, 2022 at 5:27 PM Alexander Preuß <
>> alexanderpre...@ververica.com> wrote:
>>
>>> Hi Zain,
>>>
>>> I'm looping in Danny here, he is probably the most knowledgeable when it
>>> comes to the Kinesis connector.
>>>
>>> Best,
>>> Alexander
>>>
>>> On Mon, May 16, 2022 at 12:13 AM Zain Haider Nemati <
>>> zain.hai...@retailo.co> wrote:
>>>
>>>> Hi,
>>>> I'm fetching data from Kafka topics, converting them to chunks of <= 1MB,
>>>> and sinking them to a Kinesis data stream.
>>>> The streaming job is functional; however, I see bursts of data in the
>>>> Kinesis stream with intermittent dips where the data received is 0. I'm
>>>> attaching the configuration parameters for the Kinesis sink. What could
>>>> be the cause of this issue?
>>>> The data is fed into the datastream by a Kafka topic, which in turn is
>>>> fed by a MongoDB with about 60 million records, all of which are loaded
>>>> in full.
>>>> I am trying to configure the parameters in such a way that the 1MB
>>>> per-record payload limit of Kinesis is not breached. Would appreciate
>>>> help on this!
>>>>
>>>> producerConfig.put(AWSConfigConstants.AWS_REGION, "xxx");
>>>> producerConfig.put(AWSConfigConstants.AWS_ACCESS_KEY_ID, "xxx");
>>>> producerConfig.put(AWSConfigConstants.AWS_SECRET_ACCESS_KEY, "xxx");
>>>> producerConfig.put("AggregationMaxCount", "3");
>>>> producerConfig.put("AggregationMaxSize", "256");
>>>> producerConfig.put("CollectionMaxCount", "3");
>>>> producerConfig.put("CollectionMaxSize", "100000");
>>>> producerConfig.put("AggregationEnabled", true);
>>>> producerConfig.put("RateLimit", "50");
>>>> producerConfig.put("RecordMaxBufferedTime", "1000");
>>>> producerConfig.put("ThreadingModel", "POOLED");
>>>>
>>>> FlinkKinesisProducer<String> kinesis =
>>>>         new FlinkKinesisProducer<>(new SimpleStringSchema(), producerConfig);
>>>> kinesis.setFailOnError(false);
>>>> kinesis.setDefaultStream("xxx");
>>>> kinesis.setDefaultPartition("0");
>>>> kinesis.setQueueLimit(1000);
>>>>
>>>> *Data in Kinesis:*
>>>> [image: image.png]
>>>>
>>>
>>>
