>Sorry for the late answer, I completely missed this email. (Thanks Robert
for pointing out).
No problem ;)

>Now that you have everything set up, in flatMap1 (for events) you would
query the state : state.value() and enrich your data
>in flatMap2 you would update the state: state.update(newState)

In this example, how are the states in the enrichments stream (enrichments
= DataStream<Tuple2<key, state>>) and the value state declared inside
YourCoFlatMap linked?

>in flatMap2 you would update the state: state.update(newState)
Wouldn't that only update the state declared in YourCoFlatMap, and not the
state in the enrichments stream?

Cheers,


2016-03-23 15:38 GMT+01:00 Gyula Fóra <gyula.f...@gmail.com>:

> Hi!
>
> Sorry for the late answer, I completely missed this email. (Thanks Robert
> for pointing out).
>
> You won't be able to use that project as it was dependent on an earlier
> snapshot version that still had completely different state semantics.
> I don't think it is realistic that I will re-implment this any time soon,
> but I think you can easily do what you want in the following way:
>
> Let's say you have 2 streams, the first contains the enrichment data per
> key let's say enrichments = DataStream<Tuple2<key, state>> .
> The second stream is the event stream that you want to enrich: events =
> DataStream<Tuple2<key, event>>
>
> To apply the enrichments the easiest is to use a CoFlatMap with a
> partitioned value state inside:
>
> events.connect(enrichments).keyBy(0,0).flatMap(new YourCoFlatMap())
>
> In this case if you declare a value state inside YourCoFlatMap it will be
> kept per key. For example in the open method:
> state = getRuntimeContext().getState(new ValueStateDescriptor("stateName",
> type, defaultValue)).
>
> Now that you have everything set up, in flatMap1 (for events) you would
> query the state : state.value() and enrich your data
> in flatMap2 you would update the state: state.update(newState)
>
> Does this make sense to you? Or is the use case completely different?
>
> Cheers,
> Gyula
>
> Nam-Luc Tran <namluc.t...@euranova.eu> ezt írta (időpont: 2016. márc. 18.,
> P, 18:25):
>
> > Hi Gyula,
> >
> > I'm currently looking after ways to enrich streams with external data.
> Have
> > you got any update on the topic in general or on StreamKV?
> >
> > I've checked out the code but it won't build, mainly because
> > StateCheckpointer has been removed since [FLINK-2808]. Any hint on a
> quick
> > replacement, before I dive in deeper?
> >
> > Cheers,
> >
> > 2015-09-15 20:29 GMT+02:00 Stephan Ewen <se...@apache.org>:
> >
> > > I think that is actually a cool way to kick of an addition to the
> system.
> > > Gives you a lot of flexibility and releasing and testing...
> > >
> > > It helps, though, to upload maven artifacts for it!
> > >
> > > On Tue, Sep 15, 2015 at 7:18 PM, Gyula Fóra <gyf...@apache.org> wrote:
> > >
> > > > Hey All,
> > > >
> > > > We decided to make this a standalone library until it is stable
> enough
> > > and
> > > > then we can decide whether we want to keep it like that or include in
> > the
> > > > project:
> > > >
> > > > https://github.com/gyfora/StreamKV
> > > >
> > > > Cheers,
> > > > Gyula
> > > >
> > > > Gianmarco De Francisci Morales <g...@apache.org> ezt írta (időpont:
> > > 2015.
> > > > szept. 9., Sze, 20:25):
> > > >
> > > > > Yes, pretty clear. I guess semantically it's still a co-group, but
> > > > > implemented slightly differently.
> > > > >
> > > > > Thanks!
> > > > >
> > > > > --
> > > > > Gianmarco
> > > > >
> > > > > On 9 September 2015 at 15:37, Gyula Fóra <gyula.f...@gmail.com>
> > wrote:
> > > > >
> > > > > > Hey Gianmarco,
> > > > > >
> > > > > > So the implementation looks something different:
> > > > > >
> > > > > > The update stream is received by a stateful KVStoreOperator which
> > > > stores
> > > > > > the K-V pairs as their partitioned state.
> > > > > >
> > > > > > The query for the 2 cities is assigned an ID yes, and is split to
> > > the 2
> > > > > > cities, and each of these are  sent to the same KVStoreOperator
> as
> > > the
> > > > > > update stream. The output is the value for each key practically
> > (qid,
> > > > > > city1, temp1) which is retreived from the operator state , and
> this
> > > > > output
> > > > > > is merged in a next operator to form the KV[] output on which the
> > > user
> > > > > can
> > > > > > execute the difference if he wants.
> > > > > >
> > > > > > So actually no co-group is happening although semantically it
> might
> > > be
> > > > > > similar. Instead I use stateful operators to be much more
> > efficient.
> > > > > >
> > > > > > Does this answer you question?
> > > > > >
> > > > > > Gyula
> > > > > >
> > > > > > Gianmarco De Francisci Morales <g...@apache.org> ezt írta
> > (időpont:
> > > > > 2015.
> > > > > > szept. 9., Sze, 14:29):
> > > > > >
> > > > > > > Just a silly question.
> > > > > > > For the example you described, in a data flow model, you would
> do
> > > > > > something
> > > > > > > like this:
> > > > > > >
> > > > > > > Have query ids added to the city pairs (qid, city1, city2),
> > > > > > > then split the query stream on the two cities and co-group it
> > with
> > > > the
> > > > > > > updates stream ((city1, qid) , (city, temp)), same for city2,
> > > > > > > then emit (qid, city1, temp1), (qid, city2, temp2) from the two
> > > > > > co-groups,
> > > > > > > group on the qid, and apply a difference operator to get the
> > final
> > > > > > answer.
> > > > > > >
> > > > > > > Is your  idea to implement a way to generalize this logic, or
> it
> > > > would
> > > > > > use
> > > > > > > remote read/write to a KV-store?
> > > > > > >
> > > > > > > --
> > > > > > > Gianmarco
> > > > > > >
> > > > > > > On 8 September 2015 at 16:27, Aljoscha Krettek <
> > > aljos...@apache.org>
> > > > > > > wrote:
> > > > > > >
> > > > > > > > That's a very nice application of the Stream API and
> > partitioned
> > > > > state.
> > > > > > > :D
> > > > > > > >
> > > > > > > > I think we should run some tests on a cluster  based on this
> to
> > > see
> > > > > > what
> > > > > > > > kind of throughput the partitioned state system can handle
> and
> > > also
> > > > > how
> > > > > > > it
> > > > > > > > behaves with larger numbers of keys. The KVStore is just an
> > > > interface
> > > > > > and
> > > > > > > > the really heavy lifting is done by the state system so this
> > > would
> > > > > be a
> > > > > > > > good test for it.
> > > > > > > >
> > > > > > > >
> > > > > > > > On Tue, 8 Sep 2015 at 15:10 Gyula Fóra <gyula.f...@gmail.com
> >
> > > > wrote:
> > > > > > > >
> > > > > > > > > @Stephan:
> > > > > > > > >
> > > > > > > > > Technically speaking this is really just a partitioned
> > > key-value
> > > > > > state
> > > > > > > > and
> > > > > > > > > a fancy operator executing special operations on this
> state.
> > > > > > > > >
> > > > > > > > > From the user's perspective though this is something hard
> to
> > > > > > implement.
> > > > > > > > If
> > > > > > > > > you want to share state between two stream for instance
> this
> > > way
> > > > > > > (getting
> > > > > > > > > updates from one stream and enriching the other one) you
> > would
> > > > > > probably
> > > > > > > > use
> > > > > > > > > a connected datastream and custom implement the Key-value
> > store
> > > > > > logic.
> > > > > > > > But
> > > > > > > > > once you have one(or more) update stream and many get
> streams
> > > > this
> > > > > > > > > implementation will not work. So either the user end up
> > > > replicating
> > > > > > the
> > > > > > > > > whole state in multiple connected operators, or custom
> > > implement
> > > > > some
> > > > > > > > > inefficient wrapper class to take care of all the put/get
> > > > > operations.
> > > > > > > > >
> > > > > > > > > The Idea behind this is to give a very simple abstraction
> for
> > > > this
> > > > > > type
> > > > > > > > of
> > > > > > > > > processing that uses the flink runtime efficiently instead
> of
> > > > > relying
> > > > > > > on
> > > > > > > > > custom implementations.
> > > > > > > > >
> > > > > > > > > Let me give you a stupid example:
> > > > > > > > >
> > > > > > > > > You receive Temperature data in the form of (city,
> > > temperature),
> > > > > and
> > > > > > > you
> > > > > > > > > are computing a rolling avg for each city.
> > > > > > > > > Now you have 2 other incoming streams: first is a stream of
> > > some
> > > > > > other
> > > > > > > > info
> > > > > > > > > about the city let's say population (city, population) and
> > you
> > > > want
> > > > > > to
> > > > > > > > > combine it with the last known avg temperature to produce
> > > (city,
> > > > > > temp,
> > > > > > > > pop)
> > > > > > > > > triplets. The second stream is a pair of cities (city,city)
> > and
> > > > you
> > > > > > are
> > > > > > > > > interested in the difference of the temperature.
> > > > > > > > >
> > > > > > > > > For enriching the (city, pop) to (city,temp,pop) you would
> > > > probably
> > > > > > > use a
> > > > > > > > > CoFlatMap and store the last known rolling avg as state.
> For
> > > > > > computing
> > > > > > > > the
> > > > > > > > > (city,city) temperature difference it is a little more
> > > difficult,
> > > > > as
> > > > > > > you
> > > > > > > > > need to get the temperature for both cities then combine
> in a
> > > > > second
> > > > > > > > > operator. If you don't want to replicate your state, you
> have
> > > to
> > > > > > > combine
> > > > > > > > > these two problems to a common wrapper type and execute
> them
> > > on a
> > > > > > same
> > > > > > > > > operator which will keep the avg state.
> > > > > > > > >
> > > > > > > > > With the KVStore abstraction this is very simple:
> > > > > > > > > you create a KVStore<City, Temp>
> > > > > > > > > For enriching you use kvStore.getWithKeySelector() which
> will
> > > > give
> > > > > > you
> > > > > > > > > ((cit,pop), temp) pairs and you are done. For computing the
> > > > > > difference,
> > > > > > > > you
> > > > > > > > > can use kvStore.multiget(...) and get for the 2 cities at
> the
> > > > same
> > > > > > > type.
> > > > > > > > > The kv store will abstract away the getting of the 2 keys
> > > > > separately
> > > > > > > and
> > > > > > > > > merging them so it will return [(city1, t1), (city2,t2)].
> > > > > > > > >
> > > > > > > > > This might be slightly artificial example but I think it
> > makes
> > > > the
> > > > > > > point.
> > > > > > > > > Implementing these jobs efficiently is not trivial for the
> > > users
> > > > > but
> > > > > > I
> > > > > > > > > think it is a very common problem.
> > > > > > > > >
> > > > > > > > > Stephan Ewen <se...@apache.org> ezt írta (időpont: 2015.
> > > szept.
> > > > > 8.,
> > > > > > K,
> > > > > > > > > 14:53):
> > > > > > > > >
> > > > > > > > > > @Gyula
> > > > > > > > > >
> > > > > > > > > > Can you explain a bit what this KeyValue store would do
> > more
> > > > then
> > > > > > the
> > > > > > > > > > partitioned key/value state?
> > > > > > > > > >
> > > > > > > > > > On Tue, Sep 8, 2015 at 2:49 PM, Gábor Gévay <
> > > gga...@gmail.com>
> > > > > > > wrote:
> > > > > > > > > >
> > > > > > > > > > > Hello,
> > > > > > > > > > >
> > > > > > > > > > > As for use cases, in my old job at Ericsson we were
> > > building
> > > > a
> > > > > > > > > > > streaming system that was processing data from
> telephone
> > > > > > networks,
> > > > > > > > and
> > > > > > > > > > > it was using key-value stores a LOT. For example,
> keeping
> > > > track
> > > > > > of
> > > > > > > > > > > various state info of the users (which cell are they
> > > > currently
> > > > > > > > > > > connected to, what bearers do they have, ...); mapping
> > from
> > > > IDs
> > > > > > of
> > > > > > > > > > > users in one subsystem of the telephone network to the
> > IDs
> > > of
> > > > > the
> > > > > > > > same
> > > > > > > > > > > users in an other subsystem; mapping from IDs of phone
> > > calls
> > > > to
> > > > > > > lists
> > > > > > > > > > > of IDs of participating users; etc.
> > > > > > > > > > > So I imagine they would like this a lot. (At least, if
> > they
> > > > > were
> > > > > > > > > > > considering moving to Flink :))
> > > > > > > > > > >
> > > > > > > > > > > Best,
> > > > > > > > > > > Gabor
> > > > > > > > > > >
> > > > > > > > > > >
> > > > > > > > > > >
> > > > > > > > > > >
> > > > > > > > > > > 2015-09-08 13:35 GMT+02:00 Gyula Fóra <
> gyf...@apache.org
> > >:
> > > > > > > > > > > > Hey All,
> > > > > > > > > > > >
> > > > > > > > > > > > The last couple of days I have been playing around
> with
> > > the
> > > > > > idea
> > > > > > > of
> > > > > > > > > > > > building a streaming key-value store abstraction
> using
> > > > > stateful
> > > > > > > > > > streaming
> > > > > > > > > > > > operators that can be used within Flink Streaming
> > > programs
> > > > > > > > > seamlessly.
> > > > > > > > > > > >
> > > > > > > > > > > > Operations executed on this KV store would be fault
> > > > tolerant
> > > > > as
> > > > > > > it
> > > > > > > > > > > > integrates with the checkpointing mechanism, and if
> we
> > > add
> > > > > > > > timestamps
> > > > > > > > > > to
> > > > > > > > > > > > each put/get/... operation we can use the watermarks
> to
> > > > > create
> > > > > > > > fully
> > > > > > > > > > > > deterministic results. This functionality is very
> > useful
> > > > for
> > > > > > many
> > > > > > > > > > > > applications, and is very hard to implement properly
> > with
> > > > > some
> > > > > > > > > > dedicates
> > > > > > > > > > > kv
> > > > > > > > > > > > store.
> > > > > > > > > > > >
> > > > > > > > > > > > The KVStore abstraction could look as follows:
> > > > > > > > > > > >
> > > > > > > > > > > > KVStore<K,V> store = new KVStore<>;
> > > > > > > > > > > >
> > > > > > > > > > > > Operations:
> > > > > > > > > > > >
> > > > > > > > > > > > store.put(DataStream<Tuple2<K,V>>)
> > > > > > > > > > > > store.get(DataStream<K>) -> DataStream<KV<K,V>>
> > > > > > > > > > > > store.remove(DataStream<K>) -> DataStream<KV<K,V>>
> > > > > > > > > > > > store.multiGet(DataStream<K[]>) ->
> > DataStream<KV<K,V>[]>
> > > > > > > > > > > > store.getWithKeySelector(DataStream<X>,
> > KeySelector<X,K>)
> > > > ->
> > > > > > > > > > > > DataStream<KV<X,V>[]>
> > > > > > > > > > > >
> > > > > > > > > > > > For the resulting streams I used a special KV
> > abstraction
> > > > > which
> > > > > > > > let's
> > > > > > > > > > us
> > > > > > > > > > > > return null values.
> > > > > > > > > > > >
> > > > > > > > > > > > The implementation uses a simple streaming operator
> for
> > > > > > executing
> > > > > > > > > most
> > > > > > > > > > of
> > > > > > > > > > > > the operations (for multi get there is an additional
> > > merge
> > > > > > > > operator)
> > > > > > > > > > with
> > > > > > > > > > > > either local or partitioned states for storing the
> > > > kev-value
> > > > > > > pairs
> > > > > > > > > (my
> > > > > > > > > > > > current prototype uses local states). And it can
> either
> > > > > execute
> > > > > > > > > > > operations
> > > > > > > > > > > > eagerly (which would not provide deterministic
> > results),
> > > or
> > > > > > > buffer
> > > > > > > > up
> > > > > > > > > > > > operations and execute them in order upon watermarks.
> > > > > > > > > > > >
> > > > > > > > > > > > As for use cases you can probably come up with many I
> > > will
> > > > > save
> > > > > > > > that
> > > > > > > > > > for
> > > > > > > > > > > > now :D
> > > > > > > > > > > >
> > > > > > > > > > > > I have a prototype implementation here that can
> execute
> > > the
> > > > > > > > > operations
> > > > > > > > > > > > described above (does not handle watermarks and time
> > > yet):
> > > > > > > > > > > >
> > > > > > > > > > > > https://github.com/gyfora/flink/tree/KVStore
> > > > > > > > > > > > And also an example job:
> > > > > > > > > > > >
> > > > > > > > > > > >
> > > > > > > > > > >
> > > > > > > > > >
> > > > > > > > >
> > > > > > > >
> > > > > > >
> > > > > >
> > > > >
> > > >
> > >
> >
> https://github.com/gyfora/flink/blob/KVStore/flink-staging/flink-streaming/flink-streaming-core/src/test/java/org/apache/flink/streaming/api/KVStreamExample.java
> > > > > > > > > > > >
> > > > > > > > > > > > What do you think?
> > > > > > > > > > > > If you like it I will work on writing tests and it
> > still
> > > > > needs
> > > > > > a
> > > > > > > > lot
> > > > > > > > > of
> > > > > > > > > > > > tweaking and refactoring. This might be something we
> > want
> > > > to
> > > > > > > > include
> > > > > > > > > > with
> > > > > > > > > > > > the standard streaming libraries at one point.
> > > > > > > > > > > >
> > > > > > > > > > > > Cheers,
> > > > > > > > > > > > Gyula
> > > > > > > > > > >
> > > > > > > > > >
> > > > > > > > >
> > > > > > > >
> > > > > > >
> > > > > >
> > > > >
> > > >
> > >
> >
> >
> >
> > --
> >
> > *Nam-Luc TRAN*
> >
> > R&D Manager
> >
> > EURA NOVA
> >
> > (M) +32 498 37 36 23
> >
> > *euranova.eu <http://euranova.eu>*
> >
>



-- 

*Nam-Luc TRAN*

R&D Manager

EURA NOVA

(M) +32 498 37 36 23

*euranova.eu <http://euranova.eu>*

Reply via email to