Hello Zain,

Thanks for providing the additional information. Going back to the original
issue:
- You are seeing bursty throughput, but the job is keeping up? There is no
backpressure?
- What is the throughput at the sink?
- On the graph screenshot, what is the period and stat (sum/average/etc)?

Let me shed some light on the log messages. Let's take this example:

LogInputStreamReader ... Stage 1 Triggers ... { stream: 'flink-kafka-tracer',
manual: 0, count: 0, size: 0, matches: 0, timed: 3, UserRecords: 6,
KinesisRecords: 3 }

Flush trigger reasons:
- manual: the flush was manually triggered
- count: the flush was triggered by the number of records in the container
- size: the flush was triggered by the number of bytes in the container
- matches: the flush was triggered by the predicate being matched
- timed: the flush was triggered by the elapsed timer

Input/output:
- UserRecords: number of input records the KPL flushed (this can be higher
than KinesisRecords when aggregation is enabled)
- KinesisRecords: number of records shipped to Kinesis Data Streams

Stage 2 Triggers tells us the number of API invocations via the PutRecords
field.

I can see from your logs that the majority of flushes are due to the timer,
and it does not look overly bursty. It seems to sit at around 3 records per
15 seconds, or 1 record every 5 seconds. This seems very low; is it expected?

Thanks,
Danny Cranmer

On Mon, May 16, 2022 at 10:57 PM Zain Haider Nemati <zain.hai...@retailo.co> wrote:

> Hey Danny,
> Thanks for having a look at the issue.
> I am using a custom Flink operator to segregate the data into a consistent
> format of length 100, which is no more than 1 MB. The configurations I
> shared were from when I was exploring tweaking some of them to see if it
> improved the throughput.
>
> Regarding your queries:
> - Which Flink version is this? --> *Version 1.13*
> - Can you see any errors in the Flink logs? --> *No, I'm attaching Flink
> logs from after I set all the configurations back to default*
> - Do you see any errors/throttles in the Kinesis Data Stream metrics? -->
> *I was before segregating into smaller chunks; not anymore*
> - How many shards does your stream have? --> *It has 4 shards*
> - What is your sink operator parallelism? --> *1*
> - What is the general health of your job graph? --> *This is the only job
> running at the moment; it isn't unhealthy*
> - Are the operators upstream of the sink backpressured? --> *No*
> - Are you sure the sink is actually the issue here? --> *I have used
> .print() as a sink and I'm seeing all the records in real time; it chokes
> when paired with the Kinesis sink*
> - Are there any other potential bottlenecks? --> *So data is coming in
> from the source correctly; I have a flatMap transformation which reads it
> and segments it into chunks of <= 1MB, which is also tested using the
> .print() sink*
> - When you say you are trying to achieve "1MB chunks", I assume this is
> per Kinesis record, not per PutRecords batch? --> *Correct*
>
> Attaching a small chunk of the log file from when the job is started. [It
> goes down to 0 records for some periods of time as well; in the log file it
> shows mostly between 3-6 records.]
>
> Really appreciate your response on this, since I have not been able to
> gather much help from other resources online. Would be great if you could
> let me know what the issue here might be; let me know if you need anything
> else as well!
>
> Cheers
>
>
> On Tue, May 17, 2022 at 12:34 AM Danny Cranmer <dannycran...@apache.org>
> wrote:
>
>> Hello Zain,
>>
>> When you say "converting them to chunks of <= 1MB", does this mean you
>> are creating these chunks in a custom Flink operator, or are you relying
>> on the connector to do so? If you are generating your own chunks, you can
>> potentially disable aggregation at the sink.
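
Concretely, assuming producerConfig is the java.util.Properties instance that
is later passed to the FlinkKinesisProducer (as in the configuration shown
further down this thread), disabling aggregation is a one-line change; a
minimal sketch, not a verified fix:

    // The chunks are already <= 1MB when they reach the sink, so KPL aggregation adds nothing here.
    producerConfig.put("AggregationEnabled", "false");
    // The AggregationMaxCount / AggregationMaxSize overrides then have no effect and can be dropped.
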
>>
>> Your throughput is incredibly bursty. I have a few questions:
>> - Which Flink version is this?
>> - Can you see any errors in the Flink logs?
>> - Do you see any errors/throttles in the Kinesis Data Stream metrics?
>> - How many shards does your stream have?
>> - What is your sink operator parallelism?
>> - What is the general health of your job graph?
>> - Are the operators upstream of the sink backpressured?
>> - Are you sure the sink is actually the issue here?
>> - Are there any other potential bottlenecks?
>> - When you say you are trying to achieve "1MB chunks", I assume this is
>> per Kinesis record, not per PutRecords batch?
>>
>> Some comments on your configuration:
>>
>> As previously mentioned, if you are generating the chunks yourself you can
>> remove the aggregation config and disable aggregation:
>> - producerConfig.put("AggregationMaxCount", "3");
>> - producerConfig.put("AggregationMaxSize", "256");
>> + producerConfig.put("AggregationEnabled", "false");
>>
>> These values are very low and could conflict with your chunk size. They
>> apply to the PutRecords request, which has a quota of 500 records and
>> 5 MiB. You are setting the max size to 100 kB, which is less than your
>> largest chunk. I would recommend removing these configurations:
>> - producerConfig.put("CollectionMaxCount", "3");
>> - producerConfig.put("CollectionMaxSize", "100000");
>>
>> This is the default threading model, so it can be removed:
>> - producerConfig.put("ThreadingModel", "POOLED");
>>
>> This config should not have much impact. The default is 100 ms and you are
>> increasing it to 1 s, which could increase your end-to-end latency under
>> low-throughput scenarios:
>> - producerConfig.put("RecordMaxBufferedTime", "1000");
>>
>> This config controls the sink backpressure and can also impact throughput.
>> Do you see any logs like "Waiting for the queue length to drop below the
>> limit takes unusually long, still not done after <x> attempts"?
>> kinesis.setQueueLimit(1000);
>>
>> Thanks,
>> Danny
>>
>> On Mon, May 16, 2022 at 5:27 PM Alexander Preuß <
>> alexanderpre...@ververica.com> wrote:
>>
>>> Hi Zain,
>>>
>>> I'm looping in Danny here, he is probably the most knowledgeable when it
>>> comes to the Kinesis connector.
>>>
>>> Best,
>>> Alexander
>>>
>>> On Mon, May 16, 2022 at 12:13 AM Zain Haider Nemati <
>>> zain.hai...@retailo.co> wrote:
>>>
>>>> Hi,
>>>> I'm fetching data from Kafka topics, converting the records to chunks
>>>> of <= 1MB, and sinking them to a Kinesis data stream.
>>>> The streaming job is functional, however I see bursts of data in the
>>>> Kinesis stream with intermittent dips where the data received is 0. I'm
>>>> attaching the configuration parameters for the Kinesis sink. What could
>>>> be the cause of this issue?
>>>> The data is fed into the DataStream by a Kafka topic, which in turn is
>>>> fed by a MongoDB holding about 60 million records, all of which are
>>>> loaded.
>>>> I am trying to configure the parameters so that the 1MB-per-record
>>>> payload limit of Kinesis is not breached. Would appreciate help on this!
>>>>
>>>> producerConfig.put(AWSConfigConstants.AWS_REGION, "xxx");
>>>> producerConfig.put(AWSConfigConstants.AWS_ACCESS_KEY_ID, "xxx");
>>>> producerConfig.put(AWSConfigConstants.AWS_SECRET_ACCESS_KEY, "xxx");
>>>> producerConfig.put("AggregationMaxCount", "3");
>>>> producerConfig.put("AggregationMaxSize", "256");
>>>> producerConfig.put("CollectionMaxCount", "3");
>>>> producerConfig.put("CollectionMaxSize", "100000");
>>>> producerConfig.put("AggregationEnabled", "true");
>>>> producerConfig.put("RateLimit", "50");
>>>> producerConfig.put("RecordMaxBufferedTime", "1000");
>>>> producerConfig.put("ThreadingModel", "POOLED");
>>>> FlinkKinesisProducer<String> kinesis =
>>>>         new FlinkKinesisProducer<>(new SimpleStringSchema(), producerConfig);
>>>> kinesis.setFailOnError(false);
>>>> kinesis.setDefaultStream("xxx");
>>>> kinesis.setDefaultPartition("0");
>>>> kinesis.setQueueLimit(1000);
>>>>
>>>> *Data in Kinesis:*
>>>> [image: image.png]
>>>
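
For reference, a sketch of what the sink setup above might look like after
applying Danny's suggestions: aggregation disabled (the <= 1MB chunks are
built in the upstream flatMap), and the AggregationMax*, CollectionMax*,
ThreadingModel, and RecordMaxBufferedTime overrides removed so the KPL
defaults apply. The "xxx" placeholders are carried over from the original
configuration; this illustrates the suggested direction, not a verified fix
for the bursty throughput.

    import java.util.Properties;

    import org.apache.flink.api.common.serialization.SimpleStringSchema;
    import org.apache.flink.streaming.connectors.kinesis.FlinkKinesisProducer;
    import org.apache.flink.streaming.connectors.kinesis.config.AWSConfigConstants;

    // Sketch only: producerConfig construction, e.g. inside the job's main() method.
    Properties producerConfig = new Properties();
    producerConfig.put(AWSConfigConstants.AWS_REGION, "xxx");
    producerConfig.put(AWSConfigConstants.AWS_ACCESS_KEY_ID, "xxx");
    producerConfig.put(AWSConfigConstants.AWS_SECRET_ACCESS_KEY, "xxx");
    // Chunks of <= 1MB are created upstream, so KPL aggregation is switched off and
    // PutRecords batching falls back to the KPL defaults (500 records / 5 MiB per request).
    producerConfig.put("AggregationEnabled", "false");
    // Kept from the original configuration; this setting was not discussed in the thread.
    producerConfig.put("RateLimit", "50");

    FlinkKinesisProducer<String> kinesis =
            new FlinkKinesisProducer<>(new SimpleStringSchema(), producerConfig);
    kinesis.setFailOnError(false);
    kinesis.setDefaultStream("xxx");
    kinesis.setDefaultPartition("0");
    // Whether this limit needs tuning depends on the backpressure questions above; watch for
    // "Waiting for the queue length to drop below the limit takes unusually long" in the logs.
    kinesis.setQueueLimit(1000);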