Hi Francesco, here is the reproducer:
https://github.com/javadevmtl/flink-key-reproducer

So, essentially, it looks like the exception is thrown when the source
produces a high influx of records.

The key is built from 3 values: the date/time rounded to the minute and 2
strings.
So you will see keys like these:
2022-02-04T17:20:00Z|foo|bar
2022-02-04T17:21:00Z|foo|bar
2022-02-04T17:22:00Z|foo|bar
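For illustration, here is a minimal stdlib-only sketch of building such a key deterministically. It assumes each event carries its own timestamp as a java.time.Instant. The KeyExample class and keyFor helper are hypothetical and are not taken from the reproducer. Because the minute comes from the record rather than from the clock at the moment the key is extracted, every evaluation for the same record yields the same string:

```java
import java.time.Instant;
import java.time.temporal.ChronoUnit;

public final class KeyExample {
    // Hypothetical helper: truncates the event's own timestamp down to the
    // start of its minute and joins it with the two string fields, producing
    // the "date|domain|event" format shown in the examples above.
    static String keyFor(Instant eventTime, String domain, String event) {
        Instant minute = eventTime.truncatedTo(ChronoUnit.MINUTES);
        return minute + "|" + domain + "|" + event;
    }

    public static void main(String[] args) {
        Instant t = Instant.parse("2022-02-04T17:20:42.123Z");
        // Prints 2022-02-04T17:20:00Z|foo|bar
        System.out.println(keyFor(t, "foo", "bar"));
    }
}
```

Note that truncatedTo rounds down rather than to the nearest minute. If the real job rounds differently, only that one line would change.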

The reproducer has a custom source that produces a record in a loop and
sleeps for a configurable number of milliseconds between records (100 ms in
this case). The lower the sleep delay, the faster records are produced and
the more likely the exception is thrown. With a 100 ms delay it is always
thrown; setting the delay to 2000 to 3000 ms makes it work every time.
The original job uses a Kafka source, so it should technically be able to
handle even a couple thousand records per second.
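Below is a stdlib-only sketch of the kind of throttled loop described above. It is not the reproducer's actual source. The ThrottledGenerator class, Event type, generate method, and delayMillis parameter are hypothetical stand-ins. In a Flink job this loop would live inside the source's run method rather than filling a list. Each record is stamped with its creation time exactly once, so anything keyed on that field downstream stays deterministic regardless of how fast records are produced:

```java
import java.time.Instant;
import java.util.ArrayList;
import java.util.List;

public final class ThrottledGenerator {
    // Hypothetical event type: the timestamp is captured once, when the record is created.
    static final class Event {
        final Instant timestamp;
        final String domain;
        final String event;

        Event(Instant timestamp, String domain, String event) {
            this.timestamp = timestamp;
            this.domain = domain;
            this.event = event;
        }
    }

    // Produces `count` events, sleeping `delayMillis` after each one.
    // A smaller delay means a higher record rate.
    static List<Event> generate(int count, long delayMillis) throws InterruptedException {
        List<Event> out = new ArrayList<>(count);
        for (int i = 0; i < count; i++) {
            out.add(new Event(Instant.now(), "foo", "bar"));
            if (delayMillis > 0) {
                Thread.sleep(delayMillis);
            }
        }
        return out;
    }

    public static void main(String[] args) throws InterruptedException {
        // 100 ms is the delay at which the exception was reported to occur every time.
        for (Event e : generate(5, 100)) {
            System.out.println(e.timestamp + "|" + e.domain + "|" + e.event);
        }
    }
}
```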


On Thu, 3 Feb 2022 at 16:41, John Smith <java.dev....@gmail.com> wrote:

> OK, it's not my data either. I think it may be a volume issue. I have
> managed to reproduce the error consistently. I'll upload a reproducer ASAP.
>
>
>
> On Thu, 3 Feb 2022 at 15:37, John Smith <java.dev....@gmail.com> wrote:
>
>> OK, so I tried to create a reproducer but couldn't reproduce the error, yet
>> the actual job throws it once in a while. So I'm wondering whether one of
>> the incoming records is invalid, though I do validate records before they
>> reach the key and window operators.
>>
>> On Thu, 3 Feb 2022 at 14:32, John Smith <java.dev....@gmail.com> wrote:
>>
>>> Actually, maybe not: with PrintSinkFunction it ran for a bit and then
>>> threw the error.
>>>
>>> On Thu, 3 Feb 2022 at 14:24, John Smith <java.dev....@gmail.com> wrote:
>>>
>>>> OK, could the Elasticsearch connector be causing the issue?
>>>>
>>>> If I use PrintSinkFunction then I get no error and my stats print as
>>>> expected.
>>>>
>>>> On Wed, 2 Feb 2022 at 03:01, Francesco Guardiani <
>>>> france...@ververica.com> wrote:
>>>>
>>>>> Hi,
>>>>> your hashCode and equals seem correct. Can you post a minimal stream
>>>>> pipeline reproducer using this class?
>>>>>
>>>>> FG
>>>>>
>>>>> On Tue, Feb 1, 2022 at 8:39 PM John Smith <java.dev....@gmail.com>
>>>>> wrote:
>>>>>
>>>>>> Hi, I'm getting java.lang.IllegalArgumentException: Key group 39 is not
>>>>>> in KeyGroupRange{startKeyGroup=96, endKeyGroup=103}. Unless you're
>>>>>> directly using low level state access APIs, this is most likely caused by
>>>>>> non-deterministic shuffle key (hashCode and equals implementation).
>>>>>>
>>>>>> This is my class. Is my hashCode deterministic?
>>>>>>
>>>>>> public final class MyEventCountKey {
>>>>>>     private final String countDateTime;
>>>>>>     private final String domain;
>>>>>>     private final String event;
>>>>>>
>>>>>>     public MyEventCountKey(final String countDateTime, final String domain, final String event) {
>>>>>>         this.countDateTime = countDateTime;
>>>>>>         this.domain = domain;
>>>>>>         this.event = event;
>>>>>>     }
>>>>>>
>>>>>>     public String getCountDateTime() {
>>>>>>         return countDateTime;
>>>>>>     }
>>>>>>
>>>>>>     public String getDomain() {
>>>>>>         return domain;
>>>>>>     }
>>>>>>
>>>>>>     public String getEvent() {
>>>>>>         return event;
>>>>>>     }
>>>>>>
>>>>>>     @Override
>>>>>>     public String toString() {
>>>>>>         return countDateTime + "|" + domain + "|" + event;
>>>>>>     }
>>>>>>
>>>>>>     @Override
>>>>>>     public boolean equals(Object o) {
>>>>>>         if (this == o) return true;
>>>>>>         if (o == null || getClass() != o.getClass()) return false;
>>>>>>         MyEventCountKey that = (MyEventCountKey) o;
>>>>>>         return countDateTime.equals(that.countDateTime) &&
>>>>>>                 domain.equals(that.domain) &&
>>>>>>                 event.equals(that.event);
>>>>>>     }
>>>>>>
>>>>>>     @Override
>>>>>>     public int hashCode() {
>>>>>>         final int prime = 31;
>>>>>>         int result = 1;
>>>>>>         result = prime * result + countDateTime.hashCode();
>>>>>>         result = prime * result + domain.hashCode();
>>>>>>         result = prime * result + event.hashCode();
>>>>>>         return result;
>>>>>>     }
>>>>>> }
>>>>>>
>>>>>>
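To see what the exception message refers to, here is a stdlib-only sketch of how, as far as I understand Flink's KeyGroupRangeAssignment, a key is mapped to a key group (murmurHash(key.hashCode()) % maxParallelism) and how key groups are split across subtasks. It is a re-implementation for illustration, not Flink code. The parallelism values in main are assumptions chosen because they reproduce the range in the error: with maxParallelism = 128 and parallelism = 16, subtask 12 owns key groups 96 to 103, while key group 39 belongs to subtask 4. As far as I understand, Flink evaluates the key once when routing a record and again in the receiving keyed operator. If those two evaluations disagree, the record lands on a subtask whose range does not contain its key group:

```java
public final class KeyGroupDemo {
    // Murmur3-style scrambling and finalizer used to spread hash codes; believed
    // to mirror Flink's MathUtils.murmurHash, reproduced here only for illustration.
    static int murmurHash(int code) {
        code *= 0xcc9e2d51;
        code = Integer.rotateLeft(code, 15);
        code *= 0x1b873593;
        code = Integer.rotateLeft(code, 13);
        code = code * 5 + 0xe6546b64;
        code ^= 4;
        code ^= code >>> 16;
        code *= 0x85ebca6b;
        code ^= code >>> 13;
        code *= 0xc2b2ae35;
        code ^= code >>> 16;
        // Map to a non-negative int (Integer.MIN_VALUE has no positive counterpart).
        if (code >= 0) {
            return code;
        }
        return code != Integer.MIN_VALUE ? -code : 0;
    }

    // Key -> key group: depends only on key.hashCode(), so the key must be deterministic.
    static int keyGroupFor(Object key, int maxParallelism) {
        return murmurHash(key.hashCode()) % maxParallelism;
    }

    // Key group -> index of the subtask that owns it.
    static int operatorIndexFor(int keyGroup, int maxParallelism, int parallelism) {
        return keyGroup * parallelism / maxParallelism;
    }

    // Inclusive [start, end] range of key groups owned by one subtask.
    static int[] rangeFor(int operatorIndex, int maxParallelism, int parallelism) {
        int start = (operatorIndex * maxParallelism + parallelism - 1) / parallelism;
        int end = ((operatorIndex + 1) * maxParallelism - 1) / parallelism;
        return new int[] {start, end};
    }

    public static void main(String[] args) {
        int maxParallelism = 128; // assumption
        int parallelism = 16;     // assumption
        int[] r = rangeFor(12, maxParallelism, parallelism);
        System.out.println("subtask 12 owns key groups " + r[0] + ".." + r[1]); // 96..103
        System.out.println("key group 39 -> subtask " + operatorIndexFor(39, maxParallelism, parallelism)); // 4
        String key = "2022-02-04T17:20:00Z|foo|bar";
        System.out.println(key + " -> key group " + keyGroupFor(key, maxParallelism));
    }
}
```

Since String.hashCode is specified by the Java language, the MyEventCountKey class above maps a given (countDateTime, domain, event) triple to the same key group on every JVM. The key group can only change if the field values themselves differ between the two evaluations.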