Re: [VOTE] 2.3.0 RC3

2019-06-24 Thread Mickael Maison
Thanks Colin for making a new RC for KAFKA-8564.
+1 (non binding)
I checked signatures and ran quickstart on the 2.12 binary

On Mon, Jun 24, 2019 at 6:03 AM Gwen Shapira  wrote:
>
> +1 (binding)
> Verified signatures, verified good build on jenkins, built from
> sources anyway and ran quickstart on the 2.11 binary.
>
> Looks good!
>
> On Sun, Jun 23, 2019 at 3:06 PM Jakub Scholz  wrote:
> >
> > +1 (non-binding). I used the binaries and ran some of my tests against them.
> >
> > On Thu, Jun 20, 2019 at 12:03 AM Colin McCabe  wrote:
> >
> > > Hi all,
> > >
> > > We discovered some problems with the second release candidate (RC2) of
> > > 2.3.0.  Specifically, KAFKA-8564.  I've created a new RC which includes 
> > > the
> > > fix for this issue.
> > >
> > > Check out the release notes for the 2.3.0 release here:
> > > https://home.apache.org/~cmccabe/kafka-2.3.0-rc3/RELEASE_NOTES.html
> > >
> > > The vote will go until Saturday, June 22nd, or until we create another RC.
> > >
> > > * Kafka's KEYS file containing PGP keys we use to sign the release can be
> > > found here:
> > > https://kafka.apache.org/KEYS
> > >
> > > * The release artifacts to be voted upon (source and binary) are here:
> > > https://home.apache.org/~cmccabe/kafka-2.3.0-rc3/
> > >
> > > * Maven artifacts to be voted upon:
> > > https://repository.apache.org/content/groups/staging/org/apache/kafka/
> > >
> > > * Javadoc:
> > > https://home.apache.org/~cmccabe/kafka-2.3.0-rc3/javadoc/
> > >
> > > * The tag to be voted upon (off the 2.3 branch) is the 2.3.0 tag:
> > > https://github.com/apache/kafka/releases/tag/2.3.0-rc3
> > >
> > > best,
> > > Colin
> > >
> > > C.
> > >
>
>
>
> --
> Gwen Shapira
> Product Manager | Confluent
> 650.450.2760 | @gwenshap
> Follow us: Twitter | blog


replace usage of TimeWindows.until() from Kafka Streams 2.2

2019-06-24 Thread unicorn . banachi
Hi Kafka Streams user,

I have the following usage of Kafka Streams, and it works well: it sets the 
retention time for the KTable, both in the internal topics and in the RocksDB 
local state stores.

final KStream<Integer, String> eventStream = builder
    .stream("events",
        Consumed.with(Serdes.Integer(), Serdes.String())
            .withOffsetResetPolicy(Topology.AutoOffsetReset.EARLIEST));

eventStream.groupByKey()
    .windowedBy(TimeWindows.of(Duration.ofSeconds(200))
        .until(Duration.ofSeconds(3000).toMillis()))
    .reduce((oldValue, newValue) -> newValue);

I saw that until() is deprecated as of 2.2. What would be the replacement for 
such usage? I checked the Materialized-related documentation but could not 
find anything.

Best,

Sendoh


RE: Repeating UNKNOWN_PRODUCER_ID errors for Kafka streams applications

2019-06-24 Thread Pieter Hameete
Hi Guozhang,

An update on this. I've tested your hotfix as follows:

* Branched from apache/kafka:2.2 and applied your hotfix to it. I couldn’t 
build your forked repo, but the official repo was working. Version 2.4 was 
resulting in test errors, so I branched off 2.2 which is the official version 
we are using as well.

* I've changed the probability to purge on commit from 10% to 1% because the 
commit interval is 100ms, and we produce records to a partition roughly every 5 
seconds. This means there are 50 commits happening between every produced 
record. With 10% chance to purge on every commit there would be a ~99.5% chance 
that a purge took place between produced records. With the purge probability on 
commit set to 1% this is ~40%.

* UNKNOWN_PRODUCER_ID errors observed over a 10-minute period dropped from 576 
to 258 after applying the hotfix, i.e. ~45% of what was produced before 
applying the hotfix, which is as expected.
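As a sanity check on the arithmetic above (not from the original thread; the 
50 comes from the 5 s produce interval divided by the 100 ms commit interval, 
assuming independent purge decisions per commit):

```java
public class PurgeProbability {
    // Chance that at least one purge happens across n independent commits,
    // each purging with probability p: 1 - (1 - p)^n.
    static double atLeastOnePurge(double p, int n) {
        return 1.0 - Math.pow(1.0 - p, n);
    }

    public static void main(String[] args) {
        int commitsBetweenRecords = 5000 / 100; // 5 s / 100 ms = 50 commits
        System.out.printf("p=10%%: %.4f%n", atLeastOnePurge(0.10, commitsBetweenRecords)); // ~0.9948
        System.out.printf("p= 1%%: %.4f%n", atLeastOnePurge(0.01, commitsBetweenRecords)); // ~0.3950
    }
}
```

which matches the ~99.5% and ~40% figures above.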

Now I'm wondering how we can move forward with this. Purging repartition topics 
on every commit (every 100ms by default for EOS) seems unnecessary if the 
record produce rate is low (i.e. not a record produced for every commit). Is it 
possible to optionally disable this repartition topic purging behavior, or to 
link the purging rate to the rate at which records are produced?

Best,

Pieter

-Original Message-
From: Guozhang Wang 
Sent: Monday, 17 June 2019 22:14
To: users@kafka.apache.org
Subject: Re: Repeating UNKNOWN_PRODUCER_ID errors for Kafka streams 
applications

Hello Pieter,

Please feel free to apply this two-liner to your branch and see if it
helps: https://github.com/apache/kafka/pull/6951

Guozhang


On Thu, Jun 13, 2019 at 12:30 AM Pieter Hameete 
wrote:

> Hi Guozhang,
>
> We can run a shadow setup to test the workaround hotfix if that is 
> helpful for you. For the production cluster we'll wait for an official 
> artifact to be released.
>
> It is not super urgent for us because records are still being 
> produced/consumed. The main issue is many unnecessary error and warn 
> logs, and the unnecessary retries that follow from it. If there will 
> be a fix for this eventually that would make us very happy :-)
>
> Thanks for your efforts so far, and let me know if I can do anything 
> to assist. We could continue this work via Slack if you prefer.
>
> Best,
>
> Pieter
>
> -Original Message-
> From: Guozhang Wang 
> Sent: Wednesday, 12 June 2019 20:30
> To: users@kafka.apache.org
> Subject: Re: Repeating UNKNOWN_PRODUCER_ID errors for Kafka streams 
> applications
>
> @Jonathan Actually after double checking the code I realized
> https://issues.apache.org/jira/browse/KAFKA-7190 would not completely 
> resolve this issue, as explained in my previous email to Pieter.
>
> @Pieter Your concerns about this issue make sense to me. Are you 
> willing to compile from source code rather than use a downloaded artifact? I 
> can try to provide you with a quick workaround hotfix if you're willing to do 
> the former.
>
>
> Guozhang
>
> On Wed, Jun 12, 2019 at 4:13 AM Pieter Hameete < 
> pieter.hame...@blockbax.com>
> wrote:
>
> > Hey Jonathan,
> >
> > Thanks for the input. We've looked into the issue you linked but 
> > changing the topic config didn’t work for us (see earlier mails in 
> > this thread).
> > Could you elaborate a bit more on the specifics of your situation?
> >
> > How many partitions do you have for the repartition topics? What is 
> > your commit.interval.ms setting for the streams application? What is 
> > the frequency (volume) at which records are produced to each 
> > partition in your case?
> >
> > I'm trying to get a better idea of what 'low rate' and 'high volume' are.
> > Specifically in comparison to your topic and Kafka streams app 
> > configuration. We produce records to a partition every 5 seconds, 
> > and this results in errors, which is unexpected to us because we run 
> > an EOS configuration with default settings, and we don't regard 1 
> > record per 5 seconds per partition as super low volume with respect 
> > to these default settings.
> >
> > Best,
> >
> > Pieter
> >
> > -Original Message-
> > From: Jonathan Santilli 
> > Sent: Tuesday, 11 June 2019 14:14
> > To: Kafka Users 
> > Subject: Re: Repeating UNKNOWN_PRODUCER_ID errors for Kafka 
> > streams applications
> >
> > Hello,
> >
> > we currently see the same WARN logs when the app is processing a low 
> > rate of records; as soon as the app starts processing a high volume of 
> > records, those WARNs stop showing in the logs.
> > According to other email threads, this should be fixed with 
> > https://issues.apache.org/jira/browse/KAFKA-7190.
> >
> > We use version 2.2.1, with EOS
> >
> > Cheers!
> > --
> > Jonathan
> >
> >
> > On Tue, Jun 11, 2019 at 11:04 AM Pieter Hameete < 
> > pieter.hame...@blockbax.com>
> > wrote:
> >
> > > Hi Guozhang,
> > >
> > > Thanks that clarifies a lot. The segment size or segment ms is not 
> > >

Re: Can kafka internal state be purged ?

2019-06-24 Thread John Roesler
Hey, this is a very apt question.

GroupByKey isn't a great example because it doesn't actually change
the key, so all the aggregation results are actually on records from
the same partition. But let's say you do a groupBy or a map (or any
operation that can change the key), followed by an aggregation. Now
it's possible that the aggregation would need to process records from
two different partitions. In such a case (key-changing operation
followed by a stateful operation), Streams actually round-trips the
data through an intermediate topic, called a repartition topic, before
the aggregation. This has the effect, similar to the "shuffle" phase
of map-reduce, of putting all the data into its *new* right partition,
so then the aggregation can still process each of its partitions
independently.
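As a hypothetical illustration of that (not from the thread: the topic name, 
serdes, and the regionOf helper are invented), a key-changing groupBy followed 
by an aggregation is exactly the shape that triggers a repartition topic:

```java
// Hypothetical topology: re-key purchase events from customerId to region.
KStream<String, Long> purchases = builder.stream("purchases",
        Consumed.with(Serdes.String(), Serdes.Long()));

KTable<String, Long> totalsByRegion = purchases
        // The key changes here, so Streams routes the data through an
        // internal "-repartition" topic before aggregating, ensuring each
        // task sees every record for its new key's partition.
        .groupBy((customerId, amount) -> regionOf(customerId),
                Grouped.with(Serdes.String(), Serdes.Long()))
        .count();
```

Had this used groupByKey() instead, no repartition topic would be created.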

Regarding the latter statement, even though you only have one
instance, Streams _still_ processes each partition independently. The
"unit of work" responsible for processing a partition is called a
"task". So if you have 4 partitions, then your one instance actually
has 4 state stores, one for each task, where each task only gets
records from a single partition. The tasks can't see anything about
each other, not their state nor other metadata like their current
stream time. Otherwise, the results would depend on which tasks happen
to be co-located with which other tasks. So, having to send your
"purge" event to all partitions is a pain, but in the end, it buys you
a lot, as you can add another instance to your cluster at any time,
and Streams will scale up, and you'll know that the program is
executing exactly the same way the whole time.

-John

On Sat, Jun 22, 2019 at 4:37 PM Parthasarathy, Mohan  wrote:
>
> I can see the issue. But it raised other questions. Pardon my ignorance. Even 
> though partitions are processed independently, windows can be aggregating 
> state from records read from many partitions. Let us say there is a 
> groupByKey followed by aggregate. In this case how is the state reconciled 
> across all the application instances ? Is there a designated instance for a 
> particular key ?
>
> In my case, there was only one instance processing records from all 
> partitions and it is kind of odd that windows did not expire even though I 
> understand why now.
>
> Thanks
> Mohan
>
>
> On 6/21/19, 2:25 PM, "John Roesler"  wrote:
>
> No problem. It's definitely a subtlety. It occurs because each
> partition is processed completely independently of the others, so
> "stream time" is tracked per partition, and there's no way to look
> across at the other partitions to find out what stream time they have.
>
> In general, it's not a problem because you'd expect all partitions to
> receive updates over time, but if you're specifically trying to send
> events that cause stuff to get flushed from the buffers, it can mess
> with you. It's especially notable in tests. So, for most tests, I just
> configure the topics to have one partition.
>
> -John
>
> On Fri, Jun 21, 2019 at 3:56 PM Parthasarathy, Mohan  
> wrote:
> >
> > That change, "in the same partition", must explain what we are seeing. 
> > Unless you see one message per partition, not all windows will expire. 
> > That is an interesting twist. Thanks for the correction; I will go back 
> > and confirm this.
> >
> > -mohan
> >
> >
> > On 6/21/19, 12:40 PM, "John Roesler"  wrote:
> >
> > Sure, the record cache attempts to save downstream operators from
> > unnecessary updates by also buffering for a short amount of time
> > before forwarding. It forwards results whenever the cache fills up 
> or
> > whenever there is a commit. If you're happy to wait at least "commit
> > interval" amount of time for updates, then you don't need to do
> > anything, but if you're on the edge of your seat, waiting for these
> > results, you can set cache.max.bytes.buffering to 0 to disable the
> > record cache entirely. Note that this would hurt throughput in
> > general, though.
> >
> > Just a slight modification:
> > * a new record with new timestamp > (all the previous timestamps +
> > grace period) will cause all the old windows *in the same partition*
> > to close
> > * yes, expiry of the window depends only on the event time
> >
> > Hope this helps!
> > -John
> >
> > On Thu, Jun 20, 2019 at 11:42 AM Parthasarathy, Mohan 
>  wrote:
> > >
> > > Could you tell me a little more about the delays about the record 
> caches and how I can disable it ?
> > >
> > >  If I could summarize my problem:
> > >
> > > -A new record with a new timestamp > all records sent before, I 
> expect *all* of the old windows to close
> > > -Expiry of the windows depends only on the event time and not on 
> the key
> > >
>  

Re: replace usage of TimeWindows.until() from Kafka Streams 2.2

2019-06-24 Thread John Roesler
Hey Sendoh,

I think you just overlooked the javadoc in your search, which says:

> @deprecated since 2.1. Use {@link Materialized#withRetention(Duration)} or 
> directly configure the retention in a store supplier and use {@link 
> Materialized#as(WindowBytesStoreSupplier)}.

Sorry for the confusion,
-John

On Mon, Jun 24, 2019 at 5:05 AM unicorn.bana...@gmail.com
 wrote:
>
> Hi Kafka Streams user,
>
> I have this usage of  Kafka Streams and it works well that sets retention 
> time in KTable, both in the internal topics and RocksDB local states.
>
> final KStream eventStream = builder
> .stream("events",
> Consumed.with(Serdes.Integer(), Serdes.String())
> 
> .withOffsetResetPolicy(Topology.AutoOffsetReset.EARLIEST));
>
> eventStream.groupByKey()
> .windowedBy(TimeWindows.of(Duration.ofSeconds(200)).until(Duration.ofSeconds(3000).toMillis()))
> .reduce((oldValue, newValue) -> newValue);
>
> I saw until() is deprecated from 2.2. What would be the replacement of such 
> usage?
> I checked the Materialized related document but cannot find any
>
> Best,
>
> Sendoh


Kafka 2.1.1 migration - JMX metrics problem

2019-06-24 Thread Jose Manuel Vega Monroy
Hi there,

Recently we rolling upgraded our internal brokers from version 0.11.0.1 to 
version 2.1.1 in different envs, with the next definition of JMX metrics for 
Introscope Wily:

JMX|kafka.server|Fetch:queue-size,\
JMX|kafka.server|Fetch|*:byte-rate,\
JMX|kafka.server|Fetch|*:throttle-time,\
JMX|kafka.server|Produce:queue-size,\
JMX|kafka.server|Produce|*:byte-rate,\
JMX|kafka.server|Produce|*:throttle-time,\

However, even though we define client.id and group.id in our consumer/producer 
app configuration, it looks like Wily is now keying the app metrics by the 
username we define in sasl.jaas.config for the SASL_PLAINTEXT security 
protocol, instead of by client.id as it did before the migration.

Any idea why this is happening?

Thanks

Jose Manuel Vega Monroy
Java Developer / Software Developer Engineer in Test
Direct: +0035 0 2008038 (Ext. 8038)
Email: jose.mon...@williamhill.com
William Hill | 6/1 Waterport Place | Gibraltar | GX11 1AA





Kafka Consumers - keeping them open

2019-06-24 Thread Kevin Perera
Hello! I’m interested in trying to get my Kafka Consumer to keep eating 
records. However, after a short period of time, it stops incrementing. How do 
you usually get this to work? Below is a short configuration that I use for my 
KafkaConsumer. Any help would be greatly appreciated. 

hostname = InetAddress.getLocalHost().getHostName();
// configuration for how we consume records.
tweetsProps.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, hostname + ":9092");
tweetsProps.put(ConsumerConfig.GROUP_ID_CONFIG, "groupID");
tweetsProps.put(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG, "true");
tweetsProps.put(ConsumerConfig.AUTO_COMMIT_INTERVAL_MS_CONFIG, "1000");
tweetsProps.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "earliest");
tweetsProps.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, 
StringDeserializer.class);
tweetsProps.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, 
StringDeserializer.class);



Re: Kafka Consumers - keeping them open

2019-06-24 Thread Boyang Chen
Hey Kevin,

could you give more context on what it means for `keep eating records` and
`stops incrementing`? In a typical use case, you should call `poll()` in a
while loop, and if you stop seeing new records, it could be either your
consumer is not working correctly, or your input volume is not fed in fast
enough.
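
For reference, the usual shape of such a loop looks roughly like this (a 
sketch only; the topic name "tweets" is a guess based on the property names 
in the question):

```java
// Long-running consume loop using the properties from the question.
try (KafkaConsumer<String, String> consumer = new KafkaConsumer<>(tweetsProps)) {
    consumer.subscribe(Collections.singletonList("tweets"));
    while (true) {  // keep polling; break on a shutdown signal in real code
        ConsumerRecords<String, String> records =
                consumer.poll(Duration.ofMillis(500));
        for (ConsumerRecord<String, String> record : records) {
            System.out.printf("%d: %s%n", record.offset(), record.value());
        }
    }
}
```

If the loop exits or poll() is called only once, consumption will appear to
stop even though the consumer configuration is fine.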

Boyang

On Mon, Jun 24, 2019 at 8:13 AM Kevin Perera  wrote:

> Hello! I’m interested in trying to get my Kafka Consumer to keep eating
> records. However, after a short period of time, it stops incrementing. How
> do you usually get this to work? Below is a short configuration that I use
> for my KafkaConsumer. Any help would be greatly appreciated.
>
> hostname = InetAddress.getLocalHost().getHostName();
> // configuration for how we consume records.
> tweetsProps.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, hostname +
> ":9092");
> tweetsProps.put(ConsumerConfig.GROUP_ID_CONFIG, "groupID");
> tweetsProps.put(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG, "true");
> tweetsProps.put(ConsumerConfig.AUTO_COMMIT_INTERVAL_MS_CONFIG, "1000");
> tweetsProps.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "earliest");
> tweetsProps.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG,
> StringDeserializer.class);
> tweetsProps.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG,
> StringDeserializer.class);
>
>


Consumer offsets with Kafka Mirror Maker

2019-06-24 Thread Mohit Kumar
Is there any way to copy consumer offsets while moving data from one cluster
to another?


Assunto: Consumer offsets with Kafka Mirror Maker

2019-06-24 Thread Marcelo Barbosa
I think it is not possible if you're using MirrorMaker. Confluent Platform has 
a product called Replicator that does what you want.
Cheers
Barbosa

Sent from Yahoo Mail on Android

On Mon, 24 Jun 2019 at 12:34, Mohit Kumar wrote:

Is there any way to copy consumer offsets while moving data from one cluster
to another?
  


Re: Consumer offsets with Kafka Mirror Maker

2019-06-24 Thread Ryanne Dolan
Mohit, PR-6295 and KIP-382 introduce MirrorMaker 2.0, which was designed to
support this operation.

In a nutshell, MM2 maintains a sparse offset sync stream while replicating
records between clusters. The offset syncs are used to translate consumer
offsets for periodic cross-cluster checkpoints. Leveraging these
checkpoints, you can resume processing in a new cluster via
kafka-consumer-groups.sh --reset-offsets, and you are guaranteed to never
skip records.

The RemoteClusterUtils.translateOffsets() method in the PR is a good place
to start. It will return the latest translated offsets for a particular
consumer group on a remote cluster. You can build tooling around this, e.g.
to migrate all your consumers at once.
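
A rough sketch of driving that method (based on the PR as described above; the 
exact signature may still change before release, and the cluster alias, group 
name, and bootstrap address below are invented):

```java
// Translate the committed offsets of group "my-group" from the remote
// cluster aliased "primary" into offsets valid on the cluster we connect to.
Map<String, Object> props = new HashMap<>();
props.put("bootstrap.servers", "backup-cluster:9092"); // illustrative address

Map<TopicPartition, OffsetAndMetadata> translated =
        RemoteClusterUtils.translateOffsets(props, "primary", "my-group",
                Duration.ofSeconds(30));

// Feed these into kafka-consumer-groups.sh --reset-offsets (or
// Consumer#seek) to resume processing without skipping records.
```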

https://github.com/apache/kafka/pull/6295
https://cwiki.apache.org/confluence/display/KAFKA/KIP-382%3A+MirrorMaker+2.0

Ryanne

On Mon, Jun 24, 2019 at 10:34 AM Mohit Kumar  wrote:

> Is there any way to copy consumer offsets while moving data from one cluster
> to another?
>


[RESULT] [VOTE] 2.3.0 RC3

2019-06-24 Thread Colin McCabe
Hi all,

This vote passes with 6 +1 votes (3 of which are binding) and no 0 or -1 votes. 
 Thanks to everyone who voted.

+1 votes
PMC Members:
* Ismael Juma
* Guozhang Wang
* Gwen Shapira

Community:
* Kamal Chandraprakash
* Jakub Scholz
* Mickael Maison

0 votes
* No votes

-1 votes
* No votes

Vote thread:
https://www.mail-archive.com/dev@kafka.apache.org/msg98814.html

I'll continue with the release process and the release announcement will follow 
in the next few days.

thanks,
Colin


On Mon, Jun 24, 2019, at 01:17, Mickael Maison wrote:
> Thanks Colin for making a new RC for KAFKA-8564.
> +1 (non binding)
> I checked signatures and ran quickstart on the 2.12 binary
> 
> On Mon, Jun 24, 2019 at 6:03 AM Gwen Shapira  wrote:
> >
> > +1 (binding)
> > Verified signatures, verified good build on jenkins, built from
> > sources anyway and ran quickstart on the 2.11 binary.
> >
> > Looks good!
> >
> > On Sun, Jun 23, 2019 at 3:06 PM Jakub Scholz  wrote:
> > >
> > > +1 (non-binding). I used the binaries and ran some of my tests against 
> > > them.
> > >
> > > On Thu, Jun 20, 2019 at 12:03 AM Colin McCabe  wrote:
> > >
> > > > Hi all,
> > > >
> > > > We discovered some problems with the second release candidate (RC2) of
> > > > 2.3.0.  Specifically, KAFKA-8564.  I've created a new RC which includes 
> > > > the
> > > > fix for this issue.
> > > >
> > > > Check out the release notes for the 2.3.0 release here:
> > > > https://home.apache.org/~cmccabe/kafka-2.3.0-rc3/RELEASE_NOTES.html
> > > >
> > > > The vote will go until Saturday, June 22nd, or until we create another 
> > > > RC.
> > > >
> > > > * Kafka's KEYS file containing PGP keys we use to sign the release can 
> > > > be
> > > > found here:
> > > > https://kafka.apache.org/KEYS
> > > >
> > > > * The release artifacts to be voted upon (source and binary) are here:
> > > > https://home.apache.org/~cmccabe/kafka-2.3.0-rc3/
> > > >
> > > > * Maven artifacts to be voted upon:
> > > > https://repository.apache.org/content/groups/staging/org/apache/kafka/
> > > >
> > > > * Javadoc:
> > > > https://home.apache.org/~cmccabe/kafka-2.3.0-rc3/javadoc/
> > > >
> > > > * The tag to be voted upon (off the 2.3 branch) is the 2.3.0 tag:
> > > > https://github.com/apache/kafka/releases/tag/2.3.0-rc3
> > > >
> > > > best,
> > > > Colin
> > > >
> > > > C.
> > > >
> >
> >
> >
> > --
> > Gwen Shapira
> > Product Manager | Confluent
> > 650.450.2760 | @gwenshap
> > Follow us: Twitter | blog
>


Re: Can kafka internal state be purged ?

2019-06-24 Thread Parthasarathy, Mohan
John,

Thanks for the nice explanation. When the repartitioning happens, does the 
window get associated with the new partition, i.e., does a message with a new 
timestamp now have to appear on the repartition topic for the window to 
expire? It is possible that there is a new stream of messages coming in, but 
after the map operation the partitions in the repartitioned topic do not see 
the same thing.

Thanks
Mohan

On 6/24/19, 7:49 AM, "John Roesler"  wrote:

Hey, this is a very apt question.

GroupByKey isn't a great example because it doesn't actually change
the key, so all the aggregation results are actually on records from
the same partition. But let's say you do a groupBy or a map (or any
operation that can change the key), followed by an aggregation. Now
it's possible that the aggregation would need to process records from
two different partitions. In such a case (key-changing operation
followed by a stateful operation), Streams actually round-trips the
data through an intermediate topic, called a repartition topic, before
the aggregation. This has the effect, similar to the "shuffle" phase
of map-reduce, of putting all the data into its *new* right partition,
so then the aggregation can still process each of its partitions
independently.

Regarding the latter statement, even though you only have one
instance, Streams _still_ processes each partition independently. The
"unit of work" responsible for processing a partition is called a
"task". So if you have 4 partitions, then your one instance actually
has 4 state stores, one for each task, where each task only gets
records from a single partition. The tasks can't see anything about
each other, not their state nor other metadata like their current
stream time. Otherwise, the results would depend on which tasks happen
to be co-located with which other tasks. So, having to send your
"purge" event to all partitions is a pain, but in the end, it buys you
a lot, as you can add another instance to your cluster at any time,
and Streams will scale up, and you'll know that the program is
executing exactly the same way the whole time.

-John

On Sat, Jun 22, 2019 at 4:37 PM Parthasarathy, Mohan  
wrote:
>
> I can see the issue. But it raised other questions. Pardon my ignorance. 
Even though partitions are processed independently, windows can be aggregating 
state from records read from many partitions. Let us say there is a groupByKey 
followed by aggregate. In this case how is the state reconciled across all the 
application instances ? Is there a designated instance for a particular key ?
>
> In my case, there was only one instance processing records from all 
partitions and it is kind of odd that windows did not expire even though I 
understand why now.
>
> Thanks
> Mohan
>
>
> On 6/21/19, 2:25 PM, "John Roesler"  wrote:
>
> No problem. It's definitely a subtlety. It occurs because each
> partition is processed completely independently of the others, so
> "stream time" is tracked per partition, and there's no way to look
> across at the other partitions to find out what stream time they have.
>
> In general, it's not a problem because you'd expect all partitions to
> receive updates over time, but if you're specifically trying to send
> events that cause stuff to get flushed from the buffers, it can mess
> with you. It's especially notable in tests. So, for most tests, I just
> configure the topics to have one partition.
>
> -John
>
> On Fri, Jun 21, 2019 at 3:56 PM Parthasarathy, Mohan 
 wrote:
> >
> > That change, "in the same partition", must explain what we are 
> > seeing. Unless you see one message per partition, not all windows will 
> > expire. That is an interesting twist. Thanks for the correction; I will 
> > go back and confirm this.
> >
> > -mohan
> >
> >
> > On 6/21/19, 12:40 PM, "John Roesler"  wrote:
> >
> > Sure, the record cache attempts to save downstream operators 
from
> > unnecessary updates by also buffering for a short amount of time
> > before forwarding. It forwards results whenever the cache fills 
up or
> > whenever there is a commit. If you're happy to wait at least 
"commit
> > interval" amount of time for updates, then you don't need to do
> > anything, but if you're on the edge of your seat, waiting for 
these
> > results, you can set cache.max.bytes.buffering to 0 to disable 
the
> > record cache entirely. Note that this would hurt throughput in
> > general, though.
> >
> > Just a slight modification:
> > * a new record with new timestamp > (all the previous 
timestamps +