garydgregory commented on code in PR #748: URL: https://github.com/apache/commons-io/pull/748#discussion_r2094595734
########## src/test/java/org/apache/commons/io/input/QueueInputStreamTest.java: ########## @@ -124,13 +136,124 @@ public void testAvailableAfterOpen(final String inputData) throws IOException { public void testBufferedReads(final String inputData) throws IOException { final BlockingQueue<Integer> queue = new LinkedBlockingQueue<>(); try (BufferedInputStream inputStream = new BufferedInputStream(new QueueInputStream(queue)); - QueueOutputStream outputStream = new QueueOutputStream(queue)) { + QueueOutputStream outputStream = new QueueOutputStream(queue)) { outputStream.write(inputData.getBytes(StandardCharsets.UTF_8)); final String actualData = IOUtils.toString(inputStream, StandardCharsets.UTF_8); assertEquals(inputData, actualData); } } + @ParameterizedTest(name = "inputData={0}") + @MethodSource("inputData") + public void testReadLineByLineQueue(final String inputData) throws IOException { + final String[] lines = inputData.split("\n"); + final BlockingQueue<Integer> queue = new LinkedBlockingQueue<>(); + try (QueueInputStream inputStream = QueueInputStream.builder() + .setBlockingQueue(queue) + .setTimeout(Duration.ofHours(1)) + .get(); + QueueOutputStream outputStream = inputStream.newQueueOutputStream()) { + + doTestReadLineByLine(inputData, inputStream, outputStream); + } + } + + @ParameterizedTest(name = "inputData={0}") + @MethodSource("inputData") + public void testReadLineByLineFile(final String inputData) throws IOException { + final Path tempFile = Files.createTempFile(getClass().getSimpleName(), ".txt"); + try (InputStream inputStream = Files.newInputStream(tempFile); + OutputStream outputStream = Files.newOutputStream(tempFile)) { + + doTestReadLineByLine(inputData, inputStream, outputStream); + } finally { + Files.delete(tempFile); + } + } + + private void doTestReadLineByLine(final String inputData, final InputStream inputStream, final OutputStream outputStream) throws IOException { + final String[] lines = inputData.split("\n"); + try (BufferedReader reader = new BufferedReader(new InputStreamReader(inputStream, UTF_8))) { + for (String line : lines) { + outputStream.write(line.getBytes(UTF_8)); + outputStream.write('\n'); + + final String actualLine = reader.readLine(); + assertEquals(line, actualLine); + } + } + } + + @TestFactory + public DynamicTest[] bulkReadErrorHandlingTests() { + final QueueInputStream queueInputStream = new QueueInputStream(); + return new DynamicTest[] { + dynamicTest("Offset too big", () -> + assertThrows(IndexOutOfBoundsException.class, () -> + queueInputStream.read(EMPTY_BYTE_ARRAY, 1, 0))), + + dynamicTest("Offset negative", () -> + assertThrows(IndexOutOfBoundsException.class, () -> + queueInputStream.read(EMPTY_BYTE_ARRAY, -1, 0))), + + dynamicTest("Length too big", () -> + assertThrows(IndexOutOfBoundsException.class, () -> + queueInputStream.read(EMPTY_BYTE_ARRAY, 0, 1))), + + dynamicTest("Length negative", () -> + assertThrows(IndexOutOfBoundsException.class, () -> + queueInputStream.read(EMPTY_BYTE_ARRAY, 0, -1))), + }; + } + + @Test + public void testBulkReadZeroLength() { + final QueueInputStream queueInputStream = new QueueInputStream(); + final int read = queueInputStream.read(EMPTY_BYTE_ARRAY, 0, 0); + assertEquals(0, read); + } + + @ParameterizedTest(name = "inputData={0}") + @MethodSource("inputData") + public void testBulkReadWaiting(final String inputData) throws IOException { + assumeTrue(!inputData.isEmpty()); + + final CountDownLatch onPollLatch = new CountDownLatch(1); + final CountDownLatch afterWriteLatch = new CountDownLatch(1); + final LinkedBlockingQueue<Integer> queue = new LinkedBlockingQueue<Integer>() { + @Override + public Integer poll(final long timeout, final TimeUnit unit) throws InterruptedException { + onPollLatch.countDown(); + afterWriteLatch.await(); + return super.poll(timeout, unit); + } + }; + + // Simulate scenario where there is not data immediately available when bulk reading and QueueInputStream has to + // wait. + try (QueueInputStream queueInputStream = QueueInputStream.builder() + .setBlockingQueue(queue) + .setTimeout(Duration.ofHours(1)) + .get()) { + final QueueOutputStream queueOutputStream = queueInputStream.newQueueOutputStream(); + CompletableFuture.runAsync(() -> { + try { + onPollLatch.await(); + queueOutputStream.write(inputData.getBytes(StandardCharsets.UTF_8)); + afterWriteLatch.countDown(); + } catch (final Exception e) { + throw new RuntimeException(e); Review Comment: Hello @maxxedev Unless you expect this exception for the test to pass (if yes, then please add a `// comment`), the test should be clearer and call JUnit's `fail(...)` method. -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: issues-unsubscr...@commons.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org