Hi Boris, I haven’t seen this exact error, but I have seen similar errors caused by multiple versions of jars on the classpath.
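
One rough way to look for that situation is to scan the directories that end up on the classpath for the same artifact appearing under more than one version. The following is only a minimal sketch: the function name find_duplicate_jars is hypothetical, and it assumes Maven-style file names of the form <artifact>-<version>.jar where the version starts with a digit, so it is a heuristic rather than a definitive check.

```shell
#!/bin/sh
# Hypothetical helper: print artifact names that occur more than once
# among the *.jar files of the given directories.
# Assumption: jars are named "<artifact>-<version>.jar" and the version
# segment starts with a digit (e.g. kafka-clients-0.11.0.2.jar).
find_duplicate_jars() {
    for dir in "$@"; do
        for jar in "$dir"/*.jar; do
            # Skip the unexpanded pattern when a directory has no jars
            [ -e "$jar" ] || continue
            basename "$jar" .jar
        done
    done |
        # Drop the trailing "-<version>" segment, then keep repeated names only
        sed 's/-[0-9][^-]*$//' | sort | uniq -d
}
```

Calling it as `find_duplicate_jars "$FLINK_HOME/lib" ./target/lib` (both paths are placeholders) would print one artifact name per line for anything present in more than one version. It only looks at file names, so classes bundled inside an uber jar will not show up; listing the jar's contents with `jar tf` is the complementary check there.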
When I’ve run into this particular "XXX is not an instance of YYY" problem, it often seems to be caused by a jar that I should have marked as provided in my pom.

Though I’m typically running on a YARN cluster, not with K8s, so maybe this doesn’t apply.

-- Ken

PS - I assume you’ve been reading https://ci.apache.org/projects/flink/flink-docs-stable/monitoring/debugging_classloading.html

> On Feb 19, 2019, at 4:34 PM, Boris Lublinsky <boris.lublin...@lightbend.com> wrote:
>
> Konstantin,
> After experimenting with this for a while, I got to the root cause of the problem.
> I am running a version of a Taxi ride travel prediction as my sample.
> It works fine in IntelliJ,
> but when I try to run it in Docker (standard Debian 1.7 image)
> it fails with the following error:
>
>
> The program finished with the following exception:
>
> org.apache.flink.client.program.ProgramInvocationException: Job failed.
> (JobID: 9340e7669e7344ab827fef4ddb5ba73d)
>     at org.apache.flink.client.program.rest.RestClusterClient.submitJob(RestClusterClient.java:268)
>     at org.apache.flink.client.program.ClusterClient.run(ClusterClient.java:487)
>     at org.apache.flink.streaming.api.environment.StreamContextEnvironment.execute(StreamContextEnvironment.java:66)
>     at org.apache.flink.streaming.api.scala.StreamExecutionEnvironment.execute(StreamExecutionEnvironment.scala:654)
>     at com.lightbend.fdp.sample.flink.app.TravelTimePrediction$.main(TravelTimePrediction.scala:89)
>     at com.lightbend.fdp.sample.flink.app.TravelTimePrediction.main(TravelTimePrediction.scala)
>     at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
>     at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
>     at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
>     at java.lang.reflect.Method.invoke(Method.java:498)
>     at org.apache.flink.client.program.PackagedProgram.callMainMethod(PackagedProgram.java:529)
>     at org.apache.flink.client.program.PackagedProgram.invokeInteractiveModeForExecution(PackagedProgram.java:421)
>     at org.apache.flink.client.program.ClusterClient.run(ClusterClient.java:427)
>     at org.apache.flink.client.cli.CliFrontend.executeProgram(CliFrontend.java:813)
>     at org.apache.flink.client.cli.CliFrontend.runProgram(CliFrontend.java:287)
>     at org.apache.flink.client.cli.CliFrontend.run(CliFrontend.java:213)
>     at org.apache.flink.client.cli.CliFrontend.parseParameters(CliFrontend.java:1050)
>     at org.apache.flink.client.cli.CliFrontend.lambda$main$11(CliFrontend.java:1126)
>     at org.apache.flink.runtime.security.NoOpSecurityContext.runSecured(NoOpSecurityContext.java:30)
>     at org.apache.flink.client.cli.CliFrontend.main(CliFrontend.java:1126)
> Caused by: org.apache.flink.runtime.client.JobExecutionException: Job execution failed.
>     at org.apache.flink.runtime.jobmaster.JobResult.toJobExecutionResult(JobResult.java:146)
>     at org.apache.flink.client.program.rest.RestClusterClient.submitJob(RestClusterClient.java:265)
>     ... 19 more
> Caused by: org.apache.kafka.common.KafkaException: Failed to construct kafka producer
>     at org.apache.kafka.clients.producer.KafkaProducer.<init>(KafkaProducer.java:416)
>     at org.apache.kafka.clients.producer.KafkaProducer.<init>(KafkaProducer.java:288)
>     at org.apache.flink.streaming.connectors.kafka.internal.FlinkKafkaProducer.<init>(FlinkKafkaProducer.java:116)
>     at org.apache.flink.streaming.connectors.kafka.FlinkKafkaProducer011.initProducer(FlinkKafkaProducer011.java:944)
>     at org.apache.flink.streaming.connectors.kafka.FlinkKafkaProducer011.initNonTransactionalProducer(FlinkKafkaProducer011.java:940)
>     at org.apache.flink.streaming.connectors.kafka.FlinkKafkaProducer011.beginTransaction(FlinkKafkaProducer011.java:696)
>     at org.apache.flink.streaming.connectors.kafka.FlinkKafkaProducer011.beginTransaction(FlinkKafkaProducer011.java:94)
>     at org.apache.flink.streaming.api.functions.sink.TwoPhaseCommitSinkFunction.beginTransactionInternal(TwoPhaseCommitSinkFunction.java:384)
>     at org.apache.flink.streaming.api.functions.sink.TwoPhaseCommitSinkFunction.initializeState(TwoPhaseCommitSinkFunction.java:375)
>     at org.apache.flink.streaming.connectors.kafka.FlinkKafkaProducer011.initializeState(FlinkKafkaProducer011.java:847)
>     at org.apache.flink.streaming.util.functions.StreamingFunctionUtils.tryRestoreFunction(StreamingFunctionUtils.java:178)
>     at org.apache.flink.streaming.util.functions.StreamingFunctionUtils.restoreFunctionState(StreamingFunctionUtils.java:160)
>     at org.apache.flink.streaming.api.operators.AbstractUdfStreamOperator.initializeState(AbstractUdfStreamOperator.java:96)
>     at org.apache.flink.streaming.api.operators.AbstractStreamOperator.initializeState(AbstractStreamOperator.java:278)
>     at org.apache.flink.streaming.runtime.tasks.StreamTask.initializeState(StreamTask.java:738)
>     at org.apache.flink.streaming.runtime.tasks.StreamTask.invoke(StreamTask.java:289)
>     at org.apache.flink.runtime.taskmanager.Task.run(Task.java:704)
>     at java.lang.Thread.run(Thread.java:748)
> Caused by: org.apache.kafka.common.KafkaException: org.apache.kafka.common.serialization.ByteArraySerializer is not an instance of org.apache.kafka.common.serialization.Serializer
>     at org.apache.kafka.common.config.AbstractConfig.getConfiguredInstance(AbstractConfig.java:248)
>     at org.apache.kafka.clients.producer.KafkaProducer.<init>(KafkaProducer.java:327)
>     ... 17 more
>
> The closest that I found is
> https://stackoverflow.com/questions/37363119/kafka-producer-org-apache-kafka-common-serialization-stringserializer-could-no
> which talks about class loaders. (I tried their solution, but it did not help.)
> I looked at the class loading and I see that this pair of classes is loaded from my uber jar, but twice.
>
> Have you guys seen this error before?
> Any suggestions?
>
> Boris Lublinsky
> FDP Architect
> boris.lublin...@lightbend.com
> https://www.lightbend.com/
>
>> On Feb 19, 2019, at 4:50 AM, Konstantin Knauf <konstan...@ververica.com> wrote:
>>
>> Hi Boris,
>>
>> without looking at the entrypoint in much detail, generally there should not be a race condition there:
>>
>> * if the taskmanagers cannot connect to the resourcemanager, they will retry (by default the timeout is 5 mins)
>> * if the JobManager does not get enough resources from the ResourceManager, it will also wait for the resources/slots to be provided. The timeout there is also 5 minutes, I think.
>>
>> So, this should actually be pretty robust as long as the Taskmanager containers can reach the Jobmanager eventually.
>>
>> Could you provide the Taskmanager/JobManager logs for such a failure case?
>>
>> Cheers,
>>
>> Konstantin
>>
>>
>> On Mon, Feb 18, 2019 at 1:07 AM Boris Lublinsky <boris.lublin...@lightbend.com> wrote:
>> Following https://github.com/apache/flink/tree/release-1.7/flink-container/docker
>> I have created an entry point, which looks as follows:
>> #!/bin/sh
>>
>> ################################################################################
>> # from https://github.com/apache/flink/blob/release-1.7/flink-container/docker/docker-entrypoint.sh
>> # and https://github.com/docker-flink/docker-flink/blob/63b19a904fa8bfd1322f1d59fdb226c82b9186c7/1.7/scala_2.11-alpine/docker-entrypoint.sh
>> ################################################################################
>>
>> # If unspecified, the hostname of the container is taken as the JobManager address
>> JOB_MANAGER_RPC_ADDRESS=${JOB_MANAGER_RPC_ADDRESS:-$(hostname -f)}
>>
>> drop_privs_cmd() {
>>     if [ $(id -u) != 0 ]; then
>>         # Don't need to drop privs if EUID != 0
>>         return
>>     elif [ -x /sbin/su-exec ]; then
>>         # Alpine
>>         echo su-exec flink
>>     else
>>         # Others
>>         echo gosu flink
>>     fi
>> }
>>
>> JOB_MANAGER="jobmanager"
>> TASK_MANAGER="taskmanager"
>>
>> CMD="$1"
>> shift
>>
>> if [ "${CMD}" = "help" ]; then
>>     echo "Usage: $(basename $0) (${JOB_MANAGER}|${TASK_MANAGER}|help)"
>>     exit 0
>> elif [ "${CMD}" = "${JOB_MANAGER}" -o "${CMD}" = "${TASK_MANAGER}" ]; then
>>     if [ "${CMD}" = "${TASK_MANAGER}" ]; then
>>         TASK_MANAGER_NUMBER_OF_TASK_SLOTS=${TASK_MANAGER_NUMBER_OF_TASK_SLOTS:-$(grep -c ^processor /proc/cpuinfo)}
>>
>>         sed -i -e "s/jobmanager.rpc.address: localhost/jobmanager.rpc.address: ${JOB_MANAGER_RPC_ADDRESS}/g" "$FLINK_HOME/conf/flink-conf.yaml"
>>         sed -i -e "s/taskmanager.numberOfTaskSlots: 1/taskmanager.numberOfTaskSlots: $TASK_MANAGER_NUMBER_OF_TASK_SLOTS/g" "$FLINK_HOME/conf/flink-conf.yaml"
>>         echo "blob.server.port: 6124" >> "$FLINK_HOME/conf/flink-conf.yaml"
>>         echo "query.server.port: 6125" >> "$FLINK_HOME/conf/flink-conf.yaml"
>>
>>         echo "Starting Task Manager"
>>         echo "config file: " && grep '^[^\n#]' "$FLINK_HOME/conf/flink-conf.yaml"
>>         exec $(drop_privs_cmd) "$FLINK_HOME/bin/taskmanager.sh" start-foreground
>>     else
>>         sed -i -e "s/jobmanager.rpc.address: localhost/jobmanager.rpc.address: ${JOB_MANAGER_RPC_ADDRESS}/g" "$FLINK_HOME/conf/flink-conf.yaml"
>>         echo "blob.server.port: 6124" >> "$FLINK_HOME/conf/flink-conf.yaml"
>>         echo "query.server.port: 6125" >> "$FLINK_HOME/conf/flink-conf.yaml"
>>         echo "config file: " && grep '^[^\n#]' "$FLINK_HOME/conf/flink-conf.yaml"
>>
>>         if [ -z "$1" ]; then
>>             exec $(drop_privs_cmd) "$FLINK_HOME/bin/jobmanager.sh" start-foreground "$@"
>>         else
>>             exec $FLINK_HOME/bin/standalone-job.sh start-foreground "$@"
>>         fi
>>     fi
>> fi
>>
>> exec "$@"
>> It works for all cases except running a standalone job.
>> The problem, the way I understand it, is a race condition.
>> In Kubernetes it takes several attempts to establish a connection between the Job and Task managers, while standalone-job.sh
>> tries to start a job immediately once the cluster is created (before the connection is established).
>> Is there a better way to start a job on container startup?
>>
>>
>>
>> --
>> Konstantin Knauf | Solutions Architect
>> +49 160 91394525

--------------------------
Ken Krugler
+1 530-210-6378
http://www.scaleunlimited.com
Custom big data solutions & training
Flink, Solr, Hadoop, Cascading & Cassandra