Just to recap: I use Flink 1.4.2 with Docker Compose, which launches a jobmanager and a taskmanager. My goal is to learn another library that can be used with Flink, so logging is important to me. I start the cluster and deploy the following job (I dropped all calls to that library so I can focus on plain Flink and Docker):
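For context, the setup is essentially the stock Flink Docker Compose layout. A minimal sketch of the compose file (service names and the environment variable follow the official flink:1.4.2 image's conventions; treat the details as my assumptions, not a copy of my actual file):

```yaml
# Hypothetical minimal docker-compose.yml for a one-jobmanager,
# one-taskmanager Flink 1.4.2 cluster (not the exact file used here).
version: "2.1"
services:
  jobmanager:
    image: flink:1.4.2
    command: jobmanager
    ports:
      - "8081:8081"        # web UI
    environment:
      - JOB_MANAGER_RPC_ADDRESS=jobmanager
  taskmanager:
    image: flink:1.4.2
    command: taskmanager
    environment:
      - JOB_MANAGER_RPC_ADDRESS=jobmanager
```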
    public class ParallelMLExample {
        private static final Logger LOG = LoggerFactory.getLogger(ParallelMLExample.class);

        public static void main(String[] args) throws Exception {
            // Set up the Flink session
            final ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();
            env.getConfig().disableSysoutLogging();

            DataSet<Integer> amounts = env.fromElements(1, 29, 40, 50);

            LOG.info("########## BEFORE UPDATEMODEL ##########");
            List<Integer> collect = amounts.filter(a -> a > 30)
                    .reduce((integer, t1) -> integer + t1)
                    .collect();
            LOG.info("########## AFTER UPDATEMODEL ##########");
            LOG.info(collect.get(0).toString());
        }
    }

The jobmanager log does not show anything after "BEFORE UPDATEMODEL":

    $ docker-compose up
    Starting flink_jobmanager_1
    Starting flink_taskmanager_1
    Attaching to flink_jobmanager_1, flink_taskmanager_1
    jobmanager_1  | Starting Job Manager
    jobmanager_1  | config file:
    jobmanager_1  | jobmanager.rpc.address: jobmanager
    jobmanager_1  | jobmanager.rpc.port: 6123
    jobmanager_1  | jobmanager.heap.mb: 1024
    jobmanager_1  | taskmanager.heap.mb: 1024
    jobmanager_1  | taskmanager.numberOfTaskSlots: 1
    jobmanager_1  | taskmanager.memory.preallocate: false
    jobmanager_1  | parallelism.default: 1
    jobmanager_1  | web.port: 8081
    jobmanager_1  | blob.server.port: 6124
    jobmanager_1  | query.server.port: 6125
    jobmanager_1  | blob.server.port: 6124
    jobmanager_1  | query.server.port: 6125
    jobmanager_1  | blob.server.port: 6124
    jobmanager_1  | query.server.port: 6125
    taskmanager_1 | Starting Task Manager
    taskmanager_1 | config file:
    taskmanager_1 | jobmanager.rpc.address: jobmanager
    taskmanager_1 | jobmanager.rpc.port: 6123
    taskmanager_1 | jobmanager.heap.mb: 1024
    taskmanager_1 | taskmanager.heap.mb: 1024
    taskmanager_1 | taskmanager.numberOfTaskSlots: 4
    taskmanager_1 | taskmanager.memory.preallocate: false
    taskmanager_1 | parallelism.default: 1
    taskmanager_1 | web.port: 8081
    taskmanager_1 | blob.server.port: 6124
    taskmanager_1 | query.server.port: 6125
    taskmanager_1 | blob.server.port: 6124
    taskmanager_1 | query.server.port: 6125
    taskmanager_1 | blob.server.port: 6124
    taskmanager_1 | query.server.port: 6125
    jobmanager_1  | Starting jobmanager as a console application on host e207d6ad4a1a.
    taskmanager_1 | Starting taskmanager as a console application on host 1d724ce8ae5e.
    jobmanager_1  | Slf4jLogger started
    taskmanager_1 | Slf4jLogger started
    taskmanager_1 | Could not load Queryable State Client Proxy. Probable reason: flink-queryable-state-runtime is not in the classpath. To enable Queryable State, please move the flink-queryable-state-runtime jar from the opt to the lib folder.
    taskmanager_1 | Could not load Queryable State Server. Probable reason: flink-queryable-state-runtime is not in the classpath. To enable Queryable State, please move the flink-queryable-state-runtime jar from the opt to the lib folder.
    jobmanager_1  | ########## BEFORE UPDATEMODEL ##########
    taskmanager_1 | The operator name DataSource (at main(ParallelMLExample.java:30) (org.apache.flink.api.java.io.CollectionInputFormat)) exceeded the 80 characters length limit and was truncated.

On 04/07/2018 02:46 PM, JP de Vooght wrote:
> Nico, all,
>
> I am still stuck with this. Upgraded the Docker image to 1.4.2 and the
> AMIDST library to 0.7.0.
>
> Just noticed this issue, which signals logging issues outside
> transforms: https://issues.apache.org/jira/browse/FLINK-7990
>
> Could this be related? Although I don't see the relation to logback.
> Below is the library code invoked after "BEFORE updateModel":
>
>     try {
>         Configuration config = new Configuration();
>         config.setString(BN_NAME, this.dag.getName());
>         config.setBytes(EFBN_NAME, Serialization.serializeObject(efBayesianNetwork));
>
>         DataSet<DataInstance> dataset = dataUpdate.getDataSet();
>         this.sumSS = dataset.map(new SufficientSatisticsMAP())
>                 .withParameters(config)
>                 .reduce(new SufficientSatisticsReduce())
>                 .collect().get(0);
>
>         // Add the prior
>         sumSS.sum(efBayesianNetwork.createInitSufficientStatistics());
>
>         JobExecutionResult result = dataset.getExecutionEnvironment().getLastJobExecutionResult();
>
>         numInstances = result.getAccumulatorResult(ParallelMaximumLikelihood.COUNTER_NAME + "_" + this.dag.getName());
>         numInstances++; // Initial counts
>     } catch (Exception ex) {
>         throw new UndeclaredThrowableException(ex);
>     }
>
> JP
>
> On 01/16/2018 10:50 AM, Nico Kruber wrote:
>> Just a guess, but probably our logging initialisation changes the global
>> log level (see conf/log4j.properties). DataStream.collect() executes the
>> program along with creating a local Flink "cluster" (if you are testing
>> locally / in an IDE) and initializing logging, among other things.
>>
>> Please comment the first line out and uncomment the following one to
>> read like this:
>> ==========
>> # This affects logging for both user code and Flink
>> #log4j.rootLogger=INFO, file
>>
>> # Uncomment this if you want to _only_ change Flink's logging
>> log4j.logger.org.apache.flink=INFO
>> ==========
>>
>> Nico
>>
>> On 13/01/18 13:52, j...@vooght.de wrote:
>>> Hello,
>>> I am learning Flink and using the docker image along with the AMIDST
>>> library for this.
>>> Below is a sample task from AMIDST which provides INFO output up until I
>>> reach updateModel().
>>> I pasted the short method as well and wonder what
>>> prevents the Logger from producing further output.
>>>
>>>     // Set up the Flink session
>>>     env = ExecutionEnvironment.getExecutionEnvironment();
>>>     env.getConfig().disableSysoutLogging();
>>>     Logger LOG = LoggerFactory.getLogger(">>>>> ParallelMLExample");
>>>
>>>     // Generate a random dataset
>>>     DataFlink<DataInstance> dataFlink = new DataSetGenerator().generate(env, 1234, 1000, 5, 0);
>>>
>>>     // Create a DAG with the NaiveBayes structure for the random dataset
>>>     DAG dag = DAGGenerator.getNaiveBayesStructure(dataFlink.getAttributes(), "DiscreteVar4");
>>>     LOG.info(dag.toString());
>>>
>>>     // Create the Learner object
>>>     ParameterLearningAlgorithm learningAlgorithmFlink = new ParallelMaximumLikelihood();
>>>
>>>     // Learning parameters
>>>     learningAlgorithmFlink.setBatchSize(10);
>>>     learningAlgorithmFlink.setDAG(dag);
>>>
>>>     // Initialize the learning process
>>>     learningAlgorithmFlink.initLearning();
>>>
>>>     // Learn from the Flink data
>>>     LOG.info("BEFORE UPDATEMODEL");
>>>     learningAlgorithmFlink.updateModel(dataFlink);
>>>     LOG.info("AFTER UPDATEMODEL");
>>>
>>>     // Print the learnt Bayes net
>>>     BayesianNetwork bn = learningAlgorithmFlink.getLearntBayesianNetwork();
>>>     LOG.info(bn.toString());
>>>
>>> Below is the updateModel method.
>>>
>>>     public double updateModel(DataFlink<DataInstance> dataUpdate) {
>>>         try {
>>>             Configuration config = new Configuration();
>>>             config.setString(BN_NAME, this.dag.getName());
>>>             config.setBytes(EFBN_NAME, Serialization.serializeObject(efBayesianNetwork));
>>>
>>>             DataSet<DataInstance> dataset = dataUpdate.getDataSet();
>>>             this.sumSS = dataset.map(new SufficientSatisticsMAP())
>>>                     .withParameters(config)
>>>                     .reduce(new SufficientSatisticsReduce())
>>>                     .collect().get(0);
>>>
>>>             // Add the prior
>>>             sumSS.sum(efBayesianNetwork.createInitSufficientStatistics());
>>>
>>>             JobExecutionResult result = dataset.getExecutionEnvironment().getLastJobExecutionResult();
>>>
>>>             numInstances = result.getAccumulatorResult(ParallelMaximumLikelihood.COUNTER_NAME + "_" + this.dag.getName());
>>>             numInstances++; // Initial counts
>>>         } catch (Exception ex) {
>>>             throw new UndeclaredThrowableException(ex);
>>>         }
>>>
>>>         return this.getLogMarginalProbability();
>>>     }
>>>
>>> Not sure why LOG.info calls past that method are not output to the console.
>>> TIA
>>> JP
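For what it's worth, the stripped-down pipeline at the top of this thread is easy to sanity-check without Flink at all: filtering {1, 29, 40, 50} for values greater than 30 and summing the remainder gives 90, which is the value the job should log right after "AFTER UPDATEMODEL". A plain-Java-streams sketch of the same logic (the class name is mine, not from the thread):

```java
import java.util.Arrays;
import java.util.List;

public class FilterReduceCheck {
    public static void main(String[] args) {
        // Same data and logic as the Flink job: keep values > 30, then sum them.
        List<Integer> amounts = Arrays.asList(1, 29, 40, 50);
        int sum = amounts.stream().filter(a -> a > 30).reduce(0, Integer::sum);
        System.out.println(sum); // prints 90 (40 + 50)
    }
}
```

So if the driver never reaches the AFTER line, the job itself is hanging or the log output is going elsewhere; the computation cannot be the issue.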