Just a silly question. For the example you described, in a data flow model, you would do something like this:
Add query ids to the city pairs, giving (qid, city1, city2); split the query stream on the two cities and co-group the city1 branch with the updates stream, ((city1, qid), (city, temp)), and do the same for city2; emit (qid, city1, temp1) and (qid, city2, temp2) from the two co-groups; finally, group on the qid and apply a difference operator to get the final answer.

Is your idea to implement a way to generalize this logic, or would it use remote reads/writes to a KV store?

--
Gianmarco

On 8 September 2015 at 16:27, Aljoscha Krettek <aljos...@apache.org> wrote:

> That's a very nice application of the Stream API and partitioned state. :D
>
> I think we should run some tests on a cluster based on this to see what kind of throughput the partitioned state system can handle and how it behaves with larger numbers of keys. The KVStore is just an interface and the really heavy lifting is done by the state system, so this would be a good test for it.
>
> On Tue, 8 Sep 2015 at 15:10 Gyula Fóra <gyula.f...@gmail.com> wrote:
>
> > @Stephan:
> >
> > Technically speaking, this is really just a partitioned key-value state and a fancy operator executing special operations on this state.
> >
> > From the user's perspective, though, this is hard to implement. If you want to share state between two streams this way, for instance (getting updates from one stream and enriching the other one), you would probably use a connected data stream and custom-implement the key-value store logic. But once you have one (or more) update streams and many get streams, this implementation will not work. So the user either ends up replicating the whole state in multiple connected operators, or custom-implements some inefficient wrapper class to take care of all the put/get operations.
> >
> > The idea behind this is to give a very simple abstraction for this type of processing that uses the Flink runtime efficiently instead of relying on custom implementations.
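Gianmarco's join-based alternative can be traced with a small batch simulation. This is a sketch under stated assumptions, not Flink code: it uses plain Java collections, models the co-group state as "the last temperature update per city wins", and the names `CityDiffDataflow`, `Query`, `Update` and `run` are hypothetical. It follows his steps: tag queries with a qid, split each query into a city1 and a city2 branch, join each branch with the temperature updates keyed by city, then group on the qid and take the difference.

```java
import java.util.*;

// Batch simulation of the join-style dataflow with plain Java collections
// (hypothetical names, not Flink code).
public class CityDiffDataflow {

    public record Query(long qid, String city1, String city2) {}
    public record Update(String city, double temp) {}

    // Returns qid -> temp(city1) - temp(city2); queries with an unknown city are dropped.
    public static Map<Long, Double> run(List<Query> queries, List<Update> updates) {
        // Co-group state keyed by city: the last update seen for a city wins.
        Map<String, Double> latest = new HashMap<>();
        for (Update u : updates) {
            latest.put(u.city(), u.temp());
        }

        // Split every query into a city1 branch and a city2 branch, join each
        // branch with the per-city temperature, and collect (qid, side, temp).
        Map<Long, Double[]> byQid = new TreeMap<>();
        for (Query q : queries) {
            String[] cities = {q.city1(), q.city2()};
            for (int side = 0; side < 2; side++) {
                Double t = latest.get(cities[side]);
                if (t != null) {
                    byQid.computeIfAbsent(q.qid(), k -> new Double[2])[side] = t;
                }
            }
        }

        // Group on qid and apply the difference operator when both sides are present.
        Map<Long, Double> result = new TreeMap<>();
        for (Map.Entry<Long, Double[]> e : byQid.entrySet()) {
            Double[] temps = e.getValue();
            if (temps[0] != null && temps[1] != null) {
                result.put(e.getKey(), temps[0] - temps[1]);
            }
        }
        return result;
    }

    public static void main(String[] args) {
        List<Update> updates = List.of(
                new Update("CityA", 20.0),
                new Update("CityB", 12.0),
                new Update("CityA", 22.0));
        List<Query> queries = List.of(
                new Query(1, "CityA", "CityB"),
                new Query(2, "CityA", "CityC"));
        System.out.println(run(queries, updates)); // {1=10.0}
    }
}
```

In a streaming job the per-city temperatures would live as keyed state inside the co-group operator and results would be emitted incrementally; the batch form only makes the data movement easy to follow.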
> >
> > Let me give you a stupid example:
> >
> > You receive temperature data in the form of (city, temperature), and you are computing a rolling average for each city.
> > Now you have two other incoming streams. The first is a stream of some other info about the city, let's say population, (city, population), and you want to combine it with the last known average temperature to produce (city, temp, pop) triplets. The second is a stream of city pairs (city, city), and you are interested in the difference of their temperatures.
> >
> > To enrich (city, pop) to (city, temp, pop), you would probably use a CoFlatMap and store the last known rolling average as state. Computing the (city, city) temperature difference is a little more difficult, as you need to get the temperature for both cities and then combine them in a second operator. If you don't want to replicate your state, you have to merge these two problems into a common wrapper type and execute them on the same operator, which keeps the average state.
> >
> > With the KVStore abstraction this is very simple:
> > you create a KVStore<City, Temp>.
> > For enriching, you use kvStore.getWithKeySelector(), which will give you ((city, pop), temp) pairs, and you are done. For computing the difference, you can use kvStore.multiGet(...) to get the temperatures of the two cities at the same time. The KV store abstracts away fetching the two keys separately and merging them, so it returns [(city1, t1), (city2, t2)].
> >
> > This might be a slightly artificial example, but I think it makes the point. Implementing these jobs efficiently is not trivial for users, but I think it is a very common problem.
> >
> > Stephan Ewen <se...@apache.org> wrote on Tue, 8 Sep 2015 at 14:53:
> >
> > > @Gyula
> > >
> > > Can you explain a bit what this KeyValue store would do beyond the partitioned key/value state?
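The semantics of Gyula's temperature example can be pinned down with a single-threaded, in-memory model. This is a sketch, not the prototype: `KVStoreSemantics`, `InMemoryKVStore`, `recordTemperature` and `difference` are hypothetical names, each call handles one element instead of a whole `DataStream`, and a cumulative average stands in for the rolling average. `KV` allows a null value, mirroring the special KV abstraction Gyula describes for absent keys.

```java
import java.util.*;
import java.util.function.Function;

// Single-threaded, in-memory model of the proposed KVStore operations
// (hypothetical names; each call handles one element, not a DataStream).
public class KVStoreSemantics {

    // Result pair whose value may be null when the key is absent.
    public record KV<K, V>(K key, V value) {}

    public static class InMemoryKVStore<K, V> {
        private final Map<K, V> state = new HashMap<>();

        public void put(K key, V value) { state.put(key, value); }

        public KV<K, V> get(K key) { return new KV<>(key, state.get(key)); }

        public KV<K, V> remove(K key) { return new KV<>(key, state.remove(key)); }

        // Looks up every key and returns the answers in request order.
        @SafeVarargs
        public final List<KV<K, V>> multiGet(K... keys) {
            List<KV<K, V>> out = new ArrayList<>();
            for (K k : keys) {
                out.add(get(k));
            }
            return out;
        }

        // Extracts the key from an arbitrary element and pairs the element with its value.
        public <X> KV<X, V> getWithKeySelector(X element, Function<X, K> keySelector) {
            return new KV<>(element, state.get(keySelector.apply(element)));
        }
    }

    // Cumulative average per city, published into the store
    // (a simple stand-in for the rolling average in the example).
    public static void recordTemperature(InMemoryKVStore<String, Double> store,
                                         Map<String, double[]> sumAndCount,
                                         String city, double temp) {
        double[] sc = sumAndCount.computeIfAbsent(city, c -> new double[2]);
        sc[0] += temp;
        sc[1] += 1;
        store.put(city, sc[0] / sc[1]);
    }

    // Temperature difference of a city pair via one multiGet; null if a city is unknown.
    public static Double difference(InMemoryKVStore<String, Double> store, String c1, String c2) {
        List<KV<String, Double>> r = store.multiGet(c1, c2);
        Double t1 = r.get(0).value();
        Double t2 = r.get(1).value();
        return (t1 == null || t2 == null) ? null : Double.valueOf(t1 - t2);
    }

    public static void main(String[] args) {
        InMemoryKVStore<String, Double> store = new InMemoryKVStore<>();
        Map<String, double[]> sums = new HashMap<>();
        recordTemperature(store, sums, "CityA", 20.0);
        recordTemperature(store, sums, "CityA", 24.0); // average becomes 22.0
        recordTemperature(store, sums, "CityB", 12.0);

        // Enrichment: a (city, population) element paired with the city's average (illustrative values).
        KV<Map.Entry<String, Integer>, Double> enriched =
                store.getWithKeySelector(Map.entry("CityA", 100_000), e -> e.getKey());
        System.out.println(enriched.key() + " -> " + enriched.value()); // CityA=100000 -> 22.0

        // City-pair difference through multiGet.
        System.out.println(difference(store, "CityA", "CityB")); // 10.0
    }
}
```

In the streaming setting the appeal is that the enrichment lookups and the city-pair lookups read one shared store, rather than each needing its own replicated copy of the averages.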
> > >
> > > On Tue, Sep 8, 2015 at 2:49 PM, Gábor Gévay <gga...@gmail.com> wrote:
> > >
> > > > Hello,
> > > >
> > > > As for use cases: at my old job at Ericsson we were building a streaming system that processed data from telephone networks, and it used key-value stores a LOT. For example: keeping track of various state info about the users (which cell they are currently connected to, what bearers they have, ...); mapping the IDs of users in one subsystem of the telephone network to the IDs of the same users in another subsystem; mapping the IDs of phone calls to lists of the IDs of participating users; etc.
> > > > So I imagine they would like this a lot. (At least, if they were considering moving to Flink :))
> > > >
> > > > Best,
> > > > Gabor
> > > >
> > > > 2015-09-08 13:35 GMT+02:00 Gyula Fóra <gyf...@apache.org>:
> > > >
> > > > > Hey All,
> > > > >
> > > > > For the last couple of days I have been playing around with the idea of building a streaming key-value store abstraction using stateful streaming operators that can be used seamlessly within Flink Streaming programs.
> > > > >
> > > > > Operations executed on this KV store would be fault tolerant, as it integrates with the checkpointing mechanism, and if we add timestamps to each put/get/... operation, we can use the watermarks to create fully deterministic results. This functionality is very useful for many applications, and it is very hard to implement properly with some dedicated KV store.
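Gyula's point that timestamped operations plus watermarks yield deterministic results can be sketched as a buffer that is drained in timestamp order. Assumptions, not taken from the prototype: `WatermarkOrderedKV` and `Op` are hypothetical names, keys are `String` and values `Integer`, and at equal timestamps puts are applied before gets (two puts to one key with the same timestamp would need a further tie-breaker). Nothing is applied until a watermark guarantees that no earlier operation can still arrive.

```java
import java.util.*;

// Sketch of deterministic execution of timestamped KV operations: operations
// are buffered and applied in timestamp order once a watermark passes them.
// Hypothetical names; at equal timestamps puts run before gets (assumption).
public class WatermarkOrderedKV {

    public record Op(long timestamp, boolean isPut, String key, Integer value) {
        public static Op put(long ts, String key, int value) { return new Op(ts, true, key, value); }
        public static Op get(long ts, String key) { return new Op(ts, false, key, null); }
    }

    // Order by timestamp; at equal timestamps, puts (0) sort before gets (1).
    private static final Comparator<Op> ORDER =
            Comparator.comparingLong(Op::timestamp).thenComparingInt(op -> op.isPut() ? 0 : 1);

    private final PriorityQueue<Op> buffer = new PriorityQueue<>(ORDER);
    private final Map<String, Integer> state = new HashMap<>();

    // Operations may arrive out of timestamp order; nothing is executed yet.
    public void submit(Op op) {
        buffer.add(op);
    }

    // Applies every buffered op with timestamp <= watermark, in order, and returns
    // the get results as "key@timestamp=value" (value is null if the key is absent).
    public List<String> onWatermark(long watermark) {
        List<String> results = new ArrayList<>();
        while (!buffer.isEmpty() && buffer.peek().timestamp() <= watermark) {
            Op op = buffer.poll();
            if (op.isPut()) {
                state.put(op.key(), op.value());
            } else {
                results.add(op.key() + "@" + op.timestamp() + "=" + state.get(op.key()));
            }
        }
        return results;
    }

    public static void main(String[] args) {
        WatermarkOrderedKV kv = new WatermarkOrderedKV();
        kv.submit(Op.get(5, "a"));    // arrives before the put it depends on
        kv.submit(Op.put(3, "a", 1));
        kv.submit(Op.put(7, "a", 2));
        System.out.println(kv.onWatermark(6));  // [a@5=1]
        kv.submit(Op.get(8, "a"));
        System.out.println(kv.onWatermark(10)); // [a@8=2]
    }
}
```

In `main`, the get at timestamp 5 arrives before the put at timestamp 3; eager execution would have answered it with no value, whereas the buffered version returns 1 regardless of arrival order.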
> > > > >
> > > > > The KVStore abstraction could look as follows:
> > > > >
> > > > > KVStore<K,V> store = new KVStore<>();
> > > > >
> > > > > Operations:
> > > > >
> > > > > store.put(DataStream<Tuple2<K,V>>)
> > > > > store.get(DataStream<K>) -> DataStream<KV<K,V>>
> > > > > store.remove(DataStream<K>) -> DataStream<KV<K,V>>
> > > > > store.multiGet(DataStream<K[]>) -> DataStream<KV<K,V>[]>
> > > > > store.getWithKeySelector(DataStream<X>, KeySelector<X,K>) -> DataStream<KV<X,V>[]>
> > > > >
> > > > > For the resulting streams I used a special KV abstraction, which lets us return null values.
> > > > >
> > > > > The implementation uses a simple streaming operator for executing most of the operations (for multiGet there is an additional merge operator), with either local or partitioned states for storing the key-value pairs (my current prototype uses local states). It can either execute operations eagerly (which would not provide deterministic results), or buffer up operations and execute them in order upon watermarks.
> > > > >
> > > > > As for use cases, you can probably come up with many; I will save that for now :D
> > > > >
> > > > > I have a prototype implementation here that can execute the operations described above (it does not handle watermarks and time yet):
> > > > >
> > > > > https://github.com/gyfora/flink/tree/KVStore
> > > > >
> > > > > And also an example job:
> > > > >
> > > > > https://github.com/gyfora/flink/blob/KVStore/flink-staging/flink-streaming/flink-streaming-core/src/test/java/org/apache/flink/streaming/api/KVStreamExample.java
> > > > >
> > > > > What do you think?
> > > > > If you like it, I will work on writing tests; it still needs a lot of tweaking and refactoring. This might be something we want to include with the standard streaming libraries at some point.
> > > > >
> > > > > Cheers,
> > > > > Gyula
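The remark that multiGet needs an additional merge operator can be illustrated with another plain-Java simulation. This is a sketch under assumptions: `PartitionedMultiGet`, `Part` and `Merger` are hypothetical names, keys are routed to parallel instances by `hashCode` modulo the parallelism, and each partial answer carries its request id, its position in the request and the request's key count, so the merger knows when a request is complete.

```java
import java.util.*;

// Plain-Java simulation of a partitioned multiGet with a merge step
// (hypothetical names, not the prototype's code).
public class PartitionedMultiGet {

    // Partial answer for one key of a request: request id, the key's position
    // in the request, the number of keys in the request, the key and its value.
    public record Part(long requestId, int position, int total, String key, Integer value) {}

    private final List<Map<String, Integer>> partitions = new ArrayList<>();

    public PartitionedMultiGet(int parallelism) {
        for (int i = 0; i < parallelism; i++) {
            partitions.add(new HashMap<>());
        }
    }

    // Routes a key to the parallel instance that owns it.
    private Map<String, Integer> owner(String key) {
        return partitions.get(Math.floorMod(key.hashCode(), partitions.size()));
    }

    public void put(String key, int value) {
        owner(key).put(key, value);
    }

    // Lookup step: each key of the request is answered by its owning partition.
    public List<Part> lookup(long requestId, String... keys) {
        List<Part> parts = new ArrayList<>();
        for (int i = 0; i < keys.length; i++) {
            parts.add(new Part(requestId, i, keys.length, keys[i], owner(keys[i]).get(keys[i])));
        }
        return parts;
    }

    // Merge step: buffers partial answers per request id and emits the full
    // result, in the original key order, once every part has arrived.
    public static class Merger {
        private final Map<Long, Part[]> pending = new HashMap<>();

        public Optional<List<Part>> accept(Part p) {
            Part[] slots = pending.computeIfAbsent(p.requestId(), id -> new Part[p.total()]);
            slots[p.position()] = p;
            for (Part s : slots) {
                if (s == null) {
                    return Optional.empty(); // still waiting for another partition
                }
            }
            pending.remove(p.requestId());
            return Optional.of(Arrays.asList(slots));
        }
    }

    public static void main(String[] args) {
        PartitionedMultiGet store = new PartitionedMultiGet(4);
        store.put("city1", 20);
        store.put("city2", 12);
        List<Part> parts = new ArrayList<>(store.lookup(42, "city1", "city2", "city3"));
        Collections.reverse(parts); // partial answers may arrive in any order
        Merger merger = new Merger();
        for (Part p : parts) {
            merger.accept(p).ifPresent(result -> {
                for (Part r : result) {
                    System.out.println(r.key() + " = " + r.value());
                }
            });
        }
    }
}
```

Because the partial answers of one request come from different parallel instances, they can arrive in any order; `main` reverses them to mimic that, and the merger still emits `city1`, `city2`, `city3` in request order, with a null value for the missing key.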