Hi Fabian,

Sorry, but I am still confused by your guidance. If I union the Toggle stream with the StateReportTrigger stream, would that mean I need to make my Toggles broadcast state? Or is there some way to modify the keyed state from within the processBroadcastElement() method?
I tried to implement the other direction (which I outlined in my previous email). It seems to work, but I am not confident about it and not sure whether it has any flaws. Could you please comment on it? In my view, this implementation causes a lot of type-casting for my main data stream, which might have a high impact on performance (my toggle is on for only about 1% of the keys, and the rate of input1.left is less than one millionth of the rate of input1.right).

    /**
     * This KeyedBroadcastProcessFunction has:
     *   input1: a keyed `DataStream[Either[Toggle, MyEvent]]`:
     *     input1.left: Toggles in the form of a tuple (Key, Boolean).
     *       When Toggle._2 == true, records from input1.right for the same key
     *       will be forwarded to the main output. If it is false, records from
     *       input1.right for that same key will be dropped.
     *     input1.right: the main data stream
     *
     *   input2: a broadcast stream of StateReport triggers. When a record arrives
     *     on this stream, the current value of the Toggles will be sent out via
     *     the outputTag.
     */
    class EventFilterAndExportToggles(outputTag: OutputTag[Toggle])
        extends KeyedBroadcastProcessFunction[Key, Either[Toggle, MyEvent], Any, MyEvent] {

      val toggleStateDescriptor =
        new ValueStateDescriptor[Boolean]("MyEventToggle", classOf[Boolean])

      override def processElement(
          in1: Either[Toggle, MyEvent],
          readOnlyContext: KeyedBroadcastProcessFunction[Key, Either[Toggle, MyEvent], Any, MyEvent]#ReadOnlyContext,
          collector: Collector[MyEvent]): Unit = {
        in1 match {
          case Left(toggle) =>
            getRuntimeContext.getState(toggleStateDescriptor).update(toggle._2)
          case Right(event) =>
            if (getRuntimeContext.getState(toggleStateDescriptor).value())
              collector.collect(event)
        }
      }

      override def processBroadcastElement(
          in2: Any,
          context: KeyedBroadcastProcessFunction[Key, Either[Toggle, MyEvent], Any, MyEvent]#Context,
          collector: Collector[MyEvent]): Unit = {
        context.applyToKeyedState(toggleStateDescriptor,
          (k: Key, s: ValueState[Boolean]) =>
            if (s != null) context.output(outputTag, (k, s.value())))
      }
    }

Thanks for your help.

Regards,
Averell

On Thu, May 9, 2019 at 7:31 PM Fabian Hueske <fhue...@gmail.com> wrote:
> Hi,
>
> Passing a Context through a DataStream definitely does not work.
> You'd need to have the keyed state that you want to scan over in the
> KeyedBroadcastProcessFunction.
>
> For the toggle filter use case, you would need to have a unioned stream
> with Toggle and StateReport events.
> For the output, you can use side outputs to route the different outputs to
> different streams.
>
> Best, Fabian
>
> On Thu, May 9, 2019 at 10:34 AM Averell <lvhu...@gmail.com> wrote:
>
>> Thank you Congxian and Fabian.
>>
>> @Fabian: could you please give a bit more detail? My understanding is: pass
>> the context itself and an OutputTag to the KeyedStateFunction parameter
>> of KeyedBroadcastProcessFunction#Context.applyToKeyedState(), and from
>> within that KeyedStateFunction.process() send out the side output. Am I
>> understanding your idea correctly?
>>
>> BTW, I have another question regarding KeyedBroadcastProcessFunction best
>> practice: I have two streams, Data and Toggle. The Toggle stream is just a
>> keyed boolean stream, used to filter data from the Data stream.
>> I am implementing that filter using a simple RichCoFlatMapFunction.
>>
>> Now I want to export the list of keys which are currently toggled on.
>> Should I
>> (1) have one additional KeyedBroadcastProcessFunction operator (which has
>> Toggle and BroadCast as the input streams), or
>> (2) replace that RichCoFlatMapFunction with a new
>> KeyedBroadcastProcessFunction, which has both functionalities: filter and
>> export? Doing this would require unioning Toggle and Data into one single
>> keyed stream.
>>
>> Thanks and best regards,
>> Averell
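To make the behaviour of EventFilterAndExportToggles easier to reason about, here is a minimal plain-Scala model of its logic, with no Flink dependency. It is a sketch under stated assumptions, not Flink code: `ToggleTypes`, `ToggleFilterModel`, `Key = String` and the `MyEvent(key, payload)` shape are hypothetical stand-ins, and a mutable `Map` plays the role of the per-key `ValueState[Boolean]`. The model's `processBroadcastElement` imitates `applyToKeyedState` by visiting every key that currently has state.

```scala
import scala.collection.mutable

// Hypothetical stand-ins for the Key, Toggle and MyEvent types from the email.
object ToggleTypes {
  type Key = String
  type Toggle = (Key, Boolean)
  final case class MyEvent(key: Key, payload: String)
}

import ToggleTypes._

// Plain-Scala model (no Flink) of ONE parallel instance of the operator.
// The mutable Map stands in for the keyed ValueState[Boolean].
class ToggleFilterModel {
  private val toggleState = mutable.Map.empty[Key, Boolean]

  // Mirrors processElement on the unioned keyed input Either[Toggle, MyEvent].
  def processElement(in1: Either[Toggle, MyEvent]): Option[MyEvent] = in1 match {
    case Left((key, on)) =>
      toggleState(key) = on // update the toggle for this key only
      None
    case Right(event) =>
      // A key with no toggle yet counts as "off", so its events are dropped.
      if (toggleState.getOrElse(event.key, false)) Some(event) else None
  }

  // Mirrors processBroadcastElement + applyToKeyedState: visit every key that
  // has state and report its current value (what the side output would carry).
  def processBroadcastElement(trigger: Any): Seq[Toggle] =
    toggleState.toSeq.sortBy(_._1)
}
```

In Flink itself, each parallel instance holds only the keys assigned to it, so `applyToKeyedState` in one instance visits only those keys; the reports from all instances together cover the whole key space. Keys whose toggle was switched to false still have state, so they are reported as well (in the model, `("b", false)` would appear in the report after a `Left(("b", false))`).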