Hi Eric,

First, FileSystemTableSource does not implement LookupTableSource, which means we cannot directly look up a filesystem table.
In FLINK-19830, we plan to support processing-time temporal joins against any table or view by looking up the data in join operator state, which is populated by scanning the filesystem table. But as the issue describes, join processing for the left stream doesn't wait for the complete snapshot of the temporal table, and this may mislead users in a production environment. For example, your S3 table has 1000 records, but the join operator does not know when all of the records have arrived, so the correlation may be incorrect. That is why we disabled this feature.

I think we can implement LookupTableSource for FileSystemTableSource now. After that, we can look up a filesystem table directly. The implementation would be similar to the Hive table, where we cache all the data of the files and then look up the cache.

Could you help create a JIRA ticket for this?

Best,
Leonard

> On Feb 26, 2021, at 23:41, Matthias Pohl <[email protected]> wrote:
>
> Hi Eric,
> it looks like you ran into FLINK-19830 [1]. I'm gonna add Leonard to the
> thread. Maybe, he has a workaround for your case.
>
> Best,
> Matthias
>
> [1] https://issues.apache.org/jira/browse/FLINK-19830
>
> On Fri, Feb 26, 2021 at 11:40 AM eric hoffmann <[email protected]> wrote:
> Hello
> Working with flink 1.12.1 i read in the doc that Processing-time temporal
> join is supported for kv like join but when i try i get a:
>
> Exception in thread "main" org.apache.flink.table.api.TableException:
> Processing-time temporal join is not supported yet.
> at org.apache.flink.table.planner.plan.nodes.physical.stream.StreamExecTemporalJoinToCoProcessTranslator.createJoinOperator(StreamExecTemporalJoin.scala:273)
> at org.apache.flink.table.planner.plan.nodes.physical.stream.StreamExecTemporalJoinToCoProcessTranslator.getJoinOperator(StreamExecTemporalJoin.scala:224)
> at org.apache.flink.table.planner.plan.nodes.physical.stream.StreamExecTemporalJoin.translateToPlanInternal(StreamExecTemporalJoin.scala:115)
> at org.apache.flink.table.planner.plan.nodes.physical.stream.StreamExecTemporalJoin.translateToPlanInternal(StreamExecTemporalJoin.scala:56)
> at org.apache.flink.table.planner.plan.nodes.exec.ExecNode.translateToPlan(ExecNode.scala:59)
> at org.apache.flink.table.planner.plan.nodes.exec.ExecNode.translateToPlan$(ExecNode.scala:57)
> at org.apache.flink.table.planner.plan.nodes.physical.stream.StreamExecTemporalJoin.translateToPlan(StreamExecTemporalJoin.scala:56)
> at org.apache.flink.table.planner.plan.nodes.physical.stream.StreamExecCalc.translateToPlanInternal(StreamExecCalc.scala:54)
> at org.apache.flink.table.planner.plan.nodes.physical.stream.StreamExecCalc.translateToPlanInternal(StreamExecCalc.scala:39)
> at org.apache.flink.table.planner.plan.nodes.exec.ExecNode.translateToPlan(ExecNode.scala:59)
> at org.apache.flink.table.planner.plan.nodes.exec.ExecNode.translateToPlan$(ExecNode.scala:57)
> at org.apache.flink.table.planner.plan.nodes.physical.stream.StreamExecCalcBase.translateToPlan(StreamExecCalcBase.scala:38)
> at org.apache.flink.table.planner.plan.nodes.physical.stream.StreamExecSink.translateToPlanInternal(StreamExecSink.scala:79)
> at org.apache.flink.table.planner.plan.nodes.physical.stream.StreamExecSink.translateToPlanInternal(StreamExecSink.scala:43)
> at org.apache.flink.table.planner.plan.nodes.exec.ExecNode.translateToPlan(ExecNode.scala:59)
> at org.apache.flink.table.planner.plan.nodes.exec.ExecNode.translateToPlan$(ExecNode.scala:57)
> at org.apache.flink.table.planner.plan.nodes.physical.stream.StreamExecSink.translateToPlan(StreamExecSink.scala:43)
> at org.apache.flink.table.planner.delegation.StreamPlanner.$anonfun$translateToPlan$1(StreamPlanner.scala:66)
>
> my query:
>
> SELECT e.id, r.test FROM kafkaTable as e JOIN s3Table FOR
> SYSTEM_TIME AS OF e.proctime AS r ON e.id = r.id
>
> my s3 table:
>
> CREATE TABLE s3Table(id STRING, test STRING, PRIMARY KEY (id) NOT ENFORCED)
> WITH ('connector'='filesystem','path'='s3a://fs/','format'='json')
>
> my kafka table:
>
> CREATE TABLE kafkaTable(id STRING, foo STRING, bar BIGINT, proctime AS
> PROCTIME())
> WITH
> ('connector'='kafka','topic'='mytopic','properties.bootstrap.servers'='127.0.0.1:9092','properties.group.id'='mygroup','format'='json','scan.startup.mode'='group-offsets',
> 'properties.enable.auto.commit'='false')
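
For reference, the cache-then-lookup idea from the reply at the top of this mail (load all the data of the files first, then serve lookups from the cache) could be sketched roughly as follows. This is plain Java only and does not use Flink's LookupTableSource interface. The class name CachedLookupSketch, the method names, and the row layout (join key in column 0) are all assumptions made up for illustration, not the planned implementation.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collections;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

/** Hypothetical sketch of "cache all rows, then look up by key". Not a Flink API. */
class CachedLookupSketch {
    // Maps the join key (assumed to be column 0) to every row carrying that key.
    private final Map<String, List<String[]>> cache = new HashMap<>();

    /**
     * The whole snapshot is loaded in the constructor, so no lookup can run
     * against a partially loaded table.
     */
    CachedLookupSketch(List<String[]> allRows) {
        for (String[] row : allRows) {
            cache.computeIfAbsent(row[0], k -> new ArrayList<>()).add(row);
        }
    }

    /** Returns all cached rows for the key, or an empty list if there are none. */
    List<String[]> lookup(String id) {
        return cache.getOrDefault(id, Collections.emptyList());
    }

    public static void main(String[] args) {
        // Stand-in for rows (id, test) read from the files of the filesystem table.
        List<String[]> rows = Arrays.asList(
                new String[] {"k1", "v1"},
                new String[] {"k2", "v2"});
        CachedLookupSketch table = new CachedLookupSketch(rows);
        System.out.println(table.lookup("k1").get(0)[1]); // prints "v1"
    }
}
```

Because the cache is fully built before the first lookup, a left-stream record never sees a partial snapshot, which is the problem described for the state-based approach. Refreshing the cache when the files change is left out of this sketch.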
