Hi,

You have to pass the StreamExecutionEnvironment to the getTableEnvironment() method, not the DataStream (or DataStreamSource). Change

val tableEnv = TableEnvironment.getTableEnvironment(dataStream)

to

val tableEnv = TableEnvironment.getTableEnvironment(streamExecEnv)

Best, Fabian

2018-08-02 0:10 GMT+02:00 Mich Talebzadeh <mich.talebza...@gmail.com>:

> Hi,
>
> FYI, these are my imports
>
> import java.util.Properties
> import java.util.Arrays
> import org.apache.flink.api.common.functions.MapFunction
> import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment
> import org.apache.flink.streaming.api.scala
> import org.apache.flink.streaming.util.serialization.SimpleStringSchema
> import org.apache.flink.table.api.TableEnvironment
> import org.apache.flink.table.api.scala._
> import org.apache.flink.api.scala._
> import org.apache.kafka.clients.consumer.ConsumerConfig
> import org.apache.kafka.clients.consumer.ConsumerRecord
> import org.apache.kafka.clients.consumer.KafkaConsumer
> import org.apache.flink.core.fs.FileSystem
> import org.apache.flink.streaming.api.TimeCharacteristic
> import org.slf4j.LoggerFactory
> import org.apache.flink.streaming.connectors.kafka.{FlinkKafkaConsumer011, FlinkKafkaProducer011}
> import java.util.Calendar
> import java.util.Date
> import java.text.DateFormat
> import java.text.SimpleDateFormat
> import org.apache.log4j.Logger
> import org.apache.log4j.Level
> import sys.process.stringSeqToProcess
> import java.io.File
>
> And this is the simple code
>
> val properties = new Properties()
> properties.setProperty("bootstrap.servers", bootstrapServers)
> properties.setProperty("zookeeper.connect", zookeeperConnect)
> properties.setProperty("group.id", flinkAppName)
> properties.setProperty("auto.offset.reset", "latest")
> val streamExecEnv = StreamExecutionEnvironment.getExecutionEnvironment
> val dataStream = streamExecEnv
>   .addSource(new FlinkKafkaConsumer011[String](topicsValue, new SimpleStringSchema(), properties))
> val tableEnv = TableEnvironment.getTableEnvironment(dataStream)
> tableEnv.registerDataStream("table1", dataStream, 'key, 'ticker, 'timeissued, 'price)
>
> And this is the compilation error
>
> [info] Compiling 1 Scala source to /home/hduser/dba/bin/flink/md_streaming/target/scala-2.11/classes...
> [error] /home/hduser/dba/bin/flink/md_streaming/src/main/scala/myPackage/md_streaming.scala:138: overloaded method value getTableEnvironment with alternatives:
> [error]   (executionEnvironment: org.apache.flink.streaming.api.scala.StreamExecutionEnvironment)org.apache.flink.table.api.scala.StreamTableEnvironment <and>
> [error]   (executionEnvironment: org.apache.flink.streaming.api.environment.StreamExecutionEnvironment)org.apache.flink.table.api.java.StreamTableEnvironment <and>
> [error]   (executionEnvironment: org.apache.flink.api.scala.ExecutionEnvironment)org.apache.flink.table.api.scala.BatchTableEnvironment <and>
> [error]   (executionEnvironment: org.apache.flink.api.java.ExecutionEnvironment)org.apache.flink.table.api.java.BatchTableEnvironment
> [error]  cannot be applied to (org.apache.flink.streaming.api.datastream.DataStreamSource[String])
> [error]   val tableEnv = TableEnvironment.getTableEnvironment(dataStream)
> [error] ^
> [error] one error found
> [error] (compile:compileIncremental) Compilation failed
> [error] Total time: 3 s, completed Aug 1, 2018 11:02:33 PM
> Completed compiling
>
> which is really strange
>
> Dr Mich Talebzadeh
>
> LinkedIn *
> https://www.linkedin.com/profile/view?id=AAEAAAAWh2gBxianrbJd6zP6AcPCCdOABUrV8Pw
> <https://www.linkedin.com/profile/view?id=AAEAAAAWh2gBxianrbJd6zP6AcPCCdOABUrV8Pw>*
>
> http://talebzadehmich.wordpress.com
>
> *Disclaimer:* Use it at your own risk. Any and all responsibility for any
> loss, damage or destruction of data or any other property which may arise
> from relying on this email's technical content is explicitly disclaimed.
> The author will in no case be liable for any monetary damages arising from
> such loss, damage or destruction.
>
> On Wed, 1 Aug 2018 at 13:42, Fabian Hueske <fhue...@gmail.com> wrote:
>
>> Hi
>>
>> I think you are mixing Java and Scala dependencies.
>>
>> org.apache.flink.streaming.api.datastream.DataStream is the DataStream
>> of the Java DataStream API.
>> You should use the DataStream of the Scala DataStream API.
>>
>> Best, Fabian
>>
>> 2018-08-01 14:01 GMT+02:00 Mich Talebzadeh <mich.talebza...@gmail.com>:
>>
>>> Hi,
>>>
>>> I believe I followed Hequn's suggestion and tried again
>>>
>>> import org.apache.flink.table.api.Table
>>> import org.apache.flink.table.api.TableEnvironment
>>> import org.apache.flink.table.api.scala._
>>>
>>> Unfortunately I am still getting the same error!
>>>
>>> [info] Compiling 1 Scala source to /home/hduser/dba/bin/flink/md_streaming/target/scala-2.11/classes...
>>> [error] /home/hduser/dba/bin/flink/md_streaming/src/main/scala/myPackage/md_streaming.scala:151: overloaded method value fromDataStream with alternatives:
>>> [error]   [T](dataStream: org.apache.flink.streaming.api.datastream.DataStream[T], fields: String)org.apache.flink.table.api.Table <and>
>>> [error]   [T](dataStream: org.apache.flink.streaming.api.datastream.DataStream[T])org.apache.flink.table.api.Table
>>> [error]  cannot be applied to (org.apache.flink.streaming.api.datastream.DataStreamSource[String], Symbol, Symbol, Symbol, Symbol)
>>> [error]   val table1: Table = tableEnv.fromDataStream(dataStream, 'key, 'ticker, 'timeissued, 'price)
>>> [error] ^
>>> [error] one error found
>>> [error] (compile:compileIncremental) Compilation failed
>>> [error] Total time: 3 s, completed Aug 1, 2018 12:59:44 PM
>>> Completed compiling
>>>
>>> Dr Mich Talebzadeh
>>>
>>> On Wed, 1 Aug 2018 at 10:03, Timo Walther <twal...@apache.org> wrote:
>>>
>>>> If these two imports are the only imports that you added, then you did
>>>> not follow Hequn's advice or the link that I sent you.
>>>>
>>>> You need to add the underscore imports to let Scala do its magic.
>>>>
>>>> Timo
>>>>
>>>> On 01.08.18 at 10:28, Mich Talebzadeh wrote:
>>>>
>>>> Hi Timo,
>>>>
>>>> These are my two Flink table related imports
>>>>
>>>> import org.apache.flink.table.api.Table
>>>> import org.apache.flink.table.api.TableEnvironment
>>>>
>>>> And these are my dependencies, building with SBT
>>>>
>>>> libraryDependencies += "org.apache.hadoop" % "hadoop-core" % "1.2.1"
>>>> libraryDependencies += "org.apache.hbase" % "hbase" % "1.2.6"
>>>> libraryDependencies += "org.apache.hbase" % "hbase-client" % "1.2.6"
>>>> libraryDependencies += "org.apache.hbase" % "hbase-common" % "1.2.6"
>>>> libraryDependencies += "org.apache.hbase" % "hbase-server" % "1.2.6"
>>>> libraryDependencies += "org.apache.flink" %% "flink-connector-kafka-0.11" % "1.5.0"
>>>> libraryDependencies += "org.apache.flink" %% "flink-connector-kafka-base" % "1.5.0"
>>>> libraryDependencies += "org.apache.flink" %% "flink-scala" % "1.5.0"
>>>> libraryDependencies += "org.apache.kafka" % "kafka-clients" % "0.11.0.0"
>>>> libraryDependencies += "org.apache.flink" %% "flink-streaming-java" % "1.5.0" % "provided"
>>>> libraryDependencies += "org.apache.flink" %% "flink-table" % "1.5.0" % "provided"
>>>> libraryDependencies += "org.apache.kafka" %% "kafka" % "0.11.0.0"
>>>>
>>>> There appears to be a conflict somewhere that causes this error
>>>>
>>>> [info] Compiling 1 Scala source to /home/hduser/dba/bin/flink/md_streaming/target/scala-2.11/classes...
>>>> [error] /home/hduser/dba/bin/flink/md_streaming/src/main/scala/myPackage/md_streaming.scala:152: overloaded method value fromDataStream with alternatives:
>>>> [error]   [T](dataStream: org.apache.flink.streaming.api.datastream.DataStream[T], fields: String)org.apache.flink.table.api.Table <and>
>>>> [error]   [T](dataStream: org.apache.flink.streaming.api.datastream.DataStream[T])org.apache.flink.table.api.Table
>>>> [error]  cannot be applied to (org.apache.flink.streaming.api.datastream.DataStreamSource[String], Symbol, Symbol, Symbol, Symbol)
>>>> [error]   val table1: Table = tableEnv.fromDataStream(dataStream, 'key, 'ticker, 'timeissued, 'price)
>>>> [error] ^
>>>> [error] one error found
>>>> [error] (compile:compileIncremental) Compilation failed
>>>>
>>>> Thanks
>>>>
>>>> Dr Mich Talebzadeh
>>>>
>>>> On Wed, 1 Aug 2018 at 09:17, Timo Walther <twal...@apache.org> wrote:
>>>>
>>>>> Hi Mich,
>>>>>
>>>>> I would check your imports again [1]. This is a pure compiler issue
>>>>> that is unrelated to your actual data stream. Also check your project
>>>>> dependencies.
>>>>>
>>>>> Regards,
>>>>> Timo
>>>>>
>>>>> [1] https://ci.apache.org/projects/flink/flink-docs-master/dev/table/common.html#implicit-conversion-for-scala
>>>>>
>>>>> On 01.08.18 at 09:30, Mich Talebzadeh wrote:
>>>>>
>>>>> Hi both,
>>>>>
>>>>> I added the import as Hequn suggested.
>>>>>
>>>>> My stream is very simple and consists of 4 values separated by "," as
>>>>> below
>>>>>
>>>>> 05521df6-4ccf-4b2f-b874-eb27d461b305,IBM,2018-07-30T19:51:50,190.48
>>>>>
>>>>> So this is what I have been trying to do
>>>>>
>>>>> Code
>>>>>
>>>>> val dataStream = streamExecEnv
>>>>>   .addSource(new FlinkKafkaConsumer011[String](topicsValue, new SimpleStringSchema(), properties))
>>>>> //
>>>>> //
>>>>> val tableEnv = TableEnvironment.getTableEnvironment(streamExecEnv)
>>>>> val table1: Table = tableEnv.fromDataStream(dataStream, 'key, 'ticker, 'timeissued, 'price)
>>>>>
>>>>> Note those four columns in the table1 definition
>>>>>
>>>>> And this is the error being thrown
>>>>>
>>>>> [info] Compiling 1 Scala source to /home/hduser/dba/bin/flink/md_streaming/target/scala-2.11/classes...
>>>>> [error] /home/hduser/dba/bin/flink/md_streaming/src/main/scala/myPackage/md_streaming.scala:152: overloaded method value fromDataStream with alternatives:
>>>>> [error]   [T](dataStream: org.apache.flink.streaming.api.datastream.DataStream[T], fields: String)org.apache.flink.table.api.Table <and>
>>>>> [error]   [T](dataStream: org.apache.flink.streaming.api.datastream.DataStream[T])org.apache.flink.table.api.Table
>>>>> [error]  cannot be applied to (org.apache.flink.streaming.api.datastream.DataStreamSource[String], Symbol, Symbol, Symbol, Symbol)
>>>>> [error]   val table1: Table = tableEnv.fromDataStream(dataStream, 'key, 'ticker, 'timeissued, 'price)
>>>>> [error] ^
>>>>> [error] one error found
>>>>> [error] (compile:compileIncremental) Compilation failed
>>>>>
>>>>> I suspect dataStream may not be compatible with this operation?
>>>>>
>>>>> Regards,
>>>>>
>>>>> Dr Mich Talebzadeh
>>>>>
>>>>> On Wed, 1 Aug 2018 at 04:51, Hequn Cheng <chenghe...@gmail.com> wrote:
>>>>>
>>>>>> Hi, Mich
>>>>>>
>>>>>> You can try adding "import org.apache.flink.table.api.scala._", so
>>>>>> that the Symbol can be recognized as an Expression.
>>>>>>
>>>>>> Best, Hequn
>>>>>>
>>>>>> On Wed, Aug 1, 2018 at 6:16 AM, Mich Talebzadeh <mich.talebza...@gmail.com> wrote:
>>>>>>
>>>>>>> Hi,
>>>>>>>
>>>>>>> I am following this example
>>>>>>>
>>>>>>> https://ci.apache.org/projects/flink/flink-docs-release-1.5/dev/table/common.html#integration-with-datastream-and-dataset-api
>>>>>>>
>>>>>>> This is my dataStream which is built on a Kafka topic
>>>>>>>
>>>>>>> //
>>>>>>> //Create a Kafka consumer
>>>>>>> //
>>>>>>> val dataStream = streamExecEnv
>>>>>>>   .addSource(new FlinkKafkaConsumer011[String](topicsValue, new SimpleStringSchema(), properties))
>>>>>>> //
>>>>>>> //
>>>>>>> val tableEnv = TableEnvironment.getTableEnvironment(streamExecEnv)
>>>>>>> val table1: Table = tableEnv.fromDataStream(dataStream, 'key, 'ticker, 'timeissued, 'price)
>>>>>>>
>>>>>>> While compiling it throws this error
>>>>>>>
>>>>>>> [error] /home/hduser/dba/bin/flink/md_streaming/src/main/scala/myPackage/md_streaming.scala:169: overloaded method value fromDataStream with alternatives:
>>>>>>> [error]   [T](dataStream: org.apache.flink.streaming.api.datastream.DataStream[T], fields: String)org.apache.flink.table.api.Table <and>
>>>>>>> [error]   [T](dataStream: org.apache.flink.streaming.api.datastream.DataStream[T])org.apache.flink.table.api.Table
>>>>>>> [error]  cannot be applied to (org.apache.flink.streaming.api.datastream.DataStreamSource[String], Symbol, Symbol, Symbol, Symbol)
>>>>>>> [error]   val table1: Table = tableEnv.fromDataStream(dataStream, 'key, 'ticker, 'timeissued, 'price)
>>>>>>> [error] ^
>>>>>>> [error] one error found
>>>>>>> [error] (compile:compileIncremental) Compilation failed
>>>>>>>
>>>>>>> The topic is very simple, it is comma separated prices. I tried
>>>>>>> mapFunction and flatMap but neither worked!
>>>>>>>
>>>>>>> Thanks,
>>>>>>>
>>>>>>> Dr Mich Talebzadeh
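
Putting the advice from this thread together, here is a minimal sketch of how the pieces might fit for Flink 1.5 with Scala 2.11. It is an illustration under stated assumptions, not the poster's actual program. The imports posted in the thread pull in the Java StreamExecutionEnvironment and import the package org.apache.flink.streaming.api.scala without the trailing "._", so the sketch uses the Scala wildcard imports instead. It assumes flink-streaming-scala is on the classpath (it does not appear in the posted sbt list). The object name MdStreamingSketch, the helper parseTick, the broker address, the group id and the topic name "md" are all hypothetical. Because a DataStream[String] carries a single field, the sketch also maps each line to a 4-tuple before naming four columns; that step is an assumption about what the poster wants, based on the sample record shown in the thread.

```scala
import java.util.Properties

// Scala DataStream API: the wildcard brings in the Scala
// StreamExecutionEnvironment, DataStream and implicit TypeInformation.
import org.apache.flink.streaming.api.scala._
import org.apache.flink.streaming.connectors.kafka.FlinkKafkaConsumer011
import org.apache.flink.streaming.util.serialization.SimpleStringSchema
import org.apache.flink.table.api.TableEnvironment
// Table API implicits: lets Symbols such as 'key be used as Expressions.
import org.apache.flink.table.api.scala._

object MdStreamingSketch {

  // Hypothetical helper: splits "key,ticker,timeissued,price" into a tuple.
  // Throws a MatchError if the line does not have exactly four fields.
  def parseTick(line: String): (String, String, String, Double) = {
    val Array(key, ticker, timeissued, price) = line.split(",")
    (key, ticker, timeissued, price.toDouble)
  }

  def main(args: Array[String]): Unit = {
    val properties = new Properties()
    properties.setProperty("bootstrap.servers", "localhost:9092") // assumed
    properties.setProperty("group.id", "md_streaming")            // assumed

    // Scala environment, because of the streaming.api.scala._ import.
    val streamExecEnv = StreamExecutionEnvironment.getExecutionEnvironment

    val dataStream: DataStream[String] = streamExecEnv
      .addSource(new FlinkKafkaConsumer011[String]("md", new SimpleStringSchema(), properties))

    // Pass the execution environment here, not the stream.
    val tableEnv = TableEnvironment.getTableEnvironment(streamExecEnv)

    // Four tuple fields, so four column names can be assigned.
    val ticks = dataStream.map(parseTick _)
    val table1 = tableEnv.fromDataStream(ticks, 'key, 'ticker, 'timeissued, 'price)

    // Only prints the schema; no job is submitted in this sketch.
    table1.printSchema()
  }
}
```

With the Scala environment in scope, getTableEnvironment should resolve to the overload that returns the Scala StreamTableEnvironment, whose fromDataStream accepts field expressions rather than a single String.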