Nico, all,

I am still stuck with this. I upgraded the Docker image to 1.4.2 and the AMIDST library to 0.7.0.
Just noticed this issue which signals logging issues outside transforms:
https://issues.apache.org/jira/browse/FLINK-7990
Could this be related? Although I don't see the relation to logback.

Below is the library code invoked after "BEFORE updateModel":

    try {
        Configuration config = new Configuration();
        config.setString(BN_NAME, this.dag.getName());
        config.setBytes(EFBN_NAME, Serialization.serializeObject(efBayesianNetwork));

        DataSet<DataInstance> dataset = dataUpdate.getDataSet();
        this.sumSS = dataset.map(new SufficientSatisticsMAP())
                .withParameters(config)
                .reduce(new SufficientSatisticsReduce())
                .collect().get(0);

        //Add the prior
        sumSS.sum(efBayesianNetwork.createInitSufficientStatistics());

        JobExecutionResult result = dataset.getExecutionEnvironment().getLastJobExecutionResult();

        numInstances = result.getAccumulatorResult(ParallelMaximumLikelihood.COUNTER_NAME+"_"+this.dag.getName());

        numInstances++;//Initial counts

    }catch(Exception ex){
        throw new UndeclaredThrowableException(ex);
    }

JP

On 01/16/2018 10:50 AM, Nico Kruber wrote:
> Just a guess, but probably our logging initialisation changes the global
> log level (see conf/log4j.properties). DataStream.collect() executes the
> program along with creating a local Flink "cluster" (if you are testing
> locally / in an IDE) and initializing logging, among other things.
>
> Please comment the first line out and uncomment the following one to
> read like this:
> ==========
> # This affects logging for both user code and Flink
> #log4j.rootLogger=INFO, file
>
> # Uncomment this if you want to _only_ change Flink's logging
> log4j.logger.org.apache.flink=INFO
> ==========
>
>
> Nico
>
> On 13/01/18 13:52, j...@vooght.de wrote:
>> Hello,
>> I am learning Flink and using the docker image along with the AMIDST
>> library for this.
>> Below is a sample task from AMIDST which provides INFO output up until I
>> reach updateModel().
>> I pasted the short method as well and wonder what
>> prevents the Logger from
>>
>>     //Set-up Flink session
>>     env = ExecutionEnvironment.getExecutionEnvironment();
>>     env.getConfig().disableSysoutLogging();
>>     Logger LOG = LoggerFactory.getLogger(">>>>> ParallelMLExample");
>>
>>     //generate a random dataset
>>     DataFlink<DataInstance> dataFlink = new
>>             DataSetGenerator().generate(env, 1234, 1000, 5, 0);
>>
>>     //Creates a DAG with the NaiveBayes structure for the random dataset
>>     DAG dag =
>>             DAGGenerator.getNaiveBayesStructure(dataFlink.getAttributes(),
>>                     "DiscreteVar4");
>>     LOG.info(dag.toString());
>>
>>     //Create the Learner object
>>     ParameterLearningAlgorithm learningAlgorithmFlink = new
>>             ParallelMaximumLikelihood();
>>
>>     //Learning parameters
>>     learningAlgorithmFlink.setBatchSize(10);
>>     learningAlgorithmFlink.setDAG(dag);
>>
>>     //Initialize the learning process
>>     learningAlgorithmFlink.initLearning();
>>
>>     //Learn from the flink data
>>     LOG.info("BEFORE UPDATEMODEL");
>>     learningAlgorithmFlink.updateModel(dataFlink);
>>     LOG.info("AFTER UPDATEMODEL");
>>
>>     //Print the learnt Bayes Net
>>     BayesianNetwork bn =
>>             learningAlgorithmFlink.getLearntBayesianNetwork();
>>     LOG.info(bn.toString());
>>
>>
>> Below is the updateModel method.
>>
>>     public double updateModel(DataFlink<DataInstance> dataUpdate) {
>>         try {
>>             Configuration config = new Configuration();
>>             config.setString(BN_NAME, this.dag.getName());
>>             config.setBytes(EFBN_NAME,
>>                     Serialization.serializeObject(efBayesianNetwork));
>>
>>             DataSet<DataInstance> dataset = dataUpdate.getDataSet();
>>             this.sumSS = dataset.map(new SufficientSatisticsMAP())
>>                     .withParameters(config)
>>                     .reduce(new SufficientSatisticsReduce())
>>                     .collect().get(0);
>>
>>             //Add the prior
>>             sumSS.sum(efBayesianNetwork.createInitSufficientStatistics());
>>
>>             JobExecutionResult result =
>>                     dataset.getExecutionEnvironment().getLastJobExecutionResult();
>>
>>             numInstances =
>>                     result.getAccumulatorResult(ParallelMaximumLikelihood.COUNTER_NAME+"_"+this.dag.getName());
>>
>>             numInstances++;//Initial counts
>>
>>         }catch(Exception ex){
>>             throw new UndeclaredThrowableException(ex);
>>         }
>>
>>         return this.getLogMarginalProbability();
>>     }
>>
>>
>> Not sure why LOG.info past that method are not output to the console.
>> TIA
>> JP