Hello,

Could you please share an example of CoGroupedStreams, if you have one? Thanks!

> On 10-Feb-2021, at 3:22 PM, Kezhu Wang <kez...@gmail.com> wrote:
> 
> > Actually, my use case is that I want to share the state of one stream in 
> > two other streams. Right now, I can think of connecting this stream 
> > independently with each of the two other streams and manage the state 
> > twice, effectively duplicating it.
> 
> > Only the matching keys (with the two other streams) will do. 
> 
> I assume that `ConnectedStreams` meets your requirements but you don't want 
> to duplicate that state? Then I think there are two ways:
> 1. Union all three streams into one and then keyBy. You can see 
> `CoGroupedStreams` for reference.
> 2. You can try `MultipleInputStreamOperator` and `AbstractStreamOperatorV2`. 
> But most usages of these two are currently in Flink tests and internals.
>      You could refer to `MultipleInputITCase.testKeyedState` for an example.
> 
> 
> * CoGroupedStreams union: 
> https://github.com/apache/flink/blob/master/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/datastream/CoGroupedStreams.java#L369
> * MultipleInputITCase.testKeyedState: 
> https://github.com/apache/flink/blob/master/flink-tests/src/test/java/org/apache/flink/test/streaming/runtime/MultipleInputITCase.java#L113
> 
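To make option 1 above more concrete, here is a minimal plain-Java sketch of the "union, then keyBy" idea. It deliberately has no Flink dependency: the classes `Tagged` and `UnionKeyedSketch` are hypothetical names made up for this illustration, and the `HashMap` only stands in for Flink keyed state. The tagging is similar in spirit to the tagged union referenced in the `CoGroupedStreams` link, but this is an assumption-laden model, not Flink's implementation.

```java
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

/** Hypothetical tagged element: the input it came from, its key, and its payload. */
final class Tagged {
    final int source;   // 0 = stream whose state is shared, 1 and 2 = the two other streams
    final String key;
    final String value;

    Tagged(int source, String key, String value) {
        this.source = source;
        this.key = key;
        this.value = value;
    }
}

/** Models a single keyed operator that receives all three inputs after union + keyBy. */
final class UnionKeyedSketch {
    // Stand-in for keyed state: one entry per key, stored exactly once.
    private final Map<String, String> sharedState = new HashMap<>();
    private final List<String> output = new ArrayList<>();

    void process(Tagged element) {
        if (element.source == 0) {
            // Elements of the shared stream only update the state for their key.
            sharedState.put(element.key, element.value);
        } else {
            // Elements of either other stream read the same single copy of the state.
            String state = sharedState.get(element.key);
            if (state != null) {
                output.add("stream" + element.source + ":" + element.key
                        + "=" + element.value + "+" + state);
            }
        }
    }

    /** Feeds the already-unioned elements through one operator instance, in order. */
    static List<String> run(List<Tagged> unioned) {
        UnionKeyedSketch operator = new UnionKeyedSketch();
        for (Tagged element : unioned) {
            operator.process(element);
        }
        return operator.output;
    }
}
```

In a real job the three streams would first be mapped to a common tagged type, then combined with `union` and partitioned with `keyBy`, so that elements with the same key from all three inputs reach the same keyed function. Elements of streams 1 and 2 that arrive before the matching state are simply dropped in this sketch; buffering them would be a design decision of its own.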
> On February 10, 2021 at 17:19:15, Sandeep khanzode (sand...@shiftright.ai) wrote:
> 
>> Hi,
>> 
>> Yes, but the stream whose state I want to share is unbounded and has a 
>> large volume. Also, not all keys from that stream have to go to every 
>> Task Node. Only the matching keys (with the two other streams) will do. 
>>  
>> Please let me know if there is another cleaner way to achieve this. Thanks.
>> 
>> 
>>> On 10-Feb-2021, at 12:44 PM, Kezhu Wang <kez...@gmail.com> wrote:
>>> 
>>> In case you are not aware of it, Flink has broadcast state, which broadcasts 
>>> one stream to all parallel instances of another. Note that it actually 
>>> duplicates the state.
>>> 
>>> 1. Broadcast state: 
>>> https://ci.apache.org/projects/flink/flink-docs-stable/dev/stream/state/broadcast_state.html
>>> 
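The remark that broadcast state duplicates state can be illustrated with a small plain-Java model. This is only a sketch under simplifying assumptions: `BroadcastVsKeyedSketch` is a hypothetical class, each `HashMap` stands in for the state held by one parallel subtask, and the modulo routing is a simplification of how Flink actually assigns keys to subtasks.

```java
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

final class BroadcastVsKeyedSketch {

    /** One empty map per parallel subtask; each map models that subtask's local state. */
    private static List<Map<String, String>> newSubtasks(int parallelism) {
        List<Map<String, String>> subtasks = new ArrayList<>();
        for (int i = 0; i < parallelism; i++) {
            subtasks.add(new HashMap<>());
        }
        return subtasks;
    }

    private static int totalEntries(List<Map<String, String>> subtasks) {
        int total = 0;
        for (Map<String, String> state : subtasks) {
            total += state.size();
        }
        return total;
    }

    /** Broadcast: every subtask receives and stores every element. */
    static int entriesWithBroadcast(List<String> keys, int parallelism) {
        List<Map<String, String>> subtasks = newSubtasks(parallelism);
        for (String key : keys) {
            for (Map<String, String> state : subtasks) {
                state.put(key, "value-for-" + key);
            }
        }
        return totalEntries(subtasks);
    }

    /** Keyed: each element is routed to exactly one subtask (simplified modulo routing). */
    static int entriesWithKeyBy(List<String> keys, int parallelism) {
        List<Map<String, String>> subtasks = newSubtasks(parallelism);
        for (String key : keys) {
            int target = Math.floorMod(key.hashCode(), parallelism);
            subtasks.get(target).put(key, "value-for-" + key);
        }
        return totalEntries(subtasks);
    }
}
```

With distinct keys, the broadcast variant stores parallelism times as many entries as the keyed variant, which is why it may be a poor fit for an unbounded, high-volume stream.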
>>> Best,
>>> Kezhu Wang
>>> 
>>> On February 10, 2021 at 13:03:36, Sandeep khanzode (sand...@shiftright.ai) wrote:
>>> 
>>>> Hello,
>>>> 
>>>> Thanks a lot for the response. I will try to check Queryable-state for 
>>>> this purpose. 
>>>> 
>>>> Actually, my use case is that I want to share the state of one stream in 
>>>> two other streams. Right now, I can think of connecting this stream 
>>>> independently with each of the two other streams and manage the state 
>>>> twice, effectively duplicating it.
>>>> 
>>>> I was trying to check whether there are options where I can share this 
>>>> state with both the streams but save it only once.
>>>> 
>>>> 
>>>>> On 10-Feb-2021, at 9:05 AM, Kezhu Wang <kez...@gmail.com> wrote:
>>>>> 
>>>>> (a) It is by design. For keyed state, you can only access the state for the 
>>>>> current key, not for others. If you want one value per key, ValueState is a 
>>>>> better fit than MapState.
>>>>> (b) The State Processor API is meant to access/create/modify/upgrade offline 
>>>>> savepoints, not running state. Queryable state may meet your 
>>>>> requirement, but as far as I can tell it has not been actively developed for 
>>>>> a while and is still in beta. 
>>>>> 
>>>>> Queryable state: 
>>>>> https://ci.apache.org/projects/flink/flink-docs-release-1.12/dev/stream/state/queryable_state.html
>>>>> 
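Point (a) above, that keyed state is only visible for the current key, can be pictured with the following toy model in plain Java. It is not Flink's state backend: `KeyedMapStateSketch`, `setCurrentKey`, and `visibleUserKeys` are hypothetical names chosen for this sketch. The outer map plays the role of the backend, and the inner map plays the role of the `MapState` seen by user code.

```java
import java.util.Collections;
import java.util.HashMap;
import java.util.Map;
import java.util.Set;
import java.util.TreeSet;

/** Toy model of keyed MapState scoping; an illustration, not Flink's implementation. */
final class KeyedMapStateSketch {
    // Outer key: the stream key from keyBy. Inner map: the MapState for that key.
    private final Map<String, Map<String, Integer>> backend = new HashMap<>();
    private String currentKey;

    /** In this model the runtime calls this before handing each element to user code. */
    void setCurrentKey(String key) {
        this.currentKey = key;
    }

    /** Writes into the MapState that belongs to the current key only. */
    void put(String userKey, int value) {
        backend.computeIfAbsent(currentKey, k -> new HashMap<>()).put(userKey, value);
    }

    /** Returns only the entries scoped to the current key; other keys stay hidden. */
    Set<String> visibleUserKeys() {
        Map<String, Integer> scoped = backend.get(currentKey);
        if (scoped == null) {
            return Collections.emptySet();
        }
        return new TreeSet<>(scoped.keySet());
    }
}
```

Entries written while another key was current are still stored, they are just not reachable until that key becomes current again. That matches what a debugger shows when inspecting the state inside the function.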
>>>>> On February 9, 2021 at 22:09:29, Sandeep khanzode (sand...@shiftright.ai) wrote:
>>>>> 
>>>>>> Hello,
>>>>>> 
>>>>>> I am creating a class that extends RichCoFlatMapFunction. I need to 
>>>>>> connect() two streams to basically share the state of one stream in 
>>>>>> another. 
>>>>>> 
>>>>>> This is what I do:
>>>>>> private transient MapState<KeyClass, ValueClass> state;
>>>>>> 
>>>>>> @Override
>>>>>> public void open(Configuration parameters) throws Exception {
>>>>>>     MapStateDescriptor<KeyClass, ValueClass> stateDescriptor =
>>>>>>             new MapStateDescriptor<>("abc-saved-state",
>>>>>>                     Types.POJO(KeyClass.class),
>>>>>>                     Types.POJO(ValueClass.class));
>>>>>>     state = getRuntimeContext().getMapState(stateDescriptor);
>>>>>> }
>>>>>> 
>>>>>> This works correctly.
>>>>>> 
>>>>>> 
>>>>>> I have two questions:
>>>>>> (a) Whenever I debug, I can only see the current key in the MapState, 
>>>>>> not all the keys that were created and saved before. The next time I 
>>>>>> get a hit for another key, I see only that key and none of the 
>>>>>> previous ones. Is this by design, or am I missing something?
>>>>>> 
>>>>>> (b) Can I somehow access this state beyond the class that holds the 
>>>>>> state? I.e. can I access the state in some other class? If not, can I 
>>>>>> use the State Processor API 
>>>>>> (https://ci.apache.org/projects/flink/flink-docs-release-1.9/dev/libs/state_processor_api.html) 
>>>>>> to do this? Is that the correct way to access the running state of one 
>>>>>> stream elsewhere in the program without corrupting it?
>>>>>> 
>>>>>> 
>>>>>> Your response will be greatly appreciated. I will be happy to add more 
>>>>>> details if required.
>>>>>> 
>>>>>> Thanks,
>>>>>> Sandeep Ramesh Khanzode
