Thank you very much, Fabian.

Regards,
Averell

On Fri, May 10, 2019 at 9:46 PM Fabian Hueske <fhue...@gmail.com> wrote:

> Hi Averell,
>
> I'd go with your approach. Any state access (given that you use RocksDB
> keyed state) or deduplication of messages is going to be more expensive
> than a simple cast.
>
> Best, Fabian
>
> On Fri, May 10, 2019 at 13:08, Averell Huyen Levan <lvhu...@gmail.com>
> wrote:
>
>> Hi Fabian,
>>
>> Thanks for that. However, as I mentioned in my previous email, that
>> implementation requires a lot of typecasting/object wrapping.
>> I tried broadcasting the Toggle stream instead: the toggles are saved in a
>> MapState, and whenever an export trigger record arrives, I send out that
>> MapState. There's no use of applyToKeyedState in this implementation.
>> The problem is that I receive duplicated output (one copy from each
>> parallel instance).
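
The duplication described above can be reproduced with a small Flink-free toy model. This is only a sketch: `BroadcastVsKeyed`, `exportFromBroadcast` and `exportFromKeyed` are hypothetical names, each "instance" stands for one parallel subtask, and the hash partitioning is an assumed stand-in for however Flink actually assigns keys to subtasks.

```scala
// Toy model only: no Flink involved. Names and partitioning are assumptions.
object BroadcastVsKeyed {
  type Key = String
  type Toggle = (Key, Boolean)

  // Broadcast state: every parallel instance holds a full copy of all toggles,
  // so one trigger makes every instance emit the same entries.
  def exportFromBroadcast(toggles: Seq[Toggle], parallelism: Int): Seq[Toggle] = {
    val fullCopy: Map[Key, Boolean] = toggles.toMap
    (0 until parallelism).flatMap(_ => fullCopy.toSeq)
  }

  // Keyed state: each key is owned by exactly one instance, so the combined
  // output of all instances contains every key once.
  def exportFromKeyed(toggles: Seq[Toggle], parallelism: Int): Seq[Toggle] = {
    val partitions: Map[Int, Map[Key, Boolean]] =
      toggles
        .groupBy { case (key, _) => math.floorMod(key.hashCode, parallelism) }
        .map { case (instance, owned) => instance -> owned.toMap }
    (0 until parallelism).flatMap { instance =>
      partitions.getOrElse(instance, Map.empty[Key, Boolean]).toSeq
    }
  }

  def main(args: Array[String]): Unit = {
    val toggles = Seq("a" -> true, "b" -> false, "c" -> true)
    println(exportFromBroadcast(toggles, 4).size) // prints 12 (3 toggles x 4 instances)
    println(exportFromKeyed(toggles, 4).size)     // prints 3
  }
}
```

In this model the broadcast variant would need a deduplication step downstream, while the keyed variant does not.
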
>>
>> Is there any option to modify the keyed state from within the
>> processBroadcastElement() method?
>>
>> Thanks a lot for your help.
>>
>> Regards,
>> Averell
>>
>>
>> On Fri, May 10, 2019 at 8:52 PM Fabian Hueske <fhue...@gmail.com> wrote:
>>
>>> Hi Averell,
>>>
>>> Ah, sorry. I had assumed the toggle events were broadcast anyway.
>>> Since you had both streams keyed, your current solution looks fine to me.
>>>
>>> Best,
>>> Fabian
>>>
>>> On Fri, May 10, 2019 at 03:13, Averell Huyen Levan <lvhu...@gmail.com>
>>> wrote:
>>>
>>>> Hi Fabian,
>>>>
>>>> Sorry, but I am still confused by your guidance. If I union the Toggle
>>>> stream with the StateReportTrigger stream, would that mean I need to make
>>>> my Toggles broadcast state? Or is there some way to modify the keyed
>>>> state from within the processBroadcastElement() method?
>>>>
>>>> I tried to implement the other direction (which I outlined in my
>>>> previous email). It seems to work, but I am not confident in it and am not
>>>> sure whether it has any flaws. Could you please comment?
>>>> In my view, this implementation causes a lot of type-casting for my
>>>> main data stream, which might have a high impact on performance (my toggle
>>>> is on for only about 1% of the keys, and the rate of input1.left is less
>>>> than a millionth of the rate of input1.right).
>>>>
>>>> /**
>>>>   * This KeyedBroadcastProcessFunction has:
>>>>   *    input1: a keyed `DataStream[Either[Toggle, MyEvent]]`:
>>>>   *       input1.left: Toggles in the form of a tuple (Key, Boolean).
>>>>   *          When Toggle._2 == true, records from input1.right for the
>>>>   *          same key will be forwarded to the main output.
>>>>   *          If it is false, records from input1.right for that same key
>>>>   *          will be dropped.
>>>>   *       input1.right: the main data stream
>>>>   *
>>>>   *    input2: a broadcast stream of StateReport triggers. When a record
>>>>   *       arrives on this stream, the current value of Toggles will be
>>>>   *       sent out via the outputTag.
>>>>   */
>>>> class EventFilterAndExportToggles(outputTag: OutputTag[Toggle])
>>>>       extends KeyedBroadcastProcessFunction[Key, Either[Toggle, MyEvent], Any, MyEvent] {
>>>>
>>>>    val toggleStateDescriptor =
>>>>       new ValueStateDescriptor[Boolean]("MyEventToggle", classOf[Boolean])
>>>>
>>>>    override def processElement(
>>>>          in1: Either[Toggle, MyEvent],
>>>>          readOnlyContext: KeyedBroadcastProcessFunction[Key, Either[Toggle, MyEvent], Any, MyEvent]#ReadOnlyContext,
>>>>          collector: Collector[MyEvent]): Unit = {
>>>>       in1 match {
>>>>          case Left(toggle) =>
>>>>             getRuntimeContext.getState(toggleStateDescriptor).update(toggle._2)
>>>>          case Right(event) =>
>>>>             if (getRuntimeContext.getState(toggleStateDescriptor).value())
>>>>                collector.collect(event)
>>>>       }
>>>>    }
>>>>
>>>>    override def processBroadcastElement(
>>>>          in2: Any,
>>>>          context: KeyedBroadcastProcessFunction[Key, Either[Toggle, MyEvent], Any, MyEvent]#Context,
>>>>          collector: Collector[MyEvent]): Unit = {
>>>>       context.applyToKeyedState(toggleStateDescriptor, (k: Key, s: ValueState[Boolean]) =>
>>>>          if (s != null) context.output(outputTag, (k, s.value())))
>>>>    }
>>>> }
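
To sanity-check the filter-and-export semantics of the function above without a running job, the same logic can be modelled in plain Scala. This is only a sketch under stated assumptions: `ToggleFilterModel`, `Input`, `Keyed`, `Trigger`, `run` and the shape of `MyEvent` are hypothetical, a mutable map stands in for the keyed `ValueState`, and a key that has never received a toggle is assumed to be off.

```scala
// Flink-free model of the operator's logic; all names here are hypothetical.
object ToggleFilterModel {
  type Key = String
  type Toggle = (Key, Boolean)
  final case class MyEvent(key: Key, payload: String)

  // Inputs in arrival order. Keyed wraps input1 (Left = toggle, Right = data
  // event); Trigger models one record on the broadcast input2.
  sealed trait Input
  final case class Keyed(value: Either[Toggle, MyEvent]) extends Input
  case object Trigger extends Input

  // Returns (events forwarded to the main output, toggles sent to the side output).
  def run(inputs: Seq[Input]): (Seq[MyEvent], Seq[Toggle]) = {
    // Stand-in for the per-key ValueState[Boolean].
    val state = scala.collection.mutable.LinkedHashMap.empty[Key, Boolean]
    val mainOutput = Seq.newBuilder[MyEvent]
    val sideOutput = Seq.newBuilder[Toggle]
    inputs.foreach {
      case Keyed(Left((key, on))) =>
        state(key) = on // remember the latest toggle for this key
      case Keyed(Right(event)) =>
        // Assumption: keys without any toggle are treated as "off".
        if (state.getOrElse(event.key, false)) mainOutput += event
      case Trigger =>
        sideOutput ++= state.toSeq // export every known toggle
    }
    (mainOutput.result(), sideOutput.result())
  }

  def main(args: Array[String]): Unit = {
    val (events, exported) = run(Seq(
      Keyed(Left("a" -> true)),
      Keyed(Right(MyEvent("a", "kept"))),
      Keyed(Right(MyEvent("b", "dropped"))),
      Trigger))
    println(events)   // only the event for key "a" is forwarded
    println(exported) // the single toggle ("a", true) is exported
  }
}
```

The model says nothing about performance; it only checks that events pass for toggled-on keys and that a trigger exports each stored toggle once.
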
>>>>
>>>> Thanks for your help.
>>>> Regards,
>>>> Averell
>>>>
>>>> On Thu, May 9, 2019 at 7:31 PM Fabian Hueske <fhue...@gmail.com> wrote:
>>>>
>>>>> Hi,
>>>>>
>>>>> Passing a Context through a DataStream definitely does not work.
>>>>> You'd need to have the keyed state that you want to scan over in the
>>>>> KeyedBroadcastProcessFunction.
>>>>>
>>>>> For the toggle filter use case, you would need to have a unioned
>>>>> stream with Toggle and StateReport events.
>>>>> For the output, you can use side outputs to route the different
>>>>> outputs to different streams.
>>>>>
>>>>> Best, Fabian
>>>>>
>>>>> On Thu, May 9, 2019 at 10:34, Averell <lvhu...@gmail.com> wrote:
>>>>>
>>>>>> Thank you Congxian and Fabian.
>>>>>>
>>>>>> @Fabian: could you please give a bit more detail? My understanding is:
>>>>>> pass the context itself and an OutputTag to the KeyedStateFunction
>>>>>> parameter of KeyedBroadcastProcessFunction#Context.applyToKeyedState(),
>>>>>> and from within that KeyedStateFunction.process() send out the side
>>>>>> output. Do I understand your idea correctly?
>>>>>>
>>>>>> BTW, I have another question regarding KeyedBroadcastProcessFunction
>>>>>> best practice: I have two streams, Data and Toggle. The Toggle stream is
>>>>>> just a keyed boolean stream used to filter data from the Data stream.
>>>>>> I am implementing that filter using a simple RichCoFlatMapFunction.
>>>>>>
>>>>>> Now I want to export the list of keys that are currently toggled on.
>>>>>> Should I
>>>>>> (1) have one additional KeyedBroadcastProcessFunction operator (which
>>>>>> has Toggle and Broadcast as the input streams), or
>>>>>> (2) replace that RichCoFlatMapFunction with a new
>>>>>> KeyedBroadcastProcessFunction that has both functionalities, filter and
>>>>>> export? Doing this would require unioning Toggle and Data into one
>>>>>> single keyed stream.
>>>>>>
>>>>>> Thanks and best regards,
>>>>>> Averell
>>>>>>
>>>>>>
>>>>>>
>>>>>>
>>>>>> --
>>>>>> Sent from:
>>>>>> http://apache-flink-user-mailing-list-archive.2336050.n4.nabble.com/
>>>>>>
>>>>>