Hi Fabian,

Thanks for that. However, as I mentioned in my previous email, that implementation requires a lot of type casting/object wrapping. I tried broadcasting the Toggle stream instead: the toggles are saved as a MapState, and whenever an export trigger record arrives, I send out the contents of that MapState. This implementation does not use applyToKeyedState at all. The problem is that I receive duplicated output (one copy from each parallel instance).
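
The duplicates follow from how broadcast state works: every parallel instance receives every broadcast record and keeps its own full copy of the map, so a broadcast export trigger makes each instance emit the whole map. The sketch below is a plain-Scala model of that effect, with no Flink dependency; `Subtask`, `exportAll`, `exportFromFirstOnly` and the parallelism of 4 are hypothetical. It also shows one possible workaround, which is an assumption here and not something proposed in the thread: let only the instance with index 0 emit. In a Flink 1.x function that index would come from `getRuntimeContext.getIndexOfThisSubtask`.

```scala
// Plain-Scala model (no Flink dependency) of broadcast state and export output.
// All names are hypothetical; this only illustrates the counting argument.
import scala.collection.mutable

type Key = String
type Toggle = (Key, Boolean)

// One parallel instance of the operator. Broadcast state is replicated,
// so every instance sees every toggle and keeps its own full copy.
final class Subtask(val index: Int) {
  val toggles: mutable.Map[Key, Boolean] = mutable.Map.empty

  def onToggle(t: Toggle): Unit = toggles.update(t._1, t._2)

  // Naive export: every instance emits its entire copy of the map.
  def exportAll(): Seq[Toggle] = toggles.toSeq

  // Assumed workaround: only instance 0 emits, the others stay silent.
  def exportFromFirstOnly(): Seq[Toggle] =
    if (index == 0) toggles.toSeq else Seq.empty
}

val parallelism = 4
val subtasks = (0 until parallelism).map(new Subtask(_))

// A broadcast stream delivers each record to all instances.
val toggleStream: Seq[Toggle] = Seq("a" -> true, "b" -> true, "a" -> false)
for (t <- toggleStream; s <- subtasks) s.onToggle(t)

// The export trigger is broadcast too, so every instance reacts to it.
val naive = subtasks.flatMap(_.exportAll())
val deduped = subtasks.flatMap(_.exportFromFirstOnly())

println(s"naive export:   ${naive.size} records") // 2 keys x 4 instances
println(s"deduped export: ${deduped.size} records")
```

With keyed state and applyToKeyedState, as in the earlier implementation, each instance only visits the keys assigned to it, so the combined output of all instances contains each key once and no such guard is needed.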

Is there any way to modify the keyed state from within the processBroadcastElement() method?

Thanks a lot for your help.
Regards,
Averell

On Fri, May 10, 2019 at 8:52 PM Fabian Hueske <fhue...@gmail.com> wrote:

> Hi Averell,
>
> Ah, sorry. I had assumed the toggle events were broadcast anyway.
> Since you had both streams keyed, your current solution looks fine to me.
>
> Best,
> Fabian
>
> On Fri, May 10, 2019 at 03:13, Averell Huyen Levan <
> lvhu...@gmail.com> wrote:
>
>> Hi Fabian,
>>
>> Sorry, but I am still confused about your guidance. If I union the Toggle
>> stream with the StateReportTrigger stream, would that mean I need to make
>> my Toggles broadcast state? Or is there some way to modify the keyed
>> state from within the processBroadcastElement() method?
>>
>> I tried to implement the other direction (which I outlined in my previous
>> email). It seems to work, but I am not confident in it and not sure whether
>> it has any flaws. Could you please comment on it?
>> In my view, this implementation causes a lot of type casting for my main
>> data stream, which might have a significant performance impact (my toggle
>> is on for only about 1% of the keys, and the rate of input1.left is less
>> than a millionth of the rate of input1.right).
>>
>> /**
>>  * This KeyedBroadcastProcessFunction has:
>>  *  input1: a keyed `DataStream[Either[Toggle, MyEvent]]`:
>>  *    input1.left: Toggles in the form of a tuple (Key, Boolean).
>>  *      When Toggle._2 == true, records from input1.right for the same
>>  *      key will be forwarded to the main output.
>>  *      If it is false, records from input1.right for that same key
>>  *      will be dropped.
>>  *    input1.right: the main data stream
>>  *
>>  *  input2: a broadcast stream of StateReport triggers.
>>  *    When a record arrives on this stream,
>>  *    the current values of the Toggles are sent out via the outputTag.
>>  */
>> class EventFilterAndExportToggles(outputTag: OutputTag[Toggle])
>>     extends KeyedBroadcastProcessFunction[Key, Either[Toggle, MyEvent], Any, MyEvent] {
>>
>>   val toggleStateDescriptor =
>>     new ValueStateDescriptor[Boolean]("MyEventToggle", classOf[Boolean])
>>
>>   override def processElement(in1: Either[Toggle, MyEvent],
>>       readOnlyContext: KeyedBroadcastProcessFunction[Key, Either[Toggle, MyEvent], Any, MyEvent]#ReadOnlyContext,
>>       collector: Collector[MyEvent]): Unit = {
>>     in1 match {
>>       case Left(toggle) =>
>>         getRuntimeContext.getState(toggleStateDescriptor).update(toggle._2)
>>       case Right(event) =>
>>         if (getRuntimeContext.getState(toggleStateDescriptor).value())
>>           collector.collect(event)
>>     }
>>   }
>>
>>   override def processBroadcastElement(in2: Any,
>>       context: KeyedBroadcastProcessFunction[Key, Either[Toggle, MyEvent], Any, MyEvent]#Context,
>>       collector: Collector[MyEvent]): Unit = {
>>     context.applyToKeyedState(toggleStateDescriptor, (k: Key, s: ValueState[Boolean]) =>
>>       if (s != null) context.output(outputTag, (k, s.value())))
>>   }
>> }
>>
>> Thanks for your help.
>> Regards,
>> Averell
>>
>> On Thu, May 9, 2019 at 7:31 PM Fabian Hueske <fhue...@gmail.com> wrote:
>>
>>> Hi,
>>>
>>> Passing a Context through a DataStream definitely does not work.
>>> You'd need to have the keyed state that you want to scan over in the
>>> KeyedBroadcastProcessFunction.
>>>
>>> For the toggle filter use case, you would need to have a unioned stream
>>> with Toggle and StateReport events.
>>> For the output, you can use side outputs to route the different outputs
>>> to different streams.
>>>
>>> Best, Fabian
>>>
>>> On Thu, May 9, 2019 at 10:34, Averell <lvhu...@gmail.com> wrote:
>>>
>>>> Thank you Congxian and Fabian.
>>>>
>>>> @Fabian: could you please give a bit more details?
>>>> My understanding is: pass the context itself and an OutputTag to the
>>>> KeyedStateFunction parameter of
>>>> KeyedBroadcastProcessFunction#Context.applyToKeyedState(), and send out
>>>> the side output from within that KeyedStateFunction.process(). Do I
>>>> understand your idea correctly?
>>>>
>>>> BTW, I have another question regarding KeyedBroadcastProcessFunction
>>>> best practice: I have two streams, Data and Toggle. The Toggle stream is
>>>> just a keyed boolean stream used to filter records from the Data stream,
>>>> and I am implementing that filter with a simple RichCoFlatMapFunction.
>>>>
>>>> Now I want to export the list of keys that are currently toggled on.
>>>> Should I
>>>> (1) add one more KeyedBroadcastProcessFunction operator (with Toggle
>>>> and the broadcast stream as its inputs), or
>>>> (2) replace that RichCoFlatMapFunction with a new
>>>> KeyedBroadcastProcessFunction that has both functionalities, filter
>>>> and export? Doing this would require unioning Toggle and Data into one
>>>> single keyed stream.
>>>>
>>>> Thanks and best regards,
>>>> Averell
>>>>
>>>> --
>>>> Sent from:
>>>> http://apache-flink-user-mailing-list-archive.2336050.n4.nabble.com/
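
Option (2), which is what the EventFilterAndExportToggles function above implements, can be modelled in plain Scala as below. This is a sketch with no Flink dependency: `KeyedInstance`, `route`, `keyOf` and the parallelism of 3 are hypothetical, and routing by `hashCode` modulo the parallelism only stands in for Flink's key-group assignment. It illustrates why that design exports each toggled-on key exactly once: keyBy sends all records of a key to one instance, and each instance exports only its own keys when the broadcast trigger arrives.

```scala
// Plain-Scala model (no Flink dependency) of option (2): one keyed operator
// over the union Either[Toggle, MyEvent] that filters events and exports the
// toggled-on keys when a trigger arrives. All names are hypothetical.
import scala.collection.mutable

type Key = String
type Toggle = (Key, Boolean)
final case class MyEvent(key: Key, payload: String)

// One parallel instance. Keyed state is partitioned: an instance only holds
// the toggles of the keys routed to it.
final class KeyedInstance {
  private val toggleState = mutable.Map.empty[Key, Boolean]

  // Mirrors processElement: Left updates the toggle, Right is filtered by it.
  def processElement(in: Either[Toggle, MyEvent]): Option[MyEvent] = in match {
    case Left((key, on)) =>
      toggleState.update(key, on)
      None
    case Right(event) =>
      // Assumption: a key that was never toggled counts as "off".
      if (toggleState.getOrElse(event.key, false)) Some(event) else None
  }

  // Mirrors processBroadcastElement + applyToKeyedState: only local keys.
  def exportToggledOn(): Seq[Key] =
    toggleState.collect { case (k, true) => k }.toSeq
}

val parallelism = 3
val instances = Vector.fill(parallelism)(new KeyedInstance)

// Stand-in for keyBy: every record of a given key reaches the same instance.
def route(key: Key): KeyedInstance =
  instances(Math.floorMod(key.hashCode, parallelism))

def keyOf(in: Either[Toggle, MyEvent]): Key = in.fold(_._1, _.key)

val input: Seq[Either[Toggle, MyEvent]] = Seq(
  Left("a" -> true),
  Right(MyEvent("a", "kept")),
  Right(MyEvent("b", "dropped: b never toggled on")),
  Left("c" -> true),
  Left("c" -> false),
  Right(MyEvent("c", "dropped: c toggled off"))
)

val forwarded = input.flatMap(in => route(keyOf(in)).processElement(in))

// The trigger is broadcast to every instance; each exports its own keys.
val exported = instances.flatMap(_.exportToggledOn())

println(s"forwarded: ${forwarded.map(_.payload)}")
println(s"exported:  $exported")
```

Because the toggle records share the keyed stream with the events, the filter and the export read the same per-key state, at the cost of wrapping every event in `Right`, which is the overhead raised earlier in the thread.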