Hi John,

your getKey() implementation shows that it is not deterministic, since
calling it with the same click instance multiple times will return
different keys. For example a call at 12:01:59.950 and a call at
12:02:00.050 with the same click instance will return two different keys:

2022-04-07T12:01:00.000Z|cnn.com|some-article-name
2022-04-07T12:02:00.000Z|cnn.com|some-article-name

best regards
Jing

On Mon, Feb 7, 2022 at 5:07 PM John Smith <java.dev....@gmail.com> wrote:

> Maybe there's a misunderstanding. But basically I want to do clickstream
> count for a given "url" and for simplicity and accuracy of the count base
> it on processing time (event time doesn't matter as long as I get a total
> of clicks at that given processing time)
>
> So regardless of the event time. I want all clicks for the current
> processing time rounded to the minute per link.
>
> So, if now was 2022-04-07T12:01:00.000Z
>
> Then I would want the following result...
>
> 2022-04-07T12:01:00.000Z|cnn.com|some-article-name count = 10
> 2022-04-07T12:01:00.000Z|cnn.com|some-other-article count = 2
> 2022-04-07T12:01:00.000Z|cnn.com|another-article count = 15
> ....
> 2022-04-07T12:02:00.000Z|cnn.com|some-article-name count = 30
> 2022-04-07T12:02:00.000Z|cnn.com|some-other-article count = 1
> 2022-04-07T12:02:00.000Z|cnn.com|another-article count = 10
> And so on...
>
> @Override
> public MyEventCountKey getKey(final MyEvent click) throws Exception
> {
> MyEventCountKey key = new MyEventCountKey(
> Instant.from(roundFloor(Instant.now().atZone(ZoneId.of("UTC")),
> ChronoField.MINUTE_OF_HOUR, windowSizeMins)).toString(),
> click.getDomain(), // cnn.com
> click.getPath(), // /some-article-name
> );
> return key;
> }
>
>
>
> On Mon, Feb 7, 2022 at 10:48 AM David Morávek <d...@apache.org> wrote:
>
>> The key selector works.
>>
>>
>> No it does not ;) It depends on the system time so it's not deterministic
>> (you can get different keys for the very same element).
>>
>> How do you key a count based on the time. I have taken this from samples
>>> online.
>>>
>>
>> This is what the windowing is for. You basically want to group / combine
>> elements per key and event time window [1].
>>
>> [1]
>> https://nightlies.apache.org/flink/flink-docs-release-1.14/docs/dev/datastream/operators/windows/
>>
>> Best,
>> D.
>>
>> On Mon, Feb 7, 2022 at 3:44 PM John Smith <java.dev....@gmail.com> wrote:
>>
>>> The key selector works. It only causes an issue if there too many keys
>>> produced in one shot. For example of 100 "same" keys are produced for that
>>> 1 minutes it's ok. But if 101 are produced the error happens.
>>>
>>>
>>> If you look at the reproducer at least that's what's hapenning
>>>
>>> How do you key a count based on the time. I have taken this from samples
>>> online.
>>>
>>> The key is that particular time for that particular URL path.
>>>
>>> So cnn.com/article1 was clicked 10 times at 2022-01-01T10:01:00
>>>
>>> On Mon., Feb. 7, 2022, 8:57 a.m. Chesnay Schepler, <ches...@apache.org>
>>> wrote:
>>>
>>>> Your Key selector doesn't need to implement hashCode, but given the
>>>> same object it has to return the same key.
>>>> In your reproducer the returned key will have different timestamps, and
>>>> since the timestamp is included in the hashCode, they will be different
>>>> each time.
>>>>
>>>> On 07/02/2022 14:50, John Smith wrote:
>>>>
>>>> I don't get it? I provided the reproducer. I implemented the interface
>>>> to Key selector it needs hashcode and equals as well?
>>>>
>>>> I'm attempting to do click stream. So the key is based on processing
>>>> date/time rounded to the minute + domain name + path
>>>>
>>>> So these should be valid below?
>>>>
>>>> 2022-01-01T10:02:00 + cnn.com + /article1
>>>> 2022-01-01T10:02:00 + cnn.com + /article1
>>>> 2022-01-01T10:02:00 + cnn.com + /article1
>>>>
>>>> 2022-01-01T10:02:00 + cnn.com + /article2
>>>>
>>>> 2022-01-01T10:03:00 + cnn.com + /article1
>>>> 2022-01-01T10:03:00 + cnn.com + /article1
>>>>
>>>> 2022-01-01T10:03:00 + cnn.com + /article3
>>>> 2022-01-01T10:03:00 + cnn.com + /article3
>>>>
>>>> On Mon., Feb. 7, 2022, 2:53 a.m. Chesnay Schepler, <ches...@apache.org>
>>>> wrote:
>>>>
>>>>> Don't KeySelectors also need to be deterministic?
>>>>>
>>>>> * The {@link KeySelector} allows to use deterministic objects for 
>>>>> operations such as reduce,* reduceGroup, join, coGroup, etc. *If invoked 
>>>>> multiple times on the same object, the returned key*** must be the same.*
>>>>>
>>>>>
>>>>> On 04/02/2022 18:25, John Smith wrote:
>>>>>
>>>>> Hi Francesco,  here is the reproducer:
>>>>> https://github.com/javadevmtl/flink-key-reproducer
>>>>>
>>>>> So, essentially it looks like when there's a high influx of records
>>>>> produced from the source that the Exception is thrown.
>>>>>
>>>>> The key is generated by 3 values: date/time rounded to the minute and
>>>>> 2 strings.
>>>>> So you will see keys as follows...
>>>>> 2022-02-04T17:20:00Z|foo|bar
>>>>> 2022-02-04T17:21:00Z|foo|bar
>>>>> 2022-02-04T17:22:00Z|foo|bar
>>>>>
>>>>> The reproducer has a custom source that basically produces a record in
>>>>> a loop and sleeps for a specified period of milliseconds 100ms in this 
>>>>> case.
>>>>> The lower the sleep delay the faster records are produced the more
>>>>> chances the exception is thrown. With a 100ms delay it's always thrown.
>>>>> Setting a 2000 to 3000ms will guarantee it to work.
>>>>> The original job uses a Kafka Source so it should technically be able
>>>>> to handle even a couple thousand records per second.
>>>>>
>>>>>
>>>>> On Thu, 3 Feb 2022 at 16:41, John Smith <java.dev....@gmail.com>
>>>>> wrote:
>>>>>
>>>>>> Ok it's not my data either. I think it may be a volume issue. I have
>>>>>> managed to consistently reproduce the error. I'll upload a reproducer 
>>>>>> ASAP.
>>>>>>
>>>>>>
>>>>>>
>>>>>> On Thu, 3 Feb 2022 at 15:37, John Smith <java.dev....@gmail.com>
>>>>>> wrote:
>>>>>>
>>>>>>> Ok so I tried to create a reproducer but I couldn't reproduce it.
>>>>>>> But the actual job once in a while throws that error. So I'm wondering 
>>>>>>> if
>>>>>>> maybe one of the records that comes in is not valid, though I do 
>>>>>>> validate
>>>>>>> prior to getting to the key and window operators.
>>>>>>>
>>>>>>> On Thu, 3 Feb 2022 at 14:32, John Smith <java.dev....@gmail.com>
>>>>>>> wrote:
>>>>>>>
>>>>>>>> Actually maybe not because with PrintSinkFunction it ran for a bit
>>>>>>>> and then it threw the error.
>>>>>>>>
>>>>>>>> On Thu, 3 Feb 2022 at 14:24, John Smith <java.dev....@gmail.com>
>>>>>>>> wrote:
>>>>>>>>
>>>>>>>>> Ok it may be the ElasticSearch connector causing the issue?
>>>>>>>>>
>>>>>>>>> If I use PrintSinkFunction then I get no error and my stats print
>>>>>>>>> as expected.
>>>>>>>>>
>>>>>>>>> On Wed, 2 Feb 2022 at 03:01, Francesco Guardiani <
>>>>>>>>> france...@ververica.com> wrote:
>>>>>>>>>
>>>>>>>>>> Hi,
>>>>>>>>>> your hash code and equals seems correct. Can you post a minimum
>>>>>>>>>> stream pipeline reproducer using this class?
>>>>>>>>>>
>>>>>>>>>> FG
>>>>>>>>>>
>>>>>>>>>> On Tue, Feb 1, 2022 at 8:39 PM John Smith <java.dev....@gmail.com>
>>>>>>>>>> wrote:
>>>>>>>>>>
>>>>>>>>>>> Hi, getting java.lang.IllegalArgumentException: Key group 39 is
>>>>>>>>>>> not in KeyGroupRange{startKeyGroup=96, endKeyGroup=103}. Unless 
>>>>>>>>>>> you're
>>>>>>>>>>> directly using low level state access APIs, this is most likely 
>>>>>>>>>>> caused by
>>>>>>>>>>> non-deterministic shuffle key (hashCode and equals implementation).
>>>>>>>>>>>
>>>>>>>>>>> This is my class, is my hashCode deterministic?
>>>>>>>>>>>
>>>>>>>>>>> public final class MyEventCountKey {
>>>>>>>>>>>     private final String countDateTime;    private final String 
>>>>>>>>>>> domain;    private final String event;    public 
>>>>>>>>>>> MyEventCountKey(final String countDateTime, final String domain, 
>>>>>>>>>>> final String event) {
>>>>>>>>>>>         this.countDateTime = countDateTime;        this.domain = 
>>>>>>>>>>> domain;        this.event = event;    }
>>>>>>>>>>>
>>>>>>>>>>>     public String getCountDateTime() {
>>>>>>>>>>>         return countDateTime;    }
>>>>>>>>>>>
>>>>>>>>>>>     public String getDomain() {
>>>>>>>>>>>         return domain;    }
>>>>>>>>>>>
>>>>>>>>>>>     public String getEven() {
>>>>>>>>>>>         return event;    }
>>>>>>>>>>>
>>>>>>>>>>>     @Override    public String toString() {
>>>>>>>>>>>         return countDateTime + "|" + domain + "|" + event;    }
>>>>>>>>>>>
>>>>>>>>>>>     @Override    public boolean equals(Object o) {
>>>>>>>>>>>         if (this == o) return true;        if (o == null || 
>>>>>>>>>>> getClass() != o.getClass()) return false;        MyEventCountKey 
>>>>>>>>>>> that = (MyEventCountKey) o;        return 
>>>>>>>>>>> countDateTime.equals(that.countDateTime) &&
>>>>>>>>>>>                 domain.equals(that.domain) &&
>>>>>>>>>>>                 event.equals(that.event);    }
>>>>>>>>>>>
>>>>>>>>>>>     @Override    public int hashCode() {
>>>>>>>>>>>         final int prime = 31;        int result = 1;        result 
>>>>>>>>>>> = prime * result + countDateTime.hashCode();        result = prime 
>>>>>>>>>>> * result + domain.hashCode();        result = prime * result +  
>>>>>>>>>>> event.hashCode();        return result;    }
>>>>>>>>>>> }
>>>>>>>>>>>
>>>>>>>>>>>
>>>>>
>>>>

Reply via email to