Ok I used the method suggested by Ali. The error is gone. But now I see multiple counts emitted for the same key...
DataStream<MyEvent> slStream = env
        .fromSource(kafkaSource, WatermarkStrategy.noWatermarks(), "Kafka Source")
        .uid(kafkaTopic).name(kafkaTopic)
        .setParallelism(kafkaParallelism)
        .flatMap(new MapToMyEvent("my-event", windowSizeMins, "message")) <------ Timestamp in GMT created here, rounded down to the nearest minute.
        .uid("map-json-logs").name("map-json-logs");

slStream.keyBy(new MinutesKeySelector())
        .window(TumblingProcessingTimeWindows.of(Time.minutes(windowSizeMins))) <---- Tumbling window of 1 minute.

So below you will see that a second count was emitted for the same key at 16:51 and at 16:55:

{"countId":"2022-02-11T16:50:00Z|mydomain.com|/some-article","countDateTime":"2022-02-11T16:50:00Z","domain":"mydomain.com","uri":"/some-article","count":3542}
-----
{"countId":"2022-02-11T16:51:00Z|mydomain.com|/some-article","countDateTime":"2022-02-11T16:51:00Z","domain":"mydomain.com","uri":"/some-article","count":16503}
{"countId":"2022-02-11T16:51:00Z|mydomain.com|/some-article","countDateTime":"2022-02-11T16:51:00Z","domain":"mydomain.com","uri":"/some-article","count":70}
-----
{"countId":"2022-02-11T16:52:00Z|mydomain.com|/some-article","countDateTime":"2022-02-11T16:52:00Z","domain":"mydomain.com","uri":"/some-article","count":16037}
{"countId":"2022-02-11T16:53:00Z|mydomain.com|/some-article","countDateTime":"2022-02-11T16:53:00Z","domain":"mydomain.com","uri":"/some-article","count":18679}
{"countId":"2022-02-11T16:54:00Z|mydomain.com|/some-article","countDateTime":"2022-02-11T16:54:00Z","domain":"mydomain.com","uri":"/some-article","count":17697}
-----
{"countId":"2022-02-11T16:55:00Z|mydomain.com|/some-article","countDateTime":"2022-02-11T16:55:00Z","domain":"mydomain.com","uri":"/some-article","count":18066}
{"countId":"2022-02-11T16:55:00Z|mydomain.com|/some-article","countDateTime":"2022-02-11T16:55:00Z","domain":"mydomain.com","uri":"/some-article","count":58}
-----
{"countId":"2022-02-11T16:56:00Z|mydomain.com|/some-article","countDateTime":"2022-02-11T16:56:00Z","domain":"mydomain.com","uri":"/some-article","count":17489}
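This looks like a boundary effect: the minute in the key is stamped in the flatMap, but the processing-time window is assigned slightly later, so an element stamped 16:51 just before the boundary can fall into the next window; that window then fires with its own small count for the 16:51 key, alongside the big count from the previous window. One way out, in line with what the replies below suggest, is to keep the wall clock out of the key entirely and take the timestamp from the window itself. A minimal sketch of that shape (MyEventCount is a hypothetical output POJO mirroring the JSON above; this is not the thread's actual code):

    import java.time.Instant;

    import org.apache.flink.api.java.functions.KeySelector;
    import org.apache.flink.streaming.api.functions.windowing.ProcessWindowFunction;
    import org.apache.flink.streaming.api.windowing.assigners.TumblingProcessingTimeWindows;
    import org.apache.flink.streaming.api.windowing.time.Time;
    import org.apache.flink.streaming.api.windowing.windows.TimeWindow;
    import org.apache.flink.util.Collector;

    slStream
        // Deterministic key: only the stable parts of the element, no clock.
        .keyBy(new KeySelector<MyEvent, String>() {
            @Override
            public String getKey(MyEvent e) {
                return e.getDomain() + "|" + e.getPath();
            }
        })
        .window(TumblingProcessingTimeWindows.of(Time.minutes(windowSizeMins)))
        .process(new ProcessWindowFunction<MyEvent, MyEventCount, String, TimeWindow>() {
            @Override
            public void process(String key, Context ctx, Iterable<MyEvent> elements,
                                Collector<MyEventCount> out) {
                long count = 0;
                for (MyEvent ignored : elements) {
                    count++;
                }
                // One timestamp per window, taken from the window start, so a
                // given key can only ever produce one count per minute.
                String countDateTime = Instant.ofEpochMilli(ctx.window().getStart()).toString();
                String[] parts = key.split("\\|", 2);
                out.collect(new MyEventCount(countDateTime, parts[0], parts[1], count));
            }
        });

For larger windows it would be better to pair an incremental AggregateFunction with the ProcessWindowFunction (the two-argument aggregate overload), so the window keeps a running count instead of buffering every element.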
On Mon, Feb 7, 2022 at 12:44 PM John Smith <java.dev....@gmail.com> wrote:

> Ok, I think Ali's solution makes the most sense to me. I'll try it and
> let you know.
>
> On Mon, Feb 7, 2022 at 11:44 AM Jing Ge <j...@ververica.com> wrote:
>
>> Hi John,
>>
>> Your getKey() implementation shows that it is not deterministic, since
>> calling it with the same click instance multiple times will return
>> different keys. For example, a call at 12:01:59.950 and a call at
>> 12:02:00.050 with the same click instance will return two different
>> keys:
>>
>> 2022-04-07T12:01:00.000Z|cnn.com|some-article-name
>> 2022-04-07T12:02:00.000Z|cnn.com|some-article-name
>>
>> Best regards,
>> Jing
>>
>> On Mon, Feb 7, 2022 at 5:07 PM John Smith <java.dev....@gmail.com> wrote:
>>
>>> Maybe there's a misunderstanding, but basically I want to do a
>>> clickstream count for a given "url", and for simplicity and accuracy
>>> of the count base it on processing time (event time doesn't matter as
>>> long as I get a total of clicks at that given processing time).
>>>
>>> So regardless of the event time, I want all clicks for the current
>>> processing time rounded to the minute, per link.
>>>
>>> So, if now was 2022-04-07T12:01:00.000Z, then I would want the
>>> following result...
>>>
>>> 2022-04-07T12:01:00.000Z|cnn.com|some-article-name count = 10
>>> 2022-04-07T12:01:00.000Z|cnn.com|some-other-article count = 2
>>> 2022-04-07T12:01:00.000Z|cnn.com|another-article count = 15
>>> ....
>>> 2022-04-07T12:02:00.000Z|cnn.com|some-article-name count = 30
>>> 2022-04-07T12:02:00.000Z|cnn.com|some-other-article count = 1
>>> 2022-04-07T12:02:00.000Z|cnn.com|another-article count = 10
>>> And so on...
>>>
>>> @Override
>>> public MyEventCountKey getKey(final MyEvent click) throws Exception {
>>>     MyEventCountKey key = new MyEventCountKey(
>>>         Instant.from(roundFloor(Instant.now().atZone(ZoneId.of("UTC")),
>>>             ChronoField.MINUTE_OF_HOUR, windowSizeMins)).toString(),
>>>         click.getDomain(),  // cnn.com
>>>         click.getPath());   // /some-article-name
>>>     return key;
>>> }
>>>
>>> On Mon, Feb 7, 2022 at 10:48 AM David Morávek <d...@apache.org> wrote:
>>>
>>>> The key selector works.
>>>>
>>>> No it does not ;) It depends on the system time, so it's not
>>>> deterministic (you can get different keys for the very same element).
>>>>
>>>>> How do you key a count based on the time? I have taken this from
>>>>> samples online.
>>>>
>>>> This is what the windowing is for. You basically want to group /
>>>> combine elements per key and event time window [1].
>>>>
>>>> [1] https://nightlies.apache.org/flink/flink-docs-release-1.14/docs/dev/datastream/operators/windows/
>>>>
>>>> Best,
>>>> D.
>>>>
>>>> On Mon, Feb 7, 2022 at 3:44 PM John Smith <java.dev....@gmail.com> wrote:
>>>>
>>>>> The key selector works. It only causes an issue if too many keys are
>>>>> produced in one shot. For example, if 100 "same" keys are produced
>>>>> for that 1 minute it's ok, but if 101 are produced the error happens.
>>>>>
>>>>> If you look at the reproducer, at least that's what's happening.
>>>>>
>>>>> How do you key a count based on the time? I have taken this from
>>>>> samples online.
>>>>>
>>>>> The key is that particular time for that particular URL path.
>>>>>
>>>>> So cnn.com/article1 was clicked 10 times at 2022-01-01T10:01:00.
>>>>>
>>>>> On Mon., Feb. 7, 2022, 8:57 a.m. Chesnay Schepler, <ches...@apache.org> wrote:
>>>>>
>>>>>> Your key selector doesn't need to implement hashCode, but given the
>>>>>> same object it has to return the same key.
>>>>>> In your reproducer the returned key will have different timestamps,
>>>>>> and since the timestamp is included in the hashCode, they will be
>>>>>> different each time.
>>>>>>
>>>>>> On 07/02/2022 14:50, John Smith wrote:
>>>>>>
>>>>>> I don't get it? I provided the reproducer. I implemented the
>>>>>> KeySelector interface; it needs hashCode and equals as well?
>>>>>>
>>>>>> I'm attempting to do clickstream. So the key is based on processing
>>>>>> date/time rounded to the minute + domain name + path.
>>>>>>
>>>>>> So these should be valid below?
>>>>>>
>>>>>> 2022-01-01T10:02:00 + cnn.com + /article1
>>>>>> 2022-01-01T10:02:00 + cnn.com + /article1
>>>>>> 2022-01-01T10:02:00 + cnn.com + /article1
>>>>>>
>>>>>> 2022-01-01T10:02:00 + cnn.com + /article2
>>>>>>
>>>>>> 2022-01-01T10:03:00 + cnn.com + /article1
>>>>>> 2022-01-01T10:03:00 + cnn.com + /article1
>>>>>>
>>>>>> 2022-01-01T10:03:00 + cnn.com + /article3
>>>>>> 2022-01-01T10:03:00 + cnn.com + /article3
>>>>>>
>>>>>> On Mon., Feb. 7, 2022, 2:53 a.m. Chesnay Schepler, <ches...@apache.org> wrote:
>>>>>>
>>>>>>> Don't KeySelectors also need to be deterministic?
>>>>>>>
>>>>>>> The {@link KeySelector} allows to use deterministic objects for
>>>>>>> operations such as reduce, reduceGroup, join, coGroup, etc. If
>>>>>>> invoked multiple times on the same object, the returned key must
>>>>>>> be the same.
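That determinism requirement is exactly what the original exception is about. Flink hashes the key once on the sending side to pick a key group (and thus a subtask), and hashes it again when the receiving operator accesses keyed state; if the selector returned a different key in between, the recomputed group can fall outside the subtask's assigned range. A small illustration using the assignment utility from flink-runtime (the field values are made up):

    import org.apache.flink.runtime.state.KeyGroupRangeAssignment;

    public class KeyGroupDemo {
        public static void main(String[] args) {
            // The same click, keyed twice across a minute boundary, yields two keys:
            MyEventCountKey before = new MyEventCountKey("2022-02-01T10:01:00Z", "cnn.com", "/article1");
            MyEventCountKey after  = new MyEventCountKey("2022-02-01T10:02:00Z", "cnn.com", "/article1");

            int maxParallelism = 128; // Flink's default
            // Key group = murmurHash(key.hashCode()) % maxParallelism, so the two
            // keys usually land in different key groups. If the group computed at
            // state-access time is outside the range owned by the subtask the
            // record was routed to, Flink throws
            // "Key group X is not in KeyGroupRange{startKeyGroup=..., endKeyGroup=...}".
            System.out.println(KeyGroupRangeAssignment.assignToKeyGroup(before, maxParallelism));
            System.out.println(KeyGroupRangeAssignment.assignToKeyGroup(after, maxParallelism));
        }
    }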
>>>>>>> On 04/02/2022 18:25, John Smith wrote:
>>>>>>>
>>>>>>> Hi Francesco, here is the reproducer:
>>>>>>> https://github.com/javadevmtl/flink-key-reproducer
>>>>>>>
>>>>>>> So, essentially it looks like the exception is thrown when there's
>>>>>>> a high influx of records produced from the source.
>>>>>>>
>>>>>>> The key is generated from 3 values: date/time rounded to the
>>>>>>> minute, and 2 strings. So you will see keys as follows...
>>>>>>> 2022-02-04T17:20:00Z|foo|bar
>>>>>>> 2022-02-04T17:21:00Z|foo|bar
>>>>>>> 2022-02-04T17:22:00Z|foo|bar
>>>>>>>
>>>>>>> The reproducer has a custom source that produces a record in a
>>>>>>> loop and sleeps for a specified number of milliseconds, 100ms in
>>>>>>> this case (a sketch of a source with this shape follows Francesco's
>>>>>>> message below). The lower the sleep delay, the faster records are
>>>>>>> produced and the more likely the exception is thrown. With a 100ms
>>>>>>> delay it's always thrown; setting 2000 to 3000ms will guarantee it
>>>>>>> to work.
>>>>>>> The original job uses a Kafka source, so it should technically be
>>>>>>> able to handle even a couple thousand records per second.
>>>>>>>
>>>>>>> On Thu, 3 Feb 2022 at 16:41, John Smith <java.dev....@gmail.com> wrote:
>>>>>>>
>>>>>>>> Ok, it's not my data either. I think it may be a volume issue. I
>>>>>>>> have managed to consistently reproduce the error. I'll upload a
>>>>>>>> reproducer ASAP.
>>>>>>>>
>>>>>>>> On Thu, 3 Feb 2022 at 15:37, John Smith <java.dev....@gmail.com> wrote:
>>>>>>>>
>>>>>>>>> Ok, so I tried to create a reproducer but I couldn't reproduce
>>>>>>>>> it. But the actual job throws that error once in a while. So I'm
>>>>>>>>> wondering if maybe one of the records that comes in is not
>>>>>>>>> valid, though I do validate prior to getting to the key and
>>>>>>>>> window operators.
>>>>>>>>>
>>>>>>>>> On Thu, 3 Feb 2022 at 14:32, John Smith <java.dev....@gmail.com> wrote:
>>>>>>>>>
>>>>>>>>>> Actually, maybe not, because with PrintSinkFunction it ran for
>>>>>>>>>> a bit and then it threw the error.
>>>>>>>>>>
>>>>>>>>>> On Thu, 3 Feb 2022 at 14:24, John Smith <java.dev....@gmail.com> wrote:
>>>>>>>>>>
>>>>>>>>>>> Ok, it may be the Elasticsearch connector causing the issue?
>>>>>>>>>>>
>>>>>>>>>>> If I use PrintSinkFunction then I get no error and my stats
>>>>>>>>>>> print as expected.
>>>>>>>>>>>
>>>>>>>>>>> On Wed, 2 Feb 2022 at 03:01, Francesco Guardiani <france...@ververica.com> wrote:
>>>>>>>>>>>
>>>>>>>>>>>> Hi,
>>>>>>>>>>>> Your hashCode and equals seem correct. Can you post a minimal
>>>>>>>>>>>> stream pipeline reproducer using this class?
>>>>>>>>>>>>
>>>>>>>>>>>> FG
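For reference, a throttled source of the shape John describes above would look roughly like the following. This is a sketch under those assumptions, not the code from the linked reproducer; the class name and payload are made up:

    import org.apache.flink.streaming.api.functions.source.SourceFunction;

    public class ThrottledSource implements SourceFunction<String> {

        private final long sleepMs;
        private volatile boolean running = true;

        public ThrottledSource(long sleepMs) {
            this.sleepMs = sleepMs;
        }

        @Override
        public void run(SourceContext<String> ctx) throws Exception {
            while (running) {
                // Emit one synthetic record, then throttle. Per the report, a
                // 100ms delay reliably triggers the error; 2000-3000ms does not.
                ctx.collect("{\"domain\":\"foo\",\"path\":\"bar\"}");
                Thread.sleep(sleepMs);
            }
        }

        @Override
        public void cancel() {
            running = false;
        }
    }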
>>>>>>>>>>>> On Tue, Feb 1, 2022 at 8:39 PM John Smith <java.dev....@gmail.com> wrote:
>>>>>>>>>>>>
>>>>>>>>>>>>> Hi, I'm getting: java.lang.IllegalArgumentException: Key
>>>>>>>>>>>>> group 39 is not in KeyGroupRange{startKeyGroup=96,
>>>>>>>>>>>>> endKeyGroup=103}. Unless you're directly using low level
>>>>>>>>>>>>> state access APIs, this is most likely caused by
>>>>>>>>>>>>> non-deterministic shuffle key (hashCode and equals
>>>>>>>>>>>>> implementation).
>>>>>>>>>>>>>
>>>>>>>>>>>>> This is my class, is my hashCode deterministic?
>>>>>>>>>>>>>
>>>>>>>>>>>>> public final class MyEventCountKey {
>>>>>>>>>>>>>     private final String countDateTime;
>>>>>>>>>>>>>     private final String domain;
>>>>>>>>>>>>>     private final String event;
>>>>>>>>>>>>>
>>>>>>>>>>>>>     public MyEventCountKey(final String countDateTime,
>>>>>>>>>>>>>                            final String domain,
>>>>>>>>>>>>>                            final String event) {
>>>>>>>>>>>>>         this.countDateTime = countDateTime;
>>>>>>>>>>>>>         this.domain = domain;
>>>>>>>>>>>>>         this.event = event;
>>>>>>>>>>>>>     }
>>>>>>>>>>>>>
>>>>>>>>>>>>>     public String getCountDateTime() {
>>>>>>>>>>>>>         return countDateTime;
>>>>>>>>>>>>>     }
>>>>>>>>>>>>>
>>>>>>>>>>>>>     public String getDomain() {
>>>>>>>>>>>>>         return domain;
>>>>>>>>>>>>>     }
>>>>>>>>>>>>>
>>>>>>>>>>>>>     public String getEvent() {
>>>>>>>>>>>>>         return event;
>>>>>>>>>>>>>     }
>>>>>>>>>>>>>
>>>>>>>>>>>>>     @Override
>>>>>>>>>>>>>     public String toString() {
>>>>>>>>>>>>>         return countDateTime + "|" + domain + "|" + event;
>>>>>>>>>>>>>     }
>>>>>>>>>>>>>
>>>>>>>>>>>>>     @Override
>>>>>>>>>>>>>     public boolean equals(Object o) {
>>>>>>>>>>>>>         if (this == o) return true;
>>>>>>>>>>>>>         if (o == null || getClass() != o.getClass()) return false;
>>>>>>>>>>>>>         MyEventCountKey that = (MyEventCountKey) o;
>>>>>>>>>>>>>         return countDateTime.equals(that.countDateTime)
>>>>>>>>>>>>>             && domain.equals(that.domain)
>>>>>>>>>>>>>             && event.equals(that.event);
>>>>>>>>>>>>>     }
>>>>>>>>>>>>>
>>>>>>>>>>>>>     @Override
>>>>>>>>>>>>>     public int hashCode() {
>>>>>>>>>>>>>         final int prime = 31;
>>>>>>>>>>>>>         int result = 1;
>>>>>>>>>>>>>         result = prime * result + countDateTime.hashCode();
>>>>>>>>>>>>>         result = prime * result + domain.hashCode();
>>>>>>>>>>>>>         result = prime * result + event.hashCode();
>>>>>>>>>>>>>         return result;
>>>>>>>>>>>>>     }
>>>>>>>>>>>>> }
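To summarize the answers above in one snippet: the hashCode itself is deterministic (equal fields always produce equal hash codes); it is the key selector that is not, because it bakes the rounded Instant.now() into the key. Illustrative values only:

    // Deterministic: the same fields always yield the same hashCode.
    MyEventCountKey a = new MyEventCountKey("2022-01-01T10:02:00Z", "cnn.com", "/article1");
    MyEventCountKey b = new MyEventCountKey("2022-01-01T10:02:00Z", "cnn.com", "/article1");
    System.out.println(a.equals(b) && a.hashCode() == b.hashCode()); // always true

    // Non-deterministic selector: the SAME click keyed at 12:01:59.950 and
    // again at 12:02:00.050 produces two different keys, because the rounded
    // Instant.now() changed between the calls:
    //   2022-04-07T12:01:00Z|cnn.com|some-article-name
    //   2022-04-07T12:02:00Z|cnn.com|some-article-name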