Sure!
You get both the context and the collector in the processBroadcastElement
method, so you can hand the collector to the KeyedStateFunction.
See the snippet below:

  override def processBroadcastElement(
      value: BroadcastRequest,
      ctx: KeyedBroadcastProcessFunction[String, Request, BroadcastRequest, String]#Context,
      out: Collector[String]): Unit = {
    ....
    // Apply the function to the state of every key and emit each non-null value.
    ctx.applyToKeyedState(stateDescriptor, new KeyedStateFunction[String, ValueState[String]] {
      override def process(key: String, state: ValueState[String]): Unit =
        Option(state.value()).foreach(s => out.collect(s))
    })
    ...
  }
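
For completeness, here is a fuller sketch of the same idea as a whole class. It is only an illustration, not the exact code from our pipeline: the Request and BroadcastRequest case classes, the "fetch" command string, the "last-payload" state name and the class name FetchStateFunction are all made up for the example. The keyed side writes the state, and the broadcast side walks over every key with applyToKeyedState and emits through the collector it was given.

```scala
import org.apache.flink.api.common.state.{KeyedStateFunction, ValueState, ValueStateDescriptor}
import org.apache.flink.streaming.api.functions.co.KeyedBroadcastProcessFunction
import org.apache.flink.util.Collector

// Hypothetical message types, stand-ins for the ones used in this thread.
case class Request(key: String, payload: String)
case class BroadcastRequest(command: String)

class FetchStateFunction
    extends KeyedBroadcastProcessFunction[String, Request, BroadcastRequest, String] {

  // One descriptor, used both on the keyed side and in applyToKeyedState.
  // The state name "last-payload" is an assumption for this sketch.
  private val stateDescriptor =
    new ValueStateDescriptor[String]("last-payload", classOf[String])

  // Keyed side: a key is set here, so the state can be accessed directly.
  override def processElement(
      value: Request,
      ctx: KeyedBroadcastProcessFunction[String, Request, BroadcastRequest, String]#ReadOnlyContext,
      out: Collector[String]): Unit = {
    getRuntimeContext.getState(stateDescriptor).update(value.payload)
  }

  // Broadcast side: no key is set, so reading the state directly fails with
  // "No key set". applyToKeyedState visits the state of every key instead.
  override def processBroadcastElement(
      value: BroadcastRequest,
      ctx: KeyedBroadcastProcessFunction[String, Request, BroadcastRequest, String]#Context,
      out: Collector[String]): Unit = {
    if (value.command == "fetch") {
      ctx.applyToKeyedState(stateDescriptor, new KeyedStateFunction[String, ValueState[String]] {
        override def process(key: String, state: ValueState[String]): Unit =
          // Skip keys whose state is empty; emit "key=value" for the rest.
          Option(state.value()).foreach(v => out.collect(s"$key=$v"))
      })
    }
  }
}
```

The function is meant to be used on a keyed stream connected to a broadcast stream, i.e. after keyBy(...).connect(...) with the command stream broadcast using a MapStateDescriptor.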


On Mon, Apr 29, 2019 at 5:45 PM M Singh <mans2si...@yahoo.com> wrote:

> Hi Avi:
>
> Can you please elaborate (or include an example/code snippet) of how you
> were able to achieve collecting the keyed states from the
> processBroadcastElement method using the applyToKeyedState ?
>
> I am trying to understand which collector you used to emit the state since
> the broadcasted elements/state might be different from the non-broadcast
> elements/state.
>
> Thanks for your help.
>
> Mans
> On Monday, April 29, 2019, 7:29:23 AM EDT, Fabian Hueske <
> fhue...@gmail.com> wrote:
>
>
> Nice!
> Thanks for the confirmation :-)
>
> On Mon, Apr 29, 2019 at 13:21, Avi Levi <
> avi.l...@bluevoyant.com> wrote:
>
> Thanks! Works like a charm :)
>
> On Mon, Apr 29, 2019 at 12:11 PM Fabian Hueske <fhue...@gmail.com> wrote:
>
> Hi Avi,
>
> I'm not sure it is true that you cannot emit data from the keyed state
> when you receive a broadcast message.
> The Context parameter of the processBroadcastElement() method in the
> KeyedBroadcastProcessFunction has the applyToKeyedState() method.
> The method takes a KeyedStateFunction that is applied to each key of a
> state, but does not provide a Collector to emit data.
> Maybe you can pass the collector to the KeyedStateFunction and emit
> records while it iterates over the key space.
>
> Best, Fabian
>
> On Fri, Apr 26, 2019 at 17:35, Avi Levi <
> avi.l...@bluevoyant.com> wrote:
>
> Hi Timo,
> I definitely did. But when you broadcast a command and try to access the
> persisted state (I mean the state of the data stream, not the
> broadcast one), you get the exception that I wrote
> (java.lang.NullPointerException: No key set. This method should not be
> called outside of a keyed context), e.g. when doing something like
>
> override def processBroadcastElement(value: BroadcastRequest, ctx: 
> KeyedBroadcastProcessFunction[String, Request, BroadcastRequest, 
> Response]#Context, out: Collector[Response]): Unit = {
>   value match {
>     case Command(StateCmd.Fetch, _) =>
>       if (state.value() != null) {
>         out.collect(state.value())
>       }
>
> will yield that exception
>
> BR
> Avi
>
> On Fri, Apr 26, 2019 at 11:55 AM Timo Walther <twal...@apache.org> wrote:
>
> Hi Avi,
>
> did you have a look at the .connect() and .broadcast() API
> functionalities? They allow you to broadcast a control stream to all
> operators. Maybe this example [1] or other examples in this repository
> can help you.
>
> Regards,
> Timo
>
> [1]
>
> https://github.com/ververica/flink-training-exercises/blob/master/src/main/java/com/dataartisans/flinktraining/solutions/datastream_java/broadcast/TaxiQuerySolution.java
>
> On 26.04.19 at 07:57, Avi Levi wrote:
> > Hi,
> > We have a keyed pipeline with persisted state.
> > Is there a way to broadcast a command and collect all the values
> > persisted in the state?
> >
> > The end result can be for example sending a fetch command to all
> > operators and emitting the results to some sink
> >
> > Why do we need it? From time to time we might want to check whether we
> > are missing keys, see what the additional keys are, or simply emit the
> > current state to a table and query it.
> >
> > I tried simply broadcasting a command and accessing the persisted
> > state, but that resulted in:
> > java.lang.NullPointerException: No key set. This method should not be
> > called outside of a keyed context.
> >
> > is there a good way to achieve that ?
> >
> > Cheers
> > Avi
>
>
