Hi Fabian,

Thanks. Great contribution! It is working.
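For anyone landing on this thread with the same NoSuchMethodError, the advice in the quoted replies comes down to two points: use FlinkKafkaConsumer011 rather than FlinkKafkaConsumer09 (Fabian), and avoid mixing the connector with an unrelated Kafka client version (Ted). A minimal build.sbt sketch under those assumptions follows. The value name `flinkVersion` and the choice to drop the explicit "kafka" and "kafka-clients" lines are assumptions for illustration, not something confirmed in the thread; the connector is assumed to bring in its own kafka-clients (0.11.0.2 according to the pom Ted quotes).

```scala
// build.sbt fragment (sketch). Versions are taken from the thread; the
// value name `flinkVersion` is an assumption made for readability.
val flinkVersion = "1.5.0"

libraryDependencies ++= Seq(
  "org.apache.flink" %% "flink-scala"                % flinkVersion,
  "org.apache.flink" %% "flink-streaming-scala"      % flinkVersion,
  // Provides FlinkKafkaConsumer011. Assumption: it brings in a matching
  // kafka-clients transitively, so no explicit "kafka" or "kafka-clients"
  // dependency at 1.1.0 is declared here.
  "org.apache.flink" %% "flink-connector-kafka-0.11" % flinkVersion
)

// In the job source, the consumer class then matches the connector:
//   import org.apache.flink.streaming.connectors.kafka.FlinkKafkaConsumer011
//   new FlinkKafkaConsumer011[String]("md", new SimpleStringSchema(), properties)
```

Whether the Flink core libraries should be marked as provided for an assembly jar depends on how the job is deployed, which the thread does not say, so that is left out here.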
[info] SHA-1: 98d78b909631e5d30664df6a7a4a3f421d4fd33b
[info] Packaging /home/hduser/dba/bin/flink/md_streaming/target/scala-2.11/md_streaming-assembly-1.0.jar ...
[info] Done packaging.
[success] Total time: 14 s, completed Jul 3, 2018 9:32:25 AM
Completed compiling
Tue Jul 3 09:32:25 BST 2018 , Running in **Standalone mode**
*Starting execution of program*

And the test prices generated in the topic are added to the file for each security:

ad534c19-fb77-4966-86a8-dc411d7a0607,MKS,2018-07-03T09:50:03,319.2
e96e5d96-03fc-4603-899f-5ea5151fc280,IBM,2018-07-03T09:50:07,124.67
a64a51a4-f27c-439b-b9cc-28dc593acab3,MRW,2018-07-03T09:50:07,286.27
2d15089d-3635-4a7e-b2d5-20b92dd67186,MSFT,2018-07-03T09:50:07,22.8
415d1215-18e6-46ca-a771-98c614e8c3fb,ORCL,2018-07-03T09:50:07,32.57
e0dd5832-20bd-4951-a4dd-3a3a10c99a01,SAP,2018-07-03T09:50:07,69.32
4222eea9-d9a7-46e1-8b1e-c2b634170fad,SBRY,2018-07-03T09:50:07,235.22
4f2e0927-29ff-44ff-aa2e-9b16fdcd0024,TSCO,2018-07-03T09:50:07,403.64
49437f8b-5e2b-42e9-b3d7-f95eee9c5432,VOD,2018-07-03T09:50:07,239.08
0f96463e-40a5-47c5-b2c6-7191992ab0b1,BP,2018-07-03T09:50:07,587.75
d0041bf1-a313-4623-a7cc-2ce8204590bb,MKS,2018-07-03T09:50:07,406.02
c443ac53-f762-4fad-b11c-0fd4e98812fb,IBM,2018-07-03T09:50:10,168.52
67f2d372-f918-445e-8dac-7556a2dfd0aa,MRW,2018-07-03T09:50:10,293.2
57372392-53fd-48eb-aa94-c317c75d6545,MSFT,2018-07-03T09:50:10,46.53
c3839c12-be63-416c-a404-8d0333071559,ORCL,2018-07-03T09:50:10,31.57
29eca46c-bd4c-475e-a9c9-7bf105fcc935,SAP,2018-07-03T09:50:10,77.81
89f98ad0-dc34-476f-baa5-fc2fa92aa2d5,SBRY,2018-07-03T09:50:10,239.95
431494f3-1215-48ae-a534-5bf3fbe20f2f,TSCO,2018-07-03T09:50:10,331.12
2203095f-8826-424d-a1e3-fa212194ac35,VOD,2018-07-03T09:50:10,232.05
816ddc9b-f403-4ea9-8d55-c3afd0eae110,BP,2018-07-03T09:50:10,506.4
23c07878-d64d-4d1e-84a4-c14c23357467,MKS,2018-07-03T09:50:10,473.06

kind regards,

Dr Mich Talebzadeh

LinkedIn * https://www.linkedin.com/profile/view?id=AAEAAAAWh2gBxianrbJd6zP6AcPCCdOABUrV8Pw
<https://www.linkedin.com/profile/view?id=AAEAAAAWh2gBxianrbJd6zP6AcPCCdOABUrV8Pw>*

http://talebzadehmich.wordpress.com

*Disclaimer:* Use it at your own risk. Any and all responsibility for any loss, damage or destruction of data or any other property which may arise from relying on this email's technical content is explicitly disclaimed. The author will in no case be liable for any monetary damages arising from such loss, damage or destruction.

On Tue, 3 Jul 2018 at 09:11, Fabian Hueske <fhue...@gmail.com> wrote:

> Hi Mich,
>
> FlinkKafkaConsumer09 is the connector for Kafka 0.9.x.
> Have you tried to use FlinkKafkaConsumer011 instead of
> FlinkKafkaConsumer09?
>
> Best, Fabian
>
> 2018-07-02 22:57 GMT+02:00 Mich Talebzadeh <mich.talebza...@gmail.com>:
>
>> This is becoming very tedious.
>>
>> As suggested I changed the kafka dependency from
>>
>> libraryDependencies += "org.apache.kafka" %% "kafka" % "1.1.0"
>>
>> to
>>
>> libraryDependencies += "org.apache.kafka" %% "kafka" % "0.11.0.0"
>>
>> and compiled and ran the job again, and it failed.
>> This is the log file
>>
>> 2018-07-02 21:38:38,656 INFO org.apache.kafka.common.utils.AppInfoParser - Kafka version : 1.1.0
>> 2018-07-02 21:38:38,656 INFO org.apache.kafka.common.utils.AppInfoParser - Kafka commitId : fdcf75ea326b8e07
>> 2018-07-02 21:38:38,696 INFO org.apache.kafka.clients.Metadata - Cluster ID: 3SqEt4DcTruOr_SlQ6fqTQ
>> 2018-07-02 21:38:38,698 INFO org.apache.flink.streaming.connectors.kafka.FlinkKafkaConsumerBase - Consumer subtask 0 will start reading the following 1 partitions from the committed group offsets in Kafka: [KafkaTopicPartition{topic='md', partition=0}]
>> 2018-07-02 21:38:38,702 INFO org.apache.kafka.clients.consumer.ConsumerConfig - ConsumerConfig values:
>> auto.commit.interval.ms = 5000
>> auto.offset.reset = latest
>> bootstrap.servers = [rhes75:9092]
>> check.crcs = true
>> client.id =
>> connections.max.idle.ms = 540000
>> enable.auto.commit = true
>> exclude.internal.topics = true
>> fetch.max.bytes = 52428800
>> fetch.max.wait.ms = 500
>> fetch.min.bytes = 1
>> group.id = md_streaming
>> heartbeat.interval.ms = 3000
>> interceptor.classes = []
>> internal.leave.group.on.close = true
>> isolation.level = read_uncommitted
>> key.deserializer = class org.apache.kafka.common.serialization.ByteArrayDeserializer
>> max.partition.fetch.bytes = 1048576
>> max.poll.interval.ms = 300000
>> max.poll.records = 500
>> metadata.max.age.ms = 300000
>> metric.reporters = []
>> metrics.num.samples = 2
>> metrics.recording.level = INFO
>> metrics.sample.window.ms = 30000
>> partition.assignment.strategy = [class org.apache.kafka.clients.consumer.RangeAssignor]
>> receive.buffer.bytes = 65536
>> reconnect.backoff.max.ms = 1000
>> reconnect.backoff.ms = 50
>> request.timeout.ms = 305000
>> retry.backoff.ms = 100
>> sasl.jaas.config = null
>> sasl.kerberos.kinit.cmd = /usr/bin/kinit
>> sasl.kerberos.min.time.before.relogin = 60000
>> sasl.kerberos.service.name = null
>> sasl.kerberos.ticket.renew.jitter = 0.05
>> sasl.kerberos.ticket.renew.window.factor = 0.8
>> sasl.mechanism = GSSAPI
>> security.protocol = PLAINTEXT
>> send.buffer.bytes = 131072
>> session.timeout.ms = 10000
>> ssl.cipher.suites = null
>> ssl.enabled.protocols = [TLSv1.2, TLSv1.1, TLSv1]
>> ssl.endpoint.identification.algorithm = null
>> ssl.key.password = null
>> ssl.keymanager.algorithm = SunX509
>> ssl.keystore.location = null
>> ssl.keystore.password = null
>> ssl.keystore.type = JKS
>> ssl.protocol = TLS
>> ssl.provider = null
>> ssl.secure.random.implementation = null
>> ssl.trustmanager.algorithm = PKIX
>> ssl.truststore.location = null
>> ssl.truststore.password = null
>> ssl.truststore.type = JKS
>> value.deserializer = class org.apache.kafka.common.serialization.ByteArrayDeserializer
>> 2018-07-02 21:38:38,705 INFO org.apache.kafka.common.utils.AppInfoParser - Kafka version : 1.1.0
>> 2018-07-02 21:38:38,705 INFO org.apache.kafka.common.utils.AppInfoParser - Kafka commitId : fdcf75ea326b8e07
>> 2018-07-02 21:38:38,705 WARN org.apache.kafka.common.utils.AppInfoParser - Error registering AppInfo mbean
>> javax.management.InstanceAlreadyExistsException: kafka.consumer:type=app-info,id=consumer-2
>>     at com.sun.jmx.mbeanserver.Repository.addMBean(Repository.java:437)
>>     at com.sun.jmx.interceptor.DefaultMBeanServerInterceptor.registerWithRepository(DefaultMBeanServerInterceptor.java:1898)
>>     at com.sun.jmx.interceptor.DefaultMBeanServerInterceptor.registerDynamicMBean(DefaultMBeanServerInterceptor.java:966)
>>     at com.sun.jmx.interceptor.DefaultMBeanServerInterceptor.registerObject(DefaultMBeanServerInterceptor.java:900)
>>     at com.sun.jmx.interceptor.DefaultMBeanServerInterceptor.registerMBean(DefaultMBeanServerInterceptor.java:324)
>>     at com.sun.jmx.mbeanserver.JmxMBeanServer.registerMBean(JmxMBeanServer.java:522)
>>     at org.apache.kafka.common.utils.AppInfoParser.registerAppInfo(AppInfoParser.java:62)
>>     at org.apache.kafka.clients.consumer.KafkaConsumer.<init>(KafkaConsumer.java:785)
>>     at org.apache.kafka.clients.consumer.KafkaConsumer.<init>(KafkaConsumer.java:644)
>>     at org.apache.kafka.clients.consumer.KafkaConsumer.<init>(KafkaConsumer.java:624)
>>     at org.apache.flink.streaming.connectors.kafka.internal.KafkaConsumerThread.getConsumer(KafkaConsumerThread.java:482)
>>     at org.apache.flink.streaming.connectors.kafka.internal.KafkaConsumerThread.run(KafkaConsumerThread.java:171)
>> 2018-07-02 21:38:38,709 WARN org.apache.flink.streaming.connectors.kafka.internal.Kafka09Fetcher - Error while closing Kafka consumer
>> java.lang.NullPointerException
>>     at org.apache.flink.streaming.connectors.kafka.internal.KafkaConsumerThread.run(KafkaConsumerThread.java:286)
>> 2018-07-02 21:38:38,710 INFO org.apache.flink.runtime.taskmanager.Task - Source: Custom Source -> Sink: Unnamed (1/1) (bcb46879e709768c9160dd11e09ba05b) switched from RUNNING to FAILED.
>> java.lang.NoSuchMethodError: org.apache.kafka.clients.consumer.KafkaConsumer.assign(Ljava/util/List;)V
>>     at org.apache.flink.streaming.connectors.kafka.internal.KafkaConsumerCallBridge.assignPartitions(KafkaConsumerCallBridge.java:42)
>>     at org.apache.flink.streaming.connectors.kafka.internal.KafkaConsumerThread.reassignPartitions(KafkaConsumerThread.java:405)
>>     at org.apache.flink.streaming.connectors.kafka.internal.KafkaConsumerThread.run(KafkaConsumerThread.java:243)
>> 2018-07-02 21:38:38,713 INFO org.apache.flink.runtime.taskmanager.Task - Freeing task resources for Source: Custom Source -> Sink: Unnamed (1/1) (bcb46879e709768c9160dd11e09ba05b).
>> 2018-07-02 21:38:38,713 INFO org.apache.flink.runtime.taskmanager.Task - Ensuring all FileSystem streams are closed for task Source: Custom Source -> Sink: Unnamed (1/1) (bcb46879e709768c9160d
>>
>> Dr Mich Talebzadeh
>>
>> On Mon, 2 Jul 2018 at 20:59, Ted Yu <yuzhih...@gmail.com> wrote:
>>
>>> From flink-connector-kafka-0.11 dependency, we know the version of
>>> Kafka used is (flink-connectors/flink-connector-kafka-0.11/pom.xml):
>>>
>>> <kafka.version>0.11.0.2</kafka.version>
>>>
>>> From Kafka side, you specified 1.1.0
>>>
>>> I think these versions produce what you experienced.
>>> If you use Kafka 0.11, this problem should go away.
>>>
>>> On Mon, Jul 2, 2018 at 11:24 AM, Mich Talebzadeh <mich.talebza...@gmail.com> wrote:
>>>
>>>> Hi,
>>>>
>>>> This is the code
>>>>
>>>> import java.util.Properties
>>>> import java.util.Arrays
>>>> import org.apache.flink.api.common.functions.MapFunction
>>>> import org.apache.flink.api.java.utils.ParameterTool
>>>> import org.apache.flink.streaming.api.datastream.DataStream
>>>> import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment
>>>> import org.apache.flink.streaming.connectors.kafka.FlinkKafkaConsumer09
>>>> import org.apache.flink.streaming.util.serialization.SimpleStringSchema
>>>> import org.apache.flink.streaming.util.serialization.DeserializationSchema
>>>> import org.apache.kafka.clients.consumer.ConsumerConfig;
>>>> import org.apache.kafka.clients.consumer.ConsumerRecord;
>>>> import org.apache.kafka.clients.consumer.ConsumerRecords;
>>>> import org.apache.kafka.clients.consumer.KafkaConsumer;
>>>>
>>>> object md_streaming
>>>> {
>>>>   def main(args: Array[String])
>>>>   {
>>>>     val env = StreamExecutionEnvironment.getExecutionEnvironment
>>>>     val properties = new Properties()
>>>>     properties.setProperty("bootstrap.servers", "rhes75:9092")
>>>>     properties.setProperty("zookeeper.connect", "rhes75:2181")
>>>>     properties.setProperty("group.id", "md_streaming")
>>>>     val stream = env
>>>>       .addSource(new FlinkKafkaConsumer09[String]("md", new SimpleStringSchema(), properties))
>>>>       .writeAsText("/tmp/md_streaming.txt")
>>>>     env.execute("Flink Kafka Example")
>>>>   }
>>>> }
>>>>
>>>> and these are the sbt dependencies
>>>>
>>>> libraryDependencies += "org.apache.flink" %% "flink-connector-kafka-0.11" % "1.5.0"
>>>> libraryDependencies += "org.apache.flink" %% "flink-connector-kafka-base" % "1.5.0"
>>>> libraryDependencies += "org.apache.flink" %% "flink-scala" % "1.5.0"
>>>> libraryDependencies += "org.apache.kafka" % "kafka-clients" % "1.1.0"
>>>> libraryDependencies += "org.apache.flink" %% "flink-streaming-scala" % "1.5.0"
>>>> libraryDependencies += "org.apache.kafka" %% "kafka" % "1.1.0"
>>>>
>>>> Thanks
>>>>
>>>> Dr Mich Talebzadeh
>>>>
>>>> On Mon, 2 Jul 2018 at 17:45, Ted Yu <yuzhih...@gmail.com> wrote:
>>>>
>>>>> Here is the signature of assign :
>>>>>
>>>>> public void assign(Collection<TopicPartition> partitions) {
>>>>>
>>>>> Looks like RestClusterClient was built against one version of Kafka
>>>>> but runs against a different version.
>>>>>
>>>>> Please check the sbt dependency and the version of Kafka jar on the
>>>>> classpath.
>>>>>
>>>>> Thanks
>>>>>
>>>>> On Mon, Jul 2, 2018 at 9:35 AM, Mich Talebzadeh <mich.talebza...@gmail.com> wrote:
>>>>>
>>>>>> Have you seen this error by any chance in flink streaming with Kafka please?
>>>>>>
>>>>>> org.apache.flink.client.program.ProgramInvocationException: java.lang.NoSuchMethodError: org.apache.kafka.clients.consumer.KafkaConsumer.assign(Ljava/util/List;)V
>>>>>>     at org.apache.flink.client.program.rest.RestClusterClient.submitJob(RestClusterClient.java:265)
>>>>>>     at org.apache.flink.client.program.ClusterClient.run(ClusterClient.java:464)
>>>>>>     at org.apache.flink.streaming.api.environment.StreamContextEnvironment.execute(StreamContextEnvironment.java:66)
>>>>>>     at md_streaming$.main(md_streaming.scala:30)
>>>>>>     at md_streaming.main(md_streaming.scala)
>>>>>>     at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
>>>>>>     at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
>>>>>>     at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
>>>>>>     at java.lang.reflect.Method.invoke(Method.java:498)
>>>>>>     at org.apache.flink.client.program.PackagedProgram.callMainMethod(PackagedProgram.java:528)
>>>>>>     at org.apache.flink.client.program.PackagedProgram.invokeInteractiveModeForExecution(PackagedProgram.java:420)
>>>>>>     at org.apache.flink.client.program.ClusterClient.run(ClusterClient.java:404)
>>>>>>     at org.apache.flink.client.cli.CliFrontend.executeProgram(CliFrontend.java:781)
>>>>>>     at org.apache.flink.client.cli.CliFrontend.runProgram(CliFrontend.java:275)
>>>>>>     at org.apache.flink.client.cli.CliFrontend.run(CliFrontend.java:210)
>>>>>>     at org.apache.flink.client.cli.CliFrontend.parseParameters(CliFrontend.java:1020)
>>>>>>     at org.apache.flink.client.cli.CliFrontend.lambda$main$9(CliFrontend.java:1096)
>>>>>>     at java.security.AccessController.doPrivileged(Native Method)
>>>>>>     at javax.security.auth.Subject.doAs(Subject.java:422)
>>>>>>     at org.apache.hadoop.security.UserGroupInformation.doAs(UserGroupInformation.java:1836)
>>>>>>     at org.apache.flink.runtime.security.HadoopSecurityContext.runSecured(HadoopSecurityContext.java:41)
>>>>>>     at org.apache.flink.client.cli.CliFrontend.main(CliFrontend.java:1096)
>>>>>> Caused by: java.lang.NoSuchMethodError: org.apache.kafka.clients.consumer.KafkaConsumer.assign(Ljava/util/List;)V
>>>>>>     at org.apache.flink.streaming.connectors.kafka.internal.KafkaConsumerCallBridge.assignPartitions(KafkaConsumerCallBridge.java:42)
>>>>>>     at org.apache.flink.streaming.connectors.kafka.internal.KafkaConsumerThread.reassignPartitions(KafkaConsumerThread.java:405)
>>>>>>     at org.apache.flink.streaming.connectors.kafka.internal.KafkaConsumerThread.run(KafkaConsumerThread.java:243)
>>>>>>
>>>>>> thanks
>>>>>>
>>>>>> Dr Mich Talebzadeh
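
The NoSuchMethodError in the original question names `KafkaConsumer.assign(Ljava/util/List;)V`, that is, a caller compiled against an `assign` that takes a `List`, while the signature Ted quotes takes a `Collection`. One way to see which client class is actually on the classpath is a small reflection check. The sketch below is an illustration only: `ClasspathCheck`, `loadedFrom` and `signatures` are hypothetical names, not part of Flink or Kafka, and it relies only on JDK reflection calls.

```scala
object ClasspathCheck {

  /** Jar or directory a class was loaded from, if the JVM exposes it. */
  def loadedFrom(cls: Class[_]): Option[String] =
    Option(cls.getProtectionDomain)
      .flatMap(pd => Option(pd.getCodeSource))
      .flatMap(cs => Option(cs.getLocation))
      .map(_.toString)

  /** Signatures of all public methods on `cls` with the given name. */
  def signatures(cls: Class[_], methodName: String): Seq[String] =
    cls.getMethods.toSeq
      .filter(_.getName == methodName)
      .map { m =>
        val params = m.getParameterTypes.map(_.getName).mkString(", ")
        s"${m.getName}($params)"
      }

  def main(args: Array[String]): Unit = {
    // Defaults target the class and method named in the stack trace.
    val className  = args.headOption.getOrElse("org.apache.kafka.clients.consumer.KafkaConsumer")
    val methodName = args.lift(1).getOrElse("assign")

    val cls = Class.forName(className) // throws ClassNotFoundException if absent
    println(s"$className loaded from: ${loadedFrom(cls).getOrElse("<unknown>")}")
    signatures(cls, methodName).foreach(println)
  }
}
```

Run with the same classpath as the job (for example from the assembly jar), it prints where `KafkaConsumer` comes from and which `assign` overloads it offers. If only a `Collection`-based `assign` shows up, code compiled against the `List` variant, such as the `KafkaConsumerCallBridge` frames in the trace, cannot link against it.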