Hi Fabian,

    I'm coding this up to check whether your proposal works, and I have hit
a ClassCastException.


    // Here is my value state implementation. It holds the state information
    // for the connected stream ks2, where the key is a Double value.

    public class BlockedEventState implements ValueState<BlockedRoadInfo> {

        public BlockedRoadInfo blockedRoad;

        @Override
        public void clear() {
            blockedRoad = null;
        }

        @Override
        public BlockedRoadInfo value() throws IOException {
            return blockedRoad;
        }

        @Override
        public void update(final BlockedRoadInfo value) throws IOException {
            blockedRoad = value;
        }
    }

    // BlockedRoadInfo class...
    public class BlockedRoadInfo {
        long maxLink;
        long minLink;
        double blockedEventId;
        // ....setters & ... getters
    }

    // new RichCoFlatMapFunction() {

        private transient BlockedEventState blockedRoads;
        ............
        @Override
        public void open(final org.apache.flink.configuration.Configuration parameters)
                throws Exception {
            final ValueStateDescriptor<BlockedRoadInfo> blockedStateDesc =
                    new ValueStateDescriptor<BlockedRoadInfo>("BlockedEventStates",
                            TypeInformation.of(BlockedRoadInfo.class), null);
            // FAILS HERE WITH ClassCastException
            blockedRoads = (BlockedEventState) getRuntimeContext().getState(blockedStateDesc);
        }

    }




*Caused by: java.lang.ClassCastException:
org.apache.flink.runtime.state.filesystem.FsValueState cannot be cast to
com.ericsson.components.aia.iot.volvo.state.BlockedEventState*



*I have tried setting the state backend to both MemoryStateBackend and
FsStateBackend, e.g.
streamEnv.setStateBackend(new FsStateBackend("file:///tmp/flink/checkpoints"));*
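
For reference, the exception message suggests that getState() hands back a
ValueState created by the state backend (FsValueState here) rather than an
instance of BlockedEventState, so a cast to a user-written class cannot
succeed. Below is a minimal plain-Java sketch of that situation. It uses
stand-in types only: SimpleValueState, BackendValueState, UserValueState and
StateCastSketch are hypothetical names made up for illustration, not Flink
classes.

```java
// Stand-in for the ValueState interface (hypothetical, not Flink's type).
interface SimpleValueState<T> {
    T value();
    void update(T value);
    void clear();
}

// Plays the role of the backend-provided state (FsValueState in the trace).
final class BackendValueState<T> implements SimpleValueState<T> {
    private T current;
    @Override public T value() { return current; }
    @Override public void update(T value) { current = value; }
    @Override public void clear() { current = null; }
}

// Plays the role of a user-written implementation such as BlockedEventState.
final class UserValueState<T> implements SimpleValueState<T> {
    private T current;
    @Override public T value() { return current; }
    @Override public void update(T value) { current = value; }
    @Override public void clear() { current = null; }
}

final class StateCastSketch {
    // Plays the role of getRuntimeContext().getState(descriptor): the caller
    // does not choose which concrete class comes back.
    static <T> SimpleValueState<T> getState() {
        return new BackendValueState<>();
    }

    // Mirrors the failing line: casting the returned handle to a sibling
    // implementation of the same interface. Returns true if the cast throws.
    static boolean castToUserClassFails() {
        try {
            SimpleValueState<String> state = getState();
            UserValueState<String> mine = (UserValueState<String>) state;
            return mine == null; // not reached: the cast above throws
        } catch (ClassCastException e) {
            return true;
        }
    }

    // Holding the handle by its interface type needs no cast at all.
    static String roundTripThroughInterface(String v) {
        SimpleValueState<String> state = getState();
        state.update(v);
        return state.value();
    }
}
```

If the same reasoning applies to Flink, declaring the field as
ValueState<BlockedRoadInfo> and dropping both the cast and the custom
BlockedEventState class would presumably avoid the exception.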



On Thu, Sep 8, 2016 at 10:10 PM, Fabian Hueske <fhue...@gmail.com> wrote:

> Not sure if I got your requirements right, but would this work?
>
> KeyedStream<String, String> ks1 = ds1.keyBy("*") ;
> KeyedStream<Tuple2<String, V>, String> ks2 = ds2.flatMap(split T into k-v
> pairs).keyBy(0);
>
> ks1.connect(ks2).flatMap(X)
>
> X is a CoFlatMapFunction that inserts and removes elements from ks2 into a
> key-value state member. Elements from ks1 are matched against that state.
>
> Best, Fabian
>
> 2016-09-08 20:24 GMT+02:00 Chakravarthy varaga <chakravarth...@gmail.com>:
>
>> Hi Fabian,
>>
>>      First of all, thanks for all your prompt responses. With regard to
>> 2) Multiple lookups, I have to clarify what I mean by that...
>>
>>      DS1<String> elementKeyStream = stream1.map(String<>); this maps
>> each streaming element to a string value...
>>      DS2<T> = stream2.xxx(); // where stream2 is a Kafka source stream,
>> as you proposed. xxx() should be my function, which splits the string and
>> generates key1:<value1>, key2:<value2>, key3:<value3> ....keyN:<valueN>
>>
>>      Now,
>>         I wish to map elementKeyStream with lookups within (key1,
>> key2...keyN), where key1, key2...keyN and their respective values should be
>> available across the cluster...
>>
>> Thanks a million !
>> CVP
>>
>>
>>
>>
>> On Wed, Sep 7, 2016 at 9:15 PM, Fabian Hueske <fhue...@gmail.com> wrote:
>>
>>> That depends.
>>> 1) Growing/Shrinking: This should work. New entries can always be
>>> inserted. In order to remove entries from the k-v-state you have to set the
>>> value to null. Note that you need an explicit delete-value record to
>>> trigger the eviction.
>>> 2) Multiple lookups: This only works if all lookups are independent
>>> of each other. You can partition DS1 only on a single key and the other
>>> keys might be located on different shards. A workaround might be to
>>> duplicate S1 events for each key that they need to look up. However, you
>>> might need to collect events from the same S1 event after the join. If that
>>> does not work for you, the only thing that comes to my mind is to broadcast
>>> the state and keep a full local copy in each operator.
>>>
>>> Let me add one more thing regarding the upcoming rescaling feature. If
>>> this is interesting for you, rescaling will also work (maybe not in the
>>> first version) for broadcast state, i.e. state which is the same on all
>>> parallel operator instances.
>>>
>>> 2016-09-07 21:45 GMT+02:00 Chakravarthy varaga <chakravarth...@gmail.com>:
>>>
>>>> I understand this better with your explanation.
>>>> In this use case, each element in DS1 has to be looked up against a
>>>> 'bunch of keys' from DS2, and DS2 could shrink or expand in terms of the
>>>> number of keys. Will the key-value shard work in this case?
>>>>
>>>> On Wed, Sep 7, 2016 at 7:44 PM, Fabian Hueske <fhue...@gmail.com>
>>>> wrote:
>>>>
>>>>> Operator state is always local in Flink. However, with key-value
>>>>> state, you can have something which behaves somewhat like a distributed
>>>>> hashmap, because each operator holds a different shard/partition of the
>>>>> hashtable.
>>>>>
>>>>> If you have to do only a single key lookup for each element of DS1,
>>>>> you should think about partitioning both streams (keyBy) and writing the
>>>>> state into Flink's key-value state [1].
>>>>>
>>>>> This will have several benefits:
>>>>> 1) State does not need to be replicated
>>>>> 2) Depending on the backend (RocksDB) [2], parts of the state can
>>>>> reside on disk. You are not bound to the memory of the JVM.
>>>>> 3) Flink takes care of the look-up. No need to have your own hashmap.
>>>>> 4) It will only be possible to rescale jobs with key-value state (this
>>>>> feature is currently under development).
>>>>>
>>>>> If using the key-value state is possible, I'd go for that.
>>>>>
>>>>> Best, Fabian
>>>>>
>>>>> [1] https://ci.apache.org/projects/flink/flink-docs-release-1.1/apis/streaming/state.html
>>>>> [2] https://ci.apache.org/projects/flink/flink-docs-release-1.1/apis/streaming/state_backends.html
>>>>>
>>>>> 2016-09-07 19:55 GMT+02:00 Chakravarthy varaga <
>>>>> chakravarth...@gmail.com>:
>>>>>
>>>>>> Certainly, that's what I thought as well...
>>>>>> The output of DataStream2 could be in the 1000s, and there are state
>>>>>> updates...
>>>>>> Reading this topic from the other job, job1, is okay.
>>>>>> However, assuming that we maintain this state in a collection and
>>>>>> update it (by reading from the topic), will this collection be
>>>>>> replicated across the cluster within job1?
>>>>>>
>>>>>>
>>>>>>
>>>>>> On Wed, Sep 7, 2016 at 6:33 PM, Fabian Hueske <fhue...@gmail.com>
>>>>>> wrote:
>>>>>>
>>>>>>> Is writing DataStream2 to a Kafka topic and reading it from the
>>>>>>> other job an option?
>>>>>>>
>>>>>>> 2016-09-07 19:07 GMT+02:00 Chakravarthy varaga <
>>>>>>> chakravarth...@gmail.com>:
>>>>>>>
>>>>>>>> Hi Fabian,
>>>>>>>>
>>>>>>>>     Thanks for your response. Apparently these DataStreams
>>>>>>>> (Job1-DataStream1 & Job2-DataStream2) are from different Flink
>>>>>>>> applications running within the same cluster.
>>>>>>>>     DataStream2 (from Job2) applies transformations and updates a
>>>>>>>> 'cache' that Job1 needs to work on.
>>>>>>>>     Our intention is to not use an external key/value store, as we
>>>>>>>> are trying to localize the cache within the cluster.
>>>>>>>>     Is there a way?
>>>>>>>>
>>>>>>>> Best Regards
>>>>>>>> CVP
>>>>>>>>
>>>>>>>> On Wed, Sep 7, 2016 at 5:00 PM, Fabian Hueske <fhue...@gmail.com>
>>>>>>>> wrote:
>>>>>>>>
>>>>>>>>> Hi,
>>>>>>>>>
>>>>>>>>> Flink does not provide shared state.
>>>>>>>>> However, you can broadcast a stream to a CoFlatMapFunction, such
>>>>>>>>> that each operator has its own local copy of the state.
>>>>>>>>>
>>>>>>>>> If that does not work for you because the state is too large, and
>>>>>>>>> if it is possible to partition the state (and both streams), you can
>>>>>>>>> also use keyBy instead of broadcast.
>>>>>>>>>
>>>>>>>>> Finally, you can use an external system like a key-value store or
>>>>>>>>> an in-memory store like Apache Ignite to hold your distributed
>>>>>>>>> collection.
>>>>>>>>>
>>>>>>>>> Best, Fabian
>>>>>>>>>
>>>>>>>>> 2016-09-07 17:49 GMT+02:00 Chakravarthy varaga <
>>>>>>>>> chakravarth...@gmail.com>:
>>>>>>>>>
>>>>>>>>>> Hi Team,
>>>>>>>>>>
>>>>>>>>>>      Can someone help me here? Appreciate any response !
>>>>>>>>>>
>>>>>>>>>> Best Regards
>>>>>>>>>> Varaga
>>>>>>>>>>
>>>>>>>>>> On Mon, Sep 5, 2016 at 4:51 PM, Chakravarthy varaga <
>>>>>>>>>> chakravarth...@gmail.com> wrote:
>>>>>>>>>>
>>>>>>>>>>> Hi Team,
>>>>>>>>>>>
>>>>>>>>>>>     I'm working on a Flink Streaming application. The data is
>>>>>>>>>>> injected through Kafka connectors. The payload volume is roughly
>>>>>>>>>>> 100K/sec. The event payload is a string. Let's call this DataStream1.
>>>>>>>>>>> This application also uses another DataStream, call it
>>>>>>>>>>> DataStream2, which consumes events off a Kafka topic. The elements
>>>>>>>>>>> of DataStream2 go through a transformation that finally updates a
>>>>>>>>>>> HashMap (a java.util collection). The Flink application should
>>>>>>>>>>> share this HashMap across the Flink cluster so that the DataStream1
>>>>>>>>>>> application can check the state of the values in this collection.
>>>>>>>>>>> Is there a way to do this in Flink?
>>>>>>>>>>>
>>>>>>>>>>>     I don't see any shared collection used within the cluster?
>>>>>>>>>>>
>>>>>>>>>>> Best Regards
>>>>>>>>>>> CVP
>>>>>>>>>>>
>>>>>>>>>>
>>>>>>>>>>
>>>>>>>>>
>>>>>>>>
>>>>>>>
>>>>>>
>>>>>
>>>>
>>>
>>
>
