Hello John,
Your requirement can be achieved with a process window function, which lets
you enrich the aggregated data with the window's metadata.
Please have a look at the training example[1] to see how to access the
window information within a process window function.
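As a minimal sketch (the ClickCount POJO and the pre-aggregated Long count
are assumptions for illustration, not part of your pipeline), a
ProcessWindowFunction can read the window start from its context and attach
it to each result:

import org.apache.flink.streaming.api.functions.windowing.ProcessWindowFunction;
import org.apache.flink.streaming.api.windowing.windows.TimeWindow;
import org.apache.flink.util.Collector;

public class EnrichWithWindow
        extends ProcessWindowFunction<Long, ClickCount, String, TimeWindow> {
    @Override
    public void process(String key, Context ctx,
                        Iterable<Long> counts, Collector<ClickCount> out) {
        // Window metadata lives on the context; getStart() is the minute
        // (window start) this group of clicks was assigned to.
        long windowStart = ctx.window().getStart();
        // ClickCount is a hypothetical POJO: (key, windowStart, count).
        out.collect(new ClickCount(key, windowStart, counts.iterator().next()));
    }
}

Passed as the second argument of aggregate(), it receives one pre-aggregated
count per key and window, so the output can be stored in Elastic grouped by
the minute.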
Sincerely,
Ali
Hi, I get that, but I want to output that key so I can store it in Elastic
grouped by the minute.
I explained with data examples above. But just to be sure:
let's pretend the current WALL time is 2022-02-14T11:38:01.123Z and I get
the clicks below:
event time here (ignored/not read)|cnn.co
Hello John,
That is exactly what the window operator does for you. Can you please check
the documentation[1] and let us know what part of the window operator alone
does not suffice for your use case?
Sincerely,
Ali
[1]:
https://nightlies.apache.org/flink/flink-docs-release-1.14/docs/dev/datastre
Because I want to group them for the last X minutes. In this case, the last
1 minute.
On Mon, Feb 14, 2022 at 10:01 AM Ali Bahadir Zeybek
wrote:
> Hello John,
>
> Then may I ask you why you need to use a time attribute as part of your
> key?
> Why not just key by the fields like `mydomain.com` and `s
Hello John,
Then may I ask why you need to use a time attribute as part of your key?
Why not just key by fields like `mydomain.com` and `some-article`, as in
your example, and use only the window operator for grouping elements based
on time?
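As a sketch (Click, getDomain(), getPath() and CountAggregate are
illustrative names, assuming a simple counting AggregateFunction), the key
would contain no time attribute at all, and a one-minute processing-time
window would do the per-minute grouping:

clicks
    .keyBy(click -> click.getDomain() + "|" + click.getPath())
    .window(TumblingProcessingTimeWindows.of(Time.minutes(1)))
    .aggregate(new CountAggregate(), new EnrichWithWindow());

Here EnrichWithWindow is the ProcessWindowFunction sketched earlier, which
attaches the window start (the minute) to each emitted count.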
Sincerely,
Ali
On Mon, Feb 14, 2022 at 3:55 PM John Sm
Hi, thanks. As previously mentioned, processing time. So regardless of when
the event was generated, I want to count all events I have right now (as
soon as they are seen by the Flink job).
On Mon, Feb 14, 2022 at 4:16 AM Ali Bahadir Zeybek
wrote:
> Hello John,
>
> Currently you are grouping the e
Hello John,
Currently you are grouping the elements twice based on a time attribute:
once while keying (with event time) and once while windowing (with
processing time). Therefore, the windowing mechanism produces a new window
computation when you see an element with the same key but arrive
Ok I used the method suggested by Ali. The error is gone. But now I see
multiple counts emitted for the same key...
DataStream slStream = env.fromSource(kafkaSource,
        WatermarkStrategy.noWatermarks(), "Kafka Source")
    .uid(kafkaTopic).name(kafkaTopic)
    .setParallelism(kafkaParallelism);
Ok I think Ali's solution makes the most sense to me. I'll try it and let
you know.
On Mon, Feb 7, 2022 at 11:44 AM Jing Ge wrote:
> Hi John,
>
> your getKey() implementation shows that it is not deterministic, since
> calling it with the same click instance multiple times will return
> differen
Hi John,
your getKey() implementation shows that it is not deterministic, since
calling it with the same click instance multiple times will return
different keys. For example a call at 12:01:59.950 and a call at
12:02:00.050 with the same click instance will return two different keys:
2022-04-07T
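To make that concrete, here is a sketch of the problematic pattern (Click,
getDomain() and getPath() are illustrative names): the key depends on the
wall clock at the moment getKey() is invoked, so two invocations on the very
same element can straddle a minute boundary and produce different keys:

import java.time.LocalDateTime;
import java.time.ZoneOffset;
import java.time.temporal.ChronoUnit;

// Inside a KeySelector<Click, String> implementation:
@Override
public String getKey(Click click) {
    // Non-deterministic: reads the system clock on every invocation,
    // so the same click can be assigned different keys over time.
    String minute = LocalDateTime.now(ZoneOffset.UTC)
            .truncatedTo(ChronoUnit.MINUTES).toString();
    return minute + "|" + click.getDomain() + "|" + click.getPath();
}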
Hello John,
During the lifecycle of the execution for a given event, the key information
is not passed between different operators; instead, it is computed with the
given key selector every time a (keyed) operator sees the event.
Therefore, the same event, within the same pipeline, could be
Maybe there's a misunderstanding. But basically I want to do a clickstream
count for a given "url" and, for simplicity and accuracy of the count, base
it on processing time (event time doesn't matter as long as I get a total
of clicks at that given processing time).
So regardless of the event time. I w
>
> The key selector works.
No it does not ;) It depends on the system time so it's not deterministic
(you can get different keys for the very same element).
> How do you key a count based on the time? I have taken this from samples
> online.
This is what the windowing is for. You basically wan
The key selector works. It only causes an issue if there are too many keys
produced in one shot. For example, if 100 "same" keys are produced for that
1 minute it's OK. But if 101 are produced the error happens.
If you look at the reproducer, at least that's what's happening.
How do you key a count ba
Your Key selector doesn't need to implement hashCode, but given the same
object it has to return the same key.
In your reproducer the returned key will have different timestamps, and
since the timestamp is included in the hashCode, they will be different
each time.
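A deterministic alternative, sketched with the illustrative Click type from
above, derives the key only from fields of the element itself:

import org.apache.flink.api.java.functions.KeySelector;

// Same Click in, same key out, no matter when or where it is invoked.
KeySelector<Click, String> bySite =
        click -> click.getDomain() + "|" + click.getPath();

Invoked any number of times on the same Click, this returns the same key;
the per-minute grouping then belongs in a processing-time window rather than
in the key.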
On 07/02/2022 14:50, John Sm
I don't get it. I provided the reproducer. I implemented the KeySelector
interface; it needs hashCode and equals as well?
I'm attempting to do clickstream. So the key is based on processing
date/time rounded to the minute + domain name + path.
So these should be valid below?
2022-01-01T10:02:
Don't KeySelectors also need to be deterministic?
The {@link KeySelector} allows to use deterministic objects for operations
such as reduce, reduceGroup, join, coGroup, etc. If invoked multiple times
on the same object, the returned key must be the same.
On 04/02/2022 18:25, John
Hi Francesco, here is the reproducer:
https://github.com/javadevmtl/flink-key-reproducer
So, essentially, it looks like the Exception is thrown when there's a high
influx of records produced from the source.
The key is generated by 3 values: date/time rounded to the minute and 2
strings.
So
Ok it's not my data either. I think it may be a volume issue. I have
managed to consistently reproduce the error. I'll upload a reproducer ASAP.
On Thu, 3 Feb 2022 at 15:37, John Smith wrote:
> Ok so I tried to create a reproducer but I couldn't reproduce it. But the
> actual job once in a whi
Ok so I tried to create a reproducer but I couldn't reproduce it. But the
actual job once in a while throws that error. So I'm wondering if maybe one
of the records that comes in is not valid, though I do validate prior to
getting to the key and window operators.
On Thu, 3 Feb 2022 at 14:32, John
Actually, maybe not, because with PrintSinkFunction it ran for a bit and
then it threw the error.
On Thu, 3 Feb 2022 at 14:24, John Smith wrote:
> Ok it may be the ElasticSearch connector causing the issue?
>
> If I use PrintSinkFunction then I get no error and my stats print as
> expected.
>
> On
Ok it may be the ElasticSearch connector causing the issue?
If I use PrintSinkFunction then I get no error and my stats print as
expected.
On Wed, 2 Feb 2022 at 03:01, Francesco Guardiani
wrote:
> Hi,
> your hashCode and equals seem correct. Can you post a minimal stream
> pipeline reproducer
Hi,
your hashCode and equals seem correct. Can you post a minimal stream
pipeline reproducer using this class?
FG
On Tue, Feb 1, 2022 at 8:39 PM John Smith wrote:
> Hi, getting java.lang.IllegalArgumentException: Key group 39 is not in
> KeyGroupRange{startKeyGroup=96, endKeyGroup=103}. Unles
Hi, getting java.lang.IllegalArgumentException: Key group 39 is not in
KeyGroupRange{startKeyGroup=96, endKeyGroup=103}. Unless you're directly
using low level state access APIs, this is most likely caused by
non-deterministic shuffle key (hashCode and equals implementation).
This is my class, is