I can see the issue, but it raises other questions. Pardon my ignorance. Even 
though partitions are processed independently, windows can aggregate state 
from records read from many partitions. Say there is a groupByKey 
followed by an aggregate. In that case, how is the state reconciled across all the 
application instances? Is there a designated instance for a particular key?

In my case, there was only one instance processing records from all partitions, 
so it still seems odd that the windows did not expire, even though I now 
understand why.

Thanks
Mohan


On 6/21/19, 2:25 PM, "John Roesler" <j...@confluent.io> wrote:

    No problem. It's definitely a subtlety. It occurs because each
    partition is processed completely independently of the others, so
    "stream time" is tracked per partition, and there's no way to look
    across at the other partitions to find out what stream time they have.
    
    In general, it's not a problem because you'd expect all partitions to
    receive updates over time, but if you're specifically trying to send
    events that cause stuff to get flushed from the buffers, it can mess
    with you. It's especially notable in tests. So, for most tests, I just
    configure the topics to have one partition.
    
    -John
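
To make the per-partition behavior described above concrete, here is a minimal sketch in plain Java. It is a toy model, not Kafka Streams code: the class name, the 60-second window size, and the zero grace period are assumptions chosen for illustration. Each partition keeps its own "stream time" (the highest timestamp seen so far), and a window closes only when the stream time of its own partition passes the window end plus the grace period.

```java
import java.util.ArrayList;
import java.util.HashMap;
import java.util.Iterator;
import java.util.List;
import java.util.Map;

// Toy model of per-partition stream time (hypothetical, not the Kafka Streams API).
class PerPartitionStreamTime {
    static final long WINDOW_SIZE_MS = 60_000L; // assumed window size
    static final long GRACE_MS = 0L;            // assumed grace period

    // Highest timestamp seen so far, tracked separately for each partition.
    private final Map<Integer, Long> streamTime = new HashMap<>();
    // Start times of the windows still open in each partition.
    private final Map<Integer, List<Long>> openWindows = new HashMap<>();

    // Processes one record and returns the start times of the windows it closes.
    List<Long> process(int partition, long timestampMs) {
        long windowStart = timestampMs - (timestampMs % WINDOW_SIZE_MS);
        List<Long> open = openWindows.computeIfAbsent(partition, p -> new ArrayList<>());
        if (!open.contains(windowStart)) {
            open.add(windowStart);
        }

        // Stream time only moves forward, and only for this partition.
        long now = Math.max(streamTime.getOrDefault(partition, Long.MIN_VALUE), timestampMs);
        streamTime.put(partition, now);

        // Close every window of this partition whose end plus grace has passed.
        List<Long> closed = new ArrayList<>();
        Iterator<Long> it = open.iterator();
        while (it.hasNext()) {
            long start = it.next();
            if (start + WINDOW_SIZE_MS + GRACE_MS <= now) {
                closed.add(start);
                it.remove();
            }
        }
        return closed;
    }

    public static void main(String[] args) {
        PerPartitionStreamTime sim = new PerPartitionStreamTime();
        sim.process(0, 1_000L);   // opens window [0, 60000) in partition 0
        sim.process(1, 2_000L);   // opens window [0, 60000) in partition 1
        // A later record in partition 0 closes only partition 0's old window.
        System.out.println(sim.process(0, 120_000L)); // prints [0]
        // Partition 1's window stays open until partition 1 itself sees a later record.
        System.out.println(sim.process(1, 125_000L)); // prints [0]
    }
}
```

In this model the record at 120000 ms in partition 0 has no effect on partition 1, which matches the symptom in the thread: old windows stay pending until their own partition receives a newer record.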
    
    On Fri, Jun 21, 2019 at 3:56 PM Parthasarathy, Mohan <mpart...@hpe.com> wrote:
    >
    > That change, "in the same partition", must explain what we are seeing. Unless every partition sees a new message, not all windows will expire. That is an interesting twist. Thanks for the correction (I will go back and confirm this).
    >
    > -mohan
    >
    >
    > On 6/21/19, 12:40 PM, "John Roesler" <j...@confluent.io> wrote:
    >
    >     Sure, the record cache attempts to save downstream operators from
    >     unnecessary updates by also buffering for a short amount of time
    >     before forwarding. It forwards results whenever the cache fills up or
    >     whenever there is a commit. If you're happy to wait at least "commit
    >     interval" amount of time for updates, then you don't need to do
    >     anything, but if you're on the edge of your seat, waiting for these
    >     results, you can set cache.max.bytes.buffering to 0 to disable the
    >     record cache entirely. Note that this would hurt throughput in
    >     general, though.
    >
    >     Just a slight modification:
    >     * a new record with new timestamp > (all the previous timestamps +
    >     grace period) will cause all the old windows *in the same partition*
    >     to close
    >     * yes, expiry of the window depends only on the event time
    >
    >     Hope this helps!
    >     -John
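
As a small illustration of the setting mentioned above, the sketch below builds the relevant property with plain `java.util.Properties`. The class and method names are hypothetical, and the rest of an application's configuration (application id, bootstrap servers, serdes) is deliberately left out; only the `cache.max.bytes.buffering` key comes from the message.

```java
import java.util.Properties;

// Hypothetical helper; only the config key is taken from the thread.
class RecordCacheOffConfig {
    static Properties build() {
        Properties props = new Properties();
        // Setting the record cache size to 0 disables the cache, so updates are
        // forwarded downstream without waiting for a commit or a full cache.
        // As noted in the message, this is expected to hurt throughput.
        props.setProperty("cache.max.bytes.buffering", "0");
        return props;
    }

    public static void main(String[] args) {
        System.out.println(build().getProperty("cache.max.bytes.buffering"));
    }
}
```

These properties would be merged with the application's other settings before the Streams instance is created. For tests this trades throughput for predictable timing, which is the trade-off described in the message.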
    >
    >     On Thu, Jun 20, 2019 at 11:42 AM Parthasarathy, Mohan <mpart...@hpe.com> wrote:
    >     >
    >     > Could you tell me a little more about the delays introduced by the record caches and how I can disable them?
    >     >
    >     >  If I could summarize my problem:
    >     >
    >     > - When a new record arrives with a timestamp greater than that of all records sent before, I expect *all* of the old windows to close
    >     > - Expiry of the windows depends only on the event time and not on the key
    >     >
    >     > Are these two statements correct?
    >     >
    >     > Thanks
    >     > Mohan
    >     >
    >     > On 6/20/19, 9:17 AM, "John Roesler" <j...@confluent.io> wrote:
    >     >
    >     >     Hi!
    >     >
    >     >     In addition to setting the grace period to zero (or some small
    >     >     number), you should also consider the delays introduced by record
    >     >     caches upstream of the suppression. If you're closely watching the
    >     >     timing of records going into and coming out of the topology, this
    >     >     might also spoil your expectations. You could always disable the
    >     >     record cache to make the system more predictable (although this would
    >     >     hurt throughput in production).
    >     >
    >     >     Thanks,
    >     >     -John
    >     >
    >     >     On Wed, Jun 19, 2019 at 3:01 PM Parthasarathy, Mohan <mpart...@hpe.com> wrote:
    >     >     >
    >     >     > We do explicitly set the grace period to zero. I am going to try the new version.
    >     >     >
    >     >     > -mohan
    >     >     >
    >     >     >
    >     >     > On 6/19/19, 12:50 PM, "Parthasarathy, Mohan" <mpart...@hpe.com> wrote:
    >     >     >
    >     >     >     Thanks. We will give it a shot.
    >     >     >
    >     >     >     On 6/19/19, 12:42 PM, "Bruno Cadonna" <br...@confluent.io> wrote:
    >     >     >
    >     >     >         Hi Mohan,
    >     >     >
    >     >     >         I realized that my previous statement was not clear. With a grace
    >     >     >         period of 12 hours, suppress would wait for late events until stream
    >     >     >         time has advanced 12 hours before a result would be emitted.
    >     >     >
    >     >     >         Best,
    >     >     >         Bruno
    >     >     >
    >     >     >         On Wed, Jun 19, 2019 at 9:21 PM Bruno Cadonna <br...@confluent.io> wrote:
    >     >     >         >
    >     >     >         > Hi Mohan,
    >     >     >         >
    >     >     >         > If you do not set a grace period, the grace period defaults to 12
    >     >     >         > hours. Hence, suppress would wait for an event that occurs 12 hours
    >     >     >         > later before it outputs a result. Try to explicitly set the grace
    >     >     >         > period to 0 and let us know if it worked.
    >     >     >         >
    >     >     >         > If it still does not work, upgrade to version 2.2.1 if it is possible
    >     >     >         > for you. We had a couple of bugs in suppress recently that are fixed
    >     >     >         > in that version.
    >     >     >         >
    >     >     >         > Best,
    >     >     >         > Bruno
    >     >     >         >
    >     >     >         > On Wed, Jun 19, 2019 at 8:37 PM Parthasarathy, Mohan <mpart...@hpe.com> wrote:
    >     >     >         > >
    >     >     >         > > No, I have not set any grace period. Is that mandatory? Have you seen problems with suppress and windows expiring?
    >     >     >         > >
    >     >     >         > > Thanks
    >     >     >         > > Mohan
    >     >     >         > >
    >     >     >         > > On 6/19/19, 12:41 AM, "Bruno Cadonna" <br...@confluent.io> wrote:
    >     >     >         > >
    >     >     >         > >     Hi Mohan,
    >     >     >         > >
    >     >     >         > >     Did you set a grace period on the window?
    >     >     >         > >
    >     >     >         > >     Best,
    >     >     >         > >     Bruno
    >     >     >         > >
    >     >     >         > >     On Tue, Jun 18, 2019 at 2:04 AM Parthasarathy, Mohan <mpart...@hpe.com> wrote:
    >     >     >         > >     >
    >     >     >         > >     > On further debugging, what we are seeing is that windows are expiring rather randomly as new messages are processed. We tested with a new key for every new message. We waited for the window time before replaying new messages. Sometimes a new message would come in and create state. It takes several messages to make some of the old windows close (go past suppress to the next stage). We have also seen a case where one of them never closed even though several other older ones expired. Then we explicitly sent a message with the same old key and then it showed up. Also, for every new message, only one of the previous windows expires even though there are several pending.
    >     >     >         > >     >
    >     >     >         > >     > If we don't use suppress, then there is never an issue. With suppress, the behavior we are seeing is weird. We are using version 2.1.0 in DSL mode. Any clues on what we could be missing? Why isn't there an order in the way windows are closed? As event time progresses with the arrival of new messages, the older ones should expire. Is that the right understanding or not?
    >     >     >         > >     >
    >     >     >         > >     > Thanks
    >     >     >         > >     > Mohan
    >     >     >         > >     >
    >     >     >         > >     > On 6/17/19, 3:43 PM, "Parthasarathy, Mohan" <mpart...@hpe.com> wrote:
    >     >     >         > >     >
    >     >     >         > >     >     Hi,
    >     >     >         > >     >
    >     >     >         > >     >     We are using suppress in the application. We see some state being created at some point in time. Then there is no new data for a day or two. We send new data, but the old window of data (where we saw the state being created) is not closing, i.e. we do not see it go through suppress and on to the next stage. It is as though the state created earlier was purged. Is this possible?
    >     >     >         > >     >
    >     >     >         > >     >     Thanks
    >     >     >         > >     >     Mohan
    >     >     >         > >     >
    >     >     >         > >     >
    >     >     >         > >     >
    >     >     >         > >
    >     >     >         > >
    >     >     >
    >     >     >
    >     >     >
    >     >     >
    >     >
    >     >
    >
    >
    
