No worries and great idea! I will play around with it and see what I manage to do. Cheers!
On Tue, 17 Mar 2020 at 15:59, Piotr Nowojski <pi...@ververica.com> wrote:

> Oops, sorry, there was a misleading typo/auto-correction in my previous
> e-mail. The second sentence should have been:
>
> > First of all you would have to use event time semantics for consistent
> > results
>
> Piotrek
>
> On 17 Mar 2020, at 14:43, Piotr Nowojski <pi...@ververica.com> wrote:
>
> Hi,
>
> Yes, you are looking in the right direction with the watermarks.
>
> First of all you would have to use event time semantics for constant
> results. With processing time everything would be simpler, but it would be
> more difficult to reason about the results (your choice). Secondly, you
> would have to hook up the logic of enabling query1/query2 to the event
> time/watermarks. Thirdly, you need to somehow sync the input switching
> with the window boundaries. On top of that, watermarks express the lower
> bound of event time that you can expect. However, in order to guarantee
> consistency of the windows, you would like to control the upper bound. For
> example:
>
> 1. If you want to enable Query2, you would need to check what's the
> largest/latest event time that was processed by the input splitter; let's
> say that's TS1.
> 2. That means records with event time < TS1 have already been processed
> by Query1, starting some windows.
> 3. The earliest point for which you could enable Query2 is thus TS1 + 1.
> 4. You would have to adjust Query2's start time to the start of the next
> time window; let's say that would be TS2 = TS1 + 1 + start of next window.
> 5. The input splitter now must keep sending records with event time < TS2
> to Query1, but should already redirect records with event time >= TS2 to
> Query2.
> 6. Once the watermark of the input splitter advances past TS2, that's when
> it can finally stop sending records to Query1 and query1's logic could be
> considered "completed".
>
> So Query1 would be responsible for all of the data before TS2, and Query2
> after TS2.
>
> Alternatively, your input splitter could also buffer some records, so that
> you could enable Query2 faster by re-sending the buffered records. But in
> that case, both Query1 and Query2 would be responsible for some portion of
> the data.
>
> Piotrek
>
> On 17 Mar 2020, at 10:35, Mikael Gordani <mi.gord...@gmail.com> wrote:
>
> Hi Piotr!
>
> Continuing with my scenario: since both of the queries will share the same
> sink, I've realized that some issues will appear when I switch queries,
> especially with regard to stateful operators, e.g. aggregation.
>
> Let me provide an example:
> Let's say that both of the queries ingest a sequence of integers, and each
> will compute the average of these integers over some time window.
> E.g. say that *query1* ingests the sequence *1,2,3,4...*
> The windows for *query1* will be *[1,2,3] [2,3,4] [3,4]*.
>
> If I later on "activate" *query2*, I need to have both of the queries
> accepting tuples for a while, in order to allow the aggregation in *query1*
> to finish before denying it input.
> But there is a possibility that *query2* might receive the tuples *3,4*,
> which will result in the windows *[3] [3,4] [3,4]*.
> Later on, the output of the respective queries will be:
> Query1: 3, *4.5*, 3.5
> Query2: 3, *3.5*, 3.5
>
> As one can see, the second output will be different.
> I'm thinking of using watermarks somehow to make sure that both queries
> have processed the same amount of data before writing to the sink, but I'm
> a bit unsure how to do it.
> Do you have any suggestions or thoughts?
>
> Cheers,
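For reference, the input splitter outlined in steps 1-6 above could be sketched as a Flink `ProcessFunction` with a side output. This is only a sketch under assumptions: the `Event` POJO, the `TO_QUERY2` tag and the `switchTimestamp` constructor argument (playing the role of TS2) are illustrative names, not something defined in the thread; it also assumes timestamps and watermarks have been assigned upstream, so `ctx.timestamp()` carries the record's event time.

    import org.apache.flink.streaming.api.functions.ProcessFunction;
    import org.apache.flink.util.Collector;
    import org.apache.flink.util.OutputTag;

    // Routes each record either to Query1 (main output) or to Query2 (side
    // output), based on the agreed switch-over event time TS2.
    public class SwitchingSplitter extends ProcessFunction<SwitchingSplitter.Event, SwitchingSplitter.Event> {

        // Hypothetical record type; any POJO the two queries understand would do.
        public static class Event {
            public int value;
            public Event() {}
        }

        // Side output feeding Query2.
        public static final OutputTag<Event> TO_QUERY2 = new OutputTag<Event>("to-query2") {};

        private final long switchTimestamp; // TS2 from the steps above

        public SwitchingSplitter(long switchTimestamp) {
            this.switchTimestamp = switchTimestamp;
        }

        @Override
        public void processElement(Event record, Context ctx, Collector<Event> out) {
            // ctx.timestamp() is the record's event time (assigned upstream).
            if (ctx.timestamp() < switchTimestamp) {
                out.collect(record);            // event time < TS2: still Query1's data
            } else {
                ctx.output(TO_QUERY2, record);  // event time >= TS2: redirect to Query2
            }
        }
    }

Query1 would keep consuming the splitter's main output and Query2 the stream obtained via `getSideOutput(TO_QUERY2)`; once the splitter's watermark passes TS2 (step 6), Query1 receives no further records and its last windows can close.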
> On Mon, 16 Mar 2020 at 08:43, Piotr Nowojski <pi...@ververica.com> wrote:
>
>> Hi,
>>
>> Let us know if something doesn't work :)
>>
>> Piotrek
>>
>> On 16 Mar 2020, at 08:42, Mikael Gordani <mi.gord...@gmail.com> wrote:
>>
>> Hi,
>> I'll try it out =)
>>
>> Cheers!
>>
>> On Mon, 16 Mar 2020 at 08:32, Piotr Nowojski <pi...@ververica.com> wrote:
>>
>>> Hi,
>>>
>>> In that case you could try to implement your `FilterFunction` as a
>>> two-input operator, with a broadcast control input that would be setting
>>> the `global_var`. The broadcast control input can originate from some
>>> source, or from some operator.
>>>
>>> Piotrek
>>>
>>> On 13 Mar 2020, at 15:47, Mikael Gordani <mi.gord...@gmail.com> wrote:
>>>
>>> Hi Piotr!
>>> Thanks for your response, I'll try to explain what I'm trying to achieve
>>> in more detail:
>>>
>>> Essentially, if I have two queries which have the same operators and run
>>> in the same task, I would want to figure out some way of controlling the
>>> ingestion from *a source* to the respective queries, in such a way that
>>> only one of the queries receives data, based on a condition.
>>> For more context, the second query (query2) is equipped with instrumented
>>> operators, which are standard operators extended with some extra
>>> functionality; in my case, they enrich the tuples with meta-data.
>>>
>>> Source --> *Filter1* ---> rest of query1
>>>    |
>>>    v
>>> *Filter2* ---> rest of query2
>>>
>>> By using *filters* prior to the queries, they allow records to pass
>>> depending on a condition, let's say a global boolean variable (which is
>>> initially set to *false*).
>>> If it's set to *false*, Filter1 will accept every record and Filter2 will
>>> disregard every record.
>>> If it's set to *true*, Filter2 will accept every record and Filter1 will
>>> disregard every record.
>>>
>>> So the filter operators look something like this:
>>>
>>> boolean global_var = false;
>>>
>>> private static class Filter1 implements FilterFunction<Tuple> {
>>>     @Override
>>>     public boolean filter(Tuple t) throws Exception {
>>>         return !global_var;
>>>     }
>>> }
>>>
>>> private static class Filter2 implements FilterFunction<Tuple> {
>>>     @Override
>>>     public boolean filter(Tuple t) throws Exception {
>>>         return global_var;
>>>     }
>>> }
>>>
>>> Then later on, in the respective queries, there is some processing logic
>>> that changes the value of the global variable, thus enabling and disabling
>>> the flow of data from the source to the respective queries.
>>> The problem is that this global variable is problematic in distributed
>>> deployments, and I'm having a hard time figuring out how to solve that.
>>> Is it a bit more clear? =)
>>
>> --
>> With kind regards,
>> Mikael Gordani
>
> --
> With kind regards,
> Mikael Gordani

--
With kind regards,
Mikael Gordani
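As a follow-up to the broadcast control input Piotr suggests above, one way to replace the `global_var` is Flink's broadcast state pattern: a control stream of flags is broadcast to every parallel instance of the filter, so all instances see the same switch value. The sketch below is an illustration under assumptions, not code from the thread; in particular the `SwitchableFilter` name, the "query-switch" state descriptor, the `Integer` record type and the plain `Boolean` control stream are made up for the example.

    import org.apache.flink.api.common.state.MapStateDescriptor;
    import org.apache.flink.api.common.typeinfo.BasicTypeInfo;
    import org.apache.flink.streaming.api.functions.co.BroadcastProcessFunction;
    import org.apache.flink.util.Collector;

    // One instance guards Query1 (passWhen = false), another guards Query2
    // (passWhen = true). The broadcast side carries the current switch flag,
    // so every parallel instance sees the same value.
    public class SwitchableFilter extends BroadcastProcessFunction<Integer, Boolean, Integer> {

        // Broadcast state holding the switch flag (the distributed "global_var").
        public static final MapStateDescriptor<String, Boolean> SWITCH =
                new MapStateDescriptor<>("query-switch",
                        BasicTypeInfo.STRING_TYPE_INFO, BasicTypeInfo.BOOLEAN_TYPE_INFO);

        private final boolean passWhen;

        public SwitchableFilter(boolean passWhen) {
            this.passWhen = passWhen;
        }

        @Override
        public void processElement(Integer record, ReadOnlyContext ctx, Collector<Integer> out) throws Exception {
            Boolean flag = ctx.getBroadcastState(SWITCH).get("flag");
            boolean current = flag != null && flag;   // defaults to false, like global_var
            if (current == passWhen) {
                out.collect(record);
            }
        }

        @Override
        public void processBroadcastElement(Boolean flag, Context ctx, Collector<Integer> out) throws Exception {
            // Every parallel instance receives the same control record and
            // updates its copy of the broadcast state.
            ctx.getBroadcastState(SWITCH).put("flag", flag);
        }
    }

Wiring it up would then look roughly like `events.connect(control.broadcast(SwitchableFilter.SWITCH)).process(new SwitchableFilter(false))` for query1, and the same with `true` for query2, where `events` is the record stream and `control` is whatever source or operator emits the switch decisions.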