Glad to hear it! Thanks for letting us know.

David
On Fri, Mar 5, 2021 at 10:22 PM Roger <roger.l...@gmail.com> wrote:

> Confirmed. This worked!
> Thanks!
> Roger
>
> On Fri, Mar 5, 2021 at 12:41 PM Roger <roger.l...@gmail.com> wrote:
>
>> Hey David.
>> Thank you very much for your response. This makes sense now. It was
>> confusing because I was able to use the broadcast stream before adding
>> the second stream; now I realize that that part of the pipeline runs
>> after the windowing, so it isn't affected the same way. This is
>> definitely going to help fix my problem.
>>
>> On Fri, Mar 5, 2021 at 12:33 PM David Anderson <dander...@apache.org> wrote:
>>
>>> This is a watermarking issue. Whenever an operator has two or more
>>> input streams, its watermark is the minimum of the watermarks of the
>>> incoming streams. In this case your broadcast stream doesn't have a
>>> watermark generator, so it is preventing the watermarks from advancing,
>>> which in turn is preventing the windows from being triggered.
>>>
>>> You should call assignTimestampsAndWatermarks on the broadcast stream.
>>> If time is irrelevant for this stream, you could do something like this:
>>>
>>>     public static class ConfigStreamAssigner<T>
>>>             implements AssignerWithPeriodicWatermarks<T> {
>>>
>>>         @Nullable
>>>         @Override
>>>         public Watermark getCurrentWatermark() {
>>>             return Watermark.MAX_WATERMARK;
>>>         }
>>>
>>>         @Override
>>>         public long extractTimestamp(T element, long previousElementTimestamp) {
>>>             return 0;
>>>         }
>>>     }
>>>
>>> By setting the watermark for this stream to MAX_WATERMARK, you are
>>> effectively removing this stream's watermarks from consideration.
>>>
>>> Regards,
>>> David
>>> On Fri, Mar 5, 2021 at 5:48 PM Roger <roger.l...@gmail.com> wrote:
>>>
>>>> Hello.
>>>> I am having an issue with a Flink 1.8 pipeline when trying to consume
>>>> broadcast state across multiple operators. I currently have a working
>>>> pipeline that looks like the following:
>>>>
>>>>     records
>>>>         .assignTimestampsAndWatermarks(
>>>>             new BoundedOutOfOrdernessGenerator(
>>>>                 Long.parseLong(properties.getProperty("flinkMaxOutOfOrderness"))))
>>>>         .keyBy(new ApplicationNameKeySelector())
>>>>         .window(
>>>>             TumblingEventTimeWindows.of(
>>>>                 Time.seconds(Long.parseLong(properties.getProperty("flinkWindow")))))
>>>>         .aggregate(new Aggregator())
>>>>         .connect(configurationBroadcastStream)
>>>>         .process(excluder)
>>>>         .addSink(KinesisProducer.createSinkFromStaticConfig(properties));
>>>>
>>>> * records is a FlinkKafkaConsumer stream
>>>> * configurationBroadcastStream is a FlinkKafkaConsumer stream
>>>> * Aggregator is an AggregateFunction
>>>> * excluder is a BroadcastProcessFunction
>>>>
>>>> I now have a requirement to filter out transactions at the beginning
>>>> of the pipeline using the same broadcast stream I consume towards the
>>>> end of the pipeline. I updated the pipeline to look like this:
>>>>
>>>>     records
>>>>         .assignTimestampsAndWatermarks(
>>>>             new BoundedOutOfOrdernessGenerator(
>>>>                 Long.parseLong(properties.getProperty("flinkMaxOutOfOrderness"))))
>>>>         .connect(configurationBroadcastStream)    // new
>>>>         .process(filter)                          // new
>>>>         .keyBy(new ApplicationNameKeySelector())
>>>>         .window(
>>>>             TumblingEventTimeWindows.of(
>>>>                 Time.seconds(Long.parseLong(properties.getProperty("flinkWindow")))))
>>>>         .aggregate(new Aggregator())
>>>>         .connect(configurationBroadcastStream)
>>>>         .process(excluder)
>>>>         .addSink(KinesisProducer.createSinkFromStaticConfig(properties));
>>>>
>>>> * records is a FlinkKafkaConsumer stream
>>>> * configurationBroadcastStream is a FlinkKafkaConsumer stream
>>>> * Aggregator is an AggregateFunction
>>>> * filter and excluder are BroadcastProcessFunctions
>>>>
>>>> With this change, the aggregated records never make it into the
>>>> excluder process.
>>>>
>>>> 1. The Aggregator's add method is being called; I can see this in the
>>>>    logs.
>>>> 2. The Aggregator's getResult method is never called, which makes me
>>>>    think this is a windowing issue.
>>>> 3. The processBroadcastElement methods of both broadcast functions are
>>>>    working and retrieving the broadcast state; I see this in the
>>>>    logging.
>>>> 4. The pipeline definitely worked before I added the second .connect
>>>>    and .process at the beginning of the pipeline.
>>>> 5. I have considered creating a new record object in the new
>>>>    process(filter) step that contains both the config retrieved from
>>>>    the broadcast stream and the transactions, and passing that down
>>>>    the pipeline, but that is really not desirable.
>>>>
>>>> Any ideas on what might be going on here?
>>>>
>>>> Thanks!
>>>> Roger
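
A minimal sketch of how David's fix above slots into a job like this one.
The only substantive part is the assignTimestampsAndWatermarks call on the
configuration stream before it is broadcast; the string element type, the
fromElements source, and the state descriptor below are illustrative
assumptions rather than Roger's actual job, and ConfigStreamAssigner is the
class from David's reply (with the type parameter added so it compiles):

    import org.apache.flink.api.common.state.MapStateDescriptor;
    import org.apache.flink.api.common.typeinfo.BasicTypeInfo;
    import org.apache.flink.streaming.api.TimeCharacteristic;
    import org.apache.flink.streaming.api.datastream.BroadcastStream;
    import org.apache.flink.streaming.api.datastream.DataStream;
    import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;

    public class ConfigBroadcastWiring {
        public static void main(String[] args) throws Exception {
            StreamExecutionEnvironment env =
                StreamExecutionEnvironment.getExecutionEnvironment();
            env.setStreamTimeCharacteristic(TimeCharacteristic.EventTime);

            // Illustrative stand-in for the FlinkKafkaConsumer config source.
            DataStream<String> configSource = env.fromElements("exclude-application-a");

            // Illustrative descriptor for the broadcast state.
            MapStateDescriptor<String, String> configDescriptor = new MapStateDescriptor<>(
                    "config", BasicTypeInfo.STRING_TYPE_INFO, BasicTypeInfo.STRING_TYPE_INFO);

            // The fix: emit MAX_WATERMARK on the configuration stream before
            // broadcasting it, so connecting it to another stream never holds
            // back the combined (minimum) watermark.
            BroadcastStream<String> configurationBroadcastStream = configSource
                    .assignTimestampsAndWatermarks(new ConfigStreamAssigner<String>())
                    .broadcast(configDescriptor);

            // records.connect(configurationBroadcastStream).process(filter)...
            // continues as in the pipeline above.
            env.execute("config-broadcast-wiring");
        }
    }

With the assigner in place, the watermark of each connected operator is
driven entirely by the records stream, so the event-time windows trigger
just as they did before the second .connect was added.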