Oops, sorry, there was a misleading typo/autocorrection in my previous e-mail. 
Second sentence should have been:

> First of all you would have to use event time semantic for consistent results

Piotrek

> On 17 Mar 2020, at 14:43, Piotr Nowojski <pi...@ververica.com> wrote:
> 
> Hi,
> 
> Yes, you are looking in the right direction with the watermarks. 
> 
> First of all you would have to use event time semantic for constant results. 
> With processing time everything would be simpler, but it would be more 
> difficult to reason about the results (your choice). Secondly, you would have 
> to hook up the logic of enabling query1/query2 to the event time/watermarks. 
> Thirdly, you need to somehow sync the input switching with the window 
> boundaries. On top of that, watermarks express a lower bound of the event 
> time that you can expect. However, in order to guarantee consistency of the 
> windows, you would like to control the upper bound. For example:
> 
> 1. If you want to enable Query2, you would need to check what’s the 
> largest/latest event time that was processed by the input splitter, let’s say 
> that’s TS1.
> 2. That means records with event time <= TS1 have already been processed by 
> Query1, starting some windows.
> 3. The earliest point for which you could enable Query2 is thus TS1 + 1.
> 4. You would have to align the Query2 start time with the start of the next 
> time window, let’s say that would be TS2 = start of the first window that 
> begins at or after TS1 + 1.
> 5. The input splitter must now keep sending records with event time < TS2 to 
> Query1, but should already redirect records with event time >= TS2 to Query2.
> 6. Once the watermark for the input splitter advances past TS2, that’s when 
> it can finally stop sending records to Query1, and the Query1 logic could be 
> considered “completed”.
> 
> So Query1 would be responsible for all of the data before TS2, and Query2 
> for all of the data from TS2 onwards.
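
The hand-over steps above can be sketched in plain Java. This is only an illustration under stated assumptions, not Flink API: the class `QuerySwitch`, its method names, and the choice of tumbling windows of a fixed size aligned to timestamp 0 are all hypothetical. It computes TS2 from TS1, routes each record by its event time, and reports when Query1 can be considered completed.

```java
/** Hypothetical model of the input splitter's hand-over from Query1 to Query2. */
final class QuerySwitch {
    private final long windowSize;            // assumed tumbling window size, aligned to 0
    private long maxSeenTs = Long.MIN_VALUE;  // TS1: largest event time routed so far
    private long switchTs = Long.MAX_VALUE;   // TS2: MAX_VALUE means no switch requested

    QuerySwitch(long windowSize) {
        this.windowSize = windowSize;
    }

    /** Start of the first window that begins at or after ts. */
    static long nextWindowStart(long ts, long windowSize) {
        return Math.floorDiv(ts + windowSize - 1, windowSize) * windowSize;
    }

    /** Steps 1 to 4: derive TS2 from the largest event time seen so far. */
    void requestSwitch() {
        if (maxSeenTs == Long.MIN_VALUE) {
            switchTs = Long.MIN_VALUE; // nothing processed yet, Query2 can take everything
        } else {
            switchTs = nextWindowStart(maxSeenTs + 1, windowSize);
        }
    }

    /** Step 5: returns 1 if the record belongs to Query1, 2 if it belongs to Query2. */
    int route(long eventTs) {
        maxSeenTs = Math.max(maxSeenTs, eventTs);
        return eventTs < switchTs ? 1 : 2;
    }

    /** Step 6: conservatively, Query1 is done once the watermark has reached TS2. */
    boolean query1Completed(long watermark) {
        return watermark >= switchTs;
    }

    long switchTs() {
        return switchTs;
    }
}
```

With a window size of 10 and records at event times 3 and 7 already sent to Query1, requesting the switch yields TS2 = 10: a late record at time 9 still goes to Query1, while a record at time 10 goes to Query2.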
> 
> Alternatively, your input splitter could also buffer some records, so that 
> you could enable Query2 faster, by re-sending the buffered records. But in 
> that case, both Query1 and Query2 would be responsible for some portion of 
> the data.
> 
> Piotrek
> 
>> On 17 Mar 2020, at 10:35, Mikael Gordani <mi.gord...@gmail.com 
>> <mailto:mi.gord...@gmail.com>> wrote:
>> 
>> Hi Piotr!
>> 
>> Continuing with my scenario, since both of the queries will share the same 
>> sink, I've realized that some issues will appear when I switch queries. 
>> Especially with regard to stateful operators, e.g. aggregation.
>> 
>> Let me provide an example:
>> So, let's say that both of the queries ingest a sequence of integers and 
>> compute the average of these integers over some time.
>> E.g. say that query1 ingests the sequence 1,2,3,4.... 
>> The windows for query1 will be [1,2,3] [2,3,4] [3,4]. 
>> 
>> If I later "activate" query2, both of the queries need to accept tuples 
>> for a while, so that the aggregation in query1 can finish before its input 
>> is cut off.
>> But there is a possibility that query2 might receive the tuples 3,4, which 
>> will result in the windows: [3] [3,4] [3,4]
>> Later on, the output of the respective queries will be:
>> Query 1: 3, 4.5, 3.5
>> Query2 : 3, 3.5, 3.5
>> 
>> As one can see, the second output will be different. 
>> I'm thinking of using watermarks somehow to make sure that both queries have 
>> processed the same amount of data before writing to the sink, but I'm a bit 
>> unsure how to do it.
>> Do you have any suggestions or thoughts?
>> Cheers,
>> 
>> On Mon, 16 Mar 2020 at 08:43, Piotr Nowojski <pi...@ververica.com 
>> <mailto:pi...@ververica.com>> wrote:
>> Hi,
>> 
>> Let us know if something doesn’t work :)
>> 
>> Piotrek
>> 
>>> On 16 Mar 2020, at 08:42, Mikael Gordani <mi.gord...@gmail.com 
>>> <mailto:mi.gord...@gmail.com>> wrote:
>>> 
>>> Hi,
>>> I'll try it out =) 
>>> 
>>> Cheers!
>>> 
>>> On Mon, 16 Mar 2020 at 08:32, Piotr Nowojski <pi...@ververica.com 
>>> <mailto:pi...@ververica.com>> wrote:
>>> Hi,
>>> 
>>> In that case you could try to implement your `FilterFunction` as a 
>>> two-input operator with a broadcast control input that sets the 
>>> `global_var`. The broadcast control input can originate from some source 
>>> or from some operator.
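
As a rough illustration of this suggestion, here is a plain-Java model rather than Flink code. The names `SwitchableFilter`, `onControl`, `accept`, and `BroadcastControl` are hypothetical. The point it shows is that each parallel filter instance keeps the flag as its own local state and updates it only from control messages, which are delivered to every instance, so no JVM-wide static variable is needed. In Flink, the two methods would roughly correspond to the two inputs of a connected-stream function (for example `flatMap1`/`flatMap2` of a `CoFlatMapFunction`, with the control stream broadcast), but that mapping is an assumption about how one might wire it up.

```java
import java.util.List;

/** Hypothetical model of one parallel instance of a two-input filter. */
final class SwitchableFilter {
    private final boolean forQuery2;       // true for Filter2, false for Filter1
    private boolean query2Enabled = false; // local copy of the flag, initially false

    SwitchableFilter(boolean forQuery2) {
        this.forQuery2 = forQuery2;
    }

    /** Control input: every instance receives every control message. */
    void onControl(boolean enableQuery2) {
        this.query2Enabled = enableQuery2;
    }

    /** Data input: Filter1 passes while the flag is false, Filter2 while it is true. */
    boolean accept(Object record) {
        return forQuery2 == query2Enabled;
    }
}

/** Hypothetical stand-in for a broadcast: deliver the message to all instances. */
final class BroadcastControl {
    static void broadcast(List<SwitchableFilter> instances, boolean enableQuery2) {
        for (SwitchableFilter instance : instances) {
            instance.onControl(enableQuery2);
        }
    }
}
```

Because the flag only changes when a control message arrives, the switch is driven by the data flow itself instead of by shared memory, which is what makes the approach workable across several task managers.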
>>> 
>>> Piotrek
>>> 
>>>> On 13 Mar 2020, at 15:47, Mikael Gordani <mi.gord...@gmail.com 
>>>> <mailto:mi.gord...@gmail.com>> wrote:
>>>> 
>>>> Hi Piotr!
>>>> Thanks for your response, I'll try to explain what I'm trying to achieve 
>>>> in more detail:
>>>> 
>>>> Essentially, if I have two queries which have the same operators and run 
>>>> in the same task, I would want to figure out some way of controlling the 
>>>> ingestion from a source to the respective queries in such a way that only 
>>>> one of the queries receives data, based on a condition. 
>>>> For more context, the second query (query2) is equipped with instrumented 
>>>> operators, which are standard operators extended with some extra 
>>>> functionality; in my case, they enrich the tuples with meta-data.
>>>> 
>>>> Source --> Filter1 ---> rest of query1
>>>>    |
>>>>    v
>>>>    Filter2 ---> rest of query2
>>>> 
>>>> By using filters prior to the queries, records are allowed to pass 
>>>> depending on a condition, let's say a global boolean variable (which is 
>>>> initially set to false).
>>>> If it's set to false, Filter1 will accept every record and Filter2 will 
>>>> disregard every record.
>>>> If it's set to true, Filter2 will accept every record and Filter1 will 
>>>> disregard every record.
>>>> So the filter operators look something like this: 
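>>>> import org.apache.flink.api.common.functions.FilterFunction;
>>>> 
>>>> // Shared flag: false routes records to query1, true routes them to query2.
>>>> // Must be static to be visible from the static nested classes below.
>>>> static boolean global_var = false;
>>>> 
>>>> // Passes every record while global_var is false.
>>>> private static class filter1 implements FilterFunction<Tuple> {
>>>>     @Override
>>>>     public boolean filter(Tuple t) throws Exception {
>>>>         return !global_var;
>>>>     }
>>>> }
>>>> 
>>>> // Passes every record while global_var is true.
>>>> private static class filter2 implements FilterFunction<Tuple> {
>>>>     @Override
>>>>     public boolean filter(Tuple t) throws Exception {
>>>>         return global_var;
>>>>     }
>>>> }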
>>>> 
>>>> Then later on, in the respective queries, there is some processing logic 
>>>> which changes the value of the global variable, thus enabling and 
>>>> disabling the flow of data from the source to the respective queries.
>>>> The problem is that this global variable does not work in distributed 
>>>> deployments, and I'm having a hard time figuring out how to solve that.
>>>> Is it a bit more clear? =)
>>> 
>>> 
>>> 
>>> -- 
>>> Kind regards,
>>> Mikael Gordani
>> 
>> 
>> 
>> -- 
>> Kind regards,
>> Mikael Gordani
> 
