Copilot commented on code in PR #5771: URL: https://github.com/apache/ignite-3/pull/5771#discussion_r2077395790
########## modules/core/src/main/java/org/apache/ignite/internal/streamer/StreamerBuffer.java: ########## @@ -42,38 +43,54 @@ class StreamerBuffer<T> { * * @param item Item. */ - synchronized void add(T item) { - if (closed) { - throw new IllegalStateException("Streamer is closed, can't add items."); - } + void add(T item) { + List<T> bufToFlush = null; - buf.add(item); + synchronized (this) { + if (closed) { + throw new IllegalStateException("Streamer is closed, can't add items."); + } - if (buf.size() >= capacity) { - flusher.accept(buf); - buf = new ArrayList<>(capacity); + buf.add(item); + + if (buf.size() >= capacity) { + bufToFlush = buf; Review Comment: In add(T item), when the buffer reaches its capacity, bufToFlush is assigned but never flushed outside the synchronized block, resulting in items not being processed. Consider adding a call to flushBuf(bufToFlush) after the synchronized block if bufToFlush is not null. -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: notifications-unsubscr...@ignite.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org