Copilot commented on code in PR #5771:
URL: https://github.com/apache/ignite-3/pull/5771#discussion_r2077395790


##########
modules/core/src/main/java/org/apache/ignite/internal/streamer/StreamerBuffer.java:
##########
@@ -42,38 +43,54 @@ class StreamerBuffer<T> {
      *
      * @param item Item.
      */
-    synchronized void add(T item) {
-        if (closed) {
-            throw new IllegalStateException("Streamer is closed, can't add 
items.");
-        }
+    void add(T item) {
+        List<T> bufToFlush = null;
 
-        buf.add(item);
+        synchronized (this) {
+            if (closed) {
+                throw new IllegalStateException("Streamer is closed, can't add 
items.");
+            }
 
-        if (buf.size() >= capacity) {
-            flusher.accept(buf);
-            buf = new ArrayList<>(capacity);
+            buf.add(item);
+
+            if (buf.size() >= capacity) {
+                bufToFlush = buf;

Review Comment:
   In add(T item), when the buffer reaches its capacity, bufToFlush is assigned 
but never flushed outside the synchronized block, resulting in items not being 
processed. Consider adding a call to flushBuf(bufToFlush) after the 
synchronized block if bufToFlush is not null.



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: notifications-unsubscr...@ignite.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org

Reply via email to