FYI: Created FLIP-509.

Best Regards
Ahmed Hamdy


On Thu, 13 Feb 2025 at 16:30, Ahmed Hamdy <hamdy10...@gmail.com> wrote:

> Hi Poorvank,
> thanks for the feedback. I can see your point and I kinda agree. I would
> say, if you don't need the flushing trigger interface in your use case,
> let's keep it out of the FLIP for now. Also, let's wait for feedback
> about that from other members.
> I will create a FLIP and assign it a number.
>
> Best Regards
> Ahmed Hamdy
>
>
> On Thu, 13 Feb 2025 at 09:15, Poorvank Bhatia <puravbhat...@gmail.com>
> wrote:
>
>> Hello Ahmed,
>> Thank you for your insights! In fact, FLIP-284
>> <
>> https://cwiki.apache.org/confluence/display/FLINK/FLIP-284+%3A+Making+AsyncSinkWriter+Flush+triggers+adjustable
>> >'s
>> proposal aligns so well with this approach that I have added it as future
>> scope.
>> To answer your questions:
>>
>> I assume we will not expose batch creators to users but only to sink
>> implementers, am I correct?
>>
>>    - *Yes, that is correct. The BatchCreator interface is intended to be
>>    used only by sink implementers, not end users. Since the SinkWriter is
>>    responsible for defining how records are batched and written, only
>>    those developing a custom sink would need access to this functionality.*
>>
>> It is nit: but I would prefer adding the creator to the
>> `AsyncSinkWriterConfiguration` rather than overloading the constructor,
>> there are some already implemented ways for instantiating the configs with
>> defaults so would be simpler.
>>
>>
>>    - *Yes, BatchCreator can be included in AsyncSinkWriterConfiguration.
>>    However, my initial reasoning for keeping it separate was that
>>    BatchCreator is not just a configuration parameter; it is a functional
>>    component that directly influences how records are batched. This makes
>>    it a behavioral dependency rather than a passive setting. That said, if
>>    incorporating it into AsyncSinkWriterConfiguration aligns better with
>>    the overall design, it is a small change and I'm open to updating the
>>    approach accordingly. Do let me know your thoughts, as
>>    RateLimitingStrategy is also a part of it.*
>>
>>
>>  I want to leverage some concept from FLIP-284 that we have previously
>> seen in DynamoDbAsyncSink, do you think we can also customize the trigger
>> so we can also add a "readyToFlush()" method or something similar and use
>> it in `nonBlockingFlush` methods instead of forcing the trigger on only
>> total record count or buffer size. I see this useful in your case as well
>> because if we have buffered 100 requests from 100 different partitions
>> then we will keep triggering and emitting batches of 1 for 100 times (unless
>> maxTimeInBufferMS is used ofc) so you can use the custom trigger for
>> minimum buffered records per partition. This might be a bit tricky because
>> in order to not affect the performance we probably will wrap the whole
>> buffer in the batchCreator to maintain calculations like size in bytes or
>> per partition record count in the batch creator making it more
>> "bufferHandler" rather than a "batchCreator", let me know your thoughts
>> about this
>>
>>
>>    - *You bring up a valid concern about optimizing flush triggers to
>>    prevent inefficient batch emissions when multiple partitions exist,
>>    particularly in sinks like DynamoDB and Cassandra. IMO flushing (when to
>>    send records) and batching (how to group records) should remain separate
>>    concerns, as they serve distinct purposes.*
>>
>>    *To structure this properly, I’ll use FLIP-284 as the foundation:*
>>    *🔹 BufferFlushTrigger (From FLIP-284) - Decides When to Flush*
>>
>>    *The BufferFlushTrigger should be responsible for determining when the
>>    buffer is ready to flush, based on configurable conditions such as:*
>>       - *Total record count (batch size threshold).*
>>       - *Buffer size in bytes.*
>>       - *Time-based constraints (maxTimeInBufferMS), or any other custom
>>       logic a sink may require.*
>>
>>    *🔹 BatchCreator - Decides How to Form a Batch*
>>
>>    *Once BufferFlushTrigger determines that a flush should occur,
>>    BatchCreator is responsible for reading the buffered requests and
>>    forming a batch based on partitioning rules.*
>>       - *BatchCreator does not decide when to flush; it only handles how
>>       records are grouped when a flush is triggered.*
>>       - *This separation allows partition-aware batching, ensuring that
>>       records from the same partition are grouped together before
>>       submission.*
>>    ------------------------------
>>    *🔹 Example: BatchCreator with Two Partitioning Keys*
>>    *Scenario*
>>       - *batchSize = 20*
>>       - *The buffer contains records from two partitions.*
>>       - *Assume either (a) we flush when at least 10 records exist per
>>       partition, or (b) we apply a simple flush strategy that triggers a
>>       flush when 20 total records are buffered.*
>>    *How BatchCreator Forms the Batch*
>>
>>    *Even though BufferFlushTrigger initiates the flush when 20 records are
>>    reached, BatchCreator must still decide how to structure the batch. It
>>    does so by:*
>>       1. *Reading the buffered records.*
>>       2. *Grouping them by partition key (i.e., finding the group of 10).*
>>       3. *Creating a valid batch from these groups before submitting them
>>       downstream.*
>>    *What I mean is that the buffered requests should be available to both
>>    the flushing and the batching logic for optimized writes.*
>>    ------------------------------
>>    *The key takeaway is that by keeping these two interfaces separate, sink
>>    writers gain flexibility: they can mix and match different batching and
>>    flushing strategies. Since there is no tight coupling between them,
>>    different sinks can:*
>>       - *Implement a distinct batching strategy independent of the
>>       flushing mechanism.*
>>       - *Customize flush triggers without impacting how records are
>>       grouped into batches.*
>>
>> I suppose these two serve complementary but different purposes, *hence
>> they can be implemented differently*.
>> What do you think?
>>
>>
>> On Wed, Feb 12, 2025 at 4:41 PM Ahmed Hamdy <hamdy10...@gmail.com> wrote:
>>
>> > Hi Poorvank,
>> > Thanks for driving this, +1 (non-binding) for the FLIP in general.
>> While I
>> > see some common grounds with FLIP-284 that I wrote, I prefer going with
>> a
>> > new FLIP since it is driven by a current use case and is specific for
>> > tackling it. I have a couple of clarifying questions.
>> >
>> > 1- I assume we will not expose batch creators to users but only to sink
>> > implementers, am I correct?
>> >
>> > 2- It is nit: but I would prefer adding the creator to the
>> > `AsyncSinkWriterConfiguration` rather than overloading the constructor,
>> > there are some already implemented ways for instantiating the configs
>> with
>> > defaults so would be simpler.
>> >
>> > 3- I want to leverage some concept from FLIP-284 that we have previously
>> > seen in DynamoDbAsyncSink, do you think we can also customize the
>> trigger
>> > so we can also add a "readyToFlush()" method or something similar and
>> use
>> > it in `nonBlockingFlush` methods instead of forcing the trigger on only
>> > total record count or buffer size. I see this useful in your case as
>> well
>> > because if we have buffered 100 requests from 100 different partitions
>> then
>> > we will keep triggering and emitting batches of 1 for 100 times (unless
>> > maxTimeInBufferMS is used ofc) so you can use the custom trigger for
>> > minimum buffered records per partition. This might be a bit tricky
>> because
>> > in order to not affect the performance we probably will wrap the whole
>> > buffer in the batchCreator to maintain calculations like size in bytes
>> or
>> > per partition record count in the batch creator making it more
>> > "bufferHandler" rather than a "batchCreator", let me know your thoughts
>> > about this.
>> >
>> >
>> > Best Regards
>> > Ahmed Hamdy
>> >
>> >
>> > On Tue, 11 Feb 2025 at 10:45, Poorvank Bhatia <puravbhat...@gmail.com>
>> > wrote:
>> >
>> > > Hey everyone,
>> > >
>> > > I’d like to propose adding a pluggable batching mechanism to
>> > > AsyncSinkWriter
>> > > <
>> > >
>> >
>> https://github.com/apache/flink/blob/master/flink-connectors/flink-connector-base/src/main/java/org/apache/flink/connector/base/sink/writer/AsyncSinkWriter.java#L351
>> > > >
>> > >  to enable custom batch formation strategies.
>> > > Currently, batching is based on batch size and record count, but this
>> > > approach is suboptimal for sinks like Cassandra, which require
>> > > partition-aware batching. Specifically, batches should be formed so
>> that
>> > > all requests within a batch belong to the same partition, ensuring
>> more
>> > > efficient writes.
>> > >
>> > > The proposal introduces a minimal `BatchCreator` interface, enabling
>> > users
>> > > to define custom batching strategies while maintaining backward
>> > > compatibility with a default implementation.
>> > >
>> > > For full details, please refer to the proposal document
>> > > <
>> > >
>> >
>> https://docs.google.com/document/d/1XI2DV-8r-kOwbMd2ZMdV4u0Q_s5m8Stojv4HSdJA8ZU/edit?tab=t.0#heading=h.n4fv4r64xk2f
>> > > >
>> > > .
>> > > Associated Jira <https://issues.apache.org/jira/browse/FLINK-37298>
>> > >
>> > > Thanks,
>> > > Poorvank
>> > >
>> >
>>
>
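
For readers of this thread, the separation discussed above (a flush trigger that decides when to flush, a batch creator that decides how to group records) can be sketched roughly as follows. This is only an illustration under stated assumptions: `Request`, `FlushTrigger`, `BatchCreator`, `countTrigger` and `samePartitionCreator` are hypothetical names made up for this sketch and are not the interfaces proposed in the FLIP or anything in Flink's AsyncSinkWriter. The buffer is modeled as a plain `Deque`, and the scenario is the one from the example: 20 buffered records spread over two partitions.

```java
import java.util.ArrayDeque;
import java.util.ArrayList;
import java.util.Deque;
import java.util.Iterator;
import java.util.List;

public class BatchingSketch {

    /** Hypothetical buffered request: a partition key plus a payload. */
    static final class Request {
        final String partitionKey;
        final String payload;

        Request(String partitionKey, String payload) {
            this.partitionKey = partitionKey;
            this.payload = payload;
        }
    }

    /** Hypothetical trigger interface: decides WHEN the buffer should be flushed. */
    interface FlushTrigger {
        boolean readyToFlush(Deque<Request> buffer);
    }

    /** Hypothetical creator interface: decides HOW a batch is formed from the buffer. */
    interface BatchCreator {
        List<Request> createNextBatch(Deque<Request> buffer, int maxBatchSize);
    }

    /** Simple trigger: flush once the total buffered count reaches the threshold. */
    static FlushTrigger countTrigger(int threshold) {
        return buffer -> buffer.size() >= threshold;
    }

    /**
     * Partition-aware creator: removes and returns up to maxBatchSize requests
     * that share the partition key of the oldest buffered request.
     */
    static BatchCreator samePartitionCreator() {
        return (buffer, maxBatchSize) -> {
            List<Request> batch = new ArrayList<>();
            if (buffer.isEmpty()) {
                return batch;
            }
            String key = buffer.peekFirst().partitionKey;
            Iterator<Request> it = buffer.iterator();
            while (it.hasNext() && batch.size() < maxBatchSize) {
                Request r = it.next();
                if (r.partitionKey.equals(key)) {
                    batch.add(r);
                    it.remove(); // the request leaves the buffer once batched
                }
            }
            return batch;
        };
    }

    public static void main(String[] args) {
        // 20 records, interleaved across two partitions "A" and "B".
        Deque<Request> buffer = new ArrayDeque<>();
        for (int i = 0; i < 10; i++) {
            buffer.add(new Request("A", "a" + i));
            buffer.add(new Request("B", "b" + i));
        }

        FlushTrigger trigger = countTrigger(20);
        BatchCreator creator = samePartitionCreator();

        if (trigger.readyToFlush(buffer)) {
            List<Request> batch = creator.createNextBatch(buffer, 20);
            System.out.println(batch.size() + " requests for partition "
                    + batch.get(0).partitionKey);
            System.out.println(buffer.size() + " requests remain buffered");
        }
    }
}
```

In this sketch the trigger fires on the total count of 20, but the creator still emits only the 10 requests of partition "A" and leaves the 10 requests of partition "B" buffered for the next batch. Either side could be swapped (for example, a trigger based on a per-partition minimum) without touching the other, which is the mix-and-match point made in the thread.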
