Hi, I hope someone on the Kafka Streams team can help.  Our application uses

KeyValueIterator it = KeyValueStore.all();

…..
it.remove();


This used to work but is now broken: it causes our punctuate to fail and the
StreamThread to die.  The cause seems to be a change in 0.10.2.0 to
InMemoryKeyValueStoreSupplier.

Previously, all() was:

public synchronized KeyValueIterator<K, V> all() {
    final TreeMap<K, V> copy = new TreeMap<>(this.map);
    return new MemoryStoreIterator<>(copy.entrySet().iterator());
}

In 0.10.2.0 it became:

@Override
public synchronized KeyValueIterator<K, V> all() {
    final TreeMap<K, V> copy = new TreeMap<>(this.map);
    return new DelegatingPeekingKeyValueIterator<>(name,
        new MemoryStoreIterator<>(copy.entrySet().iterator()));
}

But the DelegatingPeekingKeyValueIterator has:

@Override
public void remove() {
    throw new UnsupportedOperationException("remove not supported");
}

whereas the old code returned the MemoryStoreIterator directly, which allowed
remove().  For some reason DelegatingPeekingKeyValueIterator does not call
underlying.remove().
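
To make the problem concrete, here is a minimal sketch in plain Java of the behaviour and of one possible workaround: collect the keys while iterating, then delete them from the store afterwards. Everything in it is a stand-in and not Kafka code: NoRemoveIterator, all() and deleteNegative() are hypothetical names, and a TreeMap plays the role of the store. With a real KeyValueStore the final loop would presumably call store.delete(key) instead of TreeMap.remove(key). Note also that since all() iterates over a copy of the map, remove() on that iterator would only ever touch the copy.

```java
import java.util.ArrayList;
import java.util.Iterator;
import java.util.List;
import java.util.Map;
import java.util.TreeMap;

final class RemoveWorkaroundSketch {

    // Hypothetical stand-in for a wrapping iterator that does not delegate remove().
    static final class NoRemoveIterator<T> implements Iterator<T> {
        private final Iterator<T> underlying;

        NoRemoveIterator(Iterator<T> underlying) {
            this.underlying = underlying;
        }

        @Override
        public boolean hasNext() {
            return underlying.hasNext();
        }

        @Override
        public T next() {
            return underlying.next();
        }

        @Override
        public void remove() {
            throw new UnsupportedOperationException("remove not supported");
        }
    }

    // Stand-in for all(): iterates over a snapshot copy of the map.
    static Iterator<Map.Entry<String, Integer>> all(TreeMap<String, Integer> store) {
        return new NoRemoveIterator<>(new TreeMap<>(store).entrySet().iterator());
    }

    // Workaround: remember the keys to drop, then delete them after iterating.
    // The "negative value" condition is only an example predicate.
    static void deleteNegative(TreeMap<String, Integer> store) {
        List<String> toDelete = new ArrayList<>();
        Iterator<Map.Entry<String, Integer>> it = all(store);
        while (it.hasNext()) {
            Map.Entry<String, Integer> entry = it.next();
            if (entry.getValue() < 0) {
                toDelete.add(entry.getKey());
            }
        }
        for (String key : toDelete) {
            store.remove(key); // with a real KeyValueStore: store.delete(key)
        }
    }

    public static void main(String[] args) {
        TreeMap<String, Integer> store = new TreeMap<>();
        store.put("a", 1);
        store.put("b", -2);
        store.put("c", 3);
        deleteNegative(store);
        System.out.println(store); // prints {a=1, c=3}
    }
}
```

This avoids calling remove() on the iterator at all, so it should not depend on which iterator class all() returns.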

We don’t want to downgrade to 0.10.1.1, as 0.10.2.0 includes a useful bug fix
and removes the dependency on ZooKeeper.

Thanks,
Tom 
