Hi Dan,
The exception you are getting is caused by a limitation of Flink SQL that
users run into very frequently at the moment.
I recently tried to summarize the issue here:
https://stackoverflow.com/questions/64445207/rowtime-attributes-must-not-be-in-the-input-rows-of-a-regular-join-despite-usi/64500296#64500296
The query is quite complex, and it seems that one of the JOINs is not
recognized as a streaming interval join. Maybe you can split the big query
into individual subqueries and verify the plan of each one using
`TableEnvironment.explainSql()` to find out which join causes the
exception.
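For context: a join can only be planned as an interval join when, besides an equality predicate, its condition bounds the time attribute of one side relative to the other, as in `"user".ts BETWEEN "view".ts - INTERVAL '30' DAY AND "view".ts + INTERVAL '1' HOUR`. In the query below, one likely candidate is the join against the aggregated subquery: it only exposes `log_user_id` and `impression_content_ids`, so there is no time attribute on that side to bound. The plain-Python sketch below models only the matching predicate of an inner interval join in batch form. `Row` and `interval_join` are hypothetical names, and the sketch says nothing about how Flink implements the operator or manages its state.

```python
from dataclasses import dataclass
from datetime import datetime, timedelta


@dataclass(frozen=True)
class Row:
    """Hypothetical input row: a join key, an event-time attribute, a payload."""
    key: str
    ts: datetime
    payload: str


def interval_join(left, right, lower, upper):
    """Inner interval join: pair rows with equal keys where
    right.ts - lower <= left.ts <= right.ts + upper,
    i.e. left.ts BETWEEN right.ts - lower AND right.ts + upper."""
    # Index the right side by key (the equi-join predicate).
    by_key = {}
    for rrow in right:
        by_key.setdefault(rrow.key, []).append(rrow)
    matches = []
    for lrow in left:
        for rrow in by_key.get(lrow.key, []):
            # The time bound; dropping this check turns it into a plain equi-join.
            if rrow.ts - lower <= lrow.ts <= rrow.ts + upper:
                matches.append((lrow.payload, rrow.payload))
    return matches


# Mirrors: "user".ts BETWEEN "view".ts - INTERVAL '30' DAY AND "view".ts + INTERVAL '1' HOUR
t0 = datetime(2020, 12, 16)
users = [
    Row("u1", t0 - timedelta(days=10), "user-recent"),
    Row("u1", t0 - timedelta(days=40), "user-too-old"),
]
views = [Row("u1", t0, "view-1")]
print(interval_join(users, views, lower=timedelta(days=30), upper=timedelta(hours=1)))
# [('user-recent', 'view-1')]
```

Because every match must fall inside a fixed window around the other row's timestamp, a streaming engine only needs to buffer rows for that window; a join without such a bound has to keep all rows indefinitely.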
Regards,
Timo
On 16.12.20 03:40, Dan Hill wrote:
I want to try using AWS Personalize
<https://aws.amazon.com/personalize/> to get content recommendations.
One of the fields on the input (click) event is a list of recent
impressions.
For example:
{
  ...
  eventType: 'click',
  eventId: 'click-1',
  itemId: 'item-1',
  impression: ['item-2', 'item-3', 'item-4', 'item-5', ....],
}
Is there a way to produce this output using Flink SQL?
I tried a version of this but got the following error:
"Rowtime attributes must not be in the input rows of a regular join. As
a workaround you can cast the time attributes of input tables to
TIMESTAMP before."
Here is a simplified version of the query.
SELECT
  "user".user_id AS userId,
  "view".session_id AS sessionId,
  click.click_id AS eventId,
  CAST(click.ts AS BIGINT) AS sentAt,
  insertion.content_id AS itemId,
  impression_content_ids AS impression
FROM "user"
RIGHT JOIN "view"
  ON "user".log_user_id = "view".log_user_id
  AND "user".ts BETWEEN "view".ts - INTERVAL '30' DAY AND "view".ts + INTERVAL '1' HOUR
JOIN insertion
  ON "view".view_id = insertion.view_id
  AND "view".ts BETWEEN insertion.ts - INTERVAL '1' HOUR AND insertion.ts + INTERVAL '1' HOUR
JOIN impression
  ON insertion.insertion_id = impression.insertion_id
  AND insertion.ts BETWEEN impression.ts - INTERVAL '12' HOUR AND impression.ts + INTERVAL '1' HOUR
JOIN (
  SELECT
    log_user_id,
    CAST(COLLECT(DISTINCT impression_content_id) AS ARRAY<STRING>) AS impression_content_ids
  FROM (
    SELECT
      insertion.log_user_id AS log_user_id,
      ROW_NUMBER() OVER (PARTITION BY insertion.log_user_id ORDER BY impression.ts DESC) AS row_num,
      insertion.content_id AS impression_content_id
    FROM insertion
    JOIN impression
      ON insertion.insertion_id = impression.insertion_id
      AND insertion.ts BETWEEN impression.ts - INTERVAL '12' HOUR AND impression.ts + INTERVAL '1' HOUR
    GROUP BY insertion.log_user_id, impression.ts, insertion.content_id
  )
  WHERE row_num <= 25
  GROUP BY log_user_id
) ON insertion.insertion_id = impression.insertion_id
  AND insertion.ts BETWEEN impression.ts - INTERVAL '12' HOUR AND impression.ts + INTERVAL '1' HOUR
LEFT JOIN click
  ON impression.impression_id = click.impression_id
  AND impression.ts BETWEEN click.ts - INTERVAL '12' HOUR AND click.ts + INTERVAL '12' HOUR
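The nested subquery appears intended to collect, per user, the distinct content ids among that user's 25 most recent impressions. As a hedged reference for the expected output (not Flink code), here is a plain-Python model of that intent. The `(log_user_id, ts, content_id)` tuple layout and the `recent_impressions` name are assumptions for illustration. The sketch returns ids newest first, whereas the SQL `COLLECT` result is a multiset with no guaranteed order.

```python
from collections import defaultdict
from datetime import datetime


def recent_impressions(impressions, limit=25):
    """Map each log_user_id to the distinct content ids among its `limit`
    most recent impressions, newest first.

    `impressions` is an iterable of (log_user_id, ts, content_id) tuples
    (a hypothetical layout chosen for illustration).
    """
    per_user = defaultdict(list)
    for user_id, ts, content_id in impressions:
        per_user[user_id].append((ts, content_id))

    result = {}
    for user_id, rows in per_user.items():
        # Like ROW_NUMBER() OVER (PARTITION BY log_user_id ORDER BY ts DESC)
        # followed by WHERE row_num <= limit.
        newest = sorted(rows, key=lambda row: row[0], reverse=True)[:limit]
        # Like COLLECT(DISTINCT ...), but keeping the first (newest) occurrence.
        distinct_ids = []
        for _, content_id in newest:
            if content_id not in distinct_ids:
                distinct_ids.append(content_id)
        result[user_id] = distinct_ids
    return result


rows = [
    ("u1", datetime(2020, 12, 15, 9), "item-2"),
    ("u1", datetime(2020, 12, 15, 10), "item-3"),
    ("u1", datetime(2020, 12, 15, 11), "item-2"),
    ("u2", datetime(2020, 12, 15, 8), "item-9"),
]
print(recent_impressions(rows, limit=2))
# {'u1': ['item-2', 'item-3'], 'u2': ['item-9']}
```

Note that the limit is applied before deduplication, as in the SQL, so a user can end up with fewer than 25 ids when recent impressions repeat the same content.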