vahmed-hamdy commented on code in PR #26274: URL: https://github.com/apache/flink/pull/26274#discussion_r1988960736
########## flink-connectors/flink-connector-base/src/main/java/org/apache/flink/connector/base/sink/writer/BufferWrapper.java: ########## @@ -0,0 +1,107 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.flink.connector.base.sink.writer; + +import org.apache.flink.annotation.PublicEvolving; + +import java.io.Serializable; +import java.util.Collection; + +/** + * A flexible wrapper interface for managing buffered request entries in an async sink. This allows + * sink implementations to define and optimize their own data structures for request buffering. + * + * @param <RequestEntryT> The type of request entries being buffered. + */ +@PublicEvolving Review Comment: Same as above ########## flink-connectors/flink-connector-base/src/main/java/org/apache/flink/connector/base/sink/writer/BatchCreator.java: ########## @@ -0,0 +1,73 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. 
+ * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.flink.connector.base.sink.writer; + +import org.apache.flink.annotation.PublicEvolving; +import org.apache.flink.connector.base.sink.writer.strategy.RequestInfo; + +import java.io.Serializable; +import java.util.Deque; + +/** + * A pluggable interface for forming batches of request entries from a buffer. Implementations + * control how many entries are grouped together and in what manner before sending them downstream. + * + * <p>The {@code AsyncSinkWriter} (or similar sink component) calls {@link + * #createNextBatch(RequestInfo, BufferWrapper)} when it decides to flush or + * otherwise gather a new batch of elements. For instance, a batch creator might limit the batch by + * the number of elements, total payload size, or any custom partition-based strategy. + * + * @param <RequestEntryT> the type of the request entries to be batched + */ +@PublicEvolving Review Comment: Same as Batch ########## flink-connectors/flink-connector-base/src/main/java/org/apache/flink/connector/base/sink/writer/SimpleBatchCreator.java: ########## @@ -0,0 +1,108 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.flink.connector.base.sink.writer; + +import org.apache.flink.annotation.PublicEvolving; +import org.apache.flink.connector.base.sink.writer.strategy.RequestInfo; + +import java.io.Serializable; +import java.util.ArrayList; +import java.util.List; + +/** + * A simple implementation of {@link BatchCreator} that forms a batch by taking up to {@code + * requestInfo.getBatchSize()} entries from the head of the buffer, so long as the cumulative size + * in bytes does not exceed the configured maximum. + * + * <p>This strategy stops gathering entries from the buffer as soon as adding the next entry would + * exceed the {@code maxBatchSizeInBytes}, or once it has collected {@code + * requestInfo.getBatchSize()} entries, whichever occurs first. + * + * @param <RequestEntryT> the type of request entries managed by this batch creator + */ +@PublicEvolving +public class SimpleBatchCreator<RequestEntryT extends Serializable> + implements BatchCreator<RequestEntryT> { + + /** The maximum total byte size allowed for a single batch. */ + private final long maxBatchSizeInBytes; + + /** + * Constructs a {@code SimpleBatchCreator} with the specified maximum batch size in bytes. 
+ * + * @param maxBatchSizeInBytes the maximum cumulative size in bytes allowed for any single batch + */ + private SimpleBatchCreator(long maxBatchSizeInBytes) { + this.maxBatchSizeInBytes = maxBatchSizeInBytes; Review Comment: nit: ```suggestion Preconditions.checkArgument(maxBatchSizeInBytes > 0); this.maxBatchSizeInBytes = maxBatchSizeInBytes; ``` We could also add a test for that. ########## flink-connectors/flink-connector-base/src/test/java/org/apache/flink/connector/base/sink/writer/AsyncSinkWriterTest.java: ########## @@ -721,6 +721,27 @@ void testRestoreFromMultipleStates() throws IOException { .containsExactlyInAnyOrder(1, 2, 3, 4, 5, 6); } + /** Simple assertion to make sure that {@link DequeBufferWrapper} is the default buffer. */ + @Test + public void testUseCorrectBufferWrapperImplementation() { + AsyncSinkWriterImpl initialSinkWriter = + new AsyncSinkWriterImplBuilder().context(sinkInitContext).build(); + + assertThat(initialSinkWriter.getBufferedRequestEntries()) Review Comment: Don't we also need a test that the default is used when nulls are passed? ########## flink-connectors/flink-connector-base/src/main/java/org/apache/flink/connector/base/sink/writer/Batch.java: ########## @@ -0,0 +1,91 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License.
+ */ + +package org.apache.flink.connector.base.sink.writer; + +import org.apache.flink.annotation.PublicEvolving; + +import java.io.Serializable; +import java.util.List; + +/** + * A container for the result of creating a batch of request entries, including: + * + * <ul> + * <li>The actual list of entries forming the batch + * <li>The total size in bytes of those entries + * <li>The total number of entries in the batch + * </ul> + * + * <p>Instances of this class are typically created by a {@link BatchCreator} to summarize which + * entries have been selected for sending downstream and to provide any relevant metrics for + * tracking, such as the byte size or the record count. + * + * @param <RequestEntryT> the type of request entry in this batch + */ +@PublicEvolving Review Comment: I am not sure whether this fits better as @Internal or @PublicEvolving, since it is not intended to be exposed to public users. ########## flink-connectors/flink-connector-base/src/main/java/org/apache/flink/connector/base/sink/writer/Batch.java: ########## @@ -0,0 +1,91 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License.
+ */ + +package org.apache.flink.connector.base.sink.writer; + +import org.apache.flink.annotation.PublicEvolving; + +import java.io.Serializable; +import java.util.List; + +/** + * A container for the result of creating a batch of request entries, including: + * + * <ul> + * <li>The actual list of entries forming the batch + * <li>The total size in bytes of those entries + * <li>The total number of entries in the batch + * </ul> + * + * <p>Instances of this class are typically created by a {@link BatchCreator} to summarize which + * entries have been selected for sending downstream and to provide any relevant metrics for + * tracking, such as the byte size or the record count. + * + * @param <RequestEntryT> the type of request entry in this batch + */ +@PublicEvolving Review Comment: I am not sure whether this fits better as `@Internal` or `@PublicEvolving`, since it is not intended to be exposed to public users. -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: issues-unsubscr...@flink.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org