Hi Timo,

Apologies for the late response. I somehow seem to have missed your reply.

I do want the join to be "time-based" since I need to perform a tumble grouping operation on top of the join. I tried setting the watermark strategy to `R` - INTERVAL '0.001' SECONDS, but that didn't help either. Note that we have a custom connector to an internal storage engine. The connector implements the ScanTableSource interface with the SupportsWatermarkPushDown ability. Would the watermark strategy in the table schema matter in that case?

I changed the query to the following to simplify further -

select F.C0, F.C1, F.R, D.C0, D.C1, D.R
from F JOIN D FOR SYSTEM_TIME AS OF F.R ON F.C1 = D.C1

I still do not see any output from the pipeline. The overall logs I see from the connector are the following -

Emit D.D row=+I(0,0,1970-01-01T00:00)@time=0 --> ctx.collectWithTimestamp(row_, rowtime);
Emit D.F row=+I(0,0,1970-01-01T00:00)@time=0
Emit D.D row=+I(1,1,1970-01-01T00:00)@time=0
Emit D.F row=+I(1,1,1970-01-01T00:00:01)@time=1000
Emit D.D row=+I(2,2,1970-01-01T00:00)@time=0
Emit D.F row=+I(2,2,1970-01-01T00:00:02)@time=2000
Emit D.F row=+I(3,3,1970-01-01T00:00:03)@time=3000
Emit D.D row=+I(3,3,1970-01-01T00:00)@time=0
Emit D.F row=+I(4,4,1970-01-01T00:00:04)@time=4000
Emit D.F wm=4000 ---> ctx.emitWatermark(new Watermark(wm));
Emit D.D wm=0

Now, if I change the rowtime of table D to 1s instead of 0, I get one row as output.

Emit D.D row=+I(0,0,1970-01-01T00:00:01)@time=1000
Emit D.F row=+I(0,0,1970-01-01T00:00)@time=0
Emit D.F row=+I(1,1,1970-01-01T00:00:01)@time=1000
Emit D.D row=+I(1,1,1970-01-01T00:00:01)@time=1000
Emit D.D row=+I(2,2,1970-01-01T00:00:01)@time=1000
Emit D.D row=+I(3,3,1970-01-01T00:00:01)@time=1000
Emit D.F wm=1000
Emit D.D wm=1000

reply: (1, "1", 1000, 1, "1", 1000)

The next row streamed from F, which should join with a row emitted from D, does not emit any output -

Emit D.F row=+I(2,2,1970-01-01T00:00:02)@time=2000
Emit D.F wm=2000

NO REPLY

My understanding of temporal joins is that the latest row from D should be picked for joining rows from F.
Is my expectation that the (2, 2, 2s) row in F should join with the (2, 2, 1s) row in D wrong?

Regards,
Satyam

On Tue, Mar 16, 2021 at 5:54 AM Timo Walther <twal...@apache.org> wrote:

> Hi Satyam,
>
> first of all your initial join query can also work, you just need to
> make sure that no time attribute is in the SELECT clause. As the
> exception indicates, you need to cast all time attributes to TIMESTAMP.
> The reason for this is some major design issue that is also explained
> here where a time attribute must not be in the output of a regular join:
>
> https://stackoverflow.com/a/64500296/806430
>
> However, since you would like to perform the join "time-based" either
> interval join or temporal join might solve your use cases.
>
> In your case I guess the watermark strategy of D is the problem. Are you
> sure the result is:
>
> > Emit D row=+I(0,"0",1970-01-01T00:00)@time=0
> > Emit D row=+I(1,"1",1970-01-01T00:00)@time=0
> > Emit D row=+I(2,"2",1970-01-01T00:00)@time=0
> > Emit D watermark=0
>
> and not:
>
> > Emit D row=+I(0,"0",1970-01-01T00:00)@time=0
> > Emit D watermark=0
> > Emit D row=+I(1,"1",1970-01-01T00:00)@time=0
> > Emit D row=+I(2,"2",1970-01-01T00:00)@time=0
>
> Or maybe the watermark is even dropped. Could you try to use a watermark
> strategy with
>
> `R` - INTERVAL '0.001' SECONDS
>
> I hope this helps.
>
> Regards,
> Timo
>
> On 16.03.21 04:37, Satyam Shekhar wrote:
> > Hello folks,
> >
> > I would love to hear your feedback on this.
> >
> > Regards,
> > Satyam
> >
> > On Wed, Mar 10, 2021 at 6:53 PM Satyam Shekhar <satyamshek...@gmail.com
> > <mailto:satyamshek...@gmail.com>> wrote:
> >
> > Hello folks,
> >
> > I am looking to enrich rows from an unbounded streaming table by
> > joining it with a bounded static table while preserving rowtime for
> > the streaming table. For example, let's consider two tables F
> > and D, where F is unbounded and D is bounded.
> > The schema for the two tables is the following -
> >
> > F:
> > |-- C0: BIGINT
> > |-- C1: STRING
> > |-- R: TIMESTAMP(3) **rowtime**
> > |-- WATERMARK FOR R: TIMESTAMP(3) AS `R` - INTERVAL '0' SECONDS
> >
> > D:
> > |-- C0: BIGINT
> > |-- C1: STRING NOT NULL
> >
> > I'd like to run the following query on this schema -
> >
> > select sum(F.C0), D.C1, tumble_start(F.R, interval '1' second)
> > from F join D ON F.C1 = D.C1
> > group by D.C1, tumble(F.R, interval '1' second)
> >
> > However, I run into the following error while running the above query -
> >
> > "Rowtime attributes must not be in the input rows of a regular join.
> > As a workaround you can cast the time attributes of input tables to
> > TIMESTAMP before."
> >
> > My understanding reading the docs is that Time Temporal Join is
> > meant to solve this problem. So I model table D as the following -
> >
> > D:
> > |-- C0: BIGINT
> > |-- C1: STRING NOT NULL
> > |-- R: TIMESTAMP(3)
> > |-- WATERMARK FOR R: TIMESTAMP(3) AS `R` - INTERVAL '0' SECONDS
> > |-- CONSTRAINT 2da2dd2e-9937-48cb-9dec-4f6055713004 PRIMARY KEY (C1)
> >
> > With column D.R always set to 0 and modify the query as follows -
> >
> > select sum(F.C0), D.C1, tumble_start(F.R, interval '1' second)
> > from F join D FOR SYSTEM_TIME AS OF F.R ON F.C1 = D.C1
> > group by D.C1, tumble(F.R, interval '1' second)
> >
> > The above query runs but does not return any result. I have the
> > following data in D initially -
> > Emit D row=+I(0,"0",1970-01-01T00:00)@time=0
> > Emit D row=+I(1,"1",1970-01-01T00:00)@time=0
> > Emit D row=+I(2,"2",1970-01-01T00:00)@time=0
> > Emit D watermark=0
> >
> > And F streams the following rows -
> > Emit F row=+I(0,"0",1970-01-01T00:00)@time=0
> > Emit F row=+I(1,"1",1970-01-01T00:00:10)@time=1000
> > Emit F watermark=1000
> >
> > I expect that two rows in F will join with matching rows (on C1) in
> > D and produce some output. But I do not see anything in the output.
> >
> > So I have the following questions -
> >
> > 1. Is time temporal join the correct tool to solve this problem?
> > 2. What could be the reason for not getting any output rows in the
> > result?
> >
> > Thanks,
> > Satyam
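
One possible way to reason about the second run reported at the top of this thread (D rows at time=1000, one reply, then NO REPLY) is a small toy model. It is only a sketch under two assumptions that are not confirmed anywhere in the thread: the join emits an F row only once the minimum of the F and D watermarks has reached that row's timestamp, and it then picks the latest D version for the key whose time is not later than the F row's time. `TemporalJoinModel` and its method names are made up for illustration and are not Flink classes. The final D watermark of 2000 is hypothetical and does not appear in the logs.

```python
from dataclasses import dataclass, field

MIN_WM = -2**63  # stand-in for "no watermark seen yet" on an input


@dataclass
class TemporalJoinModel:
    """Toy model of an event-time temporal join; NOT Flink code."""
    left_wm: int = MIN_WM                          # last watermark seen on F
    right_wm: int = MIN_WM                         # last watermark seen on D
    pending: list = field(default_factory=list)    # buffered F rows (c0, c1, time)
    versions: dict = field(default_factory=dict)   # D versions: c1 -> [(time, c0)]

    def on_left_row(self, c0, c1, time):
        self.pending.append((c0, c1, time))

    def on_right_row(self, c0, c1, time):
        self.versions.setdefault(c1, []).append((time, c0))

    def on_left_watermark(self, wm):
        self.left_wm = max(self.left_wm, wm)
        return self._emit()

    def on_right_watermark(self, wm):
        self.right_wm = max(self.right_wm, wm)
        return self._emit()

    def _emit(self):
        # Assumption: progress is bounded by the slower of the two inputs.
        combined = min(self.left_wm, self.right_wm)
        out, still_pending = [], []
        for c0, c1, time in self.pending:
            if time > combined:
                still_pending.append((c0, c1, time))  # keep waiting
                continue
            # Latest D version for this key that is not newer than the F row.
            candidates = [v for v in self.versions.get(c1, []) if v[0] <= time]
            if candidates:
                d_time, d_c0 = max(candidates)
                out.append((c0, c1, time, d_c0, c1, d_time))
            # Inner join: an F row without a matching version is dropped.
        self.pending = still_pending
        return out


# Replay of the second run from the logs.
join = TemporalJoinModel()
for c0 in range(4):
    join.on_right_row(c0, str(c0), 1000)      # D rows, all at time=1000
join.on_left_row(0, "0", 0)
join.on_left_row(1, "1", 1000)
join.on_left_watermark(1000)                  # F wm=1000, D has no watermark yet
first = join.on_right_watermark(1000)         # D wm=1000
join.on_left_row(2, "2", 2000)
stalled = join.on_left_watermark(2000)        # F wm=2000, D still at 1000
after_d_advances = join.on_right_watermark(2000)  # hypothetical, not in the logs

print(first)             # [(1, '1', 1000, 1, '1', 1000)]
print(stalled)           # []
print(after_d_advances)  # [(2, '2', 2000, 2, '2', 1000)]
```

Under these assumptions the model reproduces the single reply and the missing second one: the (2, 2, 2s) row in F would match the (2, 2, 1s) row in D, but nothing is emitted while D's watermark stays at 1000. Whether the real pipeline behaves this way would need to be checked against the actual join operator and the connector's watermark handling.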