Hello Xingcan,

DataStream<Oplog> streamSource = env
        .addSource(kafkaConsumer)
        .setParallelism(4);

StreamTableEnvironment tableEnv = TableEnvironment.getTableEnvironment(env);

// Convert the DataStream into a Table with default fields "f0", "f1"
Table table1 = tableEnv.fromDataStream(streamSource);

Table customerMISMaster = table1.filter("ns === 'local.customerMISMaster'").select("o as master");
Table customerMISChild1 = table1.filter("ns === 'local.customerMISChild1'").select("o as child1");
Table customerMISChild2 = table1.filter("ns === 'local.customerMISChild2'").select("o as child2");

Table result = customerMISMaster.join(customerMISChild1)
        .where(" master.loanApplicationId=child1.loanApplicationId");

It is throwing this error:

"Method threw 'org.apache.flink.table.api.ValidationException' exception.
Undefined function: LOANAPPLICATIONID"

-----------------------------------------------
*Amol Suryawanshi*
Java Developer
am...@iprogrammer.com

*iProgrammer Solutions Pvt. Ltd.*

*Office 103, 104, 1st Floor Pride Portal, Shivaji Housing Society,
Bahiratwadi, Near Hotel JW Marriott, Off Senapati Bapat Road, Pune - 411016,
MH, INDIA.*
*Phone: +91 9689077510 | Skype: amols_iprogrammer*
www.iprogrammer.com <sac...@iprogrammer.com>
------------------------------------------------

On Mon, Jul 2, 2018 at 2:56 PM, Xingcan Cui <xingc...@gmail.com> wrote:

> Hi Amol,
>
> The “dynamic table” is just a logical concept, following which the Flink
> table API is designed.
> That means you don’t need to implement dynamic tables yourself.
>
> Flink table API provides different kinds of stream to stream joins in
> recent versions (from 1.4).
> The related docs can be found here:
> https://ci.apache.org/projects/flink/flink-docs-release-1.5/dev/table/tableApi.html#joins
>
> Best,
> Xingcan
>
> > On Jul 2, 2018, at 4:49 PM, Amol S - iProgrammer <am...@iprogrammer.com>
> > wrote:
> >
> > Hello,
> >
> > I am streaming the MongoDB oplog using Kafka and Flink, and I want to
> > join multiple tables using the Flink Table API. I have some concerns:
> > is it possible to join streamed tables in Flink, and if yes, could you
> > please provide an example of a stream join using the Table API?
> >
> > I went through your dynamic table API doc. It is quite interesting, but
> > I haven't found any example or tutorial on how to implement a dynamic
> > table.
> >
> > I have tried to implement a Table API join using a POJO class, but it is
> > giving org.apache.flink.table.api.TableException: Cannot generate a
> > valid execution plan for the given query