[ https://issues.apache.org/jira/browse/KAFKA-6713?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16417959#comment-16417959 ]
Cemalettin Koç commented on KAFKA-6713:
---------------------------------------

Hi [~guozhang],

For 1 and 2) I would like to access my customized version of `CategoryInMemoryStore`. I would be glad if you could write a simple example for me; I could not follow your comment.

3) My category topic is rarely updated. I need all of its data, so I am using a GlobalKTable, and I use that data when rendering my pages. Since it changes rarely, I would like to cache the result, but invalidate this cache when an update arrives. However, I currently cannot find a way to be notified when my GlobalKTable-based InMemoryStore is updated. If a Category is added or changed in my InMemoryStore, I would like to trigger invalidation of my rendering cache.

> Provide an easy way replace store with a custom one on High-Level Streams DSL
> -----------------------------------------------------------------------------
>
>                 Key: KAFKA-6713
>                 URL: https://issues.apache.org/jira/browse/KAFKA-6713
>             Project: Kafka
>          Issue Type: Improvement
>          Components: streams
>    Affects Versions: 1.0.1
>            Reporter: Cemalettin Koç
>            Priority: Major
>              Labels: streaming-api
>         Attachments: BytesTypeConverter.java, DelegatingByteStore.java, TypeConverter.java
>
>
> I am trying to use GlobalKTable with a custom store implementation. In my
> stores, I would like to store my `Category` entities and I would like to query
> them by their name as well. My custom store has some capabilities beyond
> `get`, such as get by `name`. I also want to get all entries hierarchically
> and lazily. I have other use cases as well.
>
> In order to accomplish my task I had to implement a custom
> `KeyValueBytesStoreSupplier`, `BytesTypeConverter` and
>
> {code:java}
> public class DelegatingByteStore<K, V> implements KeyValueStore<Bytes, byte[]> {
>   private BytesTypeConverter<K, V> converter;
>   private KeyValueStore<K, V> delegated;
>   public DelegatingByteStore(KeyValueStore<K, V> delegated, BytesTypeConverter<K, V> converter) {
>     this.converter = converter;
>     this.delegated = delegated;
>   }
>   @Override
>   public void put(Bytes key, byte[] value) {
>     delegated.put(converter.outerKey(key), converter.outerValue(value));
>   }
>   @Override
>   public byte[] putIfAbsent(Bytes key, byte[] value) {
>     V v = delegated.putIfAbsent(converter.outerKey(key), converter.outerValue(value));
>     return v == null ? null : value;
>   }
>   ......
> {code}
>
> Type Converter:
> {code:java}
> public interface TypeConverter<K, IK, V, IV> {
>   IK innerKey(final K key);
>   IV innerValue(final V value);
>   List<KeyValue<IK, IV>> innerEntries(final List<KeyValue<K, V>> from);
>   List<KeyValue<K, V>> outerEntries(final List<KeyValue<Bytes, byte[]>> from);
>   V outerValue(final IV value);
>   KeyValue<K, V> outerKeyValue(final KeyValue<IK, IV> from);
>   KeyValue<Bytes, byte[]>innerKeyValue(final KeyValue<K, V> entry);
>   K outerKey(final IK ik);
> }
> {code}
>
> This is unfortunately too cumbersome and hard to maintain.
>

--
This message was sent by Atlassian JIRA
(v7.6.3#76005)
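
On point 3 of the comment (being notified when the store behind the GlobalKTable changes), one possible direction is to make the custom store itself observable. This is the same delegating approach as the `DelegatingByteStore` in the issue. The sketch below is only an illustration under stated assumptions: `SimpleStore`, `MapStore` and `NotifyingStore` are hypothetical names, `SimpleStore` is a cut-down stand-in for Kafka's `KeyValueStore`, and the sketch assumes every update reaches the store through `put`. Whether Kafka Streams routes all updates (for example during restoration) through `put` is not verified here.

```java
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.Objects;
import java.util.concurrent.CopyOnWriteArrayList;
import java.util.function.BiConsumer;

// Hypothetical minimal store contract, standing in for Kafka's KeyValueStore.
interface SimpleStore<K, V> {
    V get(K key);

    // A null value deletes the key (tombstone semantics).
    void put(K key, V value);
}

// Plain in-memory store backed by a HashMap.
class MapStore<K, V> implements SimpleStore<K, V> {
    private final Map<K, V> entries = new HashMap<>();

    @Override
    public V get(K key) {
        return entries.get(key);
    }

    @Override
    public void put(K key, V value) {
        if (value == null) {
            entries.remove(key);
        } else {
            entries.put(key, value);
        }
    }
}

// Decorator that forwards to the wrapped store and notifies listeners
// only when a put actually changes the stored value.
class NotifyingStore<K, V> implements SimpleStore<K, V> {
    private final SimpleStore<K, V> delegate;
    // Copy-on-write list so listeners can be added from another thread.
    private final List<BiConsumer<K, V>> listeners = new CopyOnWriteArrayList<>();

    NotifyingStore(SimpleStore<K, V> delegate) {
        this.delegate = delegate;
    }

    void addListener(BiConsumer<K, V> listener) {
        listeners.add(listener);
    }

    @Override
    public V get(K key) {
        return delegate.get(key);
    }

    @Override
    public void put(K key, V value) {
        V previous = delegate.get(key);
        delegate.put(key, value);
        if (!Objects.equals(previous, value)) {
            for (BiConsumer<K, V> listener : listeners) {
                listener.accept(key, value);
            }
        }
    }
}
```

With something like this, the rendering layer could register `store.addListener((key, category) -> renderingCache.invalidate())`, where `renderingCache` is again a hypothetical object. Listeners run on whichever thread calls `put`, so they should be cheap, for example flipping a dirty flag rather than re-rendering.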