Sure! You get both the context and the collector in the processBroadcastElement method; see the snippet below.

override def processBroadcastElement(
    value: BroadcastRequest,
    ctx: KeyedBroadcastProcessFunction[String, Request, BroadcastRequest, String]#Context,
    out: Collector[String]): Unit = {
  ....
  // Visit the state of every key and emit each non-null value through the collector.
  ctx.applyToKeyedState(stateDescriptor, new KeyedStateFunction[String, ValueState[String]] {
    override def process(key: String, state: ValueState[String]): Unit =
      Option(state.value()).foreach(s => out.collect(s))
  })
  ...
}

On Mon, Apr 29, 2019 at 5:45 PM M Singh <mans2si...@yahoo.com> wrote:

> Hi Avi:
>
> Can you please elaborate (or include an example/code snippet) on how you
> were able to collect the keyed states from the processBroadcastElement
> method using applyToKeyedState?
>
> I am trying to understand which collector you used to emit the state, since
> the broadcast elements/state might be different from the non-broadcast
> elements/state.
>
> Thanks for your help.
>
> Mans
>
> On Monday, April 29, 2019, 7:29:23 AM EDT, Fabian Hueske <fhue...@gmail.com> wrote:
>
> Nice!
> Thanks for the confirmation :-)
>
> On Mon, Apr 29, 2019 at 13:21, Avi Levi <avi.l...@bluevoyant.com> wrote:
>
> Thanks! Works like a charm :)
>
> On Mon, Apr 29, 2019 at 12:11 PM Fabian Hueske <fhue...@gmail.com> wrote:
>
> Hi Avi,
>
> I'm not sure if you cannot emit data from the keyed state when you
> receive a broadcasted message.
> The Context parameter of the processBroadcastElement() method in the
> KeyedBroadcastProcessFunction has the applyToKeyedState() method.
> The method takes a KeyedStateFunction that is applied to each key of a
> state, but does not provide a Collector to emit data.
> Maybe you can pass the collector to the KeyedStateFunction and emit
> records while it iterates over the key space.
>
> Best, Fabian
>
> On Fri, Apr 26, 2019 at 17:35, Avi Levi <avi.l...@bluevoyant.com> wrote:
>
> Hi Timo,
> I definitely did. But when broadcasting a command and trying to address the
> persisted state (I mean the state of the data stream and not the
> broadcast one), you get the exception that I wrote
> (java.lang.NullPointerException: No key set. This method should not be
> called outside of a keyed context), e.g. doing something like
>
> override def processBroadcastElement(value: BroadcastRequest, ctx:
> KeyedBroadcastProcessFunction[String, Request, BroadcastRequest,
> Response]#Context, out: Collector[Response]): Unit = {
>   value match {
>     case Command(StateCmd.Fetch, _) =>
>       if (state.value() != null) {
>         out.collect(state.value())
>       }
>
> will yield that exception.
>
> BR
> Avi
>
> On Fri, Apr 26, 2019 at 11:55 AM Timo Walther <twal...@apache.org> wrote:
>
> Hi Avi,
>
> did you have a look at the .connect() and .broadcast() API
> functionalities? They allow you to broadcast a control stream to all
> operators. Maybe this example [1] or other examples in this repository
> can help you.
>
> Regards,
> Timo
>
> [1]
> https://github.com/ververica/flink-training-exercises/blob/master/src/main/java/com/dataartisans/flinktraining/solutions/datastream_java/broadcast/TaxiQuerySolution.java
>
> On 26.04.19 at 07:57, Avi Levi wrote:
> > Hi,
> > We have a keyed pipeline with persisted state.
> > Is there a way to broadcast a command and collect all values that are
> > persisted in the state?
> >
> > The end result can be, for example, sending a fetch command to all
> > operators and emitting the results to some sink.
> >
> > Why do we need it?
> > From time to time we might want to check whether we are missing keys,
> > see what the additional keys are, or simply emit the current state to
> > a table and query it.
> >
> > I tried simply broadcasting a command and addressing the persisted
> > state, but that resulted in:
> > java.lang.NullPointerException: No key set. This method should not be
> > called outside of a keyed context.
> >
> > Is there a good way to achieve that?
> >
> > Cheers
> > Avi
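
For later readers: the snippet at the top of this thread could be expanded into a complete function roughly as follows. This is only a sketch, assuming a Flink version whose DataStream API provides KeyedBroadcastProcessFunction. The Request and BroadcastRequest types, the Fetch command, the class name FetchStateFunction and the state name "last-payload" are all made up for illustration and are not taken from the original job.

```scala
import org.apache.flink.api.common.state.{ValueState, ValueStateDescriptor}
import org.apache.flink.runtime.state.KeyedStateFunction
import org.apache.flink.streaming.api.functions.co.KeyedBroadcastProcessFunction
import org.apache.flink.util.Collector

// Hypothetical message types, invented for this sketch.
case class Request(key: String, payload: String)
sealed trait BroadcastRequest
case object Fetch extends BroadcastRequest

// Hypothetical function: remembers the last payload per key and emits
// every stored payload when a Fetch command is broadcast.
class FetchStateFunction
    extends KeyedBroadcastProcessFunction[String, Request, BroadcastRequest, String] {

  // The same descriptor is used for normal keyed access and for applyToKeyedState.
  private val stateDescriptor =
    new ValueStateDescriptor[String]("last-payload", classOf[String])

  // Keyed state handle; only usable where a current key is set (processElement).
  @transient private lazy val lastPayload: ValueState[String] =
    getRuntimeContext.getState(stateDescriptor)

  override def processElement(
      value: Request,
      ctx: KeyedBroadcastProcessFunction[String, Request, BroadcastRequest, String]#ReadOnlyContext,
      out: Collector[String]): Unit =
    lastPayload.update(value.payload)

  override def processBroadcastElement(
      value: BroadcastRequest,
      ctx: KeyedBroadcastProcessFunction[String, Request, BroadcastRequest, String]#Context,
      out: Collector[String]): Unit = value match {
    case Fetch =>
      // No key is set on the broadcast side, so lastPayload.value() would fail here.
      // applyToKeyedState instead visits the state of each key held by this instance.
      ctx.applyToKeyedState(
        stateDescriptor,
        new KeyedStateFunction[String, ValueState[String]] {
          override def process(key: String, keyState: ValueState[String]): Unit =
            Option(keyState.value()).foreach(s => out.collect(s))
        })
  }
}
```

Each parallel instance only iterates over the keys it holds, but since the broadcast element reaches every instance, the emitted records together cover all keys. The collector is simply captured by the anonymous KeyedStateFunction, which is the approach Fabian suggested above.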