Hey everyone, I’d like to propose adding a pluggable batching mechanism to AsyncSinkWriter <https://github.com/apache/flink/blob/master/flink-connectors/flink-connector-base/src/main/java/org/apache/flink/connector/base/sink/writer/AsyncSinkWriter.java#L351> to enable custom batch formation strategies. Currently, batching is based on batch size and record count, but this approach is suboptimal for sinks like Cassandra, which require partition-aware batching. Specifically, batches should be formed so that all requests within a batch belong to the same partition, ensuring more efficient writes.
The proposal introduces a minimal `BatchCreator` interface, enabling users to define custom batching strategies while maintaining backward compatibility with a default implementation. For full details, please refer to the proposal document <https://docs.google.com/document/d/1XI2DV-8r-kOwbMd2ZMdV4u0Q_s5m8Stojv4HSdJA8ZU/edit?tab=t.0#heading=h.n4fv4r64xk2f> . Associated Jira <https://issues.apache.org/jira/browse/FLINK-37298> Thanks, Poorvank