Nico, all,

I am still stuck with this. I upgraded the Docker image to 1.4.2 and the
AMIDST library to 0.7.0.

I just noticed this issue, which reports logging problems outside transforms:
https://issues.apache.org/jira/browse/FLINK-7990

Could this be related? Although I don't see the connection to logback.
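One way to rule logback in or out is to check which logging backends are actually on the job's classpath. Below is a minimal, stdlib-only sketch; the class names it probes are the usual marker classes for each backend, and `LoggingBackendCheck` itself is just an illustrative name:

```java
public class LoggingBackendCheck {
    // Returns true if the named class can be loaded from the current classpath.
    static boolean onClasspath(String className) {
        try {
            Class.forName(className);
            return true;
        } catch (ClassNotFoundException e) {
            return false;
        }
    }

    public static void main(String[] args) {
        // Presence of a marker class means the corresponding jar is on the classpath.
        System.out.println("slf4j-api: " + onClasspath("org.slf4j.LoggerFactory"));
        System.out.println("log4j 1.x: " + onClasspath("org.apache.log4j.Logger"));
        System.out.println("logback:   " + onClasspath("ch.qos.logback.classic.LoggerContext"));
    }
}
```

If logback does not show up, the JIRA issue above is unlikely to apply and the log4j configuration is the place to look.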

Below is the library code invoked after the "BEFORE UPDATEMODEL" log line:

    try {
        Configuration config = new Configuration();
        config.setString(BN_NAME, this.dag.getName());
        config.setBytes(EFBN_NAME, Serialization.serializeObject(efBayesianNetwork));

        DataSet<DataInstance> dataset = dataUpdate.getDataSet();
        this.sumSS = dataset.map(new SufficientSatisticsMAP())
                .withParameters(config)
                .reduce(new SufficientSatisticsReduce())
                .collect().get(0);

        //Add the prior
        sumSS.sum(efBayesianNetwork.createInitSufficientStatistics());

        JobExecutionResult result =
                dataset.getExecutionEnvironment().getLastJobExecutionResult();

        numInstances = result.getAccumulatorResult(
                ParallelMaximumLikelihood.COUNTER_NAME + "_" + this.dag.getName());
        numInstances++; //Initial counts
    } catch (Exception ex) {
        throw new UndeclaredThrowableException(ex);
    }
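For what it's worth, the symptom matches a root log level being raised mid-run: collect() triggers job execution, and if logging is re-initialised with a stricter root level at that point, every later LOG.info in user code is silently filtered. The mechanism can be illustrated with a self-contained java.util.logging sketch (the `com.example.app` logger name is made up; log4j behaves analogously for loggers that inherit their level from the root):

```java
import java.util.logging.Handler;
import java.util.logging.Level;
import java.util.logging.LogRecord;
import java.util.logging.Logger;

public class RootLevelDemo {
    static int published = 0; // counts records that actually reach the handler

    public static void main(String[] args) {
        Logger root = Logger.getLogger("");
        // Application logger with no level of its own: it inherits the
        // effective level from the root logger.
        Logger app = Logger.getLogger("com.example.app");

        root.addHandler(new Handler() {
            @Override public void publish(LogRecord r) { published++; }
            @Override public void flush() {}
            @Override public void close() {}
        });

        root.setLevel(Level.INFO);
        app.info("visible");          // passes: effective level is INFO

        root.setLevel(Level.WARNING); // something tightens the root level...
        app.info("silently dropped"); // ...and later INFO calls vanish

        System.out.println("records published: " + published);
    }
}
```

This prints "records published: 1". With log4j 1.x the analogous reset would be org.apache.log4j.Logger.getRootLogger().setLevel(...), which is effectively what re-reading conf/log4j.properties does, hence Nico's suggestion below to scope the level to org.apache.flink instead of the root logger.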

JP

On 01/16/2018 10:50 AM, Nico Kruber wrote:
> Just a guess, but probably our logging initialisation changes the global
> log level (see conf/log4j.properties). DataStream.collect() executes the
> program along with creating a local Flink "cluster" (if you are testing
> locally / in an IDE) and initializing logging, among other things.
>
> Please comment the first line out and uncomment the following one to
> read like this:
> ==========
> # This affects logging for both user code and Flink
> #log4j.rootLogger=INFO, file
>
> # Uncomment this if you want to _only_ change Flink's logging
> log4j.logger.org.apache.flink=INFO
> ==========
>
>
> Nico
>
> On 13/01/18 13:52, j...@vooght.de wrote:
>> Hello,
>> I am learning Flink and using the docker image along with the AMIDST
>> library for this.
>> Below is a sample task from AMIDST which provides INFO output up until I
>> reach updateModel(). I pasted the short method as well and wonder what
>> prevents the Logger from producing any further output.
>>
>>         //Set-up Flink session
>>         env = ExecutionEnvironment.getExecutionEnvironment();
>>         env.getConfig().disableSysoutLogging();
>>         Logger LOG = LoggerFactory.getLogger(">>>>> ParallelMLExample");
>>
>>         //generate a random dataset
>>         DataFlink<DataInstance> dataFlink = new
>> DataSetGenerator().generate(env, 1234, 1000, 5, 0);
>>
>>         //Creates a DAG with the NaiveBayes structure for the random
>> dataset
>>         DAG dag =
>> DAGGenerator.getNaiveBayesStructure(dataFlink.getAttributes(),
>> "DiscreteVar4");
>>         LOG.info(dag.toString());
>>
>>         //Create the Learner object
>>         ParameterLearningAlgorithm learningAlgorithmFlink = new
>> ParallelMaximumLikelihood();
>>
>>         //Learning parameters
>>         learningAlgorithmFlink.setBatchSize(10);
>>         learningAlgorithmFlink.setDAG(dag);
>>
>>         //Initialize the learning process
>>         learningAlgorithmFlink.initLearning();
>>
>>         //Learn from the flink data
>>         LOG.info("BEFORE UPDATEMODEL");
>>         learningAlgorithmFlink.updateModel(dataFlink);
>>         LOG.info("AFTER UPDATEMODEL");
>>
>>         //Print the learnt Bayes Net
>>         BayesianNetwork bn =
>> learningAlgorithmFlink.getLearntBayesianNetwork();
>>         LOG.info(bn.toString());
>>
>>
>> Below is the updateModel method.
>>
>>     public double updateModel(DataFlink<DataInstance> dataUpdate) {
>>         try {
>>             Configuration config = new Configuration();
>>             config.setString(BN_NAME, this.dag.getName());
>>             config.setBytes(EFBN_NAME,
>> Serialization.serializeObject(efBayesianNetwork));
>>
>>             DataSet<DataInstance> dataset = dataUpdate.getDataSet();
>>             this.sumSS = dataset.map(new SufficientSatisticsMAP())
>>                     .withParameters(config)
>>                     .reduce(new SufficientSatisticsReduce())
>>                     .collect().get(0);
>>
>>             //Add the prior
>>             sumSS.sum(efBayesianNetwork.createInitSufficientStatistics());
>>
>>             JobExecutionResult result =
>> dataset.getExecutionEnvironment().getLastJobExecutionResult();
>>
>>             numInstances =
>> result.getAccumulatorResult(ParallelMaximumLikelihood.COUNTER_NAME+"_"+this.dag.getName());
>>
>>             numInstances++;//Initial counts
>>
>>         }catch(Exception ex){
>>             throw new UndeclaredThrowableException(ex);
>>         }
>>
>>         return this.getLogMarginalProbability();
>>     }
>>
>>
>> Not sure why the LOG.info calls after that method are not output to the
>> console.
>> TIA
>> JP
