Hi Timo,

Apologies for the late response. I somehow seem to have missed your reply.

I do want the join to be "time-based" since I need to perform a tumble
grouping operation on top of the join.
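Because the tumble grouping only emits a row once its window closes, here is a minimal Java sketch of event-time window firing. It is a simplified model under stated assumptions, not Flink's WindowOperator. It assumes that a window [start, start + size) fires once the watermark reaching it is >= start + size - 1, and that the join in front of the window forwards the minimum of its two input watermarks. `TumbleModel`, `windowStart` and `fires` are hypothetical names. The watermark values come from my initial message below (F at 1000, D at 0).

```java
// Simplified model of event-time tumbling-window firing; NOT Flink's WindowOperator.
class TumbleModel {
    // Start of the tumbling window [start, start + size) that contains ts.
    static long windowStart(long ts, long sizeMs) {
        return Math.floorDiv(ts, sizeMs) * sizeMs;
    }

    // Assumption: a window fires once the watermark reaches its last millisecond (end - 1).
    static boolean fires(long start, long sizeMs, long watermark) {
        return watermark >= start + sizeMs - 1;
    }

    public static void main(String[] args) {
        long size = 1000; // tumble(F.R, interval '1' second)
        // Assumption: the join in front of the window forwards min(F watermark, D watermark).
        long fWatermark = 1000;
        long dWatermark = 0;
        long joinWatermark = Math.min(fWatermark, dWatermark);
        long start = windowStart(0, size); // window holding the F row with rowtime 0
        System.out.println("window [" + start + ", " + (start + size) + ") fires at watermark "
                + joinWatermark + "? " + fires(start, size, joinWatermark));
    }
}
```

Under these assumptions the first window stays open as long as D's watermark does not advance, regardless of how far F's watermark moves.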

I tried setting the watermark strategy to `R` - INTERVAL '0.001' SECONDS,
but that didn't help either.
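For reference, here is a minimal Java sketch of what subtracting an interval in the watermark strategy changes. It is a simplified model, not Flink's watermark generator. It assumes the generator tracks the running maximum of (rowtime - delay), that the connector decides when to emit it (the "wm" markers), and it uses Flink's definition that a row whose timestamp is <= the current watermark is late. `WatermarkDelayModel` and `lateRows` are hypothetical names; the event order is the alternative one Timo asks about below (watermark emitted between D's rows).

```java
import java.util.ArrayList;
import java.util.List;

// Simplified model of a `R - INTERVAL x` watermark strategy; NOT Flink's generator.
class WatermarkDelayModel {
    /**
     * Replays events in arrival order: "row <millis>" is a row with that rowtime,
     * "wm" means the source emits its current watermark at that point.
     * Returns the rowtimes of rows that arrive at or behind the last emitted watermark.
     */
    static List<Long> lateRows(List<String> events, long delayMs) {
        long candidate = Long.MIN_VALUE; // running max of (rowtime - delayMs)
        long emitted = Long.MIN_VALUE;   // last watermark sent downstream
        List<Long> late = new ArrayList<>();
        for (String e : events) {
            if (e.equals("wm")) {
                emitted = Math.max(emitted, candidate);
            } else {
                long ts = Long.parseLong(e.substring("row ".length()));
                if (ts <= emitted) {
                    late.add(ts); // per Flink's definition, timestamp <= watermark is late
                }
                candidate = Math.max(candidate, ts - delayMs);
            }
        }
        return late;
    }

    public static void main(String[] args) {
        // Ordering in which the watermark is emitted between D's rows.
        List<String> events = List.of("row 0", "wm", "row 0", "row 0");
        System.out.println("INTERVAL '0' SECONDS     -> late: " + lateRows(events, 0));
        System.out.println("INTERVAL '0.001' SECONDS -> late: " + lateRows(events, 1));
    }
}
```

Under these assumptions, the 1 ms delay only matters when rows carrying the watermark's own timestamp arrive after that watermark. It never moves D's watermark past D's largest rowtime, so it would not help if the problem is that D's watermark stops advancing.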

Note that we have a custom connector to an internal storage engine. The
connector implements the ScanTableSource interface with the
SupportsWatermarkPushDown ability. Would the watermark strategy in the
table schema matter in that case? I changed the query to the following to
simplify it further -

select F.C0, F.C1, F.R, D.C0, D.C1, D.R from F JOIN D FOR SYSTEM_TIME AS OF
F.R ON F.C1 = D.C1

I still do not see any output from the pipeline. The overall logs I see
from the connector are the following -

Emit D.D row=+I(0,0,1970-01-01T00:00)@time=0  -->  ctx.collectWithTimestamp(row_, rowtime);
Emit D.F row=+I(0,0,1970-01-01T00:00)@time=0
Emit D.D row=+I(1,1,1970-01-01T00:00)@time=0
Emit D.F row=+I(1,1,1970-01-01T00:00:01)@time=1000
Emit D.D row=+I(2,2,1970-01-01T00:00)@time=0
Emit D.F row=+I(2,2,1970-01-01T00:00:02)@time=2000
Emit D.F row=+I(3,3,1970-01-01T00:00:03)@time=3000
Emit D.D row=+I(3,3,1970-01-01T00:00)@time=0
Emit D.F row=+I(4,4,1970-01-01T00:00:04)@time=4000
Emit D.F wm=4000  --->  ctx.emitWatermark(new Watermark(wm));
Emit D.D wm=0

Now, if I change the rowtime of table D to 1s instead of 0, I get one row
as output.

Emit D.D row=+I(0,0,1970-01-01T00:00:01)@time=1000
Emit D.F row=+I(0,0,1970-01-01T00:00)@time=0
Emit D.F row=+I(1,1,1970-01-01T00:00:01)@time=1000
Emit D.D row=+I(1,1,1970-01-01T00:00:01)@time=1000
Emit D.D row=+I(2,2,1970-01-01T00:00:01)@time=1000
Emit D.D row=+I(3,3,1970-01-01T00:00:01)@time=1000
Emit D.F wm=1000
Emit D.D wm=1000

reply: (1, "1", 1000, 1, "1", 1000)

The next row streamed from F, which should join with a row emitted from D,
does not produce any output -

Emit D.F row=+I(2,2,1970-01-01T00:00:02)@time=2000
Emit D.F wm=2000
NO REPLY

My understanding of temporal joins is that the latest row from D should be
picked for joining rows from F. Is my expectation that the (2, 2, 2s) row in F
joins with the (2, 2, 1s) row in D wrong?
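To make the question concrete, here is a minimal Java sketch of event-time temporal join semantics under assumptions that are not confirmed in this thread: the join's event time is the minimum of the F and D watermarks, an F row is emitted only once that minimum reaches its rowtime, and it joins the latest D version whose rowtime is <= the F row's rowtime. `TemporalJoinModel` is a hypothetical name and is not Flink's TemporalRowTimeJoinOperator. The replay uses the second log above.

```java
import java.util.ArrayList;
import java.util.HashMap;
import java.util.Iterator;
import java.util.List;
import java.util.Map;
import java.util.TreeMap;

// Simplified model of an event-time temporal join; NOT Flink's operator.
class TemporalJoinModel {
    record Row(long c0, String c1, long rowtime) {}

    private final List<Row> pendingLeft = new ArrayList<>();                  // buffered F rows
    private final Map<String, TreeMap<Long, Row>> versions = new HashMap<>(); // D versions per key
    private long leftWm = Long.MIN_VALUE;
    private long rightWm = Long.MIN_VALUE;
    final List<String> output = new ArrayList<>();

    void left(Row r) { pendingLeft.add(r); }

    void right(Row r) {
        versions.computeIfAbsent(r.c1(), k -> new TreeMap<>()).put(r.rowtime(), r);
    }

    void leftWatermark(long wm) { leftWm = Math.max(leftWm, wm); fire(); }

    void rightWatermark(long wm) { rightWm = Math.max(rightWm, wm); fire(); }

    int pending() { return pendingLeft.size(); }

    // Assumption: the join's event time is the minimum of both input watermarks.
    private void fire() {
        long combined = Math.min(leftWm, rightWm);
        Iterator<Row> it = pendingLeft.iterator();
        while (it.hasNext()) {
            Row f = it.next();
            if (f.rowtime() > combined) continue; // F row not yet complete in event time
            it.remove();
            TreeMap<Long, Row> byTime = versions.get(f.c1());
            // Latest D version whose rowtime <= the F row's rowtime.
            Map.Entry<Long, Row> match = byTime == null ? null : byTime.floorEntry(f.rowtime());
            if (match == null) continue; // inner join: no version at that time, row is dropped
            Row d = match.getValue();
            output.add("(" + f.c0() + ", \"" + f.c1() + "\", " + f.rowtime() + ", "
                    + d.c0() + ", \"" + d.c1() + "\", " + d.rowtime() + ")");
        }
    }

    public static void main(String[] args) {
        TemporalJoinModel join = new TemporalJoinModel();
        // Replays the second log above (D.R = 1s), in arrival order.
        join.right(new Row(0, "0", 1000));
        join.left(new Row(0, "0", 0));
        join.left(new Row(1, "1", 1000));
        join.right(new Row(1, "1", 1000));
        join.right(new Row(2, "2", 1000));
        join.right(new Row(3, "3", 1000));
        join.leftWatermark(1000);
        join.rightWatermark(1000);
        System.out.println(join.output); // one joined row
        join.left(new Row(2, "2", 2000));
        join.leftWatermark(2000); // D's watermark is still 1000
        System.out.println(join.output.size() + " result(s), " + join.pending() + " F row(s) buffered");
    }
}
```

In this model the (2, 2, 2s) row stays buffered because D's watermark never moves past 1000, so the combined watermark stays at 1000; the row is only emitted once D's watermark advances.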

Regards,
Satyam


On Tue, Mar 16, 2021 at 5:54 AM Timo Walther <twal...@apache.org> wrote:

> Hi Satyam,
>
> first of all, your initial join query can also work; you just need to
> make sure that no time attribute is in the SELECT clause. As the
> exception indicates, you need to cast all time attributes to TIMESTAMP.
> The reason for this is a major design issue, also explained
> here, that a time attribute must not be in the output of a regular join:
>
> https://stackoverflow.com/a/64500296/806430
>
> However, since you would like to perform the join "time-based", either
> an interval join or a temporal join might solve your use case.
>
> In your case I guess the watermark strategy of D is the problem. Are you
> sure the result is:
>
>  >     Emit D row=+I(0,"0",1970-01-01T00:00)@time=0
>  >     Emit D row=+I(1,"1",1970-01-01T00:00)@time=0
>  >     Emit D row=+I(2,"2",1970-01-01T00:00)@time=0
>  >     Emit D watermark=0
>
> and not:
>
>  >     Emit D row=+I(0,"0",1970-01-01T00:00)@time=0
>  >     Emit D watermark=0
>  >     Emit D row=+I(1,"1",1970-01-01T00:00)@time=0
>  >     Emit D row=+I(2,"2",1970-01-01T00:00)@time=0
>
> Or maybe the watermark is even dropped. Could you try to use a watermark
> strategy with
>
> `R` - INTERVAL '0.001' SECONDS
>
> I hope this helps.
>
> Regards,
> Timo
>
>
>
> On 16.03.21 04:37, Satyam Shekhar wrote:
> > Hello folks,
> >
> > I would love to hear your feedback on this.
> >
> > Regards,
> > Satyam
> >
> > On Wed, Mar 10, 2021 at 6:53 PM Satyam Shekhar <satyamshek...@gmail.com
> > <mailto:satyamshek...@gmail.com>> wrote:
> >
> >     Hello folks,
> >
> >     I am looking to enrich rows from an unbounded streaming table by
> >     joining it with a bounded static table while preserving rowtime for
> >     the streaming table. For example, let's consider two tables, F
> >     and D, where F is unbounded and D is bounded. The schema for the two
> >     tables is the following -
> >
> >     F:
> >       |-- C0: BIGINT
> >       |-- C1: STRING
> >       |-- R: TIMESTAMP(3) **rowtime**
> >       |-- WATERMARK FOR R: TIMESTAMP(3) AS `R` - INTERVAL '0' SECONDS
> >
> >     D:
> >       |-- C0: BIGINT
> >       |-- C1: STRING NOT NULL
> >
> >     I'd like to run the following query on this schema -
> >
> >     select sum(F.C0), D.C1, tumble_start(F.R, interval '1' second)
> >          from F join D ON F.C1 = D.C1
> >          group by D.C1, tumble(F.R, interval '1' second)
> >
> >     However, I run into the following error while running the above query -
> >
> >     "Rowtime attributes must not be in the input rows of a regular join.
> >     As a workaround you can cast the time attributes of input tables to
> >     TIMESTAMP before."
> >
> >     My understanding from reading the docs is that an event-time temporal
> >     join is meant to solve this problem. So I model table D as the following -
> >
> >     D:
> >       |-- C0: BIGINT
> >       |-- C1: STRING NOT NULL
> >       |-- R: TIMESTAMP(3)
> >       |-- WATERMARK FOR R: TIMESTAMP(3) AS `R` - INTERVAL '0' SECONDS
> >       |-- CONSTRAINT 2da2dd2e-9937-48cb-9dec-4f6055713004 PRIMARY KEY (C1)
> >
> >     Column D.R is always set to 0, and I modify the query as follows -
> >
> >     select sum(F.C0), D.C1, tumble_start(F.R, interval '1' second)
> >          from F join D FOR SYSTEM_TIME AS OF F.R ON F.C1 = D.C1
> >          group by D.C1, tumble(F.R, interval '1' second)
> >
> >     The above query runs but does not return any result. I have the
> >     following data in D initially -
> >     Emit D row=+I(0,"0",1970-01-01T00:00)@time=0
> >     Emit D row=+I(1,"1",1970-01-01T00:00)@time=0
> >     Emit D row=+I(2,"2",1970-01-01T00:00)@time=0
> >     Emit D watermark=0
> >
> >     And F streams the following rows -
> >     Emit F row=+I(0,"0",1970-01-01T00:00)@time=0
> >     Emit F row=+I(1,"1",1970-01-01T00:00:10)@time=1000
> >     Emit F watermark=1000
> >
> >     I expect the two rows in F to join with matching rows (on C1) in
> >     D and produce some output, but I do not see anything in the output.
> >
> >     So I have the following questions -
> >
> >     1. Is an event-time temporal join the correct tool to solve this problem?
> >     2. What could be the reason for not getting any output rows in the
> >     result?
> >
> >     Thanks,
> >     Satyam
> >
>
>
