Hi Averell,

Ah, sorry. I had assumed the toggle events were broadcast anyway.
Since you had both streams keyed, your current solution looks fine to me.

Best,
Fabian

On Fri, May 10, 2019 at 03:13, Averell Huyen Levan <lvhu...@gmail.com> wrote:

> Hi Fabian,
>
> Sorry, but I am still confused by your suggestion. If I union the Toggle
> stream with the StateReportTrigger stream, would that mean I need to make
> my Toggles broadcast state? Or is there some way to modify the keyed
> state from within the processBroadcastElement() method?
>
> I tried to implement the other approach (which I outlined in my previous
> email). It seems to work, but I am not confident in it and not sure whether
> it has any flaws. Could you please comment on it?
> My concern is that this implementation requires a lot of type-casting on my
> main data stream, which might have a significant performance impact (the
> toggle is on for only about 1% of the keys, and the rate of input1.left is
> less than a millionth of the rate of input1.right).
>
> import org.apache.flink.api.common.state.{ValueState, ValueStateDescriptor}
> import org.apache.flink.streaming.api.functions.co.KeyedBroadcastProcessFunction
> import org.apache.flink.streaming.api.scala.OutputTag
> import org.apache.flink.util.Collector
>
> /**
>   * This KeyedBroadcastProcessFunction has:
>   *    input1: a keyed `DataStream[Either[Toggle, MyEvent]]`:
>   *       input1.left: Toggles in the form of a tuple (Key, Boolean).
>   *          When Toggle._2 == true, records from input1.right for the same
>   *          key will be forwarded to the main output.
>   *          If it is false, records from input1.right for that same key
>   *          will be dropped.
>   *       input1.right: the main data stream
>   *
>   *    input2: a broadcast stream of StateReport triggers. When a record
>   *       arrives on this stream, the current value of every Toggle is sent
>   *       out via the outputTag.
>   */
> class EventFilterAndExportToggles(outputTag: OutputTag[Toggle])
>       extends KeyedBroadcastProcessFunction[Key, Either[Toggle, MyEvent], Any, MyEvent] {
>
>    val toggleStateDescriptor =
>       new ValueStateDescriptor[Boolean]("MyEventToggle", classOf[Boolean])
>
>    override def processElement(in1: Either[Toggle, MyEvent],
>                         readOnlyContext: KeyedBroadcastProcessFunction[Key, Either[Toggle, MyEvent], Any, MyEvent]#ReadOnlyContext,
>                         collector: Collector[MyEvent]): Unit = {
>       in1 match {
>          case Left(toggle) =>
>             // Store the latest toggle value for the current key
>             getRuntimeContext.getState(toggleStateDescriptor).update(toggle._2)
>          case Right(event) =>
>             // Unset state reads as null, which Scala unboxes to false: events are dropped by default
>             if (getRuntimeContext.getState(toggleStateDescriptor).value())
>                collector.collect(event)
>       }
>    }
>
>    override def processBroadcastElement(in2: Any,
>                                context: KeyedBroadcastProcessFunction[Key, Either[Toggle, MyEvent], Any, MyEvent]#Context,
>                                collector: Collector[MyEvent]): Unit = {
>       // Visit every key that has toggle state and emit its current value to the side output
>       context.applyToKeyedState(toggleStateDescriptor, (k: Key, s: ValueState[Boolean]) =>
>          if (s != null) context.output(outputTag, (k, s.value())))
>    }
> }
>
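The intended semantics of the function above can be checked without running a Flink job. The following is a minimal plain-Scala model, not Flink code: a mutable Map stands in for the keyed ValueState[Boolean], two buffers stand in for the collector and the side output, and `ToggleFilterModel`, `Operator`, `Key = String` and `Event` are hypothetical stand-ins for the real types.

```scala
import scala.collection.mutable

// Plain-Scala model of EventFilterAndExportToggles; no Flink classes are used.
object ToggleFilterModel {
  // Hypothetical stand-ins for the real Key and MyEvent types
  type Key = String
  type Toggle = (Key, Boolean)
  final case class Event(key: Key, payload: String)

  final class Operator {
    // Stands in for the keyed ValueState[Boolean]: one entry per key that has seen a toggle
    private val toggleState = mutable.Map.empty[Key, Boolean]
    // Stand-ins for the main output (collector) and the side output (outputTag)
    val mainOutput: mutable.ArrayBuffer[Event] = mutable.ArrayBuffer.empty[Event]
    val sideOutput: mutable.ArrayBuffer[Toggle] = mutable.ArrayBuffer.empty[Toggle]

    // Mirrors processElement: Left updates the key's toggle, Right passes only if toggled on
    def processElement(in: Either[Toggle, Event]): Unit = in match {
      case Left((key, on)) => toggleState(key) = on
      case Right(event) =>
        // A key without state counts as "off", like the unboxed null in the Flink version
        if (toggleState.getOrElse(event.key, false)) mainOutput += event
    }

    // Mirrors processBroadcastElement + applyToKeyedState: visit every key that has state
    def processBroadcastElement(trigger: Any): Unit =
      toggleState.foreach { case (k, on) => sideOutput += ((k, on)) }
  }
}
```

A key that never received a toggle is absent from the Map, just as applyToKeyedState only visits keys that actually have state for the descriptor, so such a key does not appear in the report.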
> Thanks for your help.
> Regards,
> Averell
>
> On Thu, May 9, 2019 at 7:31 PM Fabian Hueske <fhue...@gmail.com> wrote:
>
>> Hi,
>>
>> Passing a Context through a DataStream definitely does not work.
>> You'd need to have the keyed state that you want to scan over in the
>> KeyedBroadcastProcessFunction.
>>
>> For the toggle filter use case, you would need to have a unioned stream
>> with Toggle and StateReport events.
>> For the output, you can use side outputs to route the different outputs
>> to different streams.
>>
>> Best, Fabian
>>
>> On Thu, May 9, 2019 at 10:34, Averell <lvhu...@gmail.com> wrote:
>>
>>> Thank you Congxian and Fabian.
>>>
>>> @Fabian: could you please give a bit more detail? My understanding is: to
>>> pass the context itself and an OutputTag to the KeyedStateFunction
>>> parameter of KeyedBroadcastProcessFunction#Context.applyToKeyedState(),
>>> and from within that KeyedStateFunction.process() send out the side
>>> output. Did I understand your idea correctly?
>>>
>>> BTW, I have another question regarding KeyedBroadcastProcessFunction best
>>> practice: I have two streams, Data and Toggle. The Toggle stream is just
>>> a keyed boolean stream, used to filter the records of the Data stream.
>>> I am implementing that filter using a simple RichCoFlatMapFunction.
>>>
>>> Now I want to export the list of keys that are currently toggled on.
>>> Should I
>>> (1) add a separate KeyedBroadcastProcessFunction operator (which has
>>> Toggle and the broadcast stream as its input streams), or
>>> (2) replace that RichCoFlatMapFunction with a new
>>> KeyedBroadcastProcessFunction that does both filtering and exporting?
>>> Doing this would require unioning Toggle and Data into one single
>>> keyed stream.
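Option (2) hinges on giving the unioned stream a single element type and a key extractor that works for both sides. A minimal plain-Scala sketch of that step, assuming hypothetical `Key = String` and `Data(key, value)` types; in a real job the tagging would be a `map` on each DataStream before `union`, followed by `keyBy` with the same extractor.

```scala
// Plain-Scala model of option (2): tag both inputs with Either, merge them, partition by key.
object UnionByKeyModel {
  // Hypothetical stand-ins for the real key and data types
  type Key = String
  type Toggle = (Key, Boolean)
  final case class Data(key: Key, value: Int)

  // Tagging step; in a Flink job this would be a map on each stream before union.
  // Concatenation is only one possible interleaving: a real union gives no cross-stream order.
  def tag(toggles: Seq[Toggle], data: Seq[Data]): Seq[Either[Toggle, Data]] =
    toggles.map(t => Left(t): Either[Toggle, Data]) ++
      data.map(d => Right(d): Either[Toggle, Data])

  // One key extractor for the merged element type: both sides expose the same Key
  def keyOf(e: Either[Toggle, Data]): Key = e.fold(_._1, _.key)

  // keyBy-like partitioning; groupBy keeps the relative order of elements within each key
  def partition(events: Seq[Either[Toggle, Data]]): Map[Key, Seq[Either[Toggle, Data]]] =
    events.groupBy(keyOf)
}
```

Because toggles and data for the same key land in the same partition, a single keyed operator can hold the toggle state and apply it to the data, which is what makes option (2) possible.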
>>>
>>> Thanks and best regards,
>>> Averell
>>
