*The question is cross-posted on Stack
Overflow 
https://stackoverflow.com/questions/66968180/flink-exception-from-container-launch-exitcode-2
<https://stackoverflow.com/questions/66968180/flink-exception-from-container-launch-exitcode-2>.
Viewing the question on Stack Overflow is preferred as I include a few
images for better description.*

Hi community,

## Flink (Scala) exitCode=2

I have a simple Flink job that reads from 2 columns of a Hive table
`mysource`, add up the columns, then writes the result to another Hive
table `mysink`, which `mysource` has 2 columns `a bigint` and `b bigint`,
and `mysink` has only 1 column `c bigint`.

The job submits successfully, however, I observe it keeps retrying.

[![enter image description here][1]][1]

I click into each attempt, they simply show this.

```
AM Container for appattempt_1607399514900_2511_001267 exited with exitCode:
2
For more detailed output, check application tracking page:
http://cn-hz-h-test-data-flink00:8088/cluster/app/application_1607399514900_2511Then,
click on links to logs of each attempt.
Diagnostics: Exception from container-launch.
Container id: container_e13_1607399514900_2511_1267_000001
Exit code: 2
Stack trace: ExitCodeException exitCode=2:
at org.apache.hadoop.util.Shell.runCommand(Shell.java:582)
at org.apache.hadoop.util.Shell.run(Shell.java:479)
at org.apache.hadoop.util.Shell$ShellCommandExecutor.execute(Shell.java:773)
at
org.apache.hadoop.yarn.server.nodemanager.DefaultContainerExecutor.launchContainer(DefaultContainerExecutor.java:212)
at
org.apache.hadoop.yarn.server.nodemanager.containermanager.launcher.ContainerLaunch.call(ContainerLaunch.java:302)
at
org.apache.hadoop.yarn.server.nodemanager.containermanager.launcher.ContainerLaunch.call(ContainerLaunch.java:82)
at java.util.concurrent.FutureTask.run(FutureTask.java:266)
at
java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149)
at
java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624)
at java.lang.Thread.run(Thread.java:748)
Container exited with a non-zero exit code 2
Failing this attempt
```

However, the "Logs" has no useful info - it complains about the logging
lib, but I believe they are really warnings, not errors.

```
LogType:jobmanager.err
Log Upload Time:Wed Apr 07 10:30:52 +0800 2021
LogLength:1010
Log Contents:
SLF4J: Class path contains multiple SLF4J bindings.
SLF4J: Found binding in
[jar:file:/data/apache/hadoop/hadoop-2.7.3/logs/tmp/nm-local-dir/usercache/zhongtai/appcache/application_1607399514900_2509/filecache/10/featurepipelines-0.1.1.jar!/org/slf4j/impl/StaticLoggerBinder.class]/
SLF4J: Found binding in
[jar:file:/data/apache/hadoop/hadoop-2.7.3/logs/tmp/nm-local-dir/filecache/302/log4j-slf4j-impl-2.12.1.jar!/org/slf4j/impl/StaticLoggerBinder.class]
SLF4J: Found binding in
[jar:file:/data/apache/hadoop/hadoop-2.7.3/share/hadoop/common/lib/slf4j-log4j12-1.7.10.jar!/org/slf4j/impl/StaticLoggerBinder.class]
SLF4J: See http://www.slf4j.org/codes.html#multiple_bindings for an
explanation.
SLF4J: Actual binding is of type [org.slf4j.impl.Log4jLoggerFactory]
log4j:WARN No appenders could be found for logger
(org.apache.flink.runtime.entrypoint.ClusterEntrypoint).
log4j:WARN Please initialize the log4j system properly.
log4j:WARN See http://logging.apache.org/log4j/1.2/faq.html#noconfig for
more info.
End of LogType:jobmanager.err

LogType:jobmanager.out
Log Upload Time:Wed Apr 07 10:30:52 +0800 2021
LogLength:0
Log Contents:
End of LogType:jobmanager.out
```

This is the job written in Scala.

```scala
import org.apache.flink.streaming.api.scala.StreamExecutionEnvironment
import org.apache.flink.table.api.{EnvironmentSettings, SqlDialect}
import org.apache.flink.table.api.bridge.scala.StreamTableEnvironment
import org.apache.flink.table.catalog.hive.HiveCatalog

object HiveToyExample {
  def main(args: Array[String]): Unit = {
    val settings = EnvironmentSettings.newInstance.build
    val execEnv = StreamExecutionEnvironment.getExecutionEnvironment
    val tableEnv = StreamTableEnvironment.create(execEnv, settings)

    val hiveCatalog = new HiveCatalog(
      "myhive",
      "aiinfra",
      "/data/apache/hive/apache-hive-2.1.0-bin/conf/"
    )
    tableEnv.registerCatalog("myhive", hiveCatalog)
    tableEnv.useCatalog("myhive")

    tableEnv.getConfig.setSqlDialect(SqlDialect.DEFAULT)

    tableEnv
      .executeSql("""
          |INSERT INTO mysink
          |SELECT a + b
          |FROM mysource
          |""".stripMargin)
  }
}
```

Here's the pom.xml.

```xml
<project xmlns="http://maven.apache.org/POM/4.0.0";
xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance";
xsi:schemaLocation="http://maven.apache.org/POM/4.0.0
http://maven.apache.org/xsd/maven-4.0.0.xsd";>
<modelVersion>4.0.0</modelVersion>

<groupId>exmple</groupId>
<artifactId>featurepipelines</artifactId>
<version>0.1.1</version>
<packaging>jar</packaging>

<name>Feature Pipelines</name>

<properties>
<maven.compiler.source>8</maven.compiler.source>
<maven.compiler.target>8</maven.compiler.target>
<project.build.sourceEncoding>UTF-8</project.build.sourceEncoding>
<flink.version>1.12.0</flink.version>
<scala.binary.version>2.11</scala.binary.version>
<scala.version>2.11.12</scala.version>
<log4j.version>2.12.1</log4j.version>
</properties>

<dependencies>
<dependency>
<groupId>org.apache.flink</groupId>
<artifactId>flink-streaming-scala_${scala.binary.version}</artifactId>
<version>${flink.version}</version>
<scope>provided</scope>
</dependency>
<dependency>
<groupId>org.apache.flink</groupId>
<artifactId>flink-clients_${scala.binary.version}</artifactId>
<version>${flink.version}</version>
<scope>provided</scope>
</dependency>
<dependency>
<groupId>org.apache.flink</groupId>
<artifactId>flink-table-api-scala-bridge_${scala.binary.version}</artifactId>
<version>${flink.version}</version>
<scope>provided</scope>
</dependency>
<dependency>
<groupId>org.apache.flink</groupId>
<artifactId>flink-table-planner-blink_${scala.binary.version}</artifactId>
<version>${flink.version}</version>
<scope>provided</scope>
</dependency>
<dependency>
<groupId>org.apache.flink</groupId>
<artifactId>flink-connector-hive_${scala.binary.version}</artifactId>
<version>${flink.version}</version>
<scope>provided</scope>
</dependency>
<dependency>
<groupId>org.apache.hive</groupId>
<artifactId>hive-exec</artifactId>
<version>2.1.0</version>
<scope>provided</scope>
</dependency>
<dependency>
<groupId>org.slf4j</groupId>
<artifactId>slf4j-log4j12</artifactId>
<version>1.7.7</version>
</dependency>
</dependencies>

<build>
<resources>
<resource>
<directory>src/main/resources</directory>
<filtering>true</filtering>
</resource>
</resources>
<plugins>
<plugin>
<groupId>org.apache.maven.plugins</groupId>
<artifactId>maven-shade-plugin</artifactId>
<version>3.2.4</version>
<executions>
<execution>
<phase>package</phase>
<goals>
<goal>shade</goal>
</goals>
<configuration>
<shadedArtifactAttached>false</shadedArtifactAttached>
<shadedClassifierName>Shade</shadedClassifierName>
<createDependencyReducedPom>false</createDependencyReducedPom>
<filters>
<filter>
<artifact>*:*</artifact>
<excludes>
<exclude>META-INF/*.SF</exclude>
<exclude>META-INF/*.DSA</exclude>
<exclude>META-INF/*.RSA</exclude>
</excludes>
</filter>
</filters>
</configuration>
</execution>
</executions>
</plugin>
<plugin>
<groupId>net.alchim31.maven</groupId>
<artifactId>scala-maven-plugin</artifactId>
<version>4.4.1</version>
<executions>
<execution>
<goals>
<goal>compile</goal>
<goal>testCompile</goal>
</goals>
</execution>
</executions>
</plugin>
</plugins>
</build>
</project>
```

This is how I package the jar.

```
mvn clean package
```

This is how I run the job.

```
flink run \
--yarnname scalaflink-hive-test \
-m yarn-cluster \
-yarnqueue datadev \
--class featurepipelines.ingestion.HiveToyExample \
./featurepipelines-0.1.1.jar
```

## PyFlink rewrite works just fine?!

Since the logic is so simple, I rewrite the job with PyFlink to see what
happens. Here shows the PyFlink rewrite.

```python
import os
from pyflink.datastream import StreamExecutionEnvironment
from pyflink.table import *
from pyflink.table.catalog import HiveCatalog

settings = EnvironmentSettings.new_instance().use_blink_planner().build()
exec_env = StreamExecutionEnvironment.get_execution_environment()
t_env = StreamTableEnvironment.create(exec_env,
environment_settings=settings)

# There exists such a jar in the path
t_env.get_config().get_configuration().set_string(
    "pipeline.jars",
f"file://{os.getcwd()}/deps/flink-sql-connector-hive-2.2.0_2.11-1.12.0.jar"
)

catalog_name = "myhive"
default_database = "aiinfra"
hive_conf_dir = "/data/apache/hive/apache-hive-2.1.0-bin/conf/"

hive_catalog = HiveCatalog(catalog_name, default_database, hive_conf_dir)
t_env.register_catalog(catalog_name, hive_catalog)
t_env.use_catalog(catalog_name)

TRANSFORM_DML = """
INSERT INTO mysink
SELECT a + b
FROM mysource
"""

t_env.get_config().set_sql_dialect(SqlDialect.DEFAULT)
t_env.execute_sql(TRANSFORM_DML).wait()
```

This is how I run the PyFlink job.

```
flink run \
--yarnname pyflink-hive-test \
-m yarn-cluster \
-yD yarn.application.queue=tech_platform \
-pyarch pyflink1.12.0.zip \
-pyexec /data/software/pyflink1.12.0/bin/python \
-py /data/home/pal-flink/chenyisheng14418/feature-pipelines/pyflink/hive.py
```

Surprisingly, the job runs fine - it finishes soon, with result written to
the `mysink` table.

## Why?

Given the comparison, I highly doubt the Scala job fails because it is not
packaged correctly, even though I follow [Flink Docs](
https://ci.apache.org/projects/flink/flink-docs-stable/dev/table/connectors/hive/#program-maven),
which can be verified by looking at my pom.

> If you are building your own program, you need the following dependencies
in your mvn file. It’s recommended not to include these dependencies in the
resulting jar file. You’re supposed to add dependencies as stated above at
runtime.

```
<!-- Flink Dependency -->
<dependency>
  <groupId>org.apache.flink</groupId>
  <artifactId>flink-connector-hive_2.11</artifactId>
  <version>1.12.0</version>
  <scope>provided</scope>
</dependency>

<dependency>
  <groupId>org.apache.flink</groupId>
  <artifactId>flink-table-api-java-bridge_2.11</artifactId>
  <version>1.12.0</version>
  <scope>provided</scope>
</dependency>

<!-- Hive Dependency -->
<dependency>
    <groupId>org.apache.hive</groupId>
    <artifactId>hive-exec</artifactId>
    <version>${hive.version}</version>
    <scope>provided</scope>
</dependency>
```

Also, I have included flink-sql-connector-hive-2.2.0_2.11-1.12.0.jar in
/lib of my flink distribution, as suggested in [Flink docs](
https://ci.apache.org/projects/flink/flink-docs-stable/dev/table/connectors/hive/#using-bundled-hive-jar
):

> the recommended way to add dependency is to use a bundled jar. Separate
jars should be used only if bundled jars don’t meet your needs.

What do I miss?

Best,
Yik San

  [1]: https://i.stack.imgur.com/fBsHS.png
  [2]: https://i.stack.imgur.com/ilNtr.png

Reply via email to