Hi,

You have to pass the StreamExecutionEnvironment to the
getTableEnvironment() method, not the DataStream (or DataStreamSource).
Change

val tableEnv = TableEnvironment.getTableEnvironment(dataStream)

to

val tableEnv = TableEnvironment.getTableEnvironment(streamExecEnv)
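
For reference, a minimal sketch of the full corrected setup. It assumes Flink 1.5, the Scala APIs throughout, and your existing topicsValue and properties values; the map that splits each comma-separated record into a 4-field tuple is my assumption, because a DataStream[String] only exposes a single column, so registering four fields directly on it would not work either.

    import org.apache.flink.streaming.api.scala._    // Scala DataStream API + implicit TypeInformation
    import org.apache.flink.streaming.connectors.kafka.FlinkKafkaConsumer011
    import org.apache.flink.streaming.util.serialization.SimpleStringSchema
    import org.apache.flink.table.api.TableEnvironment
    import org.apache.flink.table.api.scala._        // Table API implicits ('key etc. become Expressions)

    val streamExecEnv = StreamExecutionEnvironment.getExecutionEnvironment

    // Kafka source as a Scala-API DataStream[String]
    val dataStream = streamExecEnv
      .addSource(new FlinkKafkaConsumer011[String](topicsValue, new SimpleStringSchema(), properties))

    // Split "key,ticker,timeissued,price" into a 4-field tuple (assumed record format)
    val rows = dataStream.map { line =>
      val f = line.split(",")
      (f(0), f(1), f(2), f(3))
    }

    // Create the table environment from the execution environment, not from the stream
    val tableEnv = TableEnvironment.getTableEnvironment(streamExecEnv)
    tableEnv.registerDataStream("table1", rows, 'key, 'ticker, 'timeissued, 'price)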

Best,
Fabian

2018-08-02 0:10 GMT+02:00 Mich Talebzadeh <mich.talebza...@gmail.com>:

> Hi,
>
> FYI, these are my imports
>
> import java.util.Properties
> import java.util.Arrays
> import org.apache.flink.api.common.functions.MapFunction
> import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment
> import org.apache.flink.streaming.api.scala
> import org.apache.flink.streaming.util.serialization.SimpleStringSchema
> import org.apache.flink.table.api.TableEnvironment
> import org.apache.flink.table.api.scala._
> import org.apache.flink.api.scala._
> import org.apache.kafka.clients.consumer.ConsumerConfig
> import org.apache.kafka.clients.consumer.ConsumerRecord
> import org.apache.kafka.clients.consumer.KafkaConsumer
> import org.apache.flink.core.fs.FileSystem
> import org.apache.flink.streaming.api.TimeCharacteristic
> import org.slf4j.LoggerFactory
> import org.apache.flink.streaming.connectors.kafka.{FlinkKafkaConsumer011, FlinkKafkaProducer011}
> import java.util.Calendar
> import java.util.Date
> import java.text.DateFormat
> import java.text.SimpleDateFormat
> import org.apache.log4j.Logger
> import org.apache.log4j.Level
> import sys.process.stringSeqToProcess
> import java.io.File
>
> And this is the simple code
>
>     val properties = new Properties()
>     properties.setProperty("bootstrap.servers", bootstrapServers)
>     properties.setProperty("zookeeper.connect", zookeeperConnect)
>     properties.setProperty("group.id", flinkAppName)
>     properties.setProperty("auto.offset.reset", "latest")
>     val streamExecEnv = StreamExecutionEnvironment.getExecutionEnvironment
>     val dataStream = streamExecEnv
>       .addSource(new FlinkKafkaConsumer011[String](topicsValue, new SimpleStringSchema(), properties))
>     val tableEnv = TableEnvironment.getTableEnvironment(dataStream)
>     tableEnv.registerDataStream("table1", dataStream, 'key, 'ticker, 'timeissued, 'price)
>
> And this is the compilation error
>
> [info] Compiling 1 Scala source to /home/hduser/dba/bin/flink/md_streaming/target/scala-2.11/classes...
> [error] /home/hduser/dba/bin/flink/md_streaming/src/main/scala/myPackage/md_streaming.scala:138: overloaded method value getTableEnvironment with alternatives:
> [error]   (executionEnvironment: org.apache.flink.streaming.api.scala.StreamExecutionEnvironment)org.apache.flink.table.api.scala.StreamTableEnvironment <and>
> [error]   (executionEnvironment: org.apache.flink.streaming.api.environment.StreamExecutionEnvironment)org.apache.flink.table.api.java.StreamTableEnvironment <and>
> [error]   (executionEnvironment: org.apache.flink.api.scala.ExecutionEnvironment)org.apache.flink.table.api.scala.BatchTableEnvironment <and>
> [error]   (executionEnvironment: org.apache.flink.api.java.ExecutionEnvironment)org.apache.flink.table.api.java.BatchTableEnvironment
> [error]  cannot be applied to (org.apache.flink.streaming.api.datastream.DataStreamSource[String])
> [error]   val tableEnv = TableEnvironment.getTableEnvironment(dataStream)
> [error]                                   ^
> [error] one error found
> [error] (compile:compileIncremental) Compilation failed
> [error] Total time: 3 s, completed Aug 1, 2018 11:02:33 PM
> Completed compiling
>
> which is really strange
>
> Dr Mich Talebzadeh
>
>
>
> LinkedIn: https://www.linkedin.com/profile/view?id=AAEAAAAWh2gBxianrbJd6zP6AcPCCdOABUrV8Pw
>
>
>
> http://talebzadehmich.wordpress.com
>
>
> Disclaimer: Use it at your own risk. Any and all responsibility for any
> loss, damage or destruction of data or any other property which may arise
> from relying on this email's technical content is explicitly disclaimed.
> The author will in no case be liable for any monetary damages arising from
> such loss, damage or destruction.
>
>
>
>
> On Wed, 1 Aug 2018 at 13:42, Fabian Hueske <fhue...@gmail.com> wrote:
>
>> Hi, I think you are mixing Java and Scala dependencies.
>>
>> org.apache.flink.streaming.api.datastream.DataStream is the DataStream
>> of the Java DataStream API.
>> You should use the DataStream of the Scala DataStream API.
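>>
>> A minimal sketch of what that looks like (assuming Flink 1.5 and the topicsValue / properties values from your snippet):
>>
>>     // Importing the Scala variants makes getExecutionEnvironment / addSource return the
>>     // Scala StreamExecutionEnvironment and DataStream instead of the Java ones.
>>     // (Drop the import of org.apache.flink.streaming.api.environment.StreamExecutionEnvironment.)
>>     import org.apache.flink.streaming.api.scala._
>>     import org.apache.flink.streaming.connectors.kafka.FlinkKafkaConsumer011
>>     import org.apache.flink.streaming.util.serialization.SimpleStringSchema
>>
>>     val streamExecEnv = StreamExecutionEnvironment.getExecutionEnvironment
>>     val dataStream: DataStream[String] = streamExecEnv
>>       .addSource(new FlinkKafkaConsumer011[String](topicsValue, new SimpleStringSchema(), properties))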
>>
>> Best, Fabian
>>
>> 2018-08-01 14:01 GMT+02:00 Mich Talebzadeh <mich.talebza...@gmail.com>:
>>
>>> Hi,
>>>
>>> I believe I followed Hequn's suggestion and tried again.
>>>
>>> import org.apache.flink.table.api.Table
>>> import org.apache.flink.table.api.TableEnvironment
>>>
>>> import org.apache.flink.table.api.scala._
>>>
>>> Unfortunately I am still getting the same error!
>>>
>>> [info] Compiling 1 Scala source to /home/hduser/dba/bin/flink/md_streaming/target/scala-2.11/classes...
>>> [error] /home/hduser/dba/bin/flink/md_streaming/src/main/scala/myPackage/md_streaming.scala:151: overloaded method value fromDataStream with alternatives:
>>> [error]   [T](dataStream: org.apache.flink.streaming.api.datastream.DataStream[T], fields: String)org.apache.flink.table.api.Table <and>
>>> [error]   [T](dataStream: org.apache.flink.streaming.api.datastream.DataStream[T])org.apache.flink.table.api.Table
>>> [error]  cannot be applied to (org.apache.flink.streaming.api.datastream.DataStreamSource[String], Symbol, Symbol, Symbol, Symbol)
>>> [error]   val table1: Table = tableEnv.fromDataStream(dataStream, 'key, 'ticker, 'timeissued, 'price)
>>> [error]                                ^
>>> [error] one error found
>>> [error] (compile:compileIncremental) Compilation failed
>>> [error] Total time: 3 s, completed Aug 1, 2018 12:59:44 PM
>>> Completed compiling
>>>
>>>
>>> Dr Mich Talebzadeh
>>>
>>>
>>>
>>>
>>>
>>> On Wed, 1 Aug 2018 at 10:03, Timo Walther <twal...@apache.org> wrote:
>>>
>>>> If these two imports are the only imports that you added, then you did
>>>> not follow Hequn's advice or the link that I sent you.
>>>>
>>>> You need to add the underscore imports to let Scala do its magic.
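>>>>
>>>> Concretely, the wildcard imports meant here are the following (a sketch for a DataStream + Table API program; for a DataSet program the first import would be org.apache.flink.api.scala._ instead):
>>>>
>>>>     // Scala DataStream API implicits (implicit TypeInformation, Scala DataStream conversions)
>>>>     import org.apache.flink.streaming.api.scala._
>>>>     // Table API implicits, including the Symbol -> Expression conversion used by 'key, 'ticker, ...
>>>>     import org.apache.flink.table.api.scala._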
>>>>
>>>> Timo
>>>>
>>>>
>>>> On 01.08.18 at 10:28, Mich Talebzadeh wrote:
>>>>
>>>> Hi Timo,
>>>>
>>>> These are my two flink table related imports
>>>>
>>>> import org.apache.flink.table.api.Table
>>>> import org.apache.flink.table.api.TableEnvironment
>>>>
>>>> And these are my dependencies building with SBT
>>>>
>>>> libraryDependencies += "org.apache.hadoop" % "hadoop-core" % "1.2.1"
>>>> libraryDependencies += "org.apache.hbase" % "hbase" % "1.2.6"
>>>> libraryDependencies += "org.apache.hbase" % "hbase-client" % "1.2.6"
>>>> libraryDependencies += "org.apache.hbase" % "hbase-common" % "1.2.6"
>>>> libraryDependencies += "org.apache.hbase" % "hbase-server" % "1.2.6"
>>>> libraryDependencies += "org.apache.flink" %% "flink-connector-kafka-0.11" % "1.5.0"
>>>> libraryDependencies += "org.apache.flink" %% "flink-connector-kafka-base" % "1.5.0"
>>>> libraryDependencies += "org.apache.flink" %% "flink-scala" % "1.5.0"
>>>> libraryDependencies += "org.apache.kafka" % "kafka-clients" % "0.11.0.0"
>>>> libraryDependencies += "org.apache.flink" %% "flink-streaming-java" % "1.5.0" % "provided"
>>>> libraryDependencies += "org.apache.flink" %% "flink-table" % "1.5.0" % "provided"
>>>> libraryDependencies += "org.apache.kafka" %% "kafka" % "0.11.0.0"
>>>>
>>>> There appears to be a conflict somewhere that causes this error:
>>>>
>>>> [info] Compiling 1 Scala source to /home/hduser/dba/bin/flink/md_streaming/target/scala-2.11/classes...
>>>> [error] /home/hduser/dba/bin/flink/md_streaming/src/main/scala/myPackage/md_streaming.scala:152: overloaded method value fromDataStream with alternatives:
>>>> [error]   [T](dataStream: org.apache.flink.streaming.api.datastream.DataStream[T], fields: String)org.apache.flink.table.api.Table <and>
>>>> [error]   [T](dataStream: org.apache.flink.streaming.api.datastream.DataStream[T])org.apache.flink.table.api.Table
>>>> [error]  cannot be applied to (org.apache.flink.streaming.api.datastream.DataStreamSource[String], Symbol, Symbol, Symbol, Symbol)
>>>> [error]   val table1: Table = tableEnv.fromDataStream(dataStream, 'key, 'ticker, 'timeissued, 'price)
>>>> [error]                                ^
>>>> [error] one error found
>>>> [error] (compile:compileIncremental) Compilation failed
>>>>
>>>> Thanks
>>>>
>>>>
>>>> Dr Mich Talebzadeh
>>>>
>>>>
>>>>
>>>>
>>>>
>>>> On Wed, 1 Aug 2018 at 09:17, Timo Walther <twal...@apache.org> wrote:
>>>>
>>>>> Hi Mich,
>>>>>
>>>>> I would check your imports again [1]. This is a pure compiler issue
>>>>> that is unrelated to your actual data stream. Also check your project
>>>>> dependencies.
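>>>>>
>>>>> Regarding the dependencies, a possible consistent set for a Scala DataStream + Table API job on Flink 1.5 would be something like the following (a sketch, not a complete build file; flink-streaming-scala rather than flink-streaming-java is the artifact for the Scala streaming API):
>>>>>
>>>>>     // assumed versions; keep all Flink artifacts on one version and use %% for Scala artifacts
>>>>>     libraryDependencies += "org.apache.flink" %% "flink-streaming-scala" % "1.5.0" % "provided"
>>>>>     libraryDependencies += "org.apache.flink" %% "flink-table" % "1.5.0" % "provided"
>>>>>     libraryDependencies += "org.apache.flink" %% "flink-connector-kafka-0.11" % "1.5.0"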
>>>>>
>>>>> Regards,
>>>>> Timo
>>>>>
>>>>> [1] https://ci.apache.org/projects/flink/flink-docs-master/dev/table/common.html#implicit-conversion-for-scala
>>>>>
>>>>> On 01.08.18 at 09:30, Mich Talebzadeh wrote:
>>>>>
>>>>>
>>>>> Hi both,
>>>>>
>>>>> I added the import as Hequn suggested.
>>>>>
>>>>> My stream is very simple and consists of 4 values separated by "," as
>>>>> below
>>>>>
>>>>> 05521df6-4ccf-4b2f-b874-eb27d461b305,IBM,2018-07-30T19:51:50,190.48
>>>>>
>>>>> So this is what I have been trying to do
>>>>>
>>>>> Code
>>>>>
>>>>>     val dataStream = streamExecEnv
>>>>>       .addSource(new FlinkKafkaConsumer011[String](topicsValue, new SimpleStringSchema(), properties))
>>>>>
>>>>>     val tableEnv = TableEnvironment.getTableEnvironment(streamExecEnv)
>>>>>     val table1: Table = tableEnv.fromDataStream(dataStream, 'key, 'ticker, 'timeissued, 'price)
>>>>>
>>>>> Note the four columns in the table1 definition.
>>>>>
>>>>> And this is the error being thrown
>>>>>
>>>>> [info] Compiling 1 Scala source to /home/hduser/dba/bin/flink/md_streaming/target/scala-2.11/classes...
>>>>> [error] /home/hduser/dba/bin/flink/md_streaming/src/main/scala/myPackage/md_streaming.scala:152: overloaded method value fromDataStream with alternatives:
>>>>> [error]   [T](dataStream: org.apache.flink.streaming.api.datastream.DataStream[T], fields: String)org.apache.flink.table.api.Table <and>
>>>>> [error]   [T](dataStream: org.apache.flink.streaming.api.datastream.DataStream[T])org.apache.flink.table.api.Table
>>>>> [error]  cannot be applied to (org.apache.flink.streaming.api.datastream.DataStreamSource[String], Symbol, Symbol, Symbol, Symbol)
>>>>> [error]   val table1: Table = tableEnv.fromDataStream(dataStream, 'key, 'ticker, 'timeissued, 'price)
>>>>> [error]                                ^
>>>>> [error] one error found
>>>>> [error] (compile:compileIncremental) Compilation failed
>>>>>
>>>>> I suspect dataStream may not be compatible with this operation?
>>>>>
>>>>> Regards,
>>>>>
>>>>> Dr Mich Talebzadeh
>>>>>
>>>>>
>>>>>
>>>>>
>>>>>
>>>>> On Wed, 1 Aug 2018 at 04:51, Hequn Cheng <chenghe...@gmail.com> wrote:
>>>>>
>>>>>> Hi, Mich
>>>>>>
>>>>>> You can try adding "import org.apache.flink.table.api.scala._", so
>>>>>> that the Symbol can be recognized as an Expression.
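>>>>>>
>>>>>> For example (a sketch reusing the tableEnv and dataStream names from your code; it assumes both come from the Scala APIs):
>>>>>>
>>>>>>     // with this import in scope, 'key, 'ticker, 'timeissued, 'price are implicitly
>>>>>>     // converted from scala.Symbol to org.apache.flink.table.api.Expression
>>>>>>     import org.apache.flink.table.api.scala._
>>>>>>
>>>>>>     val table1: Table = tableEnv.fromDataStream(dataStream, 'key, 'ticker, 'timeissued, 'price)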
>>>>>>
>>>>>> Best, Hequn
>>>>>>
>>>>>> On Wed, Aug 1, 2018 at 6:16 AM, Mich Talebzadeh <
>>>>>> mich.talebza...@gmail.com> wrote:
>>>>>>
>>>>>>> Hi,
>>>>>>>
>>>>>>> I am following this example
>>>>>>>
>>>>>>> https://ci.apache.org/projects/flink/flink-docs-release-1.5/dev/table/common.html#integration-with-datastream-and-dataset-api
>>>>>>>
>>>>>>> This is my dataStream which is built on a Kafka topic
>>>>>>>
>>>>>>>     //
>>>>>>>     // Create a Kafka consumer
>>>>>>>     //
>>>>>>>     val dataStream = streamExecEnv
>>>>>>>       .addSource(new FlinkKafkaConsumer011[String](topicsValue, new SimpleStringSchema(), properties))
>>>>>>>
>>>>>>>     val tableEnv = TableEnvironment.getTableEnvironment(streamExecEnv)
>>>>>>>     val table1: Table = tableEnv.fromDataStream(dataStream, 'key, 'ticker, 'timeissued, 'price)
>>>>>>>
>>>>>>> While compiling it throws this error
>>>>>>>
>>>>>>> [error] /home/hduser/dba/bin/flink/md_streaming/src/main/scala/myPackage/md_streaming.scala:169: overloaded method value fromDataStream with alternatives:
>>>>>>> [error]   [T](dataStream: org.apache.flink.streaming.api.datastream.DataStream[T], fields: String)org.apache.flink.table.api.Table <and>
>>>>>>> [error]   [T](dataStream: org.apache.flink.streaming.api.datastream.DataStream[T])org.apache.flink.table.api.Table
>>>>>>> [error]  cannot be applied to (org.apache.flink.streaming.api.datastream.DataStreamSource[String], Symbol, Symbol, Symbol, Symbol)
>>>>>>> [error]   val table1: Table = tableEnv.fromDataStream(dataStream, 'key, 'ticker, 'timeissued, 'price)
>>>>>>> [error]                                ^
>>>>>>> [error] one error found
>>>>>>> [error] (compile:compileIncremental) Compilation failed
>>>>>>>
>>>>>>> The topic is very simple: it is comma-separated prices. I tried mapFunction and flatMap, but neither worked!
>>>>>>>
>>>>>>> Thanks,
>>>>>>>
>>>>>>>
>>>>>>> Dr Mich Talebzadeh
>>>>>>>
>>>>>>>
>>>>>>>
>>>>>>>
>>>>>>
>>>>>>
>>>>>
>>>>
>>
