Hello fabian, I have tried to convert table into stream as below
Cannot generate a valid execution plan for the given query: tableEnv.toDataStream(result, Oplog.class); and it is giving me below error. LogicalFilter(condition=[<>($1, $3)]) LogicalJoin(condition=[true], joinType=[inner]) LogicalProject(master=[$1], timeStamp=[$5]) LogicalFilter(condition=[=($0, _UTF-16LE'local.customerMISMaster')]) LogicalTableScan(table=[[_DataStreamTable_0]]) LogicalProject(child1=[$1], timeStamp2=[$5]) LogicalFilter(condition=[=($0, _UTF-16LE'local.customerMISChild1')]) LogicalTableScan(table=[[_DataStreamTable_0]]) This exception indicates that the query uses an unsupported SQL feature. Please check the documentation for the set of currently supported SQL features. ----------------------------------------------- *Amol Suryawanshi* Java Developer am...@iprogrammer.com *iProgrammer Solutions Pvt. Ltd.* *Office 103, 104, 1st Floor Pride Portal,Shivaji Housing Society, Bahiratwadi,Near Hotel JW Marriott, Off Senapati Bapat Road, Pune - 411016, MH, INDIA.**Phone: +91 9689077510 | Skype: amols_iprogrammer* www.iprogrammer.com <sac...@iprogrammer.com> ------------------------------------------------ On Mon, Jul 2, 2018 at 4:43 PM, Amol S - iProgrammer <am...@iprogrammer.com> wrote: > Hello Fabian, > > Can you please tell me hot to convert Table back into DataStream? I just > want to print the table result. > > ----------------------------------------------- > *Amol Suryawanshi* > Java Developer > am...@iprogrammer.com > > > *iProgrammer Solutions Pvt. Ltd.* > > > > *Office 103, 104, 1st Floor Pride Portal,Shivaji Housing Society, > Bahiratwadi,Near Hotel JW Marriott, Off Senapati Bapat Road, Pune - 411016, > MH, INDIA.**Phone: +91 9689077510 | Skype: amols_iprogrammer* > www.iprogrammer.com <sac...@iprogrammer.com> > ------------------------------------------------ > > On Mon, Jul 2, 2018 at 4:20 PM, Fabian Hueske <fhue...@gmail.com> wrote: > >> You can also use Row, but then you cannot rely on automatic type >> extraction >> and provide TypeInformation. >> >> Amol S - iProgrammer <am...@iprogrammer.com> schrieb am Mo., 2. Juli >> 2018, >> 12:37: >> >> > Hello Fabian, >> > >> > According to my requirement I can not create static pojo's for all >> classes >> > because I want to create dynamic jobs for all tables based on rule >> engine >> > config. Please suggest me if there any other way to achieve this. >> > >> > ----------------------------------------------- >> > *Amol Suryawanshi* >> > Java Developer >> > am...@iprogrammer.com >> > >> > >> > *iProgrammer Solutions Pvt. Ltd.* >> > >> > >> > >> > *Office 103, 104, 1st Floor Pride Portal,Shivaji Housing Society, >> > Bahiratwadi,Near Hotel JW Marriott, Off Senapati Bapat Road, Pune - >> 411016, >> > MH, INDIA.**Phone: +91 9689077510 | Skype: amols_iprogrammer* >> > www.iprogrammer.com <sac...@iprogrammer.com> >> > ------------------------------------------------ >> > >> > On Mon, Jul 2, 2018 at 4:02 PM, Fabian Hueske <fhue...@gmail.com> >> wrote: >> > >> > > Hi Amol, >> > > >> > > These are the requirements for POJOs [1] that are fully supported by >> > Flink. >> > > >> > > Best, Fabian >> > > >> > > [1] >> > > https://ci.apache.org/projects/flink/flink-docs- >> > > release-1.5/dev/api_concepts.html#pojos >> > > >> > > 2018-07-02 12:19 GMT+02:00 Amol S - iProgrammer < >> am...@iprogrammer.com>: >> > > >> > > > Hello Xingcan >> > > > >> > > > As mentioned in above mail thread I am streaming mongodb oplog to >> join >> > > > multiple mongo tables based on some unique key (Primary key). To >> > achieve >> > > > this I have created one java pojo as below. where o represent >> generic >> > > pojo >> > > > type of mongodb which has my table fields i.e. dynamic. now I want >> to >> > use >> > > > table api join over this basic BasicDBObject but it seem flink does >> not >> > > > allow generic pojo's. please suggest on this. >> > > > >> > > > public class Oplog { >> > > > private OplogTimestamp ts; >> > > > private BasicDBObject o; >> > > > } >> > > > >> > > > >> > > > >> > > > ----------------------------------------------- >> > > > *Amol Suryawanshi* >> > > > Java Developer >> > > > am...@iprogrammer.com >> > > > >> > > > >> > > > *iProgrammer Solutions Pvt. Ltd.* >> > > > >> > > > >> > > > >> > > > *Office 103, 104, 1st Floor Pride Portal,Shivaji Housing Society, >> > > > Bahiratwadi,Near Hotel JW Marriott, Off Senapati Bapat Road, Pune - >> > > 411016, >> > > > MH, INDIA.**Phone: +91 9689077510 | Skype: amols_iprogrammer* >> > > > www.iprogrammer.com <sac...@iprogrammer.com> >> > > > ------------------------------------------------ >> > > > >> > > > On Mon, Jul 2, 2018 at 3:03 PM, Amol S - iProgrammer < >> > > > am...@iprogrammer.com> >> > > > wrote: >> > > > >> > > > > Hello Xingcan >> > > > > >> > > > > DataStream<Oplog> streamSource = env >> > > > > .addSource(kafkaConsumer) >> > > > > .setParallelism(4); >> > > > > >> > > > > StreamTableEnvironment tableEnv = TableEnvironment. >> > > > getTableEnvironment(env); >> > > > > // Convert the DataStream into a Table with default fields "f0", >> "f1" >> > > > > Table table1 = tableEnv.fromDataStream(streamSource); >> > > > > >> > > > > >> > > > > Table customerMISMaster = table1.filter("ns === >> > > > 'local.customerMISMaster'" >> > > > > ).select("o as master"); >> > > > > Table customerMISChild1 = table1.filter("ns === >> > > > 'local.customerMISChild1'" >> > > > > ).select("o as child1"); >> > > > > Table customerMISChild2 = table1.filter("ns === >> > > > 'local.customerMISChild2'" >> > > > > ).select("o as child2"); >> > > > > Table result = customerMISMaster.join(customerMISChild1).where(" >> > > master. >> > > > > loanApplicationId=child1.loanApplicationId"); >> > > > > >> > > > > >> > > > > it is throwing error "Method threw 'org.apache.flink.table.api. >> > > ValidationException' >> > > > exception. Undefined function: LOANAPPLICATIONID" >> > > > > >> > > > > >> > > > > >> > > > > ----------------------------------------------- >> > > > > *Amol Suryawanshi* >> > > > > Java Developer >> > > > > am...@iprogrammer.com >> > > > > >> > > > > >> > > > > *iProgrammer Solutions Pvt. Ltd.* >> > > > > >> > > > > >> > > > > >> > > > > *Office 103, 104, 1st Floor Pride Portal,Shivaji Housing Society, >> > > > > Bahiratwadi,Near Hotel JW Marriott, Off Senapati Bapat Road, Pune >> - >> > > > 411016, >> > > > > MH, INDIA.**Phone: +91 9689077510 | Skype: amols_iprogrammer* >> > > > > www.iprogrammer.com <sac...@iprogrammer.com> >> > > > > ------------------------------------------------ >> > > > > >> > > > > On Mon, Jul 2, 2018 at 2:56 PM, Xingcan Cui <xingc...@gmail.com> >> > > wrote: >> > > > > >> > > > >> Hi Amol, >> > > > >> >> > > > >> The “dynamic table” is just a logical concept, following which >> the >> > > Flink >> > > > >> table API is designed. >> > > > >> That means you don’t need to implement dynamic tables yourself. >> > > > >> >> > > > >> Flink table API provides different kinds of stream to stream >> joins >> > in >> > > > >> recent versions (from 1.4). >> > > > >> The related docs can be found here >> https://ci.apache.org/projects >> > > > >> /flink/flink-docs-release-1.5/dev/table/tableApi.html#joins < >> > > > >> https://ci.apache.org/projects/flink/flink-docs-release-1.5 >> > > > >> /dev/table/tableApi.html#joins>. >> > > > >> >> > > > >> Best, >> > > > >> Xingcan >> > > > >> >> > > > >> >> > > > >> > On Jul 2, 2018, at 4:49 PM, Amol S - iProgrammer < >> > > > am...@iprogrammer.com> >> > > > >> wrote: >> > > > >> > >> > > > >> > Hello, >> > > > >> > >> > > > >> > I am streaming mongodb oplog using kafka and flink and want to >> > join >> > > > >> > multiple tables using flink table api but i have some concerns >> > like >> > > is >> > > > >> it >> > > > >> > possible to join streamed tables in flink and if yes then >> please >> > > > >> provide me >> > > > >> > some example of stream join using table API. >> > > > >> > >> > > > >> > I gone through your dynamic table api doc. it is quit >> interesting >> > > but >> > > > >> > haven't found any example tutorial how to implement dynamic >> table. >> > > > >> > >> > > > >> > I have tried to implement table api join using pojo class but >> it >> > is >> > > > >> > giving org.apache.flink.table.api.TableException: Cannot >> generate >> > a >> > > > >> valid >> > > > >> > execution plan for the given query >> > > > >> > >> > > > >> > ----------------------------------------------- >> > > > >> > *Amol Suryawanshi* >> > > > >> > Java Developer >> > > > >> > am...@iprogrammer.com >> > > > >> > >> > > > >> > >> > > > >> > *iProgrammer Solutions Pvt. Ltd.* >> > > > >> > >> > > > >> > >> > > > >> > >> > > > >> > *Office 103, 104, 1st Floor Pride Portal,Shivaji Housing >> Society, >> > > > >> > Bahiratwadi,Near Hotel JW Marriott, Off Senapati Bapat Road, >> Pune >> > - >> > > > >> 411016, >> > > > >> > MH, INDIA.**Phone: +91 9689077510 | Skype: amols_iprogrammer* >> > > > >> > www.iprogrammer.com <sac...@iprogrammer.com> >> > > > >> > ------------------------------------------------ >> > > > >> >> > > > >> >> > > > > >> > > > >> > > >> > >> > >