Don't KeySelectors also need to be deterministic?

 * The {@link KeySelector} allows to use deterministic objects for operations such as reduce,
 * reduceGroup, join, coGroup, etc. If invoked multiple times on the same object, the returned key
 * must be the same.
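The contract can be probed empirically with a minimal plain-Java sketch. It has no Flink dependency: `java.util.function.Function` stands in for `KeySelector`, and the `KeyContractCheck` class and `returnsSameKey` helper are hypothetical. The idea is to call the key function several times on the same object and compare the results. This matters because in a keyed stream the key is typically extracted more than once per record: once to pick the target subtask, and again inside the receiving keyed operator.

```java
import java.util.concurrent.atomic.AtomicInteger;
import java.util.function.Function;

final class KeyContractCheck {
    // Calls keyFn `times` times on the same input and reports whether every
    // call returned an equal key (the KeySelector contract quoted above).
    static <T, K> boolean returnsSameKey(Function<T, K> keyFn, T input, int times) {
        K first = keyFn.apply(input);
        for (int i = 1; i < times; i++) {
            if (!first.equals(keyFn.apply(input))) {
                return false;
            }
        }
        return true;
    }

    public static void main(String[] args) {
        // Deterministic: depends only on the input object.
        Function<String, String> byPrefix = s -> s.substring(0, 3);

        // Non-deterministic on purpose: hidden mutable state leaks into the key.
        AtomicInteger calls = new AtomicInteger();
        Function<String, String> withCounter = s -> s + "#" + calls.getAndIncrement();

        System.out.println(returnsSameKey(byPrefix, "foo|bar", 5));    // true
        System.out.println(returnsSameKey(withCounter, "foo|bar", 5)); // false
    }
}
```

Such a check can only expose instability; it cannot prove determinism. A key that depends on the current time, for instance, passes unless the calls happen to straddle the point where its value changes.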


On 04/02/2022 18:25, John Smith wrote:
Hi Francesco, here is the reproducer: https://github.com/javadevmtl/flink-key-reproducer

So, essentially, it looks like the exception is thrown when the source produces a high influx of records.

The key is built from 3 values: the date/time rounded to the minute, and 2 strings.
So you will see keys like the following:
2022-02-04T17:20:00Z|foo|bar
2022-02-04T17:21:00Z|foo|bar
2022-02-04T17:22:00Z|foo|bar
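The thread does not show how the minute value is obtained. Here is a minimal sketch, assuming each record carries its own event timestamp (the `MinuteKeys` class and `keyFor` method are hypothetical). Computing the minute from that timestamp keeps the key deterministic. If the minute were instead taken from the current clock at key-extraction time, repeated extractions of the same record near a minute boundary could disagree. That is the kind of non-determinism the question at the top of this thread is about.

```java
import java.time.Instant;
import java.time.temporal.ChronoUnit;

final class MinuteKeys {
    // Builds "<minute>|<domain>|<event>" from the event's OWN timestamp.
    // truncatedTo floors to the start of the minute, matching the ":00" keys above.
    static String keyFor(Instant eventTime, String domain, String event) {
        Instant minute = eventTime.truncatedTo(ChronoUnit.MINUTES);
        // Instant.toString() is ISO-8601 in UTC, e.g. 2022-02-04T17:20:00Z
        return minute + "|" + domain + "|" + event;
    }

    public static void main(String[] args) {
        System.out.println(keyFor(Instant.parse("2022-02-04T17:20:59Z"), "foo", "bar")); // 2022-02-04T17:20:00Z|foo|bar
        System.out.println(keyFor(Instant.parse("2022-02-04T17:21:00Z"), "foo", "bar")); // 2022-02-04T17:21:00Z|foo|bar

        // Hypothetical anti-pattern, for contrast only: the minute depends on WHEN
        // the key is extracted, so two extractions of one record may differ.
        // String unstable = Instant.now().truncatedTo(ChronoUnit.MINUTES) + "|foo|bar";
    }
}
```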

The reproducer has a custom source that produces a record in a loop and then sleeps for a specified number of milliseconds (100 ms in this case). The lower the sleep delay, the faster records are produced and the more likely the exception is thrown. With a 100 ms delay it is always thrown; with a delay of 2000 to 3000 ms the job reliably works. The original job uses a Kafka source, so it should be able to handle even a couple thousand records per second.
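Here is a Flink-free sketch of the throttling loop described above. It assumes the source simply emits a record and then sleeps. The `ThrottledSource` class, its `run` method, and the `Consumer` used in place of Flink's source context are all hypothetical, not the reproducer's actual code.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.function.Consumer;

final class ThrottledSource {
    // Emits one record per iteration, then sleeps delayMillis.
    // A lower delay means a higher rate: 100 ms is roughly 10 records/s,
    // 2000 ms roughly 0.5 records/s.
    static void run(Consumer<String> emit, int maxRecords, long delayMillis) {
        for (int i = 0; i < maxRecords; i++) {
            emit.accept("record-" + i);
            if (delayMillis > 0) {
                try {
                    Thread.sleep(delayMillis);
                } catch (InterruptedException ex) {
                    Thread.currentThread().interrupt(); // keep the interrupt flag and stop emitting
                    return;
                }
            }
        }
    }

    public static void main(String[] args) {
        List<String> collected = new ArrayList<>();
        run(collected::add, 5, 100);
        System.out.println(collected.size()); // 5
    }
}
```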


On Thu, 3 Feb 2022 at 16:41, John Smith <java.dev....@gmail.com> wrote:

    OK, it's not my data either. I think it may be a volume issue. I
    have managed to reproduce the error consistently. I'll upload a
    reproducer ASAP.



    On Thu, 3 Feb 2022 at 15:37, John Smith <java.dev....@gmail.com>
    wrote:

        OK, so I tried to create a reproducer, but I couldn't reproduce
        the error. The actual job does throw it once in a while,
        though. So I'm wondering whether one of the incoming records is
        invalid, even though I validate records before they reach the
        key and window operators.

        On Thu, 3 Feb 2022 at 14:32, John Smith
        <java.dev....@gmail.com> wrote:

            Actually, maybe not: with PrintSinkFunction it ran for a
            while and then threw the error.

            On Thu, 3 Feb 2022 at 14:24, John Smith
            <java.dev....@gmail.com> wrote:

                OK, could the Elasticsearch connector be causing the
                issue?

                If I use PrintSinkFunction, I get no error and my
                stats print as expected.

                On Wed, 2 Feb 2022 at 03:01, Francesco Guardiani
                <france...@ververica.com> wrote:

                    Hi,
                    Your hashCode and equals seem correct. Can you
                    post a minimal stream pipeline reproducer using
                    this class?

                    FG

                    On Tue, Feb 1, 2022 at 8:39 PM John Smith
                    <java.dev....@gmail.com> wrote:

                        Hi, I'm getting
                        java.lang.IllegalArgumentException: Key group
                        39 is not in KeyGroupRange{startKeyGroup=96,
                        endKeyGroup=103}. Unless you're directly using
                        low level state access APIs, this is most
                        likely caused by non-deterministic shuffle key
                        (hashCode and equals implementation).

                        This is my class. Is my hashCode deterministic?

                        public final class MyEventCountKey {
                            private final String countDateTime;
                            private final String domain;
                            private final String event;

                            public MyEventCountKey(final String countDateTime, final String domain, final String event) {
                                this.countDateTime = countDateTime;
                                this.domain = domain;
                                this.event = event;
                            }

                            public String getCountDateTime() {
                                return countDateTime;
                            }

                            public String getDomain() {
                                return domain;
                            }

                            public String getEvent() {
                                return event;
                            }

                            @Override
                            public String toString() {
                                return countDateTime + "|" + domain + "|" + event;
                            }

                            // Two keys are equal when all three fields are equal.
                            @Override
                            public boolean equals(Object o) {
                                if (this == o) return true;
                                if (o == null || getClass() != o.getClass()) return false;
                                MyEventCountKey that = (MyEventCountKey) o;
                                return countDateTime.equals(that.countDateTime)
                                        && domain.equals(that.domain)
                                        && event.equals(that.event);
                            }

                            // Combines the three field hashes with the usual 31-multiplier recipe.
                            @Override
                            public int hashCode() {
                                final int prime = 31;
                                int result = 1;
                                result = prime * result + countDateTime.hashCode();
                                result = prime * result + domain.hashCode();
                                result = prime * result + event.hashCode();
                                return result;
                            }
                        }
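Francesco's assessment can be double-checked with a small sketch. The `HashCheck` class is hypothetical and takes plain strings rather than the class above. The manual hashCode follows the same recipe as `java.util.Objects.hash`, which delegates to `Arrays.hashCode`: start at 1, then `result = 31 * result + h` for each element. `String.hashCode()` also has a documented formula, so equal field values give the same hash on every JVM and TaskManager. If the hash is sound, the remaining suspect is whether the key values themselves are computed deterministically, as asked at the top of the thread.

```java
import java.util.Objects;

final class HashCheck {
    // Same recipe as MyEventCountKey.hashCode(), on the three field values.
    static int manualHash(String a, String b, String c) {
        int result = 1;
        result = 31 * result + a.hashCode();
        result = 31 * result + b.hashCode();
        result = 31 * result + c.hashCode();
        return result;
    }

    public static void main(String[] args) {
        String t = "2022-02-04T17:20:00Z", d = "foo", e = "bar";
        // Objects.hash applies the identical recipe, so both values agree.
        System.out.println(manualHash(t, d, e) == Objects.hash(t, d, e)); // true
        // Distinct String instances with equal contents hash identically.
        System.out.println(manualHash(t, d, e) == manualHash(new String(t), new String(d), new String(e))); // true
    }
}
```

As a design choice, `Objects.hash(countDateTime, domain, event)` would be a shorter, equivalent body for `hashCode()`.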
