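Hi,

You don't need the BlockedEventState class. getState() returns the state
backend's own ValueState implementation (FsValueState in your stack trace),
so the cast to your class fails. You should be able to just do this: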

private transient ValueState<BlockedRoadInfo> blockedRoads;
         ............
    @Override
    public void open(final org.apache.flink.configuration.Configuration parameters) throws Exception {
        final ValueStateDescriptor<BlockedRoadInfo> blockedStateDesc =
                new ValueStateDescriptor<BlockedRoadInfo>("BlockedEventStates",
                        TypeInformation.of(BlockedRoadInfo.class), null);
        blockedRoads = getRuntimeContext().getState(blockedStateDesc);
    }

  }
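
As a rough illustration of why the cast fails, here is a plain-Java sketch
that does not use Flink at all. The ValueState interface below is a
simplified stand-in, and StateCastSketch, BackendValueState and getState()
are hypothetical names made up for this example (BackendValueState plays the
role of FsValueState). The point is only that the runtime decides which class
implements the interface, so the field should be typed as the interface:

```java
public class StateCastSketch {

    /** Simplified stand-in for Flink's ValueState interface (assumption, not the real API). */
    public interface ValueState<T> {
        T value();
        void update(T value);
        void clear();
    }

    /** Value type mirroring the fields of BlockedRoadInfo from the quoted mail. */
    public static final class BlockedRoadInfo {
        final long maxLink;
        final long minLink;
        final double blockedEventId;

        public BlockedRoadInfo(long maxLink, long minLink, double blockedEventId) {
            this.maxLink = maxLink;
            this.minLink = minLink;
            this.blockedEventId = blockedEventId;
        }
    }

    /** Hypothetical backend-owned implementation, playing the role of FsValueState. */
    public static final class BackendValueState<T> implements ValueState<T> {
        private T current;

        @Override public T value() { return current; }
        @Override public void update(T value) { current = value; }
        @Override public void clear() { current = null; }
    }

    /** User-defined implementation as in the quoted mail; the runtime never instantiates it. */
    public static final class BlockedEventState implements ValueState<BlockedRoadInfo> {
        private BlockedRoadInfo blockedRoad;

        @Override public BlockedRoadInfo value() { return blockedRoad; }
        @Override public void update(BlockedRoadInfo value) { blockedRoad = value; }
        @Override public void clear() { blockedRoad = null; }
    }

    /** Hypothetical stand-in for getRuntimeContext().getState(descriptor). */
    public static <T> ValueState<T> getState() {
        return new BackendValueState<>();
    }

    /** Returns true if casting the runtime's state object to the user class fails. */
    public static boolean castFails() {
        ValueState<BlockedRoadInfo> handle = getState();
        try {
            // Compiles, but the object is a BackendValueState, so this throws at runtime.
            BlockedEventState state = (BlockedEventState) handle;
            return false;
        } catch (ClassCastException e) {
            return true;
        }
    }

    /** Works with whatever implementation the runtime hands back. */
    public static BlockedRoadInfo roundTrip(BlockedRoadInfo info) {
        ValueState<BlockedRoadInfo> blockedRoads = getState();
        blockedRoads.update(info);
        return blockedRoads.value();
    }

    public static void main(String[] args) {
        System.out.println("cast fails: " + castFails());
        BlockedRoadInfo stored = roundTrip(new BlockedRoadInfo(10L, 1L, 7.0));
        System.out.println("stored blockedEventId: " + stored.blockedEventId);
    }
}
```

Declaring the field as ValueState<BlockedRoadInfo> keeps the code independent
of whichever state backend is configured.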

Cheers,
Aljoscha


On Mon, 12 Sep 2016 at 16:24 Chakravarthy varaga <chakravarth...@gmail.com>
wrote:

> Hi Fabian,
>
>     I'm coding this up to check whether your proposal works, and I've hit a
> ClassCastException.
>
>
>     // Here is my value that holds the state information: an implementation
>     // of my value state, where the key is a Double value, on connected
>     // stream ks2
>
>     public class BlockedEventState implements ValueState<BlockedRoadInfo> {
>
>     public BlockedRoadInfo blockedRoad;
>
>     @Override
>     public void clear() {
>         blockedRoad = null;
>
>     }
>
>     @Override
>     public BlockedRoadInfo value() throws IOException {
>         return blockedRoad;
>     }
>
>     @Override
>     public void update(final BlockedRoadInfo value) throws IOException {
>         blockedRoad = value;
>     }
> }
>
>     // BlockedRoadInfo class...
>     public class BlockedRoadInfo {
>         long maxLink;
>         long minLink;
>         double blockedEventId;
>         ....setters & ... getters
>     }
>
> /// new RichCoFlatMapFunction() {
>
> private transient BlockedEventState blockedRoads;
>          ............
>     @Override
>     public void open(final org.apache.flink.configuration.Configuration parameters) throws Exception {
>         final ValueStateDescriptor<BlockedRoadInfo> blockedStateDesc =
>                 new ValueStateDescriptor<BlockedRoadInfo>("BlockedEventStates",
>                         TypeInformation.of(BlockedRoadInfo.class), null);
>         // FAILS HERE WITH CLASSCAST
>         blockedRoads = (BlockedEventState) getRuntimeContext().getState(blockedStateDesc);
>     }
>
>   }
>
>
>
>
> *Caused by: java.lang.ClassCastException:
> org.apache.flink.runtime.state.filesystem.FsValueState cannot be cast to
> com.ericsson.components.aia.iot.volvo.state.BlockedEventState*
>
>
>
> *I have tried setting the state backend to both MemState and FsState, e.g.
> streamEnv.setStateBackend(new FsStateBackend("file:///tmp/flink/checkpoints"));*
>
>
>
> On Thu, Sep 8, 2016 at 10:10 PM, Fabian Hueske <fhue...@gmail.com> wrote:
>
>> Not sure if I got your requirements right, but would this work?
>>
>> KeyedStream<String, String> ks1 = ds1.keyBy("*") ;
>> KeyedStream<Tuple2<String, V>, String> ks2 = ds2.flatMap(split T into k-v
>> pairs).keyBy(0);
>>
>> ks1.connect(ks2).flatMap(X)
>>
>> X is a CoFlatMapFunction that inserts and removes elements from ks2 into
>> a key-value state member. Elements from ks1 are matched against that state.
>>
>> Best, Fabian
>>
>> 2016-09-08 20:24 GMT+02:00 Chakravarthy varaga <chakravarth...@gmail.com>
>> :
>>
>>> Hi Fabian,
>>>
>>>      First of all, thanks for all your prompt responses. With regard to
>>> 2) Multiple lookups, I have to clarify what I mean by that...
>>>
>>>      DS1<String> elementKeyStream  = stream1.map(String<>); this maps
>>> each of the streaming elements into a string value...
>>>      DS2<T>  = stream2.xxx(); // where stream2 is a Kafka source stream,
>>> as you proposed. xxx() should be my function, which splits the string and
>>> generates key1:<value1>, key2:<value2>, key3:<value3> ... keyN:<valueN>
>>>
>>>      Now,
>>>         I wish to map elementKeyStream with lookups within (key1,
>>> key2...keyN), where key1, key2...keyN and their respective values should be
>>> available across the cluster...
>>>
>>> Thanks a million !
>>> CVP
>>>
>>>
>>>
>>>
>>> On Wed, Sep 7, 2016 at 9:15 PM, Fabian Hueske <fhue...@gmail.com> wrote:
>>>
>>>> That depends.
>>>> 1) Growing/Shrinking: This should work. New entries can always be
>>>> inserted. In order to remove entries from the k-v-state you have to set the
>>>> value to null. Note that you need an explicit delete-value record to
>>>> trigger the eviction.
>>>> 2) Multiple lookups: This only works if all lookups are independent
>>>> from each other. You can partition DS1 only on a single key and the other
>>>> keys might be located on different shards. A workaround might be to
>>>> duplicate S1 events for each key that they need to look up. However, you
>>>> might need to collect events from the same S1 event after the join. If that
>>>> does not work for you, the only thing that comes to my mind is to broadcast
>>>> the state and keep a full local copy in each operator.
>>>>
>>>> Let me add one more thing regarding the upcoming rescaling feature. If
>>>> this is interesting for you, rescaling will also work (maybe not in the
>>>> first version) for broadcast state, i.e. state which is the same on all
>>>> parallel operator instances.
>>>>
>>>> 2016-09-07 21:45 GMT+02:00 Chakravarthy varaga <
>>>> chakravarth...@gmail.com>:
>>>>
>>>>> I understand this better with your explanation.
>>>>> In this use case, each element in DS1 has to be looked up against a
>>>>> 'bunch of keys' from DS2, and DS2 could shrink/expand in terms of the
>>>>> number of keys. Will the key-value shard work in this case?
>>>>>
>>>>> On Wed, Sep 7, 2016 at 7:44 PM, Fabian Hueske <fhue...@gmail.com>
>>>>> wrote:
>>>>>
>>>>>> Operator state is always local in Flink. However, with key-value
>>>>>> state, you can have something which behaves similarly to a distributed
>>>>>> hashmap, because each operator holds a different shard/partition of the
>>>>>> hashtable.
>>>>>>
>>>>>> If you have to do only a single key lookup for each element of DS1,
>>>>>> you should think about partitioning both streams (keyBy) and writing the
>>>>>> state into Flink's key-value state [1].
>>>>>>
>>>>>> This will have several benefits:
>>>>>> 1) State does not need to be replicated
>>>>>> 2) Depending on the backend (RocksDB) [2], parts of the state can
>>>>>> reside on disk. You are not bound to the memory of the JVM.
>>>>>> 3) Flink takes care of the look-up. No need to have your own hashmap.
>>>>>> 4) It will only be possible to rescale jobs with key-value state
>>>>>> (this feature is currently under development).
>>>>>>
>>>>>> If using the key-value state is possible, I'd go for that.
>>>>>>
>>>>>> Best, Fabian
>>>>>>
>>>>>> [1]
>>>>>> https://ci.apache.org/projects/flink/flink-docs-release-1.1/apis/streaming/state.html
>>>>>> [2]
>>>>>> https://ci.apache.org/projects/flink/flink-docs-release-1.1/apis/streaming/state_backends.html
>>>>>>
>>>>>> 2016-09-07 19:55 GMT+02:00 Chakravarthy varaga <
>>>>>> chakravarth...@gmail.com>:
>>>>>>
>>>>>>> Certainly, that's what I thought as well...
>>>>>>> The output of DataStream2 could be in the 1000s, and there are state
>>>>>>> updates...
>>>>>>> Reading this topic from the other job, job1, is OK.
>>>>>>> However, assuming that we maintain this state in a collection and
>>>>>>> update it (by reading from the topic), will this collection be
>>>>>>> replicated across the cluster within job1?
>>>>>>>
>>>>>>>
>>>>>>>
>>>>>>> On Wed, Sep 7, 2016 at 6:33 PM, Fabian Hueske <fhue...@gmail.com>
>>>>>>> wrote:
>>>>>>>
>>>>>>>> Is writing DataStream2 to a Kafka topic and reading it from the
>>>>>>>> other job an option?
>>>>>>>>
>>>>>>>> 2016-09-07 19:07 GMT+02:00 Chakravarthy varaga <
>>>>>>>> chakravarth...@gmail.com>:
>>>>>>>>
>>>>>>>>> Hi Fabian,
>>>>>>>>>
>>>>>>>>>     Thanks for your response. Apparently these DataStreams
>>>>>>>>> (Job1-DataStream1 & Job2-DataStream2) are from different Flink
>>>>>>>>> applications running within the same cluster.
>>>>>>>>>     DataStream2 (from Job2) applies transformations and updates a
>>>>>>>>> 'cache' that Job1 needs to work on.
>>>>>>>>>     Our intention is to not use an external key/value store, as we
>>>>>>>>> are trying to localize the cache within the cluster.
>>>>>>>>>     Is there a way?
>>>>>>>>>
>>>>>>>>> Best Regards
>>>>>>>>> CVP
>>>>>>>>>
>>>>>>>>> On Wed, Sep 7, 2016 at 5:00 PM, Fabian Hueske <fhue...@gmail.com>
>>>>>>>>> wrote:
>>>>>>>>>
>>>>>>>>>> Hi,
>>>>>>>>>>
>>>>>>>>>> Flink does not provide shared state.
>>>>>>>>>> However, you can broadcast a stream to a CoFlatMapFunction, such
>>>>>>>>>> that each operator has its own local copy of the state.
>>>>>>>>>>
>>>>>>>>>> If that does not work for you because the state is too large and
>>>>>>>>>> if it is possible to partition the state (and both streams), you can
>>>>>>>>>> also use keyBy instead of broadcast.
>>>>>>>>>>
>>>>>>>>>> Finally, you can use an external system like a key-value store or
>>>>>>>>>> an in-memory store like Apache Ignite to hold your distributed
>>>>>>>>>> collection.
>>>>>>>>>>
>>>>>>>>>> Best, Fabian
>>>>>>>>>>
>>>>>>>>>> 2016-09-07 17:49 GMT+02:00 Chakravarthy varaga <
>>>>>>>>>> chakravarth...@gmail.com>:
>>>>>>>>>>
>>>>>>>>>>> Hi Team,
>>>>>>>>>>>
>>>>>>>>>>>      Can someone help me here? Appreciate any response !
>>>>>>>>>>>
>>>>>>>>>>> Best Regards
>>>>>>>>>>> Varaga
>>>>>>>>>>>
>>>>>>>>>>> On Mon, Sep 5, 2016 at 4:51 PM, Chakravarthy varaga <
>>>>>>>>>>> chakravarth...@gmail.com> wrote:
>>>>>>>>>>>
>>>>>>>>>>>> Hi Team,
>>>>>>>>>>>>
>>>>>>>>>>>>     I'm working on a Flink Streaming application. The data is
>>>>>>>>>>>> injected through Kafka connectors. The payload volume is roughly
>>>>>>>>>>>> 100K/sec. The event payload is a string. Let's call this
>>>>>>>>>>>> DataStream1.
>>>>>>>>>>>> This application also uses another DataStream, call it
>>>>>>>>>>>> DataStream2 (it consumes events off a Kafka topic). The elements
>>>>>>>>>>>> of DataStream2 go through a certain transformation that finally
>>>>>>>>>>>> updates a HashMap (or java.util Collection). Apparently the Flink
>>>>>>>>>>>> application should share this HashMap across the Flink cluster so
>>>>>>>>>>>> that the DataStream1 application can check the state of the values
>>>>>>>>>>>> in this collection. Is there a way to do this in Flink?
>>>>>>>>>>>>
>>>>>>>>>>>>     I don't see any shared collection used within the cluster?
>>>>>>>>>>>>
>>>>>>>>>>>> Best Regards
>>>>>>>>>>>> CVP
>>>>>>>>>>>>
>>>>>>>>>>>
>>>>>>>>>>>
>>>>>>>>>>
>>>>>>>>>
>>>>>>>>
>>>>>>>
>>>>>>
>>>>>
>>>>
>>>
>>
>
