Just for future reference, John Roesler confirmed to me that the grace
period is global within each partition, so if the partition receives older
data it is discarded because the window is already closed.
I'll try to remove the grace period and use a custom transformer acting as
a custom suppressor, so that I emit a window only when I receive a new one
for each key.
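
Below is a rough sketch of what I have in mind for that transformer, to be
plugged in with flatTransform() after the aggregation instead of suppress().
The class name, the "last-window-store" store name and the generic aggregate
type are just placeholders, the store would still need to be registered with
appropriate serdes, and this is untested:

    import java.util.Collections;
    import org.apache.kafka.streams.KeyValue;
    import org.apache.kafka.streams.kstream.Transformer;
    import org.apache.kafka.streams.kstream.Windowed;
    import org.apache.kafka.streams.processor.ProcessorContext;
    import org.apache.kafka.streams.state.KeyValueStore;

    // Per-key "suppressor": remember the last window seen for each key and
    // forward its aggregate only once a record for a newer window arrives.
    public class PerKeyWindowSuppressor<V> implements
            Transformer<Windowed<String>, V, Iterable<KeyValue<Windowed<String>, V>>> {

        private KeyValueStore<String, KeyValue<Windowed<String>, V>> store;

        @Override
        @SuppressWarnings("unchecked")
        public void init(ProcessorContext context) {
            store = (KeyValueStore<String, KeyValue<Windowed<String>, V>>)
                    context.getStateStore("last-window-store");
        }

        @Override
        public Iterable<KeyValue<Windowed<String>, V>> transform(
                Windowed<String> windowedKey, V aggregate) {
            String key = windowedKey.key();
            KeyValue<Windowed<String>, V> previous = store.get(key);
            store.put(key, KeyValue.pair(windowedKey, aggregate));
            if (previous != null
                    && previous.key.window().start() < windowedKey.window().start()) {
                // A newer window started for this key, so the previous one is
                // final: emit it downstream.
                return Collections.singletonList(previous);
            }
            // Same window still being updated (or first record for the key):
            // emit nothing yet.
            return Collections.emptyList();
        }

        @Override
        public void close() {}
    }

Something like aggregated.toStream().flatTransform(() -> new
PerKeyWindowSuppressor<>(), "last-window-store") would then take the place
of the suppress() step.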

--
Alessandro Tagliapietra

On Wed, Jul 17, 2019 at 9:25 AM Alessandro Tagliapietra <
tagliapietra.alessan...@gmail.com> wrote:

> It seems that completely removing the grace period fixes the problem.
>
> Is that expected? Is the grace period per key or global?
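>
> For context, this is roughly the change I'm talking about, with a made-up
> window size (Kafka Streams 2.x DSL):
>
>     // window with an explicit grace period
>     TimeWindows.of(Duration.ofMinutes(1)).grace(Duration.ofSeconds(10))
>
>     // grace period removed entirely
>     TimeWindows.of(Duration.ofMinutes(1))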
>
> --
> Alessandro Tagliapietra
>
> On Wed, Jul 17, 2019 at 12:07 AM Alessandro Tagliapietra <
> tagliapietra.alessan...@gmail.com> wrote:
>
>> I've added a reproduction repo here if someone wants to have a look at a
>> full working example
>>
>> https://github.com/alex88/kafka-error-repro
>>
>> At the top of WindowTest.java you can see the messages it sends, and
>> underneath there's the part that generates the sequence and the window.
>>
>> I've also included the window-only part
>> https://github.com/alex88/kafka-error-repro/blob/window_only/src/main/java/myapps/WindowTest.java
>> since this still happens even with only a window.
>> The aggregate function is called for all the records of the first key,
>> but not for all the records of the second.
>>
>> --
>> Alessandro Tagliapietra
>>
>>
>> On Tue, Jul 16, 2019 at 10:36 PM Alessandro Tagliapietra <
>> tagliapietra.alessan...@gmail.com> wrote:
>>
>>> Actually, suppress doesn't matter, as it happens later in the code; I've
>>> also tried removing it and adding a grace period to the window function,
>>> but the issue persists.
>>>
>>> --
>>> Alessandro Tagliapietra
>>>
>>> On Tue, Jul 16, 2019 at 10:17 PM Alessandro Tagliapietra <
>>> tagliapietra.alessan...@gmail.com> wrote:
>>>
>>>> Hello everyone,
>>>>
>>>> I have an issue trying to window data. I'm sending the exact same data
>>>> to a topic for 2 different keys; however, key number 1 behaves properly
>>>> while key number 2 doesn't.
>>>>
>>>> As you can see here
>>>> https://gist.github.com/alex88/dd68a0ce4ae46c37edfc7492b6e16bc8#file-gistfile1-txt
>>>> I'm sending 4 messages for each key: the first 3 messages of each key
>>>> have all their values set to 0, while the 4th has different values.
>>>>
>>>>
>>>> https://gist.github.com/alex88/dd68a0ce4ae46c37edfc7492b6e16bc8#file-code-java
>>>> This is the code I have (there is more before and after, but I've
>>>> narrowed the issue down to this part).
>>>>
>>>> https://gist.github.com/alex88/dd68a0ce4ae46c37edfc7492b6e16bc8#file-output-log
>>>> This is the output I see in the console.
>>>>
>>>> Basically, what's happening is: in the first piece of code I just use a
>>>> store + transformValues to turn a stream of data into a stream of pairs,
>>>> so e.g. 1, 2, 3, 4 becomes (1,2), (2,3), (3,4) and so on.
>>>> After that step I log every sensor and value I get (in the foreach), and
>>>> out of the 8 total messages I send, which get flatMapped into 16 single
>>>> messages, I see 16 log lines, which means that every message I received
>>>> went through that part.
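>>>>
>>>> Simplified, that pairing step looks roughly like the sketch below (the
>>>> store name and the Integer value type are placeholders, the real code is
>>>> in the gist):
>>>>
>>>>     import org.apache.kafka.streams.KeyValue;
>>>>     import org.apache.kafka.streams.kstream.ValueTransformerWithKey;
>>>>     import org.apache.kafka.streams.processor.ProcessorContext;
>>>>     import org.apache.kafka.streams.state.KeyValueStore;
>>>>
>>>>     // Keeps the previous value per key in a state store and emits
>>>>     // (previous, current) pairs.
>>>>     public class PairTransformer implements
>>>>             ValueTransformerWithKey<String, Integer, KeyValue<Integer, Integer>> {
>>>>
>>>>         private KeyValueStore<String, Integer> lastValueStore;
>>>>
>>>>         @Override
>>>>         @SuppressWarnings("unchecked")
>>>>         public void init(ProcessorContext context) {
>>>>             lastValueStore = (KeyValueStore<String, Integer>)
>>>>                     context.getStateStore("last-value-store");
>>>>         }
>>>>
>>>>         @Override
>>>>         public KeyValue<Integer, Integer> transform(String key, Integer value) {
>>>>             Integer previous = lastValueStore.get(key);
>>>>             lastValueStore.put(key, value);
>>>>             // The first value of a key produces no pair; 1, 2, 3, 4 then
>>>>             // produces (1,2), (2,3), (3,4).
>>>>             return previous == null ? null : KeyValue.pair(previous, value);
>>>>         }
>>>>
>>>>         @Override
>>>>         public void close() {}
>>>>     }
>>>>
>>>> with a .filter((key, value) -> value != null) after the transformValues()
>>>> call to drop the nulls produced by this sketch.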
>>>>
>>>> The problem comes afterwards: first I use groupByKey so that I can then
>>>> window each key independently, then I apply my aggregate function, and
>>>> it's at this point that I no longer see all the messages I expected to
>>>> receive from the previous step.
>>>> As you can see from the log, key 1 has its log called 8 times (4
>>>> messages, each with 2 metrics, flatMapped to 8 messages), while key 2
>>>> has that log called only 3 times.
>>>> At first I thought it was skipping some values because of the suppress,
>>>> but both keys have the same values, so it should either suppress both or
>>>> neither; besides, all messages are in order and the timestamp extractor
>>>> reads the timestamp from the message itself.
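>>>>
>>>> For reference, that part is roughly equivalent to the following (window
>>>> size, serdes and the WindowValue aggregate type are placeholders, the
>>>> real code is in the gist):
>>>>
>>>>     pairs
>>>>         .groupByKey()
>>>>         .windowedBy(TimeWindows.of(Duration.ofMinutes(1))
>>>>                 .grace(Duration.ofSeconds(10)))
>>>>         .aggregate(
>>>>             WindowValue::new,
>>>>             // this is the aggregator I see invoked only 3 times for key 2
>>>>             (key, pair, aggregate) -> aggregate.add(pair),
>>>>             Materialized.with(Serdes.String(), windowValueSerde))
>>>>         .suppress(Suppressed.untilWindowCloses(
>>>>                 Suppressed.BufferConfig.unbounded()))
>>>>         .toStream()
>>>>         .foreach((windowedKey, aggregate) ->
>>>>                 System.out.println(windowedKey + " -> " + aggregate));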
>>>>
>>>> Does anyone have any idea what the problem could be?
>>>>
>>>> --
>>>> Alessandro Tagliapietra
>>>>
>>>
