Confirmed. This worked!
Thanks!
Roger

On Fri, Mar 5, 2021 at 12:41 PM Roger <roger.l...@gmail.com> wrote:

> Hey David.
> Thank you very much for your response. This is making sense now. It was
> confusing because I was able to use the broadcast stream before connecting it
> a second time. However, now I realize that part of the pipeline occurs after
> the windowing, so it isn't affected in the same way. This is definitely going
> to help fix my problem.
>
> On Fri, Mar 5, 2021 at 12:33 PM David Anderson <dander...@apache.org>
> wrote:
>
>> This is a watermarking issue. Whenever an operator has two or more input
>> streams, its watermark is the minimum of the watermarks of its incoming
>> streams. In this case your broadcast stream doesn't have a watermark
>> generator, so it is preventing the watermarks from advancing. This in turn
>> is preventing the windows from being triggered.
>>
>> You should call assignTimestampsAndWatermarks on the broadcast stream. If
>> time is irrelevant for this stream, you could do something like this:
>>
>> public static class ConfigStreamAssigner<T> implements AssignerWithPeriodicWatermarks<T> {
>>       // Always emit the largest possible watermark so this stream never
>>       // holds back the watermarks of the operators it connects to.
>>       @Nullable
>>       @Override
>>       public Watermark getCurrentWatermark() {
>>          return Watermark.MAX_WATERMARK;
>>       }
>>
>>       @Override
>>       public long extractTimestamp(T element, long previousElementTimestamp) {
>>          // event time is irrelevant for this stream
>>          return 0;
>>       }
>> }
>>
>>
>> By setting the watermark for this stream to MAX_WATERMARK, you are
>> effectively removing this stream's watermarks from consideration.
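>>
>> For example, applied to the Kafka source that feeds the broadcast stream
>> (a sketch; ConfigEvent, configurationConsumer, and configStateDescriptor are
>> placeholders for whatever names you are already using):
>>
>> BroadcastStream<ConfigEvent> configurationBroadcastStream =
>>     env.addSource(configurationConsumer)   // FlinkKafkaConsumer<ConfigEvent>
>>         .assignTimestampsAndWatermarks(new ConfigStreamAssigner<>())
>>         .broadcast(configStateDescriptor); // MapStateDescriptor for the broadcast state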
>>
>> Regards,
>> David
>>
>> On Fri, Mar 5, 2021 at 5:48 PM Roger <roger.l...@gmail.com> wrote:
>>
>>> Hello.
>>> I am having an issue with a Flink 1.8 pipeline when trying to consume
>>> broadcast state across multiple operators.  I currently
>>> have a working pipeline that looks like the following:
>>>
>>> records
>>>     .assignTimestampsAndWatermarks(
>>>         new BoundedOutOfOrdernessGenerator(
>>>             Long.parseLong(properties.getProperty("flinkMaxOutOfOrderness"))))
>>>     .keyBy(new ApplicationNameKeySelector())
>>>     .window(
>>>         TumblingEventTimeWindows.of(
>>>             Time.seconds(Long.parseLong(properties.getProperty("flinkWindow")))))
>>>     .aggregate(new Aggregator())
>>>     .connect(configurationBroadcastStream)
>>>     .process(excluder)
>>>     .addSink(KinesisProducer.createSinkFromStaticConfig(properties));
>>>
>>> * records are a FlinkKafkaConsumer stream
>>> * configurationBroadcastStream is a FlinkKafkaConsumer
>>> * aggregator is an AggregateFunction
>>> * excluder is a BroadcastProcessFunction
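>>>
>>> (For reference, BoundedOutOfOrdernessGenerator is a custom periodic
>>> watermark assigner along the lines of the Flink docs example; the Record
>>> type and getTimestamp() accessor below are simplified placeholders:)
>>>
>>> public class BoundedOutOfOrdernessGenerator implements AssignerWithPeriodicWatermarks<Record> {
>>>     private final long maxOutOfOrderness;   // allowed lateness in milliseconds
>>>     private long currentMaxTimestamp;
>>>
>>>     public BoundedOutOfOrdernessGenerator(long maxOutOfOrderness) {
>>>         this.maxOutOfOrderness = maxOutOfOrderness;
>>>     }
>>>
>>>     @Override
>>>     public long extractTimestamp(Record element, long previousElementTimestamp) {
>>>         long timestamp = element.getTimestamp();
>>>         currentMaxTimestamp = Math.max(timestamp, currentMaxTimestamp);
>>>         return timestamp;
>>>     }
>>>
>>>     @Override
>>>     public Watermark getCurrentWatermark() {
>>>         // trail the largest timestamp seen so far by the allowed out-of-orderness
>>>         return new Watermark(currentMaxTimestamp - maxOutOfOrderness);
>>>     }
>>> }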
>>>
>>>
>>> I now have requirements to filter out transactions at the beginning of
>>> the pipeline using the same broadcast stream I am consuming towards the end
>>> of the pipeline. I updated the pipeline to look like this:
>>>
>>> records
>>>     .assignTimestampsAndWatermarks(
>>>         new BoundedOutOfOrdernessGenerator(
>>>             Long.parseLong(properties.getProperty("flinkMaxOutOfOrderness"))))
>>>     .connect(configurationBroadcastStream) **new**
>>>     .process(filter) **new**
>>>     .keyBy(new ApplicationNameKeySelector())
>>>     .window(
>>>         TumblingEventTimeWindows.of(
>>>             Time.seconds(Long.parseLong(properties.getProperty("flinkWindow")))))
>>>     .aggregate(new Aggregator())
>>>     .connect(configurationBroadcastStream)
>>>     .process(excluder)
>>>     .addSink(KinesisProducer.createSinkFromStaticConfig(properties));
>>>
>>> * records are a FlinkKafkaConsumer stream
>>> * configurationBroadcastStream is a FlinkKafkaConsumer
>>> * aggregator is an AggregateFunction
>>> * filter and excluder are BroadcastProcessFunctions (roughly sketched below)
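>>>
>>> (Roughly, the filter and excluder are broadcast functions shaped like the
>>> sketch below; Transaction, Config, and the "config" key are simplified
>>> placeholders for the real types and broadcast state layout:)
>>>
>>> public class Filter extends BroadcastProcessFunction<Transaction, Config, Transaction> {
>>>     // same descriptor that is used when calling .broadcast(...) on the config stream
>>>     private final MapStateDescriptor<String, Config> configDescriptor;
>>>
>>>     public Filter(MapStateDescriptor<String, Config> configDescriptor) {
>>>         this.configDescriptor = configDescriptor;
>>>     }
>>>
>>>     @Override
>>>     public void processElement(Transaction value, ReadOnlyContext ctx,
>>>                                Collector<Transaction> out) throws Exception {
>>>         // look up the latest config from broadcast state and drop excluded transactions
>>>         Config config = ctx.getBroadcastState(configDescriptor).get("config");
>>>         if (config == null || !config.excludes(value)) {
>>>             out.collect(value);
>>>         }
>>>     }
>>>
>>>     @Override
>>>     public void processBroadcastElement(Config value, Context ctx,
>>>                                         Collector<Transaction> out) throws Exception {
>>>         // keep the newest config in broadcast state
>>>         ctx.getBroadcastState(configDescriptor).put("config", value);
>>>     }
>>> }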
>>>
>>> With this change, the aggregated records are not making it into the
>>> excluder process.
>>>
>>> 1. The aggregator's add method is being called. I can see this in the logs.
>>> 2. The aggregator's getResult method is never called, which makes me think
>>> this is a windowing issue.
>>> 3. Both processBroadcastElement methods from the two broadcast functions are
>>> being called and are retrieving the broadcast state. I can see this in the
>>> logs.
>>> 4. The pipeline definitely worked before I added the second .connect and
>>> .process at the beginning of the pipeline.
>>> 5. I have considered having the new process(filter) emit a new record object
>>> that carries both the transaction and the config retrieved from the broadcast
>>> stream, and passing that down the pipeline, but that is really not desirable.
>>>
>>> Any ideas on what might be going on here?
>>>
>>> Thanks!
>>> Roger
>>>
>>>
