Hi Averell,

I'd go with your approach. Any state access (given that you use RocksDB
keyed state) or deduplication of messages is going to be more expensive
than a simple cast.
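To make the trade-off concrete, here is a minimal plain-Scala model of both designs. It assumes simplified stand-ins for your types and has no Flink dependency: KeyedInstance, BroadcastInstance, owner and exportCounts are hypothetical names, and owner only approximates how keyBy partitions keys. In the keyed design, each record costs one pattern match on the Either, and each instance exports only the keys it owns. In the broadcast design, every instance holds a full copy of the toggles, so one trigger yields one copy per parallel instance.

```scala
// Plain-Scala model (no Flink dependency) of the two designs discussed in this thread.
// Key, Toggle and MyEvent are hypothetical stand-ins for the thread's types.
object ToggleExportModel {
  type Key = String
  type Toggle = (Key, Boolean)
  final case class MyEvent(key: Key, payload: String)

  // Hypothetical hash partitioning of keys over `parallelism` instances (approximates keyBy).
  def owner(key: Key, parallelism: Int): Int = Math.floorMod(key.hashCode, parallelism)

  // Keyed design: each instance holds toggles only for the keys it owns.
  final class KeyedInstance {
    private val toggles = scala.collection.mutable.Map.empty[Key, Boolean]

    // One pattern match per record: the "simple cast" on the Either stream.
    def processElement(in: Either[Toggle, MyEvent]): Option[MyEvent] = in match {
      case Left((k, on)) => toggles.update(k, on); None
      case Right(e)      => if (toggles.getOrElse(e.key, false)) Some(e) else None
    }

    // Analogue of applyToKeyedState: visits only this instance's keys.
    def exportToggles(): Seq[Toggle] = toggles.toSeq
  }

  // Broadcast design: every instance receives every toggle and keeps a full copy.
  final class BroadcastInstance {
    private val toggles = scala.collection.mutable.Map.empty[Key, Boolean]
    def processBroadcastToggle(t: Toggle): Unit = toggles.update(t._1, t._2)
    def exportToggles(): Seq[Toggle] = toggles.toSeq
  }

  // Returns (records exported by the keyed design, records exported by the broadcast design)
  // for a single export trigger, assuming the toggles have distinct keys.
  def exportCounts(toggles: Seq[Toggle], parallelism: Int): (Int, Int) = {
    val keyed = Vector.fill(parallelism)(new KeyedInstance)
    toggles.foreach(t => keyed(owner(t._1, parallelism)).processElement(Left(t)))
    val broadcast = Vector.fill(parallelism)(new BroadcastInstance)
    toggles.foreach(t => broadcast.foreach(_.processBroadcastToggle(t)))
    (keyed.map(_.exportToggles().size).sum, broadcast.map(_.exportToggles().size).sum)
  }

  def main(args: Array[String]): Unit =
    println(exportCounts(Seq("a" -> true, "b" -> false, "c" -> true), parallelism = 3)) // prints (3,9)
}
```

With three instances and three keys, the broadcast design exports nine records instead of three; removing those duplicates would need extra state or a downstream step.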

Best, Fabian

On Fri, May 10, 2019 at 13:08, Averell Huyen Levan <lvhu...@gmail.com> wrote:

> Hi Fabian,
>
> Thanks for that. However, as I mentioned in my previous email, that
> implementation requires a lot of type casting/object wrapping.
> I tried broadcasting the Toggle stream instead: the toggles are saved in a
> MapState, and whenever an export trigger record arrives, I send out that
> MapState. This implementation does not use applyToKeyedState. The
> problem is that I receive duplicated output (one copy from each parallel
> instance).
>
> Is there any option to modify the keyed state from within the
> processBroadcastElement() method?
>
> Thanks a lot for your help.
>
> Regards,
> Averell
>
>
> On Fri, May 10, 2019 at 8:52 PM Fabian Hueske <fhue...@gmail.com> wrote:
>
>> Hi Averell,
>>
>> Ah, sorry. I had assumed the toggle events were broadcast anyway.
>> Since you had both streams keyed, your current solution looks fine to me.
>>
>> Best,
>> Fabian
>>
>> On Fri, May 10, 2019 at 03:13, Averell Huyen Levan <lvhu...@gmail.com> wrote:
>>
>>> Hi Fabian,
>>>
>>> Sorry, but I am still confused by your suggestion. If I union the Toggle
>>> stream with the StateReportTrigger stream, does that mean I need to make
>>> my Toggles broadcast state? Or is there some way to modify the keyed
>>> state from within the processBroadcastElement() method?
>>>
>>> I tried to implement the other direction (which I outlined in my previous
>>> email). It seems to work, but I am not confident in it and not sure whether
>>> it has any flaws. Could you please comment?
>>> In my view, this implementation requires a lot of type casting on my main
>>> data stream, which might have a significant performance impact (the toggle
>>> is on for only about 1% of the keys, and the rate of input1.left is less
>>> than one millionth of the rate of input1.right).
>>>
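>>> import org.apache.flink.api.common.state.{ValueState, ValueStateDescriptor}
>>> import org.apache.flink.configuration.Configuration
>>> import org.apache.flink.runtime.state.KeyedStateFunction
>>> import org.apache.flink.streaming.api.functions.co.KeyedBroadcastProcessFunction
>>> import org.apache.flink.util.{Collector, OutputTag}
>>>
>>> /**
>>>   * This KeyedBroadcastProcessFunction has:
>>>   *    input1: a keyed `DataStream[Either[Toggle, MyEvent]]`:
>>>   *       input1.left: Toggles in the form of a tuple (Key, Boolean).
>>>   *          When Toggle._2 == true, records from input1.right for the same key are forwarded to the main output.
>>>   *          When it is false, records from input1.right for that key are dropped.
>>>   *       input1.right: the main data stream.
>>>   *
>>>   *    input2: a broadcast stream of StateReport triggers. When a record arrives on this stream,
>>>   *       the current value of every key's Toggle is sent out via the outputTag side output.
>>>   */
>>> class EventFilterAndExportToggles(outputTag: OutputTag[Toggle])
>>>       extends KeyedBroadcastProcessFunction[Key, Either[Toggle, MyEvent], Any, MyEvent] {
>>>
>>>    private val toggleStateDescriptor =
>>>       new ValueStateDescriptor[Boolean]("MyEventToggle", classOf[Boolean])
>>>
>>>    // Obtain the keyed-state handle once; Flink scopes it to the current key on each call.
>>>    @transient private var toggleState: ValueState[Boolean] = _
>>>
>>>    override def open(parameters: Configuration): Unit =
>>>       toggleState = getRuntimeContext.getState(toggleStateDescriptor)
>>>
>>>    override def processElement(in1: Either[Toggle, MyEvent],
>>>                                readOnlyContext: KeyedBroadcastProcessFunction[Key, Either[Toggle, MyEvent], Any, MyEvent]#ReadOnlyContext,
>>>                                collector: Collector[MyEvent]): Unit = {
>>>       in1 match {
>>>          case Left(toggle) =>
>>>             // Remember the latest on/off flag for this key.
>>>             toggleState.update(toggle._2)
>>>          case Right(event) =>
>>>             // An unset ValueState returns null, which Scala unboxes to false, so untoggled keys are dropped.
>>>             if (toggleState.value()) collector.collect(event)
>>>       }
>>>    }
>>>
>>>    override def processBroadcastElement(in2: Any,
>>>                                         context: KeyedBroadcastProcessFunction[Key, Either[Toggle, MyEvent], Any, MyEvent]#Context,
>>>                                         collector: Collector[MyEvent]): Unit = {
>>>       // Each parallel instance visits only the keys in its own keyed state, so every key is exported once.
>>>       context.applyToKeyedState(toggleStateDescriptor, new KeyedStateFunction[Key, ValueState[Boolean]] {
>>>          override def process(k: Key, s: ValueState[Boolean]): Unit =
>>>             if (s != null) context.output(outputTag, (k, s.value()))
>>>       })
>>>    }
>>> }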
>>>
>>> Thanks for your help.
>>> Regards,
>>> Averell
>>>
>>> On Thu, May 9, 2019 at 7:31 PM Fabian Hueske <fhue...@gmail.com> wrote:
>>>
>>>> Hi,
>>>>
>>>> Passing a Context through a DataStream definitely does not work.
>>>> You'd need to have the keyed state that you want to scan over in the
>>>> KeyedBroadcastProcessFunction.
>>>>
>>>> For the toggle filter use case, you would need to have a unioned stream
>>>> with Toggle and StateReport events.
>>>> For the output, you can use side outputs to route the different outputs
>>>> to different streams.
>>>>
>>>> Best, Fabian
>>>>
>>>> On Thu, May 9, 2019 at 10:34, Averell <lvhu...@gmail.com> wrote:
>>>>
>>>>> Thank you, Congxian and Fabian.
>>>>>
>>>>> @Fabian: could you please give a bit more detail? My understanding is:
>>>>> pass the context itself and an OutputTag to the KeyedStateFunction
>>>>> parameter of KeyedBroadcastProcessFunction#Context.applyToKeyedState(),
>>>>> and send out the side output from within that KeyedStateFunction.process().
>>>>> Did I understand your idea correctly?
>>>>>
>>>>> BTW, I have another question regarding KeyedBroadcastProcessFunction best
>>>>> practice: I have two streams, Data and Toggle. The Toggle stream is just
>>>>> a keyed boolean stream used to filter data from the Data stream, and I am
>>>>> implementing that filter with a simple RichCoFlatMapFunction.
>>>>>
>>>>> Now I want to export the list of keys that are currently toggled on.
>>>>> Should I
>>>>> (1) add a separate KeyedBroadcastProcessFunction operator (which has
>>>>> Toggle and BroadCast as its input streams), or
>>>>> (2) replace that RichCoFlatMapFunction with a new
>>>>> KeyedBroadcastProcessFunction that does both filtering and exporting?
>>>>> Doing this would require unioning Toggle and Data into one single keyed
>>>>> stream.
>>>>>
>>>>> Thanks and best regards,
>>>>> Averell
>>>>>
>>>>
