fangmin.yang created FLINK-36574: ------------------------------------ Summary: The file STDOUT is not available on the TaskExecutor Key: FLINK-36574 URL: https://issues.apache.org/jira/browse/FLINK-36574 Project: Flink Issue Type: Bug Reporter: fangmin.yang
I started flink with the following docker-entrypoint.sh from the official flink repository {code:java} #!/usr/bin/env bash############################################################################### # Licensed to the Apache Software Foundation (ASF) under one # or more contributor license agreements. See the NOTICE file # distributed with this work for additional information # regarding copyright ownership. The ASF licenses this file # to you under the Apache License, Version 2.0 (the # "License"); you may not use this file except in compliance # with the License. You may obtain a copy of the License at # # http://www.apache.org/licenses/LICENSE-2.0 # # Unless required by applicable law or agreed to in writing, software # distributed under the License is distributed on an "AS IS" BASIS, # WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. # See the License for the specific language governing permissions and # limitations under the License. ###############################################################################COMMAND_STANDALONE="standalone-job" COMMAND_HISTORY_SERVER="history-server"# If unspecified, the hostname of the container is taken as the JobManager address JOB_MANAGER_RPC_ADDRESS=${JOB_MANAGER_RPC_ADDRESS:-$(hostname -f)} CONF_FILE_DIR="${FLINK_HOME}/conf"drop_privs_cmd() { if [ $(id -u) != 0 ]; then # Don't need to drop privs if EUID != 0 return elif [ -x /sbin/su-exec ]; then # Alpine echo su-exec flink else # Others echo gosu flink fi }copy_plugins_if_required() { if [ -z "$ENABLE_BUILT_IN_PLUGINS" ]; then return 0 fi echo "Enabling required built-in plugins" for target_plugin in $(echo "$ENABLE_BUILT_IN_PLUGINS" | tr ';' ' '); do echo "Linking ${target_plugin} to plugin directory" plugin_name=${target_plugin%.jar} mkdir -p "${FLINK_HOME}/plugins/${plugin_name}" if [ ! -e "${FLINK_HOME}/opt/${target_plugin}" ]; then echo "Plugin ${target_plugin} does not exist. Exiting." 
exit 1 else ln -fs "${FLINK_HOME}/opt/${target_plugin}" "${FLINK_HOME}/plugins/${plugin_name}" echo "Successfully enabled ${target_plugin}" fi done }set_config_options() { local config_parser_script="$FLINK_HOME/bin/config-parser-utils.sh" local config_dir="$FLINK_HOME/conf" local bin_dir="$FLINK_HOME/bin" local lib_dir="$FLINK_HOME/lib" local config_params=() while [ $# -gt 0 ]; do local key="$1" local value="$2" config_params+=("-D${key}=${value}") shift 2 done if [ "${#config_params[@]}" -gt 0 ]; then "${config_parser_script}" "${config_dir}" "${bin_dir}" "${lib_dir}" "${config_params[@]}" fi }prepare_configuration() { local config_options=() config_options+=("jobmanager.rpc.address" "${JOB_MANAGER_RPC_ADDRESS}") config_options+=("blob.server.port" "6124") config_options+=("query.server.port" "6125") if [ -n "${TASK_MANAGER_NUMBER_OF_TASK_SLOTS}" ]; then config_options+=("taskmanager.numberOfTaskSlots" "${TASK_MANAGER_NUMBER_OF_TASK_SLOTS}") fi if [ ${#config_options[@]} -ne 0 ]; then set_config_options "${config_options[@]}" fi if [ -n "${FLINK_PROPERTIES}" ]; then process_flink_properties "${FLINK_PROPERTIES}" fi }process_flink_properties() { local flink_properties_content=$1 local config_options=() local OLD_IFS="$IFS" IFS=$'\n' for prop in $flink_properties_content; do prop=$(echo $prop | tr -d '[:space:]') if [ -z "$prop" ]; then continue fi IFS=':' read -r key value <<< "$prop" value=$(echo $value | envsubst) config_options+=("$key" "$value") done IFS="$OLD_IFS" if [ ${#config_options[@]} -ne 0 ]; then set_config_options "${config_options[@]}" fi }maybe_enable_jemalloc() { if [ "${DISABLE_JEMALLOC:-false}" == "false" ]; then JEMALLOC_PATH="/usr/lib/$(uname -m)-linux-gnu/libjemalloc.so" JEMALLOC_FALLBACK="/usr/lib/x86_64-linux-gnu/libjemalloc.so" if [ -f "$JEMALLOC_PATH" ]; then export LD_PRELOAD=$LD_PRELOAD:$JEMALLOC_PATH elif [ -f "$JEMALLOC_FALLBACK" ]; then export LD_PRELOAD=$LD_PRELOAD:$JEMALLOC_FALLBACK else if [ "$JEMALLOC_PATH" = 
"$JEMALLOC_FALLBACK" ]; then MSG_PATH=$JEMALLOC_PATH else MSG_PATH="$JEMALLOC_PATH and $JEMALLOC_FALLBACK" fi echo "WARNING: attempted to load jemalloc from $MSG_PATH but the library couldn't be found. glibc will be used instead." fi fi }maybe_enable_jemalloccopy_plugins_if_requiredprepare_configurationargs=("$@") if [ "$1" = "help" ]; then printf "Usage: $(basename "$0") (jobmanager|${COMMAND_STANDALONE}|taskmanager|${COMMAND_HISTORY_SERVER})\n" printf " Or $(basename "$0") help\n\n" printf "By default, Flink image adopts jemalloc as default memory allocator. This behavior can be disabled by setting the 'DISABLE_JEMALLOC' environment variable to 'true'.\n" exit 0 elif [ "$1" = "jobmanager" ]; then args=("${args[@]:1}") echo "Starting Job Manager" exec $(drop_privs_cmd) "$FLINK_HOME/bin/jobmanager.sh" start-foreground "${args[@]}" elif [ "$1" = ${COMMAND_STANDALONE} ]; then args=("${args[@]:1}") echo "Starting Job Manager" exec $(drop_privs_cmd) "$FLINK_HOME/bin/standalone-job.sh" start-foreground "${args[@]}" elif [ "$1" = ${COMMAND_HISTORY_SERVER} ]; then args=("${args[@]:1}") echo "Starting History Server" exec $(drop_privs_cmd) "$FLINK_HOME/bin/historyserver.sh" start-foreground "${args[@]}" elif [ "$1" = "taskmanager" ]; then args=("${args[@]:1}") echo "Starting Task Manager" exec $(drop_privs_cmd) "$FLINK_HOME/bin/taskmanager.sh" start-foreground "${args[@]}" fiargs=("${args[@]}")# Running command in pass-through mode exec $(drop_privs_cmd) "${args[@]}" {code} Here are the startup commands {code:java} docker run --rm --name=jobmanager --network flink-network --publish 8081:8081 --env FLINK_PROPERTIES="jobmanager.rpc.address: jobmanager" flink:v1 jobmanager {code} {code:java} docker run --rm --name=taskmanager --network flink-network --env FLINK_PROPERTIES="jobmanager.rpc.address: jobmanager" flink:v1 taskmanager {code} When I started successfully, But when I run a word count program. 
The task manager STDOUT shows the following exception: The file STDOUT does not exist on the TaskExecutor. If you are using kubernetes mode, please use "kubectl logs <pod-name>" to get stdout content. !https://i.postimg.cc/Qtvr6fm1/20241020205050.png! https://i.postimg.cc/Qtvr6fm1/20241020205050.png -- This message was sent by Atlassian Jira (v8.20.10#820010)