This is an automated email from the ASF dual-hosted git repository.
ankitsultana pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/pinot.git
The following commit(s) were added to refs/heads/master by this push:
new 506892d953 Make thread local variable static to avoid the memory
leaking issue (#12242)
506892d953 is described below
commit 506892d953a2d5fce754412d1d1167c93f75081f
Author: Xuanyi Li <[email protected]>
AuthorDate: Thu Feb 8 08:43:18 2024 -0800
Make thread local variable static to avoid the memory leaking issue (#12242)
---
.../readers/forward/VarByteChunkSVForwardIndexReader.java | 15 ++++++++++++---
1 file changed, 12 insertions(+), 3 deletions(-)
diff --git
a/pinot-segment-local/src/main/java/org/apache/pinot/segment/local/segment/index/readers/forward/VarByteChunkSVForwardIndexReader.java
b/pinot-segment-local/src/main/java/org/apache/pinot/segment/local/segment/index/readers/forward/VarByteChunkSVForwardIndexReader.java
index 9c9f9ce5f1..585d48d7ea 100644
---
a/pinot-segment-local/src/main/java/org/apache/pinot/segment/local/segment/index/readers/forward/VarByteChunkSVForwardIndexReader.java
+++
b/pinot-segment-local/src/main/java/org/apache/pinot/segment/local/segment/index/readers/forward/VarByteChunkSVForwardIndexReader.java
@@ -41,7 +41,7 @@ public final class VarByteChunkSVForwardIndexReader extends
BaseChunkForwardInde
private final int _maxChunkSize;
// Thread local (reusable) byte[] to read bytes from data file.
- private final ThreadLocal<byte[]> _reusableBytes =
ThreadLocal.withInitial(() -> new byte[_lengthOfLongestEntry]);
+ private static ThreadLocal<byte[]> _reusableBytes =
ThreadLocal.withInitial(() -> new byte[0]);
public VarByteChunkSVForwardIndexReader(PinotDataBuffer dataBuffer, DataType
valueType) {
super(dataBuffer, valueType, true);
@@ -84,7 +84,7 @@ public final class VarByteChunkSVForwardIndexReader extends
BaseChunkForwardInde
int valueEndOffset = getValueEndOffset(chunkRowId, chunkBuffer);
int length = valueEndOffset - valueStartOffset;
- byte[] bytes = _reusableBytes.get();
+ byte[] bytes = getOrExpandByteArray();
chunkBuffer.position(valueStartOffset);
chunkBuffer.get(bytes, 0, length);
return new String(bytes, 0, length, UTF_8);
@@ -103,11 +103,20 @@ public final class VarByteChunkSVForwardIndexReader
extends BaseChunkForwardInde
long valueEndOffset = getValueEndOffset(chunkId, chunkRowId,
chunkStartOffset);
int length = (int) (valueEndOffset - valueStartOffset);
- byte[] bytes = _reusableBytes.get();
+ byte[] bytes = getOrExpandByteArray();
_dataBuffer.copyTo(valueStartOffset, bytes, 0, length);
return new String(bytes, 0, length, UTF_8);
}
+ private byte[] getOrExpandByteArray() {
+ byte[] bytes = _reusableBytes.get();
+ if (bytes.length < _lengthOfLongestEntry) {
+ _reusableBytes.set(new byte[_lengthOfLongestEntry]);
+ bytes = _reusableBytes.get();
+ }
+ return bytes;
+ }
+
@Override
public byte[] getBytes(int docId, ChunkReaderContext context) {
if (_isCompressed) {
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]