Hi Fabian,

I wrote some code to check whether your proposal works and hit an issue with a ClassCastException.
// Here is my value state implementation, holding the state information
// (an implementation of ValueState where the key is a Double value, on the
// connected stream ks2):

public class BlockedEventState implements ValueState<BlockedRoadInfo> {

    public BlockedRoadInfo blockedRoad;

    @Override
    public void clear() {
        blockedRoad = null;
    }

    @Override
    public BlockedRoadInfo value() throws IOException {
        return blockedRoad;
    }

    @Override
    public void update(final BlockedRoadInfo value) throws IOException {
        blockedRoad = value;
    }
}

// The BlockedRoadInfo class:

public class BlockedRoadInfo {
    long maxLink;
    long minLink;
    double blockedEventId;
    // ... setters & getters
}

// The CoFlatMapFunction:

new RichCoFlatMapFunction() {

    private transient BlockedEventState blockedRoads;

    // ...

    @Override
    public void open(final org.apache.flink.configuration.Configuration parameters) throws Exception {
        final ValueStateDescriptor<BlockedRoadInfo> blockedStateDesc =
                new ValueStateDescriptor<BlockedRoadInfo>(
                        "BlockedEventStates",
                        TypeInformation.of(BlockedRoadInfo.class),
                        null);
        blockedRoads = (BlockedEventState) getRuntimeContext().getState(blockedStateDesc); // FAILS HERE WITH CLASSCAST
    }
};

Caused by: java.lang.ClassCastException: org.apache.flink.runtime.state.filesystem.FsValueState cannot be cast to com.ericsson.components.aia.iot.volvo.state.BlockedEventState

I have tried setting the state backend to both MemState and FsState:

streamEnv.setStateBackend(new FsStateBackend("file:///tmp/flink/checkpoints"));
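For reference, here is the pattern I now think the state docs intend — a rough, untested sketch reusing my types from above. The class name BlockedRoadMatcher, the String output, and the getBlockedEventId() getter are just illustrative, and I'm assuming ks2 carries BlockedRoadInfo updates directly. The field is typed against the ValueState interface and the result of getState() is used without any cast, since the runtime hands back its own backend implementation (FsValueState in my case) rather than my class:

import org.apache.flink.api.common.state.ValueState;
import org.apache.flink.api.common.state.ValueStateDescriptor;
import org.apache.flink.api.common.typeinfo.TypeInformation;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.streaming.api.functions.co.RichCoFlatMapFunction;
import org.apache.flink.util.Collector;

public class BlockedRoadMatcher
        extends RichCoFlatMapFunction<String, BlockedRoadInfo, String> {

    // Typed against the ValueState interface, not a custom implementation,
    // so whatever the configured backend returns is acceptable.
    private transient ValueState<BlockedRoadInfo> blockedRoads;

    @Override
    public void open(final Configuration parameters) throws Exception {
        final ValueStateDescriptor<BlockedRoadInfo> blockedStateDesc =
                new ValueStateDescriptor<BlockedRoadInfo>(
                        "BlockedEventStates",
                        TypeInformation.of(BlockedRoadInfo.class),
                        null);
        blockedRoads = getRuntimeContext().getState(blockedStateDesc); // no cast
    }

    @Override
    public void flatMap1(final String event, final Collector<String> out) throws Exception {
        // ks1 side: match the incoming event against the state for the current key.
        final BlockedRoadInfo blocked = blockedRoads.value();
        if (blocked != null) {
            out.collect(event + " matched blockedEventId=" + blocked.getBlockedEventId());
        }
    }

    @Override
    public void flatMap2(final BlockedRoadInfo update, final Collector<String> out) throws Exception {
        // ks2 side: insert/overwrite the state for the current key. Per your
        // earlier point about eviction, an explicit delete-marker record
        // would call blockedRoads.clear() here instead.
        blockedRoads.update(update);
    }
}

If that reading is right, my BlockedEventState class isn't needed at all — does the cast look like the actual cause here?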
On Thu, Sep 8, 2016 at 10:10 PM, Fabian Hueske <fhue...@gmail.com> wrote:

> Not sure if I got your requirements right, but would this work?
>
> KeyedStream<String, String> ks1 = ds1.keyBy("*");
> KeyedStream<Tuple2<String, V>, String> ks2 = ds2.flatMap(split T into k-v pairs).keyBy(0);
>
> ks1.connect(ks2).flatMap(X)
>
> X is a CoFlatMapFunction that inserts and removes elements from ks2 into a
> key-value state member. Elements from ks1 are matched against that state.
>
> Best, Fabian
>
> 2016-09-08 20:24 GMT+02:00 Chakravarthy varaga <chakravarth...@gmail.com>:
>
>> Hi Fabian,
>>
>> First of all, thanks for all your prompt responses. With regard to
>> 2) multiple lookups, I have to clarify what I mean by that...
>>
>> DS1<String> elementKeyStream = stream1.map(String<>); // this maps each of
>> the streaming elements into a string value...
>> DS2<T> = stream2.xxx(); // where stream2 is a Kafka source stream; as you
>> proposed, xxx() should be my function() which splits the string and
>> generates key1:<value1>, key2:<value2>, key3:<value3> ... keyN:<valueN>
>>
>> Now, I wish to map elementKeyStream with lookups within (key1, key2, ..., keyN),
>> where key1, key2, ..., keyN and their respective values should be
>> available across the cluster...
>>
>> Thanks a million!
>> CVP
>>
>> On Wed, Sep 7, 2016 at 9:15 PM, Fabian Hueske <fhue...@gmail.com> wrote:
>>
>>> That depends.
>>> 1) Growing/shrinking: This should work. New entries can always be
>>> inserted. In order to remove entries from the k-v state, you have to set
>>> the value to null. Note that you need an explicit delete-value record to
>>> trigger the eviction.
>>> 2) Multiple lookups: This only works if all lookups are independent of
>>> each other. You can partition DS1 only on a single key, and the other
>>> keys might be located on different shards. A workaround might be to
>>> duplicate S1 events for each key that they need to look up. However, you
>>> might need to collect events from the same S1 event after the join.
>>> If that does not work for you, the only thing that comes to my mind is
>>> to broadcast the state and keep a full local copy in each operator.
>>>
>>> Let me add one more thing regarding the upcoming rescaling feature. If
>>> this is interesting for you, rescaling will also work (maybe not in the
>>> first version) for broadcast state, i.e. state which is the same on all
>>> parallel operator instances.
>>>
>>> 2016-09-07 21:45 GMT+02:00 Chakravarthy varaga <chakravarth...@gmail.com>:
>>>
>>>> I'm understanding this better with your explanation.
>>>> With this use case, each element in DS1 has to look up against a
>>>> 'bunch of keys' from DS2, and DS2 could shrink/expand in terms of the
>>>> number of keys... will the key-value shard work in this case?
>>>>
>>>> On Wed, Sep 7, 2016 at 7:44 PM, Fabian Hueske <fhue...@gmail.com> wrote:
>>>>
>>>>> Operator state is always local in Flink. However, with key-value
>>>>> state, you can have something which behaves kind of similar to a
>>>>> distributed hashmap, because each operator holds a different
>>>>> shard/partition of the hashtable.
>>>>>
>>>>> If you have to do only a single key lookup for each element of DS1,
>>>>> you should think about partitioning both streams (keyBy) and writing
>>>>> the state into Flink's key-value state [1].
>>>>>
>>>>> This will have several benefits:
>>>>> 1) State does not need to be replicated.
>>>>> 2) Depending on the backend (RocksDB) [2], parts of the state can
>>>>> reside on disk. You are not bound to the memory of the JVM.
>>>>> 3) Flink takes care of the lookup. No need to have your own hashmap.
>>>>> 4) It will only be possible to rescale jobs with key-value state
>>>>> (this feature is currently under development).
>>>>>
>>>>> If using the key-value state is possible, I'd go for that.
>>>>>
>>>>> Best, Fabian
>>>>>
>>>>> [1] https://ci.apache.org/projects/flink/flink-docs-release-1.1/apis/streaming/state.html
>>>>> [2] https://ci.apache.org/projects/flink/flink-docs-release-1.1/apis/streaming/state_backends.html
>>>>>
>>>>> 2016-09-07 19:55 GMT+02:00 Chakravarthy varaga <chakravarth...@gmail.com>:
>>>>>
>>>>>> Certainly, that's what I thought as well...
>>>>>> The output of DataStream2 could be in the 1000s, and there are state
>>>>>> updates... Reading this topic from the other job, job1, is okay.
>>>>>> However, assuming that we maintain this state in a collection, and
>>>>>> update the state (by reading from the topic) in this collection,
>>>>>> will this be replicated across the cluster within this job1?
>>>>>>
>>>>>> On Wed, Sep 7, 2016 at 6:33 PM, Fabian Hueske <fhue...@gmail.com> wrote:
>>>>>>
>>>>>>> Is writing DataStream2 to a Kafka topic and reading it from the
>>>>>>> other job an option?
>>>>>>>
>>>>>>> 2016-09-07 19:07 GMT+02:00 Chakravarthy varaga <chakravarth...@gmail.com>:
>>>>>>>
>>>>>>>> Hi Fabian,
>>>>>>>>
>>>>>>>> Thanks for your response. These DataStreams (Job1-DataStream1 &
>>>>>>>> Job2-DataStream2) are from different Flink applications running
>>>>>>>> within the same cluster.
>>>>>>>> DataStream2 (from Job2) applies transformations and updates a
>>>>>>>> 'cache' on which Job1 needs to work.
>>>>>>>> Our intention is to not use an external key/value store, as we are
>>>>>>>> trying to localize the cache within the cluster.
>>>>>>>> Is there a way?
>>>>>>>>
>>>>>>>> Best Regards
>>>>>>>> CVP
>>>>>>>>
>>>>>>>> On Wed, Sep 7, 2016 at 5:00 PM, Fabian Hueske <fhue...@gmail.com> wrote:
>>>>>>>>
>>>>>>>>> Hi,
>>>>>>>>>
>>>>>>>>> Flink does not provide shared state.
>>>>>>>>> However, you can broadcast a stream to a CoFlatMapFunction, such
>>>>>>>>> that each operator has its own local copy of the state.
>>>>>>>>>
>>>>>>>>> If that does not work for you because the state is too large, and
>>>>>>>>> if it is possible to partition the state (and both streams), you
>>>>>>>>> can also use keyBy instead of broadcast.
>>>>>>>>>
>>>>>>>>> Finally, you can use an external system like a key-value store or
>>>>>>>>> an in-memory store like Apache Ignite to hold your distributed
>>>>>>>>> collection.
>>>>>>>>>
>>>>>>>>> Best, Fabian
>>>>>>>>>
>>>>>>>>> 2016-09-07 17:49 GMT+02:00 Chakravarthy varaga <chakravarth...@gmail.com>:
>>>>>>>>>
>>>>>>>>>> Hi Team,
>>>>>>>>>>
>>>>>>>>>> Can someone help me here? Appreciate any response!
>>>>>>>>>>
>>>>>>>>>> Best Regards
>>>>>>>>>> Varaga
>>>>>>>>>>
>>>>>>>>>> On Mon, Sep 5, 2016 at 4:51 PM, Chakravarthy varaga <chakravarth...@gmail.com> wrote:
>>>>>>>>>>
>>>>>>>>>>> Hi Team,
>>>>>>>>>>>
>>>>>>>>>>> I'm working on a Flink streaming application. The data is
>>>>>>>>>>> injected through Kafka connectors. The payload volume is
>>>>>>>>>>> roughly 100K/sec. The event payload is a string. Let's call
>>>>>>>>>>> this DataStream1.
>>>>>>>>>>> This application also uses another DataStream, call it
>>>>>>>>>>> DataStream2 (it consumes events off a Kafka topic). The
>>>>>>>>>>> elements of DataStream2 are involved in a certain
>>>>>>>>>>> transformation that finally updates a HashMap (a java.util
>>>>>>>>>>> collection). The Flink application should share this HashMap
>>>>>>>>>>> across the Flink cluster so that the DataStream1 application
>>>>>>>>>>> can check the state of the values in this collection. Is there
>>>>>>>>>>> a way to do this in Flink?
>>>>>>>>>>>
>>>>>>>>>>> I don't see any shared collection used within the cluster?
>>>>>>>>>>>
>>>>>>>>>>> Best Regards
>>>>>>>>>>> CVP