It seems that completely removing the grace period fixes the problem. Is that expected? Is the grace period per key or global?
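One way to reason about the per-key-or-global question is a simplified, dependency-free model in plain Java. This is not Kafka Streams code. It assumes that stream time is tracked per task (per input partition) rather than per key, as the Kafka Streams documentation describes. Stream time is the highest record timestamp observed so far, and a windowed aggregation accepts a record only while that record's window end is greater than stream time minus the grace period. The class name `GraceSketch`, the 1-second tumbling window, and the timestamps are hypothetical.

```java
import java.util.ArrayList;
import java.util.List;

/**
 * Hypothetical model of late-record dropping in a windowed aggregation.
 * Assumption: one stream time per task, shared by all keys, and a record is
 * aggregated only while windowEnd > streamTime - grace.
 */
public class GraceSketch {
    private final long windowSizeMs;
    private final long graceMs;
    // A single stream time for the whole (simulated) task: shared by ALL keys.
    private long streamTimeMs = Long.MIN_VALUE;

    public GraceSketch(long windowSizeMs, long graceMs) {
        this.windowSizeMs = windowSizeMs;
        this.graceMs = graceMs;
    }

    /**
     * Returns true if the record would be aggregated, false if it would be
     * dropped as late. The key is deliberately unused: in this model the
     * lateness decision depends only on the shared stream time.
     * Assumes non-negative timestamps and tumbling windows aligned to 0.
     */
    public boolean process(String key, long timestampMs) {
        streamTimeMs = Math.max(streamTimeMs, timestampMs);
        long windowStart = timestampMs - (timestampMs % windowSizeMs);
        long windowEnd = windowStart + windowSizeMs;
        long closeTime = streamTimeMs - graceMs;
        return windowEnd > closeTime;
    }

    public static void main(String[] args) {
        long[] timestamps = {0L, 1_000L, 2_000L, 3_000L};
        // Same timestamps for both keys; key1's records arrive first.
        for (long grace : new long[] {0L, 60_000L}) {
            GraceSketch model = new GraceSketch(1_000L, grace);
            List<String> accepted = new ArrayList<>();
            for (String key : new String[] {"key1", "key2"}) {
                for (long ts : timestamps) {
                    if (model.process(key, ts)) {
                        accepted.add(key + "@" + ts);
                    }
                }
            }
            System.out.println("grace=" + grace + "ms accepted=" + accepted);
        }
    }
}
```

With a zero grace period, all four key1 records are aggregated. By then key1 has already pushed the shared stream time to 3000 ms, so key2's records at 0, 1000 and 2000 ms fall into windows that are already closed, and only key2@3000 gets through. With a 60-second grace period, all eight records are accepted. If the model holds, it could also explain why removing the explicit grace period helps. This assumes the Kafka Streams 2.x behavior, where a window without an explicit grace period falls back to a default of roughly 24 hours.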
--
Alessandro Tagliapietra

On Wed, Jul 17, 2019 at 12:07 AM Alessandro Tagliapietra <tagliapietra.alessan...@gmail.com> wrote:

> I've added a reproduction repo here if someone wants to have a look at a
> full working example:
>
> https://github.com/alex88/kafka-error-repro
>
> At the top of WindowTest.java you can see the messages it sends, and
> underneath is the part where it generates the sequence and the window.
>
> I've also included the window-only part,
> https://github.com/alex88/kafka-error-repro/blob/window_only/src/main/java/myapps/WindowTest.java
> since this still happens even with only a window.
> The aggregate function is called for all the records of one key, but not
> for all the records of the second.
>
> --
> Alessandro Tagliapietra
>
>
> On Tue, Jul 16, 2019 at 10:36 PM Alessandro Tagliapietra <
> tagliapietra.alessan...@gmail.com> wrote:
>
>> Actually, suppress doesn't matter; it happens later in the code. I've also
>> tried removing it and adding a grace period to the window function, but the
>> issue persists.
>>
>> --
>> Alessandro Tagliapietra
>>
>> On Tue, Jul 16, 2019 at 10:17 PM Alessandro Tagliapietra <
>> tagliapietra.alessan...@gmail.com> wrote:
>>
>>> Hello everyone,
>>>
>>> I have an issue trying to window data. I'm sending exactly the same data
>>> for 2 different keys to a topic; however, key number 1 behaves properly
>>> and key number 2 doesn't.
>>>
>>> As you can see here
>>> https://gist.github.com/alex88/dd68a0ce4ae46c37edfc7492b6e16bc8#file-gistfile1-txt
>>> I'm sending 4 messages for each key: the first 3 messages of each key
>>> have all values set to 0, and the 4th has different values.
>>>
>>> This is the code I have (there is more before and after, but I've
>>> narrowed the issue down to this part):
>>> https://gist.github.com/alex88/dd68a0ce4ae46c37edfc7492b6e16bc8#file-code-java
>>>
>>> This is the output I see in the console:
>>> https://gist.github.com/alex88/dd68a0ce4ae46c37edfc7492b6e16bc8#file-output-log
>>>
>>> Basically, in the first piece of code I use a store + transformValues to
>>> turn a stream of data into a stream of pairs, e.g. 1, 2, 3, 4 becomes
>>> (1,2), (2,3), (3,4) and so on.
>>> After that step I log every sensor + value I get (in the foreach). The 8
>>> messages I send get flatMapped into 16 single messages, and I see 16 log
>>> lines, which means that all the messages I received went through that
>>> part.
>>>
>>> The problem comes afterwards: first I use groupByKey so that I can window
>>> each key independently, then I use my aggregate function, and it's at this
>>> point that I don't see all the messages I expected to receive from the
>>> previous step.
>>> As you can see from the log, for key 1 the log is called 8 times (4
>>> messages, each with 2 metrics, flatMapped into 8 messages), while for
>>> key 2 it is called only 3 times.
>>> At first I thought it might be skipping some values because of suppress,
>>> but both keys have the same values, so it should either suppress both or
>>> neither. Also, all messages are ordered and the timestamp extractor reads
>>> the timestamp from the message.
>>>
>>> Does anyone have any idea what the problem could be?
>>>
>>> --
>>> Alessandro Tagliapietra
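The pairing step described in the thread, where a store + transformValues turns 1, 2, 3, 4 into (1,2), (2,3), (3,4), can be sketched without Kafka. This is a hypothetical model, not the code from the gist. A `HashMap` stands in for the key-value state store and remembers the last value per key, and the first value of each key produces no pair. `PairingSketch` and `Pair` are illustrative names, and integer values are assumed for simplicity.

```java
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

/** Hypothetical, dependency-free model of pairing consecutive values per key. */
public class PairingSketch {
    /** A pair of consecutive values observed for one key. */
    public static final class Pair {
        public final String key;
        public final int previous;
        public final int current;

        Pair(String key, int previous, int current) {
            this.key = key;
            this.previous = previous;
            this.current = current;
        }

        @Override
        public String toString() {
            return key + ":(" + previous + "," + current + ")";
        }
    }

    // Stand-in for the state store: the last value seen for each key.
    private final Map<String, Integer> lastValueByKey = new HashMap<>();

    /**
     * Stores the new value and returns (previous, current), or null when this
     * is the first value seen for the key and there is nothing to pair yet.
     */
    public Pair transform(String key, int value) {
        Integer previous = lastValueByKey.put(key, value);
        return previous == null ? null : new Pair(key, previous, value);
    }

    public static void main(String[] args) {
        PairingSketch sketch = new PairingSketch();
        List<Pair> pairs = new ArrayList<>();
        for (int value : new int[] {1, 2, 3, 4}) {
            Pair pair = sketch.transform("sensor-1", value);
            if (pair != null) {
                pairs.add(pair); // skip the unpaired first value
            }
        }
        System.out.println(pairs); // [sensor-1:(1,2), sensor-1:(2,3), sensor-1:(3,4)]
    }
}
```

Because the stored value is looked up by key, interleaved keys do not mix. A value for one sensor never becomes the "previous" value of another.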