Hi,

I ran into an issue when submitting a job via the Flink CLI client with Flink 
1.7.1 in high-availability mode.

Setup:
Flink version: 1.7.1
Cluster: K8s
Mode: High availability with 2 jobmanagers

CLI command:
./bin/flink run -d -c MyExample /myexample.jar
The CLI runs inside a K8s job and submits the Flink job to the Flink cluster. 
The K8s job spec allows it to try 3 times to submit the job.

Result:
2019-09-11 22:32:12.908 [Flink-RestClusterClient-IO-thread-4] level=DEBUG 
org.apache.flink.runtime.rest.RestClient  - Sending request of class class 
org.apache.flink.runtime.rest.messages.job.JobSubmitRequestBody to 
job-jm-1.job-jm-svc.job-namespace.svc.cluster.local:8081/v1/jobs
2019-09-11 22:32:14.186 [flink-rest-client-netty-thread-1] level=ERROR 
org.apache.flink.runtime.rest.RestClient  - Response was not valid JSON.
org.apache.flink.shaded.jackson2.com.fasterxml.jackson.databind.JsonMappingException:
 No content to map due to end-of-input
at [Source: 
org.apache.flink.shaded.netty4.io.netty.buffer.ByteBufInputStream@2b88f8bb; 
line: 1, column: 0]
      at 
org.apache.flink.shaded.jackson2.com.fasterxml.jackson.databind.JsonMappingException.from(JsonMappingException.java:256)
      at 
org.apache.flink.shaded.jackson2.com.fasterxml.jackson.databind.ObjectMapper._initForReading(ObjectMapper.java:3851)
      at 
org.apache.flink.shaded.jackson2.com.fasterxml.jackson.databind.ObjectMapper._readMapAndClose(ObjectMapper.java:3792)
      at 
org.apache.flink.shaded.jackson2.com.fasterxml.jackson.databind.ObjectMapper.readTree(ObjectMapper.java:2272)
      at 
org.apache.flink.runtime.rest.RestClient$ClientHandler.readRawResponse(RestClient.java:504)
      at 
org.apache.flink.runtime.rest.RestClient$ClientHandler.channelRead0(RestClient.java:452)
             ………
2019-09-11 22:32:14.186 [flink-rest-client-netty-thread-1] level=ERROR 
org.apache.flink.runtime.rest.RestClient  - Unexpected plain-text response:
……..

The job submission fails after exhausting the number of retries.

Observations:
I looked into the debug logs and the Flink code and came to the following 
conclusions:

  *   The CLI REST client received an empty response body from the jobmanager 
(job-jm-1). I think the response was a redirect, which the RestClient class 
does not handle. That would explain the Jackson exception above and the 
missing response body after the “org.apache.flink.runtime.rest.RestClient  - 
Unexpected plain-text response:” log line above.
  *   The ZooKeeperLeaderRetrievalService in the REST client logs that job-jm-1 
became leader, followed by a log that job-jm-0 became leader. The address of 
job-jm-1 is an http URL and the address of job-jm-0 is an akka URL. The CLI 
logs are at the end of this email.
  *   The RestClusterClient class does not update the leader during job 
submission if the leader changes.
  *   In all 3 attempts by the CLI K8s job to submit the Flink job, 
ZooKeeperLeaderRetrievalService sees both events: job-jm-1 becoming the 
leader, followed by job-jm-0. So all 3 attempts fail for the same reason, 
an empty response.
  *   The jobmanager logs from both job-jm-0 and job-jm-1 show that job-jm-0 
is the leader and job-jm-1 was never a leader. This contradicts the CLI logs.
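
One way to test the redirect hypothesis from the first point is to send a 
single request to each jobmanager without following redirects and look at the 
status code, the Location header and the body. The sketch below is only an 
illustration: the helper names (describe_response, probe) are made up, the 
port and the GET on /v1/jobs are assumptions taken from the request URL in the 
log above, and the failing request was a POST, so a GET may not behave the 
same way.

```python
import http.client
import json


def describe_response(status, location, body):
    """Classify one HTTP response: redirect, empty body, non-JSON, or valid JSON."""
    if 300 <= status < 400:
        return f"redirect to {location}"
    if not body:
        return "empty body"
    try:
        json.loads(body)
    except ValueError:
        # JSONDecodeError and UnicodeDecodeError are both ValueError subclasses.
        return "non-JSON body"
    return "valid JSON"


def probe(host, port=8081, path="/v1/jobs", timeout=5.0):
    """Send one GET (http.client never follows redirects) and classify the reply."""
    conn = http.client.HTTPConnection(host, port, timeout=timeout)
    try:
        conn.request("GET", path)
        resp = conn.getresponse()
        return describe_response(
            resp.status, resp.getheader("Location"), resp.read()
        )
    finally:
        conn.close()
```

Running probe("job-jm-1.job-jm-svc.job-namespace.svc.cluster.local") and the 
same call for job-jm-0 from inside the cluster should show whether the 
non-leader answers with a redirect or with an empty body.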

Open questions:

  *   I am not sure why the CLI’s ZooKeeperLeaderRetrievalService thinks 
job-jm-1 was the leader, whereas the ZooKeeperLeaderRetrievalService of both 
jobmanagers considers job-jm-0 the leader throughout the cluster lifetime.
  *   Even if the CLI’s ZooKeeperLeaderRetrievalService thinks the leader has 
changed from job-jm-1 to job-jm-0, the client still uses job-jm-1. Is it a 
known issue in the Flink 1.7.1 REST client that it doesn’t update the leader 
when it changes?
  *   Why is one leader address an http URL while the other is an akka URL?

Could someone check my observations above and help answer these questions?

I highly appreciate your time and help.

~ Abhinav Bajaj


CLI Logs -

2019-09-11 22:30:31.077 [main-EventThread] level=DEBUG 
o.a.f.r.leaderretrieval.ZooKeeperLeaderRetrievalService  - Leader node has 
changed.
2019-09-11 22:30:31.171 [main-EventThread] level=DEBUG 
o.a.f.r.leaderretrieval.ZooKeeperLeaderRetrievalService  - New leader 
information: 
Leader=http://job-jm-1.job-jm-svc.job-namespace.svc.cluster.local:8081, session 
ID=c1422a1b-a6b8-43b0-85d7-87b95af16932.

……
2019-09-11 22:30:31.270 [main-EventThread] level=DEBUG 
o.a.f.r.leaderretrieval.ZooKeeperLeaderRetrievalService  - Leader node has 
changed
2019-09-11 22:30:31.270 [main-EventThread] level=DEBUG 
o.a.f.r.leaderretrieval.ZooKeeperLeaderRetrievalService  - New leader 
information: 
Leader=akka.tcp://fl...@job-jm-0.job-jm-svc.job-namespace.svc.cluster.local:6126/user/dispatcher,
 session ID=4e4d03d5-2abe-449c-af2e-df2e0cd80e26



job-jm-0 Logs -
2019-09-11 22:29:59.781 [main-EventThread] level=DEBUG 
o.a.f.r.leaderretrieval.ZooKeeperLeaderRetrievalService  - New leader 
information: 
Leader=akka.tcp://fl...@job-jm-0.job-jm-svc.job-namespace.svc.cluster.local:6126/user/resourcemanager,
 session ID=e1f026b1-e368-4524-9fab-2e031423f74f.
2019-09-11 22:29:59.876 [main-EventThread] level=DEBUG 
o.a.f.r.leaderretrieval.ZooKeeperLeaderRetrievalService  - New leader 
information: 
Leader=akka.tcp://fl...@job-jm-0.job-jm-svc.job-namespace.svc.cluster.local:6126/user/dispatcher,
 session ID=4e4d03d5-2abe-449c-af2e-df2e0cd80e26.


job-jm-1 Logs -
2019-09-11 22:29:59.889 [main-EventThread] level=DEBUG 
o.a.f.r.leaderretrieval.ZooKeeperLeaderRetrievalService  - New leader 
information: 
Leader=akka.tcp://fl...@job-jm-0.job-jm-svc.job-namespace.svc.cluster.local:6126/user/resourcemanager,
 session ID=e1f026b1-e368-4524-9fab-2e031423f74f.
2019-09-11 22:29:59.976 [main-EventThread] level=DEBUG 
o.a.f.r.leaderretrieval.ZooKeeperLeaderRetrievalService  - New leader 
information: 
Leader=akka.tcp://fl...@job-jm-0.job-jm-svc.job-namespace.svc.cluster.local:6126/user/dispatcher,
 session ID=4e4d03d5-2abe-449c-af2e-df2e0cd80e26.
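
To compare the leader announcements across the three logs above, the address 
and session ID can be pulled out of each "New leader information" entry. This 
is a minimal sketch: parse_leaders is a made-up helper name, and the regular 
expression assumes the "Leader=<address>, session ID=<uuid>" layout seen in 
these excerpts, including the line wrapping added by the email.

```python
import re

# Matches "Leader=<address>, session ID=<uuid>", tolerating wrapped lines.
LEADER_RE = re.compile(r"Leader=(\S+?),\s*session\s+ID=([0-9a-fA-F-]+)")


def parse_leaders(log_text):
    """Return (scheme, address, session_id) for each leader announcement."""
    results = []
    for address, session_id in LEADER_RE.findall(log_text):
        # The scheme is whatever precedes "://", e.g. "http" or "akka.tcp".
        scheme = address.split("://", 1)[0]
        results.append((scheme, address, session_id))
    return results
```

Grouping the output by session ID makes it easy to see which announcements 
in the CLI log also appear in the jobmanager logs and which do not.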




