Cemalettin Koç created KAFKA-6713:
-------------------------------------
Summary: Provide an easy way replace store with a custom one on
High-Level Streams DSL
Key: KAFKA-6713
URL: https://issues.apache.org/jira/browse/KAFKA-6713
Project: Kafka
Issue Type: Improvement
Components: streams
Affects Versions: 1.0.1
Reporter: Cemalettin Koç
I am trying to use GlobalKTable with a custom store implementation. In my
stores, I would like to store my `Category` entites and I would like to query
them by their name as well. My custom store has some capabilities beyond `get`
such as get by `name`. I also want to get all entries in a hierarchical way in
a lazy fashion. I have other use cases as well.
In order to accomplish my task I had to implement a custom
`KeyValueBytesStoreSupplier`, `BytesTypeConverter` and
{code:java}
public class DelegatingByteStore<K, V> implements KeyValueStore<Bytes, byte[]> {
private BytesTypeConverter<K, V> converter;
private KeyValueStore<K, V> delegated;
public DelegatingByteStore(KeyValueStore<K, V> delegated,
BytesTypeConverter<K, V> converter) {
this.converter = converter;
this.delegated = delegated;
}
@Override
public void put(Bytes key, byte[] value) {
delegated.put(converter.outerKey(key),
converter.outerValue(value));
}
@Override
public byte[] putIfAbsent(Bytes key, byte[] value) {
V v = delegated.putIfAbsent(converter.outerKey(key),
converter.outerValue(value));
return v == null ? null : value;
}
......
{code}
Type Converter:
{code:java}
public interface TypeConverter<K, IK, V, IV> {
IK innerKey(final K key);
IV innerValue(final V value);
List<KeyValue<IK, IV>> innerEntries(final List<KeyValue<K, V>> from);
List<KeyValue<K, V>> outerEntries(final List<KeyValue<Bytes, byte[]>> from);
V outerValue(final IV value);
KeyValue<K, V> outerKeyValue(final KeyValue<IK, IV> from);
KeyValue<Bytes, byte[]>innerKeyValue(final KeyValue<K, V> entry);
K outerKey(final IK ik);
}
{code}
This is unfortunately too cumbersome and hard to maintain.
--
This message was sent by Atlassian JIRA
(v7.6.3#76005)