Hi Aljoscha & Fabian,

    Finally I got this working. Thanks for your help. In terms of
persisting the state (for S2), I tried checkpointing every 10 seconds using
an FsStateBackend. What I notice is that the checkpoint duration is almost
2 minutes in many cases, while in the other cases it frequently varies from
100 ms to 1.5 minutes.

    The pseudocode is as below:

     KeyedStream<String, String> ks1 = ds1.keyBy("*");
     KeyedStream<Tuple2<String, V>, String> ks2 =
         ds2.flatMap(/* split T into k-v pairs */).keyBy(0);

     ks1.connect(ks2).flatMap(X);
     // X is a CoFlatMapFunction that inserts and removes elements from ks2
     // into a key-value state member. Elements from ks1 are matched against
     // that state.

     // ks1 is streaming about 100K events/sec from a Kafka topic.
     // ks2 is streaming about 1 event every 10 minutes. Precisely when the
     // 1st event is consumed from this stream, the checkpoint takes 2 minutes
     // straightaway.

    The version of flink is 1.1.2
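
    To make the pseudocode above concrete, here is a minimal plain-Java
sketch of what X does. It is only a simulation and does not use the Flink
API: the class KeyedStateSketch, its method names, and the hash-modulo
routing are illustrative assumptions (Flink assigns keys to parallel
instances differently). Each map stands in for the key-value state held by
one parallel instance of X.

```java
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

// Plain-Java simulation, NOT Flink API. Each "shard" stands in for the keyed
// state owned by one parallel instance of the CoFlatMapFunction X.
class KeyedStateSketch {
    private final List<Map<String, String>> shards = new ArrayList<>();

    KeyedStateSketch(int parallelism) {
        for (int i = 0; i < parallelism; i++) {
            shards.add(new HashMap<>());
        }
    }

    // Hypothetical routing rule for illustration; Flink's real assignment differs.
    int shardFor(String key) {
        return Math.floorMod(key.hashCode(), shards.size());
    }

    // ks2 side: insert or overwrite an entry; a null value acts as the
    // explicit delete record that removes the entry.
    void onControlEvent(String key, String value) {
        Map<String, String> state = shards.get(shardFor(key));
        if (value == null) {
            state.remove(key);
        } else {
            state.put(key, value);
        }
    }

    // ks1 side: look the key up only in the shard that owns it.
    // Returns null when no state exists for the key.
    String onDataEvent(String key) {
        return shards.get(shardFor(key)).get(key);
    }

    public static void main(String[] args) {
        KeyedStateSketch job = new KeyedStateSketch(4);
        job.onControlEvent("road-17", "blocked");      // ks2 inserts state
        System.out.println(job.onDataEvent("road-17")); // prints "blocked"
        job.onControlEvent("road-17", null);           // ks2 deletes state
        System.out.println(job.onDataEvent("road-17")); // prints "null"
    }
}
```

    Because a ks1 element is routed by the same key as the ks2 element that
populated the state, each lookup stays local to one shard, and a null value
from ks2 plays the role of the explicit delete record.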

 Best Regards
CVP

On Tue, Sep 13, 2016 at 7:29 AM, Aljoscha Krettek <aljos...@apache.org>
wrote:

> Hi,
> you don't need the BlockedEventState class, you should be able to just do
> this:
>
> private transient ValueState<BlockedRoadInfo> blockedRoads;
>          ............
>     @Override
>     public void open(final org.apache.flink.configuration.Configuration parameters)
>             throws Exception {
>         final ValueStateDescriptor<BlockedRoadInfo> blockedStateDesc =
>                 new ValueStateDescriptor<BlockedRoadInfo>("BlockedEventStates",
>                         TypeInformation.of(BlockedRoadInfo.class), null);
>         blockedRoads = getRuntimeContext().getState(blockedStateDesc);
>     }
>
>   }
>
> Cheers,
> Aljoscha
>
>
> On Mon, 12 Sep 2016 at 16:24 Chakravarthy varaga <chakravarth...@gmail.com>
> wrote:
>
>> Hi Fabian,
>>
>>     I'm coding to check whether your proposal works and have hit a
>> ClassCastException.
>>
>>
>>     // Here is my value that holds the state information: an implementation
>>     // of my value state, where the key is a Double value, on connected
>>     // stream ks2
>>
>>     public class BlockedEventState implements ValueState<BlockedRoadInfo> {
>>
>>     public BlockedRoadInfo blockedRoad;
>>
>>     @Override
>>     public void clear() {
>>         blockedRoad = null;
>>
>>     }
>>
>>     @Override
>>     public BlockedRoadInfo value() throws IOException {
>>         return blockedRoad;
>>     }
>>
>>     @Override
>>     public void update(final BlockedRoadInfo value) throws IOException {
>>         blockedRoad = value;
>>     }
>> }
>>
>>     // BlockedRoadInfo class...
>>     public class BlockedRoadInfo {
>>         long maxLink;
>>         long minLink;
>>         double blockedEventId;
>>         // ....setters & ... getters
>>     }
>>
>> /// new RichCoFlatMapFunction() {
>>
>>     private transient BlockedEventState blockedRoads;
>>          ............
>>     @Override
>>     public void open(final org.apache.flink.configuration.Configuration parameters)
>>             throws Exception {
>>         final ValueStateDescriptor<BlockedRoadInfo> blockedStateDesc =
>>                 new ValueStateDescriptor<BlockedRoadInfo>("BlockedEventStates",
>>                         TypeInformation.of(BlockedRoadInfo.class), null);
>>         // FAILS HERE WITH CLASSCAST
>>         blockedRoads = (BlockedEventState) getRuntimeContext().getState(blockedStateDesc);
>>     }
>>
>>   }
>>
>>
>>
>>
>> *Caused by: java.lang.ClassCastException:
>> org.apache.flink.runtime.state.filesystem.FsValueState cannot be cast to
>> com.ericsson.components.aia.iot.volvo.state.BlockedEventState*
>>
>>
>>
>> *I have tried to set the state backend to both MemState and
>> FsState...streamEnv.setStateBackend(new
>> FsStateBackend("file:///tmp/flink/checkpoints"));*
>>
>>
>>
>> On Thu, Sep 8, 2016 at 10:10 PM, Fabian Hueske <fhue...@gmail.com> wrote:
>>
>>> Not sure if I got your requirements right, but would this work?
>>>
>>> KeyedStream<String, String> ks1 = ds1.keyBy("*") ;
>>> KeyedStream<Tuple2<String, V>, String> ks2 = ds2.flatMap(split T into
>>> k-v pairs).keyBy(0);
>>>
>>> ks1.connect(ks2).flatMap(X)
>>>
>>> X is a CoFlatMapFunction that inserts and removes elements from ks2 into
>>> a key-value state member. Elements from ks1 are matched against that state.
>>>
>>> Best, Fabian
>>>
>>> 2016-09-08 20:24 GMT+02:00 Chakravarthy varaga <chakravarth...@gmail.com
>>> >:
>>>
>>>> Hi Fabian,
>>>>
>>>>      First of all, thanks for all your prompt responses. With regard to
>>>> 2) Multiple lookups, I have to clarify what I mean by that...
>>>>
>>>>      DS1<String> elementKeyStream  = stream1.map(String<>); this maps
>>>> each of the streaming elements into string mapped value...
>>>>      DS2<T>  = stream2.xxx(); // where stream2 is a kafka source
>>>> stream, as you proposed.. xxx() should be my function() which splits the
>>>> string and generates key1:<value1>, key2:<value2>, key3:<value3>
>>>> ....keyN:<value4>
>>>>
>>>>      Now,
>>>>         I wish to map elementKeyStream with look ups within (key1,
>>>> key2...keyN) where key1, key2.. keyN and their respective values should be
>>>> available across the cluster...
>>>>
>>>> Thanks a million !
>>>> CVP
>>>>
>>>>
>>>>
>>>>
>>>> On Wed, Sep 7, 2016 at 9:15 PM, Fabian Hueske <fhue...@gmail.com>
>>>> wrote:
>>>>
>>>>> That depends.
>>>>> 1) Growing/Shrinking: This should work. New entries can always be
>>>>> inserted. In order to remove entries from the k-v-state you have to set
>>>>> the value to null. Note that you need an explicit delete-value record to
>>>>> trigger the eviction.
>>>>> 2) Multiple lookups: This only works if all lookups are independent
>>>>> from each other. You can partition DS1 only on a single key, and the
>>>>> other keys might be located on different shards. A workaround might be
>>>>> to duplicate S1 events for each key that they need to look up. However,
>>>>> you might need to collect events from the same S1 event after the join.
>>>>> If that does not work for you, the only thing that comes to my mind is
>>>>> to broadcast the state and keep a full local copy in each operator.
>>>>>
>>>>> Let me add one more thing regarding the upcoming rescaling feature. If
>>>>> this is interesting for you, rescaling will also work (maybe not in the
>>>>> first version) for broadcasted state, i.e. state which is the same on all
>>>>> parallel operator instances.
>>>>>
>>>>> 2016-09-07 21:45 GMT+02:00 Chakravarthy varaga <
>>>>> chakravarth...@gmail.com>:
>>>>>
>>>>>> I'm understanding this better with your explanation.
>>>>>> With this use case, each element in DS1 has to be looked up against a
>>>>>> 'bunch of keys' from DS2, and DS2 could shrink/expand in terms of the
>>>>>> number of keys. Will the key-value shard work in this case?
>>>>>>
>>>>>> On Wed, Sep 7, 2016 at 7:44 PM, Fabian Hueske <fhue...@gmail.com>
>>>>>> wrote:
>>>>>>
>>>>>>> Operator state is always local in Flink. However, with key-value
>>>>>>> state, you can have something which behaves kind of similar to a
>>>>>>> distributed hashmap, because each operator holds a different
>>>>>>> shard/partition of the hashtable.
>>>>>>>
>>>>>>> If you have to do only a single key lookup for each element of DS1,
>>>>>>> you should think about partitioning both streams (keyBy) and writing the
>>>>>>> state into Flink's key-value state [1].
>>>>>>>
>>>>>>> This will have several benefits:
>>>>>>> 1) State does not need to be replicated
>>>>>>> 2) Depending on the backend (RocksDB) [2], parts of the state can
>>>>>>> reside on disk. You are not bound to the memory of the JVM.
>>>>>>> 3) Flink takes care of the look-up. No need to have your own hashmap.
>>>>>>> 4) It will only be possible to rescale jobs with key-value state
>>>>>>> (this feature is currently under development).
>>>>>>>
>>>>>>> If using the key-value state is possible, I'd go for that.
>>>>>>>
>>>>>>> Best, Fabian
>>>>>>>
>>>>>>> [1] https://ci.apache.org/projects/flink/flink-docs-release-1.1/apis/streaming/state.html
>>>>>>> [2] https://ci.apache.org/projects/flink/flink-docs-release-1.1/apis/streaming/state_backends.html
>>>>>>>
>>>>>>> 2016-09-07 19:55 GMT+02:00 Chakravarthy varaga <
>>>>>>> chakravarth...@gmail.com>:
>>>>>>>
>>>>>>>> Certainly, that's what I thought as well...
>>>>>>>> The output of DataStream2 could be in the 1000s and there are state
>>>>>>>> updates...
>>>>>>>> Reading this topic from the other job, job1, is okay.
>>>>>>>> However, assuming that we maintain this state in a collection and
>>>>>>>> update it (by reading from the topic), will this collection be
>>>>>>>> replicated across the cluster within job1?
>>>>>>>>
>>>>>>>>
>>>>>>>>
>>>>>>>> On Wed, Sep 7, 2016 at 6:33 PM, Fabian Hueske <fhue...@gmail.com>
>>>>>>>> wrote:
>>>>>>>>
>>>>>>>>> Is writing DataStream2 to a Kafka topic and reading it from the
>>>>>>>>> other job an option?
>>>>>>>>>
>>>>>>>>> 2016-09-07 19:07 GMT+02:00 Chakravarthy varaga <
>>>>>>>>> chakravarth...@gmail.com>:
>>>>>>>>>
>>>>>>>>>> Hi Fabian,
>>>>>>>>>>
>>>>>>>>>>     Thanks for your response. Apparently these DataStreams
>>>>>>>>>> (Job1-DataStream1 & Job2-DataStream2) are from different Flink
>>>>>>>>>> applications running within the same cluster.
>>>>>>>>>>     DataStream2 (from Job2) applies transformations and updates a
>>>>>>>>>> 'cache' that Job1 needs to work on.
>>>>>>>>>>     Our intention is to not use an external key/value store, as
>>>>>>>>>> we are trying to localize the cache within the cluster.
>>>>>>>>>>     Is there a way?
>>>>>>>>>>
>>>>>>>>>> Best Regards
>>>>>>>>>> CVP
>>>>>>>>>>
>>>>>>>>>> On Wed, Sep 7, 2016 at 5:00 PM, Fabian Hueske <fhue...@gmail.com>
>>>>>>>>>> wrote:
>>>>>>>>>>
>>>>>>>>>>> Hi,
>>>>>>>>>>>
>>>>>>>>>>> Flink does not provide shared state.
>>>>>>>>>>> However, you can broadcast a stream to CoFlatMapFunction, such
>>>>>>>>>>> that each operator has its own local copy of the state.
>>>>>>>>>>>
>>>>>>>>>>> If that does not work for you because the state is too large and
>>>>>>>>>>> if it is possible to partition the state (and both streams), you
>>>>>>>>>>> can also use keyBy instead of broadcast.
>>>>>>>>>>>
>>>>>>>>>>> Finally, you can use an external system like a KeyValue Store or
>>>>>>>>>>> In-Memory store like Apache Ignite to hold your distributed
>>>>>>>>>>> collection.
>>>>>>>>>>>
>>>>>>>>>>> Best, Fabian
>>>>>>>>>>>
>>>>>>>>>>> 2016-09-07 17:49 GMT+02:00 Chakravarthy varaga <
>>>>>>>>>>> chakravarth...@gmail.com>:
>>>>>>>>>>>
>>>>>>>>>>>> Hi Team,
>>>>>>>>>>>>
>>>>>>>>>>>>      Can someone help me here? Appreciate any response !
>>>>>>>>>>>>
>>>>>>>>>>>> Best Regards
>>>>>>>>>>>> Varaga
>>>>>>>>>>>>
>>>>>>>>>>>> On Mon, Sep 5, 2016 at 4:51 PM, Chakravarthy varaga <
>>>>>>>>>>>> chakravarth...@gmail.com> wrote:
>>>>>>>>>>>>
>>>>>>>>>>>>> Hi Team,
>>>>>>>>>>>>>
>>>>>>>>>>>>>     I'm working on a Flink Streaming application. The data is
>>>>>>>>>>>>> injected through Kafka connectors. The payload volume is roughly
>>>>>>>>>>>>> 100K/sec. The event payload is a string. Let's call this
>>>>>>>>>>>>> DataStream1.
>>>>>>>>>>>>> This application also uses another DataStream, call it
>>>>>>>>>>>>> DataStream2 (it consumes events off a Kafka topic). The elements
>>>>>>>>>>>>> of DataStream2 go through a certain transformation that finally
>>>>>>>>>>>>> updates a HashMap (or another java.util collection). Apparently
>>>>>>>>>>>>> the Flink application should share this HashMap across the Flink
>>>>>>>>>>>>> cluster so that the DataStream1 application can check the state
>>>>>>>>>>>>> of the values in this collection. Is there a way to do this in
>>>>>>>>>>>>> Flink?
>>>>>>>>>>>>>
>>>>>>>>>>>>>     I don't see any shared collection used within the cluster.
>>>>>>>>>>>>>
>>>>>>>>>>>>> Best Regards
>>>>>>>>>>>>> CVP
>>>>>>>>>>>>>
>>>>>>>>>>>>
>>>>>>>>>>>>
>>>>>>>>>>>
>>>>>>>>>>
>>>>>>>>>
>>>>>>>>
>>>>>>>
>>>>>>
>>>>>
>>>>
>>>
>>
