Hi all,
Not sure if it’s a typical use case, but is it a good idea to synchronize the
processElement and onTimer methods of a KeyedProcessFunction?
In my case they both change the function’s state (processElement
initializes the state if it is not initialized yet, and onTimer clears the state) and I
Hi team,
I have one question, hoping to get some help.
Will BoundedOutOfOrderness have any impact on the KeyedProcessFunction? If so,
in what way can it impact the KeyedProcessFunction?
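The following plain-Java simulation illustrates the kind of impact being asked about. It has no Flink dependency, and the class and method names are made up for this sketch. It assumes two things: the watermark formula of Flink's BoundedOutOfOrdernessWatermarks (highest timestamp seen, minus the bound, minus 1 ms), and the rule that an event-time timer fires once the watermark reaches its timestamp. Under those assumptions, a larger bound delays onTimer() for event-time timers in a KeyedProcessFunction. Processing-time timers are not driven by watermarks at all.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.PriorityQueue;

// Plain-Java simulation (no Flink dependency) of how a bounded-out-of-orderness
// watermark gates event-time timers. This is not Flink's implementation.
final class OutOfOrdernessDemo {

    // Same arithmetic as Flink's BoundedOutOfOrdernessWatermarks:
    // watermark = highest timestamp seen - outOfOrdernessMillis - 1.
    static final class WatermarkTracker {
        private final long outOfOrdernessMillis;
        private long maxTimestamp;

        WatermarkTracker(long outOfOrdernessMillis) {
            this.outOfOrdernessMillis = outOfOrdernessMillis;
            // Chosen so the initial watermark is exactly Long.MIN_VALUE (no underflow).
            this.maxTimestamp = Long.MIN_VALUE + outOfOrdernessMillis + 1;
        }

        void onEvent(long eventTimestamp) {
            maxTimestamp = Math.max(maxTimestamp, eventTimestamp);
        }

        long currentWatermark() {
            return maxTimestamp - outOfOrdernessMillis - 1;
        }
    }

    // Feeds events in arrival order and returns the event-time timers that fired.
    // Simplification: the watermark is re-evaluated after every event, whereas
    // Flink emits it periodically.
    static List<Long> firedTimers(long outOfOrdernessMillis, long[] eventTimestamps, long timerTimestamp) {
        WatermarkTracker tracker = new WatermarkTracker(outOfOrdernessMillis);
        PriorityQueue<Long> timers = new PriorityQueue<>();
        timers.add(timerTimestamp);
        List<Long> fired = new ArrayList<>();
        for (long ts : eventTimestamps) {
            tracker.onEvent(ts);
            long watermark = tracker.currentWatermark();
            // A timer fires once the watermark has reached its timestamp.
            while (!timers.isEmpty() && timers.peek() <= watermark) {
                fired.add(timers.poll());
            }
        }
        return fired;
    }

    public static void main(String[] args) {
        long[] events = {5_000L, 10_000L, 12_000L};
        // Timer registered (e.g. in processElement) for event time 10 s.
        System.out.println("bound 0 s: " + firedTimers(0L, events, 10_000L));     // [10000]
        System.out.println("bound 5 s: " + firedTimers(5_000L, events, 10_000L)); // []
    }
}
```

With a 5 s bound, the same three events leave the watermark at 6999 ms, so the 10 s timer has not fired yet. It would fire only after an event with a timestamp of at least 15001 ms arrives.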
Thanks!
Best,
Lijuan
iveStreams.keyBy()
("...Apply the partitioning on the input and feedback streams instead")
does not look right.
I tried that (I made a loop with a single stream of
Either) but it seems there is no way
of processing an IterativeStream with a KeyedProcessFunction, nor to feed
bac
Hi all,
I am trying to implement an iterative streaming job that processes the loop
with a KeyedProcessFunction.
I need a KeyedProcessFunction to use keyed state and to emit a side-output
(that after further transformations becomes the feedback)
Problem is IterativeStream.process() only accepts
Yes, restarting the app with a clean state does seem to fix the issue, but
I think I may have found a bug in Flink.
Here's how we can replicate it:
- Create a simple application with KeyedProcessFunction (with onTimer())
- Send a few records with the same key. In processElement(), regis
After fixing your negative timestamp bug, can the timer be triggered?
> On 23 Mar 2022, at 2:39 AM, Binil Benjamin wrote:
Here are some more findings as I was debugging this. I peeked into the
snapshot to see the current values in
"_timer_state/processing_user-timers" and here is how they look:
Timer{timestamp=-9223372036854715808, key=(FFX22...),
namespace=VoidNamespace}
Timer{timestamp=-9223372036854715808, key=(FF
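The logged timer value is not arbitrary: it is exactly 60 000 ms above Long.MIN_VALUE, and the sketch below checks that arithmetic. The thread does not say where the Long.MIN_VALUE base came from. One hypothetical way to get exactly this number is to add a 60 s delay to TimerService.currentWatermark() before any watermark has arrived, because that method reports Long.MIN_VALUE until then. The helper timerFromWatermark is invented for illustration.

```java
// Plain-Java check (no Flink dependency) of the timer value from the
// "_timer_state/processing_user-timers" dump above.
final class NegativeTimerTimestamp {
    // Copied from the logged Timer{timestamp=...} entries.
    static final long LOGGED_TIMER = -9223372036854715808L;

    // Hypothetical helper: derives a timer from an event-time watermark.
    // Before the first watermark arrives, the watermark is Long.MIN_VALUE,
    // so this yields a hugely negative timestamp.
    static long timerFromWatermark(long currentWatermark, long delayMillis) {
        return currentWatermark + delayMillis;
    }

    public static void main(String[] args) {
        System.out.println(LOGGED_TIMER - Long.MIN_VALUE); // 60000
        System.out.println(timerFromWatermark(Long.MIN_VALUE, 60_000L) == LOGGED_TIMER); // true
    }
}
```

If the delay is meant to be measured in wall-clock time, TimerService.currentProcessingTime() would be the natural base for a processing-time timer.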
Hi,
Parallelism is currently set to 9 and it appears to be occurring for all
subtasks.
We added logs to see the various timestamps. The following logs are from
the last 5 days.
- logs from processElement() - logged immediately after timer registration:
"message": "FunctionName=WfProcessFun
Subject: Re: onTimer() of a KeyedProcessFunction stops getting triggered after a
while
Hi,
Unfortunately, I cannot share the entire code, but the class roughly looks
like this:
public class WfProcessFunction extends KeyedProcessFunction, Map, Map> {
@Override
public void processElement(Map inputRecord,
Context context, Collector> collector) throws
Exc
Hi, can you share your code so we can check whether it is written correctly.
> On 18 Mar 2022, at 7:54 AM, Binil Benjamin wrote:
Hi,
We have a class that extends KeyedProcessFunction and overrides onTimer()
method. During processElement(), we register a timer callback using
context.timerService().registerProcessingTimeTimer(). For
a while, we see that the onTimer() method is getting called back and
everything works as
= aggrRecord.get(); // Always get null value.
Thanks,
Suchithra
From: JING ZHANG
Sent: Wednesday, June 9, 2021 2:20 PM
To: V N, Suchithra (Nokia - IN/Bangalore)
Cc: user@flink.apache.org; Jash, Shaswata (Nokia - IN/Bangalore)
Subject: Re: Issue with onTimer method of KeyedProcessFunction
Hi Suchithra
> After upgrading to version 1.12.3, the onTimer method of
> KeyedProcessFunction is not behaving correctly: ReducingState
> and ValueState always return null.
>
> Could you please help in debugging the issue?
>
> Thanks,
> Suchithra
Hello,
We are using Apache Flink 1.7 and are now trying to upgrade to Flink 1.12.3.
After upgrading to version 1.12.3, the onTimer method of
KeyedProcessFunction is not behaving correctly: ReducingState and
ValueState always return null.
Could you please help in debugging
n? Would this
> not be a problem?
>
> Thanks, once again.
>
> On Monday, 10/05/2021 at 09:13, Dawid Wysakowicz wrote:
direct answers for your questions. If you use
> KeyedProcessFunction it is always scoped to a single Key. There is no way
> to process events from other keys. If you want to have more control over
> state and e.g. use PriorityQueue which would be snapshotted on checkpoint
> you could lo
Hey Miguel,
I think you could take a look at the CepOperator which does pretty much
what you are describing.
As for more direct answers to your questions: if you use a
KeyedProcessFunction, it is always scoped to a single key. There is no
way to process events from other keys. If you want to have
Hi Timo,
Thanks for your answer. I think I wasn't clear enough in my initial
message, so let me give more details.
The stream is not keyed by timestamp; it's keyed by a custom field (e.g.,
user-id) and then fed into a KeyedProcessFunction. I want to process all
events for a given use
, Miguel Araújo wrote:
Hi everyone,
I have a KeyedProcessFunction whose events I would like to process in
event-time order.
My initial idea was to use a Map keyed by timestamp and, when a new event
arrives, iterate over the Map to process events older than the current
watermark.
The issue is that I obviously can'
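The buffering idea described above can be sketched in plain Java, with a java.util.TreeMap standing in for per-key state. There is no Flink dependency, and the class and method names are hypothetical. In a real KeyedProcessFunction, one assumed wiring would be to keep the buffer in keyed state, register an event-time timer at each event's timestamp, and drain in onTimer() once the watermark has passed. The thread does not specify those details. Note that a sorted in-memory map is what makes "oldest first" cheap here. Flink's MapState is not documented to iterate in timestamp order.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.Map;
import java.util.TreeMap;

// Plain-Java sketch: buffer events by timestamp and release them in
// event-time order once the watermark has passed them.
final class EventTimeSortBuffer {
    // TreeMap keeps timestamps in ascending order; several events may share one.
    private final TreeMap<Long, List<String>> buffer = new TreeMap<>();

    void add(long timestamp, String event) {
        buffer.computeIfAbsent(timestamp, t -> new ArrayList<>()).add(event);
    }

    // Removes and returns, oldest first, every event with timestamp <= watermark.
    List<String> drainUpTo(long watermark) {
        List<String> ready = new ArrayList<>();
        Map.Entry<Long, List<String>> first;
        while ((first = buffer.firstEntry()) != null && first.getKey() <= watermark) {
            ready.addAll(buffer.pollFirstEntry().getValue());
        }
        return ready;
    }

    public static void main(String[] args) {
        EventTimeSortBuffer b = new EventTimeSortBuffer();
        b.add(30L, "c");
        b.add(10L, "a");
        b.add(20L, "b");
        b.add(40L, "d");
        System.out.println(b.drainUpTo(25L)); // [a, b]
        System.out.println(b.drainUpTo(40L)); // [c, d]
    }
}
```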
I missed this in the documentation:
A KeyedProcessFunction is always a RichFunction. Therefore, access to the
RuntimeContext is always available and setup and teardown methods can be
implemented. See
RichFunction.open(org.apache.flink.configuration.Configuration) and
RichFunction.close().
https
Hello,
I'm learning the State Processor API:
https://ci.apache.org/projects/flink/flink-docs-stable/dev/libs/state_processor_api.html
There is an example on this page in which StatefulFunctionWithTime extends
KeyedProcessFunction. There we can see the open() method we need to implement to
initialize state
>>> watermarks each time. Two cycles are shown below.
>>>
>>> I managed to figure out the root cause: the Flink stream
>>> execution environment has a default parallelism of 8. *I didn't
>>>
got some progress on it.
> >>>
> >>> I have sent another email with the title 'BoundedOutOfOrderness
> >>> Watermark Generator is NOT making the event time to advance'
> >>> using another email of mine, fuyao...
context correctly. One more thing is that this behavior is not
reflected in the Flink Cluster web UI interface. I can see the
watermark is advancing, but it is not in reality. *That's
causing the inconsistency problem I mentioned in the other email
I mentioned a
rallelism as 8. *I didn't
> notice in the doc, could the community add this explicitly into
> the official doc to avoid some confusion? Thanks.*
>
> From my understanding, the watermark advances based on the
> lowest watermark among the 8, so I can
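The min-over-inputs rule described above can be shown with a tiny plain-Java model. There is no Flink dependency, and the names are illustrative. The model also shows why a single upstream subtask that never receives data can hold the downstream watermark at Long.MIN_VALUE. If that is what happens in this pipeline, which is an assumption, WatermarkStrategy#withIdleness(Duration) (available since Flink 1.11) is the documented way to stop idle sources from holding back the watermark.

```java
import java.util.Arrays;

// Plain-Java illustration (no Flink dependency): an operator with several input
// channels can only advance its event-time clock to the minimum of their watermarks.
final class MinWatermarkDemo {

    static long combinedWatermark(long[] channelWatermarks) {
        // With no channels there is no information, so stay at the initial value.
        return Arrays.stream(channelWatermarks).min().orElse(Long.MIN_VALUE);
    }

    public static void main(String[] args) {
        long[] channels = new long[8]; // parallelism 8, as in the thread
        Arrays.fill(channels, 60_000L);
        // Hypothetical: one upstream subtask has received no data, so its
        // watermark never left the initial value.
        channels[3] = Long.MIN_VALUE;
        System.out.println(combinedWatermark(channels)); // -9223372036854775808
        channels[3] = 45_000L;
        System.out.println(combinedWatermark(channels)); // 45000
    }
}
```

Flink's actual valve also tracks per-channel idleness and keeps watermarks monotonic. This sketch models only the minimum.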
an see the
watermark is advancing, but it is not in reality. *That's
causing the inconsistency problem I mentioned in the other
email
I mentioned above. Will this be considered as a bug in the UI?*
My current question is, since I have full outer join operation
ution environment to be of parallelism 1, it will reflect
>> the watermark in the context correctly. One more thing is that this
>> behavior is not reflected in the Flink Cluster web UI interface. I can see
>> the watermark is advancing, but it is not in reality. *That's causing
>
lem I mentioned in the other email I mentioned above.
Will this be considered as a bug in the UI?*
My current question is: since I have a full outer join operation before the
KeyedProcessFunction here, how can I make the bounded-out-of-orderness watermark /
punctuated watermark strategy work if the parallelism
in the
>> pipeline for 10 seconds.
>>
>> In the first stage, I first consume three Kafka streams and transform them
>> into Flink DataStreams using a deserialization schema containing some type and
>> date format transformation, and then I register these data streams as Table
n schema containing some type and
> date format transformation, and then I register these data streams as Table
> and do a full outer join one by one using the Table API. I also add a query
> configuration for this to avoid excessive state. The primary key is also
> the join key.
>
> In t
m the joined table to a retracted stream and
put it into a KeyedProcessFunction to generate the business object if the
business object's primary key is inactive for 10 seconds.
Is this way of handling the data the suggested approach? (I understand I
can directly consume Kafka data in the Table API.
Trenikhun
Cc: Dawid Wysakowicz; Flink User Mail List
Subject: Re: Unit Test for KeyedProcessFunction with out-of-core state
The new backend would be for unit tests (instead of a RocksDB mock). It's kind
of the mock for out-of-core behavior that you initially requested.
To use RocksDB in an IT
---
> *From:* Arvid Heise
> *Sent:* Monday, September 14, 2020 4:26:47 AM
> *To:* Dawid Wysakowicz
> *Cc:* Alexey Trenikhun; Flink User Mail List <user@flink.apache.org>
> *Subject:* Re: Unit Test for KeyedProcessFunction with out-of-core state
>
KeyedProcessFunction with out-of-core state
Hi Alexey,
Definitions of test levels are always a bit blurry when writing tests for a data
processing framework, but I'm convinced that in your case you should
think in terms of integration tests rather than unit tests:
* Unit tests should really
> Thanks,
> Alexey
>
> --
> *From:* Tzu-Li (Gordon) Tai
> *Sent:* Friday, September 4, 2020 12:35:48 AM
> *To:* Alexey Trenikhun
> *Cc:* Flink User Mail List
> *Subject:* Re: Unit Test for KeyedProcessFunction with out-of-core state
>
>
, September 4, 2020 12:35:48 AM
To: Alexey Trenikhun
Cc: Flink User Mail List
Subject: Re: Unit Test for KeyedProcessFunction with out-of-core state
Hi Alexey,
Is there a specific reason why you want to test against RocksDB?
Otherwise, in Flink tests we use a `KeyedOneInputStreamOperatorTestHarness
/1d5f44710270d1c615537f0d05ab49e699d3a6e5/flink-streaming-java/src/test/java/org/apache/flink/streaming/util/KeyedOneInputStreamOperatorTestHarness.java
On Fri, Sep 4, 2020 at 2:28 AM Alexey Trenikhun wrote:
Hello,
I want to unit test a KeyedProcessFunction which uses out-of-core state
(like RocksDB).
Does Flink have a mock for RocksDB which can be used in unit tests?
Thanks,
Alexey
gn the event time with an AscendingTimestampExtractor
>
> While debugging, I noticed that in the KeyedProcessFunction,
> after my highest known event time of 2020-06-23T00:46:30.000Z,
>
> the processElement method had a watermark with an impossible date of:
> -292275055-05-16T1
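The "impossible date" is Long.MIN_VALUE milliseconds rendered as a date. That value is the initial watermark before any watermark has reached the operator. Here is a plain-Java check using java.time.Instant; the helper name render is made up for this sketch.

```java
import java.time.Instant;

// Renders an epoch-millisecond watermark as an ISO-8601 instant.
final class MinWatermarkAsDate {
    static String render(long epochMillis) {
        return Instant.ofEpochMilli(epochMillis).toString();
    }

    public static void main(String[] args) {
        // Long.MIN_VALUE: the watermark reported before any watermark has
        // arrived, which lands roughly 292 million years before the epoch.
        System.out.println(render(Long.MIN_VALUE));
        // The highest known event time from the message, for comparison.
        System.out.println(render(1592873190000L)); // 2020-06-23T00:46:30Z
    }
}
```

Seeing this value in processElement suggests that no watermark had reached that operator instance yet. It does not mean the timestamps themselves are corrupt.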
My source is a Kafka topic.
I am using Event Time.
I assign the event time with an AscendingTimestampExtractor
While debugging, I noticed that in the KeyedProcessFunction,
after my highest known event time of 2020-06-23T00:46:30.000Z,
the processElement method had a watermark with an
Hi
+1. There is no need to generate an instance for each key; Flink just
maintains the key collection in one instance. Imagine what would happen if the
number of keys were unlimited.
Best,
Yichao Yang
-- Original --
From: "Tzu-Li (Gordon) Tai"
Hi,
Records with the same key will be processed by the same partition.
Note that there isn't an instance of a keyed process function for each key.
There is a single instance per partition, and all keys that are distributed
to the same partition will get processed by the same keyed process function
instance.
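The one-instance-per-partition model described above can be sketched in plain Java. There is no Flink dependency, and the names are hypothetical. Each parallel subtask owns a single function object, and per-key state is looked up by the key of the current record. That is why state stays isolated per key even though the instance is shared.

```java
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

// Plain-Java model (no Flink dependency): one function object per parallel
// subtask, with state looked up by the key of the current record.
final class OneInstancePerSubtask {

    static final class CountingFunction {
        // Stand-in for keyed state: each key sees only its own entry.
        private final Map<String, Integer> countPerKey = new HashMap<>();

        int processElement(String key) {
            return countPerKey.merge(key, 1, Integer::sum);
        }

        int countFor(String key) {
            return countPerKey.getOrDefault(key, 0);
        }

        int distinctKeys() {
            return countPerKey.size();
        }
    }

    // Simplified routing. Flink actually assigns keys to key groups using a
    // murmur hash of key.hashCode(); plain hashCode() keeps this sketch short.
    static int subtaskFor(String key, int parallelism) {
        return Math.floorMod(key.hashCode(), parallelism);
    }

    static List<CountingFunction> route(String[] keys, int parallelism) {
        List<CountingFunction> instances = new ArrayList<>();
        for (int i = 0; i < parallelism; i++) {
            instances.add(new CountingFunction()); // one instance per subtask, not per key
        }
        for (String key : keys) {
            instances.get(subtaskFor(key, parallelism)).processElement(key);
        }
        return instances;
    }

    public static void main(String[] args) {
        String[] keys = {"a", "b", "c", "a", "d", "a"};
        List<CountingFunction> instances = route(keys, 2);
        for (int i = 0; i < instances.size(); i++) {
            System.out.println("subtask " + i + " holds state for "
                    + instances.get(i).distinctKeys() + " keys");
        }
    }
}
```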
Hi All,
I have a keyed data stream and am calling a KeyedProcessFunction after the keyBy
operation on the DataStream. Until now my understanding was: "For all n
elements in the keyed stream, if their keys are the same, the same instance of the
KeyedProcessFunction is called, and for other elements
On Tue, Jun 18, 2019 at 9:22 AM Piotr Nowojski wrote:
Hi,
Isn’t your problem that the source is constantly emitting data and bumping
your timers? Keep in mind that the code you are basing yours on has the
following characteristic:
> In the following example a KeyedProcessFunction maintains counts per key, and
> emits a key/count pair wh
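The effect Piotr describes can be reproduced with a plain-Java simulation of that pattern. There is no Flink dependency, and the names are hypothetical. As we understand the documentation example, each element updates lastModified and registers a timer for lastModified + 60000. onTimer() emits only when the firing timestamp equals lastModified + 60000. With a steady stream of updates, every timer that fires is stale, so nothing is emitted until the key has been quiet for a full minute.

```java
import java.util.ArrayList;
import java.util.HashMap;
import java.util.LinkedHashSet;
import java.util.List;
import java.util.Map;
import java.util.Set;
import java.util.TreeMap;

// Plain-Java simulation (no Flink dependency) of the "emit after 60 s without
// an update" pattern quoted above.
final class TimeoutBumpingDemo {
    static final long TIMEOUT_MILLIS = 60_000L;

    private final Map<String, Long> lastModified = new HashMap<>();
    private final Map<String, Long> counts = new HashMap<>();
    // Pending timers: firing time -> keys. A Set mirrors the fact that Flink
    // keeps at most one timer per key and timestamp.
    private final TreeMap<Long, Set<String>> timers = new TreeMap<>();
    private final List<String> emitted = new ArrayList<>();

    void processElement(String key, long timestamp) {
        counts.merge(key, 1L, Long::sum);
        lastModified.put(key, timestamp);
        // Every element registers a fresh timer; older timers stay registered.
        timers.computeIfAbsent(timestamp + TIMEOUT_MILLIS, t -> new LinkedHashSet<>()).add(key);
    }

    void advanceTimeTo(long time) {
        while (!timers.isEmpty() && timers.firstKey() <= time) {
            Map.Entry<Long, Set<String>> due = timers.pollFirstEntry();
            for (String key : due.getValue()) {
                onTimer(key, due.getKey());
            }
        }
    }

    private void onTimer(String key, long timestamp) {
        // Only the timer belonging to the latest update emits; the rest are stale.
        if (timestamp == lastModified.get(key) + TIMEOUT_MILLIS) {
            emitted.add(key + "=" + counts.get(key));
        }
    }

    // One element for key "k" every stepMillis from 0 to lastEventMillis, then
    // time advances to endMillis. Returns whatever was emitted.
    static List<String> run(long lastEventMillis, long stepMillis, long endMillis) {
        TimeoutBumpingDemo f = new TimeoutBumpingDemo();
        for (long now = 0; now <= lastEventMillis; now += stepMillis) {
            f.advanceTimeTo(now);
            f.processElement("k", now);
        }
        f.advanceTimeTo(endMillis);
        return f.emitted;
    }

    public static void main(String[] args) {
        // Updates every 10 s: nothing is emitted while data keeps flowing...
        System.out.println(run(100_000L, 10_000L, 150_000L)); // []
        // ...and the count appears only 60 s after the last update.
        System.out.println(run(100_000L, 10_000L, 160_000L)); // [k=11]
    }
}
```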
Hi,
I used this example of KeyedProcessFunction from the Flink website [1] and
I have implemented my own KeyedProcessFunction to process some
approximation counting [2]. This worked very well. Then I switched the data
source to consume strings from Twitter [3]. The data source is consuming
the
Thanks Bowen.
On Thu, Jul 19, 2018 at 4:45 PM, Bowen Li wrote:
Hi Anna,
KeyedProcessFunction is only available starting from Flink 1.5. The doc is
here
<https://ci.apache.org/projects/flink/flink-docs-stable/dev/stream/operators/process_function.html#the-keyedprocessfunction>.
It extends ProcessFunction and shares the same functionality, except
givin
Hello all,
I am using Flink 1.4 because that's the version provided by the latest AWS
EMR.
Is KeyedProcessFunction available in Flink 1.4?
Also, please share any links to good examples of using KeyedProcessFunction.
Thanks