Any other ideas here? Should I create a bug?

On Tue, Jul 3, 2018 at 1:21 PM, Christian Henry <christian.henr...@gmail.com
> wrote:

> Nope, we're setting retainDuplicates to false.
>
> On Tue, Jul 3, 2018 at 6:55 AM, Damian Guy <damian....@gmail.com> wrote:
>
>> Hi,
>>
>> When you create your window store do you have `retainDuplicates` set to
>> `true`? i.e., assuming you use `Stores.persistentWindowStore(...)` is the
>> last param `true`?
>>
>> Thanks,
>> Damian
>>
>> On Mon, 2 Jul 2018 at 17:29 Christian Henry <christian.henr...@gmail.com>
>> wrote:
>>
>> > We're using the latest Kafka (1.1.0). I'd like to note that when we
>> > encounter duplicates, the window is the same as well.
>> >
>> > My original code was a bit simplifier -- we also insert into the store
>> if
>> > iterator.hasNext() as well, before returning null. We're using a window
>> > store because we have a punctuator that runs every few minutes to count
>> > GUIDs with similar metadata, and reports that in a healthcheck. Since
>> our
>> > healthcheck window is less than the retention period of the store
>> > (retention period might be 1 hour, healthcheck window is ~5 min), the
>> > window store seemed like a good way to efficiently query all of the most
>> > recent data. Note that since the healthcheck punctuator needs to
>> aggregate
>> > on all the recent values, it has to do a *fetchAll(start, end) *which is
>> > how these duplicates are affecting us.
>> >
>> > On Fri, Jun 29, 2018 at 7:32 PM, Guozhang Wang <wangg...@gmail.com>
>> wrote:
>> >
>> > > Hello Christian,
>> > >
>> > > Since you are calling fetch(key, start, end) I'm assuming that
>> > > duplicateStore
>> > > is a WindowedStore. With a windowed store, it is possible that a
>> single
>> > key
>> > > can fall into multiple windows, and hence be returned from the
>> > > WindowStoreIterator,
>> > > note its type is <Windowed<K>, V>
>> > >
>> > > So I'd first want to know
>> > >
>> > > 1) which Kafka version are you using.
>> > > 2) why you'd need a window store, and if yes, could you consider using
>> > the
>> > > single point fetch (added in KAFKA-6560) other than the range query
>> > (which
>> > > is more expensive as well).
>> > >
>> > >
>> > >
>> > > Guozhang
>> > >
>> > >
>> > > On Fri, Jun 29, 2018 at 11:38 AM, Christian Henry <
>> > > christian.henr...@gmail.com> wrote:
>> > >
>> > > > Hi all,
>> > > >
>> > > > I'll first describe a simplified view of relevant parts of our setup
>> > > (which
>> > > > should be enough to repro), describe the behavior we're seeing, and
>> > then
>> > > > note some information I've come across after digging in a bit.
>> > > >
>> > > > We have a kafka stream application, and one of our transform steps
>> > keeps
>> > > a
>> > > > state store to filter out messages with a previously seen GUID. That
>> > is,
>> > > > our transform looks like:
>> > > >
>> > > > public KeyValue<byte[], String> transform(byte[] key, String guid) {
>> > > >     try (WindowStoreIterator<DuplicateMessageMetadata> iterator =
>> > > > duplicateStore.fetch(correlationId, start, now)) {
>> > > >         if (iterator.hasNext()) {
>> > > >             return null;
>> > > >         } else {
>> > > >             duplicateStore.put(correlationId, some metadata);
>> > > >             return new KeyValue<>(key, message);
>> > > >         }
>> > > >     }}
>> > > >
>> > > > where the duplicateStore is a persistent windowed store with caching
>> > > > enabled.
>> > > >
>> > > > I was debugging some tests and found that sometimes when calling
>> > > > *all()* or *fetchAll()
>> > > > *on the duplicate store and stepping through the iterator, it would
>> > > return
>> > > > the same guid more than once, even if it was only inserted into the
>> > store
>> > > > once. More specifically, if I had the following guids sent to the
>> > stream:
>> > > > [11111, 22222, ... 99999] (for 9 values total), sometimes it would
>> > return
>> > > > 10 values, with one (or more) of the values being returned twice by
>> the
>> > > > iterator. However, this would not show up with a *fetch(guid)* on
>> that
>> > > > specific guid. For instance, if 11111 was being returned twice by
>> > > > *fetchAll()*, calling *duplicateStore.fetch("11111", start, end)*
>> will
>> > > > still return an iterator with size of 1.
>> > > >
>> > > > I dug into this a bit more by setting a breakpoint in
>> > > > *SegmentedCacheFunction#compareSegmentedKeys(cacheKey,
>> > > > storeKey)* and watching the two input values as I looped through the
>> > > > iterator using "*while(iterator.hasNext()) { print(iterator.next())
>> > }*".
>> > > In
>> > > > one test, the duplicate value was 66666, and saw the following
>> behavior
>> > > > (trimming off the segment values from the byte input):
>> > > > -- compareSegmentedKeys(cacheKey = 66666, storeKey = 22222)
>> > > > -- next() returns 66666
>> > > > and
>> > > > -- compareSegmentedKeys(cacheKey = 77777, storeKey = 66666)
>> > > > -- next() returns 66666
>> > > > Besides those, the input values are the same and the output is as
>> > > expected.
>> > > > Additionally, a coworker noted that the number of duplicates always
>> > > matches
>> > > > the number of times *Long.compare(cacheSegmentId, storeSegmentId)
>> > > *returns
>> > > > a non-zero value, indicating that duplicates are likely arising due
>> to
>> > > the
>> > > > segment comparison.
>> > > >
>> > >
>> > >
>> > >
>> > > --
>> > > -- Guozhang
>> > >
>> >
>>
>
>

Reply via email to