Hi Kostas,

In my use case I’m keeping track of the state of URLs during a web crawl.

This represents both current state (“URL X should be crawled at time Y, and has 
an estimated value of Z”), and is the source of URLs to be fed into the crawl 
infrastructure - it’s a floor wax and a dessert topping.

Which is why it’s a process function: I can query this “crawlDB” for URLs to 
emit to the fetch queue, independently of when (or whether) new URLs are flowing 
in from some external source.
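To make the scheduling logic concrete, here’s a rough, self-contained sketch in Python rather than Flink’s Java API - all names, the threshold, and the data layout are hypothetical, not the actual crawlDB. Entries carry a URL, an earliest fetch time, and a score; a timer-driven poll emits due URLs, highest score first, and archives anything scoring below a cutoff:

```python
import heapq

ARCHIVE_THRESHOLD = 0.1  # hypothetical cutoff for low-value URLs

class CrawlDB:
    """Toy in-memory stand-in for the custom crawlDB described above."""

    def __init__(self):
        self._by_time = []   # min-heap ordered by earliest fetch time
        self.archived = []   # URLs whose score fell below the threshold

    def add(self, url, fetch_time, score):
        heapq.heappush(self._by_time, (fetch_time, url, score))

    def poll(self, now, limit):
        """Called from a timer: emit up to `limit` due URLs, best score first."""
        due = []
        while self._by_time and self._by_time[0][0] <= now:
            fetch_time, url, score = heapq.heappop(self._by_time)
            if score < ARCHIVE_THRESHOLD:
                self.archived.append(url)   # park low-value URLs
            else:
                due.append((score, url))
        due.sort(reverse=True)              # favor high-scoring entries
        emit, defer = due[:limit], due[limit:]
        for score, url in defer:            # re-queue whatever wasn't emitted
            self.add(url, now, score)
        return [url for _, url in emit]

db = CrawlDB()
db.add("http://a.example/1", fetch_time=10, score=0.9)
db.add("http://a.example/2", fetch_time=10, score=0.5)
db.add("http://a.example/3", fetch_time=10, score=0.05)  # below threshold
db.add("http://a.example/4", fetch_time=99, score=1.0)   # not yet due
print(db.poll(now=20, limit=1))  # -> ['http://a.example/1']
print(db.archived)               # -> ['http://a.example/3']
```

In a real ProcessFunction the `poll` call would sit in `onTimer`, with the timer re-registered after each firing so emission keeps happening even when no input tuples arrive.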

And yes, I could use an external, queryable system to handle this (e.g. 
Elasticsearch), but at a scale of billions of URLs, something custom is of 
significant value in terms of performance and resource costs.

There are things I could do to better leverage Flink’s state management, so I 
have to do less in this custom DB (e.g. archiving low-scoring URLs comes to 
mind).

But after a few whiteboard sessions, it still seems like I’m going to have to 
add checkpointing/snapshotting support to my custom crawlDB.
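For the snapshotting side, the minimum seems to be an atomic, consistent dump of the in-memory index taken at the checkpoint barrier. A hedged sketch of just that idea (file names, format, and the shape of `entries` are all made up for illustration):

```python
import json
import os
import tempfile

def snapshot(entries, checkpoint_dir, checkpoint_id):
    """Write a consistent snapshot of the crawl state, atomically.

    `entries` is whatever in-memory index the crawlDB holds at the
    checkpoint barrier; writing to a temp file and then renaming means
    a crash mid-write never leaves a half-written checkpoint behind.
    """
    path = os.path.join(checkpoint_dir, "crawldb-%d.json" % checkpoint_id)
    fd, tmp = tempfile.mkstemp(dir=checkpoint_dir)
    with os.fdopen(fd, "w") as f:
        json.dump(entries, f)
    os.rename(tmp, path)   # atomic on POSIX filesystems
    return path

def restore(path):
    """Reload the state written by snapshot()."""
    with open(path) as f:
        return json.load(f)
```

The hard part this sketch glosses over is coordinating the dump with the on-disk portion of the DB so the two are consistent with each other at the barrier.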

Thanks,

— Ken


> On Apr 28, 2017, at 1:28am, Kostas Kloudas <k.klou...@data-artisans.com> 
> wrote:
> 
> Hi Ken,
> 
> So you have a queue where elements are sorted by timestamp and score, and 
> when the time (event time I suppose) passes 
> that of the timestamp of an element, you want to fetch the element and:
>  if the score is too low you archive it 
>  if the score is OK you emit it.
> 
> If I get it right, then if your stream is keyed you have a queue and an 
> “archive” state per key, 
> if not, you have a global queue for all elements, which can be seen as a 
> keyed stream on a dummy key, right?
> By the way, timers in Flink have to be associated with a key, so I suppose 
> that if you are using timers you are in the first case (keyed stream).
> 
> In this case, why do you need access to the state of all the keys?
> 
> Also it may be worth having a look at the CEP operator in the Flink codebase.
> There you also have a queue per key, where events are sorted by timestamp, 
> and at each watermark, 
> elements with timestamps smaller than the watermark are processed.
> 
> Hope this helps,
> Kostas
> 
>> On Apr 28, 2017, at 4:08 AM, Ken Krugler <kkrugler_li...@transpac.com> wrote:
>> 
>> Hi Kostas,
>> 
>> Thanks for responding. Details in-line below.
>> 
>>> On Apr 27, 2017, at 1:19am, Kostas Kloudas <k.klou...@data-artisans.com> wrote:
>>> 
>>> Hi Ken,
>>> 
>>> Unfortunately, iterating over all keys is not currently supported.
>>> 
>>> Do you have your own custom operator (because you mention “from within the 
>>> operator…”) or
>>> you have a process function (because you mention the “onTimer” method)?
>> 
>> Currently it’s a process function, but I might be able to just use a regular 
>> operator.
>> 
>>> Also, could you describe your use case a bit more?  You have a periodic 
>>> timer per key and when
>>> a timer for a given key fires you want to have access to the state of all 
>>> the keys?
>> 
>> The timer bit is because I’m filling an async queue, and thus need to 
>> trigger emitting tuples to the operator’s output stream independent of 
>> inbound tuples.
>> The main problems I’m trying to solve (without requiring a separate scalable 
>> DB infrastructure) are:
>> 
>>  - entries have an associated “earliest processing time”. I don’t want to 
>> send these through the system until that time trigger has passed.
>>  - entries have an associated “score”. I want to favor processing high 
>> scoring entries over low scoring entries.
>>  - if an entry’s score is too low, I want to archive it, versus constantly 
>> re-evaluate it using the above two factors.
>> 
>> I’ve got my own custom DB that is working for the above, and scales to 
>> target sizes of 1B+ entries per server by using a mixture of RAM and disk.
>> 
>> But having to checkpoint it isn’t trivial.
>> 
>> So I thought that if there was a way to (occasionally) iterate over the keys 
>> in the state backend, I could get what I needed with the minimum effort.
>> 
>> But sounds like that’s not possible currently.
>> 
>> Thanks,
>> 
>> — Ken
>> 
>> 
>> 
>>>> On Apr 27, 2017, at 3:02 AM, Ken Krugler <kkrugler_li...@transpac.com> wrote:
>>>> 
>>>> Is there a way to iterate over all of the key/value entries in the state 
>>>> backend, from within the operator that’s making use of the same?
>>>> 
>>>> E.g. I’ve got a ReducingState, and on a timed interval (inside of the 
>>>> onTimer method) I need to iterate over all KV state and emit the N “best” 
>>>> entries.
>>>> 
>>>> What’s the recommended approach?
>>>> 
>>>> Thanks,
>>>> 
>>>> — Ken
>>>> 
>>> 
>> 
>> --------------------------
>> Ken Krugler
>> +1 530-210-6378
>> http://www.scaleunlimited.com
>> custom big data solutions & training
>> Hadoop, Cascading, Cassandra & Solr
> 

--------------------------
Ken Krugler
+1 530-210-6378
http://www.scaleunlimited.com
custom big data solutions & training
Hadoop, Cascading, Cassandra & Solr


