Hello Xingcan As mentioned in above mail thread I am streaming mongodb oplog to join multiple mongo tables based on some unique key (Primary key). To achieve this I have created one java pojo as below. where o represent generic pojo type of mongodb which has my table fields i.e. dynamic. now I want to use table api join over this basic BasicDBObject but it seem flink does not allow generic pojo's. please suggest on this.
public class Oplog { private OplogTimestamp ts; private BasicDBObject o; } ----------------------------------------------- *Amol Suryawanshi* Java Developer am...@iprogrammer.com *iProgrammer Solutions Pvt. Ltd.* *Office 103, 104, 1st Floor Pride Portal,Shivaji Housing Society, Bahiratwadi,Near Hotel JW Marriott, Off Senapati Bapat Road, Pune - 411016, MH, INDIA.**Phone: +91 9689077510 | Skype: amols_iprogrammer* www.iprogrammer.com <sac...@iprogrammer.com> ------------------------------------------------ On Mon, Jul 2, 2018 at 3:03 PM, Amol S - iProgrammer <am...@iprogrammer.com> wrote: > Hello Xingcan > > DataStream<Oplog> streamSource = env > .addSource(kafkaConsumer) > .setParallelism(4); > > StreamTableEnvironment tableEnv = TableEnvironment.getTableEnvironment(env); > // Convert the DataStream into a Table with default fields "f0", "f1" > Table table1 = tableEnv.fromDataStream(streamSource); > > > Table customerMISMaster = table1.filter("ns === 'local.customerMISMaster'" > ).select("o as master"); > Table customerMISChild1 = table1.filter("ns === 'local.customerMISChild1'" > ).select("o as child1"); > Table customerMISChild2 = table1.filter("ns === 'local.customerMISChild2'" > ).select("o as child2"); > Table result = customerMISMaster.join(customerMISChild1).where("master. > loanApplicationId=child1.loanApplicationId"); > > > it is throwing error "Method threw > 'org.apache.flink.table.api.ValidationException' exception. Undefined > function: LOANAPPLICATIONID" > > > > ----------------------------------------------- > *Amol Suryawanshi* > Java Developer > am...@iprogrammer.com > > > *iProgrammer Solutions Pvt. Ltd.* > > > > *Office 103, 104, 1st Floor Pride Portal,Shivaji Housing Society, > Bahiratwadi,Near Hotel JW Marriott, Off Senapati Bapat Road, Pune - 411016, > MH, INDIA.**Phone: +91 9689077510 | Skype: amols_iprogrammer* > www.iprogrammer.com <sac...@iprogrammer.com> > ------------------------------------------------ > > On Mon, Jul 2, 2018 at 2:56 PM, Xingcan Cui <xingc...@gmail.com> wrote: > >> Hi Amol, >> >> The “dynamic table” is just a logical concept, following which the Flink >> table API is designed. >> That means you don’t need to implement dynamic tables yourself. >> >> Flink table API provides different kinds of stream to stream joins in >> recent versions (from 1.4). >> The related docs can be found here https://ci.apache.org/projects >> /flink/flink-docs-release-1.5/dev/table/tableApi.html#joins < >> https://ci.apache.org/projects/flink/flink-docs-release-1.5 >> /dev/table/tableApi.html#joins>. >> >> Best, >> Xingcan >> >> >> > On Jul 2, 2018, at 4:49 PM, Amol S - iProgrammer <am...@iprogrammer.com> >> wrote: >> > >> > Hello, >> > >> > I am streaming mongodb oplog using kafka and flink and want to join >> > multiple tables using flink table api but i have some concerns like is >> it >> > possible to join streamed tables in flink and if yes then please >> provide me >> > some example of stream join using table API. >> > >> > I gone through your dynamic table api doc. it is quit interesting but >> > haven't found any example tutorial how to implement dynamic table. >> > >> > I have tried to implement table api join using pojo class but it is >> > giving org.apache.flink.table.api.TableException: Cannot generate a >> valid >> > execution plan for the given query >> > >> > ----------------------------------------------- >> > *Amol Suryawanshi* >> > Java Developer >> > am...@iprogrammer.com >> > >> > >> > *iProgrammer Solutions Pvt. Ltd.* >> > >> > >> > >> > *Office 103, 104, 1st Floor Pride Portal,Shivaji Housing Society, >> > Bahiratwadi,Near Hotel JW Marriott, Off Senapati Bapat Road, Pune - >> 411016, >> > MH, INDIA.**Phone: +91 9689077510 | Skype: amols_iprogrammer* >> > www.iprogrammer.com <sac...@iprogrammer.com> >> > ------------------------------------------------ >> >> >