Hi Aljoscha & Fabian,

Finally I got this working. Thanks for your help.

In terms of persisting the state (for S2), I tried checkpointing every 10 seconds using an FsStateBackend. What I notice is that the checkpoint duration is almost 2 minutes in many cases, while in the other cases it varies frequently between 100 ms and 1.5 minutes.
The pseudocode is as below:

KeyedStream<String, Tuple> ks1 = ds1.keyBy("*");
KeyedStream<Tuple2<String, V>, Tuple> ks2 = ds2.flatMap(split T into k-v pairs).keyBy(0);
ks1.connect(ks2).flatMap(X);
// X is a CoFlatMapFunction that inserts and removes elements from ks2 into a key-value state member. Elements from ks1 are matched against that state.
// ks1 is streaming about 100K events/sec from a Kafka topic.
// ks2 is streaming about 1 event every 10 minutes...

As soon as the first event is consumed from ks2, checkpoints take 2 minutes straightaway.

The Flink version is 1.1.2.

Best Regards
CVP

On Tue, Sep 13, 2016 at 7:29 AM, Aljoscha Krettek <aljos...@apache.org> wrote:

> Hi,
> you don't need the BlockedEventState class; you should be able to just do this:
>
> private transient ValueState<BlockedRoadInfo> blockedRoads;
> ............
> @Override
> public void open(final org.apache.flink.configuration.Configuration parameters) throws Exception {
>     final ValueStateDescriptor<BlockedRoadInfo> blockedStateDesc =
>         new ValueStateDescriptor<BlockedRoadInfo>("BlockedEventStates",
>             TypeInformation.of(BlockedRoadInfo.class), null);
>     blockedRoads = getRuntimeContext().getState(blockedStateDesc);
> }
>
> }
>
> Cheers,
> Aljoscha
>
>
> On Mon, 12 Sep 2016 at 16:24 Chakravarthy varaga <chakravarth...@gmail.com> wrote:
>
>> Hi Fabian,
>>
>> I'm coding to check if your proposal works and hit an issue with a ClassCastException.
>>
>> // Here is my value that holds the state information: an implementation of my value state, where the key is a Double value, on connected stream ks2
>>
>> public class BlockedEventState implements ValueState<BlockedRoadInfo> {
>>
>>     public BlockedRoadInfo blockedRoad;
>>
>>     @Override
>>     public void clear() {
>>         blockedRoad = null;
>>     }
>>
>>     @Override
>>     public BlockedRoadInfo value() throws IOException {
>>         return blockedRoad;
>>     }
>>
>>     @Override
>>     public void update(final BlockedRoadInfo value) throws IOException {
>>         blockedRoad = value;
>>     }
>> }
>>
>> // BlockedRoadInfo class...
>> public class BlockedRoadInfo {
>>     long maxLink;
>>     long minLink;
>>     double blockedEventId;
>>     ....setters & ... getters
>> }
>>
>> /// new RichCoFlatMapFunction() {
>>
>>     private transient BlockedEventState blockedRoads;
>>     ............
>>     @Override
>>     public void open(final org.apache.flink.configuration.Configuration parameters) throws Exception {
>>         final ValueStateDescriptor<BlockedRoadInfo> blockedStateDesc =
>>             new ValueStateDescriptor<BlockedRoadInfo>("BlockedEventStates",
>>                 TypeInformation.of(BlockedRoadInfo.class), null);
>>         blockedRoads = (BlockedEventState) getRuntimeContext().getState(blockedStateDesc); // FAILS HERE WITH CLASSCAST
>>     }
>>
>> }
>>
>> Caused by: java.lang.ClassCastException: org.apache.flink.runtime.state.filesystem.FsValueState cannot be cast to com.ericsson.components.aia.iot.volvo.state.BlockedEventState
>>
>> I have tried to set the state backend to both MemState and FsState...
>> streamEnv.setStateBackend(new FsStateBackend("file:///tmp/flink/checkpoints"));
>>
>>
>> On Thu, Sep 8, 2016 at 10:10 PM, Fabian Hueske <fhue...@gmail.com> wrote:
>>
>>> Not sure if I got your requirements right, but would this work?
>>>
>>> KeyedStream<String, Tuple> ks1 = ds1.keyBy("*");
>>> KeyedStream<Tuple2<String, V>, Tuple> ks2 = ds2.flatMap(split T into k-v pairs).keyBy(0);
>>>
>>> ks1.connect(ks2).flatMap(X);
>>>
>>> X is a CoFlatMapFunction that inserts and removes elements from ks2 into a key-value state member.
>>> Elements from ks1 are matched against that state.
>>>
>>> Best, Fabian
>>>
>>> 2016-09-08 20:24 GMT+02:00 Chakravarthy varaga <chakravarth...@gmail.com>:
>>>
>>>> Hi Fabian,
>>>>
>>>> First of all, thanks for all your prompt responses. With regard to 2) multiple lookups, I have to clarify what I mean by that...
>>>>
>>>> DS1<String> elementKeyStream = stream1.map(String<>); // maps each of the streaming elements to a string value
>>>> DS2<T> = stream2.xxx(); // where stream2 is a Kafka source stream, as you proposed; xxx() should be my function() which splits the string and generates key1:<value1>, key2:<value2>, key3:<value3> ... keyN:<valueN>
>>>>
>>>> Now, I wish to map elementKeyStream with lookups within (key1, key2 ... keyN), where key1, key2 ... keyN and their respective values should be available across the cluster...
>>>>
>>>> Thanks a million!
>>>> CVP
>>>>
>>>>
>>>> On Wed, Sep 7, 2016 at 9:15 PM, Fabian Hueske <fhue...@gmail.com> wrote:
>>>>
>>>>> That depends.
>>>>> 1) Growing/Shrinking: This should work. New entries can always be inserted. In order to remove entries from the k-v state, you have to set the value to null. Note that you need an explicit delete-value record to trigger the eviction.
>>>>> 2) Multiple lookups: This only works if all lookups are independent of each other. You can partition DS1 only on a single key, and the other keys might be located on different shards. A workaround might be to duplicate S1 events for each key that they need to look up. However, you might need to collect events from the same S1 event after the join. If that does not work for you, the only thing that comes to my mind is to broadcast the state and keep a full local copy in each operator.
>>>>>
>>>>> Let me add one more thing regarding the upcoming rescaling feature.
>>>>> If this is interesting for you, rescaling will also work (maybe not in the first version) for broadcast state, i.e. state which is the same on all parallel operator instances.
>>>>>
>>>>> 2016-09-07 21:45 GMT+02:00 Chakravarthy varaga <chakravarth...@gmail.com>:
>>>>>
>>>>>> I'm understanding this better with your explanation.
>>>>>> With this use case, each element in DS1 has to look up against a 'bunch of keys' from DS2, and DS2 could shrink/expand in terms of the number of keys... Will the key-value shard work in this case?
>>>>>>
>>>>>> On Wed, Sep 7, 2016 at 7:44 PM, Fabian Hueske <fhue...@gmail.com> wrote:
>>>>>>
>>>>>>> Operator state is always local in Flink. However, with key-value state, you can have something which behaves somewhat like a distributed hashmap, because each operator holds a different shard/partition of the hashtable.
>>>>>>>
>>>>>>> If you have to do only a single key lookup for each element of DS1, you should think about partitioning both streams (keyBy) and writing the state into Flink's key-value state [1].
>>>>>>>
>>>>>>> This will have several benefits:
>>>>>>> 1) State does not need to be replicated.
>>>>>>> 2) Depending on the backend (RocksDB) [2], parts of the state can reside on disk. You are not bound to the memory of the JVM.
>>>>>>> 3) Flink takes care of the look-up. No need to have your own hashmap.
>>>>>>> 4) It will only be possible to rescale jobs with key-value state (this feature is currently under development).
>>>>>>>
>>>>>>> If using the key-value state is possible, I'd go for that.
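A minimal plain-Java sketch of the keyed approach Fabian describes, assuming a simple hash-modulo router as a stand-in for Flink's internal key partitioning. The class `KeyedShardSketch` and its methods are hypothetical and not part of Flink's API. Both streams are routed with the same function, so a DS2 update and every DS1 lookup for the same key meet on one parallel instance; each instance stores only its own shard, and a `null` value plays the role of the explicit delete record from point 1) of Fabian's "That depends" reply.

```java
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.Optional;

// Plain-Java simulation of key-partitioned state. This is NOT Flink's API;
// each Instance stands in for one parallel subtask of the CoFlatMapFunction.
public class KeyedShardSketch {

    // One parallel instance: it holds only the keys that are routed to it.
    static final class Instance {
        private final Map<String, String> shard = new HashMap<>();

        // DS2 side: insert or update an entry; a null value is the explicit delete record.
        void onUpdate(String key, String value) {
            if (value == null) {
                shard.remove(key);
            } else {
                shard.put(key, value);
            }
        }

        // DS1 side: look the element's key up in the local shard only.
        Optional<String> onLookup(String key) {
            return Optional.ofNullable(shard.get(key));
        }

        int size() {
            return shard.size();
        }
    }

    private final List<Instance> instances = new ArrayList<>();

    KeyedShardSketch(int parallelism) {
        for (int i = 0; i < parallelism; i++) {
            instances.add(new Instance());
        }
    }

    // Hypothetical router: the same hash-modulo function is used for both streams,
    // so an update and all lookups for one key reach the same instance.
    int route(String key) {
        return Math.floorMod(key.hashCode(), instances.size());
    }

    void update(String key, String value) {
        instances.get(route(key)).onUpdate(key, value);
    }

    Optional<String> lookup(String key) {
        return instances.get(route(key)).onLookup(key);
    }

    // Every key is stored exactly once across all instances (no replication).
    int totalEntries() {
        int total = 0;
        for (Instance instance : instances) {
            total += instance.size();
        }
        return total;
    }

    public static void main(String[] args) {
        KeyedShardSketch job = new KeyedShardSketch(4);
        job.update("road-1", "blocked");
        job.update("road-2", "blocked");
        job.update("road-1", null);                // delete record evicts road-1
        System.out.println(job.lookup("road-1")); // Optional.empty
        System.out.println(job.lookup("road-2")); // Optional[blocked]
        System.out.println(job.totalEntries());   // 1
    }
}
```

In an actual Flink job, `keyBy` on both inputs plus a `ValueState` inside the CoFlatMapFunction would take the place of the router and the per-instance map, and the state backend decides whether a shard lives on the heap or, with RocksDB, on disk. The sketch also shows why multi-key lookups are hard here: a DS1 element only sees the shard that owns its own key.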
>>>>>>>
>>>>>>> Best, Fabian
>>>>>>>
>>>>>>> [1] https://ci.apache.org/projects/flink/flink-docs-release-1.1/apis/streaming/state.html
>>>>>>> [2] https://ci.apache.org/projects/flink/flink-docs-release-1.1/apis/streaming/state_backends.html
>>>>>>>
>>>>>>> 2016-09-07 19:55 GMT+02:00 Chakravarthy varaga <chakravarth...@gmail.com>:
>>>>>>>
>>>>>>>> Certainly, that's what I thought as well...
>>>>>>>> The output of DataStream2 could be in the 1000s, and there are state updates...
>>>>>>>> Reading this topic from the other job, job1, is okay.
>>>>>>>> However, assuming that we maintain this state in a collection and update the state (by reading from the topic) in this collection, will this be replicated across the cluster within this job1?
>>>>>>>>
>>>>>>>>
>>>>>>>> On Wed, Sep 7, 2016 at 6:33 PM, Fabian Hueske <fhue...@gmail.com> wrote:
>>>>>>>>
>>>>>>>>> Is writing DataStream2 to a Kafka topic and reading it from the other job an option?
>>>>>>>>>
>>>>>>>>> 2016-09-07 19:07 GMT+02:00 Chakravarthy varaga <chakravarth...@gmail.com>:
>>>>>>>>>
>>>>>>>>>> Hi Fabian,
>>>>>>>>>>
>>>>>>>>>> Thanks for your response. Apparently these DataStreams (Job1-DataStream1 & Job2-DataStream2) are from different Flink applications running within the same cluster.
>>>>>>>>>> DataStream2 (from Job2) applies transformations and updates a 'cache' that Job1 needs to work on.
>>>>>>>>>> Our intention is not to use an external key/value store, as we are trying to localize the cache within the cluster.
>>>>>>>>>> Is there a way?
>>>>>>>>>>
>>>>>>>>>> Best Regards
>>>>>>>>>> CVP
>>>>>>>>>>
>>>>>>>>>> On Wed, Sep 7, 2016 at 5:00 PM, Fabian Hueske <fhue...@gmail.com> wrote:
>>>>>>>>>>
>>>>>>>>>>> Hi,
>>>>>>>>>>>
>>>>>>>>>>> Flink does not provide shared state.
>>>>>>>>>>> However, you can broadcast a stream to a CoFlatMapFunction, such that each operator has its own local copy of the state.
>>>>>>>>>>>
>>>>>>>>>>> If that does not work for you because the state is too large, and if it is possible to partition the state (and both streams), you can also use keyBy instead of broadcast.
>>>>>>>>>>>
>>>>>>>>>>> Finally, you can use an external system like a key-value store or an in-memory store like Apache Ignite to hold your distributed collection.
>>>>>>>>>>>
>>>>>>>>>>> Best, Fabian
>>>>>>>>>>>
>>>>>>>>>>> 2016-09-07 17:49 GMT+02:00 Chakravarthy varaga <chakravarth...@gmail.com>:
>>>>>>>>>>>
>>>>>>>>>>>> Hi Team,
>>>>>>>>>>>>
>>>>>>>>>>>> Can someone help me here? I'd appreciate any response!
>>>>>>>>>>>>
>>>>>>>>>>>> Best Regards
>>>>>>>>>>>> Varaga
>>>>>>>>>>>>
>>>>>>>>>>>> On Mon, Sep 5, 2016 at 4:51 PM, Chakravarthy varaga <chakravarth...@gmail.com> wrote:
>>>>>>>>>>>>
>>>>>>>>>>>>> Hi Team,
>>>>>>>>>>>>>
>>>>>>>>>>>>> I'm working on a Flink Streaming application. The data is ingested through Kafka connectors. The payload volume is roughly 100K/sec. The event payload is a string. Let's call this DataStream1.
>>>>>>>>>>>>> This application also uses another DataStream, call it DataStream2 (it consumes events off a Kafka topic). The elements of this DataStream2 are involved in a certain transformation that finally updates a HashMap (/Java util Collection). Apparently the Flink application should share this HashMap across the Flink cluster so that the DataStream1 application can check the state of the values in this collection. Is there a way to do this in Flink?
>>>>>>>>>>>>>
>>>>>>>>>>>>> I don't see any shared collection used within the cluster?
>>>>>>>>>>>>>
>>>>>>>>>>>>> Best Regards
>>>>>>>>>>>>> CVP
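Fabian's first answer in this thread suggests broadcasting DataStream2 so that every parallel instance keeps a full local copy of the lookup table. The sketch below models only the resulting state layout in plain Java; the class `BroadcastCopySketch` is hypothetical and not Flink API. Every update is applied to every instance, so any instance can check several keys for one DS1 element, which is what the multi-key lookups discussed above need, at the cost of storing the table once per instance. In a Flink job this would presumably be wired up as `ds1.connect(ds2.broadcast()).flatMap(...)`, with the copy held inside the CoFlatMapFunction.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

// Plain-Java simulation of broadcast state. This is NOT Flink's API;
// each map stands in for the local copy held by one parallel subtask.
public class BroadcastCopySketch {

    private final List<Map<String, String>> localCopies = new ArrayList<>();

    BroadcastCopySketch(int parallelism) {
        for (int i = 0; i < parallelism; i++) {
            localCopies.add(new HashMap<>());
        }
    }

    // A broadcast DS2 record reaches every instance; a null value deletes the key everywhere.
    void update(String key, String value) {
        for (Map<String, String> copy : localCopies) {
            if (value == null) {
                copy.remove(key);
            } else {
                copy.put(key, value);
            }
        }
    }

    // Any instance can check several keys for one DS1 element using only its local copy.
    boolean allPresent(int instance, List<String> keys) {
        Map<String, String> copy = localCopies.get(instance);
        for (String key : keys) {
            if (!copy.containsKey(key)) {
                return false;
            }
        }
        return true;
    }

    // Entries stored across all instances: the price of full replication.
    int totalEntries() {
        int total = 0;
        for (Map<String, String> copy : localCopies) {
            total += copy.size();
        }
        return total;
    }

    public static void main(String[] args) {
        BroadcastCopySketch job = new BroadcastCopySketch(4);
        job.update("key1", "value1");
        job.update("key2", "value2");
        System.out.println(job.allPresent(3, Arrays.asList("key1", "key2"))); // true
        System.out.println(job.totalEntries()); // 8 (2 keys x 4 instances)
    }
}
```

The trade-off is the one Fabian names: the total entry count grows with parallelism, which is why he suggests keyBy instead when the state is too large and can be partitioned.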