Hi Averell,

I'd go with your approach. Any state access (given that you use RocksDB keyed state) or deduplication of messages is going to be more expensive than a simple cast.
Best,
Fabian

On Fri, May 10, 2019 at 13:08, Averell Huyen Levan <lvhu...@gmail.com> wrote:

> Hi Fabian,
>
> Thanks for that. However, as I mentioned in my previous email, that
> implementation requires a lot of typecasting/object wrapping.
> I tried to broadcast that Toggle stream: the toggles are saved as a
> MapState, and whenever an export trigger record arrives, I send out that
> MapState. There's no use of applyToKeyedState in this implementation. The
> problem is that I receive duplicated output (one copy from each
> parallel instance).
>
> Is there any option to modify the keyed state from within the
> processBroadcastElement() method?
>
> Thanks a lot for your help.
>
> Regards,
> Averell
>
> On Fri, May 10, 2019 at 8:52 PM Fabian Hueske <fhue...@gmail.com> wrote:
>
>> Hi Averell,
>>
>> Ah, sorry. I had assumed the toggle events were broadcast anyway.
>> Since you had both streams keyed, your current solution looks fine to me.
>>
>> Best,
>> Fabian
>>
>> On Fri, May 10, 2019 at 03:13, Averell Huyen Levan <lvhu...@gmail.com> wrote:
>>
>>> Hi Fabian,
>>>
>>> Sorry, but I am still confused about your guidance. If I union the Toggle
>>> stream with the StateReportTrigger stream, would that mean I need to make
>>> my Toggles broadcast state? Or is there some way to modify the keyed
>>> state from within the processBroadcastElement() method?
>>>
>>> I tried to implement the other direction (which I outlined in my previous
>>> email). It seems to work, but I am not confident in it and am not sure
>>> whether it has any flaws. Could you please comment?
>>> In my view, this implementation causes a lot of type-casting for my main
>>> data stream, which might have a high impact on performance (my toggle is
>>> on for only about 1% of the keys, and the rate of input1.left is less than
>>> a millionth of the rate of input1.right).
>>>
>>> /**
>>>   * This KeyedBroadcastProcessFunction has:
>>>   *   input1: a keyed `DataStream[Either[Toggle, MyEvent]]`:
>>>   *     input1.left: Toggles in the form of a tuple (Key, Boolean).
>>>   *       When Toggle._2 == true, records from input1.right for the same
>>>   *       key will be forwarded to the main output.
>>>   *       If it is false, records from input1.right for that same key
>>>   *       will be dropped.
>>>   *     input1.right: the main data stream
>>>   *
>>>   *   input2: a broadcast stream of StateReport triggers. When a record
>>>   *     arrives on this stream, the current value of Toggles is sent out
>>>   *     via the outputTag.
>>>   */
>>> class EventFilterAndExportToggles(outputTag: OutputTag[Toggle])
>>>     extends KeyedBroadcastProcessFunction[Key, Either[Toggle, MyEvent], Any, MyEvent] {
>>>
>>>   val toggleStateDescriptor =
>>>     new ValueStateDescriptor[Boolean]("MyEventToggle", classOf[Boolean])
>>>
>>>   override def processElement(
>>>       in1: Either[Toggle, MyEvent],
>>>       readOnlyContext: KeyedBroadcastProcessFunction[Key, Either[Toggle, MyEvent], Any, MyEvent]#ReadOnlyContext,
>>>       collector: Collector[MyEvent]): Unit = {
>>>     in1 match {
>>>       case Left(toggle) =>
>>>         getRuntimeContext.getState(toggleStateDescriptor).update(toggle._2)
>>>       case Right(event) =>
>>>         if (getRuntimeContext.getState(toggleStateDescriptor).value())
>>>           collector.collect(event)
>>>     }
>>>   }
>>>
>>>   override def processBroadcastElement(
>>>       in2: Any,
>>>       context: KeyedBroadcastProcessFunction[Key, Either[Toggle, MyEvent], Any, MyEvent]#Context,
>>>       collector: Collector[MyEvent]): Unit = {
>>>     context.applyToKeyedState(toggleStateDescriptor, (k: Key, s: ValueState[Boolean]) =>
>>>       if (s != null) context.output(outputTag, (k, s.value())))
>>>   }
>>> }
>>>
>>> Thanks for your help.
>>> Regards,
>>> Averell
>>>
>>> On Thu, May 9, 2019 at 7:31 PM Fabian Hueske <fhue...@gmail.com> wrote:
>>>
>>>> Hi,
>>>>
>>>> Passing a Context through a DataStream definitely does not work.
>>>> You'd need to have the keyed state that you want to scan over in the
>>>> KeyedBroadcastProcessFunction.
>>>>
>>>> For the toggle filter use case, you would need to have a unioned stream
>>>> with Toggle and StateReport events.
>>>> For the output, you can use side outputs to route the different outputs
>>>> to different streams.
>>>>
>>>> Best, Fabian
>>>>
>>>> On Thu, May 9, 2019 at 10:34, Averell <lvhu...@gmail.com> wrote:
>>>>
>>>>> Thank you Congxian and Fabian.
>>>>>
>>>>> @Fabian: could you please give a bit more detail? My understanding is:
>>>>> pass the context itself and an OutputTag to the KeyedStateFunction
>>>>> parameter of KeyedBroadcastProcessFunction#Context.applyToKeyedState(),
>>>>> and from within that KeyedStateFunction.process() send out the side
>>>>> output. Am I understanding your idea correctly?
>>>>>
>>>>> BTW, I have another question regarding KeyedBroadcastProcessFunction
>>>>> best practice: I have two streams, Data and Toggle. The Toggle stream is
>>>>> just a keyed boolean stream used to filter data from the Data stream.
>>>>> I am implementing that filter using a simple RichCoFlatMapFunction.
>>>>>
>>>>> Now I want to export the list of keys that are currently toggled on.
>>>>> Should I
>>>>> (1) have one additional KeyedBroadcastProcessFunction operator (which
>>>>> has Toggle and BroadCast as the input streams), or
>>>>> (2) replace that RichCoFlatMapFunction with a new
>>>>> KeyedBroadcastProcessFunction, which has both functionalities: filter
>>>>> and export? Doing this would require unioning Toggle and Data into one
>>>>> single keyed stream.
>>>>>
>>>>> Thanks and best regards,
>>>>> Averell
>>>>>
>>>>> --
>>>>> Sent from:
>>>>> http://apache-flink-user-mailing-list-archive.2336050.n4.nabble.com/
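For readers following the thread, the filter-and-export behaviour of the Either-based approach can be modelled in plain Scala without any Flink dependency. This is only a rough sketch of the semantics, not the Flink operator itself: `ToggleFilterSketch`, `ToggleFilter`, `ExportTrigger`, the `String` key type, and the shape of `MyEvent` are all hypothetical stand-ins, and a mutable map takes the place of the keyed `ValueState[Boolean]`.

```scala
import scala.collection.mutable

object ToggleFilterSketch {
  // Hypothetical stand-ins for the types used in the thread.
  type Key = String
  type Toggle = (Key, Boolean)
  final case class MyEvent(key: Key, payload: String)

  // Stand-in for a record on the broadcast StateReport trigger stream.
  case object ExportTrigger

  /** Plain-Scala model of the operator: per-key toggle "state" plus two outputs. */
  final class ToggleFilter {
    // Models the keyed ValueState[Boolean]; an absent key is treated as "off".
    private val toggles = mutable.Map.empty[Key, Boolean]
    val mainOutput = mutable.ArrayBuffer.empty[MyEvent]
    val sideOutput = mutable.ArrayBuffer.empty[Toggle]

    // Models processElement: a Left updates the toggle, a Right is filtered by it.
    def processElement(in: Either[Toggle, MyEvent]): Unit = in match {
      case Left((key, on)) => toggles(key) = on
      case Right(event) =>
        if (toggles.getOrElse(event.key, false)) mainOutput += event
    }

    // Models processBroadcastElement with applyToKeyedState:
    // every stored toggle is emitted to the side output, sorted by key here
    // only to make the sketch deterministic.
    def processBroadcastElement(trigger: ExportTrigger.type): Unit =
      toggles.toSeq.sortBy(_._1).foreach(sideOutput += _)
  }

  def main(args: Array[String]): Unit = {
    val op = new ToggleFilter
    op.processElement(Left(("a", true)))
    op.processElement(Right(MyEvent("a", "kept")))
    op.processElement(Right(MyEvent("b", "dropped")))
    op.processBroadcastElement(ExportTrigger)
    println(s"main output: ${op.mainOutput.toList}")
    println(s"side output: ${op.sideOutput.toList}")
  }
}
```

The only per-record cost the wrapping adds is the pattern match on `Either`, which is the "simple cast" Fabian compares against state access and deduplication.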