Oops, sorry, there was a misleading typo/auto-correction in my previous e-mail. The second sentence should have been:
> First of all you would have to use event time semantic for consistent results

Piotrek

> On 17 Mar 2020, at 14:43, Piotr Nowojski <pi...@ververica.com> wrote:
>
> Hi,
>
> Yes, you are looking in the right direction with the watermarks.
>
> First of all you would have to use event time semantic for constant results.
> With processing time everything would be simpler, but it would be more
> difficult to reason about the results (your choice). Secondly, you would have
> to hook up the logic of enabling query1/query2 to the event time/watermarks.
> Thirdly, you need to somehow sync the input switching with the window
> boundaries. On top of that, watermarks express a lower bound of the event
> time that you can expect. However, in order to guarantee consistency of the
> windows, you would like to control the upper bound. For example:
>
> 1. If you want to enable Query2, you would need to check what’s the
>    largest/latest event time that was processed by the input splitter, let’s
>    say that’s TS1
> 2. That means, records with event time < TS1 have already been processed by
>    Query1, starting some windows
> 3. The earliest point for which you could enable Query2, is thus TS1 + 1.
> 4. You would have to adjust Query2 start time, by start of the next time
>    window, let’s say that would be TS2 = TS1 + 1 + start of next window
> 5. Input splitter now must keep sending records with event time < TS2 to
>    Query1, but already should redirect records with event time >= TS2 to
>    Query2.
> 6. Once watermark for the input splitter advances past TS2, that’s when it
>    can finally stop sending records to Query1 and query1 logic could be
>    considered “completed”.
>
> So Query1 would be responsible for all of the data before TS2, and Query2
> after TS2.
>
> Alternatively, your input splitter could also buffer some records, so that
> you could enable Query2 faster, by re-sending the buffered records.
> But in that case, both Query1 and Query2 would be responsible for some
> portion of the data.
>
> Piotrek
>
>> On 17 Mar 2020, at 10:35, Mikael Gordani <mi.gord...@gmail.com> wrote:
>>
>> Hi Piotr!
>>
>> Continuing with my scenario, since both of the queries will share the same
>> sink, I've realized that some issues will appear when I switch queries,
>> especially with regard to stateful operators, e.g. aggregation.
>>
>> Let me provide an example:
>> So, let's say that both of the queries ingest a sequence of integers, and
>> each computes the average of these integers over some time.
>> E.g. say that query1 ingests the sequence 1,2,3,4....
>> The windows for query1 will be [1,2,3] [2,3,4] [3,4].
>>
>> If I later on "activate" query2, I need to have both of the queries
>> accepting tuples for a while, in order to allow the aggregation to finish in
>> query1 before denying it input.
>> But there is a possibility that query2 might receive the tuples 3,4, which
>> will result in the windows: [3] [3,4] [3,4]
>> Later on, the output of the respective queries will be:
>> Query 1: 3, 4.5, 3.5
>> Query2 : 3, 3.5, 3.5
>>
>> As one can see, the second output will be different.
>> I'm thinking of using watermarks somehow to make sure that both queries have
>> processed the same amount of data before writing to the sink, but I'm a bit
>> unsure how to do it.
>> Do you have any suggestions or thoughts?
>> Cheers,
>>
>> On Mon, 16 Mar 2020 at 08:43, Piotr Nowojski <pi...@ververica.com> wrote:
>> Hi,
>>
>> Let us know if something doesn’t work :)
>>
>> Piotrek
>>
>>> On 16 Mar 2020, at 08:42, Mikael Gordani <mi.gord...@gmail.com> wrote:
>>>
>>> Hi,
>>> I'll try it out =)
>>>
>>> Cheers!
>>>
>>> On Mon, 16 Mar 2020 at 08:32, Piotr Nowojski <pi...@ververica.com> wrote:
>>> Hi,
>>>
>>> In that case you could try to implement your `FilterFunction` as a two-input
>>> operator, with a broadcast control input that would be setting the
>>> `global_var`. The broadcast control input can originate from some source,
>>> or from some operator.
>>>
>>> Piotrek
>>>
>>>> On 13 Mar 2020, at 15:47, Mikael Gordani <mi.gord...@gmail.com> wrote:
>>>>
>>>> Hi Piotr!
>>>> Thanks for your response, I'll try to explain what I'm trying to achieve
>>>> in more detail:
>>>>
>>>> Essentially, if I have two queries which have the same operators and run
>>>> in the same task, I would want to figure out some way of controlling the
>>>> ingestion from a source to the respective queries in such a way that only
>>>> one of the queries receives data, based on a condition.
>>>> For more context, the second query (query2) is equipped with instrumented
>>>> operators, which are standard operators extended with some extra
>>>> functionality; in my case, they enrich the tuples with meta-data.
>>>>
>>>> Source --> Filter1 ---> rest of query1
>>>>    |
>>>>    v
>>>> Filter2 ---> rest of query2
>>>>
>>>> By using filters prior to the queries, they allow records to pass
>>>> depending on a condition, let's say a global boolean variable (which is
>>>> initially set to false).
>>>> If it's set to true, Filter1 will accept every record and Filter2 will
>>>> disregard every record.
>>>> If it's set to false, Filter2 will accept every record and Filter1 will
>>>> disregard every record.
>>>> So the filter operators look something like this:
>>>>
>>>> import org.apache.flink.api.common.functions.FilterFunction;
>>>>
>>>> // Shared flag; static so the static nested classes below can read it.
>>>> static boolean global_var = false;
>>>>
>>>> private static class filter1 implements FilterFunction<Tuple> {
>>>>     @Override
>>>>     public boolean filter(Tuple t) throws Exception {
>>>>         return !global_var; // passes records while the flag is false
>>>>     }
>>>> }
>>>>
>>>> private static class filter2 implements FilterFunction<Tuple> {
>>>>     @Override
>>>>     public boolean filter(Tuple t) throws Exception {
>>>>         return global_var; // passes records while the flag is true
>>>>     }
>>>> }
>>>>
>>>> Then later on, in the respective queries, there is some processing logic
>>>> which changes the value of the global variable, thus enabling and
>>>> disabling the flow of data from the source to the respective queries.
>>>> The problem is that this global variable does not work in distributed
>>>> deployments, and I'm having a hard time figuring out how to solve that.
>>>> Is it a bit more clear? =)
>>>
>>>
>>>
>>> --
>>> Kind regards,
>>> Mikael Gordani
>>
>>
>>
>> --
>> Kind regards,
>> Mikael Gordani
>
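
For reference, the switch-over steps from the 17 Mar reply in this thread (TS1, TS2 and the routing rule) could be sketched in plain Java roughly as follows. This is only an illustration under assumptions that are not stated in the thread: tumbling windows aligned to timestamp 0, step 4 read as "round TS1 + 1 up to the start of the next window", and a watermark W meaning "no more records with event time <= W". The class and method names (`QuerySwitchPlan`, `startingAfter`, `routeToQuery2`, `query1Completed`) are hypothetical and not part of Flink.

```java
/**
 * Hypothetical helper (not a Flink API): plans the hand-over from Query1 to
 * Query2 on a window boundary. Assumes tumbling windows aligned to timestamp 0.
 */
final class QuerySwitchPlan {
    private final long switchTimestamp; // TS2 in the thread

    private QuerySwitchPlan(long switchTimestamp) {
        this.switchTimestamp = switchTimestamp;
    }

    /**
     * @param maxSeenTimestamp TS1: largest event time the splitter has processed
     * @param windowSizeMs     size of the (assumed) tumbling windows
     */
    static QuerySwitchPlan startingAfter(long maxSeenTimestamp, long windowSizeMs) {
        if (windowSizeMs <= 0) {
            throw new IllegalArgumentException("windowSizeMs must be positive");
        }
        // Step 3: the earliest event time Query2 could own is TS1 + 1.
        long earliest = maxSeenTimestamp + 1;
        // Step 4 (as interpreted here): round up to the next window start,
        // staying put if TS1 + 1 already falls on a boundary.
        long ts2 = Math.floorDiv(earliest + windowSizeMs - 1, windowSizeMs) * windowSizeMs;
        return new QuerySwitchPlan(ts2);
    }

    long switchTimestamp() {
        return switchTimestamp;
    }

    /** Step 5: records with event time >= TS2 go to Query2, the rest to Query1. */
    boolean routeToQuery2(long eventTimestamp) {
        return eventTimestamp >= switchTimestamp;
    }

    /** Step 6: once the watermark has reached TS2, Query1 gets no more input. */
    boolean query1Completed(long watermark) {
        return watermark >= switchTimestamp;
    }
}
```

With TS1 = 12345 and 5000 ms windows, this sketch picks TS2 = 15000: a record stamped 14999 still goes to Query1, and a record stamped 15000 goes to Query2. Sliding windows, as in the [1,2,3] [2,3,4] example, would need a different boundary rule.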