Hi Yik San,

To me it looks as if there is a problem with the job and its deployment. Unfortunately, the logging does not seem to have worked, so the container logs don't tell us much. Could you check that you have a valid log4j.properties file in your conf directory?
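For example, a minimal log4j 1.x style configuration along these lines should at least produce some output (only a sketch - the log4j:WARN lines in your logs indicate a log4j 1.x binding is active, and the Flink distribution also ships a more complete default config under conf/):

```
# Minimal log4j 1.x sketch: send INFO and above to stdout
log4j.rootLogger=INFO, console
log4j.appender.console=org.apache.log4j.ConsoleAppender
log4j.appender.console.layout=org.apache.log4j.PatternLayout
log4j.appender.console.layout.ConversionPattern=%d{yyyy-MM-dd HH:mm:ss,SSS} %-5p %-60c %x - %m%n
```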
Cheers,
Till

On Wed, Apr 7, 2021 at 4:57 AM Yik San Chan <evan.chanyik...@gmail.com> wrote:

*The question is cross-posted on [Stack Overflow](https://stackoverflow.com/questions/66968180/flink-exception-from-container-launch-exitcode-2). Viewing the question on Stack Overflow is preferred, as I include a few images for better description.*

Hi community,

## Flink (Scala) exitCode=2

I have a simple Flink job that reads from 2 columns of a Hive table `mysource`, adds the columns up, then writes the result to another Hive table `mysink`, where `mysource` has 2 columns, `a bigint` and `b bigint`, and `mysink` has only 1 column, `c bigint`.

The job submits successfully; however, I observe that it keeps retrying.

[![YARN UI: the application keeps retrying attempts][1]][1]

When I click into each attempt, it simply shows this:

```
AM Container for appattempt_1607399514900_2511_001267 exited with exitCode: 2
For more detailed output, check application tracking page: http://cn-hz-h-test-data-flink00:8088/cluster/app/application_1607399514900_2511
Then, click on links to logs of each attempt.
Diagnostics: Exception from container-launch.
Container id: container_e13_1607399514900_2511_1267_000001
Exit code: 2
Stack trace: ExitCodeException exitCode=2:
    at org.apache.hadoop.util.Shell.runCommand(Shell.java:582)
    at org.apache.hadoop.util.Shell.run(Shell.java:479)
    at org.apache.hadoop.util.Shell$ShellCommandExecutor.execute(Shell.java:773)
    at org.apache.hadoop.yarn.server.nodemanager.DefaultContainerExecutor.launchContainer(DefaultContainerExecutor.java:212)
    at org.apache.hadoop.yarn.server.nodemanager.containermanager.launcher.ContainerLaunch.call(ContainerLaunch.java:302)
    at org.apache.hadoop.yarn.server.nodemanager.containermanager.launcher.ContainerLaunch.call(ContainerLaunch.java:82)
    at java.util.concurrent.FutureTask.run(FutureTask.java:266)
    at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149)
    at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624)
    at java.lang.Thread.run(Thread.java:748)
Container exited with a non-zero exit code 2
Failing this attempt
```

However, the "Logs" contain no useful info - they complain about the logging lib, but I believe those are really warnings, not errors.

```
LogType:jobmanager.err
Log Upload Time:Wed Apr 07 10:30:52 +0800 2021
LogLength:1010
Log Contents:
SLF4J: Class path contains multiple SLF4J bindings.
SLF4J: Found binding in [jar:file:/data/apache/hadoop/hadoop-2.7.3/logs/tmp/nm-local-dir/usercache/zhongtai/appcache/application_1607399514900_2509/filecache/10/featurepipelines-0.1.1.jar!/org/slf4j/impl/StaticLoggerBinder.class]
SLF4J: Found binding in [jar:file:/data/apache/hadoop/hadoop-2.7.3/logs/tmp/nm-local-dir/filecache/302/log4j-slf4j-impl-2.12.1.jar!/org/slf4j/impl/StaticLoggerBinder.class]
SLF4J: Found binding in [jar:file:/data/apache/hadoop/hadoop-2.7.3/share/hadoop/common/lib/slf4j-log4j12-1.7.10.jar!/org/slf4j/impl/StaticLoggerBinder.class]
SLF4J: See http://www.slf4j.org/codes.html#multiple_bindings for an explanation.
SLF4J: Actual binding is of type [org.slf4j.impl.Log4jLoggerFactory]
log4j:WARN No appenders could be found for logger (org.apache.flink.runtime.entrypoint.ClusterEntrypoint).
log4j:WARN Please initialize the log4j system properly.
log4j:WARN See http://logging.apache.org/log4j/1.2/faq.html#noconfig for more info.
End of LogType:jobmanager.err

LogType:jobmanager.out
Log Upload Time:Wed Apr 07 10:30:52 +0800 2021
LogLength:0
Log Contents:
End of LogType:jobmanager.out
```
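Note that the first SLF4J binding above comes from my job jar itself. For illustration, the bindings that were packaged into the fat jar can be listed like so (a quick check, not output from the logs above):

```
# Illustrative check: list SLF4J bindings shaded into the job jar
jar tf featurepipelines-0.1.1.jar | grep StaticLoggerBinder
```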
This is the job, written in Scala:

```scala
import org.apache.flink.streaming.api.scala.StreamExecutionEnvironment
import org.apache.flink.table.api.{EnvironmentSettings, SqlDialect}
import org.apache.flink.table.api.bridge.scala.StreamTableEnvironment
import org.apache.flink.table.catalog.hive.HiveCatalog

object HiveToyExample {
  def main(args: Array[String]): Unit = {
    val settings = EnvironmentSettings.newInstance.build
    val execEnv = StreamExecutionEnvironment.getExecutionEnvironment
    val tableEnv = StreamTableEnvironment.create(execEnv, settings)

    val hiveCatalog = new HiveCatalog(
      "myhive",
      "aiinfra",
      "/data/apache/hive/apache-hive-2.1.0-bin/conf/"
    )
    tableEnv.registerCatalog("myhive", hiveCatalog)
    tableEnv.useCatalog("myhive")

    tableEnv.getConfig.setSqlDialect(SqlDialect.DEFAULT)

    tableEnv
      .executeSql("""
        |INSERT INTO mysink
        |SELECT a + b
        |FROM mysource
        |""".stripMargin)
  }
}
```

Here's the pom.xml:

```xml
<project xmlns="http://maven.apache.org/POM/4.0.0"
         xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
         xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
    <modelVersion>4.0.0</modelVersion>

    <groupId>exmple</groupId>
    <artifactId>featurepipelines</artifactId>
    <version>0.1.1</version>
    <packaging>jar</packaging>

    <name>Feature Pipelines</name>

    <properties>
        <maven.compiler.source>8</maven.compiler.source>
        <maven.compiler.target>8</maven.compiler.target>
        <project.build.sourceEncoding>UTF-8</project.build.sourceEncoding>
        <flink.version>1.12.0</flink.version>
        <scala.binary.version>2.11</scala.binary.version>
        <scala.version>2.11.12</scala.version>
        <log4j.version>2.12.1</log4j.version>
    </properties>

    <dependencies>
        <dependency>
            <groupId>org.apache.flink</groupId>
            <artifactId>flink-streaming-scala_${scala.binary.version}</artifactId>
            <version>${flink.version}</version>
            <scope>provided</scope>
        </dependency>
        <dependency>
            <groupId>org.apache.flink</groupId>
            <artifactId>flink-clients_${scala.binary.version}</artifactId>
            <version>${flink.version}</version>
            <scope>provided</scope>
        </dependency>
        <dependency>
            <groupId>org.apache.flink</groupId>
            <artifactId>flink-table-api-scala-bridge_${scala.binary.version}</artifactId>
            <version>${flink.version}</version>
            <scope>provided</scope>
        </dependency>
        <dependency>
            <groupId>org.apache.flink</groupId>
            <artifactId>flink-table-planner-blink_${scala.binary.version}</artifactId>
            <version>${flink.version}</version>
            <scope>provided</scope>
        </dependency>
        <dependency>
            <groupId>org.apache.flink</groupId>
            <artifactId>flink-connector-hive_${scala.binary.version}</artifactId>
            <version>${flink.version}</version>
            <scope>provided</scope>
        </dependency>
        <dependency>
            <groupId>org.apache.hive</groupId>
            <artifactId>hive-exec</artifactId>
            <version>2.1.0</version>
            <scope>provided</scope>
        </dependency>
        <dependency>
            <groupId>org.slf4j</groupId>
            <artifactId>slf4j-log4j12</artifactId>
            <version>1.7.7</version>
        </dependency>
    </dependencies>

    <build>
        <resources>
            <resource>
                <directory>src/main/resources</directory>
                <filtering>true</filtering>
            </resource>
        </resources>
        <plugins>
            <plugin>
                <groupId>org.apache.maven.plugins</groupId>
                <artifactId>maven-shade-plugin</artifactId>
                <version>3.2.4</version>
                <executions>
                    <execution>
                        <phase>package</phase>
                        <goals>
                            <goal>shade</goal>
                        </goals>
                        <configuration>
                            <shadedArtifactAttached>false</shadedArtifactAttached>
                            <shadedClassifierName>Shade</shadedClassifierName>
                            <createDependencyReducedPom>false</createDependencyReducedPom>
                            <filters>
                                <filter>
                                    <artifact>*:*</artifact>
                                    <excludes>
                                        <exclude>META-INF/*.SF</exclude>
                                        <exclude>META-INF/*.DSA</exclude>
                                        <exclude>META-INF/*.RSA</exclude>
                                    </excludes>
                                </filter>
                            </filters>
                        </configuration>
                    </execution>
                </executions>
            </plugin>
            <plugin>
                <groupId>net.alchim31.maven</groupId>
                <artifactId>scala-maven-plugin</artifactId>
                <version>4.4.1</version>
                <executions>
                    <execution>
                        <goals>
                            <goal>compile</goal>
                            <goal>testCompile</goal>
                        </goals>
                    </execution>
                </executions>
            </plugin>
        </plugins>
    </build>
</project>
```

This is how I package the jar:

```
mvn clean package
```
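Note that `slf4j-log4j12` is the only dependency in the pom without `provided` scope, so `mvn clean package` shades it into the fat jar - which is consistent with the first SLF4J binding reported in jobmanager.err above. For illustration, keeping it out of the jar (whether that matters here is an open question) would look like this:

```xml
<!-- Sketch: mark the logging binding provided so it is not shaded into
     the fat jar and the cluster's own logging backend is used instead -->
<dependency>
    <groupId>org.slf4j</groupId>
    <artifactId>slf4j-log4j12</artifactId>
    <version>1.7.7</version>
    <scope>provided</scope>
</dependency>
```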
This is how I run the job:

```
flink run \
--yarnname scalaflink-hive-test \
-m yarn-cluster \
-yarnqueue datadev \
--class featurepipelines.ingestion.HiveToyExample \
./featurepipelines-0.1.1.jar
```

## PyFlink rewrite works just fine?!

Since the logic is so simple, I rewrote the job in PyFlink to see what happens. Here is the PyFlink rewrite:

```python
import os
from pyflink.datastream import StreamExecutionEnvironment
from pyflink.table import *
from pyflink.table.catalog import HiveCatalog

settings = EnvironmentSettings.new_instance().use_blink_planner().build()
exec_env = StreamExecutionEnvironment.get_execution_environment()
t_env = StreamTableEnvironment.create(exec_env, environment_settings=settings)

# There exists such a jar in the path
t_env.get_config().get_configuration().set_string(
    "pipeline.jars",
    f"file://{os.getcwd()}/deps/flink-sql-connector-hive-2.2.0_2.11-1.12.0.jar"
)

catalog_name = "myhive"
default_database = "aiinfra"
hive_conf_dir = "/data/apache/hive/apache-hive-2.1.0-bin/conf/"

hive_catalog = HiveCatalog(catalog_name, default_database, hive_conf_dir)
t_env.register_catalog(catalog_name, hive_catalog)
t_env.use_catalog(catalog_name)

TRANSFORM_DML = """
INSERT INTO mysink
SELECT a + b
FROM mysource
"""

t_env.get_config().set_sql_dialect(SqlDialect.DEFAULT)
t_env.execute_sql(TRANSFORM_DML).wait()
```

This is how I run the PyFlink job:

```
flink run \
--yarnname pyflink-hive-test \
-m yarn-cluster \
-yD yarn.application.queue=tech_platform \
-pyarch pyflink1.12.0.zip \
-pyexec /data/software/pyflink1.12.0/bin/python \
-py /data/home/pal-flink/chenyisheng14418/feature-pipelines/pyflink/hive.py
```

Surprisingly, the job runs fine - it finishes soon, with the result written to the `mysink` table.
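Incidentally, one behavioral difference between the two versions: the PyFlink job blocks on the INSERT via `.wait()`, while the Scala job returns from `executeSql` without waiting. The Scala counterpart would be something like this (a sketch; `TableResult#await` is the Java/Scala analogue of PyFlink's `wait`):

```scala
// Sketch, continuing from the Scala job above: block until the INSERT
// finishes, mirroring the PyFlink version's .wait()
tableEnv
  .executeSql("""
    |INSERT INTO mysink
    |SELECT a + b
    |FROM mysource
    |""".stripMargin)
  .await()
```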
## Why?

Given the comparison, I highly doubt that the Scala job fails because it is not packaged correctly, even though I follow the [Flink Docs](https://ci.apache.org/projects/flink/flink-docs-stable/dev/table/connectors/hive/#program-maven), which can be verified by looking at my pom:

> If you are building your own program, you need the following dependencies in your mvn file. It's recommended not to include these dependencies in the resulting jar file. You're supposed to add dependencies as stated above at runtime.

```
<!-- Flink Dependency -->
<dependency>
  <groupId>org.apache.flink</groupId>
  <artifactId>flink-connector-hive_2.11</artifactId>
  <version>1.12.0</version>
  <scope>provided</scope>
</dependency>

<dependency>
  <groupId>org.apache.flink</groupId>
  <artifactId>flink-table-api-java-bridge_2.11</artifactId>
  <version>1.12.0</version>
  <scope>provided</scope>
</dependency>

<!-- Hive Dependency -->
<dependency>
  <groupId>org.apache.hive</groupId>
  <artifactId>hive-exec</artifactId>
  <version>${hive.version}</version>
  <scope>provided</scope>
</dependency>
```

Also, I have included flink-sql-connector-hive-2.2.0_2.11-1.12.0.jar in /lib of my Flink distribution, as suggested in the [Flink docs](https://ci.apache.org/projects/flink/flink-docs-stable/dev/table/connectors/hive/#using-bundled-hive-jar):

> The recommended way to add dependency is to use a bundled jar. Separate jars should be used only if bundled jars don't meet your needs.

What am I missing?

Best,
Yik San

[1]: https://i.stack.imgur.com/fBsHS.png
[2]: https://i.stack.imgur.com/ilNtr.png