Hi Piotr!

Continuing with my scenario: since both queries share the same sink,
I've realized that some issues will appear when I switch between them,
especially with stateful operators such as aggregations.

Let me provide an example.
Let's say that both queries ingest a sequence of integers and compute the
average of these integers over some time window.
E.g. say that *query1* ingests the sequence *1,2,3,4,...*
The windows for *query1* will be *[1,2,3] [2,3,4] [3,4]*.

If I later "activate" *query2*, both queries need to accept tuples for a
while, so that the aggregation in *query1* can finish before its input is
cut off.
But there is a possibility that *query2* only receives the tuples *3,4*,
which will result in the windows *[3] [3,4] [3,4]*.
The outputs (window averages) of the respective queries will then be:
Query 1: *2*, *3*, 3.5
Query 2: *3*, *3.5*, 3.5

As one can see, the first two outputs differ, because *query2* never saw
the tuples *1,2*.
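
To double-check the numbers, here is a minimal plain-Java sketch (no Flink
involved) that recomputes the window averages for both queries. It assumes
each integer doubles as its own event timestamp and that the windows have
size 3 and slide by 1, which reproduces the windows listed above. The class
and method names are made up for illustration.

```java
import java.util.ArrayList;
import java.util.List;

public class WindowAverages {

    // Average of all values whose timestamp falls in [start, start + size).
    // Assumption: each integer is used as its own event timestamp.
    static double windowAverage(List<Integer> input, int start, int size) {
        int sum = 0;
        int count = 0;
        for (int v : input) {
            if (v >= start && v < start + size) {
                sum += v;
                count++;
            }
        }
        return (double) sum / count; // NaN if the window is empty
    }

    // One average per sliding window, for window starts firstStart..lastStart.
    static List<Double> averages(List<Integer> input, int firstStart, int lastStart, int size) {
        List<Double> out = new ArrayList<>();
        for (int start = firstStart; start <= lastStart; start++) {
            out.add(windowAverage(input, start, size));
        }
        return out;
    }

    public static void main(String[] args) {
        // query1 sees the whole sequence, query2 only the tuples 3 and 4.
        System.out.println("query1: " + averages(List.of(1, 2, 3, 4), 1, 3, 3));
        System.out.println("query2: " + averages(List.of(3, 4), 1, 3, 3));
    }
}
```

Both queries evaluate the same three window ranges, but *query2* is missing
the earlier tuples, so any window that overlaps the switch is only partially
filled on its side.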
I'm thinking of using watermarks somehow to make sure that both queries have
processed the same amount of data before writing to the sink, but I'm a bit
unsure how to do it.
Do you have any suggestions or thoughts?
Cheers,

On Mon, 16 Mar 2020 at 08:43, Piotr Nowojski <pi...@ververica.com> wrote:

> Hi,
>
> Let us know if something doesn’t work :)
>
> Piotrek
>
> On 16 Mar 2020, at 08:42, Mikael Gordani <mi.gord...@gmail.com> wrote:
>
> Hi,
> I'll try it out =)
>
> Cheers!
>
> On Mon, 16 Mar 2020 at 08:32, Piotr Nowojski <pi...@ververica.com> wrote:
>
>> Hi,
>>
>> In that case you could try to implement your `FilterFunction` as a
>> two-input operator with a broadcast control input that sets the
>> `global_var`. The broadcast control input can originate from some source
>> or from some operator.
>>
>> Piotrek
>>
>> On 13 Mar 2020, at 15:47, Mikael Gordani <mi.gord...@gmail.com> wrote:
>>
>> Hi Piotr!
>> Thanks for your response, I'll try to explain what I'm trying to achieve
>> in more detail:
>>
>> Essentially, if I have two queries that have the same operators and
>> run in the same task, I want to figure out some way of controlling
>> the ingestion from *a source* to the respective queries in such a way
>> that only one of the queries receives data, based on a condition.
>> For more context, the second query (query2) is equipped with
>> instrumented operators, which are standard operators extended with some
>> extra functionality; in my case, they enrich the tuples with metadata.
>>
>> Source --> *Filter1* ---> rest of query1
>>    |
>>    v
>>    *Filter2* ---> rest of query2
>>
>> By using *filters* prior to the queries, records are allowed to pass
>> depending on a condition, let's say a global boolean variable (which
>> is initially set to false).
>> If it's set to *false*, Filter1 will accept every record and Filter2 will
>> disregard every record.
>> If it's set to *true*, Filter2 will accept every record and Filter1 will
>> disregard every record.
>>
>> So the filter operators look something like this:
>>
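>> import org.apache.flink.api.common.functions.FilterFunction;
>>
>> // The field and classes below are members of the enclosing job class.
>> // The flag must be static to be visible from the static nested classes.
>> // A static field is local to one JVM, so it is not shared across machines.
>> static boolean global_var = false;
>>
>> // Lets records through while global_var is false.
>> private static class filter1 implements FilterFunction<Tuple> {
>>     @Override
>>     public boolean filter(Tuple t) throws Exception {
>>         return !global_var;
>>     }
>> }
>>
>> // Lets records through while global_var is true.
>> private static class filter2 implements FilterFunction<Tuple> {
>>     @Override
>>     public boolean filter(Tuple t) throws Exception {
>>         return global_var;
>>     }
>> }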
>>
>>
>> Then later on, in the respective queries, there is some processing logic
>> that changes the value of the global variable, thus enabling and
>> disabling the flow of data from the source to the respective queries.
>> The problem is that this global variable does not work in distributed
>> deployments, and I'm having a hard time figuring out how to solve that.
>> Is it a bit more clear? =)
>>
>>
>>
>
> --
> Best regards,
> Mikael Gordani
>
>
>

-- 
Best regards,
Mikael Gordani
