Thanks! Works like a charm :)

On Mon, Apr 29, 2019 at 12:11 PM Fabian Hueske <fhue...@gmail.com> wrote:
> Hi Avi,
>
> I'm not sure that you cannot emit data from the keyed state when you
> receive a broadcast message.
> The Context parameter of the processBroadcastElement() method in the
> KeyedBroadcastProcessFunction has the applyToKeyedState() method.
> The method takes a KeyedStateFunction that is applied to each key of a
> state, but does not provide a Collector to emit data.
> Maybe you can pass the collector to the KeyedStateFunction and emit
> records while it iterates over the key space.
>
> Best, Fabian
>
> On Fri, Apr 26, 2019 at 17:35, Avi Levi <avi.l...@bluevoyant.com> wrote:
>
>> Hi Timo,
>> I definitely did. But when you broadcast a command and try to access the
>> persisted state (I mean the state of the data stream, not the
>> broadcast one), you get the exception I mentioned
>> (java.lang.NullPointerException: No key set. This method should not be
>> called outside of a keyed context). For example, doing something like
>>
>> override def processBroadcastElement(value: BroadcastRequest, ctx:
>> KeyedBroadcastProcessFunction[String, Request, BroadcastRequest,
>> Response]#Context, out: Collector[Response]): Unit = {
>>   value match {
>>     case Command(StateCmd.Fetch, _) =>
>>       if (state.value() != null) {
>>         out.collect(state.value())
>>       }
>>
>> will yield that exception.
>>
>> BR
>> Avi
>>
>> On Fri, Apr 26, 2019 at 11:55 AM Timo Walther <twal...@apache.org> wrote:
>>
>>> Hi Avi,
>>>
>>> Did you have a look at the .connect() and .broadcast() API
>>> functionalities? They allow you to broadcast a control stream to all
>>> operators. Maybe this example [1] or other examples in this repository
>>> can help you.
>>>
>>> Regards,
>>> Timo
>>>
>>> [1]
>>> https://github.com/ververica/flink-training-exercises/blob/master/src/main/java/com/dataartisans/flinktraining/solutions/datastream_java/broadcast/TaxiQuerySolution.java
>>>
>>> On 26.04.19 at 07:57, Avi Levi wrote:
>>> > Hi,
>>> > We have a keyed pipeline with persisted state.
>>> > Is there a way to broadcast a command and collect all values
>>> > persisted in the state?
>>> >
>>> > The end result could be, for example, sending a fetch command to all
>>> > operators and emitting the results to some sink.
>>> >
>>> > Why do we need it? From time to time we might want to check whether
>>> > we are missing keys, find out what the additional keys are, or simply
>>> > emit the current state to a table and query it.
>>> >
>>> > I tried simply broadcasting a command and accessing the persisted
>>> > state, but that resulted in:
>>> > java.lang.NullPointerException: No key set. This method should not be
>>> > called outside of a keyed context.
>>> >
>>> > Is there a good way to achieve that?
>>> >
>>> > Cheers
>>> > Avi
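
For anyone finding this thread later: the approach Fabian describes (calling applyToKeyedState() from processBroadcastElement() and handing the Collector to the KeyedStateFunction) might look roughly like the sketch below. This is not the code from the thread. The message types (Request, BroadcastRequest, FetchAll, Response), the class name FetchStateFunction, and the descriptor name "last-response" are hypothetical placeholders, and it assumes the Flink 1.8-era DataStream API used at the time.

```scala
import org.apache.flink.api.common.state.{ValueState, ValueStateDescriptor}
import org.apache.flink.runtime.state.KeyedStateFunction
import org.apache.flink.streaming.api.functions.co.KeyedBroadcastProcessFunction
import org.apache.flink.util.Collector

// Hypothetical message types, for illustration only.
case class Request(key: String, payload: String)
sealed trait BroadcastRequest
case object FetchAll extends BroadcastRequest
case class Response(key: String, payload: String)

class FetchStateFunction
    extends KeyedBroadcastProcessFunction[String, Request, BroadcastRequest, Response] {

  // The same descriptor is used for normal keyed access and for applyToKeyedState().
  private val stateDesc =
    new ValueStateDescriptor[Response]("last-response", classOf[Response])

  @transient private lazy val state: ValueState[Response] =
    getRuntimeContext.getState(stateDesc)

  // Keyed side: a current key is set here, so state.update() is allowed.
  override def processElement(
      value: Request,
      ctx: KeyedBroadcastProcessFunction[String, Request, BroadcastRequest, Response]#ReadOnlyContext,
      out: Collector[Response]): Unit =
    state.update(Response(value.key, value.payload))

  // Broadcast side: no current key is set, so calling state.value() directly
  // fails with "No key set". applyToKeyedState() visits every key instead.
  override def processBroadcastElement(
      value: BroadcastRequest,
      ctx: KeyedBroadcastProcessFunction[String, Request, BroadcastRequest, Response]#Context,
      out: Collector[Response]): Unit = value match {
    case FetchAll =>
      ctx.applyToKeyedState(stateDesc, new KeyedStateFunction[String, ValueState[Response]] {
        override def process(key: String, s: ValueState[Response]): Unit = {
          val current = s.value()
          // Emit through the collector captured from the enclosing method.
          if (current != null) out.collect(current)
        }
      })
  }
}
```

Each parallel instance only iterates over the keys it owns, so a single broadcast FetchAll should yield one Response per key across the whole job.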