Hey Danny,

Thank you for the review. Your observation is absolutely valid: moving away from Deque<RequestEntryWrapper<RequestEntryT>> bufferedRequestEntries in favor of a more flexible abstraction makes sense. Given that batch creators are now exposed, Deque may not be the most semantically appropriate or performant choice. To address this, we can use a more adaptable buffering abstraction that better aligns with different batching strategies.
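To make the shape of that abstraction concrete, here is a minimal sketch. The names (BufferWrapper, DequeBufferWrapper) follow the updated proposal, but the exact signatures are illustrative assumptions, not the final API; getBufferedState is omitted for brevity.

```java
import java.util.ArrayDeque;
import java.util.Deque;

// Hypothetical sketch of the buffering abstraction; signatures are
// illustrative, not the final Flink API.
interface BufferWrapper<T> {
    /** Adds an entry; atHead=true supports prioritized inserts (e.g. retries). */
    void add(T entry, boolean atHead);

    /** Retrieves and removes the next entry, or returns null if empty. */
    T poll();

    /** Peeks at the next entry without removing it, or null if empty. */
    T peek();

    boolean isEmpty();

    int size();
}

/** Default FIFO implementation backed by ArrayDeque. */
class DequeBufferWrapper<T> implements BufferWrapper<T> {
    private final Deque<T> deque = new ArrayDeque<>();

    @Override
    public void add(T entry, boolean atHead) {
        if (atHead) {
            deque.addFirst(entry); // queue jumper, e.g. a retried request
        } else {
            deque.addLast(entry);
        }
    }

    @Override
    public T poll() {
        return deque.poll();
    }

    @Override
    public T peek() {
        return deque.peek();
    }

    @Override
    public boolean isEmpty() {
        return deque.isEmpty();
    }

    @Override
    public int size() {
        return deque.size();
    }
}
```

Because every operation delegates to ArrayDeque, the default keeps O(1) FIFO behavior, while a sink could swap in, say, a partition-keyed implementation without touching the writer.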
So I updated the proposal:

Introduced a BufferWrapper interface:
- Defines an abstraction for managing buffered request entries.
- Supports FIFO, priority-based inserts, and custom queuing mechanisms.
- Provides essential operations such as add, poll, peek, and getBufferedState.

Implemented a default DequeBufferWrapper:
- Uses ArrayDeque for efficient FIFO behavior while still allowing prioritized inserts.
- Maintains backward compatibility with minimal overhead.

Modified AsyncSinkWriter to use BufferWrapper instead of Deque:
- bufferedRequestEntries is now of type BufferWrapper<RequestEntryT>, making the choice of buffer implementation flexible.
- A createBuffer() method initializes DequeBufferWrapper by default.

Updated BatchCreator and SimpleBatchCreator:
- Modified method signatures to work with BufferWrapper<RequestEntryT> instead of Deque<RequestEntryWrapper<RequestEntryT>>.
- Ensures better encapsulation and allows future optimizations in buffering strategies.

I have added this interface and updated the doc. Please have a look and let me know if this makes sense.

On Thu, Feb 20, 2025 at 5:34 PM Arvid Heise <ahe...@confluent.io.invalid> wrote:

> Hi Poorvank,
>
> thanks for putting this together. It's obvious to me that this is a good addition. I wish Ahmed could check if his proposals are compatible with yours, so we don't end up with two different ways to express the same thing. Ideally Ahmed's proposal could be retrofitted to extend on yours.
>
> I have a small nit and a larger concern. The nit is to rename BatchCreationResult into Batch because that's what it is.
>
> My main concern is around compatibility. If the new interfaces end up in the signature of the AsyncSinkWriter, we will not be able to run a connector built on top of it with an older Flink version. Either we explicitly call it out or we find a way to avoid that scenario. The connector community had some negative experiences around evolving APIs.
>
> One way to avoid breaking compatibility is to provide add-ons where the interfaces are not baked into the AsyncSinkWriter's API. Consider your `BatchCreator` interface. If it's just used within the AsyncSinkWriter (private field, private initialization), then we could run the same connector on an older Flink version. In that case, batching just wouldn't be enabled. Of course, all of that only matters if it's even viable to run the connector without batching. If not, then just make the connector version depend at least on Flink 2.1+ or whenever this feature lands.
>
> Now the question arises how to initialize the BatchCreator. Your proposal injects it but maybe we can use some kind of discovery instead (e.g. SPI, convention, and/or annotations). Here is a naive sketch with reflection using a convention (I'd prefer SPI though).
>
> public AsyncSinkWriter(
>         ElementConverter<InputT, RequestEntryT> elementConverter,
>         WriterInitContext context,
>         AsyncSinkWriterConfiguration configuration,
>         Collection<BufferedRequestState<RequestEntryT>> states) {
>     this(elementConverter, new Sink.InitContextWrapper(context), configuration, states);
>
>     try {
>         batchCreator =
>                 (BatchCreator)
>                         Class.forName(getClass().getName() + "BatchCreator").newInstance();
>     } catch (ClassNotFoundException e) {
>         batchCreator = BatchCreator.DEFAULT;
>     } catch (IllegalAccessException | InstantiationException e) {
>         throw new RuntimeException(e);
>     }
> }
>
> class CassandraBatchCreator implements BatchCreator { ... }
>
> Best,
>
> Arvid
>
>
> On Wed, Feb 19, 2025 at 11:07 PM Danny Cranmer <dannycran...@apache.org> wrote:
> >
> > Hello Poorvank,
> >
> > Thanks for opening this discussion/FLIP. I am +1 for the idea, it seems like a great enhancement to the AsyncSink.
> >
> > Just one question/observation from me. We currently use a "Deque<..> bufferedRequestEntries". Now we are exposing these batch creators; it might not make sense to use a queue anymore.
> > On one hand it does not make as much semantic sense because we could have frequent queue jumpers. On the other hand it might not be performant, since Deque lookups provide O(n) performance. What do you think? This is an internal data structure so we could consider replacing it with something else, but I might just be prematurely optimising this.
> >
> > Thanks,
> > Danny
> >
> > On Thu, Feb 13, 2025 at 5:31 PM Ahmed Hamdy <hamdy10...@gmail.com> wrote:
> >
> > > FYI: Created FLIP-509.
> > >
> > > Best Regards
> > > Ahmed Hamdy
> > >
> > >
> > > On Thu, 13 Feb 2025 at 16:30, Ahmed Hamdy <hamdy10...@gmail.com> wrote:
> > >
> > > > Hi Poorvank,
> > > > thanks for the feedback, I can see your point and I kinda agree. I would say, if you don't need the flushing trigger interface in your use case, let's keep it out of the FLIP for now. Also, let's wait for feedback about that from other members. I will create a FLIP and assign it a number.
> > > >
> > > > Best Regards
> > > > Ahmed Hamdy
> > > >
> > > >
> > > > On Thu, 13 Feb 2025 at 09:15, Poorvank Bhatia <puravbhat...@gmail.com> wrote:
> > > >
> > > >> Hello Ahmed,
> > > >> Thank you for your insights! In fact, FLIP-284 <https://cwiki.apache.org/confluence/display/FLINK/FLIP-284+%3A+Making+AsyncSinkWriter+Flush+triggers+adjustable>'s proposal aligns so well with this approach that I have added it as future scope.
> > > >> To answer your questions:
> > > >>
> > > >> I assume we will not expose batch creators to users but only to sink implementers, am I correct?
> > > >>
> > > >> - Yes, that is correct. The BatchCreator interface is intended to be used only by sink implementers, not end users. Since the SinkWriter is responsible for defining how records are batched and written, only those developing a custom sink would need access to this functionality.
> > > >>
> > > >> It is nit: but I would prefer adding the creator to the `AsyncSinkWriterConfiguration` rather than overloading the constructor, there are some already implemented ways for instantiating the configs with defaults so would be simpler.
> > > >>
> > > >> - Yes, BatchCreator can be included in AsyncSinkWriterConfiguration. However, my initial reasoning for keeping it separate was that BatchCreator is not just a configuration parameter; it is a functional component that directly influences how records are batched. This makes it more of a behavioral dependency than a passive setting. That said, if incorporating it into AsyncSinkWriterConfiguration aligns better with the overall design, it is a small change and I'm open to updating the approach accordingly. Do let me know your thoughts, as RateLimitingStrategy is also a part of it.
> > > >>
> > > >> I want to leverage some concept from FLIP-284 that we have previously seen in DynamoDbAsyncSink, do you think we can also customize the trigger so we can also add a "readyToFlush()" method or something similar and use it in `nonBlockingFlush` methods instead of forcing the trigger on only total record count or buffer size. I see this useful in your case as well because if we have buffered 100 requests from 100 different partitions then we will keep triggering and emitting batches of 1 for 100 times (unless maxTimeInBufferMS is used ofc) so you can use the custom trigger for minimum buffered records per partition. This might be a bit tricky because in order to not affect the performance we probably will wrap the whole buffer in the batchCreator to maintain calculations like size in bytes or per partition record count in the batch creator making it more "bufferHandler" rather than a "batchCreator", let me know your thoughts about this
> > > >>
> > > >> - You bring up a valid concern about optimizing flush triggers to prevent inefficient batch emissions when multiple partitions exist, particularly in sinks like DynamoDB and Cassandra. IMO flushing (when to send records) and batching (how to group records) should remain separate concerns, as they serve distinct purposes.
> > > >>
> > > >> To structure this properly, I'll use FLIP-284 as the foundation:
> > > >>
> > > >> 🔹 BufferFlushTrigger (from FLIP-284) - decides when to flush
> > > >>
> > > >> The BufferFlushTrigger should be responsible for determining when the buffer is ready to flush, based on configurable conditions such as:
> > > >> - Total record count (batch size threshold).
> > > >> - Buffer size in bytes.
> > > >> - Time-based constraints (maxTimeInBufferMS), or any other custom logic a sink may require.
> > > >>
> > > >> 🔹 BatchCreator - decides how to form a batch
> > > >>
> > > >> Once BufferFlushTrigger determines that a flush should occur, BatchCreator is responsible for reading the buffered requests and forming a batch based on partitioning rules.
> > > >> - BatchCreator does not decide when to flush; it only handles how records are grouped when a flush is triggered.
> > > >> - This separation allows partition-aware batching, ensuring that records from the same partition are grouped together before submission.
> > > >>
> > > >> ------------------------------
> > > >> 🔹 Example: BatchCreator with two partitioning keys
> > > >>
> > > >> Scenario:
> > > >> - batchSize = 20
> > > >> - The buffer contains records from two partitions.
> > > >> - Assume either (a) we flush when at least 10 records exist per partition, or (b) we apply a simple flush strategy that triggers a flush when 20 total records are buffered.
> > > >>
> > > >> How BatchCreator forms the batch:
> > > >>
> > > >> Even though BufferFlushTrigger initiates the flush when 20 records are reached, BatchCreator must still decide how to structure the batch. It does so by:
> > > >> 1. Reading the buffered records.
> > > >> 2. Grouping them by partition key (identifying the two per-partition groups of 10).
> > > >> 3. Creating a valid batch from these groups before submitting them downstream.
> > > >>
> > > >> What I mean is that the buffered requests should be available to both flushing and batching for optimized writes.
> > > >> ------------------------------
> > > >> The key takeaway is that by keeping these two interfaces separate, sink writers gain flexibility: they can mix and match different batching and flushing strategies. Since there is no tight coupling between them, different sinks can:
> > > >> - Implement a distinct batching strategy independent of the flushing mechanism.
> > > >> - Customize flush triggers without impacting how records are grouped into batches.
> > > >>
> > > >> I suppose these two serve complementary but different purposes, hence they can be implemented differently.
> > > >> What do you think?
> > > >>
> > > >>
> > > >> On Wed, Feb 12, 2025 at 4:41 PM Ahmed Hamdy <hamdy10...@gmail.com> wrote:
> > > >>
> > > >> > Hi Poorvank,
> > > >> > Thanks for driving this, +1 (non-binding) for the FLIP in general.
> > >> > While I see some common grounds with FLIP-284 that I wrote, I prefer going with a new FLIP since it is driven by a current use case and is specific for tackling it. I have a couple of clarifying questions.
> > >> >
> > >> > 1- I assume we will not expose batch creators to users but only to sink implementers, am I correct?
> > >> >
> > >> > 2- It is a nit: but I would prefer adding the creator to the `AsyncSinkWriterConfiguration` rather than overloading the constructor; there are some already implemented ways of instantiating the configs with defaults, so that would be simpler.
> > >> >
> > >> > 3- I want to leverage some concept from FLIP-284 that we have previously seen in DynamoDbAsyncSink. Do you think we can also customize the trigger so we can add a "readyToFlush()" method or something similar and use it in `nonBlockingFlush` methods instead of forcing the trigger on only total record count or buffer size? I see this useful in your case as well because if we have buffered 100 requests from 100 different partitions then we will keep triggering and emitting batches of 1 for 100 times (unless maxTimeInBufferMS is used ofc), so you can use the custom trigger for minimum buffered records per partition. This might be a bit tricky because in order to not affect the performance we probably will wrap the whole buffer in the batchCreator to maintain calculations like size in bytes or per-partition record count in the batch creator, making it more of a "bufferHandler" than a "batchCreator". Let me know your thoughts about this.
> > >> >
> > >> >
> > >> > Best Regards
> > >> > Ahmed Hamdy
> > >> >
> > >> >
> > >> > On Tue, 11 Feb 2025 at 10:45, Poorvank Bhatia <puravbhat...@gmail.com> wrote:
> > >> >
> > >> > > Hey everyone,
> > >> > >
> > >> > > I’d like to propose adding a pluggable batching mechanism to AsyncSinkWriter <https://github.com/apache/flink/blob/master/flink-connectors/flink-connector-base/src/main/java/org/apache/flink/connector/base/sink/writer/AsyncSinkWriter.java#L351> to enable custom batch formation strategies.
> > >> > > Currently, batching is based on batch size and record count, but this approach is suboptimal for sinks like Cassandra, which require partition-aware batching. Specifically, batches should be formed so that all requests within a batch belong to the same partition, ensuring more efficient writes.
> > >> > >
> > >> > > The proposal introduces a minimal `BatchCreator` interface, enabling users to define custom batching strategies while maintaining backward compatibility with a default implementation.
> > >> > >
> > >> > > For full details, please refer to the proposal document <https://docs.google.com/document/d/1XI2DV-8r-kOwbMd2ZMdV4u0Q_s5m8Stojv4HSdJA8ZU/edit?tab=t.0#heading=h.n4fv4r64xk2f>.
> > >> > > Associated Jira <https://issues.apache.org/jira/browse/FLINK-37298>
> > >> > >
> > >> > > Thanks,
> > >> > > Poorvank