Hi Mich,
How did you set up your local Kafka cluster, and did you produce any messages
to it? It looks like you are using a standard local Kafka setup for
testing:
"bootstrap.servers", "localhost:9092" and "zookeeper.connect", "localhost:2181"
so you probably need to produce some data manually, for example
using kafka-console-producer [1].
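If you would rather stay inside the Scala shell, something along these lines
might also work for pushing a few test records into the topic. This is only a
sketch: it assumes the kafka-clients classes are on the shell classpath and
that the broker really listens on localhost:9092, and the message payloads
are made-up placeholders rather than real market data.

```scala
import java.util.Properties
import org.apache.kafka.clients.producer.{KafkaProducer, ProducerRecord}

// Producer settings; the broker address is taken from your consumer config.
val producerProps = new Properties()
producerProps.setProperty("bootstrap.servers", "localhost:9092")
producerProps.setProperty("key.serializer",
  "org.apache.kafka.common.serialization.StringSerializer")
producerProps.setProperty("value.serializer",
  "org.apache.kafka.common.serialization.StringSerializer")

val producer = new KafkaProducer[String, String](producerProps)
try {
  // Placeholder payloads (assumption); replace with whatever "md" should carry.
  Seq("test message 1", "test message 2", "test message 3").foreach { line =>
    producer.send(new ProducerRecord[String, String]("md", line))
  }
} finally {
  // close() waits for the pending sends to complete before returning.
  producer.close()
}
```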
Also, since you are executing it in the Scala shell, it might be
easier to write the stream to a file instead of printing it:
val stream = env
  .addSource(new FlinkKafkaConsumer09[String]("md", new SimpleStringSchema(), properties))
  .writeAsText("some_file_path")
so that the produced result won't get buried in a huge list of console
output messages.
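Putting the pieces together, a rough sketch of the whole thing could look
like the following. It reuses the names from your program, adds Minglei's
setStartFromEarliest() suggestion, and keeps "some_file_path" as a placeholder
you would need to replace. Defining the object does not start anything by
itself; the job is only submitted when main is called and env.execute runs,
so the last line is one way to trigger it from the shell. I have not run this
against your setup, so treat it as a starting point.

```scala
import java.util.Properties
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment
import org.apache.flink.streaming.connectors.kafka.FlinkKafkaConsumer09
import org.apache.flink.streaming.util.serialization.SimpleStringSchema

object Main {
  def main(args: Array[String]): Unit = {
    val env = StreamExecutionEnvironment.getExecutionEnvironment

    val properties = new Properties()
    properties.setProperty("bootstrap.servers", "localhost:9092")
    properties.setProperty("zookeeper.connect", "localhost:2181")
    properties.setProperty("group.id", "sampleScala")

    // Read the topic from the earliest available offset, so records that
    // were produced before the job started are picked up as well.
    val consumer =
      new FlinkKafkaConsumer09[String]("md", new SimpleStringSchema(), properties)
    consumer.setStartFromEarliest()

    // Write to a file instead of the console ("some_file_path" is a placeholder).
    env.addSource(consumer).writeAsText("some_file_path")

    // Nothing is executed until this call.
    env.execute("Flink Kafka Example")
  }
}

// Invoke the compiled object from the shell. Because the Kafka source is
// unbounded, this call is expected to block while the job is running.
Main.main(Array.empty[String])
```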
--
Rong
[1]: https://kafka.apache.org/quickstart#quickstart_send
On Sat, Jun 30, 2018 at 8:06 AM zhangminglei <[email protected]> wrote:
> Please try new FlinkKafkaConsumer09[String]("md", new
> SimpleStringSchema(), properties).setStartFromEarliest() and try again.
>
> Cheers
> Minglei.
>
>
> On Jun 30, 2018, at 10:08 PM, Mich Talebzadeh <[email protected]> wrote:
>
>
> Hi,
>
> I have a streaming topic called "md" that displays test market data.
>
> I have written a simple program to stream data in via Kafka into Flink.
>
> Flink version 1.5
> Kafka version 2.12
>
> This is the sample program in Scala, which compiles OK
> in start-scala-shell.sh
>
> import java.util.Properties
> import java.util.Arrays
> import org.apache.flink.api.common.functions.MapFunction
> import org.apache.flink.api.java.utils.ParameterTool
> import org.apache.flink.streaming.api.datastream.DataStream
> import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment
> import org.apache.flink.streaming.connectors.kafka.FlinkKafkaConsumer09
> import org.apache.flink.streaming.util.serialization.SimpleStringSchema
> import org.apache.flink.streaming.util.serialization.DeserializationSchema
> //import org.apache.flink.streaming.api.scala._
> object Main {
>   def main(args: Array[String]) {
>     val env = StreamExecutionEnvironment.getExecutionEnvironment
>     val properties = new Properties()
>     properties.setProperty("bootstrap.servers", "localhost:9092")
>     properties.setProperty("zookeeper.connect", "localhost:2181")
>     properties.setProperty("group.id", "sampleScala")
>     val stream = env
>       .addSource(new FlinkKafkaConsumer09[String]("md", new SimpleStringSchema(), properties))
>       .print()
>     env.execute("Flink Kafka Example")
>   }
> }
>
> warning: there was one deprecation warning; re-run with -deprecation for
> details
> defined object Main
>
> But I do not see any streaming output.
>
> A naïve question. How do I execute the above compiled object in this shell?
>
> Thanks
>
> Dr Mich Talebzadeh