Hi Fabian,

Sorry, but I am still confused about your guidance. If I union the Toggle
stream with the StateReportTrigger stream, would that mean I need to keep
my Toggles as broadcast state? Or is there some way to modify the keyed
state from within the processBroadcastElement() method?

I tried implementing the other direction (which I outlined in my previous
email). It seems to work, but I am not confident in it and am not sure
whether it has any flaws. Could you please comment on it?
In my view, this implementation forces a lot of type-casting on my main
data stream, which might have a significant impact on performance (my toggle
is on for only about 1% of the keys, and the rate of input1.left is less than
a millionth of the rate of input1.right).

/**
  * This KeyedBroadcastProcessFunction has:
  *    input1: a keyed `DataStream[Either[Toggle, MyEvent]]`:
  *       input1.left: Toggles in the form of a tuple (Key, Boolean).
  *          When Toggle._2 == true, records from input1.right for the
  *          same key will be forwarded to the main output.
  *          If it is false, records from input1.right for that same
  *          key will be dropped.
  *       input1.right: the main data stream
  *
  *    input2: a broadcast stream of StateReport triggers. When a
  *       record arrives on this stream, the current value of Toggles
  *       will be sent out via the outputTag.
  */
class EventFilterAndExportToggles(outputTag: OutputTag[Toggle])
      extends KeyedBroadcastProcessFunction[Key, Either[Toggle, MyEvent], Any, MyEvent] {

   val toggleStateDescriptor =
      new ValueStateDescriptor[Boolean]("MyEventToggle", classOf[Boolean])

   override def processElement(in1: Either[Toggle, MyEvent],
                        readOnlyContext: KeyedBroadcastProcessFunction[Key, Either[Toggle, MyEvent], Any, MyEvent]#ReadOnlyContext,
                        collector: Collector[MyEvent]): Unit = {
      in1 match {
         case Left(toggle) =>
            getRuntimeContext.getState(toggleStateDescriptor).update(toggle._2)
         case Right(event) =>
            if (getRuntimeContext.getState(toggleStateDescriptor).value())
               collector.collect(event)
      }
   }

   override def processBroadcastElement(in2: Any,
                               context: KeyedBroadcastProcessFunction[Key, Either[Toggle, MyEvent], Any, MyEvent]#Context,
                               collector: Collector[MyEvent]): Unit = {
      context.applyToKeyedState(toggleStateDescriptor,
         (k: Key, s: ValueState[Boolean]) =>
            if (s != null) context.output(outputTag, (k, s.value())))
   }
}
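
To make it easier to check the intended behaviour independently of Flink,
here is a minimal plain-Scala model of the function above. It is only a
sketch: the mutable map stands in for the per-key ValueState, and the
definitions of Key, Toggle and MyEvent (including the `key` and `payload`
fields) are assumptions made for illustration, not my real types.

```scala
import scala.collection.mutable

object ToggleFilterModel {
  // Hypothetical stand-ins for the types used in the function above.
  type Key = String
  type Toggle = (Key, Boolean)
  final case class MyEvent(key: Key, payload: String)

  final class Model {
    // Plain map standing in for Flink's per-key ValueState[Boolean].
    private val toggleState = mutable.Map.empty[Key, Boolean]

    // Mirrors processElement: Left updates the toggle for its key,
    // Right is forwarded only if the toggle for its key is on.
    def processElement(in: Either[Toggle, MyEvent]): Option[MyEvent] = in match {
      case Left((key, on)) =>
        toggleState(key) = on
        None
      case Right(event) =>
        if (toggleState.getOrElse(event.key, false)) Some(event) else None
    }

    // Mirrors processBroadcastElement: report the current toggle of
    // every key that has state, sorted by key for stable output.
    def processTrigger(): List[Toggle] = toggleState.toList.sortBy(_._1)
  }

  def main(args: Array[String]): Unit = {
    val model = new Model
    val inputs: List[Either[Toggle, MyEvent]] = List(
      Right(MyEvent("a", "no toggle seen yet")),
      Left(("a", true)),
      Right(MyEvent("a", "toggle is on")),
      Left(("b", false)),
      Right(MyEvent("b", "toggle is off"))
    )
    // Only events whose key is toggled on at arrival time survive.
    val forwarded = inputs.flatMap(in => model.processElement(in).toList)
    forwarded.foreach(e => println(s"forwarded: ${e.key} / ${e.payload}"))
    model.processTrigger().foreach(t => println(s"report: $t"))
  }
}
```

Two things show up in this model: events arriving before any Toggle for
their key are dropped, and a report contains every key that has state,
including keys whose toggle is currently false.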

Thanks for your help.
Regards,
Averell

On Thu, May 9, 2019 at 7:31 PM Fabian Hueske <fhue...@gmail.com> wrote:

> Hi,
>
> Passing a Context through a DataStream definitely does not work.
> You'd need to have the keyed state that you want to scan over in the
> KeyedBroadcastProcessFunction.
>
> For the toggle filter use case, you would need to have a unioned stream
> with Toggle and StateReport events.
> For the output, you can use side outputs to route the different outputs to
> different streams.
>
> Best, Fabian
>
> On Thu, May 9, 2019 at 10:34 AM Averell <lvhu...@gmail.com> wrote:
>
>> Thank you Congxian and Fabian.
>>
>> @Fabian: could you please give a bit more detail? My understanding is: to
>> pass the context itself and an OutputTag to the KeyedStateFunction parameter
>> of KeyedBroadcastProcessFunction#Context.applyToKeyedState(), and from
>> within that KeyedStateFunction.process() send out the side output. Am I
>> understanding your idea correctly?
>>
>> BTW, I have another question regarding KeyedBroadcastProcessFunction best
>> practice: I have two streams, Data and Toggle. The Toggle stream is
>> just a keyed boolean stream, used to filter data from the Data stream.
>> I am implementing that filter using a simple RichCoFlatMapFunction.
>>
>> Now I want to export the list of keys which are currently toggled on.
>> Should I
>> (1) have one additional KeyedBroadcastProcessFunction operator (which has
>> Toggle and BroadCast as the input streams), or
>> (2) replace that RichCoFlatMapFunction with a new
>> KeyedBroadcastProcessFunction, which has both functionalities: filter and
>> export? Doing this would require unioning Toggle and Data into one single
>> keyed stream.
>>
>> Thanks and best regards,
>> Averell
>>
>
