Hi, thanks. As previously mentioned, processing time. So regardless of when the event was generated, I want to count all events I have right now (as soon as they are seen by the Flink job).
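Roughly what I'm aiming for, as a sketch only (placeholder names, not the actual job): key by domain + path, let a 1-minute processing-time tumbling window do the time bucketing, and take the bucket timestamp from the window itself rather than from the wall clock. MyEvent with getDomain()/getPath() getters is assumed here.

import java.time.Instant;

import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.functions.windowing.ProcessWindowFunction;
import org.apache.flink.streaming.api.windowing.assigners.TumblingProcessingTimeWindows;
import org.apache.flink.streaming.api.windowing.time.Time;
import org.apache.flink.streaming.api.windowing.windows.TimeWindow;
import org.apache.flink.util.Collector;

public static DataStream<String> countPerMinute(DataStream<MyEvent> clicks) {
    return clicks
            // Key only by the stable dimensions; no timestamp in the key.
            .keyBy(click -> click.getDomain() + "|" + click.getPath())
            .window(TumblingProcessingTimeWindows.of(Time.minutes(1)))
            .process(new ProcessWindowFunction<MyEvent, String, String, TimeWindow>() {
                @Override
                public void process(String key, Context ctx, Iterable<MyEvent> clicksInWindow, Collector<String> out) {
                    long count = 0;
                    for (MyEvent ignored : clicksInWindow) {
                        count++;
                    }
                    // The minute comes from the window start, not from Instant.now(),
                    // so the key stays deterministic and each key/window pair produces exactly one count.
                    String minute = Instant.ofEpochMilli(ctx.window().getStart()).toString();
                    out.collect(minute + "|" + key + " count = " + count);
                }
            });
}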
On Mon, Feb 14, 2022 at 4:16 AM Ali Bahadir Zeybek <a...@ververica.com> wrote:

> Hello John,
>
> Currently you are grouping the elements twice based on a time attribute: once while keying (with event time) and once while windowing (with processing time). Therefore, the windowing mechanism produces a new window computation when an element with the same key arrives after the previous window's start and end timestamps. Can you please clarify with which notion of time you would like to handle the stream of data?
>
> Sincerely,
>
> Ali
>
> On Fri, Feb 11, 2022 at 6:43 PM John Smith <java.dev....@gmail.com> wrote:
>
>> Ok I used the method suggested by Ali. The error is gone. But now I see multiple counts emitted for the same key...
>>
>> DataStream<MyEvent> slStream = env.fromSource(kafkaSource, WatermarkStrategy.noWatermarks(), "Kafka Source")
>>         .uid(kafkaTopic).name(kafkaTopic)
>>         .setParallelism(kafkaParallelism)
>>         .flatMap(new MapToMyEvent("my-event", windowSizeMins, "message")) // <-- timestamp in GMT created here, rounded down to the closest minute
>>         .uid("map-json-logs").name("map-json-logs");
>>
>> slStream.keyBy(new MinutesKeySelector())
>>         .window(TumblingProcessingTimeWindows.of(Time.minutes(windowSizeMins))) // <-- tumbling window of 1 minute
>>
>> So below you will see a new count was emitted at 16:51 and at 16:55:
>>
>> {"countId":"2022-02-11T16:50:00Z|mydomain.com|/some-article","countDateTime":"2022-02-11T16:50:00Z","domain":"mydomain.com","uri":"/some-article","count":3542}
>> -----
>> {"countId":"2022-02-11T16:51:00Z|mydomain.com|/some-article","countDateTime":"2022-02-11T16:51:00Z","domain":"mydomain.com","uri":"/some-article","count":16503}
>> {"countId":"2022-02-11T16:51:00Z|mydomain.com|/some-article","countDateTime":"2022-02-11T16:51:00Z","domain":"mydomain.com","uri":"/some-article","count":70}
>> -----
>> {"countId":"2022-02-11T16:52:00Z|mydomain.com|/some-article","countDateTime":"2022-02-11T16:52:00Z","domain":"mydomain.com","uri":"/some-article","count":16037}
>> {"countId":"2022-02-11T16:53:00Z|mydomain.com|/some-article","countDateTime":"2022-02-11T16:53:00Z","domain":"mydomain.com","uri":"/some-article","count":18679}
>> {"countId":"2022-02-11T16:54:00Z|mydomain.com|/some-article","countDateTime":"2022-02-11T16:54:00Z","domain":"mydomain.com","uri":"/some-article","count":17697}
>> -----
>> {"countId":"2022-02-11T16:55:00Z|mydomain.com|/some-article","countDateTime":"2022-02-11T16:55:00Z","domain":"mydomain.com","uri":"/some-article","count":18066}
>> {"countId":"2022-02-11T16:55:00Z|mydomain.com|/some-article","countDateTime":"2022-02-11T16:55:00Z","domain":"mydomain.com","uri":"/some-article","count":58}
>> -----
>> {"countId":"2022-02-11T16:56:00Z|mydomain.com|/some-article","countDateTime":"2022-02-11T16:56:00Z","domain":"mydomain.com","uri":"/some-article","count":17489}
>>
>> On Mon, Feb 7, 2022 at 12:44 PM John Smith <java.dev....@gmail.com> wrote:
>>
>>> Ok I think Ali's solution makes the most sense to me. I'll try it and let you know.
>>>
>>> On Mon, Feb 7, 2022 at 11:44 AM Jing Ge <j...@ververica.com> wrote:
>>>
>>>> Hi John,
>>>>
>>>> Your getKey() implementation shows that it is not deterministic, since calling it with the same click instance multiple times will return different keys.
>>>> For example, a call at 12:01:59.950 and a call at 12:02:00.050 with the same click instance will return two different keys:
>>>>
>>>> 2022-04-07T12:01:00.000Z|cnn.com|some-article-name
>>>> 2022-04-07T12:02:00.000Z|cnn.com|some-article-name
>>>>
>>>> best regards
>>>> Jing
>>>>
>>>> On Mon, Feb 7, 2022 at 5:07 PM John Smith <java.dev....@gmail.com> wrote:
>>>>
>>>>> Maybe there's a misunderstanding. But basically I want to do a clickstream count for a given "url", and for simplicity and accuracy of the count I base it on processing time (event time doesn't matter as long as I get a total of clicks at that given processing time).
>>>>>
>>>>> So regardless of the event time, I want all clicks for the current processing time, rounded to the minute, per link.
>>>>>
>>>>> So, if now was 2022-04-07T12:01:00.000Z, then I would want the following result...
>>>>>
>>>>> 2022-04-07T12:01:00.000Z|cnn.com|some-article-name count = 10
>>>>> 2022-04-07T12:01:00.000Z|cnn.com|some-other-article count = 2
>>>>> 2022-04-07T12:01:00.000Z|cnn.com|another-article count = 15
>>>>> ....
>>>>> 2022-04-07T12:02:00.000Z|cnn.com|some-article-name count = 30
>>>>> 2022-04-07T12:02:00.000Z|cnn.com|some-other-article count = 1
>>>>> 2022-04-07T12:02:00.000Z|cnn.com|another-article count = 10
>>>>>
>>>>> And so on...
>>>>>
>>>>> @Override
>>>>> public MyEventCountKey getKey(final MyEvent click) throws Exception {
>>>>>     MyEventCountKey key = new MyEventCountKey(
>>>>>             Instant.from(roundFloor(Instant.now().atZone(ZoneId.of("UTC")),
>>>>>                     ChronoField.MINUTE_OF_HOUR, windowSizeMins)).toString(),
>>>>>             click.getDomain(), // cnn.com
>>>>>             click.getPath()    // /some-article-name
>>>>>     );
>>>>>     return key;
>>>>> }
>>>>>
>>>>> On Mon, Feb 7, 2022 at 10:48 AM David Morávek <d...@apache.org> wrote:
>>>>>
>>>>>>> The key selector works.
>>>>>>
>>>>>> No it does not ;) It depends on the system time, so it's not deterministic (you can get different keys for the very same element).
>>>>>>
>>>>>>> How do you key a count based on the time? I have taken this from samples online.
>>>>>>
>>>>>> This is what the windowing is for. You basically want to group / combine elements per key and event time window [1].
>>>>>>
>>>>>> [1] https://nightlies.apache.org/flink/flink-docs-release-1.14/docs/dev/datastream/operators/windows/
>>>>>>
>>>>>> Best,
>>>>>> D.
>>>>>>
>>>>>> On Mon, Feb 7, 2022 at 3:44 PM John Smith <java.dev....@gmail.com> wrote:
>>>>>>
>>>>>>> The key selector works. It only causes an issue if there are too many keys produced in one shot. For example, if 100 "same" keys are produced for that 1 minute it's OK. But if 101 are produced, the error happens.
>>>>>>>
>>>>>>> If you look at the reproducer, at least that's what's happening.
>>>>>>>
>>>>>>> How do you key a count based on the time? I have taken this from samples online.
>>>>>>>
>>>>>>> The key is that particular time for that particular URL path.
>>>>>>>
>>>>>>> So cnn.com/article1 was clicked 10 times at 2022-01-01T10:01:00.
>>>>>>>
>>>>>>> On Mon., Feb. 7, 2022, 8:57 a.m. Chesnay Schepler, <ches...@apache.org> wrote:
>>>>>>>
>>>>>>>> Your key selector doesn't need to implement hashCode, but given the same object it has to return the same key.
>>>>>>>> In your reproducer the returned key will have different timestamps, and since the timestamp is included in the hashCode, they will be different each time.
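>>>>>>>>
>>>>>>>> Something deterministic would look roughly like this. Just a sketch: the class name and the getBucketTime()/getDomain()/getPath() getters are made-up placeholders. The point is that every field of the key comes from the element itself, never from the wall clock.
>>>>>>>>
>>>>>>>> import org.apache.flink.api.java.functions.KeySelector;
>>>>>>>>
>>>>>>>> public class DeterministicKeySelector implements KeySelector<MyEvent, MyEventCountKey> {
>>>>>>>>     @Override
>>>>>>>>     public MyEventCountKey getKey(MyEvent click) {
>>>>>>>>         return new MyEventCountKey(
>>>>>>>>                 click.getBucketTime(), // assumed: a minute bucket stamped on the event once, upstream
>>>>>>>>                 click.getDomain(),
>>>>>>>>                 click.getPath());
>>>>>>>>     }
>>>>>>>> }
>>>>>>>>
>>>>>>>> Calling getKey twice on the same element then always returns the same key, which is all the contract asks for.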
>>>>>>>>
>>>>>>>> On 07/02/2022 14:50, John Smith wrote:
>>>>>>>>
>>>>>>>> I don't get it? I provided the reproducer. I implemented the KeySelector interface; it needs hashCode and equals as well?
>>>>>>>>
>>>>>>>> I'm attempting to do clickstream counting. So the key is based on the processing date/time rounded to the minute + domain name + path.
>>>>>>>>
>>>>>>>> So these should be valid below?
>>>>>>>>
>>>>>>>> 2022-01-01T10:02:00 + cnn.com + /article1
>>>>>>>> 2022-01-01T10:02:00 + cnn.com + /article1
>>>>>>>> 2022-01-01T10:02:00 + cnn.com + /article1
>>>>>>>>
>>>>>>>> 2022-01-01T10:02:00 + cnn.com + /article2
>>>>>>>>
>>>>>>>> 2022-01-01T10:03:00 + cnn.com + /article1
>>>>>>>> 2022-01-01T10:03:00 + cnn.com + /article1
>>>>>>>>
>>>>>>>> 2022-01-01T10:03:00 + cnn.com + /article3
>>>>>>>> 2022-01-01T10:03:00 + cnn.com + /article3
>>>>>>>>
>>>>>>>> On Mon., Feb. 7, 2022, 2:53 a.m. Chesnay Schepler, <ches...@apache.org> wrote:
>>>>>>>>
>>>>>>>>> Don't KeySelectors also need to be deterministic?
>>>>>>>>>
>>>>>>>>> "The {@link KeySelector} allows to use deterministic objects for operations such as reduce, reduceGroup, join, coGroup, etc. If invoked multiple times on the same object, the returned key must be the same."
>>>>>>>>>
>>>>>>>>> On 04/02/2022 18:25, John Smith wrote:
>>>>>>>>>
>>>>>>>>> Hi Francesco, here is the reproducer: https://github.com/javadevmtl/flink-key-reproducer
>>>>>>>>>
>>>>>>>>> So, essentially, it looks like the exception is thrown when there's a high influx of records produced from the source.
>>>>>>>>>
>>>>>>>>> The key is generated from 3 values: the date/time rounded to the minute and 2 strings.
>>>>>>>>> So you will see keys as follows...
>>>>>>>>> 2022-02-04T17:20:00Z|foo|bar
>>>>>>>>> 2022-02-04T17:21:00Z|foo|bar
>>>>>>>>> 2022-02-04T17:22:00Z|foo|bar
>>>>>>>>>
>>>>>>>>> The reproducer has a custom source that basically produces a record in a loop and sleeps for a specified period of milliseconds, 100ms in this case.
>>>>>>>>> The lower the sleep delay, the faster records are produced and the more likely the exception is thrown. With a 100ms delay it's always thrown. Setting it to 2000 to 3000ms guarantees it works.
>>>>>>>>> The original job uses a Kafka source, so it should technically be able to handle even a couple thousand records per second.
>>>>>>>>>
>>>>>>>>> On Thu, 3 Feb 2022 at 16:41, John Smith <java.dev....@gmail.com> wrote:
>>>>>>>>>
>>>>>>>>>> Ok it's not my data either. I think it may be a volume issue. I have managed to consistently reproduce the error. I'll upload a reproducer ASAP.
>>>>>>>>>>
>>>>>>>>>> On Thu, 3 Feb 2022 at 15:37, John Smith <java.dev....@gmail.com> wrote:
>>>>>>>>>>
>>>>>>>>>>> Ok so I tried to create a reproducer but I couldn't reproduce it. But the actual job once in a while throws that error. So I'm wondering if maybe one of the records that comes in is not valid, though I do validate prior to getting to the key and window operators.
>>>>>>>>>>>
>>>>>>>>>>> On Thu, 3 Feb 2022 at 14:32, John Smith <java.dev....@gmail.com> wrote:
>>>>>>>>>>>
>>>>>>>>>>>> Actually maybe not, because with PrintSinkFunction it ran for a bit and then it threw the error.
>>>>>>>>>>>>
>>>>>>>>>>>> On Thu, 3 Feb 2022 at 14:24, John Smith <java.dev....@gmail.com> wrote:
>>>>>>>>>>>>
>>>>>>>>>>>>> Ok it may be the ElasticSearch connector causing the issue?
>>>>>>>>>>>>>
>>>>>>>>>>>>> If I use PrintSinkFunction then I get no error and my stats print as expected.
>>>>>>>>>>>>>
>>>>>>>>>>>>> On Wed, 2 Feb 2022 at 03:01, Francesco Guardiani <france...@ververica.com> wrote:
>>>>>>>>>>>>>
>>>>>>>>>>>>>> Hi,
>>>>>>>>>>>>>> Your hashCode and equals seem correct. Can you post a minimal stream pipeline reproducer using this class?
>>>>>>>>>>>>>>
>>>>>>>>>>>>>> FG
>>>>>>>>>>>>>>
>>>>>>>>>>>>>> On Tue, Feb 1, 2022 at 8:39 PM John Smith <java.dev....@gmail.com> wrote:
>>>>>>>>>>>>>>
>>>>>>>>>>>>>>> Hi, I'm getting: java.lang.IllegalArgumentException: Key group 39 is not in KeyGroupRange{startKeyGroup=96, endKeyGroup=103}. Unless you're directly using low level state access APIs, this is most likely caused by non-deterministic shuffle key (hashCode and equals implementation).
>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>> This is my class, is my hashCode deterministic?
>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>> public final class MyEventCountKey {
>>>>>>>>>>>>>>>     private final String countDateTime;
>>>>>>>>>>>>>>>     private final String domain;
>>>>>>>>>>>>>>>     private final String event;
>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>     public MyEventCountKey(final String countDateTime, final String domain, final String event) {
>>>>>>>>>>>>>>>         this.countDateTime = countDateTime;
>>>>>>>>>>>>>>>         this.domain = domain;
>>>>>>>>>>>>>>>         this.event = event;
>>>>>>>>>>>>>>>     }
>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>     public String getCountDateTime() {
>>>>>>>>>>>>>>>         return countDateTime;
>>>>>>>>>>>>>>>     }
>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>     public String getDomain() {
>>>>>>>>>>>>>>>         return domain;
>>>>>>>>>>>>>>>     }
>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>     public String getEven() {
>>>>>>>>>>>>>>>         return event;
>>>>>>>>>>>>>>>     }
>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>     @Override
>>>>>>>>>>>>>>>     public String toString() {
>>>>>>>>>>>>>>>         return countDateTime + "|" + domain + "|" + event;
>>>>>>>>>>>>>>>     }
>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>     @Override
>>>>>>>>>>>>>>>     public boolean equals(Object o) {
>>>>>>>>>>>>>>>         if (this == o) return true;
>>>>>>>>>>>>>>>         if (o == null || getClass() != o.getClass()) return false;
>>>>>>>>>>>>>>>         MyEventCountKey that = (MyEventCountKey) o;
>>>>>>>>>>>>>>>         return countDateTime.equals(that.countDateTime)
>>>>>>>>>>>>>>>                 && domain.equals(that.domain)
>>>>>>>>>>>>>>>                 && event.equals(that.event);
>>>>>>>>>>>>>>>     }
>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>     @Override
>>>>>>>>>>>>>>>     public int hashCode() {
>>>>>>>>>>>>>>>         final int prime = 31;
>>>>>>>>>>>>>>>         int result = 1;
>>>>>>>>>>>>>>>         result = prime * result + countDateTime.hashCode();
>>>>>>>>>>>>>>>         result = prime * result + domain.hashCode();
>>>>>>>>>>>>>>>         result = prime * result + event.hashCode();
>>>>>>>>>>>>>>>         return result;
>>>>>>>>>>>>>>>     }
>>>>>>>>>>>>>>> }
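>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>> For reference, a quick standalone check (just a sketch, using the class exactly as above) showing that two keys built from identical fields are equal and hash the same:
>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>> public class KeyCheck {
>>>>>>>>>>>>>>>     public static void main(String[] args) {
>>>>>>>>>>>>>>>         MyEventCountKey a = new MyEventCountKey("2022-02-01T10:00:00Z", "cnn.com", "/article1");
>>>>>>>>>>>>>>>         MyEventCountKey b = new MyEventCountKey("2022-02-01T10:00:00Z", "cnn.com", "/article1");
>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>         System.out.println(a.equals(b));                  // true: same fields
>>>>>>>>>>>>>>>         System.out.println(a.hashCode() == b.hashCode()); // true: hashCode depends only on the fields
>>>>>>>>>>>>>>>     }
>>>>>>>>>>>>>>> }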