Hi Team,

      Could you please guide me on this? Is this a known issue with
checkpointing?

CVP

On 22 Sep 2016 15:57, "Chakravarthy varaga" <chakravarth...@gmail.com>
wrote:

> Please find attached Flink_checkpoint_time.png, which relates to this issue.
>
> On Thu, Sep 22, 2016 at 3:38 PM, Chakravarthy varaga <
> chakravarth...@gmail.com> wrote:
>
>> Hi Aljoscha & Fabian,
>>
>>     Finally I got this working. Thanks for your help. To persist the
>> state (for S2), I enabled checkpointing every 10 seconds with an
>> FsStateBackend. What I notice is that the checkpoint duration is almost
>> 2 minutes in many cases, while in the other cases it varies from 100 ms
>> to 1.5 minutes.
>>
>>     The pseudocode is as below:
>>
>>      KeyedStream<String, String> ks1 = ds1.keyBy("*");
>>      KeyedStream<Tuple2<String, V>, String> ks2 =
>>          ds2.flatMap(/* split T into k-v pairs */).keyBy(0);
>>
>>      ks1.connect(ks2).flatMap(X);
>>      // X is a CoFlatMapFunction that inserts and removes elements from
>>      // ks2 into a key-value state member. Elements from ks1 are matched
>>      // against that state.
>>
>>      // ks1 streams about 100K events/sec from a Kafka topic.
>>      // ks2 streams about 1 event every 10 minutes. Precisely when the
>>      // first event is consumed from this stream, the checkpoint takes
>>      // 2 minutes straightaway.
>>
>>     The Flink version is 1.1.2.
>>
>>  Best Regards
>> CVP
>>
>> On Tue, Sep 13, 2016 at 7:29 AM, Aljoscha Krettek <aljos...@apache.org>
>> wrote:
>>
>>> Hi,
>>> you don't need the BlockedEventState class, you should be able to just
>>> do this:
>>>
>>> private transient ValueState<BlockedRoadInfo> blockedRoads;
>>>          ............
>>>     @Override
>>>     public void open(final org.apache.flink.configuration.Configuration parameters)
>>>             throws Exception {
>>>         final ValueStateDescriptor<BlockedRoadInfo> blockedStateDesc =
>>>                 new ValueStateDescriptor<BlockedRoadInfo>("BlockedEventStates",
>>>                         TypeInformation.of(BlockedRoadInfo.class), null);
>>>         blockedRoads = getRuntimeContext().getState(blockedStateDesc);
>>>     }
>>>
>>>   }
>>>
>>> Cheers,
>>> Aljoscha
>>>
>>>
>>> On Mon, 12 Sep 2016 at 16:24 Chakravarthy varaga <
>>> chakravarth...@gmail.com> wrote:
>>>
>>>> Hi Fabian,
>>>>
>>>>     I'm coding this up to check whether your proposal works, and I have
>>>> hit a ClassCastException.
>>>>
>>>>
>>>>     // Here is my value that holds the state information: an
>>>>     // implementation of my value state, where the key is a Double value,
>>>>     // on connected stream ks2
>>>>
>>>>     public class BlockedEventState implements ValueState<BlockedRoadInfo> {
>>>>
>>>>     public BlockedRoadInfo blockedRoad;
>>>>
>>>>     @Override
>>>>     public void clear() {
>>>>         blockedRoad = null;
>>>>
>>>>     }
>>>>
>>>>     @Override
>>>>     public BlockedRoadInfo value() throws IOException {
>>>>         return blockedRoad;
>>>>     }
>>>>
>>>>     @Override
>>>>     public void update(final BlockedRoadInfo value) throws IOException {
>>>>         blockedRoad = value;
>>>>     }
>>>> }
>>>>
>>>>        //BlockedRoadInfo class...
>>>>         public class BlockedRoadInfo {
>>>>             long maxLink;
>>>>             long minLink;
>>>>             double blockedEventId;
>>>>     ....setters & ... getters
>>>> }
>>>>
>>>> /// new RichCoFlatMapFunction() {
>>>>
>>>> private transient BlockedEventState blockedRoads;
>>>>          ............
>>>>     @Override
>>>>     public void open(final org.apache.flink.configuration.Configuration parameters)
>>>>             throws Exception {
>>>>         final ValueStateDescriptor<BlockedRoadInfo> blockedStateDesc =
>>>>                 new ValueStateDescriptor<BlockedRoadInfo>("BlockedEventStates",
>>>>                         TypeInformation.of(BlockedRoadInfo.class), null);
>>>>         // FAILS HERE WITH ClassCastException
>>>>         blockedRoads = (BlockedEventState)
>>>>                 getRuntimeContext().getState(blockedStateDesc);
>>>>     }
>>>>
>>>>   }
>>>>
>>>>
>>>>
>>>>
>>>> Caused by: java.lang.ClassCastException:
>>>> org.apache.flink.runtime.state.filesystem.FsValueState cannot be cast to
>>>> com.ericsson.components.aia.iot.volvo.state.BlockedEventState
>>>>
>>>>
>>>>
>>>> I have tried setting the state backend to both MemoryStateBackend and
>>>> FsStateBackend, e.g. streamEnv.setStateBackend(new
>>>> FsStateBackend("file:///tmp/flink/checkpoints"));
>>>>
>>>>
>>>>
>>>> On Thu, Sep 8, 2016 at 10:10 PM, Fabian Hueske <fhue...@gmail.com>
>>>> wrote:
>>>>
>>>>> Not sure if I got your requirements right, but would this work?
>>>>>
>>>>> KeyedStream<String, String> ks1 = ds1.keyBy("*");
>>>>> KeyedStream<Tuple2<String, V>, String> ks2 =
>>>>>     ds2.flatMap(/* split T into k-v pairs */).keyBy(0);
>>>>>
>>>>> ks1.connect(ks2).flatMap(X);
>>>>>
>>>>> X is a CoFlatMapFunction that inserts and removes elements from ks2
>>>>> into a key-value state member. Elements from ks1 are matched against that
>>>>> state.
>>>>>
>>>>> Best, Fabian
>>>>>
>>>>> 2016-09-08 20:24 GMT+02:00 Chakravarthy varaga <
>>>>> chakravarth...@gmail.com>:
>>>>>
>>>>>> Hi Fabian,
>>>>>>
>>>>>>      First of all, thanks for all your prompt responses. With regard
>>>>>> to 2) Multiple lookups, I have to clarify what I mean by that...
>>>>>>
>>>>>>      DS1<String> elementKeyStream = stream1.map(String<>); this maps
>>>>>> each of the streaming elements to a string value...
>>>>>>      DS2<T> = stream2.xxx(); // where stream2 is a Kafka source
>>>>>> stream, as you proposed; xxx() should be my function that splits the
>>>>>> string and generates key1:<value1>, key2:<value2>, key3:<value3>
>>>>>> ....keyN:<valueN>
>>>>>>
>>>>>>      Now,
>>>>>>         I wish to map elementKeyStream with lookups within (key1,
>>>>>> key2...keyN), where key1, key2...keyN and their respective values
>>>>>> should be available across the cluster...
>>>>>>
>>>>>> Thanks a million !
>>>>>> CVP
>>>>>>
>>>>>>
>>>>>>
>>>>>>
>>>>>> On Wed, Sep 7, 2016 at 9:15 PM, Fabian Hueske <fhue...@gmail.com>
>>>>>> wrote:
>>>>>>
>>>>>>> That depends.
>>>>>>> 1) Growing/Shrinking: This should work. New entries can always be
>>>>>>> inserted. In order to remove entries from the k-v state, you have to
>>>>>>> set the value to null. Note that you need an explicit delete-value
>>>>>>> record to trigger the eviction.
>>>>>>> 2) Multiple lookups: This only works if all lookups are independent
>>>>>>> of each other. You can partition DS1 only on a single key, and the
>>>>>>> other keys might be located on different shards. A workaround might
>>>>>>> be to duplicate S1 events for each key that they need to look up.
>>>>>>> However, you might need to collect events from the same S1 event
>>>>>>> after the join. If that does not work for you, the only thing that
>>>>>>> comes to my mind is to broadcast the state and keep a full local copy
>>>>>>> in each operator.
>>>>>>>
>>>>>>> Let me add one more thing regarding the upcoming rescaling feature.
>>>>>>> If this is interesting for you, rescaling will also work (maybe not
>>>>>>> in the first version) for broadcasted state, i.e. state which is the
>>>>>>> same on all parallel operator instances.
>>>>>>>
>>>>>>> 2016-09-07 21:45 GMT+02:00 Chakravarthy varaga <
>>>>>>> chakravarth...@gmail.com>:
>>>>>>>
>>>>>>>> I understand this better with your explanation.
>>>>>>>> In this use case, each element in DS1 has to be looked up against a
>>>>>>>> 'bunch of keys' from DS2, and DS2 could shrink/expand in terms of the
>>>>>>>> number of keys. Will the key-value shard work in this case?
>>>>>>>>
>>>>>>>> On Wed, Sep 7, 2016 at 7:44 PM, Fabian Hueske <fhue...@gmail.com>
>>>>>>>> wrote:
>>>>>>>>
>>>>>>>>> Operator state is always local in Flink. However, with key-value
>>>>>>>>> state, you can have something which behaves similarly to a
>>>>>>>>> distributed hashmap, because each operator holds a different
>>>>>>>>> shard/partition of the hashtable.
>>>>>>>>>
>>>>>>>>> If you have to do only a single key lookup for each element of
>>>>>>>>> DS1, you should think about partitioning both streams (keyBy) and
>>>>>>>>> writing the state into Flink's key-value state [1].
>>>>>>>>>
>>>>>>>>> This will have several benefits:
>>>>>>>>> 1) State does not need to be replicated
>>>>>>>>> 2) Depending on the backend (RocksDB) [2], parts of the state can
>>>>>>>>> reside on disk. You are not bound to the memory of the JVM.
>>>>>>>>> 3) Flink takes care of the look-up. No need to have your own
>>>>>>>>> hashmap.
>>>>>>>>> 4) It will only be possible to rescale jobs with key-value state
>>>>>>>>> (this feature is currently under development).
>>>>>>>>>
>>>>>>>>> If using the key-value state is possible, I'd go for that.
>>>>>>>>>
>>>>>>>>> Best, Fabian
>>>>>>>>>
>>>>>>>>> [1] https://ci.apache.org/projects/flink/flink-docs-release-1.1/apis/streaming/state.html
>>>>>>>>> [2] https://ci.apache.org/projects/flink/flink-docs-release-1.1/apis/streaming/state_backends.html
>>>>>>>>>
>>>>>>>>> 2016-09-07 19:55 GMT+02:00 Chakravarthy varaga <
>>>>>>>>> chakravarth...@gmail.com>:
>>>>>>>>>
>>>>>>>>>> Certainly, that's what I thought as well...
>>>>>>>>>> The output of DataStream2 could be in the 1000s, and there are
>>>>>>>>>> state updates...
>>>>>>>>>> Reading this topic from the other job, job1, is okay.
>>>>>>>>>> However, assuming that we maintain this state in a collection and
>>>>>>>>>> update it (by reading from the topic), will this collection be
>>>>>>>>>> replicated across the cluster within job1?
>>>>>>>>>>
>>>>>>>>>>
>>>>>>>>>>
>>>>>>>>>> On Wed, Sep 7, 2016 at 6:33 PM, Fabian Hueske <fhue...@gmail.com>
>>>>>>>>>> wrote:
>>>>>>>>>>
>>>>>>>>>>> Is writing DataStream2 to a Kafka topic and reading it from the
>>>>>>>>>>> other job an option?
>>>>>>>>>>>
>>>>>>>>>>> 2016-09-07 19:07 GMT+02:00 Chakravarthy varaga <
>>>>>>>>>>> chakravarth...@gmail.com>:
>>>>>>>>>>>
>>>>>>>>>>>> Hi Fabian,
>>>>>>>>>>>>
>>>>>>>>>>>>     Thanks for your response. These DataStreams
>>>>>>>>>>>> (Job1-DataStream1 & Job2-DataStream2) are from different Flink
>>>>>>>>>>>> applications running within the same cluster.
>>>>>>>>>>>>     DataStream2 (from Job2) applies transformations and updates
>>>>>>>>>>>> a 'cache' that Job1 needs to work on.
>>>>>>>>>>>>     Our intention is not to use an external key/value store, as
>>>>>>>>>>>> we are trying to localize the cache within the cluster.
>>>>>>>>>>>>     Is there a way?
>>>>>>>>>>>>
>>>>>>>>>>>> Best Regards
>>>>>>>>>>>> CVP
>>>>>>>>>>>>
>>>>>>>>>>>> On Wed, Sep 7, 2016 at 5:00 PM, Fabian Hueske <
>>>>>>>>>>>> fhue...@gmail.com> wrote:
>>>>>>>>>>>>
>>>>>>>>>>>>> Hi,
>>>>>>>>>>>>>
>>>>>>>>>>>>> Flink does not provide shared state.
>>>>>>>>>>>>> However, you can broadcast a stream to a CoFlatMapFunction, such
>>>>>>>>>>>>> that each operator has its own local copy of the state.
>>>>>>>>>>>>>
>>>>>>>>>>>>> If that does not work for you because the state is too large,
>>>>>>>>>>>>> and if it is possible to partition the state (and both streams),
>>>>>>>>>>>>> you can also use keyBy instead of broadcast.
>>>>>>>>>>>>>
>>>>>>>>>>>>> Finally, you can use an external system like a key-value store
>>>>>>>>>>>>> or an in-memory store like Apache Ignite to hold your distributed
>>>>>>>>>>>>> collection.
>>>>>>>>>>>>>
>>>>>>>>>>>>> Best, Fabian
>>>>>>>>>>>>>
>>>>>>>>>>>>> 2016-09-07 17:49 GMT+02:00 Chakravarthy varaga <
>>>>>>>>>>>>> chakravarth...@gmail.com>:
>>>>>>>>>>>>>
>>>>>>>>>>>>>> Hi Team,
>>>>>>>>>>>>>>
>>>>>>>>>>>>>>      Can someone help me here? I'd appreciate any response!
>>>>>>>>>>>>>>
>>>>>>>>>>>>>> Best Regards
>>>>>>>>>>>>>> Varaga
>>>>>>>>>>>>>>
>>>>>>>>>>>>>> On Mon, Sep 5, 2016 at 4:51 PM, Chakravarthy varaga <
>>>>>>>>>>>>>> chakravarth...@gmail.com> wrote:
>>>>>>>>>>>>>>
>>>>>>>>>>>>>>> Hi Team,
>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>     I'm working on a Flink Streaming application. The data
>>>>>>>>>>>>>>> is ingested through Kafka connectors. The payload volume is
>>>>>>>>>>>>>>> roughly 100K events/sec. The event payload is a string. Let's
>>>>>>>>>>>>>>> call this DataStream1.
>>>>>>>>>>>>>>> This application also uses another DataStream, call it
>>>>>>>>>>>>>>> DataStream2, which consumes events off a Kafka topic. The
>>>>>>>>>>>>>>> elements of DataStream2 go through a transformation that finally
>>>>>>>>>>>>>>> updates a HashMap (a java.util collection). The Flink
>>>>>>>>>>>>>>> application should share this HashMap across the Flink cluster
>>>>>>>>>>>>>>> so that the DataStream1 application can check the state of the
>>>>>>>>>>>>>>> values in this collection. Is there a way to do this in Flink?
>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>     I don't see any shared collection used within the
>>>>>>>>>>>>>>> cluster.
>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>> Best Regards
>>>>>>>>>>>>>>> CVP
>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>
>>>>>>>>>>>>>>
>>>>>>>>>>>>>
>>>>>>>>>>>>
>>>>>>>>>>>
>>>>>>>>>>
>>>>>>>>>
>>>>>>>>
>>>>>>>
>>>>>>
>>>>>
>>>>
>>
>
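
A note for readers of the archive: the keyBy/connect pattern discussed in the thread (ks1.connect(ks2).flatMap(X)) can be modeled in plain Java without Flink. The sketch below is only an illustration under stated assumptions. The class KeyedLookupSketch and its method names are hypothetical, each map stands in for the key-value state held by one parallel operator instance, and the hash-modulo routing is a simplification rather than Flink's actual partitioner. It shows why a single-key lookup needs no shared or replicated collection: both inputs are routed by the same key, so an event and the state it needs always meet on the same shard.

```java
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

/** Plain-Java model of two keyed inputs sharing per-key state. Not Flink code. */
final class KeyedLookupSketch {
    // One map per simulated parallel operator instance (a "shard").
    private final List<Map<String, String>> shards = new ArrayList<>();

    KeyedLookupSketch(int parallelism) {
        for (int i = 0; i < parallelism; i++) {
            shards.add(new HashMap<>());
        }
    }

    /** Both inputs use this routing, so equal keys land on the same shard. */
    int shardFor(String key) {
        return Math.floorMod(key.hashCode(), shards.size());
    }

    /** Low-volume input (ks2 in the thread); a null value acts as a delete record. */
    void onControl(String key, String value) {
        Map<String, String> shard = shards.get(shardFor(key));
        if (value == null) {
            shard.remove(key);
        } else {
            shard.put(key, value);
        }
    }

    /** High-volume input (ks1 in the thread); reads only the shard that owns the key. */
    String onEvent(String key) {
        return shards.get(shardFor(key)).get(key);
    }

    public static void main(String[] args) {
        KeyedLookupSketch job = new KeyedLookupSketch(4);
        job.onControl("road-17", "blocked");
        System.out.println(job.onEvent("road-17")); // prints "blocked"
        job.onControl("road-17", null);             // explicit delete record
        System.out.println(job.onEvent("road-17")); // prints "null"
    }
}
```

The model covers only the single-key case. An event that must be checked against several keys would have to be duplicated once per key, or the state broadcast to every instance, as Fabian describes in the thread.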
