Ok I used the method suggested by Ali. The error is gone. But now I see
multiple counts emitted for the same key...

DataStream<MyEvent> slStream = env.fromSource(kafkaSource, WatermarkStrategy.noWatermarks(), "Kafka Source")
        .uid(kafkaTopic).name(kafkaTopic)
        .setParallelism(kafkaParallelism)
        // Timestamp in GMT created here, rounded down to the closest minute.
        .flatMap(new MapToMyEvent("my-event", windowSizeMins, "message"))
        .uid("map-json-logs").name("map-json-logs");

slStream.keyBy(new MinutesKeySelector())
        // Tumbling window of 1 minute.
        .window(TumblingProcessingTimeWindows.of(Time.minutes(windowSizeMins)))



So below you will see that a second count was emitted for 16:51 and for 16:55:

{"countId":"2022-02-11T16:50:00Z|mydomain.com|/some-article","countDateTime":"2022-02-11T16:50:00Z","domain":"mydomain.com","uri":"/some-article","count":3542}
-----
{"countId":"2022-02-11T16:51:00Z|mydomain.com|/some-article","countDateTime":"2022-02-11T16:51:00Z","domain":"mydomain.com","uri":"/some-article","count":16503}
{"countId":"2022-02-11T16:51:00Z|mydomain.com|/some-article","countDateTime":"2022-02-11T16:51:00Z","domain":"mydomain.com","uri":"/some-article","count":70}
-----

{"countId":"2022-02-11T16:52:00Z|mydomain.com|/some-article","countDateTime":"2022-02-11T16:52:00Z","domain":"mydomain.com","uri":"/some-article","count":16037}
{"countId":"2022-02-11T16:53:00Z|mydomain.com|/some-article","countDateTime":"2022-02-11T16:53:00Z","domain":"mydomain.com","uri":"/some-article","count":18679}
{"countId":"2022-02-11T16:54:00Z|mydomain.com|/some-article","countDateTime":"2022-02-11T16:54:00Z","domain":"mydomain.com","uri":"/some-article","count":17697}
-----

{"countId":"2022-02-11T16:55:00Z|mydomain.com|/some-article","countDateTime":"2022-02-11T16:55:00Z","domain":"mydomain.com","uri":"/some-article","count":18066}
{"countId":"2022-02-11T16:55:00Z|mydomain.com|/some-article","countDateTime":"2022-02-11T16:55:00Z","domain":"mydomain.com","uri":"/some-article","count":58}
-----
{"countId":"2022-02-11T16:56:00Z|mydomain.com|/some-article","countDateTime":"2022-02-11T16:56:00Z","domain":"mydomain.com","uri":"/some-article","count":17489}
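
One possible explanation for the small second counts (70 and 58), offered as a guess rather than a diagnosis: the minute is stamped in the flatMap, but the processing-time window is assigned when the record reaches the window operator. A record stamped just before the minute boundary can arrive just after it, so the same key shows up in two consecutive windows. The plain-Java sketch below only simulates that effect. `LateStampSketch`, `countPerWindowAndStamp` and the sample timestamps are made up for illustration and are not part of the job.

```java
import java.time.Instant;
import java.time.temporal.ChronoUnit;
import java.util.LinkedHashMap;
import java.util.Map;

final class LateStampSketch {

    // Counts records per (window start, key stamp) pair.
    // Each distinct pair corresponds to one emitted result.
    public static Map<String, Integer> countPerWindowAndStamp(Instant[] stampedAt, Instant[] arrivedAt) {
        Map<String, Integer> counts = new LinkedHashMap<>();
        for (int i = 0; i < stampedAt.length; i++) {
            // Minute put into the key, taken when the record is mapped.
            Instant stamp = stampedAt[i].truncatedTo(ChronoUnit.MINUTES);
            // Start of the 1-minute window, taken when the record reaches the window operator.
            Instant window = arrivedAt[i].truncatedTo(ChronoUnit.MINUTES);
            counts.merge("window=" + window + " key=" + stamp, 1, Integer::sum);
        }
        return counts;
    }

    public static void main(String[] args) {
        // Hypothetical records: the last one is stamped 10 ms before the boundary
        // and arrives 10 ms after it.
        Instant[] stamped = {
            Instant.parse("2022-02-11T16:51:10Z"),
            Instant.parse("2022-02-11T16:51:30Z"),
            Instant.parse("2022-02-11T16:51:59.990Z")
        };
        Instant[] arrived = {
            Instant.parse("2022-02-11T16:51:10.020Z"),
            Instant.parse("2022-02-11T16:51:30.020Z"),
            Instant.parse("2022-02-11T16:52:00.010Z")
        };
        // Prints:
        // window=2022-02-11T16:51:00Z key=2022-02-11T16:51:00Z count=2
        // window=2022-02-11T16:52:00Z key=2022-02-11T16:51:00Z count=1
        countPerWindowAndStamp(stamped, arrived)
                .forEach((k, v) -> System.out.println(k + " count=" + v));
    }
}
```

If this is what is happening, one option might be to key only by domain and path and take the minute from the window itself (for example the window start available in a ProcessWindowFunction), so each key and minute yields a single count.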




On Mon, Feb 7, 2022 at 12:44 PM John Smith <java.dev....@gmail.com> wrote:

> Ok I think Ali's solution makes the most sense to me. I'll try it and let
> you know.
>
> On Mon, Feb 7, 2022 at 11:44 AM Jing Ge <j...@ververica.com> wrote:
>
>> Hi John,
>>
>> your getKey() implementation shows that it is not deterministic, since
>> calling it with the same click instance multiple times will return
>> different keys. For example a call at 12:01:59.950 and a call at
>> 12:02:00.050 with the same click instance will return two different keys:
>>
>> 2022-04-07T12:01:00.000Z|cnn.com|some-article-name
>> 2022-04-07T12:02:00.000Z|cnn.com|some-article-name
>>
>> best regards
>> Jing
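
A small sketch of the effect described above, not taken from the reproducer. It replaces `Instant.now()` with an injectable `Clock` so the two calls at 12:01:59.950 and 12:02:00.050 can be replayed. `Click`, `clockBasedKey` and `elementBasedKey` are hypothetical names, and `truncatedTo(ChronoUnit.MINUTES)` stands in for the thread's `roundFloor` helper, assuming a window size of one minute.

```java
import java.time.Clock;
import java.time.Instant;
import java.time.ZoneOffset;
import java.time.temporal.ChronoUnit;

final class KeyDeterminismSketch {

    // Hypothetical stand-in for the click event discussed in the thread.
    static final class Click {
        final String domain;
        final String path;

        Click(String domain, String path) {
            this.domain = domain;
            this.path = path;
        }
    }

    // Mirrors the problematic selector: the key depends on the wall clock.
    public static String clockBasedKey(Click click, Clock clock) {
        Instant minute = Instant.now(clock).truncatedTo(ChronoUnit.MINUTES);
        return minute + "|" + click.domain + "|" + click.path;
    }

    // Deterministic alternative: the key depends only on the element.
    public static String elementBasedKey(Click click) {
        return click.domain + "|" + click.path;
    }

    public static void main(String[] args) {
        Click click = new Click("cnn.com", "some-article-name");
        Clock before = Clock.fixed(Instant.parse("2022-04-07T12:01:59.950Z"), ZoneOffset.UTC);
        Clock after = Clock.fixed(Instant.parse("2022-04-07T12:02:00.050Z"), ZoneOffset.UTC);

        // Same click instance, two different keys:
        System.out.println(clockBasedKey(click, before)); // 2022-04-07T12:01:00Z|cnn.com|some-article-name
        System.out.println(clockBasedKey(click, after));  // 2022-04-07T12:02:00Z|cnn.com|some-article-name

        // Same click instance, same key on every call:
        System.out.println(elementBasedKey(click).equals(elementBasedKey(click))); // true
    }
}
```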
>>
>> On Mon, Feb 7, 2022 at 5:07 PM John Smith <java.dev....@gmail.com> wrote:
>>
>>> Maybe there's a misunderstanding. Basically, I want to do a clickstream
>>> count for a given "url" and, for simplicity and accuracy of the count, base
>>> it on processing time (event time doesn't matter as long as I get a total
>>> of clicks at that given processing time).
>>>
>>> So, regardless of the event time, I want all clicks for the current
>>> processing time, rounded to the minute, per link.
>>>
>>> So, if now was 2022-04-07T12:01:00.000Z
>>>
>>> Then I would want the following result...
>>>
>>> 2022-04-07T12:01:00.000Z|cnn.com|some-article-name count = 10
>>> 2022-04-07T12:01:00.000Z|cnn.com|some-other-article count = 2
>>> 2022-04-07T12:01:00.000Z|cnn.com|another-article count = 15
>>> ....
>>> 2022-04-07T12:02:00.000Z|cnn.com|some-article-name count = 30
>>> 2022-04-07T12:02:00.000Z|cnn.com|some-other-article count = 1
>>> 2022-04-07T12:02:00.000Z|cnn.com|another-article count = 10
>>> And so on...
>>>
>>> @Override
>>> public MyEventCountKey getKey(final MyEvent click) throws Exception {
>>>     MyEventCountKey key = new MyEventCountKey(
>>>             Instant.from(roundFloor(Instant.now().atZone(ZoneId.of("UTC")),
>>>                     ChronoField.MINUTE_OF_HOUR, windowSizeMins)).toString(),
>>>             click.getDomain(), // cnn.com
>>>             click.getPath()    // /some-article-name
>>>     );
>>>     return key;
>>> }
>>>
>>>
>>>
>>> On Mon, Feb 7, 2022 at 10:48 AM David Morávek <d...@apache.org> wrote:
>>>
>>>>> The key selector works.
>>>>
>>>> No it does not ;) It depends on the system time so it's not
>>>> deterministic (you can get different keys for the very same element).
>>>>
>>>>> How do you key a count based on the time. I have taken this from
>>>>> samples online.
>>>>
>>>> This is what the windowing is for. You basically want to group /
>>>> combine elements per key and event time window [1].
>>>>
>>>> [1]
>>>> https://nightlies.apache.org/flink/flink-docs-release-1.14/docs/dev/datastream/operators/windows/
>>>>
>>>> Best,
>>>> D.
>>>>
>>>> On Mon, Feb 7, 2022 at 3:44 PM John Smith <java.dev....@gmail.com>
>>>> wrote:
>>>>
>>>>> The key selector works. It only causes an issue if there are too many keys
>>>>> produced in one shot. For example, if 100 "same" keys are produced for that
>>>>> 1 minute it's ok. But if 101 are produced the error happens.
>>>>>
>>>>>
>>>>> If you look at the reproducer, at least that's what's happening.
>>>>>
>>>>> How do you key a count based on the time. I have taken this from
>>>>> samples online.
>>>>>
>>>>> The key is that particular time for that particular URL path.
>>>>>
>>>>> So cnn.com/article1 was clicked 10 times at 2022-01-01T10:01:00
>>>>>
>>>>> On Mon., Feb. 7, 2022, 8:57 a.m. Chesnay Schepler, <ches...@apache.org>
>>>>> wrote:
>>>>>
>>>>>> Your Key selector doesn't need to implement hashCode, but given the
>>>>>> same object it has to return the same key.
>>>>>> In your reproducer the returned key will have different timestamps,
>>>>>> and since the timestamp is included in the hashCode, they will be
>>>>>> different each time.
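
A rough sketch of why a changing key ends in "Key group 39 is not in KeyGroupRange{startKeyGroup=96, endKeyGroup=103}": the key decides which key group, and therefore which subtask, a record belongs to. If the selector returns a different key when the record is routed than when its state is accessed, the second key group can fall outside the range owned by the receiving subtask. The code is a simplified model, not Flink's implementation: the plain modulo stands in for the extra hashing Flink applies to `hashCode()`, and the maximum parallelism of 128 and parallelism of 16 are assumptions chosen only because they produce a range of 96..103. `KeyGroupSketch` and its method names are hypothetical.

```java
final class KeyGroupSketch {

    // Simplified stand-in: maps a key's hashCode to a key group in [0, maxParallelism).
    // Flink additionally applies a murmur hash before the modulo.
    public static int keyGroupFor(Object key, int maxParallelism) {
        return Math.floorMod(key.hashCode(), maxParallelism);
    }

    // Index of the subtask that owns a given key group.
    public static int subtaskFor(int keyGroup, int maxParallelism, int parallelism) {
        return keyGroup * parallelism / maxParallelism;
    }

    // Inclusive {start, end} range of key groups owned by a subtask.
    public static int[] rangeFor(int subtask, int maxParallelism, int parallelism) {
        int start = (subtask * maxParallelism + parallelism - 1) / parallelism;
        int end = ((subtask + 1) * maxParallelism - 1) / parallelism;
        return new int[] {start, end};
    }

    public static void main(String[] args) {
        int maxParallelism = 128; // assumption, not stated in the thread
        int parallelism = 16;     // assumption, not stated in the thread

        int[] range = rangeFor(12, maxParallelism, parallelism);
        System.out.println("subtask 12 owns key groups " + range[0] + ".." + range[1]); // 96..103
        System.out.println("key group 39 belongs to subtask "
                + subtaskFor(39, maxParallelism, parallelism)); // 4

        // The same logical record, keyed one minute apart, may land in different key groups.
        String routedAs = "2022-02-04T17:20:00Z|foo|bar";
        String lookedUpAs = "2022-02-04T17:21:00Z|foo|bar";
        System.out.println(keyGroupFor(routedAs, maxParallelism)
                + " vs " + keyGroupFor(lookedUpAs, maxParallelism));
    }
}
```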
>>>>>>
>>>>>> On 07/02/2022 14:50, John Smith wrote:
>>>>>>
>>>>>> I don't get it? I provided the reproducer. I implemented the
>>>>>> KeySelector interface; does it need hashCode and equals as well?
>>>>>>
>>>>>> I'm attempting to do click stream. So the key is based on processing
>>>>>> date/time rounded to the minute + domain name + path
>>>>>>
>>>>>> So these should be valid below?
>>>>>>
>>>>>> 2022-01-01T10:02:00 + cnn.com + /article1
>>>>>> 2022-01-01T10:02:00 + cnn.com + /article1
>>>>>> 2022-01-01T10:02:00 + cnn.com + /article1
>>>>>>
>>>>>> 2022-01-01T10:02:00 + cnn.com + /article2
>>>>>>
>>>>>> 2022-01-01T10:03:00 + cnn.com + /article1
>>>>>> 2022-01-01T10:03:00 + cnn.com + /article1
>>>>>>
>>>>>> 2022-01-01T10:03:00 + cnn.com + /article3
>>>>>> 2022-01-01T10:03:00 + cnn.com + /article3
>>>>>>
>>>>>> On Mon., Feb. 7, 2022, 2:53 a.m. Chesnay Schepler, <
>>>>>> ches...@apache.org> wrote:
>>>>>>
>>>>>>> Don't KeySelectors also need to be deterministic?
>>>>>>>
>>>>>>> * The {@link KeySelector} allows to use deterministic objects for operations such as reduce,
>>>>>>> * reduceGroup, join, coGroup, etc. If invoked multiple times on the same object, the returned key
>>>>>>> * must be the same.
>>>>>>>
>>>>>>>
>>>>>>> On 04/02/2022 18:25, John Smith wrote:
>>>>>>>
>>>>>>> Hi Francesco,  here is the reproducer:
>>>>>>> https://github.com/javadevmtl/flink-key-reproducer
>>>>>>>
>>>>>>> So, essentially, it looks like the exception is thrown when there's a
>>>>>>> high influx of records produced from the source.
>>>>>>>
>>>>>>> The key is generated by 3 values: date/time rounded to the minute
>>>>>>> and 2 strings.
>>>>>>> So you will see keys as follows...
>>>>>>> 2022-02-04T17:20:00Z|foo|bar
>>>>>>> 2022-02-04T17:21:00Z|foo|bar
>>>>>>> 2022-02-04T17:22:00Z|foo|bar
>>>>>>>
>>>>>>> The reproducer has a custom source that basically produces a
>>>>>>> record in a loop and sleeps for a specified period of milliseconds,
>>>>>>> 100ms in this case.
>>>>>>> The lower the sleep delay, the faster records are produced and the
>>>>>>> more likely the exception is thrown. With a 100ms delay it's always thrown.
>>>>>>> Setting a delay of 2000 to 3000ms will guarantee that it works.
>>>>>>> The original job uses a Kafka Source so it should technically be
>>>>>>> able to handle even a couple thousand records per second.
>>>>>>>
>>>>>>>
>>>>>>> On Thu, 3 Feb 2022 at 16:41, John Smith <java.dev....@gmail.com>
>>>>>>> wrote:
>>>>>>>
>>>>>>>> Ok it's not my data either. I think it may be a volume issue. I
>>>>>>>> have managed to consistently reproduce the error. I'll upload a
>>>>>>>> reproducer ASAP.
>>>>>>>>
>>>>>>>>
>>>>>>>>
>>>>>>>> On Thu, 3 Feb 2022 at 15:37, John Smith <java.dev....@gmail.com>
>>>>>>>> wrote:
>>>>>>>>
>>>>>>>>> Ok so I tried to create a reproducer but I couldn't reproduce it.
>>>>>>>>> But the actual job once in a while throws that error. So I'm
>>>>>>>>> wondering if maybe one of the records that comes in is not valid,
>>>>>>>>> though I do validate prior to getting to the key and window operators.
>>>>>>>>>
>>>>>>>>> On Thu, 3 Feb 2022 at 14:32, John Smith <java.dev....@gmail.com>
>>>>>>>>> wrote:
>>>>>>>>>
>>>>>>>>>> Actually maybe not because with PrintSinkFunction it ran for a
>>>>>>>>>> bit and then it threw the error.
>>>>>>>>>>
>>>>>>>>>> On Thu, 3 Feb 2022 at 14:24, John Smith <java.dev....@gmail.com>
>>>>>>>>>> wrote:
>>>>>>>>>>
>>>>>>>>>>> Ok it may be the ElasticSearch connector causing the issue?
>>>>>>>>>>>
>>>>>>>>>>> If I use PrintSinkFunction then I get no error and my stats
>>>>>>>>>>> print as expected.
>>>>>>>>>>>
>>>>>>>>>>> On Wed, 2 Feb 2022 at 03:01, Francesco Guardiani <
>>>>>>>>>>> france...@ververica.com> wrote:
>>>>>>>>>>>
>>>>>>>>>>>> Hi,
>>>>>>>>>>>> your hash code and equals seem correct. Can you post a minimal
>>>>>>>>>>>> stream pipeline reproducer using this class?
>>>>>>>>>>>>
>>>>>>>>>>>> FG
>>>>>>>>>>>>
>>>>>>>>>>>> On Tue, Feb 1, 2022 at 8:39 PM John Smith <
>>>>>>>>>>>> java.dev....@gmail.com> wrote:
>>>>>>>>>>>>
>>>>>>>>>>>>> Hi, getting java.lang.IllegalArgumentException: Key group 39
>>>>>>>>>>>>> is not in KeyGroupRange{startKeyGroup=96, endKeyGroup=103}.
>>>>>>>>>>>>> Unless you're directly using low level state access APIs, this is
>>>>>>>>>>>>> most likely caused by non-deterministic shuffle key (hashCode and
>>>>>>>>>>>>> equals implementation).
>>>>>>>>>>>>>
>>>>>>>>>>>>> This is my class, is my hashCode deterministic?
>>>>>>>>>>>>>
>>>>>>>>>>>>> public final class MyEventCountKey {
>>>>>>>>>>>>>     private final String countDateTime;
>>>>>>>>>>>>>     private final String domain;
>>>>>>>>>>>>>     private final String event;
>>>>>>>>>>>>>
>>>>>>>>>>>>>     public MyEventCountKey(final String countDateTime, final String domain, final String event) {
>>>>>>>>>>>>>         this.countDateTime = countDateTime;
>>>>>>>>>>>>>         this.domain = domain;
>>>>>>>>>>>>>         this.event = event;
>>>>>>>>>>>>>     }
>>>>>>>>>>>>>
>>>>>>>>>>>>>     public String getCountDateTime() {
>>>>>>>>>>>>>         return countDateTime;
>>>>>>>>>>>>>     }
>>>>>>>>>>>>>
>>>>>>>>>>>>>     public String getDomain() {
>>>>>>>>>>>>>         return domain;
>>>>>>>>>>>>>     }
>>>>>>>>>>>>>
>>>>>>>>>>>>>     public String getEven() {
>>>>>>>>>>>>>         return event;
>>>>>>>>>>>>>     }
>>>>>>>>>>>>>
>>>>>>>>>>>>>     @Override
>>>>>>>>>>>>>     public String toString() {
>>>>>>>>>>>>>         return countDateTime + "|" + domain + "|" + event;
>>>>>>>>>>>>>     }
>>>>>>>>>>>>>
>>>>>>>>>>>>>     @Override
>>>>>>>>>>>>>     public boolean equals(Object o) {
>>>>>>>>>>>>>         if (this == o) return true;
>>>>>>>>>>>>>         if (o == null || getClass() != o.getClass()) return false;
>>>>>>>>>>>>>         MyEventCountKey that = (MyEventCountKey) o;
>>>>>>>>>>>>>         return countDateTime.equals(that.countDateTime) &&
>>>>>>>>>>>>>                 domain.equals(that.domain) &&
>>>>>>>>>>>>>                 event.equals(that.event);
>>>>>>>>>>>>>     }
>>>>>>>>>>>>>
>>>>>>>>>>>>>     @Override
>>>>>>>>>>>>>     public int hashCode() {
>>>>>>>>>>>>>         final int prime = 31;
>>>>>>>>>>>>>         int result = 1;
>>>>>>>>>>>>>         result = prime * result + countDateTime.hashCode();
>>>>>>>>>>>>>         result = prime * result + domain.hashCode();
>>>>>>>>>>>>>         result = prime * result + event.hashCode();
>>>>>>>>>>>>>         return result;
>>>>>>>>>>>>>     }
>>>>>>>>>>>>> }
>>>>>>>>>>>>>
>>>>>>>>>>>>>