Hi All, After successfully writing the wordcount program, I was trying to create a streaming application but is getting below error when submitting the job in local mode.
Vishnus-MacBook-Pro:flink vishnu$ flink run target/scala-2.11/flink-vishnu_2.11-1.0.jar java.lang.NoSuchMethodError: org.apache.flink.streaming.api.scala.DataStream.flatMap(Lscala/Function1;Lorg/apache/flink/api/common/typeinfo/TypeInformation;)Lorg/apache/flink/streaming/api/scala/DataStream; at com.vishnu.flink.streaming.FlinkStreamingWordCount$.main(FlinkStreamingWordCount.scala:14) at com.vishnu.flink.streaming.FlinkStreamingWordCount.main(FlinkStreamingWordCount.scala) at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method) at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62) at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43) at java.lang.reflect.Method.invoke(Method.java:497) at org.apache.flink.client.program.PackagedProgram.callMainMethod(PackagedProgram.java:497) at org.apache.flink.client.program.PackagedProgram.invokeInteractiveModeForExecution(PackagedProgram.java:395) at org.apache.flink.client.program.Client.runBlocking(Client.java:252) at org.apache.flink.client.CliFrontend.executeProgramBlocking(CliFrontend.java:676) at org.apache.flink.client.CliFrontend.run(CliFrontend.java:326) at org.apache.flink.client.CliFrontend.parseParameters(CliFrontend.java:978) at org.apache.flink.client.CliFrontend.main(CliFrontend.java:1028) The exception above occurred while trying to run your command. This is my FlinkStreamingWordCount.scala file package com.vishnu.flink.streaming import org.apache.flink.streaming.api.scala.StreamExecutionEnvironment import org.apache.flink.api.scala._ object FlinkStreamingWordCount { def main(args: Array[String]) { val sev = StreamExecutionEnvironment.getExecutionEnvironment val socTxtStream = sev.socketTextStream("localhost",4444) val counts = socTxtStream.flatMap(line => line.split(" ") ) .map { (_, 1) } .keyBy(0) .sum(1) counts.print() sev.execute() } } This is how my sbt file looks like val flink = "org.apache.flink" % "flink-core" % "1.0.0" val flinkclients = "org.apache.flink" % "flink-clients_2.11" % "1.0.0" val flinkstreaming = "org.apache.flink" % "flink-streaming-scala_2.11" % "1.0.0" val main = "com.vishnu.flink.streaming.FlinkStreamingWordCount" name := "flink-vishnu" mainClass in (Compile, run) := Some(main) mainClass in (Compile, packageBin) := Some(main) lazy val commonSettings = Seq( organization := "com.vishnu", version := "1.0", scalaVersion := "2.11.7" ) lazy val root = (project in file(".")). settings(commonSettings:_*). settings( name := "flink-vishnu", libraryDependencies += flink, libraryDependencies += flinkclients, libraryDependencies += flinkstreaming, retrieveManaged := true ) I m using scala 2.11.7, and have downloaded Flink for scala 2.11 Any help is appreciated Thanks, Vishnu