Hey Danny,

Thank you for the review. Your observation is valid: moving away from
Deque<RequestEntryWrapper<RequestEntryT>> bufferedRequestEntries in favor of
a more flexible abstraction makes sense. Given that batch creators are now
exposed, a Deque may not be the most semantically appropriate or performant
choice. To address this, a more adaptable buffering abstraction that better
aligns with different batching strategies can be used.

I have updated the proposal as follows.

Introduced a BufferWrapper interface:

   - Defines an abstraction for managing buffered request entries.
   - Supports FIFO, priority-based inserts, and custom queuing mechanisms.
   - Provides essential operations such as add, poll, peek, and
   getBufferedState.

Implemented a default DequeBufferWrapper:

   - Uses ArrayDeque for efficient FIFO behavior while still allowing
   prioritized inserts.
   - Maintains backward compatibility with minimal overhead.

Modified AsyncSinkWriter to use BufferWrapper instead of Deque:

   - bufferedRequestEntries is now of type BufferWrapper<RequestEntryT>,
   making the choice of buffer implementation flexible.
   - A createBuffer() method initializes DequeBufferWrapper by default.
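The writer-side wiring could look roughly like this; the stand-in class
below only illustrates the factory-method pattern, and all names are
assumptions, not actual Flink code:

```java
import java.util.ArrayDeque;
import java.util.Deque;

// Trimmed-down stand-in types, just enough to show the buffer wiring.
interface BufferWrapper<T> {
    void add(T entry, boolean atHead);
    T poll();
}

class DequeBufferWrapper<T> implements BufferWrapper<T> {
    private final Deque<T> deque = new ArrayDeque<>();
    public void add(T entry, boolean atHead) {
        if (atHead) deque.addFirst(entry); else deque.addLast(entry);
    }
    public T poll() { return deque.poll(); }
}

// Illustrative sketch of AsyncSinkWriter obtaining its buffer through an
// overridable factory method instead of hard-coding a Deque.
abstract class SketchAsyncSinkWriter<RequestEntryT> {
    protected final BufferWrapper<RequestEntryT> bufferedRequestEntries =
            createBuffer();

    // Subclasses may override to supply a custom buffer implementation.
    protected BufferWrapper<RequestEntryT> createBuffer() {
        return new DequeBufferWrapper<>();
    }
}

public class CreateBufferDemo {
    public static void main(String[] args) {
        SketchAsyncSinkWriter<String> writer =
                new SketchAsyncSinkWriter<String>() {};
        writer.bufferedRequestEntries.add("a", false);
        System.out.println(writer.bufferedRequestEntries.poll()); // a
    }
}
```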

Updated BatchCreator and SimpleBatchCreator:

   - Modified method signatures to accept BufferWrapper<RequestEntryT>
   instead of Deque<RequestEntryWrapper<RequestEntryT>>.
   - Ensures better encapsulation and allows future optimizations in
   buffering strategies.
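For the batch-creation side, the signature change might look like the
sketch below (Batch is simplified to a List here, and all names are
placeholders rather than the types in the doc):

```java
import java.util.ArrayDeque;
import java.util.ArrayList;
import java.util.Deque;
import java.util.List;

// Simplified placeholder types; the real ones carry more state.
interface BufferWrapper<T> {
    void add(T entry, boolean atHead);
    T poll();
    boolean isEmpty();
}

class DequeBufferWrapper<T> implements BufferWrapper<T> {
    private final Deque<T> deque = new ArrayDeque<>();
    public void add(T entry, boolean atHead) {
        if (atHead) deque.addFirst(entry); else deque.addLast(entry);
    }
    public T poll() { return deque.poll(); }
    public boolean isEmpty() { return deque.isEmpty(); }
}

// Sketch of the updated contract: the creator now reads from the
// BufferWrapper abstraction rather than a concrete Deque.
interface BatchCreator<RequestEntryT> {
    List<RequestEntryT> createNextBatch(
            BufferWrapper<RequestEntryT> buffer, int maxBatchSize);
}

// Default FIFO behavior, analogous to SimpleBatchCreator.
class SimpleBatchCreator<RequestEntryT> implements BatchCreator<RequestEntryT> {
    public List<RequestEntryT> createNextBatch(
            BufferWrapper<RequestEntryT> buffer, int maxBatchSize) {
        List<RequestEntryT> batch = new ArrayList<>();
        while (batch.size() < maxBatchSize && !buffer.isEmpty()) {
            batch.add(buffer.poll());
        }
        return batch;
    }
}

public class BatchCreatorDemo {
    public static void main(String[] args) {
        BufferWrapper<String> buffer = new DequeBufferWrapper<>();
        for (String s : new String[] {"a", "b", "c"}) buffer.add(s, false);
        List<String> batch =
                new SimpleBatchCreator<String>().createNextBatch(buffer, 2);
        System.out.println(batch);            // [a, b]
        System.out.println(buffer.isEmpty()); // false
    }
}
```

A partition-aware sink could swap in its own BatchCreator that groups
entries by partition key without the writer changing at all.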

I have added this interface and updated the doc. Please have a look and let
me know if this makes sense.


On Thu, Feb 20, 2025 at 5:34 PM Arvid Heise <ahe...@confluent.io.invalid>
wrote:

> Hi Poorvank,
>
> thanks for putting this together. It's obvious to me that this is a
> good addition. I wish Ahmed could check if his proposals are
> compatible with yours, so we don't end up with two different ways to
> express the same thing. Ideally Ahmed's proposal could be retrofitted
> to extend on yours.
>
> I have a small nit and a larger concern. The nit is to rename
> BatchCreationResult into Batch because that's what it is.
>
> My main concern is around compatibility. If the new interfaces end up
> in the signature of the AsyncSinkWriter, we will not be able to run a
> connector built on top of it with an older Flink version. Either we
> explicitly call it out or we find a way to avoid that scenario. The
> connector community had some negative experiences around evolving
> APIs.
>
> One way to avoid breaking compatibility is to provide add-ons where
> the interfaces are not baked into the AsyncSinkWriter's API. Consider
> your `BatchCreator` interface. If it's just used within the
> AsyncSinkWriter (private field, private initialization), then we could
> run the same connector on an older Flink version. In that case,
> batching just wouldn't be enabled. Of course, all of that only matters
> if it's even viable to run the connector without batching. If not
> then, just make the connector version depend at least on Flink 2.1+ or
> whenever this feature lands.
>
> Now the question arises how to initialize the BatchCreator. Your
> proposal injects it but maybe we can use some kind of discovery
> instead (e.g. SPI, convention, and/or annotations). Here is a naive
> sketch with reflection using a convention (I'd prefer SPI though).
>
> public AsyncSinkWriter(
>         ElementConverter<InputT, RequestEntryT> elementConverter,
>         WriterInitContext context,
>         AsyncSinkWriterConfiguration configuration,
>         Collection<BufferedRequestState<RequestEntryT>> states) {
>     this(elementConverter, new Sink.InitContextWrapper(context),
> configuration, states);
>
>     try {
>         batchCreator =
>                 (BatchCreator)
>                         Class.forName(getClass().getName() +
> "BatchCreator").newInstance();
>     } catch (ClassNotFoundException e) {
>         batchCreator = BatchCreator.DEFAULT;
>     } catch (IllegalAccessException | InstantiationException e) {
>         throw new RuntimeException(e);
>     }
> }
>
> class CassandraBatchCreator implements BatchCreator { ... }
>
> Best,
>
> Arvid
>
>
> On Wed, Feb 19, 2025 at 11:07 PM Danny Cranmer <dannycran...@apache.org>
> wrote:
> >
> > Hello Poorvank,
> >
> > Thanks for opening this discussion/FLIP. I am +1 for the idea, it seems
> > like a great enhancement to the AsyncSink.
> >
> > Just one question/observation from me. We currently use a "Deque<..>
> > bufferedRequestEntries". Now we are exposing these batch creators; it
> might
> > not make sense to use a queue anymore. On one hand it does not make as
> much
> > semantic sense because we could have frequent queue jumpers. On the other
> > hand it might not be performant, since Deque lookups provide O(n)
> > performance. What do you think? This is an internal data structure so we
> > could consider replacing it with something else, but I might just be
> > prematurely optimising this.
> >
> > Thanks,
> > Danny
> >
> > On Thu, Feb 13, 2025 at 5:31 PM Ahmed Hamdy <hamdy10...@gmail.com>
> wrote:
> >
> > > FYI: Created FLIP-509.
> > >
> > > Best Regards
> > > Ahmed Hamdy
> > >
> > >
> > > On Thu, 13 Feb 2025 at 16:30, Ahmed Hamdy <hamdy10...@gmail.com>
> wrote:
> > >
> > > > Hi Poorvank,
> > > > thanks for the feedback, I can see your point and I kinda agree, I
> would
> > > > say, if you don't need the flushing trigger interface in your use
> case
> > > > let's keep it out of the FLIP for now, Also let's wait for the
> feedback
> > > > about that from other members.
> > > > I will create a FLIP and assign it a number.
> > > >
> > > > Best Regards
> > > > Ahmed Hamdy
> > > >
> > > >
> > > > On Thu, 13 Feb 2025 at 09:15, Poorvank Bhatia <
> puravbhat...@gmail.com>
> > > > wrote:
> > > >
> > > >> Hello Ahmed,
> > > >> Thank you for your insights! In fact, FLIP-284
> > > >> <
> > > >>
> > >
> https://cwiki.apache.org/confluence/display/FLINK/FLIP-284+%3A+Making+AsyncSinkWriter+Flush+triggers+adjustable
> > > >> >'s
> > > >> proposal aligns so well with this approach, I have added it has the
> > > future
> > > >> scope.
> > > >> To answer your questions:
> > > >>
> > > >> I assume we will not expose batch creators to users but only to sink
> > > >> implementers, am I correct?
> > > >>
> > > >>    - *Yes, that is correct. The BatchCreator interface is intended
> to be
> > > >>    used only by sink implementers, not end users. Since the
> SinkWriter
> > > is
> > > >>    responsible for defining how records are batched and written,
> only
> > > >> those
> > > >>    developing a custom sink would need access to this
> functionality.*
> > > >>
> > > >> It is nit: but I would prefer adding the creator to the
> > > >> `AsyncSinkWriterConfiguration` rather than overloading the
> constructor,
> > > >> there are some already implemented ways for instantiating the
> configs
> > > with
> > > >> defaults so would be simpler.
> > > >>
> > > >>
> > > >>    - *Yes, BatchCreator can be included in
> AsyncSinkWriterConfiguration.
> > > >>    However, my initial reasoning for keeping it separate was that
> > > >> BatchCreator
> > > >>    is not just a configuration parameter—it's a functional component
> > > that
> > > >>    directly influences how records are batched. This makes it more
> of a
> > > >>    behavioral dependency rather than a passive setting. That said,
> if
> > > >>    incorporating it into AsyncSinkWriterConfiguration aligns better
> with
> > > >> the
> > > >>    overall design, it is a nit change and I'm open to updating the
> > > >> approach
> > > >>    accordingly. Do let me know your thoughts, as
> RateLimitingStartegy is
> > > >> also
> > > >>    a part of it.*
> > > >>
> > > >>
> > > >>  I want to leverage some concept from FLIP-284 that we have
> previously
> > > >> seen in DynamoDbAsyncSink, do you think we can also customize the
> > > trigger
> > > >> so we can also add a "readyToFlush()" method or something similar
> and
> > > use
> > > >> it in `nonBlockingFlush` methods instead of forcing the trigger on
> only
> > > >> total record count or buffer size. I see this useful in your case as
> > > well
> > > >> because if we have buffered 100 requests from 100 different
> partitions
> > > >> then
> > > >> we will keep triggering and emitting batches of 1 for 100 times
> (unless
> > > >> maxTimeInBufferMS is used ofc) so you can use the custom trigger for
> > > >> minimum buffered records per partition. This might be a bit tricky
> > > because
> > > >> in order to not affect the performance we probably will wrap the
> whole
> > > >> buffer in the batchCreator to maintain calculations like size in
> bytes
> > > or
> > > >> per partition record count in the batch creator making it more
> > > >> "bufferHandler" rather than a "batchCreator", let me know your
> thoughts
> > > >> about this
> > > >>
> > > >>
> > > >>    -
> > > >>
> > > >>    *You bring up a valid concern about optimizing flush triggers to
> > > >> prevent
> > > >>    inefficient batch emissions when multiple partitions exist,
> > > >> particularly in
> > > >>    sinks like DynamoDB and Cassandra. IMO flushing (when to send
> > > records)
> > > >> and
> > > >>    batching (how to group records) should remain separate concerns,
> as
> > > >> they
> > > >>    serve distinct purposes.*
> > > >>
> > > >>    *To structure this properly, I’ll use FLIP-284 as the
> foundation:*
> > > >>    *🔹 BufferFlushTrigger (From FLIP-284) - Decides When to Flush*
> > > >>
> > > >>    *The BufferFlushTrigger should be responsible for determining
> when
> > > the
> > > >>    buffer is ready to flush, based on configurable conditions such
> as:*
> > > >>    - *Total record count (batch size threshold).*
> > > >>       - *Buffer size in bytes.*
> > > >>       - *Time-based constraints (maxTimeInBufferMS), or any other
> custom
> > > >>       logic a sink may require.*
> > > >>
> > > >>    *🔹 BatchCreator - Decides How to Form a Batch*
> > > >>
> > > >>    *Once BufferFlushTrigger determines that a flush should occur,
> > > >>    BatchCreator is responsible for reading the buffered requests and
> > > >> forming a
> > > >>    batch based on partitioning rules.*
> > > >>    - *BatchCreator does not decide when to flush—it only handles how
> > > >>       records are grouped when a flush is triggered.*
> > > >>       - *This separation allows partition-aware batching, ensuring
> that
> > > >>       records from the same partition are grouped together before
> > > >> submission.*
> > > >>    ------------------------------
> > > >>    *🔹 Example: BatchCreator with Two Partitioning Keys**Scenario*
> > > >>       - *batchSize = 20*
> > > >>       - *The buffer contains records from two partitions.*
> > > >>       - *Assume either (a) we flush when at least 10 records exist
> per
> > > >>       partition.*
> > > >>       - *We apply a simple flush strategy that triggers a flush
> when 20
> > > >>       total records are buffered.*
> > > >>    *How BatchCreator Forms the Batch*
> > > >>
> > > >>    *Even though BufferFlushTrigger initiates the flush when 20
> records
> > > are
> > > >>    reached, BatchCreator must still decide how to structure the
> batch.
> > > It
> > > >> does
> > > >>    so by:*
> > > >>    1. *Reading the buffered records.*
> > > >>       2. *Grouping them by partition key. (Find out the 10 batch)*
> > > >>       3. *Creating a valid batch from these groups before submitting
> > > them
> > > >>       downstream.*
> > > >>    *What I mean is that, the buffered Requests should be there for
> both
> > > >>    flushing and batching for optimized writes. *
> > > >>    ------------------------------
> > > >>    *The key takeaway is that by keeping these two interfaces
> separate,
> > > >> Sink
> > > >>    Writers gain flexibility—they can mix and match different
> batching
> > > and
> > > >>    flushing strategies. Since there is no tight coupling between
> them,
> > > >>    different sinks can:*
> > > >>       - *Implement a distinct batching strategy independent of the
> > > >> flushing
> > > >>       mechanism.*
> > > >>       - *Customize flush triggers without impacting how records are
> > > >> grouped
> > > >>       into batches.*
> > > >>
> > > >> I suppose these two serve complementary but different purposes,
> *hence
> > > >> they
> > > >> can be implemented differently*.
> > > >> What do you think?
> > > >>
> > > >>
> > > >> On Wed, Feb 12, 2025 at 4:41 PM Ahmed Hamdy <hamdy10...@gmail.com>
> > > wrote:
> > > >>
> > > >> > Hi Poorvank,
> > > >> > Thanks for driving this, +1 (non-binding) for the FLIP in general.
> > > >> While I
> > > >> > see some common grounds with FLIP-284 that I wrote, I prefer going
> > > with
> > > >> a
> > > >> > new FLIP since it is driven by a current use case and is specific
> for
> > > >> > tackling it. I have a couple of clarifying questions.
> > > >> >
> > > >> > 1- I assume we will not expose batch creators to users but only to
> > > sink
> > > >> > implementers, am I correct?
> > > >> >
> > > >> > 2- It is nit: but I would prefer adding the creator to the
> > > >> > `AsyncSinkWriterConfiguration` rather than overloading the
> > > constructor,
> > > >> > there are some already implemented ways for instantiating the
> configs
> > > >> with
> > > >> > defaults so would be simpler.
> > > >> >
> > > >> > 3- I want to leverage some concept from FLIP-284 that we have
> > > previously
> > > >> > seen in DynamoDbAsyncSink, do you think we can also customize the
> > > >> trigger
> > > >> > so we can also add a "readyToFlush()" method or something similar
> and
> > > >> use
> > > >> > it in `nonBlockingFlush` methods instead of forcing the trigger on
> > > only
> > > >> > total record count or buffer size. I see this useful in your case
> as
> > > >> well
> > > >> > because if we have buffered 100 requests from 100 different
> partitions
> > > >> then
> > > >> > we will keep triggering and emitting batches of 1 for 100 times
> > > (unless
> > > >> > maxTimeInBufferMS is used ofc) so you can use the custom trigger
> for
> > > >> > minimum buffered records per partition. This might be a bit tricky
> > > >> because
> > > >> > in order to not affect the performance we probably will wrap the
> whole
> > > >> > buffer in the batchCreator to maintain calculations like size in
> bytes
> > > >> or
> > > >> > per partition record count in the batch creator making it more
> > > >> > "bufferHandler" rather than a "batchCreator", let me know your
> > > thoughts
> > > >> > about this.
> > > >> >
> > > >> >
> > > >> > Best Regards
> > > >> > Ahmed Hamdy
> > > >> >
> > > >> >
> > > >> > On Tue, 11 Feb 2025 at 10:45, Poorvank Bhatia <
> puravbhat...@gmail.com
> > > >
> > > >> > wrote:
> > > >> >
> > > >> > > Hey everyone,
> > > >> > >
> > > >> > > I’d like to propose adding a pluggable batching mechanism to
> > > >> > > AsyncSinkWriter
> > > >> > > <
> > > >> > >
> > > >> >
> > > >>
> > >
> https://github.com/apache/flink/blob/master/flink-connectors/flink-connector-base/src/main/java/org/apache/flink/connector/base/sink/writer/AsyncSinkWriter.java#L351
> > > >> > > >
> > > >> > >  to enable custom batch formation strategies.
> > > >> > > Currently, batching is based on batch size and record count, but
> > > this
> > > >> > > approach is suboptimal for sinks like Cassandra, which require
> > > >> > > partition-aware batching. Specifically, batches should be
> formed so
> > > >> that
> > > >> > > all requests within a batch belong to the same partition,
> ensuring
> > > >> more
> > > >> > > efficient writes.
> > > >> > >
> > > >> > > The proposal introduces a minimal `BatchCreator` interface,
> enabling
> > > >> > users
> > > >> > > to define custom batching strategies while maintaining backward
> > > >> > > compatibility with a default implementation.
> > > >> > >
> > > >> > > For full details, please refer to the proposal document
> > > >> > > <
> > > >> > >
> > > >> >
> > > >>
> > >
> https://docs.google.com/document/d/1XI2DV-8r-kOwbMd2ZMdV4u0Q_s5m8Stojv4HSdJA8ZU/edit?tab=t.0#heading=h.n4fv4r64xk2f
> > > >> > > >
> > > >> > > .
> > > >> > > Associated Jira <
> https://issues.apache.org/jira/browse/FLINK-37298>
> > > >> > >
> > > >> > > Thanks,
> > > >> > > Poorvank
> > > >> > >
> > > >> >
> > > >>
> > > >
> > >
>
