Yes, pretty clear. I guess semantically it's still a co-group, but
implemented slightly differently.

Thanks!

--
Gianmarco

On 9 September 2015 at 15:37, Gyula Fóra <gyula.f...@gmail.com> wrote:

> Hey Gianmarco,
>
> So the implementation looks somewhat different:
>
> The update stream is received by a stateful KVStoreOperator, which stores
> the K-V pairs as its partitioned state.
>
> Yes, the query for the 2 cities is assigned an ID. It is split into the 2
> cities, and each of these is sent to the same KVStoreOperator as the
> update stream. The output for each key is practically (qid, city1, temp1),
> where the value is retrieved from the operator state. These outputs are
> merged in a next operator to form the KV[] output, on which the user can
> compute the difference if they want.
>
> So actually no co-group is happening although semantically it might be
> similar. Instead I use stateful operators to be much more efficient.
>
> Does this answer your question?
>
> Gyula
>
> Gianmarco De Francisci Morales <g...@apache.org> wrote (on Wed, 9 Sept
> 2015, 14:29):
>
> > Just a silly question.
> > For the example you described, in a data flow model, you would do
> > something like this:
> >
> > Have query ids added to the city pairs (qid, city1, city2),
> > then split the query stream on the two cities and co-group it with the
> > updates stream ((city1, qid), (city, temp)), same for city2,
> > then emit (qid, city1, temp1), (qid, city2, temp2) from the two
> > co-groups, group on the qid, and apply a difference operator to get the
> > final answer.
> >
> > Is your idea to implement a way to generalize this logic, or would it
> > use remote read/write to a KV-store?
> >
> > --
> > Gianmarco
> >
> > On 8 September 2015 at 16:27, Aljoscha Krettek <aljos...@apache.org>
> > wrote:
> >
> > > That's a very nice application of the Stream API and partitioned
> > > state. :D
> > >
> > > I think we should run some tests on a cluster based on this to see
> > > what kind of throughput the partitioned state system can handle and
> > > also how it behaves with larger numbers of keys. The KVStore is just
> > > an interface and the really heavy lifting is done by the state system,
> > > so this would be a good test for it.
> > >
> > >
> > > On Tue, 8 Sep 2015 at 15:10 Gyula Fóra <gyula.f...@gmail.com> wrote:
> > >
> > > > @Stephan:
> > > >
> > > > Technically speaking this is really just a partitioned key-value
> > > > state and a fancy operator executing special operations on this
> > > > state.
> > > >
> > > > From the user's perspective, though, this is something hard to
> > > > implement. If you want to share state between two streams, for
> > > > instance this way (getting updates from one stream and enriching the
> > > > other one), you would probably use a connected datastream and
> > > > custom-implement the key-value store logic. But once you have one (or
> > > > more) update streams and many get streams, this implementation will
> > > > not work. So the user either ends up replicating the whole state in
> > > > multiple connected operators, or custom-implements some inefficient
> > > > wrapper class to take care of all the put/get operations.
> > > >
> > > > The idea behind this is to give a very simple abstraction for this
> > > > type of processing that uses the Flink runtime efficiently instead
> > > > of relying on custom implementations.
> > > >
> > > > Let me give you a stupid example:
> > > >
> > > > You receive temperature data in the form of (city, temperature), and
> > > > you are computing a rolling avg for each city.
> > > > Now you have 2 other incoming streams: the first is a stream of some
> > > > other info about the city, let's say population (city, population),
> > > > and you want to combine it with the last known avg temperature to
> > > > produce (city, temp, pop) triplets. The second stream is a pair of
> > > > cities (city, city) and you are interested in the difference of the
> > > > temperature.
> > > >
> > > > For enriching the (city, pop) to (city, temp, pop) you would probably
> > > > use a CoFlatMap and store the last known rolling avg as state. For
> > > > computing the (city, city) temperature difference it is a little more
> > > > difficult, as you need to get the temperature for both cities and
> > > > then combine them in a second operator. If you don't want to
> > > > replicate your state, you have to combine these two problems into a
> > > > common wrapper type and execute them on the same operator, which
> > > > will keep the avg state.
> > > >
> > > > With the KVStore abstraction this is very simple:
> > > > you create a KVStore<City, Temp>.
> > > > For enriching you use kvStore.getWithKeySelector(), which will give
> > > > you ((city, pop), temp) pairs and you are done. For computing the
> > > > difference, you can use kvStore.multiGet(...) and get the values for
> > > > the 2 cities at the same time. The KV store will abstract away the
> > > > getting of the 2 keys separately and merging them, so it will return
> > > > [(city1, t1), (city2, t2)].
> > > >
> > > > This might be a slightly artificial example but I think it makes the
> > > > point. Implementing these jobs efficiently is not trivial for the
> > > > users but I think it is a very common problem.
> > > >
> > > > Stephan Ewen <se...@apache.org> wrote (on Tue, 8 Sept 2015, 14:53):
> > > >
> > > > > @Gyula
> > > > >
> > > > > Can you explain a bit what this KeyValue store would do more than
> > > > > the partitioned key/value state?
> > > > >
> > > > > On Tue, Sep 8, 2015 at 2:49 PM, Gábor Gévay <gga...@gmail.com> wrote:
> > > > >
> > > > > > Hello,
> > > > > >
> > > > > > As for use cases, in my old job at Ericsson we were building a
> > > > > > streaming system that was processing data from telephone
> > > > > > networks, and it was using key-value stores a LOT. For example,
> > > > > > keeping track of various state info of the users (which cell
> > > > > > they are currently connected to, what bearers they have, ...);
> > > > > > mapping from IDs of users in one subsystem of the telephone
> > > > > > network to the IDs of the same users in another subsystem;
> > > > > > mapping from IDs of phone calls to lists of IDs of participating
> > > > > > users; etc.
> > > > > > So I imagine they would like this a lot. (At least, if they were
> > > > > > considering moving to Flink :))
> > > > > >
> > > > > > Best,
> > > > > > Gabor
> > > > > >
> > > > > >
> > > > > >
> > > > > >
> > > > > > 2015-09-08 13:35 GMT+02:00 Gyula Fóra <gyf...@apache.org>:
> > > > > > > Hey All,
> > > > > > >
> > > > > > > The last couple of days I have been playing around with the
> > > > > > > idea of building a streaming key-value store abstraction using
> > > > > > > stateful streaming operators that can be used within Flink
> > > > > > > Streaming programs seamlessly.
> > > > > > >
> > > > > > > Operations executed on this KV store would be fault tolerant
> > > > > > > as it integrates with the checkpointing mechanism, and if we
> > > > > > > add timestamps to each put/get/... operation we can use the
> > > > > > > watermarks to create fully deterministic results. This
> > > > > > > functionality is very useful for many applications, and is
> > > > > > > very hard to implement properly with some dedicated KV store.
> > > > > > >
> > > > > > > The KVStore abstraction could look as follows:
> > > > > > >
> > > > > > > KVStore<K,V> store = new KVStore<>();
> > > > > > >
> > > > > > > Operations:
> > > > > > >
> > > > > > > store.put(DataStream<Tuple2<K,V>>)
> > > > > > > store.get(DataStream<K>) -> DataStream<KV<K,V>>
> > > > > > > store.remove(DataStream<K>) -> DataStream<KV<K,V>>
> > > > > > > store.multiGet(DataStream<K[]>) -> DataStream<KV<K,V>[]>
> > > > > > > store.getWithKeySelector(DataStream<X>, KeySelector<X,K>) ->
> > > > > > > DataStream<KV<X,V>[]>
> > > > > > >
> > > > > > > For the resulting streams I used a special KV abstraction
> > > > > > > which lets us return null values.
> > > > > > >
> > > > > > > The implementation uses a simple streaming operator for
> > > > > > > executing most of the operations (for multi get there is an
> > > > > > > additional merge operator) with either local or partitioned
> > > > > > > states for storing the key-value pairs (my current prototype
> > > > > > > uses local states). And it can either execute operations
> > > > > > > eagerly (which would not provide deterministic results), or
> > > > > > > buffer up operations and execute them in order upon
> > > > > > > watermarks.
> > > > > > >
> > > > > > > As for use cases, you can probably come up with many; I will
> > > > > > > save that for now :D
> > > > > > >
> > > > > > > I have a prototype implementation here that can execute the
> > > > > > > operations described above (does not handle watermarks and
> > > > > > > time yet):
> > > > > > >
> > > > > > > https://github.com/gyfora/flink/tree/KVStore
> > > > > > > And also an example job:
> > > > > > >
> > > > > > > https://github.com/gyfora/flink/blob/KVStore/flink-staging/flink-streaming/flink-streaming-core/src/test/java/org/apache/flink/streaming/api/KVStreamExample.java
> > > > > > >
> > > > > > > What do you think?
> > > > > > > If you like it I will work on writing tests; it still needs a
> > > > > > > lot of tweaking and refactoring. This might be something we
> > > > > > > want to include with the standard streaming libraries at some
> > > > > > > point.
> > > > > > >
> > > > > > > Cheers,
> > > > > > > Gyula
> > > > > >
> > > > >
> > > >
> > >
> >
>
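
The flow Gyula describes in the thread (an update stream writing into
keyed state, a multi-get split into per-key lookups tagged with a query
ID, and a downstream merge that reassembles the answers) can be modeled
roughly as below. This is only a single-process sketch in plain Java, not
Flink code and not the prototype's actual implementation: the class
KVStoreSketch, the Partial type, and all method bodies are hypothetical
stand-ins, and a HashMap plays the role of the partitioned operator state.

```java
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

/** Toy single-process model of the flow described in the thread. Not Flink code. */
class KVStoreSketch {

    /** One (qid, key, value) answer; value is null when the key is unknown. */
    static final class Partial {
        final int qid;
        final String key;
        final Double value;

        Partial(int qid, String key, Double value) {
            this.qid = qid;
            this.key = key;
            this.value = value;
        }
    }

    // Stands in for the partitioned state: latest value per key.
    private final Map<String, Double> state = new HashMap<>();
    // Partial answers buffered per query ID by the merge step.
    private final Map<Integer, List<Partial>> pending = new HashMap<>();

    /** Update stream element: remember the latest value for the key. */
    void put(String key, double value) {
        state.put(key, value);
    }

    /** Single-key lookup, tagged with the ID of the query it belongs to. */
    Partial get(int qid, String key) {
        return new Partial(qid, key, state.get(key));
    }

    /** Merge step: returns the full answer once all partials arrived, else null. */
    private List<Partial> merge(Partial p, int expected) {
        List<Partial> parts = pending.computeIfAbsent(p.qid, id -> new ArrayList<>());
        parts.add(p);
        if (parts.size() < expected) {
            return null;
        }
        pending.remove(p.qid);
        return parts;
    }

    /** Multi-get: split into per-key lookups, then merge them by query ID. */
    List<Partial> multiGet(int qid, String... keys) {
        List<Partial> merged = new ArrayList<>();
        for (String key : keys) {
            List<Partial> done = merge(get(qid, key), keys.length);
            if (done != null) {
                merged = done;
            }
        }
        return merged;
    }

    public static void main(String[] args) {
        KVStoreSketch store = new KVStoreSketch();
        store.put("Budapest", 21.5);
        store.put("Berlin", 18.0);
        // The (city, city) query from the example: fetch both, then subtract.
        List<Partial> result = store.multiGet(1, "Budapest", "Berlin");
        double diff = result.get(0).value - result.get(1).value;
        System.out.println("qid=1 difference=" + diff);
    }
}
```

In a real job the lookups for different keys would land on different
parallel instances of the store operator, which is why the merge step
has to buffer partial answers by query ID instead of assuming they
arrive together.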
