Oh okay. Got it. I will check. Thanks.

> On 12-Feb-2021, at 3:14 PM, Kezhu Wang <kez...@gmail.com> wrote:
> 
> Hi Sandeep,
> 
> I must have misled you with inaccurate wording. I did not mean that you
> should use CoGroupedStreams itself, only that CoGroupedStreams.apply is a
> reference for how to union streams together and keyBy them. This way, you
> can have the state of all three streams in one downstream operator without
> duplication.
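As a rough illustration of the union-then-keyBy idea, here is a plain-Java simulation, not Flink code. The `Tagged` wrapper, the `UnionKeyByDemo` class and the hashCode-modulo routing are hypothetical stand-ins (Flink assigns keys to key groups with its own hashing). The point is only that once the three inputs are tagged and merged, each key lands in exactly one partition, where a single state entry serves all three inputs.

```java
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

// Hypothetical simulation of "union three streams, then keyBy". NOT Flink API.
class Tagged {
    final int tag;       // which input the element came from (0, 1 or 2)
    final String key;    // the key used for partitioning
    final String value;  // payload

    Tagged(int tag, String key, String value) {
        this.tag = tag;
        this.key = key;
        this.value = value;
    }
}

class UnionKeyByDemo {
    // One simulated parallel instance: a single state entry per key,
    // shared by all three inputs (array index = input tag).
    static class Partition {
        final Map<String, String[]> stateByKey = new HashMap<>();

        void process(Tagged t) {
            String[] slots = stateByKey.computeIfAbsent(t.key, k -> new String[3]);
            slots[t.tag] = t.value; // latest value from this input for this key
        }
    }

    // Union: merge the inputs after tagging each element with its source.
    static List<Tagged> union(List<List<Map.Entry<String, String>>> inputs) {
        List<Tagged> merged = new ArrayList<>();
        for (int tag = 0; tag < inputs.size(); tag++) {
            for (Map.Entry<String, String> kv : inputs.get(tag)) {
                merged.add(new Tagged(tag, kv.getKey(), kv.getValue()));
            }
        }
        return merged;
    }

    // keyBy: route every element with the same key to the same partition
    // (hashCode modulo parallelism is a stand-in for Flink's key groups).
    static Partition[] keyBy(List<Tagged> merged, int parallelism) {
        Partition[] parts = new Partition[parallelism];
        for (int i = 0; i < parallelism; i++) {
            parts[i] = new Partition();
        }
        for (Tagged t : merged) {
            int target = Math.floorMod(t.key.hashCode(), parallelism);
            parts[target].process(t);
        }
        return parts;
    }
}
```

For example, with inputs [(k1, a1), (k2, a2)], [(k1, b1)] and [(k2, c2)] and a parallelism of 4, the partitions hold exactly two state entries in total, one per key, and the entry for k1 holds both a1 and b1.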
> 
> Best,
> Kezhu Wang
> On February 11, 2021 at 20:49:20, Sandeep khanzode (sand...@shiftright.ai) wrote:
> 
>> Hello,
>> 
>> Could you please share an example of CoGroupedStreams, if you have one? Thanks!
>> 
>>> On 10-Feb-2021, at 3:22 PM, Kezhu Wang <kez...@gmail.com> wrote:
>>> 
>>> > Actually, my use case is that I want to share the state of one stream in 
>>> > two other streams. Right now, I can think of connecting this stream 
>>> > independently with each of the two other streams and manage the state 
>>> > twice, effectively duplicating it.
>>> 
>>> > Only the matching keys (with the two other streams) will do. 
>>> 
>>> I assume that `ConnectedStreams` meets your requirements, but you don't
>>> want to keep two copies of that state? If so, I think there are two ways:
>>> 1. Union all three streams into one and then keyBy. You can look at
>>> `CoGroupedStreams` for reference.
>>> 2. You can try `MultipleInputStreamOperator` and
>>> `AbstractStreamOperatorV2`. However, these two are currently used mostly
>>> in Flink's tests and internals.
>>>      You could refer to `MultipleInputITCase.testKeyedState` for an example.
>>> 
>>> 
>>> * CoGroupedStreams union: 
>>> https://github.com/apache/flink/blob/master/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/datastream/CoGroupedStreams.java#L369
>>> * MultipleInputITCase.testKeyedState: 
>>> https://github.com/apache/flink/blob/master/flink-tests/src/test/java/org/apache/flink/test/streaming/runtime/MultipleInputITCase.java#L113
>>> 
>>> On February 10, 2021 at 17:19:15, Sandeep khanzode (sand...@shiftright.ai) wrote:
>>> 
>>>> Hi,
>>>> 
>>>> Yes, but the stream whose state I want to share is unbounded and
>>>> high-volume. Also, not every key from that stream has to go to every
>>>> task node. Only the matching keys (with the two other streams) will
>>>> do.
>>>>  
>>>> Please let me know if there is another cleaner way to achieve this. Thanks.
>>>> 
>>>> 
>>>>> On 10-Feb-2021, at 12:44 PM, Kezhu Wang <kez...@gmail.com> wrote:
>>>>> 
>>>>> In case you are not aware of it, Flink has broadcast state for
>>>>> broadcasting one stream to the others. Note that it does duplicate the
>>>>> state: every parallel instance keeps a full copy.
>>>>> 
>>>>> 1. Broadcast state: 
>>>>> https://ci.apache.org/projects/flink/flink-docs-stable/dev/stream/state/broadcast_state.html
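To make the duplication concrete, a small plain-Java model can count state entries across parallel instances. The names are hypothetical and this is not Flink API; hashCode-modulo routing stands in for Flink's key-group assignment. Broadcast state keeps a full copy in every instance, while keyed state keeps each key in exactly one instance.

```java
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

// Hypothetical model (NOT Flink API) comparing the total number of state
// entries held across parallel instances for broadcast vs. keyed state.
class StateReplicationDemo {

    // Broadcast: every instance receives every element and keeps a full copy.
    static List<Map<String, String>> broadcast(Map<String, String> elements, int parallelism) {
        List<Map<String, String>> instances = new ArrayList<>();
        for (int i = 0; i < parallelism; i++) {
            instances.add(new HashMap<>(elements));
        }
        return instances;
    }

    // Keyed: each element goes to exactly one instance, chosen from its key.
    static List<Map<String, String>> keyed(Map<String, String> elements, int parallelism) {
        List<Map<String, String>> instances = new ArrayList<>();
        for (int i = 0; i < parallelism; i++) {
            instances.add(new HashMap<>());
        }
        for (Map.Entry<String, String> e : elements.entrySet()) {
            int target = Math.floorMod(e.getKey().hashCode(), parallelism);
            instances.get(target).put(e.getKey(), e.getValue());
        }
        return instances;
    }

    // Total number of state entries across all instances.
    static int totalEntries(List<Map<String, String>> instances) {
        int total = 0;
        for (Map<String, String> m : instances) {
            total += m.size();
        }
        return total;
    }
}
```

With three keys and a parallelism of 4, `broadcast` holds 12 entries in total and `keyed` holds 3, which is the difference that matters for a large, unbounded stream.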
>>>>> 
>>>>> Best,
>>>>> Kezhu Wang
>>>>> 
>>>>> On February 10, 2021 at 13:03:36, Sandeep khanzode (sand...@shiftright.ai) wrote:
>>>>> 
>>>>>> Hello,
>>>>>> 
>>>>>> Thanks a lot for the response. I will look into Queryable State for
>>>>>> this purpose.
>>>>>> 
>>>>>> Actually, my use case is that I want to share the state of one stream in 
>>>>>> two other streams. Right now, I can think of connecting this stream 
>>>>>> independently with each of the two other streams and manage the state 
>>>>>> twice, effectively duplicating it.
>>>>>> 
>>>>>> I was trying to find out whether there is a way to share this state
>>>>>> with both streams while storing it only once.
>>>>>> 
>>>>>> 
>>>>>>> On 10-Feb-2021, at 9:05 AM, Kezhu Wang <kez...@gmail.com> wrote:
>>>>>>> 
>>>>>>> (a) This is by design. With keyed state, you can only access the state
>>>>>>> of the current key, not that of other keys. If you want one value per
>>>>>>> key, ValueState is more appropriate than MapState.
>>>>>>> (b) The State Processor API is meant for reading, creating, modifying,
>>>>>>> and upgrading offline savepoints, not running state. Queryable State
>>>>>>> may meet your requirement, but as far as I can tell it has not been
>>>>>>> actively developed for a while and is still in beta.
>>>>>>> 
>>>>>>> Queryable state: 
>>>>>>> https://ci.apache.org/projects/flink/flink-docs-release-1.12/dev/stream/state/queryable_state.html
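Point (a) can be illustrated with a small plain-Java model of key-scoped state. This is a hypothetical simplification, not Flink's state backend: the runtime sets the current key before each element is processed, and both the MapState-like view and the ValueState-like accessor only touch the slot for that key.

```java
import java.util.HashMap;
import java.util.Map;

// Hypothetical model (NOT Flink's implementation) of how keyed state is scoped.
// Every state access reads or writes only the slot of the current key.
class KeyedStateDemo {
    private final Map<String, Map<String, Integer>> mapStateByKey = new HashMap<>();
    private final Map<String, Integer> valueStateByKey = new HashMap<>();
    private String currentKey;

    // Done by the runtime, not by user code, before each element is processed.
    void setCurrentKey(String key) {
        this.currentKey = key;
    }

    // MapState-like view: only the entries stored under the current key are visible.
    Map<String, Integer> mapStateView() {
        return mapStateByKey.computeIfAbsent(currentKey, k -> new HashMap<>());
    }

    // ValueState-like access: exactly one value per key.
    Integer valueStateGet() {
        return valueStateByKey.get(currentKey);
    }

    void valueStateUpdate(Integer value) {
        valueStateByKey.put(currentKey, value);
    }
}
```

Writing under key "a" and then key "b", and inspecting the map view while "b" is current, shows only the entry for "b", which matches what the debugger shows. The entry for "a" is still stored and becomes visible again once "a" is the current key. If the map's user key is always the stream key itself, each per-key map holds a single entry, so a ValueState is the simpler fit.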
>>>>>>> 
>>>>>>> On February 9, 2021 at 22:09:29, Sandeep khanzode (sand...@shiftright.ai) wrote:
>>>>>>> 
>>>>>>>> Hello,
>>>>>>>> 
>>>>>>>> I am creating a class that extends RichCoFlatMapFunction. I need to
>>>>>>>> connect() two streams so that the state of one stream can be shared
>>>>>>>> with the other.
>>>>>>>> 
>>>>>>>> This is what I do:
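>>>>>>>> // Imports: org.apache.flink.api.common.state.{MapState, MapStateDescriptor},
>>>>>>>> // org.apache.flink.api.common.typeinfo.Types, org.apache.flink.configuration.Configuration
>>>>>>>> private transient MapState<KeyClass, ValueClass> state;
>>>>>>>>
>>>>>>>> @Override
>>>>>>>> public void open(Configuration parameters) throws Exception {
>>>>>>>>     // Keyed state: Flink scopes this map to the key of the element being processed.
>>>>>>>>     MapStateDescriptor<KeyClass, ValueClass> stateDescriptor =
>>>>>>>>             new MapStateDescriptor<>("abc-saved-state",
>>>>>>>>                     Types.POJO(KeyClass.class), Types.POJO(ValueClass.class));
>>>>>>>>     state = getRuntimeContext().getMapState(stateDescriptor);
>>>>>>>> }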
>>>>>>>> 
>>>>>>>> This works correctly.
>>>>>>>> 
>>>>>>>> 
>>>>>>>> I have two questions:
>>>>>>>> (a) Whenever I debug, I can only see the current key in the MapState,
>>>>>>>> not all the keys that were created and saved before. The next time I
>>>>>>>> get a hit for another key, I only see that key and none of the
>>>>>>>> previous ones. Is this by design, or am I missing something?
>>>>>>>> 
>>>>>>>> (b) Can I somehow access this state outside the class that holds it,
>>>>>>>> i.e., from some other class? If not, can I use the State Processor API
>>>>>>>> (https://ci.apache.org/projects/flink/flink-docs-release-1.9/dev/libs/state_processor_api.html)
>>>>>>>> to do this? Is that the correct way to access the running state of
>>>>>>>> one stream elsewhere in the program without corrupting it?
>>>>>>>> 
>>>>>>>> 
>>>>>>>> Your response will be greatly appreciated. I will be happy to add more 
>>>>>>>> details if required.
>>>>>>>> 
>>>>>>>> Thanks,
>>>>>>>> Sandeep Ramesh Khanzode
