Hi, Could you explain a bit more what are you trying to achieve?
One problem that pops into my head is that currently in Flink Streaming (it is possible for processing bounded data), there is no way to “not ingest” the data reliably in general case, as this might deadlock the upstream operator once the output buffers will fill out. However instead, you can for example filter out/ignore records until some condition is met. BroadcastState works for one single operator (and it’s parallel instances) - it doesn’t automatically communicate with any upstream/downstream operators - you have to wire/connect your operators and distribute the information as you want to. For examples how does it work you can take a look at this ITCase for example [1]. What you could do, is create following job topology using side outputs [2]: Src1 -> OP1 -> broadcast_side_output | V Sink1 And use BroadcastProcessFunction to read Src1 and broadcast_side_output. Src1 + broadcast_side_output -> OP2 -> Sink2 But as I wrote before, you have to be careful in OP2. If both OP1 and OP2 are reading from the same data stream Src1, if you stop reading records from Src1 in OP2, you eventually deadlock Src1 itself. Solution for that, would be to create second instance of Src1 operator, that would read records from the external system second time: Src1" + broadcast_side_output -> OP2 -> Sink2 Piotrek [1] https://github.com/apache/flink/blob/master/flink-tests/src/test/java/org/apache/flink/test/streaming/runtime/BroadcastStateITCase.java <https://github.com/apache/flink/blob/master/flink-tests/src/test/java/org/apache/flink/test/streaming/runtime/BroadcastStateITCase.java> [2] https://ci.apache.org/projects/flink/flink-docs-stable/dev/stream/side_output.html <https://ci.apache.org/projects/flink/flink-docs-stable/dev/stream/side_output.html> > On 12 Mar 2020, at 12:53, Mikael Gordani <mi.gord...@gmail.com> wrote: > > Hello everyone! > > So essentially, I've two identical queries (q1 and q2) running in parallel > (Streams). > I'm trying to activate the ingestion of data to q2 based on what is processed > in q1. > E.g say that we want to start ingesting data to q2 when a tuple with > timestamp > 5000 appears in q1. > > The queries are constructed in this way. (they share the same source) > q1: Source -> Filter -> Aggregate -> Filter -> Sink > | > V > q2: Filter -> Filter -> Aggregate -> Filter -> Sink > > The initial idea was to have a global variable which is shared between the > two queries. When this tuple appears in q1, it will set the variable to true > in the first Filter operator. While in q2, the first Filter-operator returns > tuples depending on the value of the global variable. > When the variable = true, it will let data pass, when set to false, no data > is allowed to be ingested. > > This works fine when you have all the tasks on the same machine, but of > course, it becomes troublesome in distributed deployments (tasks in different > nodes and such). > > My second approach was to create some sort of "loop" in the query. So let's > say that we have the processing logic placed in the last Filter operator in > q1, and when this "special" tuple appears, it can communicate with the first > Filter operator in q2, in order to allow data to be ingested. > I've tried playing around with IterativeStreams but I don't really get it to > work, and I feel like it's the wrong approach.. > > How can I achieve this sort of functionality? > I'm looking a bit on the BroadcastState part of the DataStream API, but I > feel confused on how to use it. Is it possible to broadcast from a downstream > to an upstream? > Suggestions would be much appreciated! > > Best Regards, > Mikael Gordani