[ 
https://issues.apache.org/jira/browse/KAFKA-13286?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Guozhang Wang updated KAFKA-13286:
----------------------------------
    Description: 
Kafka Streams state stores are built in hierarchical layers as metered -> cached 
-> logged -> [convert] -> raw stores (RocksDB, in-memory), and they leverage the 
built-in Serde libraries for serialization / deserialization. There are several 
inefficiencies in the current design:

* The API only supports serde using byte arrays. This means we generate a lot 
of garbage and spend unnecessary time copying bytes, especially when working 
with windowed state stores that rely on composite keys. In many places in the 
code we have to extract parts of the composite key to deserialize either the 
timestamp or the message key from the state store key (e.g. the methods in 
WindowStoreUtils); see the sketch after this list.
* The serde operations can happen on multiple layers of the state store 
hierarchy, which means we make extra byte array copies as we move along 
doing serdes. For example, we do serde in the metered layer, but then again in 
the caching layer with cache functions, and also in the logging layer to 
generate the key/value bytes to send to Kafka.
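
As a rough illustration of the first point: extracting the message key from a 
composite windowed key today means copying a sub-range of the byte array, 
whereas a ByteBuffer view could expose the same bytes as a zero-copy slice. The 
layout, class, and method names below are assumptions for illustration, not the 
actual WindowStoreUtils code:

{code}
import java.nio.ByteBuffer;
import java.util.Arrays;

// Hypothetical composite windowed key layout: <key bytes, 8-byte timestamp,
// 4-byte seqnum> (assumed for illustration).
class WindowedKeyExample {
    static final int SUFFIX_SIZE = 8 + 4; // timestamp + seqnum

    // byte[]-based style: extracting the message key copies the sub-range.
    static byte[] extractKeyByCopy(final byte[] binaryKey) {
        return Arrays.copyOfRange(binaryKey, 0, binaryKey.length - SUFFIX_SIZE);
    }

    // ByteBuffer-based alternative: a slice is just a view over the same bytes.
    static ByteBuffer extractKeyAsSlice(final ByteBuffer binaryKey) {
        final ByteBuffer view = binaryKey.duplicate();
        view.position(0);
        view.limit(view.limit() - SUFFIX_SIZE);
        return view.slice(); // no copy of the underlying array
    }
}
{code}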

To improve on this, we can consider adding support for serde into/from 
ByteBuffers, which would allow us to reuse the underlying byte arrays and just 
pass around slices of the underlying buffers to avoid unnecessary copying. 

1) More specifically, the serialize interface could be refactored to:

{code}
ByteBuffer serialize(String topic, T data, ByteBuffer buffer);
{code}

Here the serialized bytes would be appended to the ByteBuffer. When a series 
of serialize functions is called along the state store hierarchy, we just need 
to make sure that whatever should be appended to the ByteBuffer first is 
serialized first. E.g. if the serialized byte format of a WindowSchema is 
<timestamp, boolean, key>, then we would need to call serialize as in:

{code}
serialize(topic, key, serialize(topic, leftRightBoolean, serialize(topic, timestamp, buffer)));
{code}
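
As a minimal sketch of how such a chain could compose for the <timestamp, 
boolean, key> layout above (the ByteBufferSerializer interface name and the 
lambdas are assumptions for illustration, not the existing Serializer API):

{code}
import java.nio.ByteBuffer;
import java.nio.charset.StandardCharsets;

// Hypothetical append-style serializer interface (assumed shape).
interface ByteBufferSerializer<T> {
    ByteBuffer serialize(String topic, T data, ByteBuffer buffer);
}

class WindowKeyChainExample {
    static final ByteBufferSerializer<Long> TIMESTAMP =
        (topic, ts, buffer) -> buffer.putLong(ts);
    static final ByteBufferSerializer<Boolean> LEFT_RIGHT =
        (topic, b, buffer) -> buffer.put((byte) (b ? 1 : 0));
    static final ByteBufferSerializer<String> KEY =
        (topic, k, buffer) -> buffer.put(k.getBytes(StandardCharsets.UTF_8));

    // The innermost serialize call appends first, so the bytes land in
    // <timestamp, boolean, key> order on the single shared buffer.
    static ByteBuffer serializeWindowKey(final String topic, final long timestamp,
                                         final boolean leftRight, final String key,
                                         final ByteBuffer buffer) {
        return KEY.serialize(topic, key,
                   LEFT_RIGHT.serialize(topic, leftRight,
                       TIMESTAMP.serialize(topic, timestamp, buffer)));
    }
}
{code}

Since each call returns the buffer it appended to, all writes land on one 
underlying array with no intermediate copies.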

2) In addition, we can consider having a pool of ByteBuffers representing a set 
of byte arrays that can be re-used. This can be captured as an intelligent 
{{ByteBufferSupplier}}, which provides:

{code}
ByteBuffer ByteBufferSupplier#allocate(int size)
{code}

Its implementation can choose to either create new byte arrays or re-use 
existing ones in the pool; the gotcha, though, is that we usually do not know 
the serialized byte length for raw keys up front (in practice the keys would be 
in JSON/Avro etc.), and hence would not know what {{size}} to pass in for 
serialization; we may need to be conservative, or use trial and error, etc.

Of course, callers would then be responsible for returning the used ByteBuffer 
back to the Supplier via:

{code}
ByteBufferSupplier#deallocate(ByteBuffer buffer)
{code}
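
A minimal sketch of such a supplier, assuming a simple free-list that hands 
back any pooled buffer large enough and over-allocates conservatively when the 
size is unknown (the class name and pooling policy are illustrative, not a 
proposed final design):

{code}
import java.nio.ByteBuffer;
import java.util.ArrayDeque;
import java.util.Deque;
import java.util.Iterator;

// Hypothetical pooling ByteBufferSupplier sketch: reuse a pooled buffer when
// one is large enough, otherwise allocate a fresh (conservatively sized) one.
class PooledByteBufferSupplier {
    private final Deque<ByteBuffer> pool = new ArrayDeque<>();

    ByteBuffer allocate(final int size) {
        final Iterator<ByteBuffer> iter = pool.iterator();
        while (iter.hasNext()) {
            final ByteBuffer buffer = iter.next();
            if (buffer.capacity() >= size) {
                iter.remove();
                buffer.clear(); // reset position/limit before reuse
                return buffer;
            }
        }
        // Serialized sizes are often unknown up front, so over-allocate a bit.
        return ByteBuffer.allocate(Math.max(size, 64));
    }

    void deallocate(final ByteBuffer buffer) {
        pool.push(buffer); // return to the pool for later reuse
    }
}
{code}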

Some quick notes regarding concurrency and sharing of the byte-buffer pools:

* For pull-query handling threads, if we do not do any deserialization then we 
would not need to access the ByteBufferSuppliers, hence there is no concurrent 
access.
* For multiple stream threads, my intention is to have each thread get its own 
isolated byte-buffer pool to avoid any concurrent access; see the sketch after 
this list.
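
For the second point, one way to get per-thread isolation without locking is a 
thread-local pool; a sketch, reusing the assumed PooledByteBufferSupplier from 
above:

{code}
// Hypothetical sketch: each stream thread lazily gets its own isolated pool,
// so allocate/deallocate never need synchronization.
class PerThreadBufferPools {
    private static final ThreadLocal<PooledByteBufferSupplier> POOLS =
        ThreadLocal.withInitial(PooledByteBufferSupplier::new);

    static PooledByteBufferSupplier forCurrentThread() {
        return POOLS.get();
    }
}
{code}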

3) With RocksDB's direct byte-buffers (KAFKA-9168) we can optionally also 
allocate them from RocksDB directly, so that using them for puts/gets would not 
require copying the bytes across the JNI boundary, and hence is more efficient. 
The Supplier would then need to be careful to deallocate these direct 
byte-buffers, since they are not promptly reclaimed by the JVM's GC.
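
A sketch of what a direct-buffer variant could look like; the class is 
hypothetical, and the key point is that the pool must keep ownership of the 
direct buffers and reuse them rather than dropping references:

{code}
import java.nio.ByteBuffer;
import java.util.ArrayDeque;
import java.util.Deque;

// Hypothetical supplier backed by direct byte-buffers, which ByteBuffer-based
// RocksDB puts/gets can use without copying across the JNI boundary.
class DirectByteBufferSupplier {
    private final Deque<ByteBuffer> pool = new ArrayDeque<>();

    ByteBuffer allocate(final int size) {
        final ByteBuffer pooled = pool.peek();
        if (pooled != null && pooled.capacity() >= size) {
            pool.poll();
            pooled.clear();
            return pooled;
        }
        return ByteBuffer.allocateDirect(size); // off-heap, not promptly reclaimed by GC
    }

    void deallocate(final ByteBuffer buffer) {
        pool.push(buffer); // keep ownership: reuse instead of re-allocating natively
    }
}
{code}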



> Revisit Streams State Store and Serde Implementation
> ----------------------------------------------------
>
>                 Key: KAFKA-13286
>                 URL: https://issues.apache.org/jira/browse/KAFKA-13286
>             Project: Kafka
>          Issue Type: Improvement
>          Components: streams
>            Reporter: Guozhang Wang
>            Priority: Major
>


