Hi, In that case you could try to implement your `FilterFunction` as two input operator, with broadcast control input, that would be setting the `global_var`. Broadcast control input can be originating from some source, or from some operator.
Piotrek > On 13 Mar 2020, at 15:47, Mikael Gordani <mi.gord...@gmail.com> wrote: > > Hi Piotr! > Thanks for your response, I'll try to explain what I'm trying to achieve in > more detail: > > Essentially, If I've two queries, in which has the same operators and runs in > the same task, I would want to figure out some way of controlling the > ingestion from a source to the respective queries in such a way that only one > of the queries receive data, based on a condition. > For more context, the second query (query2), is equipped with instrumented > operators, which are standard operators extended with some extra > functionality, in my case, they enrich the tuples with meta-data. > > Source --> Filter1 ---> rest of query1 > | > v > Filter2 ---> rest of query2 > > By using filters prior to the queries, they allow records to pass depending > on a condition, let's say a global boolean variable (which is initially set > to false). > If it's set to true, Filter1 will accept every record and Filter2 will > disregard every record. > If it's set to false, Filter2 will accept every record and Filter1 will > disregard every record. > So the filter operators looks something like this: > boolean global_var = false; > > private static class filter1 implements FilterFunction<Tuple t> { > @Override > public boolean filter(Tuple t) throws Exception { > return !global_var; > } > } > > private static class filter2 implements FilterFunction<Tuple t> { > @Override > public boolean filter(Tuple t) throws Exception { > return global_var; > } > } > > Then later on, in the respective queries, there are some processing logic in > which changes the value of the global variable, thus enabling and disabling > the flow of data from the source to the respective queries. > The problem lies in this global variable being problematic in distributed > deployments, in which I'm having a hard time figuring out how to solve. > Is it a bit more clear? =)