Hi Dawid,

Thanks for getting back. The *ROWTIME* modifier did ring a bell, and I was able 
to find the issue. We register the inner table correctly (`timestamp` is of 
type TIMESTAMP(3) *ROWTIME*), but we had an intermediate step where we 
converted it to a DataStream to optionally add custom triggers, then 
re-registered it and tried to use that table. When re-registering the 
DataStream, we converted the TIMESTAMP(3) *ROWTIME* field to a plain TIMESTAMP 
field (I think because we hit a warning that the rowtime timestamp wasn’t part 
of the public API), so the re-registered table no longer had a time attribute.

We didn’t actually need the intermediate DataStream, so I dropped that step 
and now use the inner table directly (with the rowtime timestamp). Things work 
as expected!
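
For anyone who finds this thread later, here is a rough sketch of the query shape that works, written as a single nested statement. It assumes `timestamp` in my_kafka_stream is already a TIMESTAMP(3) *ROWTIME* attribute and that nothing between the inner and outer query converts it to a plain TIMESTAMP. The CAST form and the HOP_START column are illustrative assumptions, and the remaining columns are left out.

```sql
-- Sketch only: the inner SELECT is a plain projection, so the rowtime
-- attribute on `timestamp` is carried through to the outer window.
SELECT
  partnerid,
  -- HOP_START is added here for illustration; it was not in the original query
  HOP_START(`timestamp`, INTERVAL '30' SECOND, INTERVAL '1' MINUTE) AS window_start,
  CAST(SUM(cost) AS FLOAT) AS CostPerPartner,
  COUNT(impression_id) AS ImpsPerPartner
FROM (
  -- stands in for the registered "base_query"
  SELECT `timestamp`, cost, partnerid, impression_id
  FROM my_kafka_stream
) AS base_query
GROUP BY
  partnerid,
  HOP(`timestamp`, INTERVAL '30' SECOND, INTERVAL '1' MINUTE)
```

The other window sizes (1H, 6H) only change the two intervals in HOP, so the inner query can stay shared.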

Thanks,

-- Piyush


From: Dawid Wysakowicz <dwysakow...@apache.org>
Date: Thursday, November 14, 2019 at 10:57 AM
To: <user@flink.apache.org>
Subject: Re: Propagating event time field from nested query


Hi Piyush,

Could you verify that the type of the `timestamp` field in the table 
my_kafka_stream is of TIMESTAMP(3) *ROWTIME* type? Could you share how you 
create this table?

What you are doing should work. I suspect that the `timestamp` field in 
`my_kafka_stream` changed type somehow.

Best,

Dawid
On 11/11/2019 22:43, Piyush Narang wrote:
Hi folks,

We have a Flink streaming Table / SQL job that we were looking to migrate from 
an older Flink release (1.6.x) to 1.9. As part of doing so, we have been seeing 
a few errors which I was trying to figure out how to work around. Would 
appreciate any help / pointers.
The job essentially involves a nested query:
SELECT `timestamp`, cost, partnerid, impression_id, …
FROM my_kafka_stream

The kafka stream has a ‘timestamp’ field that tracks event time. We register 
this nested query as “base_query”.

We now use this in a couple of outer aggregation queries (different outer 
aggregation queries differ in terms of the time window we aggregate over – 1M, 
1H, 6H etc):
SELECT
  CAST(SUM(cost) AS FLOAT) AS CostPerPartner,
  COUNT(impression_id) AS ImpsPerPartner,
  …
FROM
  base_query
GROUP BY
  partnerid,
  HOP(`timestamp`, INTERVAL '30' SECOND, INTERVAL '1' MINUTE)

While the outer query would get translated and scheduled as a Flink streaming 
job just fine on 1.6, we are running into this error when we try to bump our 
build to 1.9:
“Exception in thread "main" org.apache.flink.table.api.ValidationException: 
Window can only be defined over a time attribute column.”

Any suggestions on how we could work around this? I saw a thread suggesting 
using HOP_ROWTIME but if I understand correctly, that would mean we would need 
to do the hop window generation / group by in the nested query which we’d like 
to avoid (as we have a couple of time window combinations to generate).

Thanks,
-- Piyush
