Hi,

you don't need the BlockedEventState class; you should be able to just do this:

private transient ValueState<BlockedRoadInfo> blockedRoads;
............

@Override
public void open(final org.apache.flink.configuration.Configuration parameters) throws Exception {
    final ValueStateDescriptor<BlockedRoadInfo> blockedStateDesc =
            new ValueStateDescriptor<BlockedRoadInfo>("BlockedEventStates",
                    TypeInformation.of(BlockedRoadInfo.class), null);
    blockedRoads = getRuntimeContext().getState(blockedStateDesc);
};
}

Cheers,
Aljoscha
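Putting the two suggestions together (the ValueState field above and the connect()-based CoFlatMapFunction Fabian proposes further down the thread), a minimal sketch could look like the following. The String key, the output messages, the class and method names, and the getBlockedEventId() getter are illustrative assumptions, not taken from CVP's actual job:

import org.apache.flink.api.common.state.ValueState;
import org.apache.flink.api.common.state.ValueStateDescriptor;
import org.apache.flink.api.common.typeinfo.TypeInformation;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.datastream.KeyedStream;
import org.apache.flink.streaming.api.functions.co.RichCoFlatMapFunction;
import org.apache.flink.util.Collector;

public class BlockedRoadMatcher {

    // ks1: events to check, ks2: blocked-road updates; both keyed on the same
    // value so that the keyed state of the co-flatmap lines up for matching.
    public static DataStream<String> match(KeyedStream<String, String> ks1,
                                           KeyedStream<BlockedRoadInfo, String> ks2) {
        return ks1.connect(ks2).flatMap(
                new RichCoFlatMapFunction<String, BlockedRoadInfo, String>() {

                    // declared against the ValueState interface: the concrete class
                    // (FsValueState, RocksDB, ...) is chosen by the state backend,
                    // which is why casting it to an own implementation fails
                    private transient ValueState<BlockedRoadInfo> blockedRoads;

                    @Override
                    public void open(Configuration parameters) throws Exception {
                        ValueStateDescriptor<BlockedRoadInfo> desc =
                                new ValueStateDescriptor<>("BlockedEventStates",
                                        TypeInformation.of(BlockedRoadInfo.class), null);
                        blockedRoads = getRuntimeContext().getState(desc);
                    }

                    @Override
                    public void flatMap1(String event, Collector<String> out) throws Exception {
                        // stream 1: match the event against the state kept for its key
                        BlockedRoadInfo blocked = blockedRoads.value();
                        if (blocked != null) {
                            out.collect(event + " hits blocked event " + blocked.getBlockedEventId());
                        }
                    }

                    @Override
                    public void flatMap2(BlockedRoadInfo update, Collector<String> out) throws Exception {
                        // stream 2: insert/refresh the entry for the current key; an
                        // explicit delete record could call blockedRoads.update(null) to evict it
                        blockedRoads.update(update);
                    }
                });
    }
}

The only functional change from the failing version below is that nothing is cast: the field is declared as ValueState<BlockedRoadInfo>, and whatever backend-specific implementation the runtime returns is used through that interface.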
On Mon, 12 Sep 2016 at 16:24 Chakravarthy varaga <chakravarth...@gmail.com> wrote:

> Hi Fabian,
>
> I'm coding to check whether your proposal works and hit an issue with a
> ClassCastException.
>
> // Here is my Value that holds the state information... an implementation of
> // my value state, where the key is a Double value, on connected stream ks2
>
> public class BlockedEventState implements ValueState<BlockedRoadInfo> {
>
>     public BlockedRoadInfo blockedRoad;
>
>     @Override
>     public void clear() {
>         blockedRoad = null;
>     }
>
>     @Override
>     public BlockedRoadInfo value() throws IOException {
>         return blockedRoad;
>     }
>
>     @Override
>     public void update(final BlockedRoadInfo value) throws IOException {
>         blockedRoad = value;
>     }
> }
>
> // BlockedRoadInfo class...
> public class BlockedRoadInfo {
>     long maxLink;
>     long minLink;
>     double blockedEventId;
>     // ... setters & getters
> }
>
> // new RichCoFlatMapFunction() {
>
> private transient BlockedEventState blockedRoads;
> ............
>
> @Override
> public void open(final org.apache.flink.configuration.Configuration parameters) throws Exception {
>     final ValueStateDescriptor<BlockedRoadInfo> blockedStateDesc =
>             new ValueStateDescriptor<BlockedRoadInfo>("BlockedEventStates",
>                     TypeInformation.of(BlockedRoadInfo.class), null);
>     blockedRoads = (BlockedEventState) getRuntimeContext().getState(blockedStateDesc); // FAILS HERE WITH CLASSCAST
> };
> }
>
> Caused by: java.lang.ClassCastException:
> org.apache.flink.runtime.state.filesystem.FsValueState cannot be cast to
> com.ericsson.components.aia.iot.volvo.state.BlockedEventState
>
> I have tried to set the state backend to both MemState and FsState...
> streamEnv.setStateBackend(new FsStateBackend("file:///tmp/flink/checkpoints"));
>
> On Thu, Sep 8, 2016 at 10:10 PM, Fabian Hueske <fhue...@gmail.com> wrote:
>
>> Not sure if I got your requirements right, but would this work?
>>
>> KeyedStream<String, String> ks1 = ds1.keyBy("*");
>> KeyedStream<Tuple2<String, V>, String> ks2 = ds2.flatMap(split T into k-v pairs).keyBy(0);
>>
>> ks1.connect(ks2).flatMap(X)
>>
>> X is a CoFlatMapFunction that inserts and removes elements from ks2 into
>> a key-value state member. Elements from ks1 are matched against that state.
>>
>> Best, Fabian
>>
>> 2016-09-08 20:24 GMT+02:00 Chakravarthy varaga <chakravarth...@gmail.com>:
>>
>>> Hi Fabian,
>>>
>>> First of all, thanks for all your prompt responses. With regard to
>>> 2) multiple look-ups, I have to clarify what I mean by that...
>>>
>>> DS1<String> elementKeyStream = stream1.map(String<>); this maps each of
>>> the streaming elements into a string mapped value...
>>> DS2<T> = stream2.xxx(); // where stream2 is a kafka source stream, as you
>>> proposed; xxx() should be my function() which splits the string and
>>> generates key1:<value1>, key2:<value2>, key3:<value3> ... keyN:<valueN>
>>>
>>> Now, I wish to map elementKeyStream with look-ups within (key1,
>>> key2...keyN), where key1, key2...keyN and their respective values should be
>>> available across the cluster...
>>>
>>> Thanks a million !
>>> CVP
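Fabian's "ds2.flatMap(split T into k-v pairs).keyBy(0)" step, applied to records of the shape key1:<value1>, key2:<value2>, ... that CVP describes, could be sketched as below. The comma/colon wire format and the method name are assumptions; only the shape of the transformation matters:

import org.apache.flink.api.common.functions.FlatMapFunction;
import org.apache.flink.api.java.tuple.Tuple;
import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.datastream.KeyedStream;
import org.apache.flink.util.Collector;

public class SplitIntoKeyValuePairs {

    // ds2 elements are assumed to look like "key1:value1,key2:value2,...";
    // the exact wire format in CVP's job is not known.
    public static KeyedStream<Tuple2<String, String>, Tuple> split(DataStream<String> ds2) {
        return ds2
                .flatMap(new FlatMapFunction<String, Tuple2<String, String>>() {
                    @Override
                    public void flatMap(String record, Collector<Tuple2<String, String>> out) {
                        for (String pair : record.split(",")) {
                            String[] kv = pair.split(":", 2);
                            if (kv.length == 2) {
                                out.collect(Tuple2.of(kv[0], kv[1])); // one element per key
                            }
                        }
                    }
                })
                .keyBy(0); // partition by the key field, as in Fabian's ks2
    }
}

The emitted Tuple2 values could just as well be parsed into a POJO such as BlockedRoadInfo before keying.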
>>>
>>> On Wed, Sep 7, 2016 at 9:15 PM, Fabian Hueske <fhue...@gmail.com> wrote:
>>>
>>>> That depends.
>>>>
>>>> 1) Growing/Shrinking: This should work. New entries can always be
>>>> inserted. In order to remove entries from the k-v state you have to set
>>>> the value to null. Note that you need an explicit delete-value record to
>>>> trigger the eviction.
>>>>
>>>> 2) Multiple lookups: This only works if all lookups are independent of
>>>> each other. You can partition DS1 only on a single key, and the other
>>>> keys might be located on different shards. A workaround might be to
>>>> duplicate S1 events for each key that they need to look up. However, you
>>>> might need to collect events from the same S1 event after the join. If
>>>> that does not work for you, the only thing that comes to my mind is to
>>>> broadcast the state and keep a full local copy in each operator.
>>>>
>>>> Let me add one more thing regarding the upcoming rescaling feature. If
>>>> this is interesting for you, rescaling will also work (maybe not in the
>>>> first version) for broadcasted state, i.e. state which is the same on all
>>>> parallel operator instances.
>>>>
>>>> 2016-09-07 21:45 GMT+02:00 Chakravarthy varaga <chakravarth...@gmail.com>:
>>>>
>>>>> I'm understanding this better with your explanation.
>>>>> With this use case, each element in DS1 has to look up against a
>>>>> 'bunch of keys' from DS2, and DS2 could shrink/expand in terms of the
>>>>> number of keys... will the key-value shard work in this case?
>>>>>
>>>>> On Wed, Sep 7, 2016 at 7:44 PM, Fabian Hueske <fhue...@gmail.com> wrote:
>>>>>
>>>>>> Operator state is always local in Flink. However, with key-value
>>>>>> state you can have something which behaves kind of similar to a
>>>>>> distributed hashmap, because each operator holds a different
>>>>>> shard/partition of the hashtable.
>>>>>>
>>>>>> If you have to do only a single key lookup for each element of DS1,
>>>>>> you should think about partitioning both streams (keyBy) and writing
>>>>>> the state into Flink's key-value state [1].
>>>>>>
>>>>>> This will have several benefits:
>>>>>> 1) State does not need to be replicated.
>>>>>> 2) Depending on the backend (RocksDB) [2], parts of the state can
>>>>>> reside on disk. You are not bound to the memory of the JVM.
>>>>>> 3) Flink takes care of the look-up. No need to have your own hashmap.
>>>>>> 4) It will only be possible to rescale jobs with key-value state
>>>>>> (this feature is currently under development).
>>>>>>
>>>>>> If using the key-value state is possible, I'd go for that.
>>>>>>
>>>>>> Best, Fabian
>>>>>>
>>>>>> [1] https://ci.apache.org/projects/flink/flink-docs-release-1.1/apis/streaming/state.html
>>>>>> [2] https://ci.apache.org/projects/flink/flink-docs-release-1.1/apis/streaming/state_backends.html
>>>>>>
>>>>>> 2016-09-07 19:55 GMT+02:00 Chakravarthy varaga <chakravarth...@gmail.com>:
>>>>>>
>>>>>>> Certainly, that is what I thought as well...
>>>>>>> The output of DataStream2 could be in the 1000s, and there are state
>>>>>>> updates... reading this topic from the other job, job1, is okay.
>>>>>>> However, assuming that we maintain this state in a collection, and
>>>>>>> update the state (by reading from the topic) in this collection, will
>>>>>>> it be replicated across the cluster within this job1?
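On Fabian's point 2) above, the configured backend decides where that key-value state actually lives. A minimal sketch of switching a job to the RocksDB backend (the checkpoint URI is a placeholder, the job name is made up, and the flink-statebackend-rocksdb dependency has to be on the classpath):

import org.apache.flink.contrib.streaming.state.RocksDBStateBackend;
import org.apache.flink.runtime.state.filesystem.FsStateBackend;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;

public class BackendSetup {

    public static void main(String[] args) throws Exception {
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();

        // key-value state is held in RocksDB (can spill to disk); snapshots go to the URI
        env.setStateBackend(new RocksDBStateBackend("hdfs:///flink/checkpoints"));

        // heap-based alternative (state bound by JVM memory), as already tried above:
        // env.setStateBackend(new FsStateBackend("file:///tmp/flink/checkpoints"));

        // ... define ks1, ks2 and the CoFlatMapFunction here ...

        env.execute("blocked-road-lookup");
    }
}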
>>>>>>>
>>>>>>> On Wed, Sep 7, 2016 at 6:33 PM, Fabian Hueske <fhue...@gmail.com> wrote:
>>>>>>>
>>>>>>>> Is writing DataStream2 to a Kafka topic and reading it from the
>>>>>>>> other job an option?
>>>>>>>>
>>>>>>>> 2016-09-07 19:07 GMT+02:00 Chakravarthy varaga <chakravarth...@gmail.com>:
>>>>>>>>
>>>>>>>>> Hi Fabian,
>>>>>>>>>
>>>>>>>>> Thanks for your response. These DataStreams (Job1-DataStream1 &
>>>>>>>>> Job2-DataStream2) are actually from different Flink applications
>>>>>>>>> running within the same cluster.
>>>>>>>>> DataStream2 (from Job2) applies transformations and updates a
>>>>>>>>> 'cache' which Job1 needs to work on.
>>>>>>>>> Our intention is to not use an external key/value store, as we are
>>>>>>>>> trying to keep the cache local to the cluster.
>>>>>>>>> Is there a way?
>>>>>>>>>
>>>>>>>>> Best Regards
>>>>>>>>> CVP
>>>>>>>>>
>>>>>>>>> On Wed, Sep 7, 2016 at 5:00 PM, Fabian Hueske <fhue...@gmail.com> wrote:
>>>>>>>>>
>>>>>>>>>> Hi,
>>>>>>>>>>
>>>>>>>>>> Flink does not provide shared state.
>>>>>>>>>> However, you can broadcast a stream to a CoFlatMapFunction, such
>>>>>>>>>> that each operator has its own local copy of the state.
>>>>>>>>>>
>>>>>>>>>> If that does not work for you because the state is too large, and
>>>>>>>>>> if it is possible to partition the state (and both streams), you
>>>>>>>>>> can also use keyBy instead of broadcast.
>>>>>>>>>>
>>>>>>>>>> Finally, you can use an external system like a key-value store or
>>>>>>>>>> an in-memory store like Apache Ignite to hold your distributed
>>>>>>>>>> collection.
>>>>>>>>>>
>>>>>>>>>> Best, Fabian
>>>>>>>>>>
>>>>>>>>>> 2016-09-07 17:49 GMT+02:00 Chakravarthy varaga <chakravarth...@gmail.com>:
>>>>>>>>>>
>>>>>>>>>>> Hi Team,
>>>>>>>>>>>
>>>>>>>>>>> Can someone help me here? Appreciate any response !
>>>>>>>>>>>
>>>>>>>>>>> Best Regards
>>>>>>>>>>> Varaga
>>>>>>>>>>>
>>>>>>>>>>> On Mon, Sep 5, 2016 at 4:51 PM, Chakravarthy varaga <chakravarth...@gmail.com> wrote:
>>>>>>>>>>>
>>>>>>>>>>>> Hi Team,
>>>>>>>>>>>>
>>>>>>>>>>>> I'm working on a Flink streaming application. The data is
>>>>>>>>>>>> injected through Kafka connectors. The payload volume is roughly
>>>>>>>>>>>> 100K/sec, and the event payload is a string. Let's call this
>>>>>>>>>>>> DataStream1.
>>>>>>>>>>>> The application also uses another DataStream, call it
>>>>>>>>>>>> DataStream2, which consumes events off a Kafka topic. The
>>>>>>>>>>>> elements of DataStream2 go through a transformation that finally
>>>>>>>>>>>> updates a HashMap (a java.util collection). The Flink application
>>>>>>>>>>>> needs to share this HashMap across the Flink cluster so that the
>>>>>>>>>>>> DataStream1 side can check the state of the values in this
>>>>>>>>>>>> collection. Is there a way to do this in Flink?
>>>>>>>>>>>>
>>>>>>>>>>>> I don't see any shared collection used within the cluster.
>>>>>>>>>>>>
>>>>>>>>>>>> Best Regards
>>>>>>>>>>>> CVP
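For completeness, the broadcast variant Fabian describes above, where every parallel instance of the co-flatmap keeps its own full copy of the cache in a plain HashMap, might look roughly like this. The types, the getBlockedEventId() getter and the assumption that the incoming event string is itself the lookup key are illustrative, and the map is not covered by Flink's checkpointing:

import java.util.HashMap;
import java.util.Map;

import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.functions.co.CoFlatMapFunction;
import org.apache.flink.util.Collector;

public class BroadcastCacheLookup {

    public static DataStream<String> lookup(DataStream<String> dataStream1,
                                            DataStream<BlockedRoadInfo> dataStream2) {
        return dataStream1
                .connect(dataStream2.broadcast()) // every parallel instance sees all updates
                .flatMap(new CoFlatMapFunction<String, BlockedRoadInfo, String>() {

                    // plain local map, duplicated in every parallel instance
                    private final Map<String, BlockedRoadInfo> cache = new HashMap<>();

                    @Override
                    public void flatMap2(BlockedRoadInfo update, Collector<String> out) {
                        // broadcast side: insert/refresh; a delete marker could call cache.remove(...)
                        cache.put(String.valueOf(update.getBlockedEventId()), update);
                    }

                    @Override
                    public void flatMap1(String event, Collector<String> out) {
                        // event side: check the local copy; key extraction is use-case specific
                        BlockedRoadInfo blocked = cache.get(event);
                        if (blocked != null) {
                            out.collect(event + " is currently blocked");
                        }
                    }
                });
    }
}

This only remains practical while the full set of cached entries fits comfortably into each task manager's heap, which is exactly the size caveat raised above.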