Hi Leonard,
Thanks for your reply.
No problem, I can help with the JIRA ticket.
In my situation, in a full SQL environment, what would be the best workaround
to enrich a stream of data from a Kafka topic with static data based on an id?
I know how to do it with the DataStream API.
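In SQL, the shape I am after is roughly the following, reusing the table
definitions from my earlier mail below (a plain join on id, with no temporal
clause, just to illustrate the enrichment I mean):

-- enrich each Kafka record with the matching s3 record by id
-- (regular join, not a temporal/lookup join)
SELECT e.id, e.foo, r.test
FROM kafkaTable AS e
JOIN s3Table AS r
  ON e.id = r.id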
eric

On Sat, Feb 27, 2021 at 05:15, Leonard Xu <xbjt...@gmail.com> wrote:

> Hi, Eric
>
> Firstly, FileSystemTableSource does not implement LookupTableSource, which
> means we cannot directly look up a filesystem table.
>
> In FLINK-19830, we plan to support processing-time temporal joins against any
> table/view by looking up data that has been scanned from the filesystem table
> and kept in the join operator's state. But, as the issue describes, the join
> processing for the left stream does not wait for a complete snapshot of the
> temporal table, which may mislead users in a production environment.
> E.g. your s3 table has 1000 records, but the join operator does not know when
> all of them have arrived, so the correlation may be incorrect; that is why we
> disabled this feature.
>
> I think we can implement LookupTableSource for FileSystemTableSource; after
> that, we can directly look up a filesystem table. The implementation would be
> similar to the Hive table source, where we cache all the data from the files
> and then look it up in the cache. Could you help create a JIRA ticket for
> this?
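>
> For reference, the Hive pattern looks roughly like this in SQL today: a
> processing-time lookup join where the dimension table's files are fully
> cached and refreshed after a TTL (the table names and the TTL value below
> are only illustrative):
>
> -- hive_dim's files are cached in the lookup function and reloaded
> -- after the configured TTL; kafka_orders is the probe stream
> SELECT o.id, d.test
> FROM kafka_orders AS o
> JOIN hive_dim /*+ OPTIONS('lookup.join.cache.ttl'='60 min') */
>   FOR SYSTEM_TIME AS OF o.proctime AS d
>   ON o.id = d.id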
>
>
> Best,
> Leonard
>
>
> On Feb 26, 2021, at 23:41, Matthias Pohl <matth...@ververica.com> wrote:
>
> Hi Eric,
> it looks like you ran into FLINK-19830 [1]. I'm going to add Leonard to the
> thread. Maybe he has a workaround for your case.
>
> Best,
> Matthias
>
> [1] https://issues.apache.org/jira/browse/FLINK-19830
>
> On Fri, Feb 26, 2021 at 11:40 AM eric hoffmann <sigfrid.hoffm...@gmail.com>
> wrote:
>
>> Hello,
>> Working with Flink 1.12.1, I read in the docs that a processing-time
>> temporal join is supported for key-value-like joins, but when I try it I
>> get:
>>
>> Exception in thread "main" org.apache.flink.table.api.TableException:
>> Processing-time temporal join is not supported yet.
>>         at
>> org.apache.flink.table.planner.plan.nodes.physical.stream.StreamExecTemporalJoinToCoProcessTranslator.createJoinOperator(StreamExecTemporalJoin.scala:273)
>>         at
>> org.apache.flink.table.planner.plan.nodes.physical.stream.StreamExecTemporalJoinToCoProcessTranslator.getJoinOperator(StreamExecTemporalJoin.scala:224)
>>         at
>> org.apache.flink.table.planner.plan.nodes.physical.stream.StreamExecTemporalJoin.translateToPlanInternal(StreamExecTemporalJoin.scala:115)
>>         at
>> org.apache.flink.table.planner.plan.nodes.physical.stream.StreamExecTemporalJoin.translateToPlanInternal(StreamExecTemporalJoin.scala:56)
>>         at
>> org.apache.flink.table.planner.plan.nodes.exec.ExecNode.translateToPlan(ExecNode.scala:59)
>>         at
>> org.apache.flink.table.planner.plan.nodes.exec.ExecNode.translateToPlan$(ExecNode.scala:57)
>>         at
>> org.apache.flink.table.planner.plan.nodes.physical.stream.StreamExecTemporalJoin.translateToPlan(StreamExecTemporalJoin.scala:56)
>>         at
>> org.apache.flink.table.planner.plan.nodes.physical.stream.StreamExecCalc.translateToPlanInternal(StreamExecCalc.scala:54)
>>         at
>> org.apache.flink.table.planner.plan.nodes.physical.stream.StreamExecCalc.translateToPlanInternal(StreamExecCalc.scala:39)
>>         at
>> org.apache.flink.table.planner.plan.nodes.exec.ExecNode.translateToPlan(ExecNode.scala:59)
>>         at
>> org.apache.flink.table.planner.plan.nodes.exec.ExecNode.translateToPlan$(ExecNode.scala:57)
>>         at
>> org.apache.flink.table.planner.plan.nodes.physical.stream.StreamExecCalcBase.translateToPlan(StreamExecCalcBase.scala:38)
>>         at
>> org.apache.flink.table.planner.plan.nodes.physical.stream.StreamExecSink.translateToPlanInternal(StreamExecSink.scala:79)
>>         at
>> org.apache.flink.table.planner.plan.nodes.physical.stream.StreamExecSink.translateToPlanInternal(StreamExecSink.scala:43)
>>         at
>> org.apache.flink.table.planner.plan.nodes.exec.ExecNode.translateToPlan(ExecNode.scala:59)
>>         at
>> org.apache.flink.table.planner.plan.nodes.exec.ExecNode.translateToPlan$(ExecNode.scala:57)
>>         at
>> org.apache.flink.table.planner.plan.nodes.physical.stream.StreamExecSink.translateToPlan(StreamExecSink.scala:43)
>>         at
>> org.apache.flink.table.planner.delegation.StreamPlanner.$anonfun$translateToPlan$1(StreamPlanner.scala:66)
>>
>> my query:
>>
>> SELECT e.id, r.test
>> FROM kafkaTable AS e
>> JOIN s3Table FOR SYSTEM_TIME AS OF e.proctime AS r
>>   ON e.id = r.id
>>
>> my s3 table:
>>
>> CREATE TABLE s3Table (
>>   id STRING,
>>   test STRING,
>>   PRIMARY KEY (id) NOT ENFORCED
>> ) WITH (
>>   'connector' = 'filesystem',
>>   'path' = 's3a://fs/',
>>   'format' = 'json'
>> )
>>
>> my kafka table:
>>
>> CREATE TABLE kafkaTable (
>>   id STRING,
>>   foo STRING,
>>   bar BIGINT,
>>   proctime AS PROCTIME()
>> ) WITH (
>>   'connector' = 'kafka',
>>   'topic' = 'mytopic',
>>   'properties.bootstrap.servers' = '127.0.0.1:9092',
>>   'properties.group.id' = 'mygroup',
>>   'format' = 'json',
>>   'scan.startup.mode' = 'group-offsets',
>>   'properties.enable.auto.commit' = 'false'
>> )
>>
>>
>
>
