Hey everyone, before raising a vote thread, could someone please help copy the contents of the updated design doc <https://docs.google.com/document/d/1XI2DV-8r-kOwbMd2ZMdV4u0Q_s5m8Stojv4HSdJA8ZU/edit?tab=t.0#heading=h.1zdhrsm5oaad> to the FLIP-509 page <https://cwiki.apache.org/confluence/display/FLINK/FLIP-509+Add+pluggable+Batching+for+Async+Sink> (as described on the proposal process page <https://cwiki.apache.org/confluence/pages/viewpage.action?pageId=65145551#FlinkImprovementProposals-Process>)?
Thank you. On Tue, Mar 4, 2025 at 9:00 AM Poorvank Bhatia <puravbhat...@gmail.com> wrote: > Hey devs, > > Thank you all for participating in the discussion! I've updated this doc > based on the feedback. Please feel free to share any further thoughts. > > If there are no additional suggestions, I plan to initiate a vote within > the next few days. > > On Wed, Feb 26, 2025 at 9:00 PM Poorvank Bhatia <puravbhat...@gmail.com> > wrote: > >> Thanks Arvid. As discussed offline, I have made the changes to the doc. >> >> 1. Got rid of the protected methods in the constructor. >> 2. Added a new constructor, derived from the current one, that takes in a >> BatchCreator/BufferWrapper. >> 3. The new constructor defaults to the default BatchCreator and >> BufferWrapper (mimicking the current behaviour). >> >> So: >> >> - Since the old constructor remains unchanged, Flink 1.x will >> continue using it without any issues. >> - Flink 1.x was compiled with the old constructor signature, so it >> will only look for and call that constructor. >> - Even though the class contains references to BatchCreator and >> BufferWrapper, they won't be loaded if the constructor that references >> them is never used. >> - If Flink 1.x explicitly tries to call the new constructor (which >> includes BatchCreator and BufferWrapper), it will fail since those classes >> do not exist in Flink 1.x. But this will only happen when a connector >> actually uses the BatchCreator or the new interfaces, so I assume it >> should be the connector's responsibility to differentiate if it uses a >> BatchCreator. As for Cassandra, we can provide two implementations, one >> without batching and the other with it, so the batching one can check at >> runtime whether the newer classes can be used. >> >> So connectors that want to use customized batching would depend on 2.1. 
If it is used with a previous Flink runtime, a check can be added >> in the connector to make sure that BatchCreator is available (a simple check >> via reflection). >> Please have a look and let me know your thoughts. >> >> Hey Danny, >> In case you missed the last response, I have added a BufferWrapper and >> removed the hardcoded Deque check. Please have a look and let me know >> your thoughts. >> >> On Tue, Feb 25, 2025 at 2:10 PM Arvid Heise <ar...@apache.org> wrote: >> >>> Hi Poorvank, >>> >>> I don't have strong feelings about the actual choice of technology. >>> SPI is used rather heavily in Flink Table/SQL and is also used in >>> similar projects more and more. But SPI also uses reflection under the >>> hood, so it doesn't make a huge difference except from a usability >>> perspective. SPI has some tool support: IDEA, for example, allows you >>> to jump back and forth between the service definition and the >>> implementations. >>> >>> In any case, protected methods that are called in the ctor cannot be >>> overridden because at the point where the base object is initialized >>> the VMT doesn't contain references to methods of the subclasses. That >>> is a deliberate choice of the VM engineers to avoid cases where an >>> overridden method accesses fields that have not been initialized yet >>> because the subclass ctor wasn't invoked yet. >>> >>> Also, I'm not sure if class loading would succeed if you have a method >>> that returns an unknown class (say you want to run a connector that was >>> built against Flink 2.1 on Flink 1.x). I'm assuming it will fail. >>> >>> So we need other choices. I'm curious if annotations could be used (in >>> conjunction with reflection again). I'm guessing class loading will >>> also fail if an unknown annotation is attached, or loading of the >>> annotation itself will fail if the referenced class cannot be loaded. 
>>> If it works somehow, we could say: >>> @BatchedWith(MyBatchCreator.class) >>> class MyConnector extends Async... >>> >>> Unfortunately, I don't have time to try out these ideas. Maybe you have. >>> >>> And in any case, if it gets too complicated, we can always forgo >>> backwards compatibility and require specific builds for Flink 2.1+. >>> This may also be the easier option in case the connector doesn't >>> perform too well without the BatchCreator anyways. >>> >>> Best, >>> >>> Arvid >>> >>> On Thu, Feb 20, 2025 at 6:56 PM Poorvank Bhatia <puravbhat...@gmail.com> >>> wrote: >>> > >>> > Hey Arvid, >>> > >>> > Thank you for your feedback and for taking the time to review this >>> > proposal. >>> > To answer your concerns: >>> > >>> > *Naming Suggestion: BatchCreationResult → Batch * >>> > This is a valid point, and I agree that BatchCreationResult is >>> essentially >>> > representing a Batch. The current name was chosen to reflect that it >>> > encapsulates metadata such as batch size and count, in addition to the >>> > entries. I’ll update the proposal to reflect this suggestion and >>> rename >>> > the class accordingly. >>> > >>> > *Compatibility Concerns*: You are on point regarding backward >>> compatibility >>> > and avoiding API breakages in connectors. >>> > Based on our discussion offline, I have removed the BatchCreator >>> interface >>> > from the constructor and it is not exposed in the public API of >>> > AsyncSinkWriter. >>> > Instead of requiring direct injection via the constructor, the >>> > implementation now uses protected factory methods >>> (createBatchCreator()), >>> > allowing subclasses to override the behavior without modifying the base >>> > API. >>> > If no subclass overrides the methods, Flink will continue using the >>> default >>> > SimpleBatchCreator and DequeBufferWrapper, maintaining full backward >>> > compatibility. >>> > You suggested using Java Reflection to dynamically instantiate >>> > BatchCreator. 
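[Editor's note] The "simple check via reflection" mentioned above could look roughly like the sketch below. The helper class is hypothetical, and the probed class name assumes BatchCreator lands in flink-connector-base as the FLIP proposes:

```java
// Probe whether a class is present on the classpath without initializing it,
// so a connector can fall back to non-batched behaviour on an older runtime.
public class BatchCreatorProbe {

    public static boolean isAvailable(String className) {
        try {
            // initialize=false avoids running static initializers during the probe
            Class.forName(className, false, BatchCreatorProbe.class.getClassLoader());
            return true;
        } catch (ClassNotFoundException | LinkageError e) {
            return false;
        }
    }
}
```

A connector shipping optional batching support could call this once at startup, e.g. with "org.apache.flink.connector.base.sink.writer.BatchCreator" (assumed package), and skip the batching code path when it returns false.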
While reflection offers flexibility, I aimed to avoid it >>> > primarily due to the performance overhead & the guidelines >>> > <https://flink.apache.org/how-to-contribute/code-style-and-quality-java/#java-reflection>. >>> > That said, since reflection would only be used during initialization, the >>> > impact should be minimal. An alternative approach you suggested is the Service >>> > Provider Interface (SPI), which allows for dynamic discovery of >>> > implementations. However, it requires sink implementers to define a >>> > META-INF/services file, which I was trying to avoid for simplicity. >>> > If you think SPI is a better approach as compared to simple factory methods, >>> > I can add a simple loader: >>> > >>> > private BatchCreator<RequestEntryT> loadBatchCreator() { >>> >     ServiceLoader<BatchCreator> loader = ServiceLoader.load(BatchCreator.class); >>> >     for (BatchCreator<RequestEntryT> creator : loader) { >>> >         return creator; >>> >     } >>> >     return new SimpleBatchCreator<>(maxBatchSizeInBytes); >>> > } >>> > >>> > Let me know what makes the most sense to you, and I can either replace the >>> > factory methods with SPI loaders or keep them as they are. >>> > >>> > On Thu, Feb 20, 2025 at 10:26 PM Poorvank Bhatia <puravbhat...@gmail.com> >>> > wrote: >>> > >>> > > Hey Danny, >>> > > >>> > > Thank you for the review. Your observation is absolutely valid, and moving >>> > > away from *Deque<RequestEntryWrapper<RequestEntryT>> bufferedRequestEntries* >>> > > in favor of a more flexible abstraction makes sense. >>> > > *Given the exposure of batch creators, Deque may not be the most >>> > > semantically appropriate or performant choice. 
To address this, a >>> more >>> > > adaptable buffering abstraction that better aligns with different >>> batching >>> > > strategies can be used.* >>> > > >>> > > *So I Updated the proposal and* >>> > > *Introduced BufferWrapper Interface:* >>> > > >>> > > - *Defines an abstraction for managing buffered request entries. * >>> > > - *Supports FIFO, priority-based inserts, and custom queuing >>> > > mechanisms. * >>> > > - *Provides essential operations like add, poll, peek, and >>> > > getBufferedState. * >>> > > >>> > > *Implemented Default DequeBufferWrapper * >>> > > >>> > > - *Uses ArrayDeque for efficient FIFO behavior while allowing >>> > > prioritized inserts. * >>> > > - *Maintains backward compatibility with minimal overhead. * >>> > > >>> > > *Modified AsyncSinkWriter to Use BufferWrapper Instead of Deque * >>> > > >>> > > - *bufferedRequestEntries is now of type >>> BufferWrapper<RequestEntryT>, >>> > > making the choice of buffer implementation flexible. * >>> > > - *A createBuffer() method initializes DequeBufferWrapper by >>> default. * >>> > > >>> > > *Updated BatchCreator and SimpleBatchCreator * >>> > > >>> > > - *Modified method signatures to work with >>> > > BufferWrapper<RequestEntryT> instead of >>> > > Deque<RequestEntryWrapper<RequestEntryT>>. * >>> > > - *Ensures better encapsulation and allows future optimizations in >>> > > buffering strategies.* >>> > > >>> > > I have added this interface and updated the doc. Please have a look >>> and >>> > > let me know if this makes sense. >>> > > >>> > > >>> > > On Thu, Feb 20, 2025 at 5:34 PM Arvid Heise >>> <ahe...@confluent.io.invalid> >>> > > wrote: >>> > > >>> > >> Hi Poorvank, >>> > >> >>> > >> thanks for putting this together. It's obvious to me that this is a >>> > >> good addition. I wish Ahmed could check if his proposals are >>> > >> compatible with yours, so we don't end up with two different ways to >>> > >> express the same thing. 
Ideally Ahmed's proposal could be >>> retrofitted >>> > >> to extend on yours. >>> > >> >>> > >> I have a small nit and a larger concern. The nit is to rename >>> > >> BatchCreationResult into Batch because that's what it is. >>> > >> >>> > >> My main concern is around compatibility. If the new interfaces end >>> up >>> > >> in the signature of the AsyncSinkWriter, we will not be able to run >>> a >>> > >> connector built on top of it with an older Flink version. Either we >>> > >> explicitly call it out or we find a way to avoid that scenario. The >>> > >> connector community had some negative experiences around evolving >>> > >> APIs. >>> > >> >>> > >> One way to avoid breaking compatibility is to provide add-ons where >>> > >> the interfaces are not baked into the AsyncSinkWriter's API. >>> Consider >>> > >> your `BatchCreator` interface. If it's just used within the >>> > >> AsyncSinkWriter (private field, private initialization), then we >>> could >>> > >> run the same connector on an older Flink version. In that case, >>> > >> batching just wouldn't be enabled. Of course, all of that only >>> matters >>> > >> if it's even viable to run the connector without batching. If not >>> > >> then, just make the connector version depend at least on Flink 2.1+ >>> or >>> > >> whenever this feature lands. >>> > >> >>> > >> Now the question arises how to initialize the BatchCreator. Your >>> > >> proposal injects it but maybe we can use some kind of discovery >>> > >> instead (e.g. SPI, convention, and/or annotations). Here is a naive >>> > >> sketch with reflection using a convention (I'd prefer SPI though). 
>>> > >> >>> > >> public AsyncSinkWriter( >>> > >> ElementConverter<InputT, RequestEntryT> elementConverter, >>> > >> WriterInitContext context, >>> > >> AsyncSinkWriterConfiguration configuration, >>> > >> Collection<BufferedRequestState<RequestEntryT>> states) { >>> > >> this(elementConverter, new Sink.InitContextWrapper(context), >>> > >> configuration, states); >>> > >> >>> > >> try { >>> > >> batchCreator = >>> > >> (BatchCreator) >>> > >> Class.forName(getClass().getName() + >>> > >> "BatchCreator").newInstance(); >>> > >> } catch (ClassNotFoundException e) { >>> > >> batchCreator = BatchCreator.DEFAULT; >>> > >> } catch (IllegalAccessException | InstantiationException e) { >>> > >> throw new RuntimeException(e); >>> > >> } >>> > >> } >>> > >> >>> > >> class CassandraBatchCreator implements BatchCreator { ... } >>> > >> >>> > >> Best, >>> > >> >>> > >> Arvid >>> > >> >>> > >> >>> > >> On Wed, Feb 19, 2025 at 11:07 PM Danny Cranmer < >>> dannycran...@apache.org> >>> > >> wrote: >>> > >> > >>> > >> > Hello Poorvank, >>> > >> > >>> > >> > Thanks for opening this discussion/FLIP. I am +1 for the idea, it >>> seems >>> > >> > like a great enhancement to the AsyncSink. >>> > >> > >>> > >> > Just one question/observation from me. We currently use a >>> "Deque<..> >>> > >> > bufferedRequestEntries". Now we are exposing these batch >>> creators; it >>> > >> might >>> > >> > not make sense to use a queue anymore. On one hand it does not >>> make as >>> > >> much >>> > >> > semantic sense because we could have frequent queue jumpers. On >>> the >>> > >> other >>> > >> > hand it might not be performant, since Deque lookups provide O(n) >>> > >> > performance. What do you think? This is an internal data >>> structure so we >>> > >> > could consider replacing it with something else, but I might just >>> be >>> > >> > prematurely optimising this. 
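[Editor's note] Danny's Deque concern above is what the BufferWrapper discussed earlier in the thread addresses. A minimal sketch of that abstraction and its deque-backed default follows; the method names (add, poll, peek) come from the thread, but the exact signatures are this editor's assumption, not the FLIP's final API:

```java
import java.util.ArrayDeque;
import java.util.Deque;

// Buffer abstraction: hides the concrete data structure so partition-aware
// or priority-based buffers can replace the FIFO deque without touching
// the writer's public API.
interface BufferWrapper<T> {
    void add(T entry, boolean atHead); // atHead=true models retried "queue jumpers"
    T poll();
    T peek();
    boolean isEmpty();
    int size();
}

// Default implementation mirroring the current ArrayDeque-based behaviour.
class DequeBufferWrapper<T> implements BufferWrapper<T> {
    private final Deque<T> deque = new ArrayDeque<>();

    public void add(T entry, boolean atHead) {
        if (atHead) {
            deque.addFirst(entry); // prioritized insert, e.g. a retried request
        } else {
            deque.addLast(entry);  // normal FIFO append
        }
    }

    public T poll() { return deque.pollFirst(); }
    public T peek() { return deque.peekFirst(); }
    public boolean isEmpty() { return deque.isEmpty(); }
    public int size() { return deque.size(); }
}
```

Under this sketch, AsyncSinkWriter would hold a BufferWrapper instead of a raw Deque, and a createBuffer() hook would return this default unless a sink overrides it.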
>>> > >> > >>> > >> > Thanks, >>> > >> > Danny >>> > >> > >>> > >> > On Thu, Feb 13, 2025 at 5:31 PM Ahmed Hamdy <hamdy10...@gmail.com >>> > >>> > >> wrote: >>> > >> > >>> > >> > > FYI: Created FLIP-509. >>> > >> > > >>> > >> > > Best Regards >>> > >> > > Ahmed Hamdy >>> > >> > > >>> > >> > > >>> > >> > > On Thu, 13 Feb 2025 at 16:30, Ahmed Hamdy <hamdy10...@gmail.com >>> > >>> > >> wrote: >>> > >> > > >>> > >> > > > Hi Poorvank, >>> > >> > > > thanks for the feedback, I can see your point and I kinda >>> agree, I >>> > >> would >>> > >> > > > say, if you don't need the flushing trigger interface in your >>> use >>> > >> case >>> > >> > > > let's keep it out of the FLIP for now, Also let's wait for the >>> > >> feedback >>> > >> > > > about that from other members. >>> > >> > > > I will create a FLIP and assign it a number. >>> > >> > > > >>> > >> > > > Best Regards >>> > >> > > > Ahmed Hamdy >>> > >> > > > >>> > >> > > > >>> > >> > > > On Thu, 13 Feb 2025 at 09:15, Poorvank Bhatia < >>> > >> puravbhat...@gmail.com> >>> > >> > > > wrote: >>> > >> > > > >>> > >> > > >> Hello Ahmed, >>> > >> > > >> Thank you for your insights! In fact, FLIP-284 >>> > >> > > >> < >>> > >> > > >> >>> > >> > > >>> > >> >>> https://cwiki.apache.org/confluence/display/FLINK/FLIP-284+%3A+Making+AsyncSinkWriter+Flush+triggers+adjustable >>> > >> > > >> >'s >>> > >> > > >> proposal aligns so well with this approach, I have added it >>> has the >>> > >> > > future >>> > >> > > >> scope. >>> > >> > > >> To answer your questions: >>> > >> > > >> >>> > >> > > >> I assume we will not expose batch creators to users but only >>> to >>> > >> sink >>> > >> > > >> implementers, am I correct? >>> > >> > > >> >>> > >> > > >> - *Yes, that is correct. The BatchCreator interface is >>> intended >>> > >> to be >>> > >> > > >> used only by sink implementers, not end users. 
Since the >>> > >> SinkWriter >>> > >> > > is >>> > >> > > >> responsible for defining how records are batched and >>> written, >>> > >> only >>> > >> > > >> those >>> > >> > > >> developing a custom sink would need access to this >>> > >> functionality.* >>> > >> > > >> >>> > >> > > >> It is nit: but I would prefer adding the creator to the >>> > >> > > >> `AsyncSinkWriterConfiguration` rather than overloading the >>> > >> constructor, >>> > >> > > >> there are some already implemented ways for instantiating the >>> > >> configs >>> > >> > > with >>> > >> > > >> defaults so would be simpler. >>> > >> > > >> >>> > >> > > >> >>> > >> > > >> - *Yes, BatchCreator can be included in >>> > >> AsyncSinkWriterConfiguration. >>> > >> > > >> However, my initial reasoning for keeping it separate was >>> that >>> > >> > > >> BatchCreator >>> > >> > > >> is not just a configuration parameter—it's a functional >>> > >> component >>> > >> > > that >>> > >> > > >> directly influences how records are batched. This makes >>> it more >>> > >> of a >>> > >> > > >> behavioral dependency rather than a passive setting. That >>> said, >>> > >> if >>> > >> > > >> incorporating it into AsyncSinkWriterConfiguration aligns >>> > >> better with >>> > >> > > >> the >>> > >> > > >> overall design, it is a nit change and I'm open to >>> updating the >>> > >> > > >> approach >>> > >> > > >> accordingly. 
Do let me know your thoughts, as >>> > >> RateLimitingStartegy is >>> > >> > > >> also >>> > >> > > >> a part of it.* >>> > >> > > >> >>> > >> > > >> >>> > >> > > >> I want to leverage some concept from FLIP-284 that we have >>> > >> previously >>> > >> > > >> seen in DynamoDbAsyncSink, do you think we can also >>> customize the >>> > >> > > trigger >>> > >> > > >> so we can also add a "readyToFlush()" method or something >>> similar >>> > >> and >>> > >> > > use >>> > >> > > >> it in `nonBlockingFlush` methods instead of forcing the >>> trigger on >>> > >> only >>> > >> > > >> total record count or buffer size. I see this useful in your >>> case >>> > >> as >>> > >> > > well >>> > >> > > >> because if we have buffered 100 requests from 100 different >>> > >> partitions >>> > >> > > >> then >>> > >> > > >> we will keep triggering and emitting batches of 1 for 100 >>> times >>> > >> (unless >>> > >> > > >> maxTimeInBufferMS is used ofc) so you can use the custom >>> trigger >>> > >> for >>> > >> > > >> minimum buffered records per partition. This might be a bit >>> tricky >>> > >> > > because >>> > >> > > >> in order to not affect the performance we probably will wrap >>> the >>> > >> whole >>> > >> > > >> buffer in the batchCreator to maintain calculations like >>> size in >>> > >> bytes >>> > >> > > or >>> > >> > > >> per partition record count in the batch creator making it >>> more >>> > >> > > >> "bufferHandler" rather than a "batchCreator", let me know >>> your >>> > >> thoughts >>> > >> > > >> about this >>> > >> > > >> >>> > >> > > >> >>> > >> > > >> - >>> > >> > > >> >>> > >> > > >> *You bring up a valid concern about optimizing flush >>> triggers to >>> > >> > > >> prevent >>> > >> > > >> inefficient batch emissions when multiple partitions >>> exist, >>> > >> > > >> particularly in >>> > >> > > >> sinks like DynamoDB and Cassandra. 
IMO flushing (when to >>> send >>> > >> > > records) >>> > >> > > >> and >>> > >> > > >> batching (how to group records) should remain separate >>> > >> concerns, as >>> > >> > > >> they >>> > >> > > >> serve distinct purposes.* >>> > >> > > >> >>> > >> > > >> *To structure this properly, I’ll use FLIP-284 as the >>> > >> foundation:* >>> > >> > > >> *🔹 BufferFlushTrigger (From FLIP-284) - Decides When to >>> Flush* >>> > >> > > >> >>> > >> > > >> *The BufferFlushTrigger should be responsible for >>> determining >>> > >> when >>> > >> > > the >>> > >> > > >> buffer is ready to flush, based on configurable >>> conditions such >>> > >> as:* >>> > >> > > >> - *Total record count (batch size threshold).* >>> > >> > > >> - *Buffer size in bytes.* >>> > >> > > >> - *Time-based constraints (maxTimeInBufferMS), or any >>> other >>> > >> custom >>> > >> > > >> logic a sink may require.* >>> > >> > > >> >>> > >> > > >> *🔹 BatchCreator - Decides How to Form a Batch* >>> > >> > > >> >>> > >> > > >> *Once BufferFlushTrigger determines that a flush should >>> occur, >>> > >> > > >> BatchCreator is responsible for reading the buffered >>> requests >>> > >> and >>> > >> > > >> forming a >>> > >> > > >> batch based on partitioning rules.* >>> > >> > > >> - *BatchCreator does not decide when to flush—it only >>> handles >>> > >> how >>> > >> > > >> records are grouped when a flush is triggered.* >>> > >> > > >> - *This separation allows partition-aware batching, >>> ensuring >>> > >> that >>> > >> > > >> records from the same partition are grouped together >>> before >>> > >> > > >> submission.* >>> > >> > > >> ------------------------------ >>> > >> > > >> *🔹 Example: BatchCreator with Two Partitioning >>> Keys**Scenario* >>> > >> > > >> - *batchSize = 20* >>> > >> > > >> - *The buffer contains records from two partitions.* >>> > >> > > >> - *Assume either (a) we flush when at least 10 records >>> exist >>> > >> per >>> > >> > > >> partition.* >>> > >> > > >> - *We apply a simple 
flush strategy that triggers a >>> flush >>> > >> when 20 >>> > >> > > >> total records are buffered.* >>> > >> > > >> *How BatchCreator Forms the Batch* >>> > >> > > >> >>> > >> > > >> *Even though BufferFlushTrigger initiates the flush when >>> 20 >>> > >> records >>> > >> > > are >>> > >> > > >> reached, BatchCreator must still decide how to structure >>> the >>> > >> batch. >>> > >> > > It >>> > >> > > >> does >>> > >> > > >> so by:* >>> > >> > > >> 1. *Reading the buffered records.* >>> > >> > > >> 2. *Grouping them by partition key. (Find out the 10 >>> batch)* >>> > >> > > >> 3. *Creating a valid batch from these groups before >>> > >> submitting >>> > >> > > them >>> > >> > > >> downstream.* >>> > >> > > >> *What I mean is that, the buffered Requests should be >>> there for >>> > >> both >>> > >> > > >> flushing and batching for optimized writes. * >>> > >> > > >> ------------------------------ >>> > >> > > >> *The key takeaway is that by keeping these two interfaces >>> > >> separate, >>> > >> > > >> Sink >>> > >> > > >> Writers gain flexibility—they can mix and match different >>> > >> batching >>> > >> > > and >>> > >> > > >> flushing strategies. Since there is no tight coupling >>> between >>> > >> them, >>> > >> > > >> different sinks can:* >>> > >> > > >> - *Implement a distinct batching strategy independent >>> of the >>> > >> > > >> flushing >>> > >> > > >> mechanism.* >>> > >> > > >> - *Customize flush triggers without impacting how >>> records are >>> > >> > > >> grouped >>> > >> > > >> into batches.* >>> > >> > > >> >>> > >> > > >> I suppose these two serve complementary but different >>> purposes, >>> > >> *hence >>> > >> > > >> they >>> > >> > > >> can be implemented differently*. >>> > >> > > >> What do you think? 
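[Editor's note] The separation argued for above — the trigger decides *when* to flush, the batch creator decides *how* to group — can be sketched concretely. All names here (Entry, readyToFlush, createBatch) are illustrative stand-ins chosen by the editor, not the FLIP's actual signatures:

```java
import java.util.ArrayList;
import java.util.Comparator;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

class PartitionAwareBatching {
    static class Entry {
        final String partitionKey;
        final String payload;
        Entry(String partitionKey, String payload) {
            this.partitionKey = partitionKey;
            this.payload = payload;
        }
    }

    // Flush trigger: the simple count-based strategy from the example above.
    static boolean readyToFlush(List<Entry> buffer, int batchSize) {
        return buffer.size() >= batchSize;
    }

    // Batch creator: group buffered entries by partition key and emit the
    // largest single-partition group; everything else stays buffered.
    static List<Entry> createBatch(List<Entry> buffer) {
        Map<String, List<Entry>> byPartition = new HashMap<>();
        for (Entry e : buffer) {
            byPartition.computeIfAbsent(e.partitionKey, k -> new ArrayList<>()).add(e);
        }
        List<Entry> batch = byPartition.values().stream()
                .max(Comparator.comparingInt(List::size))
                .orElse(new ArrayList<>());
        buffer.removeAll(batch); // removed entries form the submitted batch
        return batch;
    }
}
```

In the two-partition scenario above (12 records for one key, 8 for another, threshold 20), the trigger fires once 20 records are buffered, and the creator then submits the 12-record group while the remaining 8 wait for the next flush.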
>>> > >> > > >> >>> > >> > > >> >>> > >> > > >> On Wed, Feb 12, 2025 at 4:41 PM Ahmed Hamdy < >>> hamdy10...@gmail.com> >>> > >> > > wrote: >>> > >> > > >> >>> > >> > > >> > Hi Poorvank, >>> > >> > > >> > Thanks for driving this, +1 (non-binding) for the FLIP in >>> > >> general. >>> > >> > > >> While I >>> > >> > > >> > see some common grounds with FLIP-284 that I wrote, I >>> prefer >>> > >> going >>> > >> > > with >>> > >> > > >> a >>> > >> > > >> > new FLIP since it is driven by a current use case and is >>> > >> specific for >>> > >> > > >> > tackling it. I have a couple of clarifying questions. >>> > >> > > >> > >>> > >> > > >> > 1- I assume we will not expose batch creators to users but >>> only >>> > >> to >>> > >> > > sink >>> > >> > > >> > implementers, am I correct? >>> > >> > > >> > >>> > >> > > >> > 2- It is nit: but I would prefer adding the creator to the >>> > >> > > >> > `AsyncSinkWriterConfiguration` rather than overloading the >>> > >> > > constructor, >>> > >> > > >> > there are some already implemented ways for instantiating >>> the >>> > >> configs >>> > >> > > >> with >>> > >> > > >> > defaults so would be simpler. >>> > >> > > >> > >>> > >> > > >> > 3- I want to leverage some concept from FLIP-284 that we >>> have >>> > >> > > previously >>> > >> > > >> > seen in DynamoDbAsyncSink, do you think we can also >>> customize the >>> > >> > > >> trigger >>> > >> > > >> > so we can also add a "readyToFlush()" method or something >>> > >> similar and >>> > >> > > >> use >>> > >> > > >> > it in `nonBlockingFlush` methods instead of forcing the >>> trigger >>> > >> on >>> > >> > > only >>> > >> > > >> > total record count or buffer size. 
I see this useful in >>> your >>> > >> case as >>> > >> > > >> well >>> > >> > > >> > because if we have buffered 100 requests from 100 different >>> > >> partitions >>> > >> > > >> then >>> > >> > > >> > we will keep triggering and emitting batches of 1 for 100 >>> times >>> > >> > > (unless >>> > >> > > >> > maxTimeInBufferMS is used ofc) so you can use the custom >>> trigger >>> > >> for >>> > >> > > >> > minimum buffered records per partition. This might be a bit >>> > >> tricky >>> > >> > > >> because >>> > >> > > >> > in order to not affect the performance we probably will >>> wrap the >>> > >> whole >>> > >> > > >> > buffer in the batchCreator to maintain calculations like >>> size in >>> > >> bytes >>> > >> > > >> or >>> > >> > > >> > per partition record count in the batch creator making it >>> more >>> > >> > > >> > "bufferHandler" rather than a "batchCreator", let me know >>> your >>> > >> > > thoughts >>> > >> > > >> > about this. >>> > >> > > >> > >>> > >> > > >> > >>> > >> > > >> > Best Regards >>> > >> > > >> > Ahmed Hamdy >>> > >> > > >> > >>> > >> > > >> > >>> > >> > > >> > On Tue, 11 Feb 2025 at 10:45, Poorvank Bhatia < >>> > >> puravbhat...@gmail.com >>> > >> > > > >>> > >> > > >> > wrote: >>> > >> > > >> > >>> > >> > > >> > > Hey everyone, >>> > >> > > >> > > >>> > >> > > >> > > I’d like to propose adding a pluggable batching >>> mechanism to >>> > >> > > >> > > AsyncSinkWriter >>> > >> > > >> > > < >>> > >> > > >> > > >>> > >> > > >> > >>> > >> > > >> >>> > >> > > >>> > >> >>> https://github.com/apache/flink/blob/master/flink-connectors/flink-connector-base/src/main/java/org/apache/flink/connector/base/sink/writer/AsyncSinkWriter.java#L351 >>> > >> > > >> > > > >>> > >> > > >> > > to enable custom batch formation strategies. 
>>> > >> > > >> > > Currently, batching is based on batch size and record >>> count, >>> > >> but >>> > >> > > this >>> > >> > > >> > > approach is suboptimal for sinks like Cassandra, which >>> require >>> > >> > > >> > > partition-aware batching. Specifically, batches should be >>> > >> formed so >>> > >> > > >> that >>> > >> > > >> > > all requests within a batch belong to the same partition, >>> > >> ensuring >>> > >> > > >> more >>> > >> > > >> > > efficient writes. >>> > >> > > >> > > >>> > >> > > >> > > The proposal introduces a minimal `BatchCreator` >>> interface, >>> > >> enabling >>> > >> > > >> > users >>> > >> > > >> > > to define custom batching strategies while maintaining >>> backward >>> > >> > > >> > > compatibility with a default implementation. >>> > >> > > >> > > >>> > >> > > >> > > For full details, please refer to the proposal document >>> > >> > > >> > > < >>> > >> > > >> > > >>> > >> > > >> > >>> > >> > > >> >>> > >> > > >>> > >> >>> https://docs.google.com/document/d/1XI2DV-8r-kOwbMd2ZMdV4u0Q_s5m8Stojv4HSdJA8ZU/edit?tab=t.0#heading=h.n4fv4r64xk2f >>> > >> > > >> > > > >>> > >> > > >> > > . >>> > >> > > >> > > Associated Jira < >>> > >> https://issues.apache.org/jira/browse/FLINK-37298> >>> > >> > > >> > > >>> > >> > > >> > > Thanks, >>> > >> > > >> > > Poorvank >>> > >> > > >> > > >>> > >> > > >> > >>> > >> > > >> >>> > >> > > > >>> > >> > > >>> > >> >>> > > >>> >>
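[Editor's note] For completeness, the SPI route Arvid prefers would be wired roughly as follows. This is a sketch: the provider-file location follows the java.util.ServiceLoader contract, while all class names are hypothetical:

```java
import java.util.ServiceLoader;

// Provider registration (one fully-qualified class name per line) would live in
// a resource file named after the service interface, e.g.:
//   META-INF/services/com.example.sink.BatchCreator
// containing:
//   com.example.sink.CassandraBatchCreator

interface BatchCreator {
    String name();
}

class BatchCreatorDiscovery {
    // Returns the first registered provider, or a default stand-in
    // (mirroring the loadBatchCreator() fallback quoted earlier in the thread).
    static BatchCreator discover() {
        for (BatchCreator creator : ServiceLoader.load(BatchCreator.class)) {
            return creator;
        }
        return () -> "default"; // stand-in for SimpleBatchCreator
    }
}
```

With no provider file on the classpath, discover() falls back to the default, which is exactly the backward-compatible behaviour the thread is after.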