Glad to hear it! Thanks for letting us know.

David

On Fri, Mar 5, 2021 at 10:22 PM Roger <roger.l...@gmail.com> wrote:

> Confirmed. This worked!
> Thanks!
> Roger
>
> On Fri, Mar 5, 2021 at 12:41 PM Roger <roger.l...@gmail.com> wrote:
>
>> Hey David.
>> Thank you very much for your response. This is making sense now. It was
>> confusing because I was able to use the Broadcast stream prior to adding
>> the second stream. However, now I realize that this part of the pipeline
>> occurs after the windowing so I'm not affected the same way. This is
>> definitely going to help fix my problem.
>>
>> On Fri, Mar 5, 2021 at 12:33 PM David Anderson <dander...@apache.org>
>> wrote:
>>
>>> This is a watermarking issue. Whenever an operator has two or more input
>>> streams, its watermark is the minimum of the watermarks of its incoming
>>> streams. In this case your broadcast stream doesn't have a watermark
>>> generator, so it is preventing the watermark from advancing, which in
>>> turn is preventing the windows from being triggered.
>>>
>>> You should call assignTimestampsAndWatermarks on the broadcast stream.
>>> If time is irrelevant for this stream, you could do something like this:
>>>
>>> public static class ConfigStreamAssigner<T>
>>>         implements AssignerWithPeriodicWatermarks<T> {
>>>
>>>     @Nullable
>>>     @Override
>>>     public Watermark getCurrentWatermark() {
>>>         // Always report the largest possible watermark, so this stream
>>>         // never holds back the watermark of a downstream operator.
>>>         return Watermark.MAX_WATERMARK;
>>>     }
>>>
>>>     @Override
>>>     public long extractTimestamp(T element, long previousElementTimestamp) {
>>>         // Time is irrelevant for this stream, so any constant will do.
>>>         return 0;
>>>     }
>>> }
>>>
>>>
>>> By setting the watermark for this stream to MAX_WATERMARK, you are
>>> effectively removing this stream's watermarks from consideration.
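>>>
>>> As a minimal sketch of the wiring (configSource, Configuration, and
>>> configStateDescriptor here are placeholders for whatever you actually
>>> use), note that the assigner has to be applied before broadcast(), since
>>> a BroadcastStream has no assignTimestampsAndWatermarks method:
>>>
>>> // configSource: the DataStream<Configuration> read from Kafka (assumed name).
>>> // configStateDescriptor: your MapStateDescriptor for the broadcast state.
>>> BroadcastStream<Configuration> configurationBroadcastStream = configSource
>>>     .assignTimestampsAndWatermarks(new ConfigStreamAssigner<Configuration>())
>>>     .broadcast(configStateDescriptor);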
>>>
>>> Regards,
>>> David
>>>
>>> On Fri, Mar 5, 2021 at 5:48 PM Roger <roger.l...@gmail.com> wrote:
>>>
>>>> Hello.
>>>> I am having an issue with a Flink 1.8 pipeline when trying to consume
>>>> broadcast state across multiple operators.  I currently
>>>> have a working pipeline that looks like the following:
>>>>
>>>> records
>>>>     .assignTimestampsAndWatermarks(
>>>>         new BoundedOutOfOrdernessGenerator(
>>>>             Long.parseLong(properties.getProperty("flinkMaxOutOfOrderness"))))
>>>>     .keyBy(new ApplicationNameKeySelector())
>>>>     .window(
>>>>         TumblingEventTimeWindows.of(
>>>>             Time.seconds(Long.parseLong(properties.getProperty("flinkWindow")))))
>>>>     .aggregate(new Aggregator())
>>>>     .connect(configurationBroadcastStream)
>>>>     .process(excluder)
>>>>     .addSink(KinesisProducer.createSinkFromStaticConfig(properties));
>>>>
>>>> * records is a FlinkKafkaConsumer stream
>>>> * configurationBroadcastStream is a FlinkKafkaConsumer
>>>> * aggregator is an AggregateFunction
>>>> * excluder is a BroadcastProcessFunction
>>>>
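>>>> For context, a minimal sketch of what BoundedOutOfOrdernessGenerator might
>>>> look like, assuming it builds on Flink's BoundedOutOfOrdernessTimestampExtractor
>>>> and that Record exposes its event time in epoch millis (both assumptions):
>>>>
>>>> import org.apache.flink.streaming.api.functions.timestamps.BoundedOutOfOrdernessTimestampExtractor;
>>>> import org.apache.flink.streaming.api.windowing.time.Time;
>>>>
>>>> public static class BoundedOutOfOrdernessGenerator
>>>>         extends BoundedOutOfOrdernessTimestampExtractor<Record> {
>>>>
>>>>     public BoundedOutOfOrdernessGenerator(long maxOutOfOrdernessMillis) {
>>>>         super(Time.milliseconds(maxOutOfOrdernessMillis));
>>>>     }
>>>>
>>>>     @Override
>>>>     public long extractTimestamp(Record element) {
>>>>         // Assumed accessor: the record's event time in epoch millis.
>>>>         return element.getEventTimestamp();
>>>>     }
>>>> }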
>>>>
>>>> I now have requirements to filter out transactions at the beginning of
>>>> the pipeline using the same broadcast stream I am consuming towards the end
>>>> of the pipeline. I updated the pipeline to look like this:
>>>>
>>>> records
>>>>     .assignTimestampsAndWatermarks(
>>>>         new BoundedOutOfOrdernessGenerator(
>>>>             Long.parseLong(properties.getProperty("flinkMaxOutOfOrderness"))))
>>>>     .connect(configurationBroadcastStream)  **new**
>>>>     .process(filter)  **new**
>>>>     .keyBy(new ApplicationNameKeySelector())
>>>>     .window(
>>>>         TumblingEventTimeWindows.of(
>>>>             Time.seconds(Long.parseLong(properties.getProperty("flinkWindow")))))
>>>>     .aggregate(new Aggregator())
>>>>     .connect(configurationBroadcastStream)
>>>>     .process(excluder)
>>>>     .addSink(KinesisProducer.createSinkFromStaticConfig(properties));
>>>>
>>>> * records is a FlinkKafkaConsumer stream
>>>> * configurationBroadcastStream is a FlinkKafkaConsumer
>>>> * aggregator is an AggregateFunction
>>>> * filter and excluder are BroadcastProcessFunctions
>>>>
>>>> With this change, the aggregated records are not making it into the
>>>> excluder process.
>>>>
>>>> 1. The aggregator's add() is working. I can see this in the logs.
>>>> 2. The aggregator's getResult() is never called. This makes me think this
>>>> is a window issue.
>>>> 3. Both processBroadcastElement methods from the two broadcast functions
>>>> are working and retrieving the broadcast state. I see this in the logging.
>>>> 4. The pipeline definitely worked prior to my adding the second
>>>> .connect and .process at the beginning of the pipeline.
>>>> 5. I have considered having the new process(filter) emit a new record
>>>> object containing both the transactions and the config retrieved from the
>>>> broadcast stream, and passing that down the pipeline, but that is really
>>>> not desirable.
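>>>>
>>>> In case it helps, the new filter step is shaped roughly like the sketch
>>>> below (Transaction, Configuration, the "config" key, and the excludes()
>>>> check are placeholders for our actual types and broadcast-state layout):
>>>>
>>>> import org.apache.flink.api.common.state.MapStateDescriptor;
>>>> import org.apache.flink.streaming.api.functions.co.BroadcastProcessFunction;
>>>> import org.apache.flink.util.Collector;
>>>>
>>>> public static class TransactionFilter
>>>>         extends BroadcastProcessFunction<Transaction, Configuration, Transaction> {
>>>>
>>>>     private final MapStateDescriptor<String, Configuration> configDescriptor;
>>>>
>>>>     public TransactionFilter(MapStateDescriptor<String, Configuration> configDescriptor) {
>>>>         this.configDescriptor = configDescriptor;
>>>>     }
>>>>
>>>>     @Override
>>>>     public void processElement(
>>>>             Transaction txn, ReadOnlyContext ctx, Collector<Transaction> out)
>>>>             throws Exception {
>>>>         Configuration config = ctx.getBroadcastState(configDescriptor).get("config");
>>>>         // Pass the transaction through unless the broadcast config excludes it.
>>>>         if (config == null || !config.excludes(txn)) {
>>>>             out.collect(txn);
>>>>         }
>>>>     }
>>>>
>>>>     @Override
>>>>     public void processBroadcastElement(
>>>>             Configuration config, Context ctx, Collector<Transaction> out)
>>>>             throws Exception {
>>>>         // Keep only the latest configuration in broadcast state.
>>>>         ctx.getBroadcastState(configDescriptor).put("config", config);
>>>>     }
>>>> }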
>>>>
>>>> Any ideas on what might be going on here?
>>>>
>>>> Thanks!
>>>> Roger
>>>>
>>>>
