Hello,

Could you please share an example of CoGroupedStreams, if you have one? Thanks!

> On 10-Feb-2021, at 3:22 PM, Kezhu Wang <kez...@gmail.com> wrote:
>
> > Actually, my use case is that I want to share the state of one stream in
> > two other streams. Right now, I can think of connecting this stream
> > independently with each of the two other streams and manage the state
> > twice, effectively duplicating it.
>
> > Only the matching keys (with the two other streams) will do.
>
> I assume that `ConnectedStreams` meets your requirements but you don't want
> to duplicate that state? Then, I think there are two ways:
> 1. Union all three streams into one and then keyBy. You can see
>    `CoGroupedStreams` for reference.
> 2. You can try `MultipleInputStreamOperator` and `AbstractStreamOperatorV2`.
>    But most usages of these two are currently in Flink tests and internals.
>    You could look at `MultipleInputITCase.testKeyedState` for reference.
>
> * CoGroupedStreams union:
>   https://github.com/apache/flink/blob/master/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/datastream/CoGroupedStreams.java#L369
> * MultipleInputITCase.testKeyedState:
>   https://github.com/apache/flink/blob/master/flink-tests/src/test/java/org/apache/flink/test/streaming/runtime/MultipleInputITCase.java#L113
>
> On February 10, 2021 at 17:19:15, Sandeep khanzode (sand...@shiftright.ai) wrote:
>
>> Hi,
>>
>> Yes, but the stream whose state I want to share will be unbounded and
>> have a large volume. Also, not all keys from that stream have to go to every
>> Task Node. Only the matching keys (with the two other streams) will do.
>>
>> Please let me know if there is another cleaner way to achieve this. Thanks.
>>
>>
>>> On 10-Feb-2021, at 12:44 PM, Kezhu Wang <kez...@gmail.com> wrote:
>>>
>>> Flink has broadcast state to broadcast one stream to another, in case you are
>>> not aware of it. It actually duplicates state.
>>>
>>> 1. Broadcast state:
>>> https://ci.apache.org/projects/flink/flink-docs-stable/dev/stream/state/broadcast_state.html
>>>
>>> Best,
>>> Kezhu Wang
>>>
>>> On February 10, 2021 at 13:03:36, Sandeep khanzode (sand...@shiftright.ai) wrote:
>>>
>>>> Hello,
>>>>
>>>> Thanks a lot for the response. I will try to check Queryable-state for
>>>> this purpose.
>>>>
>>>> Actually, my use case is that I want to share the state of one stream in
>>>> two other streams. Right now, I can think of connecting this stream
>>>> independently with each of the two other streams and manage the state
>>>> twice, effectively duplicating it.
>>>>
>>>> I was trying to check whether there are options where I can share this
>>>> state with both the streams but save it only once.
>>>>
>>>>
>>>>> On 10-Feb-2021, at 9:05 AM, Kezhu Wang <kez...@gmail.com> wrote:
>>>>>
>>>>> (a) It is by design. For keyed state, you can only access the state for
>>>>> the current key, not for others. If you want one value per key,
>>>>> ValueState is a better fit than MapState.
>>>>> (b) state-processor-api aims to access/create/modify/upgrade offline
>>>>> savepoints, not running state. Queryable state may meet your
>>>>> requirement, but from what I have observed it has not been actively
>>>>> developed for a while and is still beta.
>>>>>
>>>>> Queryable state:
>>>>> https://ci.apache.org/projects/flink/flink-docs-release-1.12/dev/stream/state/queryable_state.html
>>>>>
>>>>> On February 9, 2021 at 22:09:29, Sandeep khanzode (sand...@shiftright.ai) wrote:
>>>>>
>>>>>> Hello,
>>>>>>
>>>>>> I am creating a class that extends RichCoFlatMapFunction. I need to
>>>>>> connect() two streams to basically share the state of one stream in
>>>>>> another.
>>>>>>
>>>>>> This is what I do:
>>>>>> private transient MapState<KeyClass, ValueClass> state;
>>>>>> @Override
>>>>>> public void open(Configuration parameters) throws Exception {
>>>>>>     MapStateDescriptor<KeyClass, ValueClass> stateDescriptor =
>>>>>>         new MapStateDescriptor<>("abc-saved-state",
>>>>>>             Types.POJO(KeyClass.class),
>>>>>>             Types.POJO(ValueClass.class));
>>>>>>     state = getRuntimeContext().getMapState(stateDescriptor);
>>>>>> }
>>>>>>
>>>>>> This works correctly.
>>>>>>
>>>>>>
>>>>>> I have two questions:
>>>>>> (a) Whenever I debug, I can only see the current key in the MapState,
>>>>>> not all the keys that were created and saved before. The next time I
>>>>>> get a hit for another key, I see only that key and none of the
>>>>>> previous keys. Is this by design or am I missing something?
>>>>>>
>>>>>> (b) Can I somehow access this state beyond the class that holds the
>>>>>> state? I.e. can I access the state in some other class? If not, can I
>>>>>> use the State Processor API
>>>>>> https://ci.apache.org/projects/flink/flink-docs-release-1.9/dev/libs/state_processor_api.html
>>>>>> to do this? Is that the correct way to access the running state of one
>>>>>> stream elsewhere in the program without corrupting it?
>>>>>>
>>>>>>
>>>>>> Your response will be greatly appreciated. I will be happy to add more
>>>>>> details if required.
>>>>>>
>>>>>> Thanks,
>>>>>> Sandeep Ramesh Khanzode
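
The "union all three streams, then keyBy" idea from Kezhu Wang's reply can be sketched roughly as follows. This is a plain-Java simulation with no Flink dependency, not Flink code and not how `CoGroupedStreams` is implemented internally. All names (`SharedKeyedStateSketch`, `Source`, `Tagged`, `process`, `run`) are hypothetical, a `HashMap` stands in for keyed state, and a single object stands in for the keyed operator. In a real job the same roles would presumably be played by `DataStream.union`, `keyBy`, and a keyed function holding a `ValueState`, but that wiring is an assumption and is not shown here.

```java
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

/** Plain-Java illustration only; every name in this class is hypothetical. */
final class SharedKeyedStateSketch {

    /** Which input an element came from; plays the role of the tag in a tagged union. */
    enum Source { SHARED, LEFT, RIGHT }

    /** One element of the unioned stream: origin tag, key, payload. */
    static final class Tagged {
        final Source source;
        final String key;
        final String value;

        Tagged(Source source, String key, String value) {
            this.source = source;
            this.key = key;
            this.value = value;
        }
    }

    /** Stand-in for keyed state: one stored value per key, kept exactly once. */
    private final Map<String, String> stateByKey = new HashMap<>();

    /** Handles one element; only the entry for element.key is read or written. */
    void process(Tagged element, List<String> out) {
        switch (element.source) {
            case SHARED:
                // The shared stream only updates the state for its own key.
                stateByKey.put(element.key, element.value);
                break;
            case LEFT:
            case RIGHT:
                // Both other streams read the same single copy of the state.
                String shared = stateByKey.get(element.key);
                if (shared != null) {
                    out.add(element.source + ":" + element.key + ":"
                            + element.value + "+" + shared);
                }
                break;
        }
    }

    /** Feeds an already-unioned sequence of elements through one operator instance. */
    static List<String> run(List<Tagged> unioned) {
        SharedKeyedStateSketch operator = new SharedKeyedStateSketch();
        List<String> out = new ArrayList<>();
        for (Tagged element : unioned) {
            operator.process(element, out);
        }
        return out;
    }
}
```

Because every element carries its origin tag, one operator can keep the shared stream's value once per key and serve both of the other streams from it. Elements from the other two streams whose key has no stored value yet are simply dropped in this sketch; whether to buffer them instead is a separate design decision.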