Oh okay. Got it. I will check. Thanks.
> On 12-Feb-2021, at 3:14 PM, Kezhu Wang <kez...@gmail.com> wrote:
>
> Hi Sandeep,
>
> I must have misled you with inaccurate wording. I did not mean that you
> should use CoGroupedStreams itself, only that CoGroupedStreams.apply is a
> reference for how to union streams together and keyBy them. This way, all
> three streams' states are available downstream without duplication.
>
> Best,
> Kezhu Wang
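For reference, here is a minimal sketch of the union-then-keyBy approach, assuming Flink 1.12's DataStream API. Each input is mapped to a common wrapper, the three streams are unioned and keyed by the shared key, and a single `KeyedProcessFunction` keeps the shared stream's value once per key while serving the other two inputs. `Tagged` is a hypothetical, simplified stand-in for the tagged union that `CoGroupedStreams` builds internally; the sample data, the `describe` helper, and the state name are also illustrative assumptions, not Flink API.

```java
import org.apache.flink.api.common.state.ValueState;
import org.apache.flink.api.common.state.ValueStateDescriptor;
import org.apache.flink.api.common.typeinfo.Types;
import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.api.functions.KeyedProcessFunction;
import org.apache.flink.util.Collector;

public class UnionKeyBySketch {

    /** Hypothetical common wrapper, in the spirit of CoGroupedStreams' tagged union. */
    public static class Tagged {
        public int tag;          // 0 = shared stream, 1 and 2 = the two other streams
        public String key;
        public String payload;

        public Tagged() {}       // public no-arg constructor so Flink can treat this as a POJO

        public Tagged(int tag, String key, String payload) {
            this.tag = tag;
            this.key = key;
            this.payload = payload;
        }
    }

    /** Output line for a non-shared element, or null if no shared value is known for its key yet. */
    public static String describe(Tagged in, String sharedValue) {
        if (sharedValue == null) {
            return null;
        }
        return "input" + in.tag + " " + in.key + ": " + in.payload + " + " + sharedValue;
    }

    /** Stores the shared stream's latest value once per key and serves both other inputs from it. */
    public static class SharedStateFunction extends KeyedProcessFunction<String, Tagged, String> {
        private transient ValueState<String> shared;

        @Override
        public void open(Configuration parameters) {
            shared = getRuntimeContext().getState(
                    new ValueStateDescriptor<>("shared-value", Types.STRING));
        }

        @Override
        public void processElement(Tagged in, Context ctx, Collector<String> out) throws Exception {
            if (in.tag == 0) {
                shared.update(in.payload);   // the only copy of the shared state for this key
                return;
            }
            String line = describe(in, shared.value());
            if (line != null) {              // elements arriving before the shared value are dropped
                out.collect(line);
            }
        }
    }

    public static void main(String[] args) throws Exception {
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();

        // Hypothetical sample inputs: (key, payload) pairs, each mapped to the common wrapper.
        DataStream<Tagged> shared = env.fromElements(Tuple2.of("k1", "ref-a"), Tuple2.of("k2", "ref-b"))
                .map(t -> new Tagged(0, t.f0, t.f1)).returns(Tagged.class);
        DataStream<Tagged> first = env.fromElements(Tuple2.of("k1", "order-1"))
                .map(t -> new Tagged(1, t.f0, t.f1)).returns(Tagged.class);
        DataStream<Tagged> second = env.fromElements(Tuple2.of("k2", "payment-7"))
                .map(t -> new Tagged(2, t.f0, t.f1)).returns(Tagged.class);

        shared.union(first, second)
                .keyBy(t -> t.key, Types.STRING)
                .process(new SharedStateFunction())
                .print();

        env.execute("union-keyBy-sketch");
    }
}
```

Because unioned inputs have no ordering guarantee relative to each other, an element from the first or second stream can arrive before the shared value for its key. This sketch simply drops such elements; a real job would likely buffer them in state or otherwise handle that case.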
> On February 11, 2021 at 20:49:20, Sandeep khanzode (sand...@shiftright.ai) wrote:
>
>> Hello,
>>
>> Could you please share an example of CoGroupedStreams, if you have one? Thanks!
>>
>>> On 10-Feb-2021, at 3:22 PM, Kezhu Wang <kez...@gmail.com> wrote:
>>>
>>> > Actually, my use case is that I want to share the state of one stream
>>> > with two other streams. Right now, the only approach I can think of is to
>>> > connect this stream independently with each of the two other streams and
>>> > manage the state twice, effectively duplicating it.
>>>
>>> > Only the keys that match the two other streams are needed.
>>>
>>> I assume that `ConnectedStreams` meets your requirements, but you don't
>>> want to store that state twice? If so, I think there are two ways:
>>> 1. Union all three streams into one and then keyBy. See
>>> `CoGroupedStreams` for reference.
>>> 2. Try `MultipleInputStreamOperator` and `AbstractStreamOperatorV2`.
>>> However, most current usages of these two are in Flink's tests and
>>> internals. See `MultipleInputITCase.testKeyedState` for reference.
>>>
>>>
>>> * CoGroupedStreams union:
>>> https://github.com/apache/flink/blob/master/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/datastream/CoGroupedStreams.java#L369
>>> * MultipleInputITCase.testKeyedState:
>>> https://github.com/apache/flink/blob/master/flink-tests/src/test/java/org/apache/flink/test/streaming/runtime/MultipleInputITCase.java#L113
>>>
>>> On February 10, 2021 at 17:19:15, Sandeep khanzode (sand...@shiftright.ai) wrote:
>>>
>>>> Hi,
>>>>
>>>> Yes, but the stream whose state I want to share is unbounded and
>>>> high-volume. Also, not every key from that stream has to go to every
>>>> task node. Only the keys that match the two other streams are needed.
>>>>
>>>> Please let me know if there is another cleaner way to achieve this. Thanks.
>>>>
>>>>
>>>>> On 10-Feb-2021, at 12:44 PM, Kezhu Wang <kez...@gmail.com> wrote:
>>>>>
>>>>> In case you are not aware of it, Flink has broadcast state for
>>>>> broadcasting one stream to other streams. Note that it does duplicate
>>>>> the state.
>>>>>
>>>>> 1. Broadcast state:
>>>>> https://ci.apache.org/projects/flink/flink-docs-stable/dev/stream/state/broadcast_state.html
>>>>>
>>>>> Best,
>>>>> Kezhu Wang
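A minimal sketch of the broadcast-state route, assuming Flink 1.12's DataStream API and hypothetical `(key, value)` `Tuple2` inputs: the shared stream is broadcast with a `MapStateDescriptor`, and each of the two other streams is connected to that broadcast stream and processed by a `BroadcastProcessFunction`. The class names, the `enrich` helper, the descriptor name, and the sample data are assumptions for illustration.

```java
import org.apache.flink.api.common.state.MapStateDescriptor;
import org.apache.flink.api.common.typeinfo.Types;
import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.streaming.api.datastream.BroadcastStream;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.api.functions.co.BroadcastProcessFunction;
import org.apache.flink.util.Collector;

public class BroadcastStateSketch {

    // Broadcast state layout: key -> shared value (the name is an assumption).
    public static final MapStateDescriptor<String, String> SHARED =
            new MapStateDescriptor<>("shared-broadcast", Types.STRING, Types.STRING);

    /** Hypothetical helper that formats an enriched event. */
    public static String enrich(Tuple2<String, String> event, String sharedValue) {
        return event.f0 + ": " + event.f1 + " + " + sharedValue;
    }

    public static class Enrich
            extends BroadcastProcessFunction<Tuple2<String, String>, Tuple2<String, String>, String> {

        @Override
        public void processElement(Tuple2<String, String> event, ReadOnlyContext ctx,
                                   Collector<String> out) throws Exception {
            // The non-broadcast side only gets read-only access to the broadcast state.
            String sharedValue = ctx.getBroadcastState(SHARED).get(event.f0);
            if (sharedValue != null) {
                out.collect(enrich(event, sharedValue));
            }
        }

        @Override
        public void processBroadcastElement(Tuple2<String, String> update, Context ctx,
                                            Collector<String> out) throws Exception {
            // Every parallel instance receives and stores every broadcast element.
            ctx.getBroadcastState(SHARED).put(update.f0, update.f1);
        }
    }

    public static void main(String[] args) throws Exception {
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();

        DataStream<Tuple2<String, String>> sharedSrc =
                env.fromElements(Tuple2.of("k1", "ref-a"), Tuple2.of("k2", "ref-b"));
        DataStream<Tuple2<String, String>> first = env.fromElements(Tuple2.of("k1", "order-1"));
        DataStream<Tuple2<String, String>> second = env.fromElements(Tuple2.of("k2", "payment-7"));

        BroadcastStream<Tuple2<String, String>> broadcast = sharedSrc.broadcast(SHARED);

        // Both consumers connect to the same broadcast stream, but each operator
        // keeps its own full copy of the broadcast state in every parallel instance.
        first.connect(broadcast).process(new Enrich()).print();
        second.connect(broadcast).process(new Enrich()).print();

        env.execute("broadcast-state-sketch");
    }
}
```

With this design, every parallel instance of both consuming operators stores the entire broadcast state, including keys it never needs, so for a large, unbounded shared stream it does not avoid the duplication.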
>>>>>
>>>>> On February 10, 2021 at 13:03:36, Sandeep khanzode (sand...@shiftright.ai) wrote:
>>>>>
>>>>>> Hello,
>>>>>>
>>>>>> Thanks a lot for the response. I will look into Queryable State for
>>>>>> this purpose.
>>>>>>
>>>>>> Actually, my use case is that I want to share the state of one stream
>>>>>> with two other streams. Right now, the only approach I can think of is to
>>>>>> connect this stream independently with each of the two other streams and
>>>>>> manage the state twice, effectively duplicating it.
>>>>>>
>>>>>> I was checking whether there is a way to share this state with both
>>>>>> streams but store it only once.
>>>>>>
>>>>>>
>>>>>>> On 10-Feb-2021, at 9:05 AM, Kezhu Wang <kez...@gmail.com> wrote:
>>>>>>>
>>>>>>> (a) This is by design. With keyed state, you can only access the state
>>>>>>> for the current key, not for other keys. If you want one value per key,
>>>>>>> ValueState is a better fit than MapState.
>>>>>>> (b) The State Processor API is meant for reading, creating, modifying,
>>>>>>> and upgrading offline savepoints, not running state. Queryable state may
>>>>>>> meet your requirement, but from what I have observed it has not been
>>>>>>> actively developed for a while, and it is still in beta.
>>>>>>>
>>>>>>> Queryable state:
>>>>>>> https://ci.apache.org/projects/flink/flink-docs-release-1.12/dev/stream/state/queryable_state.html
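To illustrate point (a), here is a minimal sketch, assuming Flink 1.12 and hypothetical `(key, value)` `Tuple2` inputs, of a `RichCoFlatMapFunction` applied to two streams keyed by the same field. `ValueState` holds one value per key, and every `value()` or `update()` call is scoped to the key of the element currently being processed, which is the same reason a debugger only shows the current key's entries in a `MapState`. The names `Enrich`, `enrich`, and `latest-value` are illustrative.

```java
import org.apache.flink.api.common.state.ValueState;
import org.apache.flink.api.common.state.ValueStateDescriptor;
import org.apache.flink.api.common.typeinfo.Types;
import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.api.functions.co.RichCoFlatMapFunction;
import org.apache.flink.util.Collector;

public class KeyedValueStateSketch {

    /** Hypothetical helper that formats an enriched event. */
    public static String enrich(Tuple2<String, String> event, String latest) {
        return event.f0 + ": " + event.f1 + " + " + latest;
    }

    public static class Enrich
            extends RichCoFlatMapFunction<Tuple2<String, String>, Tuple2<String, String>, String> {

        // One String per key; Flink scopes every access to the key of the current element.
        private transient ValueState<String> latest;

        @Override
        public void open(Configuration parameters) {
            latest = getRuntimeContext().getState(
                    new ValueStateDescriptor<>("latest-value", Types.STRING));
        }

        @Override
        public void flatMap1(Tuple2<String, String> shared, Collector<String> out) throws Exception {
            latest.update(shared.f1);            // stored under shared.f0 only
        }

        @Override
        public void flatMap2(Tuple2<String, String> event, Collector<String> out) throws Exception {
            String value = latest.value();       // only event.f0's value is visible, never other keys
            if (value != null) {
                out.collect(enrich(event, value));
            }
        }
    }

    public static void main(String[] args) throws Exception {
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        DataStream<Tuple2<String, String>> shared =
                env.fromElements(Tuple2.of("k1", "ref-a"), Tuple2.of("k2", "ref-b"));
        DataStream<Tuple2<String, String>> events = env.fromElements(Tuple2.of("k1", "order-1"));

        // Keying both sides by the same field makes the connected stream keyed.
        shared.keyBy(t -> t.f0, Types.STRING)
                .connect(events.keyBy(t -> t.f0, Types.STRING))
                .flatMap(new Enrich())
                .print();

        env.execute("keyed-value-state-sketch");
    }
}
```

Keying both inputs with the same key type routes matching keys from the two streams to the same parallel instance, so each instance only stores the state for the keys it owns.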
>>>>>>>
>>>>>>> On February 9, 2021 at 22:09:29, Sandeep khanzode
>>>>>>> (sand...@shiftright.ai) wrote:
>>>>>>>
>>>>>>>> Hello,
>>>>>>>>
>>>>>>>> I am creating a class that extends RichCoFlatMapFunction. I need to
>>>>>>>> connect() two streams to share the state of one stream with the
>>>>>>>> other.
>>>>>>>>
>>>>>>>> This is what I do:
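>>>>>>>> import org.apache.flink.api.common.state.MapState;
>>>>>>>> import org.apache.flink.api.common.state.MapStateDescriptor;
>>>>>>>> import org.apache.flink.api.common.typeinfo.Types;
>>>>>>>> import org.apache.flink.configuration.Configuration;
>>>>>>>>
>>>>>>>> private transient MapState<KeyClass, ValueClass> state;
>>>>>>>>
>>>>>>>> @Override
>>>>>>>> public void open(Configuration parameters) throws Exception {
>>>>>>>>     MapStateDescriptor<KeyClass, ValueClass> stateDescriptor =
>>>>>>>>             new MapStateDescriptor<>("abc-saved-state",
>>>>>>>>                     Types.POJO(KeyClass.class),
>>>>>>>>                     Types.POJO(ValueClass.class));
>>>>>>>>     state = getRuntimeContext().getMapState(stateDescriptor);
>>>>>>>> }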
>>>>>>>>
>>>>>>>> This works correctly.
>>>>>>>>
>>>>>>>>
>>>>>>>> I have two questions:
>>>>>>>> (a) Whenever I debug, I can only see the current key in the MapState,
>>>>>>>> not all the keys that were created and saved before. The next time I
>>>>>>>> get a hit for another key, I see only that key and none of the
>>>>>>>> previous ones. Is this by design, or am I missing something?
>>>>>>>>
>>>>>>>> (b) Can I somehow access this state outside the class that holds it,
>>>>>>>> i.e., in some other class? If not, can I use the State Processor API
>>>>>>>> (https://ci.apache.org/projects/flink/flink-docs-release-1.9/dev/libs/state_processor_api.html)
>>>>>>>> to do this? Is that the correct way to access the running state of
>>>>>>>> one stream elsewhere in the program without corrupting it?
>>>>>>>>
>>>>>>>>
>>>>>>>> Your response will be greatly appreciated. I will be happy to add more
>>>>>>>> details if required.
>>>>>>>>
>>>>>>>> Thanks,
>>>>>>>> Sandeep Ramesh Khanzode