Thank you very much, Fabian.

Regards,
Averell

On Fri, May 10, 2019 at 9:46 PM Fabian Hueske <fhue...@gmail.com> wrote:

> Hi Averell,
>
> I'd go with your approach. Any state access (given that you use RocksDB
> keyed state) or deduplication of messages is going to be more expensive
> than a simple cast.
>
> Best, Fabian
>
> On Fri, May 10, 2019 at 13:08, Averell Huyen Levan <lvhu...@gmail.com> wrote:
>
>> Hi Fabian,
>>
>> Thanks for that. However, as I mentioned in my previous email, that
>> implementation requires a lot of typecasting/object wrapping.
>> I tried to broadcast the Toggle stream instead: the toggles are saved as a
>> MapState, and whenever an export trigger record arrives, I send out that
>> MapState. There's no use of applyToKeyedState in this implementation.
>> The problem is that I received duplicated output (one copy from each
>> parallel instance).
>>
>> Is there any option to modify the keyed state from within the
>> processBroadcastElement() method?
>>
>> Thanks a lot for your help.
>>
>> Regards,
>> Averell
>>
>> On Fri, May 10, 2019 at 8:52 PM Fabian Hueske <fhue...@gmail.com> wrote:
>>
>>> Hi Averell,
>>>
>>> Ah, sorry. I had assumed the toggle events were broadcast anyway.
>>> Since you had both streams keyed, your current solution looks fine to me.
>>>
>>> Best,
>>> Fabian
>>>
>>> On Fri, May 10, 2019 at 03:13, Averell Huyen Levan <lvhu...@gmail.com> wrote:
>>>
>>>> Hi Fabian,
>>>>
>>>> Sorry, but I am still confused by your guidance. If I union the Toggle
>>>> stream with the StateReportTrigger stream, would that mean I need to make
>>>> my Toggles broadcast state? Or is there some way to modify the keyed
>>>> state from within the processBroadcastElement() method?
>>>>
>>>> I tried to implement the other direction (which I outlined in my
>>>> previous email). It seems to work, but I am not confident in it and am not
>>>> sure whether it has any flaws. Could you please comment?
>>>> In my view, this implementation causes a lot of type-casting for my
>>>> main data stream, which might have a high impact on performance (my toggle
>>>> is on for only about 1% of the keys, and the rate of input1.left is less
>>>> than a millionth of the rate of input1.right).
>>>>
>>>> import org.apache.flink.api.common.state.{ValueState, ValueStateDescriptor}
>>>> import org.apache.flink.streaming.api.functions.co.KeyedBroadcastProcessFunction
>>>> import org.apache.flink.streaming.api.scala.OutputTag
>>>> import org.apache.flink.util.Collector
>>>>
>>>> /**
>>>>  * This KeyedBroadcastProcessFunction has:
>>>>  *   input1: a keyed `DataStream[Either[Toggle, MyEvent]]`:
>>>>  *     input1.left: Toggles in the form of a tuple (Key, Boolean).
>>>>  *       When Toggle._2 == true, records from input1.right for the
>>>>  *       same key are forwarded to the main output.
>>>>  *       If it is false, records from input1.right for that same key
>>>>  *       are dropped.
>>>>  *     input1.right: the main data stream
>>>>  *
>>>>  *   input2: a broadcast stream of StateReport triggers. When a record
>>>>  *     arrives on this stream, the current value of the Toggles is sent
>>>>  *     out via the outputTag.
>>>>  */
>>>> class EventFilterAndExportToggles(outputTag: OutputTag[Toggle])
>>>>   extends KeyedBroadcastProcessFunction[Key, Either[Toggle, MyEvent], Any, MyEvent] {
>>>>
>>>>   val toggleStateDescriptor =
>>>>     new ValueStateDescriptor[Boolean]("MyEventToggle", classOf[Boolean])
>>>>
>>>>   override def processElement(
>>>>       in1: Either[Toggle, MyEvent],
>>>>       readOnlyContext: KeyedBroadcastProcessFunction[Key, Either[Toggle, MyEvent], Any, MyEvent]#ReadOnlyContext,
>>>>       collector: Collector[MyEvent]): Unit = {
>>>>     in1 match {
>>>>       // A toggle record updates the flag stored for its key.
>>>>       case Left(toggle) =>
>>>>         getRuntimeContext.getState(toggleStateDescriptor).update(toggle._2)
>>>>       // A data record is forwarded only while the flag for its key is true.
>>>>       case Right(event) =>
>>>>         if (getRuntimeContext.getState(toggleStateDescriptor).value())
>>>>           collector.collect(event)
>>>>     }
>>>>   }
>>>>
>>>>   override def processBroadcastElement(
>>>>       in2: Any,
>>>>       context: KeyedBroadcastProcessFunction[Key, Either[Toggle, MyEvent], Any, MyEvent]#Context,
>>>>       collector: Collector[MyEvent]): Unit = {
>>>>     // On each trigger, send the toggle of every key held by this instance
>>>>     // to the side output.
>>>>     context.applyToKeyedState(toggleStateDescriptor, (k: Key, s: ValueState[Boolean]) =>
>>>>       if (s != null) context.output(outputTag, (k, s.value())))
>>>>   }
>>>> }
>>>>
>>>> Thanks for your help.
>>>> Regards,
>>>> Averell
>>>>
>>>> On Thu, May 9, 2019 at 7:31 PM Fabian Hueske <fhue...@gmail.com> wrote:
>>>>
>>>>> Hi,
>>>>>
>>>>> Passing a Context through a DataStream definitely does not work.
>>>>> You'd need to have the keyed state that you want to scan over in the
>>>>> KeyedBroadcastProcessFunction.
>>>>>
>>>>> For the toggle filter use case, you would need to have a unioned
>>>>> stream with Toggle and StateReport events.
>>>>> For the output, you can use side outputs to route the different
>>>>> outputs to different streams.
>>>>>
>>>>> Best, Fabian
>>>>>
>>>>> On Thu, May 9, 2019 at 10:34, Averell <lvhu...@gmail.com> wrote:
>>>>>
>>>>>> Thank you Congxian and Fabian.
>>>>>>
>>>>>> @Fabian: could you please give a bit more detail? My understanding is:
>>>>>> pass the context itself and an OutputTag to the KeyedStateFunction parameter
>>>>>> of KeyedBroadcastProcessFunction#Context.applyToKeyedState(), and from
>>>>>> within that KeyedStateFunction.process() send out the side output. Am I
>>>>>> understanding your idea correctly?
>>>>>>
>>>>>> BTW, I have another question regarding KeyedBroadcastProcessFunction best
>>>>>> practice: I have two streams, Data and Toggle. The Toggle stream is
>>>>>> just a keyed boolean stream, used to filter data from the Data stream.
>>>>>> I am implementing that filter using a simple RichCoFlatMapFunction.
>>>>>>
>>>>>> Now I want to export the list of keys that are currently toggled on.
>>>>>> Should I
>>>>>> (1) have one additional KeyedBroadcastProcessFunction operator (which has
>>>>>> Toggle and BroadCast as the input streams), or
>>>>>> (2) replace that RichCoFlatMapFunction with a new
>>>>>> KeyedBroadcastProcessFunction, which has both functionalities: filter and
>>>>>> export?
>>>>>> Doing this would require unioning Toggle and Data into one single
>>>>>> keyed stream.
>>>>>>
>>>>>> Thanks and best regards,
>>>>>> Averell
>>>>>>
>>>>>> --
>>>>>> Sent from:
>>>>>> http://apache-flink-user-mailing-list-archive.2336050.n4.nabble.com/
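
The duplicated output mentioned in the thread (one copy from each parallel instance) follows from how the two kinds of state are laid out. The sketch below is a toy model in plain Scala, not Flink code: `ToggleExportModel`, `Instance`, `route`, and the parallelism of 3 are hypothetical names and values chosen only for illustration. It assumes that keyed state is partitioned, so each key lives on exactly one instance, while broadcast state is replicated in full on every instance.

```scala
// Toy model (NOT the Flink API): why emitting broadcast state on a trigger
// yields one copy per parallel instance, while keyed state yields one per key.
object ToggleExportModel {
  type Key = String

  // Hypothetical stand-in for one parallel operator instance.
  final class Instance {
    // Keyed state: only the keys routed to this instance.
    var keyed: Map[Key, Boolean] = Map.empty
    // Broadcast state: a full replica of all toggles.
    var broadcast: Map[Key, Boolean] = Map.empty
  }

  val parallelism = 3
  val instances: Vector[Instance] = Vector.fill(parallelism)(new Instance)

  // Hypothetical key routing: a key always maps to the same instance.
  def route(key: Key): Int = math.abs(key.hashCode % parallelism)

  // Keyed variant: the toggle reaches exactly one instance.
  def toggleKeyed(key: Key, on: Boolean): Unit = {
    val i = instances(route(key))
    i.keyed = i.keyed.updated(key, on)
  }

  // Broadcast variant: the toggle reaches every instance.
  def toggleBroadcast(key: Key, on: Boolean): Unit =
    instances.foreach(i => i.broadcast = i.broadcast.updated(key, on))

  // The export trigger is broadcast, so every instance emits what it holds.
  def exportKeyed(): Seq[(Key, Boolean)] = instances.flatMap(_.keyed.toSeq)
  def exportBroadcast(): Seq[(Key, Boolean)] = instances.flatMap(_.broadcast.toSeq)

  def main(args: Array[String]): Unit = {
    Seq("a" -> true, "b" -> false, "c" -> true).foreach { case (k, on) =>
      toggleKeyed(k, on)
      toggleBroadcast(k, on)
    }
    println(s"keyed export size: ${exportKeyed().size}")
    println(s"broadcast export size: ${exportBroadcast().size}")
  }
}
```

With three keys and three instances, the keyed export contains 3 entries and the broadcast export contains 9. In this model, removing the duplicates from the broadcast variant would need an extra deduplication step downstream, which is the cost Fabian weighs against the cast in the keyed variant.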