Thanks, that helped to see how we could implement this!
On Wed, Sep 6, 2017 at 12:01 PM, Timo Walther wrote:
> Hi Johannes,
>
> you can find the implementation for the state clean up here:
> https://github.com/apache/flink/blob/master/flink-
> libraries/flink-table/src/main/scala/org/apache/flin
Hi Johannes,
you can find the implementation for the state clean up here:
https://github.com/apache/flink/blob/master/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/runtime/aggregate/ProcessFunctionWithCleanupState.scala
and a example usage here:
https://github.com/apache/flin
Hi,
I'm actually not very familiar with the current Table API implementations but
Fabian or Timo (cc'ed) should know more. I suspect very much that this is
implemented like this, yes.
Best,
Aljoscha
> On 5. Sep 2017, at 21:14, Johannes Schulte wrote:
>
> Hi,
>
> one short question I had tha
Thanks a lot everyone. I have the user data ingested from kafka and it is
keyed by userid. There are around 80 parallel flatmap operator instances
after keyby and there are around few million users. The map state includes
userid as the key and some value. I guess I will try the approach that
Aljosc
Hi,
This is mostly correct, but you cannot register a timer in open() because we
don't have an active key there. Only in process() and onTimer() can you
register a timer.
In your case, I would suggest to somehow clamp the timestamp to the nearest 2
minute (or whatever) interval or to keep an e
Hi,
You can register a processing time timer inside the onTimer and the open
function to have a timer that run periodically.
Pseudo-code example:
|ValueState lastRuntime; void open() {
ctx.timerService().registerProcessingTimeTimer(current.timestamp +
6); } void onTimer() { // Run the p
Hi Navneeth,
Currently, I don't think there is any built-in functionality to trigger
onTimer periodically.
As for the second part of your question, do you mean that you want to query
on which key the fired timer was registered from? I think this also isn't
possible right now.
I'm looping in Aljos
How are you determining your data is stale? Also if you want to know the key,
why don't you store the key in your state as well?
--
Sent from: http://apache-flink-user-mailing-list-archive.2336050.n4.nabble.com/