vahmed-hamdy commented on code in PR #26274:
URL: https://github.com/apache/flink/pull/26274#discussion_r1988960736


##########
flink-connectors/flink-connector-base/src/main/java/org/apache/flink/connector/base/sink/writer/BufferWrapper.java:
##########
@@ -0,0 +1,107 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.connector.base.sink.writer;
+
+import org.apache.flink.annotation.PublicEvolving;
+
+import java.io.Serializable;
+import java.util.Collection;
+
+/**
+ * A flexible wrapper interface for managing buffered request entries in an async sink. This allows
+ * sink implementations to define and optimize their own data structures for request buffering.
+ *
+ * @param <RequestEntryT> The type of request entries being buffered.
+ */
+@PublicEvolving

Review Comment:
   Same as the comment on `Batch`: I am not sure whether `@Internal` or `@PublicEvolving` fits better here.



##########
flink-connectors/flink-connector-base/src/main/java/org/apache/flink/connector/base/sink/writer/BatchCreator.java:
##########
@@ -0,0 +1,73 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.connector.base.sink.writer;
+
+import org.apache.flink.annotation.PublicEvolving;
+import org.apache.flink.connector.base.sink.writer.strategy.RequestInfo;
+
+import java.io.Serializable;
+import java.util.Deque;
+
+/**
+ * A pluggable interface for forming batches of request entries from a buffer. Implementations
+ * control how many entries are grouped together and in what manner before sending them downstream.
+ *
+ * <p>The {@code AsyncSinkWriter} (or similar sink component) calls {@link
+ * #createNextBatch(RequestInfo, BufferWrapper)} when it decides to flush or
+ * otherwise gather a new batch of elements. For instance, a batch creator might limit the batch by
+ * the number of elements, total payload size, or any custom partition-based strategy.
+ *
+ * @param <RequestEntryT> the type of the request entries to be batched
+ */
+@PublicEvolving

Review Comment:
   Same as the comment on `Batch`.



##########
flink-connectors/flink-connector-base/src/main/java/org/apache/flink/connector/base/sink/writer/SimpleBatchCreator.java:
##########
@@ -0,0 +1,108 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.connector.base.sink.writer;
+
+import org.apache.flink.annotation.PublicEvolving;
+import org.apache.flink.connector.base.sink.writer.strategy.RequestInfo;
+
+import java.io.Serializable;
+import java.util.ArrayList;
+import java.util.List;
+
+/**
+ * A simple implementation of {@link BatchCreator} that forms a batch by taking up to {@code
+ * requestInfo.getBatchSize()} entries from the head of the buffer, so long as the cumulative size
+ * in bytes does not exceed the configured maximum.
+ *
+ * <p>This strategy stops gathering entries from the buffer as soon as adding the next entry would
+ * exceed the {@code maxBatchSizeInBytes}, or once it has collected {@code
+ * requestInfo.getBatchSize()} entries, whichever occurs first.
+ *
+ * @param <RequestEntryT> the type of request entries managed by this batch creator
+ */
+@PublicEvolving
+public class SimpleBatchCreator<RequestEntryT extends Serializable>
+        implements BatchCreator<RequestEntryT> {
+
+    /** The maximum total byte size allowed for a single batch. */
+    private final long maxBatchSizeInBytes;
+
+    /**
+     * Constructs a {@code SimpleBatchCreator} with the specified maximum batch size in bytes.
+     *
+     * @param maxBatchSizeInBytes the maximum cumulative size in bytes allowed for any single batch
+     */
+    private SimpleBatchCreator(long maxBatchSizeInBytes) {
+        this.maxBatchSizeInBytes = maxBatchSizeInBytes;

Review Comment:
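   nit: validate the argument before assigning it (`Preconditions.checkArgument` returns `void`):
   ```suggestion
           Preconditions.checkArgument(maxBatchSizeInBytes > 0);
           this.maxBatchSizeInBytes = maxBatchSizeInBytes;
   ```
   
   We can also add a test for that.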
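
A minimal, self-contained sketch of the check and of the batching rule described in the Javadoc above, in case it helps when writing the test. `ByteLimitedBatcherSketch` and `nextBatch` are hypothetical names, and entries are represented only by their sizes in bytes; this is not the `SimpleBatchCreator` from the PR and it uses a plain `IllegalArgumentException` instead of Flink's `Preconditions`.

```java
import java.util.ArrayList;
import java.util.Deque;
import java.util.List;

/** Hypothetical stand-in for a byte-limited batch creator; this is not the Flink class. */
final class ByteLimitedBatcherSketch {

    private final long maxBatchSizeInBytes;

    ByteLimitedBatcherSketch(long maxBatchSizeInBytes) {
        // Validate first, then assign: a non-positive limit is rejected up front.
        if (maxBatchSizeInBytes <= 0) {
            throw new IllegalArgumentException(
                    "maxBatchSizeInBytes must be positive, but was " + maxBatchSizeInBytes);
        }
        this.maxBatchSizeInBytes = maxBatchSizeInBytes;
    }

    /**
     * Removes entry sizes from the head of the buffer until either maxEntries have been taken
     * or the next entry would push the batch past the byte limit.
     */
    List<Long> nextBatch(Deque<Long> bufferedEntrySizes, int maxEntries) {
        List<Long> batch = new ArrayList<>();
        long batchSizeInBytes = 0L;
        while (batch.size() < maxEntries && !bufferedEntrySizes.isEmpty()) {
            long nextSize = bufferedEntrySizes.peekFirst();
            if (batchSizeInBytes + nextSize > maxBatchSizeInBytes) {
                // Adding the next entry would exceed the limit, so it stays in the buffer.
                break;
            }
            batch.add(bufferedEntrySizes.pollFirst());
            batchSizeInBytes += nextSize;
        }
        return batch;
    }
}
```

A test along these lines would assert that constructing the creator with `0` or a negative value throws, and that a valid limit still yields the expected batch.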



##########
flink-connectors/flink-connector-base/src/test/java/org/apache/flink/connector/base/sink/writer/AsyncSinkWriterTest.java:
##########
@@ -721,6 +721,27 @@ void testRestoreFromMultipleStates() throws IOException {
                 .containsExactlyInAnyOrder(1, 2, 3, 4, 5, 6);
     }
 
+    /** Simple assertion to make sure that {@link DequeBufferWrapper} is the default buffer. */
+    @Test
+    public void testUseCorrectBufferWrapperImplementation() {
+        AsyncSinkWriterImpl initialSinkWriter =
+                new AsyncSinkWriterImplBuilder().context(sinkInitContext).build();
+
+        assertThat(initialSinkWriter.getBufferedRequestEntries())

Review Comment:
   Should we also test that the default is used when null is passed?
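
A rough sketch of the shape such a test could take, assuming the writer falls back to a default buffer when `null` is supplied. `DefaultBufferSketch` is a hypothetical stand-in built on `java.util.Deque`; it does not use the PR's `BufferWrapper` or `AsyncSinkWriter` types.

```java
import java.util.ArrayDeque;
import java.util.Deque;

/** Hypothetical writer that falls back to a default buffer when none is supplied. */
final class DefaultBufferSketch {

    private final Deque<String> buffer;

    DefaultBufferSketch(Deque<String> bufferOrNull) {
        // Assumption for illustration: null means "use the default implementation".
        this.buffer = bufferOrNull != null ? bufferOrNull : new ArrayDeque<>();
    }

    Deque<String> getBufferedRequestEntries() {
        return buffer;
    }
}
```

The test would then cover both paths: passing `null` yields the default implementation, and passing a custom buffer returns that same instance.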



##########
flink-connectors/flink-connector-base/src/main/java/org/apache/flink/connector/base/sink/writer/Batch.java:
##########
@@ -0,0 +1,91 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.connector.base.sink.writer;
+
+import org.apache.flink.annotation.PublicEvolving;
+
+import java.io.Serializable;
+import java.util.List;
+
+/**
+ * A container for the result of creating a batch of request entries, including:
+ *
+ * <ul>
+ *   <li>The actual list of entries forming the batch
+ *   <li>The total size in bytes of those entries
+ *   <li>The total number of entries in the batch
+ * </ul>
+ *
+ * <p>Instances of this class are typically created by a {@link BatchCreator} to summarize which
+ * entries have been selected for sending downstream and to provide any relevant metrics for
+ * tracking, such as the byte size or the record count.
+ *
+ * @param <RequestEntryT> the type of request entry in this batch
+ */
+@PublicEvolving

Review Comment:
   I am not sure whether this fits better as `@Internal` or `@PublicEvolving`. It is not intended to be exposed to public users.



##########
flink-connectors/flink-connector-base/src/main/java/org/apache/flink/connector/base/sink/writer/Batch.java:
##########
@@ -0,0 +1,91 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.connector.base.sink.writer;
+
+import org.apache.flink.annotation.PublicEvolving;
+
+import java.io.Serializable;
+import java.util.List;
+
+/**
+ * A container for the result of creating a batch of request entries, 
including:
+ *
+ * <ul>
+ *   <li>The actual list of entries forming the batch
+ *   <li>The total size in bytes of those entries
+ *   <li>The total number of entries in the batch
+ * </ul>
+ *
+ * <p>Instances of this class are typically created by a {@link BatchCreator} 
to summarize which
+ * entries have been selected for sending downstream and to provide any 
relevant metrics for
+ * tracking, such as the byte size or the record count.
+ *
+ * @param <RequestEntryT> the type of request entry in this batch
+ */
+@PublicEvolving

Review Comment:
   I am not sure if this fits better as `@Internal` or PublicEvolving? It is 
not intended to be exposed to public users



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
