Confirmed. This worked! Thanks!

Roger
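For the archive, the change boiled down to attaching David's ConfigStreamAssigner (defined in his reply below) to the configuration stream before broadcasting and connecting it. A minimal sketch only, based on his suggestion and the pipeline quoted further down; names such as env, configKafkaConsumer, Config, and configStateDescriptor are placeholders, not the actual code:

    // Pin the configuration stream's watermark at MAX so it never holds back
    // the combined watermark of any two-input operator it feeds.
    DataStream<Config> configStream =
        env.addSource(configKafkaConsumer)                              // assumed source variable
           .assignTimestampsAndWatermarks(new ConfigStreamAssigner<>()); // class from David's reply

    BroadcastStream<Config> configurationBroadcastStream =
        configStream.broadcast(configStateDescriptor);                  // assumed state descriptor

    // The rest of the pipeline (connect / process / keyBy / window / ...) stays as posted below.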
On Fri, Mar 5, 2021 at 12:41 PM Roger <roger.l...@gmail.com> wrote:

Hey David.
Thank you very much for your response. This is making sense now. It was confusing because I was able to use the broadcast stream prior to adding the second stream. However, now I realize that that part of the pipeline occurs after the windowing, so I'm not affected the same way. This is definitely going to help fix my problem.

On Fri, Mar 5, 2021 at 12:33 PM David Anderson <dander...@apache.org> wrote:

This is a watermarking issue. Whenever an operator has two or more input streams, its watermark is the minimum of the watermarks of the incoming streams. In this case your broadcast stream doesn't have a watermark generator, so it is preventing the watermarks from advancing. This in turn is preventing the windows from being triggered.

You should call assignTimestampsAndWatermarks on the broadcast stream. If time is irrelevant for this stream, you could do something like this:

    public static class ConfigStreamAssigner<T> implements AssignerWithPeriodicWatermarks<T> {

        @Nullable
        @Override
        public Watermark getCurrentWatermark() {
            return Watermark.MAX_WATERMARK;
        }

        @Override
        public long extractTimestamp(T element, long previousElementTimestamp) {
            return 0;
        }
    }

By setting the watermark for this stream to MAX_WATERMARK, you are effectively removing this stream's watermarks from consideration.

Regards,
David

On Fri, Mar 5, 2021 at 5:48 PM Roger <roger.l...@gmail.com> wrote:

Hello.
I am having an issue with a Flink 1.8 pipeline when trying to consume broadcast state across multiple operators. I currently have a working pipeline that looks like the following:

    records
        .assignTimestampsAndWatermarks(
            new BoundedOutOfOrdernessGenerator(
                Long.parseLong(properties.getProperty("flinkMaxOutOfOrderness"))))
        .keyBy(new ApplicationNameKeySelector())
        .window(
            TumblingEventTimeWindows.of(
                Time.seconds(Long.parseLong(properties.getProperty("flinkWindow")))))
        .aggregate(new Aggregator())
        .connect(configurationBroadcastStream)
        .process(excluder)
        .addSink(KinesisProducer.createSinkFromStaticConfig(properties));

* records is a FlinkKafkaConsumer stream
* configurationBroadcastStream is a FlinkKafkaConsumer stream
* Aggregator is an AggregateFunction
* excluder is a BroadcastProcessFunction

I now have a requirement to filter out transactions at the beginning of the pipeline using the same broadcast stream I am consuming towards the end of the pipeline. I updated the pipeline to look like this:

    records
        .assignTimestampsAndWatermarks(
            new BoundedOutOfOrdernessGenerator(
                Long.parseLong(properties.getProperty("flinkMaxOutOfOrderness"))))
        .connect(configurationBroadcastStream)   // **new**
        .process(filter)                         // **new**
        .keyBy(new ApplicationNameKeySelector())
        .window(
            TumblingEventTimeWindows.of(
                Time.seconds(Long.parseLong(properties.getProperty("flinkWindow")))))
        .aggregate(new Aggregator())
        .connect(configurationBroadcastStream)
        .process(excluder)
        .addSink(KinesisProducer.createSinkFromStaticConfig(properties));

* records is a FlinkKafkaConsumer stream
* configurationBroadcastStream is a FlinkKafkaConsumer stream
* Aggregator is an AggregateFunction
* filter and excluder are BroadcastProcessFunctions

With this change, the aggregated records are not making it into the excluder process.
1. The aggregator's add() is working. I can see this in the logs.
2. The aggregator's getResult() is never called. This makes me think this is a window issue.
3. Both processBroadcastElement methods from the two broadcast functions are working and retrieving the broadcast state. I see this in the logging.
4. The pipeline definitely worked prior to me adding the second .connect and .process at the beginning of the pipeline.
5. I have considered creating a new record object in the new process(filter) that contains the config retrieved from the broadcast stream along with the transactions, and passing that down the pipeline, but that is really not desirable.

Any ideas on what might be going on here?

Thanks!
Roger
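A side note for anyone finding this thread on a newer Flink release: from Flink 1.11 onward the WatermarkStrategy API (org.apache.flink.api.common.eventtime) supersedes AssignerWithPeriodicWatermarks. A roughly equivalent "keep this input at MAX_WATERMARK" strategy might look like the sketch below; Config is a placeholder for whatever element type the configuration stream carries, not a type from this thread:

    // Sketch only: emit Long.MAX_VALUE as the watermark on every periodic emit,
    // so this input never holds back the combined watermark of downstream operators.
    WatermarkStrategy<Config> maxWatermark =
        WatermarkStrategy
            .<Config>forGenerator(ctx -> new WatermarkGenerator<Config>() {
                @Override
                public void onEvent(Config event, long eventTimestamp, WatermarkOutput output) {
                    // no per-event watermarks needed for the config stream
                }

                @Override
                public void onPeriodicEmit(WatermarkOutput output) {
                    output.emitWatermark(new Watermark(Long.MAX_VALUE));
                }
            })
            .withTimestampAssigner((event, timestamp) -> 0L);

    // Apply it with configStream.assignTimestampsAndWatermarks(maxWatermark)
    // before broadcasting the stream, analogous to David's suggestion above.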