Hi,

This is the code:

import java.util.Properties
import java.util.Arrays
import org.apache.flink.api.common.functions.MapFunction
import org.apache.flink.api.java.utils.ParameterTool
import org.apache.flink.streaming.api.datastream.DataStream
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment
import org.apache.flink.streaming.connectors.kafka.FlinkKafkaConsumer09
import org.apache.flink.streaming.util.serialization.SimpleStringSchema
import org.apache.flink.streaming.util.serialization.DeserializationSchema
import org.apache.kafka.clients.consumer.ConsumerConfig
import org.apache.kafka.clients.consumer.ConsumerRecord
import org.apache.kafka.clients.consumer.ConsumerRecords
import org.apache.kafka.clients.consumer.KafkaConsumer

object md_streaming {
  def main(args: Array[String]) {
    val env = StreamExecutionEnvironment.getExecutionEnvironment
    val properties = new Properties()
    properties.setProperty("bootstrap.servers", "rhes75:9092")
    properties.setProperty("zookeeper.connect", "rhes75:2181")
    properties.setProperty("group.id", "md_streaming")
    val stream = env
      .addSource(new FlinkKafkaConsumer09[String]("md", new SimpleStringSchema(), properties))
      .writeAsText("/tmp/md_streaming.txt")
    env.execute("Flink Kafka Example")
  }
}

and these are the sbt dependencies:

libraryDependencies += "org.apache.flink" %% "flink-connector-kafka-0.11" % "1.5.0"
libraryDependencies += "org.apache.flink" %% "flink-connector-kafka-base" % "1.5.0"
libraryDependencies += "org.apache.flink" %% "flink-scala" % "1.5.0"
libraryDependencies += "org.apache.kafka" % "kafka-clients" % "1.1.0"
libraryDependencies += "org.apache.flink" %% "flink-streaming-scala" % "1.5.0"
libraryDependencies += "org.apache.kafka" %% "kafka" % "1.1.0"

Thanks

Dr Mich Talebzadeh

LinkedIn * https://www.linkedin.com/profile/view?id=AAEAAAAWh2gBxianrbJd6zP6AcPCCdOABUrV8Pw
<https://www.linkedin.com/profile/view?id=AAEAAAAWh2gBxianrbJd6zP6AcPCCdOABUrV8Pw>*

http://talebzadehmich.wordpress.com

*Disclaimer:* Use it at your own risk. Any and all responsibility for any loss, damage or destruction of data or any other property which may arise from relying on this email's technical content is explicitly disclaimed. The author will in no case be liable for any monetary damages arising from such loss, damage or destruction.

On Mon, 2 Jul 2018 at 17:45, Ted Yu <yuzhih...@gmail.com> wrote:

> Here is the signature of assign:
>
>   public void assign(Collection<TopicPartition> partitions) {
>
> Looks like RestClusterClient was built against one version of Kafka but
> runs against a different version.
>
> Please check the sbt dependency and the version of Kafka jar on the
> classpath.
>
> Thanks
>
> On Mon, Jul 2, 2018 at 9:35 AM, Mich Talebzadeh <mich.talebza...@gmail.com>
> wrote:
>
>> Have you seen this error by any chance in flink streaming with Kafka
>> please?
>>
>> org.apache.flink.client.program.ProgramInvocationException: java.lang.NoSuchMethodError: org.apache.kafka.clients.consumer.KafkaConsumer.assign(Ljava/util/List;)V
>>     at org.apache.flink.client.program.rest.RestClusterClient.submitJob(RestClusterClient.java:265)
>>     at org.apache.flink.client.program.ClusterClient.run(ClusterClient.java:464)
>>     at org.apache.flink.streaming.api.environment.StreamContextEnvironment.execute(StreamContextEnvironment.java:66)
>>     at md_streaming$.main(md_streaming.scala:30)
>>     at md_streaming.main(md_streaming.scala)
>>     at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
>>     at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
>>     at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
>>     at java.lang.reflect.Method.invoke(Method.java:498)
>>     at org.apache.flink.client.program.PackagedProgram.callMainMethod(PackagedProgram.java:528)
>>     at org.apache.flink.client.program.PackagedProgram.invokeInteractiveModeForExecution(PackagedProgram.java:420)
>>     at org.apache.flink.client.program.ClusterClient.run(ClusterClient.java:404)
>>     at org.apache.flink.client.cli.CliFrontend.executeProgram(CliFrontend.java:781)
>>     at org.apache.flink.client.cli.CliFrontend.runProgram(CliFrontend.java:275)
>>     at org.apache.flink.client.cli.CliFrontend.run(CliFrontend.java:210)
>>     at org.apache.flink.client.cli.CliFrontend.parseParameters(CliFrontend.java:1020)
>>     at org.apache.flink.client.cli.CliFrontend.lambda$main$9(CliFrontend.java:1096)
>>     at java.security.AccessController.doPrivileged(Native Method)
>>     at javax.security.auth.Subject.doAs(Subject.java:422)
>>     at org.apache.hadoop.security.UserGroupInformation.doAs(UserGroupInformation.java:1836)
>>     at org.apache.flink.runtime.security.HadoopSecurityContext.runSecured(HadoopSecurityContext.java:41)
>>     at org.apache.flink.client.cli.CliFrontend.main(CliFrontend.java:1096)
>> Caused by: java.lang.NoSuchMethodError: org.apache.kafka.clients.consumer.KafkaConsumer.assign(Ljava/util/List;)V
>>     at org.apache.flink.streaming.connectors.kafka.internal.KafkaConsumerCallBridge.assignPartitions(KafkaConsumerCallBridge.java:42)
>>     at org.apache.flink.streaming.connectors.kafka.internal.KafkaConsumerThread.reassignPartitions(KafkaConsumerThread.java:405)
>>     at org.apache.flink.streaming.connectors.kafka.internal.KafkaConsumerThread.run(KafkaConsumerThread.java:243)
>>
>> thanks
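
Ted's suggestion to check which Kafka jar is on the classpath can be carried out from inside the JVM. The following is a minimal Scala sketch and not part of the original exchange: ClasspathCheck, jarOf and signatures are hypothetical names, and the code relies only on standard Java reflection. It defaults to java.util.ArrayList so that it runs without Kafka being present.

```scala
object ClasspathCheck {

  // Location (jar or directory) a class was loaded from.
  // Returns None for JDK classes, which have no code source.
  def jarOf(cls: Class[_]): Option[String] =
    Option(cls.getProtectionDomain)
      .flatMap(pd => Option(pd.getCodeSource))
      .flatMap(cs => Option(cs.getLocation))
      .map(_.toString)

  // Public overloads of `methodName` on `className`, rendered as
  // "returnType name(paramType, ...)" and sorted for stable output.
  def signatures(className: String, methodName: String): Seq[String] = {
    val cls = Class.forName(className)
    cls.getMethods.toSeq
      .filter(_.getName == methodName)
      .map { m =>
        val params = m.getParameterTypes.map(_.getName).mkString(", ")
        m.getReturnType.getName + " " + methodName + "(" + params + ")"
      }
      .distinct
      .sorted
  }

  def main(args: Array[String]): Unit = {
    // Assumed defaults so the sketch runs anywhere; pass a class name
    // and a method name as arguments to inspect something else.
    val className  = args.headOption.getOrElse("java.util.ArrayList")
    val methodName = args.lift(1).getOrElse("add")

    val source = jarOf(Class.forName(className)).getOrElse("(JDK / bootstrap)")
    println(className + " loaded from: " + source)
    signatures(className, methodName).foreach(s => println("  " + s))
  }
}
```

Run on the same classpath as the job, with the arguments org.apache.kafka.clients.consumer.KafkaConsumer and assign, this should show whether the loaded class offers assign(java.util.List), which the stack trace asks for, or assign(java.util.Collection), the signature Ted quoted, together with the jar it was loaded from.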