The relevant dependencies are:

    val flinkScala          = "org.apache.flink" %% "flink-scala"           % flinkVersion % "provided"
    val flinkStreamingScala = "org.apache.flink" %% "flink-streaming-scala" % flinkVersion % "provided"
    val flinkKafka          = "org.apache.flink" %% "flink-connector-kafka" % flinkVersion exclude("org.slf4j", "slf4j-log4j12")

I am using SBT. I tried both flink-connector-kafka and flink-connector-kafka-0.11 - same result.
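For completeness, here is how these land in a minimal build.sbt sketch. Only the three dependency vals above come from my actual build; the flinkVersion value and the sbt-assembly run-task idiom are illustrative assumptions:

    // build.sbt (sketch; assumes the sbt-assembly plugin is enabled)
    val flinkVersion = "1.7.1"  // assumption: matching the 1.7.1 runtime discussed below

    libraryDependencies ++= Seq(
      "org.apache.flink" %% "flink-scala"           % flinkVersion % "provided",
      "org.apache.flink" %% "flink-streaming-scala" % flinkVersion % "provided",
      ("org.apache.flink" %% "flink-connector-kafka" % flinkVersion)
        .exclude("org.slf4j", "slf4j-log4j12")
    )

    // Standard sbt-assembly idiom: keep "provided" dependencies out of the
    // uber jar, but still put them on the classpath for `sbt run` / IntelliJ.
    run in Compile := Defaults.runTask(
      fullClasspath in Compile,
      mainClass in (Compile, run),
      runner in (Compile, run)
    ).evaluated

With this setup the uber jar should contain the Kafka connector (and its kafka-clients) but none of the core Flink classes, which the Docker image already provides.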
Boris Lublinsky
FDP Architect
boris.lublin...@lightbend.com
https://www.lightbend.com/

> On Feb 21, 2019, at 1:38 AM, Konstantin Knauf <konstan...@ververica.com> wrote:
>
> Hi Boris,
>
> can you share the relevant parts (dependencies) of your pom.xml? Did you also try
> without fixing the Kafka version, i.e. running with the Kafka client version
> provided by the Kafka connector of Flink? Gordon (cc) dealt with FLINK-8741.
>
> @Gordon: have you seen this issue with 1.6/1.7 before?
>
> Cheers,
>
> Konstantin
>
> On Thu, Feb 21, 2019 at 2:19 AM Boris Lublinsky <boris.lublin...@lightbend.com> wrote:
> I found some more details on this.
> The same error for the same application was reported about a year ago:
> http://mail-archives.apache.org/mod_mbox/flink-user/201802.mbox/%3CCAE7GCT4pF74LwyY=tivzhquq50tkjjawfhaw+5phcsx+vos...@mail.gmail.com%3E
> and was due to https://issues.apache.org/jira/browse/FLINK-8741
>
> It looks like the same issue is back in 1.7.1 and 1.6.3. I tried with both the latest
> flink-connector-kafka and flink-connector-kafka-0.11.
>
> Boris Lublinsky
> FDP Architect
> boris.lublin...@lightbend.com
> https://www.lightbend.com/
>> On Feb 19, 2019, at 7:02 PM, Ken Krugler <kkrugler_li...@transpac.com> wrote:
>>
>> Hi Boris,
>>
>> I haven't seen this exact error, but I have seen similar errors caused by
>> multiple versions of jars on the classpath.
>>
>> When I've run into this particular "XXX is not an instance of YYY" problem,
>> it often seems to be caused by a jar that I should have marked as provided
>> in my pom.
>>
>> Though I'm typically running on a YARN cluster, not w/K8s, so maybe this
>> doesn't apply.
>>
>> -- Ken
>>
>> PS - I assume you've been reading
>> https://ci.apache.org/projects/flink/flink-docs-stable/monitoring/debugging_classloading.html
>>
>>> On Feb 19, 2019, at 4:34 PM, Boris Lublinsky <boris.lublin...@lightbend.com> wrote:
>>>
>>> Konstantin,
>>> After experimenting with this for a while, I got to the root cause of the problem.
>>> I am running a version of the taxi-ride travel-time prediction as my sample.
>>> It works fine in IntelliJ, but when I try to put it in Docker
>>> (the standard Debian 1.7 image), it fails with the following error:
>>>
>>> The program finished with the following exception:
>>>
>>> org.apache.flink.client.program.ProgramInvocationException: Job failed.
>>> (JobID: 9340e7669e7344ab827fef4ddb5ba73d)
>>>     at org.apache.flink.client.program.rest.RestClusterClient.submitJob(RestClusterClient.java:268)
>>>     at org.apache.flink.client.program.ClusterClient.run(ClusterClient.java:487)
>>>     at org.apache.flink.streaming.api.environment.StreamContextEnvironment.execute(StreamContextEnvironment.java:66)
>>>     at org.apache.flink.streaming.api.scala.StreamExecutionEnvironment.execute(StreamExecutionEnvironment.scala:654)
>>>     at com.lightbend.fdp.sample.flink.app.TravelTimePrediction$.main(TravelTimePrediction.scala:89)
>>>     at com.lightbend.fdp.sample.flink.app.TravelTimePrediction.main(TravelTimePrediction.scala)
>>>     at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
>>>     at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
>>>     at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
>>>     at java.lang.reflect.Method.invoke(Method.java:498)
>>>     at org.apache.flink.client.program.PackagedProgram.callMainMethod(PackagedProgram.java:529)
>>>     at org.apache.flink.client.program.PackagedProgram.invokeInteractiveModeForExecution(PackagedProgram.java:421)
>>>     at org.apache.flink.client.program.ClusterClient.run(ClusterClient.java:427)
>>>     at org.apache.flink.client.cli.CliFrontend.executeProgram(CliFrontend.java:813)
>>>     at org.apache.flink.client.cli.CliFrontend.runProgram(CliFrontend.java:287)
>>>     at org.apache.flink.client.cli.CliFrontend.run(CliFrontend.java:213)
>>>     at org.apache.flink.client.cli.CliFrontend.parseParameters(CliFrontend.java:1050)
>>>     at org.apache.flink.client.cli.CliFrontend.lambda$main$11(CliFrontend.java:1126)
>>>     at org.apache.flink.runtime.security.NoOpSecurityContext.runSecured(NoOpSecurityContext.java:30)
>>>     at org.apache.flink.client.cli.CliFrontend.main(CliFrontend.java:1126)
>>> Caused by: org.apache.flink.runtime.client.JobExecutionException: Job execution failed.
>>>     at org.apache.flink.runtime.jobmaster.JobResult.toJobExecutionResult(JobResult.java:146)
>>>     at org.apache.flink.client.program.rest.RestClusterClient.submitJob(RestClusterClient.java:265)
>>>     ... 19 more
>>> Caused by: org.apache.kafka.common.KafkaException: Failed to construct kafka producer
>>>     at org.apache.kafka.clients.producer.KafkaProducer.<init>(KafkaProducer.java:416)
>>>     at org.apache.kafka.clients.producer.KafkaProducer.<init>(KafkaProducer.java:288)
>>>     at org.apache.flink.streaming.connectors.kafka.internal.FlinkKafkaProducer.<init>(FlinkKafkaProducer.java:116)
>>>     at org.apache.flink.streaming.connectors.kafka.FlinkKafkaProducer011.initProducer(FlinkKafkaProducer011.java:944)
>>>     at org.apache.flink.streaming.connectors.kafka.FlinkKafkaProducer011.initNonTransactionalProducer(FlinkKafkaProducer011.java:940)
>>>     at org.apache.flink.streaming.connectors.kafka.FlinkKafkaProducer011.beginTransaction(FlinkKafkaProducer011.java:696)
>>>     at org.apache.flink.streaming.connectors.kafka.FlinkKafkaProducer011.beginTransaction(FlinkKafkaProducer011.java:94)
>>>     at org.apache.flink.streaming.api.functions.sink.TwoPhaseCommitSinkFunction.beginTransactionInternal(TwoPhaseCommitSinkFunction.java:384)
>>>     at org.apache.flink.streaming.api.functions.sink.TwoPhaseCommitSinkFunction.initializeState(TwoPhaseCommitSinkFunction.java:375)
>>>     at org.apache.flink.streaming.connectors.kafka.FlinkKafkaProducer011.initializeState(FlinkKafkaProducer011.java:847)
>>>     at org.apache.flink.streaming.util.functions.StreamingFunctionUtils.tryRestoreFunction(StreamingFunctionUtils.java:178)
>>>     at org.apache.flink.streaming.util.functions.StreamingFunctionUtils.restoreFunctionState(StreamingFunctionUtils.java:160)
>>>     at org.apache.flink.streaming.api.operators.AbstractUdfStreamOperator.initializeState(AbstractUdfStreamOperator.java:96)
>>>     at org.apache.flink.streaming.api.operators.AbstractStreamOperator.initializeState(AbstractStreamOperator.java:278)
>>>     at org.apache.flink.streaming.runtime.tasks.StreamTask.initializeState(StreamTask.java:738)
>>>     at org.apache.flink.streaming.runtime.tasks.StreamTask.invoke(StreamTask.java:289)
>>>     at org.apache.flink.runtime.taskmanager.Task.run(Task.java:704)
>>>     at java.lang.Thread.run(Thread.java:748)
>>> Caused by: org.apache.kafka.common.KafkaException:
>>> org.apache.kafka.common.serialization.ByteArraySerializer is not an
>>> instance of org.apache.kafka.common.serialization.Serializer
>>>     at org.apache.kafka.common.config.AbstractConfig.getConfiguredInstance(AbstractConfig.java:248)
>>>     at org.apache.kafka.clients.producer.KafkaProducer.<init>(KafkaProducer.java:327)
>>>     ... 17 more
>>>
>>> The closest match I found is
>>> https://stackoverflow.com/questions/37363119/kafka-producer-org-apache-kafka-common-serialization-stringserializer-could-no
>>> which talks about the class loader. (I tried the solution suggested there, but it did not help.)
>>> I looked at the class loading and I can see that this pair of classes is
>>> loaded from my uber jar, but twice.
>>>
>>> Have you guys seen this error before?
>>> Any suggestions?
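For this kind of duplicate-class symptom, the debugging_classloading page Ken linked above describes forcing parent-first loading for the affected packages. A sketch of that workaround in flink-conf.yaml, assuming the classloader.* keys as documented for Flink 1.5+ and that org.apache.kafka is the right pattern here; this is something to test, not a verified fix for this thread:

    # flink-conf.yaml (sketch): resolve Kafka classes from the Flink/parent
    # classpath first, so the duplicate copy inside the uber jar is ignored
    classloader.parent-first-patterns.additional: org.apache.kafka

    # or, more drastically, disable child-first class loading entirely:
    # classloader.resolve-order: parent-first

The alternative is to keep the Kafka classes out of the uber jar altogether, as in the "provided" approach Ken mentions above.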
>>>
>>> Boris Lublinsky
>>> FDP Architect
>>> boris.lublin...@lightbend.com
>>> https://www.lightbend.com/
>>>> On Feb 19, 2019, at 4:50 AM, Konstantin Knauf <konstan...@ververica.com> wrote:
>>>>
>>>> Hi Boris,
>>>>
>>>> without looking at the entrypoint in much detail, generally there should
>>>> not be a race condition there:
>>>>
>>>> * if the TaskManagers cannot connect to the ResourceManager, they will
>>>>   retry (by default the timeout is 5 minutes)
>>>> * if the JobManager does not get enough resources from the ResourceManager,
>>>>   it will also wait for the resources/slots to be provided. The timeout
>>>>   there is also 5 minutes, I think.
>>>>
>>>> So, this should actually be pretty robust as long as the TaskManager
>>>> containers can reach the JobManager eventually.
>>>>
>>>> Could you provide the TaskManager/JobManager logs for such a failure case?
>>>>
>>>> Cheers,
>>>>
>>>> Konstantin
>>>>
>>>>
>>>> On Mon, Feb 18, 2019 at 1:07 AM Boris Lublinsky <boris.lublin...@lightbend.com> wrote:
>>>> Following
>>>> https://github.com/apache/flink/tree/release-1.7/flink-container/docker
>>>> I have created an entry point, which looks as follows:
>>>>
>>>> #!/bin/sh
>>>>
>>>> ################################################################################
>>>> # from
>>>> # https://github.com/apache/flink/blob/release-1.7/flink-container/docker/docker-entrypoint.sh
>>>> # and
>>>> # https://github.com/docker-flink/docker-flink/blob/63b19a904fa8bfd1322f1d59fdb226c82b9186c7/1.7/scala_2.11-alpine/docker-entrypoint.sh
>>>> ################################################################################
>>>>
>>>> # If unspecified, the hostname of the container is taken as the JobManager address
>>>> JOB_MANAGER_RPC_ADDRESS=${JOB_MANAGER_RPC_ADDRESS:-$(hostname -f)}
>>>>
>>>> drop_privs_cmd() {
>>>>     if [ $(id -u) != 0 ]; then
>>>>         # Don't need to drop privs if EUID != 0
>>>>         return
>>>>     elif [ -x /sbin/su-exec ]; then
>>>>         # Alpine
>>>>         echo su-exec flink
>>>>     else
>>>>         # Others
>>>>         echo gosu flink
>>>>     fi
>>>> }
>>>>
>>>> JOB_MANAGER="jobmanager"
>>>> TASK_MANAGER="taskmanager"
>>>>
>>>> CMD="$1"
>>>> shift
>>>>
>>>> if [ "${CMD}" = "help" ]; then
>>>>     echo "Usage: $(basename $0) (${JOB_MANAGER}|${TASK_MANAGER}|help)"
>>>>     exit 0
>>>> elif [ "${CMD}" = "${JOB_MANAGER}" -o "${CMD}" = "${TASK_MANAGER}" ]; then
>>>>     if [ "${CMD}" = "${TASK_MANAGER}" ]; then
>>>>         TASK_MANAGER_NUMBER_OF_TASK_SLOTS=${TASK_MANAGER_NUMBER_OF_TASK_SLOTS:-$(grep -c ^processor /proc/cpuinfo)}
>>>>
>>>>         sed -i -e "s/jobmanager.rpc.address: localhost/jobmanager.rpc.address: ${JOB_MANAGER_RPC_ADDRESS}/g" "$FLINK_HOME/conf/flink-conf.yaml"
>>>>         sed -i -e "s/taskmanager.numberOfTaskSlots: 1/taskmanager.numberOfTaskSlots: $TASK_MANAGER_NUMBER_OF_TASK_SLOTS/g" "$FLINK_HOME/conf/flink-conf.yaml"
>>>>         echo "blob.server.port: 6124" >> "$FLINK_HOME/conf/flink-conf.yaml"
>>>>         echo "query.server.port: 6125" >> "$FLINK_HOME/conf/flink-conf.yaml"
>>>>
>>>>         echo "Starting Task Manager"
>>>>         echo "config file: " && grep '^[^\n#]' "$FLINK_HOME/conf/flink-conf.yaml"
>>>> "$FLINK_HOME/conf/flink-conf.yaml" >>>> exec $(drop_privs_cmd) "$FLINK_HOME/bin/taskmanager.sh" >>>> start-foreground >>>> else >>>> sed -i -e "s/jobmanager.rpc.address: >>>> localhost/jobmanager.rpc.address: ${JOB_MANAGER_RPC_ADDRESS}/g" >>>> "$FLINK_HOME/conf/flink-conf.yaml" >>>> echo "blob.server.port: 6124" >> "$FLINK_HOME/conf/flink-conf.yaml" >>>> echo "query.server.port: 6125" >> >>>> "$FLINK_HOME/conf/flink-conf.yaml" >>>> echo "config file: " && grep '^[^\n#]' >>>> "$FLINK_HOME/conf/flink-conf.yaml" >>>> >>>> if [ -z "$1" ]; then >>>> exec $(drop_privs_cmd) "$FLINK_HOME/bin/jobmanager.sh" >>>> start-foreground "$@" >>>> else >>>> exec $FLINK_HOME/bin/standalone-job.sh start-foreground "$@" >>>> fi >>>> fi >>>> fi >>>> >>>> exec "$@" >>>> It does work for all the cases, except running standalone job. >>>> The problem, the way I understand it, is a racing condition. >>>> In kubernetes it takes several attempts for establish connection between >>>> Job and Task manager, while standalone-job.sh >>>> tries to start a job immediately once the cluster is created (before >>>> connection is established). >>>> Is there a better option to implement it starting a job on container >>>> startup? >>>> >>>> >>>> >>>> -- >>>> Konstantin Knauf | Solutions Architect >>>> +49 160 91394525 >> >> -------------------------- >> Ken Krugler >> +1 530-210-6378 >> http://www.scaleunlimited.com <http://www.scaleunlimited.com/> >> Custom big data solutions & training >> Flink, Solr, Hadoop, Cascading & Cassandra > > > > -- > Konstantin Knauf | Solutions Architect > +49 160 91394525 > <https://www.ververica.com/> > Follow us @VervericaData > -- > Join Flink Forward <https://flink-forward.org/> - The Apache Flink Conference > Stream Processing | Event Driven | Real Time > -- > Data Artisans GmbH | Invalidenstrasse 115, 10115 Berlin, Germany > -- > Data Artisans GmbH > Registered at Amtsgericht Charlottenburg: HRB 158244 B > Managing Directors: Dr. Kostas Tzoumas, Dr. Stephan Ewen