Hi Fabian,

Thanks for that. However, as I mentioned in my previous email, that
implementation requires a lot of typecasting/object wrapping.
I tried broadcasting the Toggle stream instead: the toggles are saved in a
MapState, and whenever an export trigger record arrives, I send out that
MapState. There is no use of applyToKeyedState in this implementation. The
problem is that I receive duplicate output (one copy from each parallel
instance).
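
The duplication described above can be illustrated with a toy model. This is
only a sketch in plain Scala collections, not Flink code: the names Subtask,
exportAll, exportOnce and run are hypothetical, and it assumes every parallel
instance holds an identical copy of the broadcast toggles. In a real job the
guard would presumably be based on getRuntimeContext.getIndexOfThisSubtask;
whether that guard suits this particular job is an assumption.

```scala
// Toy model only: plain Scala collections stand in for Flink operators.
object BroadcastDuplicatesSketch {
  type Toggle = (String, Boolean) // (key, isOn), as in the thread

  // One parallel instance holding its own full copy of the broadcast toggles.
  final class Subtask(val index: Int) {
    private var toggles = Map.empty[String, Boolean]

    def onToggle(t: Toggle): Unit = toggles += t

    // Naive export: every instance emits its whole copy.
    def exportAll(): Seq[Toggle] = toggles.toSeq.sortBy(_._1)

    // Guarded export: only the instance with index 0 emits.
    def exportOnce(): Seq[Toggle] = if (index == 0) exportAll() else Seq.empty
  }

  def run(parallelism: Int, input: Seq[Toggle], guarded: Boolean): Seq[Toggle] = {
    val subtasks = (0 until parallelism).map(new Subtask(_))
    // Broadcasting delivers every toggle to every instance.
    for (t <- input; s <- subtasks) s.onToggle(t)
    // The export trigger also reaches every instance.
    subtasks.flatMap(s => if (guarded) s.exportOnce() else s.exportAll())
  }
}
```

With a parallelism of 3, the naive export emits each toggle three times, while
the guarded export emits each toggle once.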

Is there any option to modify the keyed state from within the
processBroadcastElement() method?

Thanks a lot for your help.

Regards,
Averell


On Fri, May 10, 2019 at 8:52 PM Fabian Hueske <fhue...@gmail.com> wrote:

> Hi Averell,
>
> Ah, sorry. I had assumed the toggle events were broadcast anyway.
> Since you had both streams keyed, your current solution looks fine to me.
>
> Best,
> Fabian
>
> On Fri, May 10, 2019 at 03:13, Averell Huyen Levan <lvhu...@gmail.com> wrote:
>
>> Hi Fabian,
>>
>> Sorry, but I am still confused about your suggestion. If I union the Toggle
>> stream with the StateReportTrigger stream, would that mean I need to make
>> my Toggles broadcast state? Or is there some way to modify the keyed
>> state from within the processBroadcastElement() method?
>>
>> I tried to implement the other direction (which I outlined in my previous
>> email). It seems to work, but I am not confident in it and am not sure
>> whether it has any flaws. Could you please comment?
>> In my view, this implementation requires a lot of type-casting for my main
>> data stream, which might have a high impact on performance (my toggle is
>> on for only about 1% of the keys, and the rate of input1.left is less than a
>> millionth of the rate of input1.right).
>>
>> /**
>>   * This KeyedBroadcastProcessFunction has:
>>   *    input1: a keyed `DataStream[Either[Toggle, MyEvent]]`:
>>   *       input1.left: Toggles in the form of a tuple (Key, Boolean).
>>   *          When Toggle._2 == true, records from input1.right for the same
>>   *          key will be forwarded to the main output.
>>   *          If it is false, records from input1.right for that same key
>>   *          will be dropped.
>>   *       input1.right: the main data stream
>>   *
>>   *    input2: a broadcast stream of StateReport triggers. When a record
>>   *       arrives on this stream, the current value of the Toggles is sent
>>   *       out via the outputTag.
>>   */
>> class EventFilterAndExportToggles(outputTag: OutputTag[Toggle])
>>       extends KeyedBroadcastProcessFunction[Key, Either[Toggle, MyEvent], Any, MyEvent] {
>>
>>    val toggleStateDescriptor =
>>       new ValueStateDescriptor[Boolean]("MyEventToggle", classOf[Boolean])
>>
>>    override def processElement(
>>          in1: Either[Toggle, MyEvent],
>>          readOnlyContext: KeyedBroadcastProcessFunction[Key, Either[Toggle, MyEvent], Any, MyEvent]#ReadOnlyContext,
>>          collector: Collector[MyEvent]): Unit = {
>>       in1 match {
>>          case Left(toggle) =>
>>             getRuntimeContext.getState(toggleStateDescriptor).update(toggle._2)
>>          case Right(event) =>
>>             if (getRuntimeContext.getState(toggleStateDescriptor).value())
>>                collector.collect(event)
>>       }
>>    }
>>
>>    override def processBroadcastElement(
>>          in2: Any,
>>          context: KeyedBroadcastProcessFunction[Key, Either[Toggle, MyEvent], Any, MyEvent]#Context,
>>          collector: Collector[MyEvent]): Unit = {
>>       context.applyToKeyedState(toggleStateDescriptor, (k: Key, s: ValueState[Boolean]) =>
>>          if (s != null) context.output(outputTag, (k, s.value())))
>>    }
>> }
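
The intended behaviour of the function above (filter by per-key toggle, export
all toggles on a trigger) can be checked against a small model. This is a
sketch in plain Scala, not Flink code: the names Input, Keyed, ExportTrigger,
Result and run are hypothetical, Key is assumed to be a String, MyEvent is
given an assumed shape, and a key with no toggle yet is assumed to behave as
"off".

```scala
// Toy model only: a Map stands in for the per-key ValueState[Boolean].
object ToggleFilterSketch {
  type Key = String
  type Toggle = (Key, Boolean)
  final case class MyEvent(key: Key, payload: String) // assumed shape

  sealed trait Input
  final case class Keyed(value: Either[Toggle, MyEvent]) extends Input // input1
  case object ExportTrigger extends Input                              // input2

  final case class Result(forwarded: Vector[MyEvent], exported: Vector[Toggle])

  def run(inputs: Seq[Input]): Result = {
    var toggles = Map.empty[Key, Boolean]
    var forwarded = Vector.empty[MyEvent]
    var exported = Vector.empty[Toggle]
    inputs.foreach {
      // A toggle record updates the flag for its key.
      case Keyed(Left((key, on))) => toggles += (key -> on)
      // A data record is forwarded only while the flag for its key is on.
      case Keyed(Right(event)) =>
        if (toggles.getOrElse(event.key, false)) forwarded :+= event
      // A trigger exports the current toggle of every known key.
      case ExportTrigger =>
        exported ++= toggles.toVector.sortBy(_._1)
    }
    Result(forwarded, exported)
  }
}
```

Events that arrive before their key is toggled on are dropped in this model,
and each trigger exports one entry per key that has ever received a toggle.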
>>
>> Thanks for your help.
>> Regards,
>> Averell
>>
>> On Thu, May 9, 2019 at 7:31 PM Fabian Hueske <fhue...@gmail.com> wrote:
>>
>>> Hi,
>>>
>>> Passing a Context through a DataStream definitely does not work.
>>> You'd need to have the keyed state that you want to scan over in the
>>> KeyedBroadcastProcessFunction.
>>>
>>> For the toggle filter use case, you would need to have a unioned stream
>>> with Toggle and StateReport events.
>>> For the output, you can use side outputs to route the different outputs
>>> to different streams.
>>>
>>> Best, Fabian
>>>
>>> On Thu, May 9, 2019 at 10:34, Averell <lvhu...@gmail.com> wrote:
>>>
>>>> Thank you Congxian and Fabian.
>>>>
>>>> @Fabian: could you please give a bit more detail? My understanding is that
>>>> I should pass the context itself and an OutputTag to the KeyedStateFunction
>>>> parameter of KeyedBroadcastProcessFunction#Context.applyToKeyedState(), and
>>>> send out the side output from within that KeyedStateFunction.process(). Am
>>>> I understanding your idea correctly?
>>>>
>>>> BTW, I have another question regarding KeyedBroadcastProcessFunction best
>>>> practice: I have two streams, Data and Toggle. The Toggle stream is just a
>>>> keyed boolean stream, used to filter data from the Data stream. I am
>>>> implementing that filter using a simple RichCoFlatMapFunction.
>>>>
>>>> Now I want to export the list of keys which are currently toggled on.
>>>> Should I
>>>> (1) add one more KeyedBroadcastProcessFunction operator (which has Toggle
>>>> and Broadcast as its input streams), or
>>>> (2) replace that RichCoFlatMapFunction with a new
>>>> KeyedBroadcastProcessFunction which has both functionalities, filter and
>>>> export? Doing this would require unioning Toggle and Data into one single
>>>> keyed stream.
>>>>
>>>> Thanks and best regards,
>>>> Averell
>>>
