Cemalettin Koç created KAFKA-6713:
-------------------------------------

             Summary: Provide an easy way to replace a store with a custom one in the high-level Streams DSL
                 Key: KAFKA-6713
                 URL: https://issues.apache.org/jira/browse/KAFKA-6713
             Project: Kafka
          Issue Type: Improvement
          Components: streams
    Affects Versions: 1.0.1
            Reporter: Cemalettin Koç

I am trying to use a GlobalKTable with a custom store implementation. In my stores, I would like to keep my `Category` entities and also query them by name. My custom store has capabilities beyond `get`, such as lookup by `name`. I also want to retrieve all entries hierarchically and lazily, and I have other use cases as well.

To accomplish this, I had to implement a custom `KeyValueBytesStoreSupplier`, a `BytesTypeConverter`, and the following delegating store:

{code:java}
public class DelegatingByteStore<K, V> implements KeyValueStore<Bytes, byte[]> {

    private final BytesTypeConverter<K, V> converter;
    private final KeyValueStore<K, V> delegated;

    public DelegatingByteStore(KeyValueStore<K, V> delegated, BytesTypeConverter<K, V> converter) {
        this.converter = converter;
        this.delegated = delegated;
    }

    @Override
    public void put(Bytes key, byte[] value) {
        // Convert the raw bytes to the typed key/value before delegating.
        delegated.put(converter.outerKey(key), converter.outerValue(value));
    }

    @Override
    public byte[] putIfAbsent(Bytes key, byte[] value) {
        V v = delegated.putIfAbsent(converter.outerKey(key), converter.outerValue(value));
        // KeyValueStore#putIfAbsent returns the previous value, not the new one.
        return v == null ? null : converter.innerValue(v);
    }
    ......
{code}

Type Converter:

{code:java}
public interface TypeConverter<K, IK, V, IV> {

    IK innerKey(final K key);

    IV innerValue(final V value);

    List<KeyValue<IK, IV>> innerEntries(final List<KeyValue<K, V>> from);

    List<KeyValue<K, V>> outerEntries(final List<KeyValue<Bytes, byte[]>> from);

    V outerValue(final IV value);

    KeyValue<K, V> outerKeyValue(final KeyValue<IK, IV> from);

    KeyValue<Bytes, byte[]> innerKeyValue(final KeyValue<K, V> entry);

    K outerKey(final IK ik);
}
{code}

This is unfortunately too cumbersome and hard to maintain.
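
For readers who want to see the shape of the problem without the Kafka dependencies, here is a minimal, self-contained sketch of the same delegation idea: a typed store that supports lookup by name, wrapped by a bytes-facing adapter. Everything in it is hypothetical and for illustration only. `CategoryStore`, `DelegatingCategoryByteStore`, and the `id`/`name` fields of `Category` are assumed names, the UTF-8 string encoding stands in for a real `BytesTypeConverter`, and none of these types are Kafka Streams APIs. It also assumes category names are unique.

```java
import java.nio.charset.StandardCharsets;
import java.util.HashMap;
import java.util.Map;
import java.util.Optional;

// Hypothetical domain type; the issue only says that a Category has a name.
final class Category {
    final String id;
    final String name;

    Category(String id, String name) {
        this.id = id;
        this.name = name;
    }
}

// Hypothetical typed store: primary lookup by id plus a secondary index by name.
// Assumes names are unique across categories.
final class CategoryStore {
    private final Map<String, Category> byId = new HashMap<>();
    private final Map<String, Category> byName = new HashMap<>();

    void put(String id, Category category) {
        Category previous = byId.put(id, category);
        if (previous != null) {
            byName.remove(previous.name); // drop the stale index entry
        }
        byName.put(category.name, category);
    }

    Optional<Category> get(String id) {
        return Optional.ofNullable(byId.get(id));
    }

    Optional<Category> findByName(String name) {
        return Optional.ofNullable(byName.get(name));
    }
}

// Bytes-facing adapter playing the role of DelegatingByteStore: it converts
// raw bytes to typed objects and forwards every call to the typed store.
final class DelegatingCategoryByteStore {
    private final CategoryStore delegated;

    DelegatingCategoryByteStore(CategoryStore delegated) {
        this.delegated = delegated;
    }

    void put(byte[] key, byte[] value) {
        String id = new String(key, StandardCharsets.UTF_8);
        String name = new String(value, StandardCharsets.UTF_8);
        delegated.put(id, new Category(id, name));
    }

    // Returns the serialized name, or null when the key is unknown.
    byte[] get(byte[] key) {
        return delegated.get(new String(key, StandardCharsets.UTF_8))
                .map(c -> c.name.getBytes(StandardCharsets.UTF_8))
                .orElse(null);
    }
}
```

The framework would only ever see the bytes-facing adapter, while application code keeps a reference to the typed store and calls `findByName` on it directly. Even in this reduced form, the conversion layer is pure plumbing, which is the maintenance burden this issue is about.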