Hi Averell,

Ah, sorry. I had assumed the toggle events were broadcast anyway. Since you have both streams keyed, your current solution looks fine to me.
Best, Fabian

On Fri, 10 May 2019 at 03:13, Averell Huyen Levan <lvhu...@gmail.com> wrote:
> Hi Fabian,
>
> Sorry, but I am still confused by your suggestion. If I union the Toggle
> stream with the StateReportTrigger stream, would that mean I need to make
> my Toggles broadcast state? Or is there some way to modify the keyed
> state from within the processBroadcastElement() method?
>
> I tried to implement the other direction (which I outlined in my previous
> email). It seems to work, but I am not confident in it and not sure whether
> it has any flaws. Could you please comment on it?
> In my view, this implementation causes a lot of type-casting for my main
> data stream, which might have a high impact on performance (my toggle is
> on for only about 1% of the keys, and the rate of input1.left is less than
> a millionth of the rate of input1.right).
>
> /**
>  * This KeyedBroadcastProcessFunction has:
>  *   input1: a keyed `DataStream[Either[Toggle, MyEvent]]`:
>  *     input1.left: Toggles in the form of a tuple (Key, Boolean).
>  *       When Toggle._2 == true, records from input1.right for the same
>  *       key will be forwarded to the main output.
>  *       If it is false, records from input1.right for that same key will
>  *       be dropped.
>  *     input1.right: the main data stream
>  *
>  *   input2: a broadcast stream of StateReport triggers. When a record
>  *     arrives on this stream, the current value of Toggles will be sent
>  *     out via the outputTag.
>  */
> class EventFilterAndExportToggles(outputTag: OutputTag[Toggle])
>     extends KeyedBroadcastProcessFunction[Key, Either[Toggle, MyEvent], Any, MyEvent] {
>
>   val toggleStateDescriptor =
>     new ValueStateDescriptor[Boolean]("MyEventToggle", classOf[Boolean])
>
>   override def processElement(in1: Either[Toggle, MyEvent],
>       readOnlyContext: KeyedBroadcastProcessFunction[Key, Either[Toggle, MyEvent], Any, MyEvent]#ReadOnlyContext,
>       collector: Collector[MyEvent]): Unit = {
>     in1 match {
>       case Left(toggle) =>
>         getRuntimeContext.getState(toggleStateDescriptor).update(toggle._2)
>       case Right(event) =>
>         if (getRuntimeContext.getState(toggleStateDescriptor).value())
>           collector.collect(event)
>     }
>   }
>
>   override def processBroadcastElement(in2: Any,
>       context: KeyedBroadcastProcessFunction[Key, Either[Toggle, MyEvent], Any, MyEvent]#Context,
>       collector: Collector[MyEvent]): Unit = {
>     context.applyToKeyedState(toggleStateDescriptor, (k: Key, s: ValueState[Boolean]) =>
>       if (s != null) context.output(outputTag, (k, s.value())))
>   }
> }
>
> Thanks for your help.
> Regards,
> Averell
>
> On Thu, May 9, 2019 at 7:31 PM Fabian Hueske <fhue...@gmail.com> wrote:
>
>> Hi,
>>
>> Passing a Context through a DataStream definitely does not work.
>> You'd need to have the keyed state that you want to scan over in the
>> KeyedBroadcastProcessFunction.
>>
>> For the toggle filter use case, you would need a unioned stream of
>> Toggle and StateReport events.
>> For the output, you can use side outputs to route the different outputs
>> to different streams.
>>
>> Best, Fabian
>>
>> On Thu, 9 May 2019 at 10:34, Averell <lvhu...@gmail.com> wrote:
>>
>>> Thank you Congxian and Fabian.
>>>
>>> @Fabian: could you please give a bit more detail?
>>> My understanding is: pass the context itself and an OutputTag to the
>>> KeyedStateFunction parameter of
>>> KeyedBroadcastProcessFunction#Context.applyToKeyedState(), and send out
>>> the side output from within that KeyedStateFunction.process(). Did I
>>> understand your idea correctly?
>>>
>>> BTW, I have another question regarding KeyedBroadcastProcessFunction best
>>> practice: I have two streams, Data and Toggle. The Toggle stream is
>>> just a keyed boolean stream, used to filter data from the Data stream.
>>> I am implementing that filter using a simple RichCoFlatMapFunction.
>>>
>>> Now I want to export the list of keys that are currently toggled on.
>>> Should I
>>> (1) add one more KeyedBroadcastProcessFunction operator (which has
>>> Toggle and the broadcast stream as its input streams), or
>>> (2) replace that RichCoFlatMapFunction with a new
>>> KeyedBroadcastProcessFunction that has both functionalities: filter and
>>> export? Doing this would require unioning Toggle and Data into one single
>>> keyed stream.
>>>
>>> Thanks and best regards,
>>> Averell
>>>
>>> --
>>> Sent from:
>>> http://apache-flink-user-mailing-list-archive.2336050.n4.nabble.com/
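For readers weighing option (2) above, here is a minimal, Flink-free sketch of the per-key semantics of Averell's EventFilterAndExportToggles. It assumes a single operator instance. A plain mutable.Map stands in for the keyed ValueState[Boolean]. processElement returns the forwarded event instead of calling a Collector. processBroadcastElement returns the exported (key, toggle) pairs instead of writing them to a side output. The names ToggleFilterModel and MyEvent, and the String key type, are hypothetical. This is a model for reasoning about the behaviour, not Flink API code.

```scala
import scala.collection.mutable

// Flink-free model of the thread's EventFilterAndExportToggles (option 2).
// Key, Toggle and MyEvent are hypothetical stand-ins for the thread's types.
object ToggleFilterModel {
  type Key = String
  type Toggle = (Key, Boolean)
  final case class MyEvent(key: Key, payload: String)

  // One instance models one operator instance; the Map plays the role of the
  // keyed ValueState[Boolean] (one entry per key that has received a toggle).
  final class EventFilterAndExportToggles {
    private val toggles = mutable.Map.empty[Key, Boolean]

    // Mirrors processElement: Left updates the key's toggle, Right is filtered.
    def processElement(in: Either[Toggle, MyEvent]): Option[MyEvent] = in match {
      case Left((key, on)) =>
        toggles.update(key, on)
        None
      case Right(event) =>
        // A key with no toggle yet is treated as "off" (its events are dropped).
        if (toggles.getOrElse(event.key, false)) Some(event) else None
    }

    // Mirrors processBroadcastElement + applyToKeyedState: export every key
    // that has toggle state, with its current value (sorted for determinism).
    def processBroadcastElement(): Seq[Toggle] =
      toggles.toSeq.sortBy(_._1)
  }

  def main(args: Array[String]): Unit = {
    val f = new EventFilterAndExportToggles
    val inputs: Seq[Either[Toggle, MyEvent]] = Seq(
      Right(MyEvent("a", "dropped: no toggle yet")),
      Left(("a", true)),
      Right(MyEvent("a", "forwarded")),
      Left(("b", false)),
      Right(MyEvent("b", "dropped: toggled off"))
    )
    val forwarded = inputs.flatMap(in => f.processElement(in).toList)
    println(forwarded.map(_.payload))
    // Keep only the keys that are currently toggled on, as asked in the thread.
    println(f.processBroadcastElement().filter(_._2).map(_._1))
  }
}
```

In real Flink code, the map's role is played by keyed state scoped to the current key. The broadcast side iterates over that state with Context.applyToKeyedState, as the quoted class does. Like the quoted code, the model exports keys that are toggled off as well. Filtering the exported pairs on `_._2` leaves only the keys that are toggled on.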