Hi Dan,

The exception you are getting is a very common limitation in Flink SQL at the moment.

I tried to summarize the issue recently here:

https://stackoverflow.com/questions/64445207/rowtime-attributes-must-not-be-in-the-input-rows-of-a-regular-join-despite-usi/64500296#64500296

The query is quite complex. It seems that one of the JOINs is not recognized as a streaming interval join. Maybe you can split the big query into individual subqueries and check the plan of each one using `TableEnvironment.explainSql()` to figure out which join causes the exception.
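
As a rough sketch of that approach (assuming the table and column names from your query below, and that your Flink version accepts `EXPLAIN PLAN FOR`; the same SELECT statements can also be passed as strings to `TableEnvironment.explainSql()`), you could explain the innermost join on its own first and then add the surrounding layers back one at a time:

```sql
-- Step 1: explain a single join in isolation.
-- Table and column names are taken from the quoted query; adjust as needed.
EXPLAIN PLAN FOR
SELECT
  insertion.log_user_id,
  insertion.content_id,
  impression.ts
FROM insertion
JOIN impression
  ON insertion.insertion_id = impression.insertion_id
  AND insertion.ts BETWEEN impression.ts - INTERVAL '12' HOUR
                       AND impression.ts + INTERVAL '1' HOUR;

-- Step 2: add the next layer back (here a per-user aggregation) and explain again.
EXPLAIN PLAN FOR
SELECT
  insertion.log_user_id,
  COLLECT(DISTINCT insertion.content_id) AS impression_content_ids
FROM insertion
JOIN impression
  ON insertion.insertion_id = impression.insertion_id
  AND insertion.ts BETWEEN impression.ts - INTERVAL '12' HOUR
                       AND impression.ts + INTERVAL '1' HOUR
GROUP BY insertion.log_user_id;
```

The first step that fails with the same exception, or whose plan shows a regular join where you expected an interval join, points to the problematic part. The exact operator names in the plan depend on the Flink version, so it is easiest to compare against a join that you know is planned as an interval join.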

Regards,
Timo


On 16.12.20 03:40, Dan Hill wrote:
I want to try using AWS Personalize <https://aws.amazon.com/personalize/> to get content recommendations. One of the fields on the input (click) event is a list of recent impressions.

E.g.
{
   ...
   eventType: 'click',
   eventId: 'click-1',
   itemId: 'item-1',
   impression: ['item-2', 'item-3', 'item-4', 'item-5', ....],
}

Is there a way to produce this output using Flink SQL?

I tried doing a version of this but get the following error:
"Rowtime attributes must not be in the input rows of a regular join. As a workaround you can cast the time attributes of input tables to TIMESTAMP before."

Here is a simplified version of the query.


SELECT
   "user".user_id AS userId,
   "view".session_id AS sessionId,
   click.click_id AS eventId,
   CAST(click.ts AS BIGINT) AS sentAt,
   insertion.content_id AS itemId,
   impression_content_ids AS impression
FROM "user"
RIGHT JOIN "view"
   ON "user".log_user_id = "view".log_user_id
   AND "user".ts BETWEEN "view".ts - INTERVAL '30' DAY AND "view".ts + INTERVAL '1' HOUR
JOIN insertion
   ON view.view_id = insertion.view_id
   AND view.ts BETWEEN insertion.ts - INTERVAL '1' HOUR AND insertion.ts + INTERVAL '1' HOUR
JOIN impression
   ON insertion.insertion_id = impression.insertion_id
   AND insertion.ts BETWEEN impression.ts - INTERVAL '12' HOUR AND impression.ts + INTERVAL '1' HOUR
JOIN (
   SELECT
      log_user_id,
      CAST(COLLECT(DISTINCT impression_content_id) AS ARRAY<STRING>) AS impression_content_ids
   FROM (
      SELECT
         insertion.log_user_id AS log_user_id,
         ROW_NUMBER() OVER (PARTITION BY insertion.log_user_id ORDER BY impression.ts DESC) AS row_num,
         insertion.content_id AS impression_content_id
      FROM insertion
      JOIN impression
         ON insertion.insertion_id = impression.insertion_id
         AND insertion.ts BETWEEN impression.ts - INTERVAL '12' HOUR AND impression.ts + INTERVAL '1' HOUR
      GROUP BY insertion.log_user_id, impression.ts, insertion.content_id
   )
   WHERE row_num <= 25
   GROUP BY log_user_id
)
   ON insertion.insertion_id = impression.insertion_id
   AND insertion.ts BETWEEN impression.ts - INTERVAL '12' HOUR AND impression.ts + INTERVAL '1' HOUR
LEFT JOIN click
   ON impression.impression_id = click.impression_id
   AND impression.ts BETWEEN click.ts - INTERVAL '12' HOUR AND click.ts + INTERVAL '12' HOUR

