Hi,

remove() should not be supported, so the fact that it worked in 0.10.1
was actually a bug, which got fixed in 0.10.2.

Stores should only be altered by Streams, and iterators over the stores
should be read-only. Otherwise, you might corrupt Streams' internal state.

I would highly recommend reconsidering the call to it.remove() in your
application. I'm not sure what you are trying to accomplish, but you should
do it differently.
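
As a rough sketch of one alternative, assuming the goal is to drop some
entries while scanning the store: collect the keys during the read-only
iteration, then delete them through the store itself (in Kafka Streams that
would be KeyValueStore.delete(key), and the KeyValueIterator should be closed
when done). The SimpleStore class and deleteMatching helper below are
hypothetical stand-ins so the example runs on its own; they are not the Kafka
Streams API.

```java
import java.util.ArrayList;
import java.util.Collections;
import java.util.Iterator;
import java.util.List;
import java.util.Map;
import java.util.TreeMap;
import java.util.function.Predicate;

class StoreCleanup {

    // Hypothetical stand-in for an in-memory store; NOT the Kafka Streams API.
    static class SimpleStore<K, V> {
        private final TreeMap<K, V> map = new TreeMap<>();

        synchronized void put(K key, V value) { map.put(key, value); }

        synchronized V get(K key) { return map.get(key); }

        synchronized V delete(K key) { return map.remove(key); }

        // Read-only iterator over a snapshot of the store. Calling remove()
        // on it throws UnsupportedOperationException, similar to 0.10.2.
        synchronized Iterator<Map.Entry<K, V>> all() {
            TreeMap<K, V> copy = new TreeMap<>(map);
            Map<K, V> readOnly = Collections.unmodifiableMap(copy);
            return readOnly.entrySet().iterator();
        }
    }

    // Scan with the read-only iterator, remember the keys to drop, then
    // delete them via the store's own delete() instead of it.remove().
    static <K, V> int deleteMatching(SimpleStore<K, V> store,
                                     Predicate<Map.Entry<K, V>> shouldDelete) {
        List<K> toDelete = new ArrayList<>();
        Iterator<Map.Entry<K, V>> it = store.all();
        while (it.hasNext()) {
            Map.Entry<K, V> entry = it.next();
            if (shouldDelete.test(entry)) {
                toDelete.add(entry.getKey());
            }
        }
        for (K key : toDelete) {
            store.delete(key);
        }
        return toDelete.size();
    }
}
```

Collecting the keys first keeps the iteration strictly read-only, so the
same pattern should carry over to stores whose iterators are not backed by a
copy.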


-Matthias


On 3/22/17 8:00 AM, Tom Dearman wrote:
> Hi, hope someone on the kafka-streams team can help.  Our application uses
> 
> KeyValueIterator it = KeyValueStore.all();
> 
> …..
> it.remove()
> 
> 
> This used to work but is now broken: it causes our punctuate to fail and the
> StreamThread to die.  The cause seems to be that there were changes in 
> 0.10.2.0 to InMemoryKeyValueStoreSupplier:
> 
> 
> 
> public synchronized KeyValueIterator<K, V> all() {
>     final TreeMap<K, V> copy = new TreeMap<>(this.map);
>     return new MemoryStoreIterator<>(copy.entrySet().iterator());
> }
> 
> @Override
> public synchronized KeyValueIterator<K, V> all() {
>     final TreeMap<K, V> copy = new TreeMap<>(this.map);
>     return new DelegatingPeekingKeyValueIterator<>(name,
>         new MemoryStoreIterator<>(copy.entrySet().iterator()));
> }
> But the DelegatingPeekingKeyValueIterator has:
> 
> @Override
> public void remove() {
>     throw new UnsupportedOperationException("remove not supported");
> }
> whereas the old direct call on MemoryStoreIterator allowed remove.  For some 
> reason there is no call to underlying.remove() in the 
> DelegatingPeekingKeyValueIterator. 
> 
> We don’t want to downgrade to 0.10.1.1, as 0.10.2.0 includes a useful bug fix
> and removes the dependency on ZooKeeper.
> 
> Thanks,
> Tom 
> 
