Hello Ahmed,
Thank you for your insights! In fact, FLIP-284
<https://cwiki.apache.org/confluence/display/FLINK/FLIP-284+%3A+Making+AsyncSinkWriter+Flush+triggers+adjustable>'s
proposal aligns so well with this approach that I have added it as future
scope.
To answer your questions:

I assume we will not expose batch creators to users but only to sink
implementers, am I correct?

   - *Yes, that is correct. The BatchCreator interface is intended to be
   used only by sink implementers, not end users. Since the SinkWriter is
   responsible for defining how records are batched and written, only those
   developing a custom sink would need access to this functionality.*
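
   *For reference, here is a rough sketch of the interface shape I have in
   mind; the names and signature are illustrative and may differ from the
   final FLIP:*

import java.io.Serializable;
import java.util.Deque;
import java.util.List;

public interface BatchCreator<RequestEntryT extends Serializable> {

    /**
     * Reads from the buffered entries once a flush has been triggered and
     * decides which of them form the next batch.
     */
    List<RequestEntryT> createNextBatch(
            Deque<RequestEntryT> bufferedEntries, int maxBatchSize);
}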

It is nit: but I would prefer adding the creator to the
`AsyncSinkWriterConfiguration` rather than overloading the constructor,
there are some already implemented ways for instantiating the configs with
defaults so would be simpler.


   - *Yes, BatchCreator can be included in AsyncSinkWriterConfiguration.
   However, my initial reasoning for keeping it separate was that BatchCreator
   is not just a configuration parameter; it is a functional component that
   directly influences how records are batched. This makes it more of a
   behavioral dependency than a passive setting. That said, if incorporating
   it into AsyncSinkWriterConfiguration aligns better with the overall design,
   it is a small change and I'm open to updating the approach accordingly. Do
   let me know your thoughts, since RateLimitingStrategy is already part of
   that configuration.*
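
   *For illustration, a hypothetical sketch of the builder-based wiring,
   assuming a new setBatchCreator() setter next to the existing ones (the
   rateLimitingStrategy and batchCreator variables are placeholders defined
   elsewhere, and the existing setter names are from memory):*

AsyncSinkWriterConfiguration config =
        AsyncSinkWriterConfiguration.builder()
                .setMaxBatchSize(20)
                .setMaxBatchSizeInBytes(4 * 1024 * 1024)
                .setMaxInFlightRequests(16)
                .setMaxBufferedRequests(1000)
                .setMaxTimeInBufferMS(5000)
                .setMaxRecordSizeInBytes(1024 * 1024)
                .setRateLimitingStrategy(rateLimitingStrategy)
                // Hypothetical new setter for the behavioral dependency:
                .setBatchCreator(batchCreator)
                .build();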


 I want to leverage some concept from FLIP-284 that we have previously
seen in DynamoDbAsyncSink, do you think we can also customize the trigger
so we can also add a "readyToFlush()" method or something similar and use
it in `nonBlockingFlush` methods instead of forcing the trigger on only
total record count or buffer size. I see this useful in your case as well
because if we have buffered 100 requests from 100 different partitions then
we will keep triggering and emitting batches of 1 for 100 times (unless
maxTimeInBufferMS is used ofc) so you can use the custom trigger for
minimum buffered records per partition. This might be a bit tricky because
in order to not affect the performance we probably will wrap the whole
buffer in the batchCreator to maintain calculations like size in bytes or
per partition record count in the batch creator making it more
"bufferHandler" rather than a "batchCreator", let me know your thoughts
about this


   - *You bring up a valid concern about optimizing flush triggers to prevent
   inefficient batch emissions when multiple partitions exist, particularly in
   sinks like DynamoDB and Cassandra. IMO flushing (when to send records) and
   batching (how to group records) should remain separate concerns, as they
   serve distinct purposes.*

   *To structure this properly, I’ll use FLIP-284 as the foundation:*
   *🔹 BufferFlushTrigger (From FLIP-284) - Decides When to Flush*

   *The BufferFlushTrigger should be responsible for determining when the
   buffer is ready to flush, based on configurable conditions such as the
   following (a strawman sketch follows the list):*
      - *Total record count (batch size threshold).*
      - *Buffer size in bytes.*
      - *Time-based constraints (maxTimeInBufferMS), or any other custom
      logic a sink may require.*
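
   *As a strawman, such a trigger could look roughly like this (the method
   names are hypothetical, in the spirit of FLIP-284's readyToFlush()):*

import java.io.Serializable;

public interface BufferFlushTrigger<RequestEntryT extends Serializable> {

    /** Notified as each entry is buffered, so the trigger can maintain its
     *  own bookkeeping (record counts, bytes, per-partition tallies). */
    void registerEntry(RequestEntryT entry, long sizeInBytes);

    /** Consulted by nonBlockingFlush() to decide whether the buffer is
     *  ready to be flushed. */
    boolean shouldFlush();

    /** Clears the internal bookkeeping after a flush has completed. */
    void reset();
}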

   *🔹 BatchCreator - Decides How to Form a Batch*

   *Once BufferFlushTrigger determines that a flush should occur,
   BatchCreator is responsible for reading the buffered requests and forming a
   batch based on partitioning rules.*
      - *BatchCreator does not decide when to flush; it only handles how
      records are grouped when a flush is triggered.*
      - *This separation allows partition-aware batching, ensuring that
      records from the same partition are grouped together before
      submission.*
   ------------------------------
   *🔹 Example: BatchCreator with Two Partitioning Keys*

   *Scenario*
      - *batchSize = 20*
      - *The buffer contains records from two partitions.*
      - *We apply a simple flush strategy that triggers a flush when 20
      total records are buffered (alternatively, a trigger could fire once at
      least 10 records exist for a partition).*
   *How BatchCreator Forms the Batch*

   *Even though BufferFlushTrigger initiates the flush when 20 records are
   reached, BatchCreator must still decide how to structure the batch. It
   does so by:*
      1. *Reading the buffered records.*
      2. *Grouping them by partition key (e.g., finding the 10 records that
      belong to the same partition).*
      3. *Creating a valid batch from these groups before submitting them
      downstream.*
   *In other words, the buffered requests must be accessible to both the
   flushing and the batching components to enable optimized writes.*
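
   *To make the example concrete, a partition-aware implementation could look
   roughly like this (MyRequestEntry and its getPartitionKey() accessor are
   hypothetical placeholders):*

import java.util.ArrayList;
import java.util.Collections;
import java.util.Comparator;
import java.util.Deque;
import java.util.List;
import java.util.Map;
import java.util.stream.Collectors;

public class PartitionAwareBatchCreator implements BatchCreator<MyRequestEntry> {

    @Override
    public List<MyRequestEntry> createNextBatch(
            Deque<MyRequestEntry> bufferedEntries, int maxBatchSize) {
        // Group the buffered records by partition key.
        Map<String, List<MyRequestEntry>> byPartition = bufferedEntries.stream()
                .collect(Collectors.groupingBy(MyRequestEntry::getPartitionKey));

        // Pick the partition with the most buffered records (the ten records
        // of a single partition in the scenario above).
        List<MyRequestEntry> largestGroup = byPartition.values().stream()
                .max(Comparator.comparingInt(List::size))
                .orElse(Collections.emptyList());

        // Form a single-partition batch; records from other partitions stay
        // buffered until a later flush.
        List<MyRequestEntry> batch = new ArrayList<>(
                largestGroup.subList(0, Math.min(largestGroup.size(), maxBatchSize)));
        bufferedEntries.removeAll(batch);
        return batch;
    }
}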
   ------------------------------
   *The key takeaway is that by keeping these two interfaces separate, Sink
   Writers gain flexibility: they can mix and match different batching and
   flushing strategies. Since there is no tight coupling between them,
   different sinks can:*
      - *Implement a distinct batching strategy independent of the flushing
      mechanism.*
      - *Customize flush triggers without impacting how records are grouped
      into batches.*
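
   *Putting the two together, the writer's flush path could compose them
   roughly as follows (a sketch of the idea only; the field names and the
   submitBatch() helper are illustrative, not actual AsyncSinkWriter
   internals):*

private void nonBlockingFlush() {
    // "When" to flush is owned by the trigger...
    while (flushTrigger.shouldFlush()) {
        // ...while "how" to batch is owned by the creator.
        List<MyRequestEntry> batch =
                batchCreator.createNextBatch(bufferedEntries, maxBatchSize);
        if (batch.isEmpty()) {
            break; // trigger fired, but no useful batch could be formed yet
        }
        submitBatch(batch);
        flushTrigger.reset();
    }
}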

I suppose these two serve complementary but distinct purposes, *hence they
can be implemented separately*.
What do you think?


On Wed, Feb 12, 2025 at 4:41 PM Ahmed Hamdy <hamdy10...@gmail.com> wrote:

> Hi Poorvank,
> Thanks for driving this, +1 (non-binding) for the FLIP in general. While I
> see some common grounds with FLIP-284 that I wrote, I prefer going with a
> new FLIP since it is driven by a current use case and is specific for
> tackling it. I have a couple of clarifying questions.
>
> 1- I assume we will not expose batch creators to users but only to sink
> implementers, am I correct?
>
> 2- It is nit: but I would prefer adding the creator to the
> `AsyncSinkWriterConfiguration` rather than overloading the constructor,
> there are some already implemented ways for instantiating the configs with
> defaults so would be simpler.
>
> 3- I want to leverage some concept from FLIP-284 that we have previously
> seen in DynamoDbAsyncSink, do you think we can also customize the trigger
> so we can also add a "readyToFlush()" method or something similar and use
> it in `nonBlockingFlush` methods instead of forcing the trigger on only
> total record count or buffer size. I see this useful in your case as well
> because if we have buffered 100 requests from 100 different partitions then
> we will keep triggering and emitting batches of 1 for 100 times (unless
> maxTimeInBufferMS is used ofc) so you can use the custom trigger for
> minimum buffered records per partition. This might be a bit tricky because
> in order to not affect the performance we probably will wrap the whole
> buffer in the batchCreator to maintain calculations like size in bytes or
> per partition record count in the batch creator making it more
> "bufferHandler" rather than a "batchCreator", let me know your thoughts
> about this.
>
>
> Best Regards
> Ahmed Hamdy
>
>
> On Tue, 11 Feb 2025 at 10:45, Poorvank Bhatia <puravbhat...@gmail.com>
> wrote:
>
> > Hey everyone,
> >
> > I’d like to propose adding a pluggable batching mechanism to
> > AsyncSinkWriter
> > <
> >
> https://github.com/apache/flink/blob/master/flink-connectors/flink-connector-base/src/main/java/org/apache/flink/connector/base/sink/writer/AsyncSinkWriter.java#L351
> > >
> >  to enable custom batch formation strategies.
> > Currently, batching is based on batch size and record count, but this
> > approach is suboptimal for sinks like Cassandra, which require
> > partition-aware batching. Specifically, batches should be formed so that
> > all requests within a batch belong to the same partition, ensuring more
> > efficient writes.
> >
> > The proposal introduces a minimal `BatchCreator` interface, enabling
> users
> > to define custom batching strategies while maintaining backward
> > compatibility with a default implementation.
> >
> > For full details, please refer to the proposal document
> > <
> >
> https://docs.google.com/document/d/1XI2DV-8r-kOwbMd2ZMdV4u0Q_s5m8Stojv4HSdJA8ZU/edit?tab=t.0#heading=h.n4fv4r64xk2f
> > >
> > .
> > Associated Jira <https://issues.apache.org/jira/browse/FLINK-37298>
> >
> > Thanks,
> > Poorvank
> >
>
