Hello Ahmed,

Thank you for your insights! In fact, FLIP-284 <https://cwiki.apache.org/confluence/display/FLINK/FLIP-284+%3A+Making+AsyncSinkWriter+Flush+triggers+adjustable>'s proposal aligns so well with this approach that I have added it as future scope. To answer your questions:
1- I assume we will not expose batch creators to users but only to sink implementers, am I correct?

*Yes, that is correct. The BatchCreator interface is intended to be used only by sink implementers, not end users. Since the SinkWriter is responsible for defining how records are batched and written, only those developing a custom sink would need access to this functionality.*

2- It is nit: but I would prefer adding the creator to the `AsyncSinkWriterConfiguration` rather than overloading the constructor, there are some already implemented ways for instantiating the configs with defaults so would be simpler.

*Yes, BatchCreator can be included in AsyncSinkWriterConfiguration. However, my initial reasoning for keeping it separate was that BatchCreator is not just a configuration parameter; it is a functional component that directly influences how records are batched. That makes it more of a behavioral dependency than a passive setting. That said, if incorporating it into AsyncSinkWriterConfiguration aligns better with the overall design, it is a small change and I'm open to updating the approach accordingly. Do let me know your thoughts, as RateLimitingStrategy is also a part of it.*

3- I want to leverage some concept from FLIP-284 that we have previously seen in DynamoDbAsyncSink, do you think we can also customize the trigger so we can also add a "readyToFlush()" method or something similar and use it in `nonBlockingFlush` methods instead of forcing the trigger on only total record count or buffer size. I see this useful in your case as well because if we have buffered 100 requests from 100 different partitions then we will keep triggering and emitting batches of 1 for 100 times (unless maxTimeInBufferMS is used ofc), so you can use the custom trigger for minimum buffered records per partition.
This might be a bit tricky because, in order to not affect the performance, we probably will wrap the whole buffer in the batchCreator to maintain calculations like size in bytes or per-partition record count in the batch creator, making it more of a "bufferHandler" than a "batchCreator"; let me know your thoughts about this.

*You bring up a valid concern about optimizing flush triggers to prevent inefficient batch emissions when multiple partitions exist, particularly in sinks like DynamoDB and Cassandra. IMO flushing (when to send records) and batching (how to group records) should remain separate concerns, as they serve distinct purposes.*

*To structure this properly, I'll use FLIP-284 as the foundation:*

*🔹 BufferFlushTrigger (from FLIP-284): decides when to flush*

*The BufferFlushTrigger should be responsible for determining when the buffer is ready to flush, based on configurable conditions such as:*

- *Total record count (batch size threshold).*
- *Buffer size in bytes.*
- *Time-based constraints (maxTimeInBufferMS), or any other custom logic a sink may require.*

*🔹 BatchCreator: decides how to form a batch*

*Once BufferFlushTrigger determines that a flush should occur, BatchCreator is responsible for reading the buffered requests and forming a batch based on partitioning rules.*

- *BatchCreator does not decide when to flush; it only handles how records are grouped once a flush is triggered.*
- *This separation allows partition-aware batching, ensuring that records from the same partition are grouped together before submission.*

------------------------------

*🔹 Example: BatchCreator with two partitioning keys*

*Scenario:*

- *batchSize = 20*
- *The buffer contains records from two partitions.*
- *Assume either (a) we flush when at least 10 records exist per partition, or (b) we apply a simple flush strategy that triggers a flush when 20 total records are buffered.*

*How BatchCreator forms the batch:*

*Even though BufferFlushTrigger initiates the flush when 20
records are reached, BatchCreator must still decide how to structure the batch. It does so by:*

1. *Reading the buffered records.*
2. *Grouping them by partition key (finding the per-partition groups, e.g. the batches of 10).*
3. *Creating a valid batch from these groups before submitting them downstream.*

*What I mean is that the buffered requests should be available to both flushing and batching for optimized writes.*

------------------------------

*The key takeaway is that by keeping these two interfaces separate, sink writers gain flexibility: they can mix and match different batching and flushing strategies. Since there is no tight coupling between them, different sinks can:*

- *Implement a distinct batching strategy independent of the flushing mechanism.*
- *Customize flush triggers without impacting how records are grouped into batches.*

I suppose these two serve complementary but different purposes, *hence they can be implemented differently*. What do you think?

On Wed, Feb 12, 2025 at 4:41 PM Ahmed Hamdy <hamdy10...@gmail.com> wrote:

> Hi Poorvank,
> Thanks for driving this, +1 (non-binding) for the FLIP in general. While I
> see some common grounds with FLIP-284 that I wrote, I prefer going with a
> new FLIP since it is driven by a current use case and is specific for
> tackling it. I have a couple of clarifying questions.
>
> 1- I assume we will not expose batch creators to users but only to sink
> implementers, am I correct?
>
> 2- It is nit: but I would prefer adding the creator to the
> `AsyncSinkWriterConfiguration` rather than overloading the constructor,
> there are some already implemented ways for instantiating the configs with
> defaults so would be simpler.
>
> 3- I want to leverage some concept from FLIP-284 that we have previously
> seen in DynamoDbAsyncSink, do you think we can also customize the trigger
> so we can also add a "readyToFlush()" method or something similar and use
> it in `nonBlockingFlush` methods instead of forcing the trigger on only
> total record count or buffer size. I see this useful in your case as well
> because if we have buffered 100 requests from 100 different partitions then
> we will keep triggering and emitting batches of 1 for 100 times (unless
> maxTimeInBufferMS is used ofc) so you can use the custom trigger for
> minimum buffered records per partition. This might be a bit tricky because
> in order to not affect the performance we probably will wrap the whole
> buffer in the batchCreator to maintain calculations like size in bytes or
> per partition record count in the batch creator making it more
> "bufferHandler" rather than a "batchCreator", let me know your thoughts
> about this.
>
> Best Regards
> Ahmed Hamdy
>
> On Tue, 11 Feb 2025 at 10:45, Poorvank Bhatia <puravbhat...@gmail.com>
> wrote:
>
> > Hey everyone,
> >
> > I'd like to propose adding a pluggable batching mechanism to
> > AsyncSinkWriter
> > <
> > https://github.com/apache/flink/blob/master/flink-connectors/flink-connector-base/src/main/java/org/apache/flink/connector/base/sink/writer/AsyncSinkWriter.java#L351
> > >
> > to enable custom batch formation strategies.
> > Currently, batching is based on batch size and record count, but this
> > approach is suboptimal for sinks like Cassandra, which require
> > partition-aware batching. Specifically, batches should be formed so that
> > all requests within a batch belong to the same partition, ensuring more
> > efficient writes.
> >
> > The proposal introduces a minimal `BatchCreator` interface, enabling
> > users to define custom batching strategies while maintaining backward
> > compatibility with a default implementation.
> >
> > For full details, please refer to the proposal document
> > <
> > https://docs.google.com/document/d/1XI2DV-8r-kOwbMd2ZMdV4u0Q_s5m8Stojv4HSdJA8ZU/edit?tab=t.0#heading=h.n4fv4r64xk2f
> > >
> > .
> > Associated Jira <https://issues.apache.org/jira/browse/FLINK-37298>
> >
> > Thanks,
> > Poorvank
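[Editor's note] To make the flushing/batching separation discussed in the reply concrete, here is a rough, self-contained sketch. The interface names, signatures, and the String[]-based "record" are hypothetical simplifications for illustration only, not the actual Flink or FLIP-284 APIs, and all AsyncSinkWriter plumbing is ignored:

```java
import java.util.*;

// Hypothetical, simplified versions of the two interfaces discussed above.
class BatchingSketch {

    // Decides WHEN the buffer is ready to flush (FLIP-284-style trigger).
    interface BufferFlushTrigger<R> {
        boolean readyToFlush(Deque<R> buffer);
    }

    // Decides HOW to form a batch once a flush has been triggered.
    interface BatchCreator<R> {
        List<R> createBatch(Deque<R> buffer, int maxBatchSize);
    }

    // Trigger that fires once any single partition has buffered at least
    // `minPerPartition` records (record[0] is the partition key).
    static BufferFlushTrigger<String[]> perPartitionTrigger(int minPerPartition) {
        return buffer -> {
            Map<String, Integer> counts = new HashMap<>();
            for (String[] record : buffer) {
                if (counts.merge(record[0], 1, Integer::sum) >= minPerPartition) {
                    return true;
                }
            }
            return false;
        };
    }

    // Partition-aware creator: drains only the records of the largest
    // partition group, so a single batch never mixes partitions.
    static BatchCreator<String[]> partitionAwareCreator() {
        return (buffer, maxBatchSize) -> {
            Map<String, List<String[]>> groups = new LinkedHashMap<>();
            for (String[] record : buffer) {
                groups.computeIfAbsent(record[0], k -> new ArrayList<>()).add(record);
            }
            List<String[]> largest = groups.values().stream()
                    .max(Comparator.comparingInt(List::size))
                    .orElse(List.of());
            List<String[]> batch = largest.subList(0, Math.min(maxBatchSize, largest.size()));
            buffer.removeAll(batch); // other partitions stay buffered
            return batch;
        };
    }

    public static void main(String[] args) {
        Deque<String[]> buffer = new ArrayDeque<>();
        for (int i = 0; i < 12; i++) buffer.add(new String[] {"p1", "v" + i});
        for (int i = 0; i < 8; i++) buffer.add(new String[] {"p2", "w" + i});

        // Flushing and batching stay decoupled: any trigger can be paired
        // with any creator.
        if (perPartitionTrigger(10).readyToFlush(buffer)) { // p1 has 12 >= 10
            List<String[]> batch = partitionAwareCreator().createBatch(buffer, 20);
            System.out.println(batch.size());  // prints 12 (all p1 records)
            System.out.println(buffer.size()); // prints 8  (p2 records remain)
        }
    }
}
```

Note how this avoids the "100 batches of 1" problem from question 3: with 100 partitions of 1 record each, the per-partition trigger simply does not fire, while the creator, once triggered, still emits a single-partition batch.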