Hey Danny,

Thanks for getting back to me.

- You are seeing bursty throughput, but the job is keeping up? There is no backpressure? --> Correct, I'm not seeing any backpressure in any of the metrics.
- What is the throughput at the sink? --> The number of records out at the sink is around 1100 per 10 seconds.
- On the graph screenshot, what is the period and stat (sum/average/etc)? --> It is incoming data (MB/s), with a period of one second.
To give the full picture: the source holds about 80 million records, but after the job has consumed the source I only see about 20 million records in the Kinesis data stream. So I am seeing a lot of data loss, and I think it is related to the intermittent dips I see in the records arriving in the data stream. What configuration would you generally suggest for data of this volume when sinking to a Kinesis data stream?

On Wed, May 18, 2022 at 2:00 AM Danny Cranmer <dannycran...@apache.org> wrote:

> Hello Zain,
>
> Thanks for providing the additional information. Going back to the original issue:
> - You are seeing bursty throughput, but the job is keeping up? There is no backpressure?
> - What is the throughput at the sink?
> - On the graph screenshot, what is the period and stat (sum/average/etc)?
>
> Let me shed some light on the log messages, let's take this example:
>
> LogInputStreamReader ... Stage 1 Triggers ... { stream: 'flink-kafka-tracer', manual: 0, count: 0, size: 0, matches: 0, timed: 3, UserRecords: 6, KinesisRecords: 3 }
>
> Flush trigger reason:
> - manual: the flush was manually triggered
> - count: the flush was triggered by the number of records in the container
> - size: the flush was triggered by the number of bytes in the container
> - matches: the predicate was matched
> - timed: the flush was triggered by the elapsed timer
>
> Input/Output:
> - UserRecords: number of input records KPL flushed (this can be higher than KinesisRecords when aggregation is enabled)
> - KinesisRecords: number of records shipped to Kinesis Data Streams
>
> Stage 2 triggers tell us the number of API invocations via the PutRecords field.
>
> I can see from your logs that the majority of flushes are due to the timer, and it does not look overly bursty. It seems to sit at around 3 records per 15 seconds, or 1 record every 5 seconds. This seems very low, is it expected?
>
> Thanks,
> Danny Cranmer
>
> On Mon, May 16, 2022 at 10:57 PM Zain Haider Nemati <zain.hai...@retailo.co> wrote:
>
>> Hey Danny,
>>
>> Thanks for having a look at the issue.
>>
>> I am using a custom Flink operator to segregate the data into a consistent format of length 100, which is no more than 1 MB. The configurations I shared were from when I was experimenting with tweaking some of them to see if that improves the throughput.
>>
>> Regarding your queries:
>> - Which Flink version is this? --> *Version 1.13*
>> - Can you see any errors in the Flink logs? --> *No. I'm attaching Flink logs taken after I set all the configurations back to default*
>> - Do you see any errors/throttles in the Kinesis Data Stream metrics? --> *I did before segregating into smaller chunks, not anymore*
>> - How many shards does your stream have? --> *It has 4 shards*
>> - What is your sink operator parallelism? --> *1*
>> - What is the general health of your job graph? --> *This is the only job running at the moment, it isn't unhealthy*
>> - Are the operators upstream of the sink backpressured? --> *No*
>> - Are you sure the sink is actually the issue here? --> *I have used .print() as a sink and I see all the records in real time; it chokes when paired with the Kinesis sink*
>> - Are there any other potential bottlenecks?
>> --> *Data is coming in from the source correctly. I have a flatMap transformation which reads the data and segments it into chunks of <= 1 MB, and that has also been tested using the .print() sink*
>> - When you say you are trying to achieve "1MB chunks", I assume this is per Kinesis record, not per PutRecords batch? --> *Correct*
>>
>> Attaching a small chunk of the log file from when the job is started. [It goes down to 0 records for some periods of time as well; in the log file it mostly shows between 3-6 records.]
>>
>> I really appreciate your response on this, since I have not been able to get much help from other resources online. It would be great if you could let me know what the issue here could be; let me know if you need anything else as well!
>>
>> Cheers
>>
>> On Tue, May 17, 2022 at 12:34 AM Danny Cranmer <dannycran...@apache.org> wrote:
>>
>>> Hello Zain,
>>>
>>> When you say "converting them to chunks of <= 1MB", does this mean you are creating these chunks in a custom Flink operator, or are you relying on the connector to do so? If you are generating your own chunks you can potentially disable aggregation at the sink.
>>>
>>> Your throughput is incredibly bursty. I have a few questions:
>>> - Which Flink version is this?
>>> - Can you see any errors in the Flink logs?
>>> - Do you see any errors/throttles in the Kinesis Data Stream metrics?
>>> - How many shards does your stream have?
>>> - What is your sink operator parallelism?
>>> - What is the general health of your job graph?
>>> - Are the operators upstream of the sink backpressured?
>>> - Are you sure the sink is actually the issue here?
>>> - Are there any other potential bottlenecks?
>>> - When you say you are trying to achieve "1MB chunks", I assume this is per Kinesis record, not per PutRecords batch?
>>>
>>> Some comments on your configuration:
>>>
>>> As previously mentioned, if you are generating the chunks yourself you can potentially remove the aggregation config and disable aggregation:
>>> - producerConfig.put("AggregationMaxCount", "3");
>>> - producerConfig.put("AggregationMaxSize", "256");
>>> + producerConfig.put("AggregationEnabled", "false");
>>>
>>> These are very low and could conflict with your chunk size. They apply to the PutRecords request, which has a quota of 500 records and 5 MiB. You are setting the max size to 100 kB, which is less than your largest chunk. I would recommend removing these configurations:
>>> - producerConfig.put("CollectionMaxCount", "3");
>>> - producerConfig.put("CollectionMaxSize", "100000");
>>>
>>> This is the default threading model, so it can be removed:
>>> - producerConfig.put("ThreadingModel", "POOLED");
>>>
>>> This config should not have too much impact. The default is 100 ms and you are increasing it to 1 s, which could increase your end-to-end latency under low-throughput scenarios:
>>> - producerConfig.put("RecordMaxBufferedTime", "1000");
>>>
>>> This config controls the sink backpressure and can also impact throughput. Do you see any logs like "Waiting for the queue length to drop below the limit takes unusually long, still not done after <x> attempts"?
>>> kinesis.setQueueLimit(1000);
>>>
>>> Thanks,
>>> Danny
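[Editor's note: for reference, here is a minimal sketch that consolidates the suggestions above into one snippet, assuming the <= 1 MB chunking stays in the upstream flatMap so KPL aggregation can be disabled. Only the settings discussed in this thread are shown; the class and method names, region, credentials and stream name are illustrative placeholders, not a tested configuration.]

    import java.util.Properties;

    import org.apache.flink.api.common.serialization.SimpleStringSchema;
    import org.apache.flink.streaming.connectors.kinesis.FlinkKinesisProducer;
    import org.apache.flink.streaming.connectors.kinesis.config.AWSConfigConstants;

    public class KinesisSinkFactory {

        public static FlinkKinesisProducer<String> buildSink() {
            Properties producerConfig = new Properties();

            // Connection settings -- placeholders, as in the original snippet.
            producerConfig.put(AWSConfigConstants.AWS_REGION, "xxx");
            producerConfig.put(AWSConfigConstants.AWS_ACCESS_KEY_ID, "xxx");
            producerConfig.put(AWSConfigConstants.AWS_SECRET_ACCESS_KEY, "xxx");

            // Chunking already happens in the custom flatMap, so KPL aggregation is disabled.
            producerConfig.put("AggregationEnabled", "false");

            // CollectionMaxCount/CollectionMaxSize, ThreadingModel and RecordMaxBufferedTime
            // are deliberately not set, i.e. left at their defaults (per the comments above:
            // up to 500 records / 5 MiB per PutRecords request, POOLED threading, 100 ms buffering).

            FlinkKinesisProducer<String> kinesis =
                    new FlinkKinesisProducer<>(new SimpleStringSchema(), producerConfig);
            kinesis.setFailOnError(false);
            kinesis.setDefaultStream("xxx");   // placeholder stream name
            kinesis.setDefaultPartition("0");  // carried over from the original; a single fixed partition key maps to a single shard
            kinesis.setQueueLimit(1000);       // carried over; watch for the "queue length" warning mentioned above
            return kinesis;
        }
    }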
>>> On Mon, May 16, 2022 at 5:27 PM Alexander Preuß <alexanderpre...@ververica.com> wrote:
>>>
>>>> Hi Zain,
>>>>
>>>> I'm looping in Danny here, he is probably the most knowledgeable when it comes to the Kinesis connector.
>>>>
>>>> Best,
>>>> Alexander
>>>>
>>>> On Mon, May 16, 2022 at 12:13 AM Zain Haider Nemati <zain.hai...@retailo.co> wrote:
>>>>
>>>>> Hi,
>>>>>
>>>>> I'm fetching data from Kafka topics, converting them to chunks of <= 1MB and sinking them to a Kinesis data stream.
>>>>>
>>>>> The streaming job is functional, however I see bursts of data in the Kinesis stream with intermittent dips where the data received is 0. I'm attaching the configuration parameters for the Kinesis sink. What could be the cause of this issue?
>>>>>
>>>>> The datastream is fed by a Kafka topic, which in turn is fed by a MongoDB and holds about 60 million records, all of which are loaded.
>>>>>
>>>>> I am trying to configure the parameters in such a way that the 1 MB per-record payload limit of Kinesis is not breached. Would appreciate help on this!
>>>>>
>>>>> producerConfig.put(AWSConfigConstants.AWS_REGION, "xxx");
>>>>> producerConfig.put(AWSConfigConstants.AWS_ACCESS_KEY_ID, "xxx");
>>>>> producerConfig.put(AWSConfigConstants.AWS_SECRET_ACCESS_KEY, "xxx");
>>>>> producerConfig.put("AggregationMaxCount", "3");
>>>>> producerConfig.put("AggregationMaxSize", "256");
>>>>> producerConfig.put("CollectionMaxCount", "3");
>>>>> producerConfig.put("CollectionMaxSize", "100000");
>>>>> producerConfig.put("AggregationEnabled", true);
>>>>> producerConfig.put("RateLimit", "50");
>>>>> producerConfig.put("RecordMaxBufferedTime", "1000");
>>>>> producerConfig.put("ThreadingModel", "POOLED");
>>>>> FlinkKinesisProducer<String> kinesis = new FlinkKinesisProducer<>(new SimpleStringSchema(), producerConfig);
>>>>> kinesis.setFailOnError(false);
>>>>> kinesis.setDefaultStream("xxx");
>>>>> kinesis.setDefaultPartition("0");
>>>>> kinesis.setQueueLimit(1000);
>>>>>
>>>>> *Data in Kinesis:*
>>>>> [image: image.png]
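[Editor's note: since the chunking flatMap itself was not shared in the thread, the following is a purely hypothetical sketch of what such an operator could look like, assuming string records and a fixed byte budget just under the 1 MB Kinesis record limit. The class name and the splitting strategy are illustrative only, not the job's actual implementation.]

    import java.nio.charset.StandardCharsets;

    import org.apache.flink.api.common.functions.FlatMapFunction;
    import org.apache.flink.util.Collector;

    public class ChunkingFlatMap implements FlatMapFunction<String, String> {

        // Stay just under the 1 MB Kinesis record limit mentioned in the thread.
        private static final int MAX_CHUNK_BYTES = 1_000_000;

        @Override
        public void flatMap(String value, Collector<String> out) {
            byte[] bytes = value.getBytes(StandardCharsets.UTF_8);
            for (int start = 0; start < bytes.length; start += MAX_CHUNK_BYTES) {
                int end = Math.min(start + MAX_CHUNK_BYTES, bytes.length);
                // Caveat: a raw byte split can cut a multi-byte UTF-8 character in half;
                // a real operator would split on record boundaries instead.
                out.collect(new String(bytes, start, end - start, StandardCharsets.UTF_8));
            }
        }
    }

Such an operator would sit between the Kafka source and the Kinesis sink, e.g. stream.flatMap(new ChunkingFlatMap()).addSink(kinesis).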