Hi Francesco, here is the reproducer: https://github.com/javadevmtl/flink-key-reproducer
So, essentially it looks like the exception is thrown when there's a high influx of records from the source. The key is built from 3 values: the date/time rounded to the minute and 2 strings. So you will see keys as follows:

2022-02-04T17:20:00Z|foo|bar
2022-02-04T17:21:00Z|foo|bar
2022-02-04T17:22:00Z|foo|bar

The reproducer has a custom source that basically produces a record in a loop and then sleeps for a specified number of milliseconds, 100 ms in this case. The lower the sleep delay, the faster records are produced and the more likely the exception is thrown. With a 100 ms delay it's always thrown. Setting the delay to 2000-3000 ms guarantees that it works.

The original job uses a Kafka Source, so it should technically be able to handle even a couple thousand records per second.

On Thu, 3 Feb 2022 at 16:41, John Smith <java.dev....@gmail.com> wrote:

> Ok it's not my data either. I think it may be a volume issue. I have
> managed to consistently reproduce the error. I'll upload a reproducer ASAP.
>
> On Thu, 3 Feb 2022 at 15:37, John Smith <java.dev....@gmail.com> wrote:
>
>> Ok so I tried to create a reproducer but I couldn't reproduce it. But the
>> actual job once in a while throws that error. So I'm wondering if maybe one
>> of the records that comes in is not valid, though I do validate prior to
>> getting to the key and window operators.
>>
>> On Thu, 3 Feb 2022 at 14:32, John Smith <java.dev....@gmail.com> wrote:
>>
>>> Actually maybe not because with PrintSinkFunction it ran for a bit and
>>> then it threw the error.
>>>
>>> On Thu, 3 Feb 2022 at 14:24, John Smith <java.dev....@gmail.com> wrote:
>>>
>>>> Ok it may be the ElasticSearch connector causing the issue?
>>>>
>>>> If I use PrintSinkFunction then I get no error and my stats print as
>>>> expected.
>>>>
>>>> On Wed, 2 Feb 2022 at 03:01, Francesco Guardiani <
>>>> france...@ververica.com> wrote:
>>>>
>>>>> Hi,
>>>>> your hash code and equals seems correct.
>>>>> Can you post a minimum stream
>>>>> pipeline reproducer using this class?
>>>>>
>>>>> FG
>>>>>
>>>>> On Tue, Feb 1, 2022 at 8:39 PM John Smith <java.dev....@gmail.com>
>>>>> wrote:
>>>>>
>>>>>> Hi, getting java.lang.IllegalArgumentException: Key group 39 is not
>>>>>> in KeyGroupRange{startKeyGroup=96, endKeyGroup=103}. Unless you're
>>>>>> directly using low level state access APIs, this is most likely
>>>>>> caused by non-deterministic shuffle key (hashCode and equals
>>>>>> implementation).
>>>>>>
>>>>>> This is my class, is my hashCode deterministic?
>>>>>>
>>>>>> public final class MyEventCountKey {
>>>>>>     private final String countDateTime;
>>>>>>     private final String domain;
>>>>>>     private final String event;
>>>>>>
>>>>>>     public MyEventCountKey(final String countDateTime,
>>>>>>                            final String domain,
>>>>>>                            final String event) {
>>>>>>         this.countDateTime = countDateTime;
>>>>>>         this.domain = domain;
>>>>>>         this.event = event;
>>>>>>     }
>>>>>>
>>>>>>     public String getCountDateTime() {
>>>>>>         return countDateTime;
>>>>>>     }
>>>>>>
>>>>>>     public String getDomain() {
>>>>>>         return domain;
>>>>>>     }
>>>>>>
>>>>>>     public String getEven() {
>>>>>>         return event;
>>>>>>     }
>>>>>>
>>>>>>     @Override
>>>>>>     public String toString() {
>>>>>>         return countDateTime + "|" + domain + "|" + event;
>>>>>>     }
>>>>>>
>>>>>>     @Override
>>>>>>     public boolean equals(Object o) {
>>>>>>         if (this == o) return true;
>>>>>>         if (o == null || getClass() != o.getClass()) return false;
>>>>>>         MyEventCountKey that = (MyEventCountKey) o;
>>>>>>         return countDateTime.equals(that.countDateTime) &&
>>>>>>                 domain.equals(that.domain) &&
>>>>>>                 event.equals(that.event);
>>>>>>     }
>>>>>>
>>>>>>     @Override
>>>>>>     public int hashCode() {
>>>>>>         final int prime = 31;
>>>>>>         int result = 1;
>>>>>>         result = prime * result + countDateTime.hashCode();
>>>>>>         result = prime * result + domain.hashCode();
>>>>>>         result = prime * result + event.hashCode();
>>>>>>         return result;
>>>>>>     }
>>>>>> }
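
For reference, the key scheme described at the top of this message (date/time rounded to the minute plus two strings) can be sketched roughly as below. This is only a minimal sketch and not the reproducer's code: it assumes "rounded to the minute" means truncating with Instant.truncatedTo(ChronoUnit.MINUTES), and the class name MinuteKeySketch and the helper minuteKey are made up for illustration.

```java
import java.time.Instant;
import java.time.temporal.ChronoUnit;

// Hypothetical sketch, not the reproducer's code: builds the
// "minute|domain|event" key and checks that it is stable.
final class MinuteKeySketch {

    // Assumption: "rounded to the minute" means dropping seconds and below.
    static String minuteKey(Instant eventTime, String domain, String event) {
        Instant minute = eventTime.truncatedTo(ChronoUnit.MINUTES);
        return minute + "|" + domain + "|" + event;
    }

    public static void main(String[] args) {
        // Two events that fall in the same minute.
        String a = minuteKey(Instant.parse("2022-02-04T17:20:05Z"), "foo", "bar");
        String b = minuteKey(Instant.parse("2022-02-04T17:20:59Z"), "foo", "bar");
        // An event in the following minute.
        String c = minuteKey(Instant.parse("2022-02-04T17:21:00Z"), "foo", "bar");

        System.out.println(a);
        System.out.println(c);
        // Same minute: the keys are equal, so their hash codes are equal too.
        System.out.println(a.equals(b) && a.hashCode() == b.hashCode());
        // Different minute: a different key.
        System.out.println(a.equals(c));
    }
}
```

Since String.hashCode() is defined by the Java API specification, a key made up only of String fields, like the class quoted above, should hash to the same value on every JVM. That is consistent with the hashCode and equals looking correct.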