> Sorry for the late answer, I completely missed this email. (Thanks Robert for pointing it out.)

No problem ;)

> Now that you have everything set up, in flatMap1 (for events) you would query the state: state.value() and enrich your data
> in flatMap2 you would update the state: state.update(newState)

In this example, how are the states in the enrichments stream (enrichments = DataStream<Tuple2<key, state>>) and the value state declared inside YourCoFlatMap linked?

> in flatMap2 you would update the state: state.update(newState)

Wouldn't that only update the state declared in YourCoFlatMap, and not the state in the enrichments stream?

Cheers,

2016-03-23 15:38 GMT+01:00 Gyula Fóra <gyula.f...@gmail.com>:

Hi!

Sorry for the late answer, I completely missed this email. (Thanks Robert for pointing it out.)

You won't be able to use that project, as it depended on an earlier snapshot version that still had completely different state semantics. I don't think it is realistic that I will re-implement this any time soon, but I think you can easily do what you want in the following way:

Let's say you have 2 streams. The first contains the enrichment data per key, let's say enrichments = DataStream<Tuple2<key, state>>. The second stream is the event stream that you want to enrich: events = DataStream<Tuple2<key, event>>.

To apply the enrichments, the easiest is to use a CoFlatMap with a partitioned value state inside:

    events.connect(enrichments).keyBy(0, 0).flatMap(new YourCoFlatMap());

In this case, if you declare a value state inside YourCoFlatMap, it will be kept per key. For example, in the open method:

    state = getRuntimeContext().getState(new ValueStateDescriptor<>("stateName", type, defaultValue));

Now that you have everything set up, in flatMap1 (for events) you would query the state with state.value() and enrich your data; in flatMap2 you would update the state with state.update(newState).

Does this make sense to you? Or is the use case completely different?
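
To make the recipe above concrete, here is a minimal plain-Java model of a keyed CoFlatMap. It is a sketch under stated assumptions, not Flink code: the class KeyedCoFlatMapModel is hypothetical, a HashMap stands in for the keyed ValueState (the constructor's defaultValue plays the role of the descriptor's default), and passing the key explicitly to each method stands in for connect(...).keyBy(0, 0) routing elements with the same key from both inputs to the same operator instance. In this model the enrichment stream keeps no state of its own: flatMap2 writes each enrichment into the operator's per-key slot, and flatMap1 reads from that same slot.

```java
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

/**
 * Plain-Java model (not Flink code) of a keyed CoFlatMap that enriches events.
 * The map plays the role of a keyed ValueState: one slot per key, owned by the operator.
 */
public final class KeyedCoFlatMapModel {

    private final Map<String, String> stateByKey = new HashMap<>(); // stand-in for ValueState
    private final String defaultValue;                             // like the descriptor's default
    private final List<String> out = new ArrayList<>();            // stand-in for the Collector

    public KeyedCoFlatMapModel(String defaultValue) {
        this.defaultValue = defaultValue;
    }

    /** Input 1 (events): read the current state for this key and emit "key:event:enrichment". */
    public void flatMap1(String key, String event) {
        String enrichment = stateByKey.getOrDefault(key, defaultValue);
        out.add(key + ":" + event + ":" + enrichment);
    }

    /** Input 2 (enrichments): overwrite the state for this key; nothing is emitted. */
    public void flatMap2(String key, String newState) {
        stateByKey.put(key, newState);
    }

    public List<String> output() {
        return out;
    }

    public static void main(String[] args) {
        KeyedCoFlatMapModel op = new KeyedCoFlatMapModel("none");
        op.flatMap1("a", "e1");       // no enrichment seen yet for key "a"
        op.flatMap2("a", "gold");     // enrichment input writes the state for key "a"
        op.flatMap1("a", "e2");
        op.flatMap1("b", "e3");       // key "b" has its own, still empty, slot
        op.flatMap2("a", "platinum"); // a later enrichment replaces the earlier one
        op.flatMap1("a", "e4");
        System.out.println(op.output());
        // prints [a:e1:none, a:e2:gold, b:e3:none, a:e4:platinum]
    }
}
```

In a real job, Flink does not guarantee the relative order of elements arriving from the two connected inputs, so an event may be processed before any enrichment for its key; the default value covers that case in the model.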

Cheers,
Gyula

Nam-Luc Tran <namluc.t...@euranova.eu> wrote (on Fri, 18 Mar 2016, 18:25):

Hi Gyula,

I'm currently looking for ways to enrich streams with external data. Have you got any update on the topic in general, or on StreamKV?

I've checked out the code, but it won't build, mainly because StateCheckpointer has been removed since [FLINK-2808]. Any hint on a quick replacement before I dive in deeper?

Cheers,

2015-09-15 20:29 GMT+02:00 Stephan Ewen <se...@apache.org>:

I think that is actually a cool way to kick off an addition to the system. It gives you a lot of flexibility in releasing and testing...

It helps, though, to upload Maven artifacts for it!

On Tue, Sep 15, 2015 at 7:18 PM, Gyula Fóra <gyf...@apache.org> wrote:

Hey All,

We decided to make this a standalone library until it is stable enough, and then we can decide whether we want to keep it like that or include it in the project:

https://github.com/gyfora/StreamKV

Cheers,
Gyula

Gianmarco De Francisci Morales <g...@apache.org> wrote (on Wed, 9 Sep 2015, 20:25):

Yes, pretty clear. I guess semantically it's still a co-group, but implemented slightly differently.

Thanks!

--
Gianmarco

On 9 September 2015 at 15:37, Gyula Fóra <gyula.f...@gmail.com> wrote:

Hey Gianmarco,

So the implementation looks somewhat different:

The update stream is received by a stateful KVStoreOperator, which stores the K-V pairs as its partitioned state.

The query for the 2 cities is assigned an ID, yes, and is split into the 2 cities, and each of these is sent to the same KVStoreOperator as the update stream. The output is practically the value for each key, (qid, city1, temp1), which is retrieved from the operator state, and this output is merged in a subsequent operator to form the KV[] output, on which the user can compute the difference if he wants.

So actually no co-group is happening, although semantically it might be similar. Instead, I use stateful operators to be much more efficient.

Does this answer your question?

Gyula

Gianmarco De Francisci Morales <g...@apache.org> wrote (on Wed, 9 Sep 2015, 14:29):

Just a silly question.
For the example you described, in a data flow model, you would do something like this:

Have query ids added to the city pairs (qid, city1, city2),
then split the query stream on the two cities and co-group it with the updates stream ((city1, qid), (city, temp)), same for city2,
then emit (qid, city1, temp1), (qid, city2, temp2) from the two co-groups,
group on the qid, and apply a difference operator to get the final answer.

Is your idea to implement a way to generalize this logic, or would it use remote reads/writes to a KV store?
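
The co-group dataflow sketched in this message and the KVStoreOperator described in Gyula's reply above share a two-stage shape: answer each per-city sub-query from keyed state, then merge the partial answers by query ID. A minimal single-JVM sketch of that shape follows. The class and method names are hypothetical, the difference is assumed to be temp(city1) minus temp(city2), and because it runs sequentially it says nothing about partitioning or throughput.

```java
import java.util.HashMap;
import java.util.Map;

/**
 * Hypothetical single-JVM model of the pair-query dataflow:
 * split (qid, city1, city2) into per-city lookups, answer each from keyed state,
 * then merge the two partial answers by qid and take the difference.
 */
public final class PairQueryModel {

    /** Partial answer from the lookup stage: (qid, slot, city, temp). */
    static final class Partial {
        final long qid;
        final int slot;      // 0 = city1, 1 = city2; lets the merge stage accept either arrival order
        final String city;
        final Double temp;   // null when no temperature is known for the city yet

        Partial(long qid, int slot, String city, Double temp) {
            this.qid = qid;
            this.slot = slot;
            this.city = city;
            this.temp = temp;
        }
    }

    // Lookup-stage state: last known temperature per city, written by the update stream.
    private final Map<String, Double> tempByCity = new HashMap<>();
    // Merge-stage state: partial answers waiting for their sibling, keyed by qid.
    private final Map<Long, Partial[]> pending = new HashMap<>();
    // Final answers: temp(city1) - temp(city2), or null if either temperature is unknown.
    private final Map<Long, Double> differences = new HashMap<>();

    /** Update stream element (city, temp). */
    public void update(String city, double temp) {
        tempByCity.put(city, temp);
    }

    /** Query stream element (qid, city1, city2): split into two keyed lookups. */
    public void query(long qid, String city1, String city2) {
        merge(lookup(qid, 0, city1));
        merge(lookup(qid, 1, city2));
    }

    private Partial lookup(long qid, int slot, String city) {
        return new Partial(qid, slot, city, tempByCity.get(city));
    }

    private void merge(Partial p) {
        Partial[] halves = pending.computeIfAbsent(p.qid, id -> new Partial[2]);
        halves[p.slot] = p;
        if (halves[0] != null && halves[1] != null) {
            pending.remove(p.qid);
            Double t1 = halves[0].temp;
            Double t2 = halves[1].temp;
            differences.put(p.qid, (t1 == null || t2 == null) ? null : t1 - t2);
        }
    }

    public Map<Long, Double> differences() {
        return differences;
    }

    public static void main(String[] args) {
        PairQueryModel m = new PairQueryModel();
        m.update("cityA", 21.5);
        m.update("cityB", 18.0);
        m.query(1L, "cityA", "cityB");
        m.query(2L, "cityA", "cityC"); // cityC has no temperature yet
        System.out.println(m.differences());
    }
}
```

In a distributed job the two halves of a query may reach the merge stage in either order and from different parallel instances, which is why the merge buffers by qid rather than assuming adjacency.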

--
Gianmarco

On 8 September 2015 at 16:27, Aljoscha Krettek <aljos...@apache.org> wrote:

That's a very nice application of the Stream API and partitioned state. :D

I think we should run some tests on a cluster based on this to see what kind of throughput the partitioned state system can handle, and also how it behaves with larger numbers of keys. The KVStore is just an interface and the really heavy lifting is done by the state system, so this would be a good test for it.

On Tue, 8 Sep 2015 at 15:10 Gyula Fóra <gyula.f...@gmail.com> wrote:

@Stephan:

Technically speaking, this is really just a partitioned key-value state and a fancy operator executing special operations on this state.

From the user's perspective, though, this is something hard to implement. If you want to share state between two streams this way, for instance (getting updates from one stream and enriching the other one), you would probably use a connected data stream and custom-implement the key-value store logic. But once you have one (or more) update streams and many get streams, this implementation will not work.
So either the user ends up replicating the whole state in multiple connected operators, or they custom-implement some inefficient wrapper class to take care of all the put/get operations.

The idea behind this is to give a very simple abstraction for this type of processing that uses the Flink runtime efficiently instead of relying on custom implementations.

Let me give you a stupid example:

You receive temperature data in the form of (city, temperature), and you are computing a rolling avg for each city.
Now you have 2 other incoming streams. The first is a stream of some other info about the city, let's say population (city, population), and you want to combine it with the last known avg temperature to produce (city, temp, pop) triplets. The second stream is a pair of cities (city, city), and you are interested in the difference between their temperatures.

For enriching (city, pop) to (city, temp, pop) you would probably use a CoFlatMap and store the last known rolling avg as state. For computing the (city, city) temperature difference it is a little more difficult, as you need to get the temperature for both cities and then combine them in a second operator.
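
The enrichment half of this example can be sketched as a plain-Java model of the CoFlatMap described above, with the rolling average kept as per-city state. The message does not say which rolling average is meant; this sketch assumes a simple average over the last N readings per city, and the class name, window size, and string output format are hypothetical.

```java
import java.util.ArrayDeque;
import java.util.ArrayList;
import java.util.Deque;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

/**
 * Hypothetical plain-Java model of the CoFlatMap in the temperature example:
 * input 1 (city, temperature) maintains a per-city rolling average,
 * input 2 (city, population) is enriched with the last known average.
 */
public final class CityEnrichmentModel {

    private final int window; // assumed: average over the last `window` readings
    private final Map<String, Deque<Double>> recentByCity = new HashMap<>(); // per-key state
    private final List<String> out = new ArrayList<>();

    public CityEnrichmentModel(int window) {
        if (window < 1) throw new IllegalArgumentException("window must be >= 1");
        this.window = window;
    }

    /** Temperature input: append the reading and drop the oldest one beyond the window. */
    public void onTemperature(String city, double temperature) {
        Deque<Double> recent = recentByCity.computeIfAbsent(city, c -> new ArrayDeque<>());
        recent.addLast(temperature);
        if (recent.size() > window) {
            recent.removeFirst();
        }
    }

    /** Last known rolling average for the city, or null if no reading has arrived yet. */
    public Double average(String city) {
        Deque<Double> recent = recentByCity.get(city);
        if (recent == null) {
            return null;
        }
        double sum = 0;
        for (double t : recent) {
            sum += t;
        }
        return sum / recent.size();
    }

    /** Population input: emit the (city, temp, pop) triplet. */
    public void onPopulation(String city, long population) {
        out.add("(" + city + ", " + average(city) + ", " + population + ")");
    }

    public List<String> output() {
        return out;
    }

    public static void main(String[] args) {
        CityEnrichmentModel op = new CityEnrichmentModel(2);
        op.onTemperature("cityA", 18.0);
        op.onTemperature("cityA", 20.0);
        op.onTemperature("cityA", 22.0); // window of 2: average of 20.0 and 22.0
        op.onPopulation("cityA", 1000);
        op.onPopulation("cityB", 500);   // no temperature yet for cityB
        System.out.println(op.output());
        // prints [(cityA, 21.0, 1000), (cityB, null, 500)]
    }
}
```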

If you don't want to replicate your state, you have to combine these two problems into a common wrapper type and execute them on the same operator, which will keep the avg state.

With the KVStore abstraction this is very simple:
you create a KVStore<City, Temp>.
For enriching, you use kvStore.getWithKeySelector(), which will give you ((city, pop), temp) pairs, and you are done. For computing the difference, you can use kvStore.multiGet(...) and get the values for the 2 cities at the same time. The KV store will abstract away getting the 2 keys separately and merging them, so it will return [(city1, t1), (city2, t2)].

This might be a slightly artificial example, but I think it makes the point. Implementing these jobs efficiently is not trivial for users, but I think it is a very common problem.

Stephan Ewen <se...@apache.org> wrote (on Tue, 8 Sep 2015, 14:53):

@Gyula

Can you explain a bit what this KeyValue store would do beyond the partitioned key/value state?

On Tue, Sep 8, 2015 at 2:49 PM, Gábor Gévay <gga...@gmail.com> wrote:

Hello,

As for use cases, in my old job at Ericsson we were building a streaming system that was processing data from telephone networks, and it was using key-value stores a LOT. For example: keeping track of various state info of the users (which cell they are currently connected to, what bearers they have, ...); mapping from IDs of users in one subsystem of the telephone network to the IDs of the same users in another subsystem; mapping from IDs of phone calls to lists of IDs of participating users; etc.
So I imagine they would like this a lot. (At least, if they were considering moving to Flink :))

Best,
Gabor

2015-09-08 13:35 GMT+02:00 Gyula Fóra <gyf...@apache.org>:

Hey All,

The last couple of days I have been playing around with the idea of building a streaming key-value store abstraction using stateful streaming operators that can be used within Flink Streaming programs seamlessly.

Operations executed on this KV store would be fault tolerant, as it integrates with the checkpointing mechanism, and if we add timestamps to each put/get/... operation we can use the watermarks to create fully deterministic results. This functionality is very useful for many applications, and is very hard to implement properly with some dedicated KV store.

The KVStore abstraction could look as follows:

    KVStore<K,V> store = new KVStore<>();

Operations:

    store.put(DataStream<Tuple2<K,V>>)
    store.get(DataStream<K>) -> DataStream<KV<K,V>>
    store.remove(DataStream<K>) -> DataStream<KV<K,V>>
    store.multiGet(DataStream<K[]>) -> DataStream<KV<K,V>[]>
    store.getWithKeySelector(DataStream<X>, KeySelector<X,K>) -> DataStream<KV<X,V>[]>

For the resulting streams I used a special KV abstraction which lets us return null values.
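
To pin down the intended semantics of these operations, here is a minimal in-memory model. It is not the prototype's code: lists processed in order stand in for DataStreams, java.util.function.Function stands in for Flink's KeySelector, List<K> replaces K[], and the KV class is a hypothetical stand-in for the special KV type with nullable values. It also assumes that remove emits the value that was removed, and that getWithKeySelector returns one KV per element, matching the ((city, pop), temp) pairs in Gyula's temperature example.

```java
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.function.Function;

/**
 * Hypothetical in-memory model of the proposed KVStore operations.
 * Lists stand in for DataStreams, Function for KeySelector, and List<K> for K[].
 */
public final class KVStoreModel<K, V> {

    /** Result pair whose value may be null when the key is absent. */
    public static final class KV<A, B> {
        public final A key;
        public final B value;

        KV(A key, B value) {
            this.key = key;
            this.value = value;
        }

        @Override
        public String toString() {
            return "(" + key + ", " + value + ")";
        }
    }

    private final Map<K, V> state = new HashMap<>();

    /** put: later updates for a key overwrite earlier ones. */
    public void put(List<Map.Entry<K, V>> updates) {
        for (Map.Entry<K, V> e : updates) {
            state.put(e.getKey(), e.getValue());
        }
    }

    /** get: one KV per requested key; the value is null if the key is absent. */
    public List<KV<K, V>> get(List<K> keys) {
        List<KV<K, V>> out = new ArrayList<>();
        for (K k : keys) {
            out.add(new KV<>(k, state.get(k)));
        }
        return out;
    }

    /** remove (assumed semantics): deletes each key and emits the value it had, or null. */
    public List<KV<K, V>> remove(List<K> keys) {
        List<KV<K, V>> out = new ArrayList<>();
        for (K k : keys) {
            out.add(new KV<>(k, state.remove(k)));
        }
        return out;
    }

    /** multiGet: one merged result per group of keys. */
    public List<List<KV<K, V>>> multiGet(List<List<K>> keyGroups) {
        List<List<KV<K, V>>> out = new ArrayList<>();
        for (List<K> group : keyGroups) {
            out.add(get(group));
        }
        return out;
    }

    /** getWithKeySelector: look up the key extracted from each element and keep the element. */
    public <X> List<KV<X, V>> getWithKeySelector(List<X> elements, Function<X, K> keySelector) {
        List<KV<X, V>> out = new ArrayList<>();
        for (X x : elements) {
            out.add(new KV<>(x, state.get(keySelector.apply(x))));
        }
        return out;
    }

    public static void main(String[] args) {
        KVStoreModel<String, Double> temps = new KVStoreModel<>();
        temps.put(List.of(Map.entry("cityA", 20.0), Map.entry("cityB", 15.0)));
        // Enrichment: "city|population" strings keyed by their city part.
        System.out.println(temps.getWithKeySelector(List.of("cityA|1000"), s -> s.split("\\|")[0]));
        // Pairwise lookup, as needed for the temperature difference.
        System.out.println(temps.multiGet(List.of(List.of("cityA", "cityB"))));
    }
}
```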

The implementation uses a simple streaming operator for executing most of the operations (for multiGet there is an additional merge operator) with either local or partitioned states for storing the key-value pairs (my current prototype uses local states). And it can either execute operations eagerly (which would not provide deterministic results), or buffer up operations and execute them in order upon watermarks.

As for use cases, you can probably come up with many; I will save that for now :D

I have a prototype implementation here that can execute the operations described above (it does not handle watermarks and time yet):

https://github.com/gyfora/flink/tree/KVStore

And also an example job:

https://github.com/gyfora/flink/blob/KVStore/flink-staging/flink-streaming/flink-streaming-core/src/test/java/org/apache/flink/streaming/api/KVStreamExample.java

What do you think?
If you like it, I will work on writing tests, and it still needs a lot of tweaking and refactoring.
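
The buffered mode described above (buffer up operations and execute them in order upon watermarks) can be illustrated with a small plain-Java model. Assumptions: only put and get are modelled, timestamps are plain longs, a watermark is a method call, and all names are hypothetical. Operations with equal timestamps fall back to arrival order, so full determinism here assumes distinct timestamps per key.

```java
import java.util.ArrayList;
import java.util.Comparator;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.PriorityQueue;

/**
 * Hypothetical model of "buffer up operations and execute them in order upon watermarks".
 * Only put and get are modelled; equal timestamps fall back to arrival order.
 */
public final class WatermarkBufferedStore {

    /** A timestamped put (isPut == true) or get (isPut == false). */
    private static final class Op {
        final long timestamp;
        final long seq; // arrival sequence number, used only as a tie-breaker
        final boolean isPut;
        final String key;
        final Integer value;

        Op(long timestamp, long seq, boolean isPut, String key, Integer value) {
            this.timestamp = timestamp;
            this.seq = seq;
            this.isPut = isPut;
            this.key = key;
            this.value = value;
        }
    }

    // Pending operations ordered by timestamp (then arrival), not by arrival alone.
    private final PriorityQueue<Op> buffer = new PriorityQueue<>(
            Comparator.comparingLong((Op op) -> op.timestamp).thenComparingLong(op -> op.seq));
    private final Map<String, Integer> state = new HashMap<>();
    private final List<String> results = new ArrayList<>();
    private long nextSeq = 0;

    public void put(long timestamp, String key, int value) {
        buffer.add(new Op(timestamp, nextSeq++, true, key, value));
    }

    public void get(long timestamp, String key) {
        buffer.add(new Op(timestamp, nextSeq++, false, key, null));
    }

    /** On watermark w, apply every buffered operation with timestamp <= w, in timestamp order. */
    public void onWatermark(long watermark) {
        while (!buffer.isEmpty() && buffer.peek().timestamp <= watermark) {
            Op op = buffer.poll();
            if (op.isPut) {
                state.put(op.key, op.value);
            } else {
                results.add(op.key + "@" + op.timestamp + "=" + state.get(op.key));
            }
        }
    }

    public List<String> results() {
        return results;
    }

    public static void main(String[] args) {
        WatermarkBufferedStore store = new WatermarkBufferedStore();
        store.get(5, "k");      // arrives first but carries the later timestamp
        store.put(3, "k", 42);
        store.put(7, "k", 99);  // beyond the next watermark, so it stays buffered
        store.onWatermark(6);
        System.out.println(store.results()); // prints [k@5=42]
    }
}
```

Applied eagerly in arrival order instead, the same input would answer the get at t=5 with null, because the put at t=3 arrives after it; that order dependence is what makes eager execution nondeterministic.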
This might be something we want to include with the standard streaming libraries at one point.

Cheers,
Gyula

--
Nam-Luc TRAN
R&D Manager
EURA NOVA
(M) +32 498 37 36 23
euranova.eu

--
Nam-Luc TRAN
R&D Manager
EURA NOVA
(M) +32 498 37 36 23
euranova.eu