Hi Leonard, Thx for your reply, Not problem to help on the JIRA topic, In my situation, in a full sql env, what will be the best workaround to enrich stream of data from a kafka topics with statical data based on id? i know how to do t in stream. eric
Le sam. 27 févr. 2021 à 05:15, Leonard Xu <xbjt...@gmail.com> a écrit : > Hi, Eric > > Firstly FileSystemTableSource doe not implement LookupTableSource which > means we cannot directly lookup a Filesystem table. > > In FLINK-19830, we plan to support Processing-time temporal join any > table/views by lookup the data in join operator state which scanned from > the filesystem table, but as the issue described: join processing for > left stream doesn't wait for the complete snapshot of temporal table, this > may mislead users in production environment. > Eg: your s3 table has 1000 records, but the join operator does not know > when all records has been arrived, the correlation maybe incorrect, thus we > disable this feature. > > I think we can implement LookupTableSource for FileSystemTableSource > currently, after that, we can directly lookup a Filesystem table, the > implementation will be similar to Hive table where we cache all data of the > files and then lookup the cache. Could you help create an JIRA ticket for > this? > > > Best, > Leonard > > > 在 2021年2月26日,23:41,Matthias Pohl <matth...@ververica.com> 写道: > > Hi Eric, > it looks like you ran into FLINK-19830 [1]. I'm gonna add Leonard to the > thread. Maybe, he has a workaround for your case. > > Best, > Matthias > > [1] https://issues.apache.org/jira/browse/FLINK-19830 > > On Fri, Feb 26, 2021 at 11:40 AM eric hoffmann <sigfrid.hoffm...@gmail.com> > wrote: > >> Hello >> Working with flink 1.12.1 i read in the doc that Processing-time temporal >> join is supported for kv like join but when i try i get a: >> >> Exception in thread "main" org.apache.flink.table.api.TableException: >> Processing-time temporal join is not supported yet. >> at >> org.apache.flink.table.planner.plan.nodes.physical.stream.StreamExecTemporalJoinToCoProcessTranslator.createJoinOperator(StreamExecTemporalJoin.scala:273) >> at >> org.apache.flink.table.planner.plan.nodes.physical.stream.StreamExecTemporalJoinToCoProcessTranslator.getJoinOperator(StreamExecTemporalJoin.scala:224) >> at >> org.apache.flink.table.planner.plan.nodes.physical.stream.StreamExecTemporalJoin.translateToPlanInternal(StreamExecTemporalJoin.scala:115) >> at >> org.apache.flink.table.planner.plan.nodes.physical.stream.StreamExecTemporalJoin.translateToPlanInternal(StreamExecTemporalJoin.scala:56) >> at >> org.apache.flink.table.planner.plan.nodes.exec.ExecNode.translateToPlan(ExecNode.scala:59) >> at >> org.apache.flink.table.planner.plan.nodes.exec.ExecNode.translateToPlan$(ExecNode.scala:57) >> at >> org.apache.flink.table.planner.plan.nodes.physical.stream.StreamExecTemporalJoin.translateToPlan(StreamExecTemporalJoin.scala:56) >> at >> org.apache.flink.table.planner.plan.nodes.physical.stream.StreamExecCalc.translateToPlanInternal(StreamExecCalc.scala:54) >> at >> org.apache.flink.table.planner.plan.nodes.physical.stream.StreamExecCalc.translateToPlanInternal(StreamExecCalc.scala:39) >> at >> org.apache.flink.table.planner.plan.nodes.exec.ExecNode.translateToPlan(ExecNode.scala:59) >> at >> org.apache.flink.table.planner.plan.nodes.exec.ExecNode.translateToPlan$(ExecNode.scala:57) >> at >> org.apache.flink.table.planner.plan.nodes.physical.stream.StreamExecCalcBase.translateToPlan(StreamExecCalcBase.scala:38) >> at >> org.apache.flink.table.planner.plan.nodes.physical.stream.StreamExecSink.translateToPlanInternal(StreamExecSink.scala:79) >> at >> org.apache.flink.table.planner.plan.nodes.physical.stream.StreamExecSink.translateToPlanInternal(StreamExecSink.scala:43) >> at >> org.apache.flink.table.planner.plan.nodes.exec.ExecNode.translateToPlan(ExecNode.scala:59) >> at >> org.apache.flink.table.planner.plan.nodes.exec.ExecNode.translateToPlan$(ExecNode.scala:57) >> at >> org.apache.flink.table.planner.plan.nodes.physical.stream.StreamExecSink.translateToPlan(StreamExecSink.scala:43) >> at >> org.apache.flink.table.planner.delegation.StreamPlanner.$anonfun$translateToPlan$1(StreamPlanner.scala:66) >> >> my query: >> >> SELECT e.id >> , r.test FROM kafkaTable as e JOIN s3Table FOR SYSTEM_TIME AS OF e.proctime >> AS r ON >> e.id = r.id >> >> my s3 table: >> >> CREATE TABLE s3Table(id STRING, >> test STRING, PRIMARY KEY (id) NOT ENFORCED) >> WITH ('connector'='filesystem','path'='s3a://fs/','format'='json') >> >> my kafka table: >> >> CREATE TABLE kafkaTable(id STRING, foo STRING, bar BIGINT, >> proctime AS PROCTIME()) >> >> WITH >> ('connector'='kafka','topic'='mytopic','properties.bootstrap.servers'=' >> 127.0.0.1:9092','properties.group.id >> '='mygroup','format'='json','scan.startup.mode'='group-offsets', >> 'properties.enable.auto.commit'='false') >> >> > >