> The key selector works.

No it does not ;) It depends on the system time so it's not deterministic (you can get different keys for the very same element).

> How do you key a count based on the time. I have taken this from samples
> online.

This is what the windowing is for. You basically want to group / combine elements per key and event time window [1].

[1] https://nightlies.apache.org/flink/flink-docs-release-1.14/docs/dev/datastream/operators/windows/

Best,
D.

On Mon, Feb 7, 2022 at 3:44 PM John Smith <java.dev....@gmail.com> wrote:

> The key selector works. It only causes an issue if there are too many keys
> produced in one shot. For example, if 100 "same" keys are produced for that
> 1 minute it's ok. But if 101 are produced the error happens.
>
> If you look at the reproducer, at least that's what's happening.
>
> How do you key a count based on the time? I have taken this from samples
> online.
>
> The key is that particular time for that particular URL path.
>
> So cnn.com/article1 was clicked 10 times at 2022-01-01T10:01:00
>
> On Mon., Feb. 7, 2022, 8:57 a.m. Chesnay Schepler, <ches...@apache.org>
> wrote:
>
>> Your Key selector doesn't need to implement hashCode, but given the same
>> object it has to return the same key.
>> In your reproducer the returned key will have different timestamps, and
>> since the timestamp is included in the hashCode, they will be different
>> each time.
>>
>> On 07/02/2022 14:50, John Smith wrote:
>>
>> I don't get it? I provided the reproducer. I implemented the interface to
>> Key selector it needs hashcode and equals as well?
>>
>> I'm attempting to do click stream. So the key is based on processing
>> date/time rounded to the minute + domain name + path
>>
>> So these should be valid below?
>>
>> 2022-01-01T10:02:00 + cnn.com + /article1
>> 2022-01-01T10:02:00 + cnn.com + /article1
>> 2022-01-01T10:02:00 + cnn.com + /article1
>>
>> 2022-01-01T10:02:00 + cnn.com + /article2
>>
>> 2022-01-01T10:03:00 + cnn.com + /article1
>> 2022-01-01T10:03:00 + cnn.com + /article1
>>
>> 2022-01-01T10:03:00 + cnn.com + /article3
>> 2022-01-01T10:03:00 + cnn.com + /article3
>>
>> On Mon., Feb. 7, 2022, 2:53 a.m. Chesnay Schepler, <ches...@apache.org>
>> wrote:
>>
>>> Don't KeySelectors also need to be deterministic?
>>>
>>> * The {@link KeySelector} allows to use deterministic objects for
>>> * operations such as reduce, reduceGroup, join, coGroup, etc. If invoked
>>> * multiple times on the same object, the returned key must be the same.
>>>
>>>
>>> On 04/02/2022 18:25, John Smith wrote:
>>>
>>> Hi Francesco, here is the reproducer:
>>> https://github.com/javadevmtl/flink-key-reproducer
>>>
>>> So, essentially it looks like when there's a high influx of records
>>> produced from the source that the Exception is thrown.
>>>
>>> The key is generated by 3 values: date/time rounded to the minute and 2
>>> strings.
>>> So you will see keys as follows...
>>> 2022-02-04T17:20:00Z|foo|bar
>>> 2022-02-04T17:21:00Z|foo|bar
>>> 2022-02-04T17:22:00Z|foo|bar
>>>
>>> The reproducer has a custom source that basically produces a record in a
>>> loop and sleeps for a specified period of milliseconds 100ms in this case.
>>> The lower the sleep delay the faster records are produced the more
>>> chances the exception is thrown. With a 100ms delay it's always thrown.
>>> Setting a 2000 to 3000ms will guarantee it to work.
>>> The original job uses a Kafka Source so it should technically be able to
>>> handle even a couple thousand records per second.
>>>
>>>
>>> On Thu, 3 Feb 2022 at 16:41, John Smith <java.dev....@gmail.com> wrote:
>>>
>>>> Ok it's not my data either. I think it may be a volume issue. I have
>>>> managed to consistently reproduce the error.
>>>> I'll upload a reproducer ASAP.
>>>>
>>>>
>>>> On Thu, 3 Feb 2022 at 15:37, John Smith <java.dev....@gmail.com> wrote:
>>>>
>>>>> Ok so I tried to create a reproducer but I couldn't reproduce it. But
>>>>> the actual job once in a while throws that error. So I'm wondering if
>>>>> maybe one of the records that comes in is not valid, though I do
>>>>> validate prior to getting to the key and window operators.
>>>>>
>>>>> On Thu, 3 Feb 2022 at 14:32, John Smith <java.dev....@gmail.com>
>>>>> wrote:
>>>>>
>>>>>> Actually maybe not because with PrintSinkFunction it ran for a bit
>>>>>> and then it threw the error.
>>>>>>
>>>>>> On Thu, 3 Feb 2022 at 14:24, John Smith <java.dev....@gmail.com>
>>>>>> wrote:
>>>>>>
>>>>>>> Ok it may be the ElasticSearch connector causing the issue?
>>>>>>>
>>>>>>> If I use PrintSinkFunction then I get no error and my stats print as
>>>>>>> expected.
>>>>>>>
>>>>>>> On Wed, 2 Feb 2022 at 03:01, Francesco Guardiani <
>>>>>>> france...@ververica.com> wrote:
>>>>>>>
>>>>>>>> Hi,
>>>>>>>> your hash code and equals seems correct. Can you post a minimum
>>>>>>>> stream pipeline reproducer using this class?
>>>>>>>>
>>>>>>>> FG
>>>>>>>>
>>>>>>>> On Tue, Feb 1, 2022 at 8:39 PM John Smith <java.dev....@gmail.com>
>>>>>>>> wrote:
>>>>>>>>
>>>>>>>>> Hi, getting java.lang.IllegalArgumentException: Key group 39 is
>>>>>>>>> not in KeyGroupRange{startKeyGroup=96, endKeyGroup=103}. Unless you're
>>>>>>>>> directly using low level state access APIs, this is most likely
>>>>>>>>> caused by non-deterministic shuffle key (hashCode and equals
>>>>>>>>> implementation).
>>>>>>>>>
>>>>>>>>> This is my class, is my hashCode deterministic?
>>>>>>>>>
>>>>>>>>> public final class MyEventCountKey {
>>>>>>>>>     private final String countDateTime;
>>>>>>>>>     private final String domain;
>>>>>>>>>     private final String event;
>>>>>>>>>
>>>>>>>>>     public MyEventCountKey(final String countDateTime, final String domain,
>>>>>>>>>             final String event) {
>>>>>>>>>         this.countDateTime = countDateTime;
>>>>>>>>>         this.domain = domain;
>>>>>>>>>         this.event = event;
>>>>>>>>>     }
>>>>>>>>>
>>>>>>>>>     public String getCountDateTime() {
>>>>>>>>>         return countDateTime;
>>>>>>>>>     }
>>>>>>>>>
>>>>>>>>>     public String getDomain() {
>>>>>>>>>         return domain;
>>>>>>>>>     }
>>>>>>>>>
>>>>>>>>>     public String getEven() {
>>>>>>>>>         return event;
>>>>>>>>>     }
>>>>>>>>>
>>>>>>>>>     @Override
>>>>>>>>>     public String toString() {
>>>>>>>>>         return countDateTime + "|" + domain + "|" + event;
>>>>>>>>>     }
>>>>>>>>>
>>>>>>>>>     @Override
>>>>>>>>>     public boolean equals(Object o) {
>>>>>>>>>         if (this == o) return true;
>>>>>>>>>         if (o == null || getClass() != o.getClass()) return false;
>>>>>>>>>         MyEventCountKey that = (MyEventCountKey) o;
>>>>>>>>>         return countDateTime.equals(that.countDateTime) &&
>>>>>>>>>                 domain.equals(that.domain) &&
>>>>>>>>>                 event.equals(that.event);
>>>>>>>>>     }
>>>>>>>>>
>>>>>>>>>     @Override
>>>>>>>>>     public int hashCode() {
>>>>>>>>>         final int prime = 31;
>>>>>>>>>         int result = 1;
>>>>>>>>>         result = prime * result + countDateTime.hashCode();
>>>>>>>>>         result = prime * result + domain.hashCode();
>>>>>>>>>         result = prime * result + event.hashCode();
>>>>>>>>>         return result;
>>>>>>>>>     }
>>>>>>>>> }
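
For anyone landing on this thread later: the point made in the replies above (a key selector has to return the same key every time it is given the same element) can be illustrated with a small sketch in plain Java, with no Flink dependency. It assumes each click carries its own event timestamp. The names Click, eventTime, wallClockKey and eventTimeKey are made up for this illustration and do not come from the reproducer. The first method rounds the current clock time to the minute, the second rounds the element's own timestamp, so only the second is a pure function of its input.

```java
import java.time.Clock;
import java.time.Instant;
import java.time.ZoneOffset;
import java.time.temporal.ChronoUnit;

final class DeterministicKeyDemo {

    /** Hypothetical click event that carries its own event timestamp. */
    static final class Click {
        final String domain;
        final String path;
        final Instant eventTime;

        Click(String domain, String path, Instant eventTime) {
            this.domain = domain;
            this.path = path;
            this.eventTime = eventTime;
        }
    }

    /** Non-deterministic: the key depends on when the method is called. */
    static String wallClockKey(Click c, Clock clock) {
        Instant minute = clock.instant().truncatedTo(ChronoUnit.MINUTES);
        return minute + "|" + c.domain + "|" + c.path;
    }

    /** Deterministic: the key depends only on the element's own fields. */
    static String eventTimeKey(Click c) {
        Instant minute = c.eventTime.truncatedTo(ChronoUnit.MINUTES);
        return minute + "|" + c.domain + "|" + c.path;
    }

    public static void main(String[] args) {
        Click click = new Click("cnn.com", "/article1",
                Instant.parse("2022-01-01T10:02:37Z"));

        // Two fixed clocks stand in for two invocations of the selector
        // on either side of a minute boundary.
        Clock before = Clock.fixed(Instant.parse("2022-01-01T10:02:59Z"), ZoneOffset.UTC);
        Clock after = Clock.fixed(Instant.parse("2022-01-01T10:03:00Z"), ZoneOffset.UTC);

        // Same element, different keys.
        System.out.println(wallClockKey(click, before));
        System.out.println(wallClockKey(click, after));

        // Same element, same key on every call.
        System.out.println(eventTimeKey(click));
        System.out.println(eventTimeKey(click));
    }
}
```

The sketch only shows the determinism requirement. In an actual job, the approach suggested at the top of the thread would be to key on domain and path alone and let an event time window (for example a one-minute tumbling window, see [1]) do the per-minute grouping, rather than putting the minute into the key.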