GitHub user davies commented on a diff in the pull request:
https://github.com/apache/spark/pull/11805#discussion_r57956249
--- Diff:
core/src/main/scala/org/apache/spark/util/io/ChunkedByteBufferOutputStream.scala
---
@@ -67,33 +73,41 @@ class ByteArrayChunkOutputStream(chunkSize: Int)
extends OutputStream {
@inline
private def allocateNewChunkIfNeeded(): Unit = {
+ require(!toChunkedByteBufferWasCalled, "cannot write after
toChunkedByteBuffer() is called")
if (position == chunkSize) {
- chunks += new Array[Byte](chunkSize)
+ chunks += allocator(chunkSize)
lastChunkIndex += 1
position = 0
}
}
- def toArrays: Array[Array[Byte]] = {
+ def toChunkedByteBuffer: ChunkedByteBuffer = {
+ require(!toChunkedByteBufferWasCalled, "toChunkedByteBuffer() can only
be called once")
+ toChunkedByteBufferWasCalled = true
if (lastChunkIndex == -1) {
- new Array[Array[Byte]](0)
+ new ChunkedByteBuffer(Array.empty[ByteBuffer])
} else {
// Copy the first n-1 chunks to the output, and then create an array
that fits the last chunk.
// An alternative would have been returning an array of ByteBuffers,
with the last buffer
// bounded to only the last chunk's position. However, given our use
case in Spark (to put
// the chunks in block manager), only limiting the view bound of the
buffer would still
// require the block manager to store the whole chunk.
- val ret = new Array[Array[Byte]](chunks.size)
+ val ret = new Array[ByteBuffer](chunks.size)
for (i <- 0 until chunks.size - 1) {
ret(i) = chunks(i)
+ ret(i).flip()
}
if (position == chunkSize) {
ret(lastChunkIndex) = chunks(lastChunkIndex)
+ ret(lastChunkIndex).flip()
} else {
- ret(lastChunkIndex) = new Array[Byte](position)
- System.arraycopy(chunks(lastChunkIndex), 0, ret(lastChunkIndex),
0, position)
+ ret(lastChunkIndex) = allocator(position)
+ chunks(lastChunkIndex).flip()
+ ret(lastChunkIndex).put(chunks(lastChunkIndex))
+ ret(lastChunkIndex).flip()
+ StorageUtils.dispose(chunks(lastChunkIndex))
}
- ret
+ new ChunkedByteBuffer(ret)
--- End diff --
Nit: please fix the indentation on this line.
---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes to enable it, or if the feature is enabled but not working, please
contact infrastructure at [email protected] or file a JIRA ticket
with INFRA.
---
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]