Hi Eric, it looks like you ran into FLINK-19830 [1]. I'm gonna add Leonard to the thread. Maybe, he has a workaround for your case.
Best, Matthias [1] https://issues.apache.org/jira/browse/FLINK-19830 On Fri, Feb 26, 2021 at 11:40 AM eric hoffmann <sigfrid.hoffm...@gmail.com> wrote: > Hello > Working with flink 1.12.1 i read in the doc that Processing-time temporal > join is supported for kv like join but when i try i get a: > > Exception in thread "main" org.apache.flink.table.api.TableException: > Processing-time temporal join is not supported yet. > at > org.apache.flink.table.planner.plan.nodes.physical.stream.StreamExecTemporalJoinToCoProcessTranslator.createJoinOperator(StreamExecTemporalJoin.scala:273) > at > org.apache.flink.table.planner.plan.nodes.physical.stream.StreamExecTemporalJoinToCoProcessTranslator.getJoinOperator(StreamExecTemporalJoin.scala:224) > at > org.apache.flink.table.planner.plan.nodes.physical.stream.StreamExecTemporalJoin.translateToPlanInternal(StreamExecTemporalJoin.scala:115) > at > org.apache.flink.table.planner.plan.nodes.physical.stream.StreamExecTemporalJoin.translateToPlanInternal(StreamExecTemporalJoin.scala:56) > at > org.apache.flink.table.planner.plan.nodes.exec.ExecNode.translateToPlan(ExecNode.scala:59) > at > org.apache.flink.table.planner.plan.nodes.exec.ExecNode.translateToPlan$(ExecNode.scala:57) > at > org.apache.flink.table.planner.plan.nodes.physical.stream.StreamExecTemporalJoin.translateToPlan(StreamExecTemporalJoin.scala:56) > at > org.apache.flink.table.planner.plan.nodes.physical.stream.StreamExecCalc.translateToPlanInternal(StreamExecCalc.scala:54) > at > org.apache.flink.table.planner.plan.nodes.physical.stream.StreamExecCalc.translateToPlanInternal(StreamExecCalc.scala:39) > at > org.apache.flink.table.planner.plan.nodes.exec.ExecNode.translateToPlan(ExecNode.scala:59) > at > org.apache.flink.table.planner.plan.nodes.exec.ExecNode.translateToPlan$(ExecNode.scala:57) > at > org.apache.flink.table.planner.plan.nodes.physical.stream.StreamExecCalcBase.translateToPlan(StreamExecCalcBase.scala:38) > at > org.apache.flink.table.planner.plan.nodes.physical.stream.StreamExecSink.translateToPlanInternal(StreamExecSink.scala:79) > at > org.apache.flink.table.planner.plan.nodes.physical.stream.StreamExecSink.translateToPlanInternal(StreamExecSink.scala:43) > at > org.apache.flink.table.planner.plan.nodes.exec.ExecNode.translateToPlan(ExecNode.scala:59) > at > org.apache.flink.table.planner.plan.nodes.exec.ExecNode.translateToPlan$(ExecNode.scala:57) > at > org.apache.flink.table.planner.plan.nodes.physical.stream.StreamExecSink.translateToPlan(StreamExecSink.scala:43) > at > org.apache.flink.table.planner.delegation.StreamPlanner.$anonfun$translateToPlan$1(StreamPlanner.scala:66) > > my query: > > SELECT e.id > , r.test FROM kafkaTable as e JOIN s3Table FOR SYSTEM_TIME AS OF e.proctime > AS r ON > e.id = r.id > > my s3 table: > > CREATE TABLE s3Table(id STRING, test STRING, PRIMARY KEY (id) NOT ENFORCED) > WITH ('connector'='filesystem','path'='s3a://fs/','format'='json') > > my kafka table: > > CREATE TABLE kafkaTable(id STRING, foo STRING, bar BIGINT, > proctime AS PROCTIME()) > > WITH > ('connector'='kafka','topic'='mytopic','properties.bootstrap.servers'=' > 127.0.0.1:9092','properties.group.id > '='mygroup','format'='json','scan.startup.mode'='group-offsets', > 'properties.enable.auto.commit'='false') > >