Hi Zain,

For Flink 1.13, we use the Kinesis Producer Library (KPL). If you are 
using aggregation, you can control the maximum size of aggregated records 
by setting AggregationMaxSize in the producer config when constructing the 
FlinkKinesisProducer (see [1] for more docs):


producerConfig.put("AggregationMaxSize", "1048576");
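
For context, a minimal sketch of how that property plugs into the producer 
(the String schema, region, and stream name below are placeholder 
assumptions, not taken from your setup):

    import java.util.Properties;
    import org.apache.flink.api.common.serialization.SimpleStringSchema;
    import org.apache.flink.streaming.connectors.kinesis.FlinkKinesisProducer;
    import org.apache.flink.streaming.connectors.kinesis.config.AWSConfigConstants;

    Properties producerConfig = new Properties();
    producerConfig.put(AWSConfigConstants.AWS_REGION, "us-east-1");  // placeholder region
    producerConfig.put("AggregationMaxSize", "1048576");

    FlinkKinesisProducer<String> producer =
            new FlinkKinesisProducer<>(new SimpleStringSchema(), producerConfig);
    producer.setDefaultStream("your-output-stream");  // placeholder stream name
    producer.setDefaultPartition("0");
    stream.addSink(producer);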

However, since the default value is actually <1MB here, I doubt this is the 
issue. A more likely possibility is that a single record is larger than 1MB, 
in which case the aggregation limit doesn't apply. If so, the way forward 
would be to reduce the size of individual records to below 1MB.
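
One way to verify would be to log the serialized size of each record before 
it reaches the sink, along these lines (a rough sketch assuming String 
records and an SLF4J logger; MyJob is a hypothetical class name):

    import java.nio.charset.StandardCharsets;
    import org.slf4j.Logger;
    import org.slf4j.LoggerFactory;

    private static final Logger LOG = LoggerFactory.getLogger(MyJob.class);

    stream.map(record -> {
        // flag any record whose serialized size exceeds the 1MB Kinesis limit
        int size = record.getBytes(StandardCharsets.UTF_8).length;
        if (size > 1024 * 1024) {
            LOG.warn("Record of {} bytes exceeds the 1MB Kinesis limit", size);
        }
        return record;
    });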

In general, I would recommend upgrading to Flink 1.15 and using the newer 
KinesisStreamsSink. That sink is more configurable (see below and [2] for 
more docs), and it will surface the problem explicitly if the issue really 
is that a single record is larger than 1MB.

(Note that we use the PutRecords API, so individual records still need to be 
smaller than 1MB, but batches can be up to 5MB. See [3] for more info.)


        .setMaxBatchSizeInBytes(5 * 1024 * 1024)
        .setMaxRecordSizeInBytes(1 * 1024 * 1024)
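
For completeness, those two settings in a full sink setup would look roughly 
like this (region, stream name, and partition key strategy are placeholder 
assumptions):

    import java.util.Properties;
    import org.apache.flink.api.common.serialization.SimpleStringSchema;
    import org.apache.flink.connector.aws.config.AWSConfigConstants;
    import org.apache.flink.connector.kinesis.sink.KinesisStreamsSink;

    Properties sinkProperties = new Properties();
    sinkProperties.put(AWSConfigConstants.AWS_REGION, "us-east-1");  // placeholder region

    KinesisStreamsSink<String> sink =
            KinesisStreamsSink.<String>builder()
                    .setKinesisClientProperties(sinkProperties)
                    .setSerializationSchema(new SimpleStringSchema())
                    .setPartitionKeyGenerator(element -> String.valueOf(element.hashCode()))
                    .setStreamName("your-output-stream")       // placeholder
                    .setMaxBatchSizeInBytes(5 * 1024 * 1024)   // PutRecords batch limit, per [3]
                    .setMaxRecordSizeInBytes(1 * 1024 * 1024)  // single-record limit
                    .build();

    stream.sinkTo(sink);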



Thanks,
Hong


[1] 
https://nightlies.apache.org/flink/flink-docs-release-1.13/docs/connectors/datastream/kinesis/#kinesis-producer
[2] 
https://nightlies.apache.org/flink/flink-docs-master/docs/connectors/datastream/kinesis/#kinesis-streams-sink
[3] https://docs.aws.amazon.com/streams/latest/dev/service-sizes-and-limits.html




On 2022/05/12 10:30:47 Zain Haider Nemati wrote:
> Hi,
> I am using a kinesis sink with flink 1.13.
> The amount of data is in millions and it chokes the 1MB cap for Kinesis data
> streams.
> Is there any way to send data to kinesis sink in batches of less than 1MB?
> or any other workaround
>
