The relevant dependencies are:

val flinkScala = "org.apache.flink" %% "flink-scala" % flinkVersion % "provided"
val flinkStreamingScala = "org.apache.flink" %% "flink-streaming-scala" % flinkVersion % "provided"
val flinkKafka = "org.apache.flink" %% "flink-connector-kafka" % flinkVersion exclude("org.slf4j", "slf4j-log4j12")

I am using SBT.
I tried both connector-kafka and connector-kafka-011, with the same result.
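
Of the dependencies listed, only the Kafka connector is not marked "provided", so it and the Kafka client it pulls in are bundled into the uber jar. A minimal sketch for checking whether a given class is also present in some other jar on the cluster classpath follows. The helper name `find_class_in_jars`, the reliance on `unzip`, and the directories in the usage line are assumptions for illustration, not details from this setup.

```shell
#!/bin/sh
# Hypothetical helper: print every jar in the given directories whose
# listing mentions CLASS_PATH (for example a .class file path).
# Assumes `unzip` is installed; `jar tf` would serve the same purpose.
find_class_in_jars() {
    class_path="$1"
    shift
    for dir in "$@"; do
        for jar in "$dir"/*.jar; do
            # Skip the unexpanded glob when a directory holds no jars.
            [ -f "$jar" ] || continue
            # -F treats the class path as a fixed string, not a regex.
            if unzip -l "$jar" 2>/dev/null | grep -qF "$class_path"; then
                echo "$jar"
            fi
        done
    done
}
```

Usage (paths are placeholders): `find_class_in_jars org/apache/kafka/common/serialization/Serializer.class "$FLINK_HOME/lib" /path/to/job/jars`. More than one hit would suggest the class is reachable through more than one jar.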
Boris Lublinsky
FDP Architect
[email protected]
https://www.lightbend.com/
> On Feb 21, 2019, at 1:38 AM, Konstantin Knauf <[email protected]>
> wrote:
>
> Hi Boris,
>
> can you share the relevant parts (dependencies) of your pom.xml? Did you also try
> without fixing the Kafka version, i.e. running with the Kafka client version
> provided by the Kafka connector of Flink? Gordon (cc) dealt with FLINK-8741.
>
> @Gordon: have you seen this issue with 1.6/1.7 before?
>
> Cheers,
>
> Konstantin
>
> On Thu, Feb 21, 2019 at 2:19 AM Boris Lublinsky
> <[email protected] <mailto:[email protected]>> wrote:
> I found some more details on this.
> The same error for the same application was reported about a year ago:
> http://mail-archives.apache.org/mod_mbox/flink-user/201802.mbox/%3CCAE7GCT4pF74LwyY=tivzhquq50tkjjawfhaw+5phcsx+vos...@mail.gmail.com%3E
> It was due to https://issues.apache.org/jira/browse/FLINK-8741
>
> It looks like the same issue is back in 1.7.1 and 1.6.3. I tried with both
> the latest kafka-connector and kafka-connector-011.
>
> Boris Lublinsky
> FDP Architect
> [email protected] <mailto:[email protected]>
> https://www.lightbend.com/ <https://www.lightbend.com/>
>> On Feb 19, 2019, at 7:02 PM, Ken Krugler <[email protected]
>> <mailto:[email protected]>> wrote:
>>
>> Hi Boris,
>>
>> I haven’t seen this exact error, but I have seen similar errors caused by
>> multiple versions of jars on the classpath.
>>
>> When I’ve run into this particular "XXX is not an instance of YYY" problem,
>> it often seems to be caused by a jar that I should have marked as provided
>> in my pom.
>>
>> Though I’m typically running on a YARN cluster, not w/K8s, so maybe this
>> doesn’t apply.
>>
>> — Ken
>>
>> PS - I assume you’ve been reading
>> https://ci.apache.org/projects/flink/flink-docs-stable/monitoring/debugging_classloading.html
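
The page linked above describes Flink's `classloader.resolve-order` option (`child-first` by default, `parent-first` as the alternative). As a sketch only, and not a confirmed fix for this error, a container entrypoint could set such an option idempotently rather than appending to the config on every start. The helper name `set_conf_option` is hypothetical.

```shell
#!/bin/sh
# Hypothetical helper: set "KEY: VALUE" in a flat YAML config file,
# replacing an existing line for KEY or appending one if none exists.
# KEY is used as a regex (dots match any character) and VALUE must not
# contain '|' or '&'; acceptable for a sketch, not for general use.
set_conf_option() {
    conf_file="$1"
    key="$2"
    value="$3"
    if grep -q "^${key}:" "$conf_file"; then
        # Rewrite to a temp file, then move it into place; this avoids
        # relying on the non-portable `sed -i`.
        tmp_file="${conf_file}.tmp.$$"
        sed "s|^${key}:.*|${key}: ${value}|" "$conf_file" > "$tmp_file" &&
            mv "$tmp_file" "$conf_file"
    else
        echo "${key}: ${value}" >> "$conf_file"
    fi
}
```

Usage: `set_conf_option "$FLINK_HOME/conf/flink-conf.yaml" classloader.resolve-order parent-first`. Calling it twice leaves a single line for the key, unlike a plain `echo ... >>`.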
>>
>>
>>> On Feb 19, 2019, at 4:34 PM, Boris Lublinsky <[email protected]
>>> <mailto:[email protected]>> wrote:
>>>
>>> Konstantin,
>>> After experimenting with this for a while, I got to the root cause of the
>>> problem.
>>> I am running a version of a Taxi ride travel prediction as my sample.
>>> It works fine in IntelliJ,
>>> but when I try to run it in Docker (standard Debian 1.7 image),
>>> it fails with the following error:
>>>
>>>
>>> The program finished with the following exception:
>>>
>>> org.apache.flink.client.program.ProgramInvocationException: Job failed. (JobID: 9340e7669e7344ab827fef4ddb5ba73d)
>>>     at org.apache.flink.client.program.rest.RestClusterClient.submitJob(RestClusterClient.java:268)
>>>     at org.apache.flink.client.program.ClusterClient.run(ClusterClient.java:487)
>>>     at org.apache.flink.streaming.api.environment.StreamContextEnvironment.execute(StreamContextEnvironment.java:66)
>>>     at org.apache.flink.streaming.api.scala.StreamExecutionEnvironment.execute(StreamExecutionEnvironment.scala:654)
>>>     at com.lightbend.fdp.sample.flink.app.TravelTimePrediction$.main(TravelTimePrediction.scala:89)
>>>     at com.lightbend.fdp.sample.flink.app.TravelTimePrediction.main(TravelTimePrediction.scala)
>>>     at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
>>>     at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
>>>     at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
>>>     at java.lang.reflect.Method.invoke(Method.java:498)
>>>     at org.apache.flink.client.program.PackagedProgram.callMainMethod(PackagedProgram.java:529)
>>>     at org.apache.flink.client.program.PackagedProgram.invokeInteractiveModeForExecution(PackagedProgram.java:421)
>>>     at org.apache.flink.client.program.ClusterClient.run(ClusterClient.java:427)
>>>     at org.apache.flink.client.cli.CliFrontend.executeProgram(CliFrontend.java:813)
>>>     at org.apache.flink.client.cli.CliFrontend.runProgram(CliFrontend.java:287)
>>>     at org.apache.flink.client.cli.CliFrontend.run(CliFrontend.java:213)
>>>     at org.apache.flink.client.cli.CliFrontend.parseParameters(CliFrontend.java:1050)
>>>     at org.apache.flink.client.cli.CliFrontend.lambda$main$11(CliFrontend.java:1126)
>>>     at org.apache.flink.runtime.security.NoOpSecurityContext.runSecured(NoOpSecurityContext.java:30)
>>>     at org.apache.flink.client.cli.CliFrontend.main(CliFrontend.java:1126)
>>> Caused by: org.apache.flink.runtime.client.JobExecutionException: Job execution failed.
>>>     at org.apache.flink.runtime.jobmaster.JobResult.toJobExecutionResult(JobResult.java:146)
>>>     at org.apache.flink.client.program.rest.RestClusterClient.submitJob(RestClusterClient.java:265)
>>>     ... 19 more
>>> Caused by: org.apache.kafka.common.KafkaException: Failed to construct kafka producer
>>>     at org.apache.kafka.clients.producer.KafkaProducer.<init>(KafkaProducer.java:416)
>>>     at org.apache.kafka.clients.producer.KafkaProducer.<init>(KafkaProducer.java:288)
>>>     at org.apache.flink.streaming.connectors.kafka.internal.FlinkKafkaProducer.<init>(FlinkKafkaProducer.java:116)
>>>     at org.apache.flink.streaming.connectors.kafka.FlinkKafkaProducer011.initProducer(FlinkKafkaProducer011.java:944)
>>>     at org.apache.flink.streaming.connectors.kafka.FlinkKafkaProducer011.initNonTransactionalProducer(FlinkKafkaProducer011.java:940)
>>>     at org.apache.flink.streaming.connectors.kafka.FlinkKafkaProducer011.beginTransaction(FlinkKafkaProducer011.java:696)
>>>     at org.apache.flink.streaming.connectors.kafka.FlinkKafkaProducer011.beginTransaction(FlinkKafkaProducer011.java:94)
>>>     at org.apache.flink.streaming.api.functions.sink.TwoPhaseCommitSinkFunction.beginTransactionInternal(TwoPhaseCommitSinkFunction.java:384)
>>>     at org.apache.flink.streaming.api.functions.sink.TwoPhaseCommitSinkFunction.initializeState(TwoPhaseCommitSinkFunction.java:375)
>>>     at org.apache.flink.streaming.connectors.kafka.FlinkKafkaProducer011.initializeState(FlinkKafkaProducer011.java:847)
>>>     at org.apache.flink.streaming.util.functions.StreamingFunctionUtils.tryRestoreFunction(StreamingFunctionUtils.java:178)
>>>     at org.apache.flink.streaming.util.functions.StreamingFunctionUtils.restoreFunctionState(StreamingFunctionUtils.java:160)
>>>     at org.apache.flink.streaming.api.operators.AbstractUdfStreamOperator.initializeState(AbstractUdfStreamOperator.java:96)
>>>     at org.apache.flink.streaming.api.operators.AbstractStreamOperator.initializeState(AbstractStreamOperator.java:278)
>>>     at org.apache.flink.streaming.runtime.tasks.StreamTask.initializeState(StreamTask.java:738)
>>>     at org.apache.flink.streaming.runtime.tasks.StreamTask.invoke(StreamTask.java:289)
>>>     at org.apache.flink.runtime.taskmanager.Task.run(Task.java:704)
>>>     at java.lang.Thread.run(Thread.java:748)
>>> Caused by: org.apache.kafka.common.KafkaException: org.apache.kafka.common.serialization.ByteArraySerializer is not an instance of org.apache.kafka.common.serialization.Serializer
>>>     at org.apache.kafka.common.config.AbstractConfig.getConfiguredInstance(AbstractConfig.java:248)
>>>     at org.apache.kafka.clients.producer.KafkaProducer.<init>(KafkaProducer.java:327)
>>>     ... 17 more
>>>
>>> The closest that I found is
>>> https://stackoverflow.com/questions/37363119/kafka-producer-org-apache-kafka-common-serialization-stringserializer-could-no
>>> which talks about class loaders. (I tried their solution, but it did not
>>> help.)
>>> I looked at the class loading and I see that the pair of these 2 classes is
>>> loaded from my uber jar, but twice.
>>>
>>> Have you guys seen this error before?
>>> Any suggestions?
>>>
>>> Boris Lublinsky
>>> FDP Architect
>>> [email protected] <mailto:[email protected]>
>>> https://www.lightbend.com/ <https://www.lightbend.com/>
>>>> On Feb 19, 2019, at 4:50 AM, Konstantin Knauf <[email protected]
>>>> <mailto:[email protected]>> wrote:
>>>>
>>>> Hi Boris,
>>>>
>>>> without looking at the entrypoint in much detail, generally there should
>>>> not be a race condition there:
>>>>
>>>> * if the TaskManagers cannot connect to the ResourceManager, they will
>>>> retry (by default the timeout is 5 minutes)
>>>> * if the JobManager does not get enough resources from the ResourceManager,
>>>> it will also wait for the resources/slots to be provided. The timeout there
>>>> is also 5 minutes, I think.
>>>>
>>>> So, this should actually be pretty robust as long as the Taskmanager
>>>> containers can reach the Jobmanager eventually.
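
The retry-until-timeout behaviour described in the two bullets can be illustrated with a small shell sketch. This is not Flink's implementation, only the general pattern. The function name `retry_until`, the 300-second figure in the usage line (standing in for the five-minute timeout mentioned above), and the use of `nc` against port 6123 (Flink's default `jobmanager.rpc.port`) are assumptions.

```shell
#!/bin/sh
# Hypothetical helper: run a command repeatedly until it succeeds or
# TIMEOUT seconds have passed, sleeping INTERVAL seconds between tries.
# Returns 0 on success, 1 if the deadline passes first.
retry_until() {
    timeout="$1"
    interval="$2"
    shift 2
    deadline=$(( $(date +%s) + timeout ))
    while ! "$@"; do
        # Give up once the deadline has been reached.
        if [ "$(date +%s)" -ge "$deadline" ]; then
            return 1
        fi
        sleep "$interval"
    done
    return 0
}
```

Usage (assuming `nc` is present in the image): `retry_until 300 5 nc -z "$JOB_MANAGER_RPC_ADDRESS" 6123`.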
>>>>
>>>> Could you provide the Taskmanager/JobManager logs for such a failure case?
>>>>
>>>> Cheers,
>>>>
>>>> Konstantin
>>>>
>>>>
>>>> On Mon, Feb 18, 2019 at 1:07 AM Boris Lublinsky
>>>> <[email protected] <mailto:[email protected]>>
>>>> wrote:
>>>> Following
>>>> https://github.com/apache/flink/tree/release-1.7/flink-container/docker
>>>> I have created an entry point, which looks as follows:
>>>> #!/bin/sh
>>>>
>>>> ################################################################################
>>>> # from https://github.com/apache/flink/blob/release-1.7/flink-container/docker/docker-entrypoint.sh
>>>> # and https://github.com/docker-flink/docker-flink/blob/63b19a904fa8bfd1322f1d59fdb226c82b9186c7/1.7/scala_2.11-alpine/docker-entrypoint.sh
>>>> ################################################################################
>>>>
>>>> # If unspecified, the hostname of the container is taken as the JobManager address
>>>> JOB_MANAGER_RPC_ADDRESS=${JOB_MANAGER_RPC_ADDRESS:-$(hostname -f)}
>>>>
>>>> drop_privs_cmd() {
>>>>     if [ $(id -u) != 0 ]; then
>>>>         # Don't need to drop privs if EUID != 0
>>>>         return
>>>>     elif [ -x /sbin/su-exec ]; then
>>>>         # Alpine
>>>>         echo su-exec flink
>>>>     else
>>>>         # Others
>>>>         echo gosu flink
>>>>     fi
>>>> }
>>>>
>>>> JOB_MANAGER="jobmanager"
>>>> TASK_MANAGER="taskmanager"
>>>>
>>>> CMD="$1"
>>>> shift
>>>>
>>>> if [ "${CMD}" = "help" ]; then
>>>>     echo "Usage: $(basename $0) (${JOB_MANAGER}|${TASK_MANAGER}|help)"
>>>>     exit 0
>>>> elif [ "${CMD}" = "${JOB_MANAGER}" -o "${CMD}" = "${TASK_MANAGER}" ]; then
>>>>     if [ "${CMD}" = "${TASK_MANAGER}" ]; then
>>>>         TASK_MANAGER_NUMBER_OF_TASK_SLOTS=${TASK_MANAGER_NUMBER_OF_TASK_SLOTS:-$(grep -c ^processor /proc/cpuinfo)}
>>>>
>>>>         sed -i -e "s/jobmanager.rpc.address: localhost/jobmanager.rpc.address: ${JOB_MANAGER_RPC_ADDRESS}/g" "$FLINK_HOME/conf/flink-conf.yaml"
>>>>         sed -i -e "s/taskmanager.numberOfTaskSlots: 1/taskmanager.numberOfTaskSlots: $TASK_MANAGER_NUMBER_OF_TASK_SLOTS/g" "$FLINK_HOME/conf/flink-conf.yaml"
>>>>         echo "blob.server.port: 6124" >> "$FLINK_HOME/conf/flink-conf.yaml"
>>>>         echo "query.server.port: 6125" >> "$FLINK_HOME/conf/flink-conf.yaml"
>>>>
>>>>         echo "Starting Task Manager"
>>>>         echo "config file: " && grep '^[^\n#]' "$FLINK_HOME/conf/flink-conf.yaml"
>>>>         exec $(drop_privs_cmd) "$FLINK_HOME/bin/taskmanager.sh" start-foreground
>>>>     else
>>>>         sed -i -e "s/jobmanager.rpc.address: localhost/jobmanager.rpc.address: ${JOB_MANAGER_RPC_ADDRESS}/g" "$FLINK_HOME/conf/flink-conf.yaml"
>>>>         echo "blob.server.port: 6124" >> "$FLINK_HOME/conf/flink-conf.yaml"
>>>>         echo "query.server.port: 6125" >> "$FLINK_HOME/conf/flink-conf.yaml"
>>>>         echo "config file: " && grep '^[^\n#]' "$FLINK_HOME/conf/flink-conf.yaml"
>>>>
>>>>         if [ -z "$1" ]; then
>>>>             exec $(drop_privs_cmd) "$FLINK_HOME/bin/jobmanager.sh" start-foreground "$@"
>>>>         else
>>>>             exec $FLINK_HOME/bin/standalone-job.sh start-foreground "$@"
>>>>         fi
>>>>     fi
>>>> fi
>>>>
>>>> exec "$@"
>>>> It works for all cases except running a standalone job.
>>>> The problem, the way I understand it, is a race condition.
>>>> In Kubernetes it takes several attempts to establish a connection between
>>>> the Job and Task managers, while standalone-job.sh
>>>> tries to start a job immediately once the cluster is created (before the
>>>> connection is established).
>>>> Is there a better way to start a job on container startup?
>>>>
>>>>
>>>>
>>>> --
>>>> Konstantin Knauf | Solutions Architect
>>>> +49 160 91394525
>>
>> --------------------------
>> Ken Krugler
>> +1 530-210-6378
>> http://www.scaleunlimited.com <http://www.scaleunlimited.com/>
>> Custom big data solutions & training
>> Flink, Solr, Hadoop, Cascading & Cassandra
>
>
>
> --
> Konstantin Knauf | Solutions Architect
> +49 160 91394525
> <https://www.ververica.com/>
> Data Artisans GmbH | Invalidenstrasse 115, 10115 Berlin, Germany
> Registered at Amtsgericht Charlottenburg: HRB 158244 B
> Managing Directors: Dr. Kostas Tzoumas, Dr. Stephan Ewen