github-advanced-security[bot] commented on code in PR #19375:
URL: https://github.com/apache/druid/pull/19375#discussion_r3140145446
##########
processing/src/main/java/org/apache/druid/segment/file/SegmentFileBuilderV10.java:
##########
@@ -222,21 +412,245 @@
Channels.writeFully(channel, compressed);
}
- for (File f : smoosher.getOutFiles()) {
+ for (ContainerWriter container : containers) {
+ final File f = container.file;
try (FileInputStream fis = new FileInputStream(f)) {
byte[] buffer = new byte[4096];
int bytesRead;
while ((bytesRead = fis.read(buffer)) != -1) {
outputStream.write(buffer, 0, bytesRead);
}
}
- // delete all the old 00000.smoosh
+ // delete all the old container files
DruidException.conditionalDefensive(
f.delete(),
- "Failed to delete temporary file[%s]",
+ "Failed to delete temporary container file[%s]",
f
);
}
}
}
+
+ private List<SegmentFileContainerMetadata> buildContainerMetadata()
+ {
+ final List<SegmentFileContainerMetadata> result = new
ArrayList<>(containers.size());
+ long offset = 0;
+ for (ContainerWriter container : containers) {
+ final long length = container.file.length();
+ result.add(new SegmentFileContainerMetadata(offset, length));
+ offset += length;
+ }
+ return result;
+ }
+
+ /**
+ * Ensure that {@link #currentContainer} is ready to accept {@code size}
bytes of a file belonging to {@code group}.
+ * Rolls the current container and starts a new one when:
+ * <ul>
+ * <li>there is no current container, or</li>
+ * <li>the current container is for a different group, or</li>
+ * <li>the current container cannot fit the incoming bytes within {@link
#maxContainerSize}.</li>
+ * </ul>
+ */
+ private void ensureContainer(@Nullable String group, long size) throws
IOException
+ {
+ if (currentContainer == null
+ || !Objects.equals(currentContainer.group, group)
+ || !currentContainer.canFit(size)) {
+ if (currentContainer != null) {
+ currentContainer.close();
+ }
+ currentContainer = openNewContainer(group);
+ containers.add(currentContainer);
+ }
+ }
+
+ private ContainerWriter openNewContainer(@Nullable String group) throws
IOException
+ {
+ FileUtils.mkdirp(baseDir);
+ final int fileNum = containers.size();
+ final File containerFile = new File(
+ baseDir,
+ StringUtils.format("%s-%05d.container", outputFileName, fileNum)
+ );
+ return new ContainerWriter(fileNum, containerFile, group,
maxContainerSize);
+ }
+
+ private SegmentFileChannel delegateChannel(final String name, final long
size) throws IOException
+ {
+ final String delegateName = nextDelegateFileName(name);
+ final File tmpFile = new File(baseDir, delegateName);
+ inProgressDelegateFiles.add(tmpFile);
+
+ return new SegmentFileChannel()
+ {
+ private final FileChannel channel = FileChannel.open(
+ tmpFile.toPath(),
+ StandardOpenOption.WRITE,
+ StandardOpenOption.CREATE,
+ StandardOpenOption.TRUNCATE_EXISTING
+ );
+
+ private long bytesWritten = 0;
+
+ @Override
+ public int write(ByteBuffer src) throws IOException
+ {
+ return Ints.checkedCast(addBytes(channel.write(src)));
+ }
+
+ @Override
+ public long write(ByteBuffer[] srcs, int offset, int length) throws
IOException
+ {
+ return addBytes(channel.write(srcs, offset, length));
+ }
+
+ @Override
+ public long write(ByteBuffer[] srcs) throws IOException
+ {
+ return addBytes(channel.write(srcs));
+ }
+
+ private long addBytes(long n)
+ {
+ if (n > size - bytesWritten) {
+ throw new ISE(
+ "Wrote more bytes[%,d] than expected[%,d] for delegated
file[%s]",
+ bytesWritten + n, size, name
+ );
+ }
+ bytesWritten += n;
+ return n;
+ }
+
+ @Override
+ public boolean isOpen()
+ {
+ return channel.isOpen();
+ }
+
+ @Override
+ public void close() throws IOException
+ {
+ channel.close();
+ completedDelegateFiles.add(tmpFile);
+ inProgressDelegateFiles.remove(tmpFile);
+ if (!writerCurrentlyInUse) {
+ mergeDelegatedFiles();
+ }
+ }
+ };
+ }
+
+ /**
+ * Move completed delegate temp files into containers by replaying them as
regular {@link #add} calls. Only called
+ * when no outer writer is currently holding the builder.
+ */
+ private void mergeDelegatedFiles() throws IOException
+ {
+ if (completedDelegateFiles.isEmpty()) {
+ return;
+ }
+ final List<File> toProcess = new ArrayList<>(completedDelegateFiles);
+ final Map<String, String> nameMap = new HashMap<>(delegateFileNameMap);
+ completedDelegateFiles.clear();
+ delegateFileNameMap.clear();
+ for (File file : toProcess) {
+ final String name = nameMap.get(file.getName());
+ add(name, file);
+ if (!file.delete()) {
+ LOG.warn("Unable to delete delegate file[%s]", file);
+ }
+ }
+ }
+
+ /**
+ * Generate a unique temp file name for a delegated nested write. Prefixed
with {@link #outputFileName} so that
+ * delegate files from a main builder and its externals (which share {@link
#baseDir}) cannot collide as main and
+ * external always have distinct output file names.
+ */
+ private String nextDelegateFileName(String name)
+ {
+ final String delegateName = StringUtils.format("%s-delegate-%d",
outputFileName, delegateFileCounter++);
+ delegateFileNameMap.put(delegateName, name);
+ return delegateName;
+ }
+
+ /**
+ * Low-level writer for a single container chunk file. One container holds
internal files from at most one group.
+ */
+ private static class ContainerWriter implements GatheringByteChannel
+ {
+ private final int fileNum;
+ private final File file;
+ @Nullable
+ private final String group;
+ private final long maxSize;
+ private final Closer closer = Closer.create();
+ private final GatheringByteChannel channel;
+ private long currOffset = 0;
+
+ ContainerWriter(int fileNum, File file, @Nullable String group, long
maxSize) throws IOException
+ {
+ this.fileNum = fileNum;
+ this.file = file;
+ this.group = group;
+ this.maxSize = maxSize;
+ final FileOutputStream outStream = closer.register(new
FileOutputStream(file));
Review Comment:
## CodeQL / Potential output resource leak
This FileOutputStream is not always closed on method exit.
[Show more details](https://github.com/apache/druid/security/code-scanning/11137)
--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
To unsubscribe, e-mail: [email protected]
For queries about this service, please contact Infrastructure at:
[email protected]
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]