Don't KeySelectors also need to be deterministic?
The {@link KeySelector} allows to use deterministic objects for
operations such as reduce, reduceGroup, join, coGroup, etc. If
invoked multiple times on the same object, the returned key must be
the same.
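To make that contract concrete, here is a minimal sketch in plain Java (no Flink dependency). SampleEvent, keyFromRecord and keyFromClock are hypothetical names made up for illustration, and java.util.function.Function stands in for a KeySelector. I am not claiming the reproducer does this. It only shows the difference between a key derived from the record and a key derived from the wall clock.

```java
import java.time.Clock;
import java.time.Instant;
import java.time.ZoneOffset;
import java.time.temporal.ChronoUnit;
import java.util.function.Function;

// Hypothetical record type for illustration; not taken from the reproducer.
final class SampleEvent {
    final Instant eventTime;
    final String domain;
    final String event;

    SampleEvent(Instant eventTime, String domain, String event) {
        this.eventTime = eventTime;
        this.domain = domain;
        this.event = event;
    }
}

final class KeySelectorContract {
    // Deterministic: the key depends only on fields of the record.
    static Function<SampleEvent, String> keyFromRecord() {
        return e -> e.eventTime.truncatedTo(ChronoUnit.MINUTES) + "|" + e.domain + "|" + e.event;
    }

    // Non-deterministic: the key depends on the time at which the selector runs.
    static Function<SampleEvent, String> keyFromClock(Clock clock) {
        return e -> clock.instant().truncatedTo(ChronoUnit.MINUTES) + "|" + e.domain + "|" + e.event;
    }

    public static void main(String[] args) {
        SampleEvent e = new SampleEvent(Instant.parse("2022-02-04T17:20:31Z"), "foo", "bar");

        // Same object, two invocations, same key.
        Function<SampleEvent, String> stable = keyFromRecord();
        System.out.println(stable.apply(e).equals(stable.apply(e)));

        // Two fixed clocks simulate the selector being invoked one second apart.
        Clock first = Clock.fixed(Instant.parse("2022-02-04T17:20:59Z"), ZoneOffset.UTC);
        Clock second = Clock.fixed(Instant.parse("2022-02-04T17:21:00Z"), ZoneOffset.UTC);
        System.out.println(keyFromClock(first).apply(e));
        System.out.println(keyFromClock(second).apply(e));
    }
}
```

The second variant returns a different key for the same object once the minute rolls over, which is the kind of behaviour the Javadoc rules out.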
On 04/02/2022 18:25, John Smith wrote:
Hi Francesco, here is the reproducer:
https://github.com/javadevmtl/flink-key-reproducer
So, essentially it looks like the exception is thrown when the source
produces a high influx of records.
The key is generated by 3 values: date/time rounded to the minute and
2 strings.
So you will see keys as follows...
2022-02-04T17:20:00Z|foo|bar
2022-02-04T17:21:00Z|foo|bar
2022-02-04T17:22:00Z|foo|bar
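For reference, a rough sketch of how keys of that shape can be built when the minute comes from a timestamp carried by the record. MinuteKeyCounts, keyFor and countPerKey are hypothetical names, and the timestamps are made up. The reproducer may build its keys differently.

```java
import java.time.Instant;
import java.time.temporal.ChronoUnit;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;

final class MinuteKeyCounts {
    // Builds "<minute>|<domain>|<event>" from values carried by the record itself.
    static String keyFor(Instant eventTime, String domain, String event) {
        return eventTime.truncatedTo(ChronoUnit.MINUTES) + "|" + domain + "|" + event;
    }

    // Counts how many timestamps fall under each key, in first-seen order.
    static Map<String, Integer> countPerKey(List<Instant> eventTimes, String domain, String event) {
        Map<String, Integer> counts = new LinkedHashMap<>();
        for (Instant t : eventTimes) {
            counts.merge(keyFor(t, domain, event), 1, Integer::sum);
        }
        return counts;
    }

    public static void main(String[] args) {
        List<Instant> times = List.of(
                Instant.parse("2022-02-04T17:20:05Z"),
                Instant.parse("2022-02-04T17:20:59.900Z"),
                Instant.parse("2022-02-04T17:21:00Z"),
                Instant.parse("2022-02-04T17:22:30Z"));
        // Both 17:20 timestamps share one key; 17:21 and 17:22 each get their own.
        countPerKey(times, "foo", "bar").forEach((k, n) -> System.out.println(k + " -> " + n));
    }
}
```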
The reproducer has a custom source that basically produces a record in
a loop and sleeps for a specified number of milliseconds (100ms in this
case).
The lower the sleep delay, the faster records are produced and the
more likely the exception is to be thrown. With a 100ms delay it's
always thrown. Setting a delay of 2000 to 3000ms makes it work.
The original job uses a Kafka Source so it should technically be able
to handle even a couple thousand records per second.
On Thu, 3 Feb 2022 at 16:41, John Smith <java.dev....@gmail.com> wrote:
Ok it's not my data either. I think it may be a volume issue. I
have managed to consistently reproduce the error. I'll upload a
reproducer ASAP.
On Thu, 3 Feb 2022 at 15:37, John Smith <java.dev....@gmail.com>
wrote:
Ok so I tried to create a reproducer but I couldn't reproduce
it. But the actual job once in a while throws that error. So
I'm wondering if maybe one of the records that comes in is not
valid, though I do validate prior to getting to the key and
window operators.
On Thu, 3 Feb 2022 at 14:32, John Smith
<java.dev....@gmail.com> wrote:
Actually maybe not because with PrintSinkFunction it ran
for a bit and then it threw the error.
On Thu, 3 Feb 2022 at 14:24, John Smith
<java.dev....@gmail.com> wrote:
Ok it may be the ElasticSearch connector causing the
issue?
If I use PrintSinkFunction then I get no error and my
stats print as expected.
On Wed, 2 Feb 2022 at 03:01, Francesco Guardiani
<france...@ververica.com> wrote:
Hi,
your hashCode and equals seem correct. Can you
post a minimal stream pipeline reproducer using
this class?
FG
On Tue, Feb 1, 2022 at 8:39 PM John Smith
<java.dev....@gmail.com> wrote:
Hi, getting
java.lang.IllegalArgumentException: Key group
39 is not in KeyGroupRange{startKeyGroup=96,
endKeyGroup=103}. Unless you're directly using
low level state access APIs, this is most
likely caused by non-deterministic shuffle key
(hashCode and equals implementation).
This is my class, is my hashCode deterministic?
public final class MyEventCountKey {
    private final String countDateTime;
    private final String domain;
    private final String event;

    public MyEventCountKey(final String countDateTime, final String domain, final String event) {
        this.countDateTime = countDateTime;
        this.domain = domain;
        this.event = event;
    }

    public String getCountDateTime() {
        return countDateTime;
    }

    public String getDomain() {
        return domain;
    }

    public String getEven() {
        return event;
    }

    @Override
    public String toString() {
        return countDateTime + "|" + domain + "|" + event;
    }

    @Override
    public boolean equals(Object o) {
        if (this == o) return true;
        if (o == null || getClass() != o.getClass()) return false;
        MyEventCountKey that = (MyEventCountKey) o;
        return countDateTime.equals(that.countDateTime)
                && domain.equals(that.domain)
                && event.equals(that.event);
    }

    @Override
    public int hashCode() {
        final int prime = 31;
        int result = 1;
        result = prime * result + countDateTime.hashCode();
        result = prime * result + domain.hashCode();
        result = prime * result + event.hashCode();
        return result;
    }
}
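As background on the error message, here is a simplified sketch of how a key's hash ends up in a key group and how key groups map to subtasks. It is a stand-in, not Flink's code: Flink additionally scrambles hashCode() with a murmur hash before taking the modulo, and its range formulas also handle parallelism values that do not divide evenly. KeyGroupSketch and its methods are hypothetical names, and the values 128 (max parallelism) and 16 (parallelism) are assumptions picked only because they yield a range of 96..103 like the one in the message.

```java
final class KeyGroupSketch {
    // Simplified stand-in: Flink applies a murmur hash to hashCode() first.
    static int keyGroupFor(Object key, int maxParallelism) {
        return Math.floorMod(key.hashCode(), maxParallelism);
    }

    // Subtask that owns a given key group.
    static int subtaskFor(int keyGroup, int maxParallelism, int parallelism) {
        return keyGroup * parallelism / maxParallelism;
    }

    // First key group of a subtask; assumes maxParallelism is divisible by parallelism.
    static int rangeStart(int subtask, int maxParallelism, int parallelism) {
        return subtask * maxParallelism / parallelism;
    }

    // Last key group of a subtask; same divisibility assumption.
    static int rangeEnd(int subtask, int maxParallelism, int parallelism) {
        return (subtask + 1) * maxParallelism / parallelism - 1;
    }

    public static void main(String[] args) {
        int maxParallelism = 128; // assumed
        int parallelism = 16;     // assumed

        // Under these assumptions subtask 12 owns 96..103, and key group 39 belongs to subtask 4.
        System.out.println(rangeStart(12, maxParallelism, parallelism)
                + ".." + rangeEnd(12, maxParallelism, parallelism));
        System.out.println(subtaskFor(39, maxParallelism, parallelism));

        // The same key always lands in the same group; a key that changes may not.
        String atRouting = "2022-02-04T17:20:00Z|foo|bar";
        String atStateAccess = "2022-02-04T17:21:00Z|foo|bar";
        System.out.println(keyGroupFor(atRouting, maxParallelism));
        System.out.println(keyGroupFor(atStateAccess, maxParallelism));
    }
}
```

If the key seen when the record is routed differs from the key seen when keyed state is accessed, the second lookup can point at a key group owned by another subtask, which is the situation the exception text describes.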