GitHub user rhauch opened a pull request:
https://github.com/apache/kafka/pull/256
KAFKA-2594 Added InMemoryLRUCacheStore
Added a new `KeyValueStore` implementation called `InMemoryLRUCacheStore`
that keeps a maximum number of entries in memory. When the size exceeds the
capacity, the least-recently-used entry is removed from both the store and the
backing topic. Also added unit tests for this new store and the existing
`InMemoryKeyValueStore` and `RocksDBKeyValueStore` implementations. A new
`KeyValueStoreTestDriver` class simplifies all of the other tests, and can be
used by other libraries to help test their own custom implementations.
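
For readers unfamiliar with the eviction policy described above, here is a minimal, self-contained sketch of least-recently-used eviction built on `java.util.LinkedHashMap`. It is not the `InMemoryLRUCacheStore` code from this PR: the class name `LruSketch` and the `onEvict` callback are hypothetical, and the callback only stands in for the step where the evicted key would also be removed from the backing topic.

```java
import java.util.ArrayList;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;
import java.util.function.Consumer;

// Hypothetical illustration only; not the store implementation in this PR.
class LruSketch<K, V> {
    private final Map<K, V> entries;

    LruSketch(final int maxEntries, final Consumer<K> onEvict) {
        // accessOrder=true keeps the least-recently-accessed entry first.
        this.entries = new LinkedHashMap<K, V>(16, 0.75f, true) {
            @Override
            protected boolean removeEldestEntry(Map.Entry<K, V> eldest) {
                if (size() > maxEntries) {
                    // Assumption: a real store would also record the removal
                    // in its backing topic at this point.
                    onEvict.accept(eldest.getKey());
                    return true;
                }
                return false;
            }
        };
    }

    V get(K key) { return entries.get(key); }

    void put(K key, V value) { entries.put(key, value); }

    int size() { return entries.size(); }

    public static void main(String[] args) {
        List<String> evicted = new ArrayList<>();
        LruSketch<String, Integer> cache = new LruSketch<>(2, evicted::add);
        cache.put("a", 1);
        cache.put("b", 2);
        cache.get("a");     // "a" becomes the most recently used entry
        cache.put("c", 3);  // capacity exceeded, so "b" is evicted
        System.out.println(evicted);  // prints [b]
    }
}
```

Reading an entry counts as a use, which is why `"b"` rather than `"a"` is evicted in the example.
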
This PR depends upon
[KAFKA-2593](https://issues.apache.org/jira/browse/KAFKA-2593) and its PR at
https://github.com/apache/kafka/pull/255. Once that PR is merged, I can rebase
this PR if desired.
Two issues were uncovered when creating these new unit tests, and both are
also addressed as separate (small) commits in this PR:
* The `RocksDBKeyValueStore` initialization was not creating the file
system directory if missing.
* `MeteredKeyValueStore` was casting to `ProcessorContextImpl` to access
the `RecordCollector`, which prevented using `MeteredKeyValueStore`
implementations in tests where something other than `ProcessorContextImpl` was
used. The fix was to introduce a `RecordCollector.Supplier` interface to define
this `recordCollector()` method, and change `ProcessorContextImpl` and
`MockProcessorContext` to both implement this interface. Now,
`MeteredKeyValueStore` can cast to the new interface to access the record
collector rather than to a single concrete implementation, making it possible
to use any and all current stores inside unit tests.
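
The second fix can be pictured roughly as follows. This is a simplified sketch with stand-in types, not the code in the commit: only the names `RecordCollector.Supplier`, `recordCollector()`, and `MockProcessorContext` come from the description above, while the `send` signature, the empty `ProcessorContext` interface, and the `SupplierCastSketch` helper are assumptions made for illustration.

```java
import java.util.ArrayList;
import java.util.List;

// Stand-in for the real RecordCollector; the send signature is assumed.
interface RecordCollector {
    void send(String topic, String key, String value);

    // Small interface for anything that can hand out a record collector.
    interface Supplier {
        RecordCollector recordCollector();
    }
}

// Stand-in for the real ProcessorContext interface.
interface ProcessorContext { }

// A test context only needs to implement the small Supplier interface.
class MockProcessorContext implements ProcessorContext, RecordCollector.Supplier {
    final List<String> sent = new ArrayList<>();

    @Override
    public RecordCollector recordCollector() {
        // Records what was sent instead of writing to a topic.
        return (topic, key, value) -> sent.add(topic + ":" + key + "=" + value);
    }
}

// Hypothetical helper showing the cast a store would perform.
class SupplierCastSketch {
    static RecordCollector collectorFor(ProcessorContext context) {
        // Cast to the interface rather than to one concrete context class.
        return ((RecordCollector.Supplier) context).recordCollector();
    }

    public static void main(String[] args) {
        MockProcessorContext context = new MockProcessorContext();
        collectorFor(context).send("changelog", "k", "v");
        System.out.println(context.sent);  // prints [changelog:k=v]
    }
}
```

Because the cast targets an interface, any context used in a test works as long as it implements `RecordCollector.Supplier`.
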
You can merge this pull request into a Git repository by running:
$ git pull https://github.com/rhauch/kafka kafka-2594
Alternatively you can review and apply these changes as the patch at:
https://github.com/apache/kafka/pull/256.patch
To close this pull request, make a commit to your master/trunk branch
with (at least) the following in the commit message:
This closes #256
----
commit d7e25f55dcbac1b64cfb0a8f642ce55078bd7d03
Author: Randall Hauch <[email protected]>
Date: 2015-09-28T18:34:51Z
KAFKA-2593 Key value stores can use custom serializers and deserializers
Add support for the key value stores to use specified serializers and
deserializers (aka, "serdes"). Prior to this change, the stores were limited to
only the default serdes specified in the topology's configuration and exposed
to the processors via the ProcessorContext.
Now InMemoryKeyValueStore and RocksDBKeyValueStore are used in similar ways:
both are parameterized on the key and value types, and both offer a similar set
of static factory methods. The static factory methods either take explicit key and
value serdes, take key and value class types so the serdes can be inferred
(only for the built-in serdes for string, integer, long, and byte array types),
or use the default serdes on the ProcessorContext.
commit 3b903d31503fcce7cf2b9607b68893a02425dacf
Author: Randall Hauch <[email protected]>
Date: 2015-09-28T19:11:55Z
KAFKA-2594: Corrected RocksDBKeyValueStore to properly create directory if
missing
commit 33e4fd2b8bb56783d62f55e223757e8c479212ed
Author: Randall Hauch <[email protected]>
Date: 2015-09-28T19:15:24Z
KAFKA-2594: MeteredKeyValueStore no longer casts to ProcessorContextImpl
The cast was used to access ProcessorContextImpl.recordCollector(), and the
cast prevented using MeteredKeyValueStore implementations in tests where
something other than ProcessorContextImpl was used. Introduced a
RecordCollector.Supplier interface to define this `recordCollector()` method,
and changed ProcessorContextImpl and MockProcessorContext to both implement
this interface. Now, MeteredKeyValueStore can cast to the interface to access
the record collector rather than to a single concrete implementation, making it
possible to use the stores inside unit tests.
commit ff5e779fa1d6955bf0bf633191edd9d7a7cb9a9c
Author: Randall Hauch <[email protected]>
Date: 2015-09-28T19:26:16Z
KAFKA-2594 Added InMemoryLRUCacheStore
Added a new KeyValueStore implementation that keeps a maximum number of
entries in-memory, and as the size exceeds the capacity the least-recently used
entry is removed from the store and the backing topic. Also added unit tests
for this new store and the existing InMemoryKeyValueStore and
RocksDBKeyValueStore implementations. A new KeyValueStoreTestDriver class
simplifies all of the other tests.
----