Hi Team,

Will you be able to guide me on this? Is this a known issue with checkpointing?
CVP

On 22 Sep 2016 15:57, "Chakravarthy varaga" <chakravarth...@gmail.com> wrote:

PFA, Flink_checkpoint_time.png in relation to this issue.

On Thu, Sep 22, 2016 at 3:38 PM, Chakravarthy varaga <chakravarth...@gmail.com> wrote:

Hi Aljoscha & Fabian,

Finally I got this working. Thanks for your help. In terms of persisting the state (for S2), I tried to checkpoint every 10 seconds using a FsStateBackend... What I notice is that the checkpoint duration is almost 2 minutes in many cases, while in the other cases it frequently varies from 100 ms to 1.5 minutes.

The pseudocode is as below:

    KeyedStream<String, String> ks1 = ds1.keyBy("*");
    KeyedStream<Tuple2<String, V>, String> ks2 = ds2.flatMap(split T into k-v pairs).keyBy(0);

    ks1.connect(ks2).flatMap(X);
    // X is a CoFlatMapFunction that inserts and removes elements from ks2 into a
    // key-value state member. Elements from ks1 are matched against that state.

    // ks1 is streaming about 100K events/sec from a Kafka topic.
    // ks2 is streaming about 1 event every 10 minutes... Precisely when the 1st event
    // is consumed from this stream, the checkpoint takes 2 minutes straight away.

The version of Flink is 1.1.2.

Best Regards
CVP
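For illustration, a minimal sketch of what a CoFlatMapFunction like X above could look like on Flink 1.1, using keyed ValueState. The element types (String events, Tuple2<String, String> control pairs), the state name and the matching rule are assumptions for the sketch, not the actual job code.

    import org.apache.flink.api.common.state.ValueState;
    import org.apache.flink.api.common.state.ValueStateDescriptor;
    import org.apache.flink.api.common.typeinfo.TypeInformation;
    import org.apache.flink.api.java.tuple.Tuple2;
    import org.apache.flink.configuration.Configuration;
    import org.apache.flink.streaming.api.functions.co.RichCoFlatMapFunction;
    import org.apache.flink.util.Collector;

    // Sketch only: flatMap2 maintains one state entry per key from the low-volume
    // stream (ks2); flatMap1 matches the high-volume stream (ks1) against it.
    public class MatchFunction
            extends RichCoFlatMapFunction<String, Tuple2<String, String>, String> {

        private transient ValueState<String> perKeyState;

        @Override
        public void open(Configuration parameters) throws Exception {
            ValueStateDescriptor<String> desc = new ValueStateDescriptor<String>(
                    "per-key-state", TypeInformation.of(String.class), null);
            perKeyState = getRuntimeContext().getState(desc);
        }

        @Override
        public void flatMap1(String event, Collector<String> out) throws Exception {
            // ks1 (~100K events/sec): forward only events that match the state for their key.
            String current = perKeyState.value();
            if (current != null && event.contains(current)) {   // assumed matching rule
                out.collect(event);
            }
        }

        @Override
        public void flatMap2(Tuple2<String, String> update, Collector<String> out) throws Exception {
            // ks2 (about one event every 10 minutes): insert or remove the per-key entry.
            if (update.f1 == null) {
                perKeyState.clear();
            } else {
                perKeyState.update(update.f1);
            }
        }
    }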
On Tue, Sep 13, 2016 at 7:29 AM, Aljoscha Krettek <aljos...@apache.org> wrote:

Hi,
you don't need the BlockedEventState class, you should be able to just do this:

    private transient ValueState<BlockedRoadInfo> blockedRoads;
    ............

    @Override
    public void open(final org.apache.flink.configuration.Configuration parameters) throws Exception {
        final ValueStateDescriptor<BlockedRoadInfo> blockedStateDesc =
            new ValueStateDescriptor<BlockedRoadInfo>("BlockedEventStates",
                TypeInformation.of(BlockedRoadInfo.class), null);
        blockedRoads = getRuntimeContext().getState(blockedStateDesc);
    }

Cheers,
Aljoscha

On Mon, 12 Sep 2016 at 16:24 Chakravarthy varaga <chakravarth...@gmail.com> wrote:

Hi Fabian,

I'm coding to check if your proposal works and hit an issue with a ClassCastException.

    // Here is my Value that has state information... an implementation of my value
    // state, where the key is a Double value, on the connected stream ks2.
    public class BlockedEventState implements ValueState<BlockedRoadInfo> {

        public BlockedRoadInfo blockedRoad;

        @Override
        public void clear() {
            blockedRoad = null;
        }

        @Override
        public BlockedRoadInfo value() throws IOException {
            return blockedRoad;
        }

        @Override
        public void update(final BlockedRoadInfo value) throws IOException {
            blockedRoad = value;
        }
    }

    // BlockedRoadInfo class...
    public class BlockedRoadInfo {
        long maxLink;
        long minLink;
        double blockedEventId;
        // ... setters & getters
    }

    // new RichCoFlatMapFunction() {

    private transient BlockedEventState blockedRoads;
    ............

    @Override
    public void open(final org.apache.flink.configuration.Configuration parameters) throws Exception {
        final ValueStateDescriptor<BlockedRoadInfo> blockedStateDesc =
            new ValueStateDescriptor<BlockedRoadInfo>("BlockedEventStates",
                TypeInformation.of(BlockedRoadInfo.class), null);
        blockedRoads = (BlockedEventState) getRuntimeContext().getState(blockedStateDesc); // FAILS HERE WITH CLASSCAST
    }

    Caused by: java.lang.ClassCastException: org.apache.flink.runtime.state.filesystem.FsValueState cannot be cast to com.ericsson.components.aia.iot.volvo.state.BlockedEventState

I have tried to set the state backend to both MemState and FsState...

    streamEnv.setStateBackend(new FsStateBackend("file:///tmp/flink/checkpoints"));
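For reference, the checkpoint interval and state backend discussed in this thread would be wired up roughly as below; the 10-second interval and the checkpoint path are the values mentioned above, while the surrounding environment setup and the job name are only an assumed sketch:

    import org.apache.flink.runtime.state.filesystem.FsStateBackend;
    import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;

    public class CheckpointSetup {
        public static void main(String[] args) throws Exception {
            StreamExecutionEnvironment streamEnv =
                    StreamExecutionEnvironment.getExecutionEnvironment();

            // Take a checkpoint every 10 seconds, as described earlier in the thread.
            streamEnv.enableCheckpointing(10000);

            // Checkpoint state to the file system instead of the JobManager heap.
            streamEnv.setStateBackend(new FsStateBackend("file:///tmp/flink/checkpoints"));

            // ... build ks1, ks2 and ks1.connect(ks2).flatMap(new MatchFunction()) here ...

            streamEnv.execute("connected-streams-job");   // hypothetical job name
        }
    }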
On Thu, Sep 8, 2016 at 10:10 PM, Fabian Hueske <fhue...@gmail.com> wrote:

Not sure if I got your requirements right, but would this work?

    KeyedStream<String, String> ks1 = ds1.keyBy("*");
    KeyedStream<Tuple2<String, V>, String> ks2 = ds2.flatMap(split T into k-v pairs).keyBy(0);

    ks1.connect(ks2).flatMap(X)

X is a CoFlatMapFunction that inserts and removes elements from ks2 into a key-value state member. Elements from ks1 are matched against that state.

Best, Fabian

2016-09-08 20:24 GMT+02:00 Chakravarthy varaga <chakravarth...@gmail.com>:

Hi Fabian,

First of all, thanks for all your prompt responses. With regards to 2) multiple lookups, I have to clarify what I mean by that...

    DS1<String> elementKeyStream = stream1.map(String<>); // this maps each of the streaming elements into a string mapped value...
    DS2<T> = stream2.xxx(); // where stream2 is a kafka source stream; as you proposed, xxx() should be my function() which splits the string and generates key1:<value1>, key2:<value2>, key3:<value3> ... keyN:<valueN>

Now, I wish to map elementKeyStream with lookups within (key1, key2 ... keyN), where key1, key2 ... keyN and their respective values should be available across the cluster...

Thanks a million!
CVP

On Wed, Sep 7, 2016 at 9:15 PM, Fabian Hueske <fhue...@gmail.com> wrote:

That depends.
1) Growing/shrinking: This should work. New entries can always be inserted. In order to remove entries from the k-v state you have to set the value to null. Note that you need an explicit delete-value record to trigger the eviction.
2) Multiple lookups: This only works if all lookups are independent from each other. You can partition DS1 only on a single key, and the other keys might be located on different shards. A workaround might be to duplicate S1 events for each key that they need to look up. However, you might need to collect events from the same S1 event after the join. If that does not work for you, the only thing that comes to my mind is to broadcast the state and keep a full local copy in each operator.

Let me add one more thing regarding the upcoming rescaling feature. If this is interesting for you, rescaling will also work (maybe not in the first version) for broadcasted state, i.e. state which is the same on all parallel operator instances.

2016-09-07 21:45 GMT+02:00 Chakravarthy varaga <chakravarth...@gmail.com>:

I'm understanding this better with your explanation.
With this use case, each element in DS1 has to look up against a 'bunch of keys' from DS2, and DS2 could shrink/expand in terms of the number of keys... Will the key-value shard work in this case?

On Wed, Sep 7, 2016 at 7:44 PM, Fabian Hueske <fhue...@gmail.com> wrote:

Operator state is always local in Flink. However, with key-value state, you can have something which behaves kind of similar to a distributed hashmap, because each operator holds a different shard/partition of the hashtable.

If you have to do only a single key lookup for each element of DS1, you should think about partitioning both streams (keyBy) and writing the state into Flink's key-value state [1].

This will have several benefits:
1) State does not need to be replicated.
2) Depending on the backend (RocksDB) [2], parts of the state can reside on disk. You are not bound to the memory of the JVM.
3) Flink takes care of the look-up. No need to have your own hashmap.
4) It will only be possible to rescale jobs with key-value state (this feature is currently under development).

If using the key-value state is possible, I'd go for that.

Best, Fabian

[1] https://ci.apache.org/projects/flink/flink-docs-release-1.1/apis/streaming/state.html
[2] https://ci.apache.org/projects/flink/flink-docs-release-1.1/apis/streaming/state_backends.html

2016-09-07 19:55 GMT+02:00 Chakravarthy varaga <chakravarth...@gmail.com>:

Certainly, that's what I thought as well...
The output of DataStream2 could be in the 1000s and there are state updates... Reading this topic from the other job, job1, is okay. However, assuming that we maintain this state in a collection, and update the state (by reading from the topic) in this collection, will this be replicated across the cluster within this job1?

On Wed, Sep 7, 2016 at 6:33 PM, Fabian Hueske <fhue...@gmail.com> wrote:

Is writing DataStream2 to a Kafka topic and reading it from the other job an option?
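To make the keyBy-based approach from Fabian's messages above concrete, a rough wiring sketch might look like the following; the record format, the key extraction and the MatchFunction from the earlier sketch are assumptions for illustration only:

    import org.apache.flink.api.common.functions.FlatMapFunction;
    import org.apache.flink.api.java.functions.KeySelector;
    import org.apache.flink.api.java.tuple.Tuple2;
    import org.apache.flink.streaming.api.datastream.DataStream;
    import org.apache.flink.util.Collector;

    public class Wiring {

        // ds1: high-volume event stream, ds2: low-volume control stream.
        // Both sides are keyed on the same key so that the CoFlatMapFunction's state
        // is co-partitioned with the events that look it up.
        public static DataStream<String> wire(DataStream<String> ds1, DataStream<String> ds2) {

            // Split each control record into (key, value) pairs (assumed "k1:v1,k2:v2,..." format).
            DataStream<Tuple2<String, String>> controlPairs = ds2.flatMap(
                    new FlatMapFunction<String, Tuple2<String, String>>() {
                        @Override
                        public void flatMap(String record, Collector<Tuple2<String, String>> out) {
                            for (String kv : record.split(",")) {
                                String[] parts = kv.split(":");
                                out.collect(new Tuple2<String, String>(parts[0], parts[1]));
                            }
                        }
                    });

            return ds1
                    .keyBy(new KeySelector<String, String>() {      // assumed key extraction
                        @Override
                        public String getKey(String event) {
                            return event.split(",")[0];
                        }
                    })
                    .connect(controlPairs.keyBy(0))
                    .flatMap(new MatchFunction());
        }
    }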
2016-09-07 19:07 GMT+02:00 Chakravarthy varaga <chakravarth...@gmail.com>:

Hi Fabian,

Thanks for your response. Apparently these DataStreams (Job1-DataStream1 & Job2-DataStream2) are from different Flink applications running within the same cluster.
DataStream2 (from Job2) applies transformations and updates a 'cache' which Job1 needs to work on.
Our intention is to not use an external key/value store, as we are trying to localize the cache within the cluster.

Is there a way?

Best Regards
CVP

On Wed, Sep 7, 2016 at 5:00 PM, Fabian Hueske <fhue...@gmail.com> wrote:

Hi,

Flink does not provide shared state.
However, you can broadcast a stream to a CoFlatMapFunction, such that each operator has its own local copy of the state.

If that does not work for you because the state is too large, and if it is possible to partition the state (and both streams), you can also use keyBy instead of broadcast.

Finally, you can use an external system like a key-value store or an in-memory store like Apache Ignite to hold your distributed collection.

Best, Fabian

2016-09-07 17:49 GMT+02:00 Chakravarthy varaga <chakravarth...@gmail.com>:

Hi Team,

Can someone help me here? Appreciate any response!

Best Regards
Varaga

On Mon, Sep 5, 2016 at 4:51 PM, Chakravarthy varaga <chakravarth...@gmail.com> wrote:

Hi Team,

I'm working on a Flink streaming application. The data is injected through Kafka connectors. The payload volume is roughly 100K/sec. The event payload is a string. Let's call this DataStream1.
This application also uses another DataStream, call it DataStream2 (which consumes events off a Kafka topic). The elements of this DataStream2 are involved in a certain transformation that finally updates a HashMap (a java.util collection). The Flink application should share this HashMap across the Flink cluster so that the DataStream1 application can check the state of the values in this collection. Is there a way to do this in Flink?

I don't see any shared collection used within the cluster.

Best Regards
CVP
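Finally, the broadcast alternative Fabian mentions (each parallel instance keeping its own local copy instead of using Flink's partitioned key-value state) can be sketched as below; note that on Flink 1.1 such a plain HashMap is not checkpointed, and all names, element types and the key extraction here are assumptions:

    import java.util.HashMap;
    import java.util.Map;

    import org.apache.flink.api.java.tuple.Tuple2;
    import org.apache.flink.streaming.api.datastream.DataStream;
    import org.apache.flink.streaming.api.functions.co.CoFlatMapFunction;
    import org.apache.flink.util.Collector;

    public class BroadcastLocalCopy {

        // Every parallel instance receives ALL elements of the broadcast control stream
        // and keeps a private, non-checkpointed HashMap as its local copy of the "cache".
        public static class LocalCopyFunction
                implements CoFlatMapFunction<String, Tuple2<String, String>, String> {

            private final Map<String, String> localCopy = new HashMap<String, String>();

            @Override
            public void flatMap1(String event, Collector<String> out) {
                String key = event.split(",")[0];                 // assumed key extraction
                if (localCopy.containsKey(key)) {
                    out.collect(event);
                }
            }

            @Override
            public void flatMap2(Tuple2<String, String> update, Collector<String> out) {
                localCopy.put(update.f0, update.f1);
            }
        }

        public static DataStream<String> wire(DataStream<String> events,
                                              DataStream<Tuple2<String, String>> control) {
            // broadcast() sends every control element to every parallel instance of the operator.
            return events.connect(control.broadcast()).flatMap(new LocalCopyFunction());
        }
    }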