[ 
https://issues.apache.org/jira/browse/FLINK-24031?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Chesnay Schepler updated FLINK-24031:
-------------------------------------
    Labels:   (was: flink kubernetes)

> I am trying to deploy Flink in Kubernetes, but when I launch the TaskManager 
> in another container I get an exception
> ----------------------------------------------------------------------------------------------------------------
>
>                 Key: FLINK-24031
>                 URL: https://issues.apache.org/jira/browse/FLINK-24031
>             Project: Flink
>          Issue Type: Bug
>          Components: Deployment / Kubernetes
>    Affects Versions: 1.13.0, 1.13.2
>            Reporter: Julio Pérez
>            Priority: Major
>             Fix For: 1.13.1
>
>
>  I explain the problem here -> https://github.com/apache/flink/pull/17020
> I have a problem when I try to run Flink in k8s with the following manifests:
> h3. JobManager
>  {{apiVersion: v1
>  kind: Service
>  metadata:
>    name: jobmanager-cs
>  spec:
>    type: NodePort
>    ports:
>    - name: ui
>      port: 8081
>    selector:
>      app: flink
>      component: jobmanager
>  ---
>  apiVersion: v1
>  kind: Service
>  metadata:
>    name: jobmanager-hs
>  spec:
>    type: ClusterIP
>    ports:
>    - port: 6123
>      name: rpc
>    - port: 6124
>      name: blob-server
>    - port: 6125
>      name: query
>    selector:
>      app: flink
>      component: jobmanager
>  ---
>  apiVersion: apps/v1
>  kind: Deployment
>  metadata:
>    name: flink-jobmanager
>  spec:
>    selector:
>      matchLabels:
>        app: flink
>    template:
>      metadata:
>        labels:
>          app: flink
>          component: jobmanager
>      spec:
>        restartPolicy: Always
>        containers:
>        - name: jobmanager
>          image: flink:1.13.1-scala_2.12
>          command: [bash,"-ec",bin/jobmanager.sh start-foreground cluster]
>          resources:
>            limits:
>              memory: "2024Mi"
>              cpu: "500m"
>          env:
>          - name: JOB_MANAGER_ID
>            valueFrom:
>              fieldRef:
>                apiVersion: v1
>                fieldPath: status.podIP
>          - name: POD_IP
>            valueFrom:
>              fieldRef:
>                apiVersion: v1
>                fieldPath: status.podIP
>          # The following args overwrite the value of jobmanager.rpc.address
>          # configured in the configuration config map to POD_IP.
>          args: ["standalone-job", "--host", "$POD_IP", "--job-classname", "org.apache.flink.application.Main"]
>          #, <optional arguments>, <job arguments>] optional arguments:
>          # ["--job-id", "<job id>", "--fromSavepoint", "/path/to/savepoint", "--allowNonRestoredState"]
>          ports:
>          - containerPort: 6123
>            name: rpc
>          - containerPort: 6124
>            name: blob-server
>          - containerPort: 6125
>            name: query
>          - containerPort: 8081
>            name: webui
>          volumeMounts:
>          - name: flink-config-volume
>            mountPath: /opt/flink/conf
>          - name: job-artifacts-volume
>            mountPath: /opt/flink/usrlib
>        securityContext:
>          runAsUser: 9999
>        volumes:
>        - name: flink-config-volume
>          configMap:
>            name: flink-config
>            items:
>            - key: flink-conf.yaml
>              path: flink-conf.yaml
>            - key: log4j-console.properties
>              path: log4j-console.properties
>        - name: job-artifacts-volume
>          hostPath:
>            path: /config/flink}}
> h3. Task Manager
>  {{apiVersion: apps/v1
>  kind: Deployment
>  metadata:
>    name: flink-taskmanager
>  spec:
>    replicas: 2
>    selector:
>      matchLabels:
>        app: flink
>        component: taskmanager
>    template:
>      metadata:
>        labels:
>          app: flink
>          component: taskmanager
>      spec:
>        containers:
>        - name: taskmanager
>          image: flink:1.13.1-scala_2.12
>          env:
>          - name: K8S_POD_IP
>            valueFrom:
>              fieldRef:
>                fieldPath: status.podIP
>          command: ["/bin/sh", "-ec", "sleep 1000"]
>          resources:
>            limits:
>              memory: "800Mi"
>              cpu: "2000m"
>          args: ["taskmanager","start-foreground","-Dtaskmanager.host=$K8S_POD_IP"]
>          ports:
>          - containerPort: 6122
>            name: rpc
>          - containerPort: 6125
>            name: query-state
>          volumeMounts:
>          - name: flink-config-volume
>            mountPath: /opt/flink/conf/
>          - name: job-artifacts-volume
>            mountPath: /opt/flink/usrlib
>        securityContext:
>          runAsUser: 9999
>        volumes:
>        - name: flink-config-volume
>          configMap:
>            name: flink-config
>            items:
>            - key: flink-conf.yaml
>              path: flink-conf.yaml
>            - key: log4j-console.properties
>              path: log4j-console.properties
>        - name: job-artifacts-volume
>          hostPath:
>            path: /config/flink}}
> h3. ConfigMap
>  {{apiVersion: v1
>  kind: ConfigMap
>  metadata:
>    name: flink-config
>    labels:
>      app: flink
>  data:
>    flink-conf.yaml: |+
>      jobmanager.rpc.address: jobmanager-hs
>      taskmanager.numberOfTaskSlots: 1
>      blob.server.port: 6124
>      jobmanager.rpc.port: 6123
>      taskmanager.rpc.port: 6122
>      taskmanager.heap.size: 1024m
>      jobmanager.heap.size: 1024m
>      state.backend: filesystem
>      s3.access-key: k8sdemo
>      s3.secret-key: k8sdemo123
>      state.checkpoints.dir: /opt/flink/usrlib/checkpoints
>      state.savepoints.dir: /opt/flink/usrlib/savepoints
>      metrics.reporters: prom
>      metrics.reporter.prom.class: org.apache.flink.metrics.prometheus.PrometheusReporter
>      metrics.reporter.promport: 9249
>      queryable-state.proxy.ports: 6125
>      jobmanager.memory.process.size: 1600m
>      taskmanager.memory.process.size: 1728m
>      parallelism.default: 1
>      rest.address: 0.0.0.0
>      rest.bind-address: 0.0.0.0
>      jobmanager.execution.failover-strategy: region
>    log4j-console.properties: |+
>      # This affects logging for both user code and Flink
>      rootLogger.level = DEBUG
>      rootLogger.appenderRef.console.ref = ConsoleAppender
>      rootLogger.appenderRef.rolling.ref = RollingFileAppender
>      # Uncomment this if you want to _only_ change Flink's logging
>      #logger.flink.name = org.apache.flink
>      #logger.flink.level = DEBUG
>      # The following lines keep the log level of common libraries/connectors on
>      # log level INFO. The root logger does not override this. You have to manually
>      # change the log levels here.
>      logger.akka.name = akka
>      logger.akka.level = DEBUG
>      logger.kafka.name= org.apache.kafka
>      logger.kafka.level = DEBUG
>      logger.hadoop.name = org.apache.hadoop
>      logger.hadoop.level = DEBUG
>      logger.zookeeper.name = org.apache.zookeeper
>      logger.zookeeper.level = DEBUG
>      # Log all infos to the console
>      appender.console.name = ConsoleAppender
>      appender.console.type = CONSOLE
>      appender.console.layout.type = PatternLayout
>      appender.console.layout.pattern = %d\{yyyy-MM-dd HH:mm:ss,SSS} %-5p %-60c %x - %m%n
>      # Log all infos in the given rolling file
>      appender.rolling.name = RollingFileAppender
>      appender.rolling.type = RollingFile
>      appender.rolling.append = false
>      appender.rolling.fileName = ${sys:log.file}
>      appender.rolling.filePattern = ${sys:log.file}.%i
>      appender.rolling.layout.type = PatternLayout
>      appender.rolling.layout.pattern = %d\{yyyy-MM-dd HH:mm:ss,SSS} %-5p %-60c %x - %m%n
>      appender.rolling.policies.type = Policies
>      appender.rolling.policies.size.type = SizeBasedTriggeringPolicy
>      appender.rolling.policies.size.size=100MB
>      appender.rolling.strategy.type = DefaultRolloverStrategy
>      appender.rolling.strategy.max = 10
>      # Suppress the irrelevant (wrong) warnings from the Netty channel handler
>      logger.netty.name = org.apache.flink.shaded.akka.org.jboss.netty.channel.DefaultChannelPipeline
>      logger.netty.level = OFF }}
>  When I try to run the TaskManager with the following command
> {quote}bin/taskmanager start-foreground -Dtaskmanager.host=$K8S_POD_IP
> {quote}
> I get the following exceptions.
>  JobManager:
> {quote}2021-08-27 09:16:57,917 ERROR akka.remote.EndpointWriter [] - dropping 
> message [class akka.actor.ActorSelectionMessage] for non-local recipient 
> [Actor[akka.tcp://flink@jobmanager-hs:6123/]] arriving at 
> [akka.tcp://flink@jobmanager-hs:6123] inbound addresses are 
> [akka.tcp://flink@cluster:6123]
>  2021-08-27 09:17:01,255 DEBUG 
> org.apache.flink.runtime.resourcemanager.StandaloneResourceManager [] - 
> Trigger heartbeat request.
>  2021-08-27 09:17:01,284 DEBUG 
> org.apache.flink.runtime.resourcemanager.StandaloneResourceManager [] - 
> Trigger heartbeat request.
>  2021-08-27 09:17:10,008 DEBUG akka.remote.transport.netty.NettyTransport [] 
> - Remote connection to [/172.17.0.1:34827] was disconnected because of [id: 
> 0x13ae1d03, /172.17.0.1:34827 :> /172.17.0.23:6123] DISCONNECTED
>  2021-08-27 09:17:10,008 DEBUG akka.remote.transport.ProtocolStateActor [] - 
> Association between local [tcp://flink@cluster:6123] and remote 
> [tcp://flink@172.17.0.1:34827] was disassociated because the 
> ProtocolStateActor failed: Unknown
>  2021-08-27 09:17:10,009 WARN akka.remote.ReliableDeliverySupervisor [] - 
> Association with remote system [akka.tcp://flink@172.17.0.24:6122] has 
> failed, address is now gated for [50] ms. Reason: [Disassociated]
> {quote}
> TaskManager:
> {quote}INFO org.apache.flink.runtime.taskexecutor.TaskExecutor [] - Could not 
> resolve ResourceManager address 
> akka.tcp://flink@flink-jobmanager:6123/user/rpc/resourcemanager__, retrying 
> in 10000 ms: Could not connect to rpc endpoint under address 
> akka.tcp://flink@flink-jobmanager:6123/user/rpc/resourcemanager__.
>  INFO org.apache.flink.runtime.taskexecutor.TaskExecutor [] - Could not 
> resolve ResourceManager address 
> akka.tcp://flink@flink-jobmanager:6123/user/rpc/resourcemanager__, retrying 
> in 10000 ms: Could not connect to rpc endpoint under address 
> akka.tcp://flink@flink-jobmanager:6123/user/rpc/resourcemanager__.
> {quote}
> Best regards,
> Julio
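
A possible reading of the JobManager log above, stated as an assumption rather than a confirmed diagnosis: the JobManager was started with `bin/jobmanager.sh start-foreground cluster`, and the log shows it listening as `flink@cluster:6123`, while the incoming message is addressed to `flink@jobmanager-hs:6123`. The log reports the message as dropped for a "non-local recipient", which suggests the recipient address has to match one of the inbound addresses. The Python sketch below only mimics that comparison with the two addresses from the log; `authority` and `is_local_recipient` are hypothetical helper names, not Akka or Flink APIs.

```python
from urllib.parse import urlsplit


def authority(address: str) -> tuple:
    """Split an Akka-style address into (system, host, port).

    Hypothetical helper for illustration only; not an Akka or Flink API.
    """
    parts = urlsplit(address)
    return (parts.username, parts.hostname, parts.port)


def is_local_recipient(recipient: str, inbound: list) -> bool:
    """Return True if the recipient matches one of the bound (inbound) addresses."""
    return authority(recipient) in {authority(a) for a in inbound}


# Addresses copied from the JobManager log in the report.
recipient = "akka.tcp://flink@jobmanager-hs:6123/"
inbound = ["akka.tcp://flink@cluster:6123"]

if is_local_recipient(recipient, inbound):
    print("recipient matches an inbound address: message would be delivered")
else:
    print("recipient does not match any inbound address: message would be dropped")
```

If this reading holds, the two names would need to agree, for example by having the JobManager bind under the same host name that `jobmanager.rpc.address` points to. That is a suggestion to verify, not a tested fix.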



--
This message was sent by Atlassian Jira
(v8.3.4#803005)
