Hi Flavio,
While you wait on an update from Kostas, I wanted to understand the use case
better and share my thoughts:


1)       Why is the current batch mode expensive? Where are you persisting the
data after updates? The way I see it, by moving to Flink you get to use RocksDB
(a key-value store), which makes your lookups faster. Are you perhaps using a
non-indexed store like S3 right now?

So the gain comes from moving to a persistence store better suited to your
use case, rather than from batch->streaming. Maybe consider just going with a
different data store.

IMHO, streaming should only be used if you really want to act on new events in
real time. It is generally harder to get a streaming job correct than a batch
one.



2)       If the current setup is expensive due to serialization/deserialization,
then that should be fixed by moving to a faster format (maybe Avro? I don't
have a lot of expertise in that). I don't see how that problem would go away
with Flink, so you would still need to handle serialization.



3)       Even if you do decide to move to Flink, I think you can do this with
one job; two jobs are not needed. On every incoming event, check the previous
state, update it, and output to Kafka or whatever data store you are using.


Thanks
Ankit

From: Flavio Pompermaier <pomperma...@okkam.it>
Date: Tuesday, May 16, 2017 at 9:31 AM
To: Kostas Kloudas <k.klou...@data-artisans.com>
Cc: user <user@flink.apache.org>
Subject: Re: Stateful streaming question

Hi Kostas,
thanks for your quick response.
I also thought about using Async IO; I just need to figure out how to correctly
handle parallelism and the number of async requests.
That's probably the way to go, however. Is it also possible to set a number of
retry attempts/backoff for when the async request fails (maybe because the
server is too busy)?
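
One way to get retries with backoff, independent of what the async operator itself offers, is to wrap the client call before handing its result back. The sketch below is plain Java and assumes the external client returns a CompletableFuture; AsyncRetry and withRetry are hypothetical names, not part of Flink.

```java
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.TimeUnit;
import java.util.function.Supplier;

// Hypothetical helper (not a Flink API): retries an asynchronous call with
// exponential backoff and completes the returned future with the final outcome.
class AsyncRetry {

    static <T> CompletableFuture<T> withRetry(Supplier<CompletableFuture<T>> call,
                                              int maxAttempts,
                                              long initialDelayMs,
                                              ScheduledExecutorService scheduler) {
        CompletableFuture<T> result = new CompletableFuture<>();
        attempt(call, 1, maxAttempts, initialDelayMs, scheduler, result);
        return result;
    }

    private static <T> void attempt(Supplier<CompletableFuture<T>> call,
                                    int attemptNo, int maxAttempts, long delayMs,
                                    ScheduledExecutorService scheduler,
                                    CompletableFuture<T> result) {
        CompletableFuture<T> future;
        try {
            future = call.get();
        } catch (RuntimeException e) {
            // Treat a synchronous failure like a failed asynchronous call.
            future = new CompletableFuture<>();
            future.completeExceptionally(e);
        }
        future.whenComplete((value, error) -> {
            if (error == null) {
                result.complete(value);
            } else if (attemptNo >= maxAttempts) {
                // Out of attempts: report the last error to the caller.
                result.completeExceptionally(error);
            } else {
                // Wait delayMs, then try again with twice the delay.
                scheduler.schedule(
                    () -> attempt(call, attemptNo + 1, maxAttempts, delayMs * 2, scheduler, result),
                    delayMs, TimeUnit.MILLISECONDS);
            }
        });
    }
}
```

Inside an async function, the future returned by withRetry would be the one whose completion is forwarded to the operator's result callback. The total retry time probably has to stay below the timeout configured for the async operator, otherwise the request would time out before the last attempt.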

For the second part, I think it's OK to persist the state into RocksDB or HDFS.
My question is indeed about that: is it safe to start reading (with another
Flink job) from RocksDB or HDFS while an updatable state is still "pending" on
it? Should I ensure that state updates are not possible until the other Flink
job has finished reading the persisted data?

And another question: I've tried to draft such a process, and basically I have
the following code:

DataStream<MyGroupedObj> groupedObj = tuples.keyBy(0)
        .flatMap(new RichFlatMapFunction<Tuple4, MyGroupedObj>() {

          // Keyed state: one MyGroupedObj per key (field 0 of the tuple)
          private transient ValueState<MyGroupedObj> state;

          @Override
          public void flatMap(Tuple4 t, Collector<MyGroupedObj> out) throws Exception {
            MyGroupedObj current = state.value();
            if (current == null) {
              current = new MyGroupedObj();
            }
            ....
            current.addTuple(t);
            ...
            state.update(current);
            // Emits the whole group downstream on every single update
            out.collect(current);
          }

          @Override
          public void open(Configuration config) {
            ValueStateDescriptor<MyGroupedObj> descriptor =
                new ValueStateDescriptor<>("test", TypeInformation.of(MyGroupedObj.class));
            state = getRuntimeContext().getState(descriptor);
          }
        });
groupedObj.print();

but obviously this way I emit the updated object on every update, while I
actually just want to persist the ValueState somehow (and make it available
to another job that runs once a month, for example). Is that possible?


On Tue, May 16, 2017 at 5:57 PM, Kostas Kloudas
<k.klou...@data-artisans.com> wrote:
Hi Flavio,

From what I understand, for the first part you are correct. You can use Flink’s 
internal state to keep your enriched data.
In fact, if you are also querying an external system to enrich your data, it is 
worth looking at the AsyncIO feature:

https://ci.apache.org/projects/flink/flink-docs-release-1.2/dev/stream/asyncio.html

Now for the second part: currently in Flink you cannot iterate over all
registered keys for which you have state. A pointer that may be useful to
look at is queryable state:

https://ci.apache.org/projects/flink/flink-docs-release-1.2/dev/stream/queryable_state.html

This is still an experimental feature, but let us know your opinion if you use 
it.

Finally, an alternative would be to keep state in Flink, and periodically flush 
it to an external storage system, which you can
query at will.
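
The "periodically flush" idea can be sketched in plain Java as follows. This is only an illustration under assumptions: FlushingGroupStore is a hypothetical class, the HashMap stands in for Flink's keyed state, the sink callback stands in for the external storage system, and flush() is called by hand here where a real job would need some periodic trigger.

```java
import java.util.ArrayList;
import java.util.HashMap;
import java.util.HashSet;
import java.util.List;
import java.util.Map;
import java.util.Set;
import java.util.function.BiConsumer;

// Hypothetical stand-in for "state in Flink + periodic flush to external storage".
class FlushingGroupStore {
    // key -> values grouped so far (plays the role of keyed state)
    private final Map<String, List<String>> groups = new HashMap<>();
    // keys whose group changed since the last flush
    private final Set<String> dirtyKeys = new HashSet<>();
    // writes one (key, group) pair to the external store
    private final BiConsumer<String, List<String>> sink;

    FlushingGroupStore(BiConsumer<String, List<String>> sink) {
        this.sink = sink;
    }

    // Called per incoming element: updates state only, emits nothing.
    void add(String key, String value) {
        groups.computeIfAbsent(key, k -> new ArrayList<>()).add(value);
        dirtyKeys.add(key);
    }

    // Called periodically: writes only the groups that changed and
    // returns how many groups were written.
    int flush() {
        int written = 0;
        for (String key : dirtyKeys) {
            sink.accept(key, new ArrayList<>(groups.get(key)));
            written++;
        }
        dirtyKeys.clear();
        return written;
    }
}
```

Compared with emitting the updated object on every element, the external store here sees at most one write per changed key per flush interval, and a reader of that store never observes a half-updated group.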

Thanks,
Kostas


On May 16, 2017, at 4:38 PM, Flavio Pompermaier
<pomperma...@okkam.it> wrote:

Hi to all,
we're still playing with the streaming part of Flink in order to see whether it
can improve our current batch pipeline.
At the moment, we have a job that translates incoming data (as Row) into Tuple4,
groups them together by the first field and persists the result to disk (using a
thrift object). When we need to add tuples to those grouped objects, we need to
read the persisted data again, flatten it back to Tuple4, union it with the new
tuples, re-group by key and finally persist.
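
The update cycle described above can be sketched in plain Java roughly as follows. The names are hypothetical, and the Tuple4 is simplified to a (key, value) pair of strings for illustration:

```java
import java.util.ArrayList;
import java.util.List;
import java.util.Map;
import java.util.TreeMap;

// Hypothetical, simplified model of the batch update cycle.
class BatchRegroup {

    // Flattens the persisted groups back to tuples, unions them with the
    // new tuples, and re-groups everything by key.
    static Map<String, List<String>> merge(Map<String, List<String>> persisted,
                                           List<String[]> newTuples) {
        // 1) flatten: every persisted value becomes a (key, value) tuple again
        List<String[]> all = new ArrayList<>();
        for (Map.Entry<String, List<String>> group : persisted.entrySet()) {
            for (String value : group.getValue()) {
                all.add(new String[] {group.getKey(), value});
            }
        }
        // 2) union with the new tuples
        all.addAll(newTuples);
        // 3) re-group by key (field 0); the result is what gets persisted
        Map<String, List<String>> regrouped = new TreeMap<>();
        for (String[] tuple : all) {
            regrouped.computeIfAbsent(tuple[0], k -> new ArrayList<>()).add(tuple[1]);
        }
        return regrouped;
    }
}
```

The work done in merge grows with the total amount of persisted data, not with the number of new tuples, so even a small update touches every group.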

This is very expensive to do with batch computation, while it should be pretty
straightforward to do with streaming (from what I understood): I just need to
use ListState. Right?
Then, let's say I need to scan all the data of the stateful computation (key 
and values), in order to do some other computation, I'd like to know:

  *   how to do that? I.e. create a DataSet/DataSource<Key,Value> from the 
stateful data in the stream
  *   is there any problem with accessing the stateful data without stopping
incoming data (and thus possible updates to the states)?
Thanks in advance for the support,
Flavio





--
Flavio Pompermaier
Development Department

OKKAM S.r.l.
Tel. +(39) 0461 1823908
