Hi Kostas,

In my use case I’m keeping track of the state of URLs during a web crawl.
This represents both current state (“URL X should be crawled at time Y, and has an estimated value of Z”), and is the source of URLs to be fed into the crawl infrastructure - it’s a floor wax and a dessert topping. Which is why it’s a process function, so that I can query this “crawlDB” to get URLs to emit to the fetch queue, independent of when/if new URLs are flowing in from some external source.

And yes, I could use an external, queryable system to handle this (e.g. Elasticsearch), but at a scale of billions of URLs having something custom is of significant value in terms of performance and resource costs.

There are things I could do to better leverage Flink’s state management, so that I have to do less in this custom DB (e.g. archiving low-scoring URLs comes to mind). But after a few whiteboard sessions, it still seems like I’m going to have to add checkpointing/snapshotting support to my custom crawlDB.

Thanks,

— Ken

> On Apr 28, 2017, at 1:28am, Kostas Kloudas <k.klou...@data-artisans.com> wrote:
>
> Hi Ken,
>
> So you have a queue where elements are sorted by timestamp and score, and when the time (event time I suppose) passes that of the timestamp of an element, you want to fetch the element and:
> - if the score is too low, you archive it
> - if the score is OK, you emit it.
>
> If I get it right, then if your stream is keyed you have a queue and an “archive” state per key; if not, you have a global queue for all elements, which can be seen as a keyed stream on a dummy key, right?
> By the way, timers in Flink have to be associated with a key, so I suppose that if you are using timers you are in the first case (keyed stream).
>
> In this case, why do you need access to the state of all the keys?
>
> Also it may be worth having a look at the CEP operator in the Flink codebase. There you also have a queue per key, where events are sorted by timestamp, and at each watermark, elements with timestamps smaller than the watermark are processed.
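The per-key queue logic described in the quoted reply above can be sketched outside of Flink as plain Python. This is only an illustration of the selection step; the score threshold and the (timestamp, score, url) tuple layout are assumptions, not from the thread:

```python
# Flink-free sketch of the per-key queue: entries are (timestamp, score, url)
# tuples kept in a min-heap ordered by timestamp. When the watermark passes an
# entry's timestamp, the entry is archived (score too low) or emitted.
import heapq

ARCHIVE_THRESHOLD = 0.2  # assumed score cutoff, not from the thread


def on_watermark(queue, archive, watermark, emit):
    """Pop every entry whose timestamp the watermark has passed;
    archive low-scoring entries, pass the rest to emit()."""
    while queue and queue[0][0] <= watermark:
        entry = heapq.heappop(queue)
        _ts, score, _url = entry
        if score < ARCHIVE_THRESHOLD:
            archive.append(entry)
        else:
            emit(entry)
```

In an actual KeyedProcessFunction, the queue and archive would live in keyed state and this logic would run inside `onTimer`; the sketch only shows the archive-or-emit decision.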
>
> Hope this helps,
> Kostas
>
>> On Apr 28, 2017, at 4:08 AM, Ken Krugler <kkrugler_li...@transpac.com> wrote:
>>
>> Hi Kostas,
>>
>> Thanks for responding. Details in-line below.
>>
>>> On Apr 27, 2017, at 1:19am, Kostas Kloudas <k.klou...@data-artisans.com> wrote:
>>>
>>> Hi Ken,
>>>
>>> Unfortunately, iterating over all keys is not currently supported.
>>>
>>> Do you have your own custom operator (because you mention “from within the operator…”), or do you have a process function (because you mention the “onTimer” method)?
>>
>> Currently it’s a process function, but I might be able to just use a regular operator.
>>
>>> Also, could you describe your use case a bit more? You have a periodic timer per key, and when a timer for a given key fires you want to have access to the state of all the keys?
>>
>> The timer bit is because I’m filling an async queue, and thus need to trigger emitting tuples to the operator’s output stream independent of inbound tuples.
>>
>> The main problems I’m trying to solve (without requiring a separate scalable DB infrastructure) are:
>>
>> - entries have an associated “earliest processing time”. I don’t want to send these through the system until that time trigger has passed.
>> - entries have an associated “score”. I want to favor processing high-scoring entries over low-scoring entries.
>> - if an entry’s score is too low, I want to archive it, versus constantly re-evaluating it using the above two factors.
>>
>> I’ve got my own custom DB that is working for the above, and scales to target sizes of 1B+ entries per server by using a mixture of RAM and disk. But having to checkpoint it isn’t trivial.
>>
>> So I thought that if there was a way to (occasionally) iterate over the keys in the state backend, I could get what I needed with minimum effort.
>>
>> But it sounds like that’s not possible currently.
>>
>> Thanks,
>>
>> — Ken
>>
>>>> On Apr 27, 2017, at 3:02 AM, Ken Krugler <kkrugler_li...@transpac.com> wrote:
>>>>
>>>> Is there a way to iterate over all of the key/value entries in the state backend, from within the operator that’s making use of the same?
>>>>
>>>> E.g. I’ve got a ReducingState, and on a timed interval (inside of the onTimer method) I need to iterate over all KV state and emit the N “best” entries.
>>>>
>>>> What’s the recommended approach?
>>>>
>>>> Thanks,
>>>>
>>>> — Ken
>>
>> --------------------------
>> Ken Krugler
>> +1 530-210-6378
>> http://www.scaleunlimited.com
>> custom big data solutions & training
>> Hadoop, Cascading, Cassandra & Solr

--------------------------
Ken Krugler
+1 530-210-6378
http://www.scaleunlimited.com
custom big data solutions & training
Hadoop, Cascading, Cassandra & Solr
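For the original question in this thread (emit the N “best” entries on a timer), the selection step itself is simple once the entries can be enumerated; the hard part, as discussed above, is that iterating all keys from inside an operator wasn’t supported. A sketch of just the selection piece, assuming (url, score) pairs and “best” meaning highest score:

```python
# Sketch of the "emit the N best entries" selection step. This assumes you can
# already enumerate the entries (e.g. from a structure maintained per key);
# it does not solve the key-iteration limitation discussed in the thread.
import heapq


def top_n(entries, n):
    # O(k log n) over k entries: one pass with a small auxiliary heap,
    # rather than fully sorting all of the state.
    return heapq.nlargest(n, entries, key=lambda e: e[1])
```

For example, `top_n([("a", 1.0), ("b", 3.0), ("c", 2.0)], 2)` returns the two highest-scoring pairs in descending score order.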