Are you using checkpointing? I had a similar issue when recreating a streaming context from a checkpoint, because broadcast variables are not checkpointed: after the driver restarts from the checkpoint, the static field holding the broadcast is no longer populated, which shows up as a null pointer exception in the tasks. A rough sketch of the usual workaround (a lazily instantiated broadcast singleton) is below the quoted message.

On 23 Jun 2015 5:01 pm, "Nipun Arora" <nipunarora2...@gmail.com> wrote:
> Hi,
>
> I have a Spark Streaming application where I need to access a model saved
> in a HashMap. I have *no problems running the same code with broadcast
> variables in the local installation.* However, I get a *null pointer
> exception* when I deploy it on my Spark test cluster.
>
> I have stored a model in a HashMap<String, FieldModel>, which is
> serializable. I use a broadcast variable declared as a global static
> variable to broadcast this hashmap:
>
> public static Broadcast<HashMap<String, FieldModel>> br;
>
> HashMap<String, FieldModel> hm = checkerObj.getModel(esserver, type);
>
> br = ssc.sparkContext().broadcast(hm);
>
> I need to access this model in my mapper phase and do some operations
> based on the checkup. The following is a snippet of how I access the
> broadcast variable:
>
> JavaDStream<Tuple3<Long, Double, String>> split = matched.map(new GenerateType2Scores());
>
> class GenerateType2Scores implements Function<String, Tuple3<Long, Double, String>> {
>     @Override
>     public Tuple3<Long, Double, String> call(String s) throws Exception {
>         Long time = Type2ViolationChecker.getMTS(s);
>         HashMap<String, FieldModel> temphm = Type2ViolationChecker.br.value();
>         Double score = Type2ViolationChecker.getAnomalyScore(temphm, s);
>         return new Tuple3<Long, Double, String>(time, score, s);
>     }
> }
>
> temphm should refer to the hashmap stored in the broadcast variable.
> Can anyone help me understand the correct way to access broadcast
> variables in Java?
>
> Thanks
> Nipun
>
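For what it's worth, here is a rough sketch of one way to work around it: a lazily instantiated singleton that (re)creates the broadcast from the SparkContext of the current batch, so it still works after the streaming context is recovered from a checkpoint, and the broadcast is captured in the closure instead of being read from a static field on the executors. The ModelBroadcast class name and the loadModel() helper are just placeholders for your checkerObj.getModel(esserver, type) call; FieldModel and Type2ViolationChecker are the classes from your snippet.

import java.util.HashMap;

import org.apache.spark.api.java.JavaRDD;
import org.apache.spark.api.java.JavaSparkContext;
import org.apache.spark.api.java.function.Function;
import org.apache.spark.broadcast.Broadcast;
import org.apache.spark.streaming.api.java.JavaDStream;

import scala.Tuple3;

// Lazily instantiated broadcast wrapper. Because the broadcast is rebuilt
// on first use after a driver restart, it does not depend on a static
// field that was populated before checkpoint recovery.
class ModelBroadcast {
    private static volatile Broadcast<HashMap<String, FieldModel>> instance = null;

    static Broadcast<HashMap<String, FieldModel>> getInstance(JavaSparkContext jsc) {
        if (instance == null) {
            synchronized (ModelBroadcast.class) {
                if (instance == null) {
                    instance = jsc.broadcast(loadModel());
                }
            }
        }
        return instance;
    }

    private static HashMap<String, FieldModel> loadModel() {
        // Placeholder: in your code this would be checkerObj.getModel(esserver, type)
        return new HashMap<String, FieldModel>();
    }
}

// Fetch the broadcast inside transform(), using the context of the current
// batch's RDD, and let the map function capture it through its closure.
JavaDStream<Tuple3<Long, Double, String>> split = matched.transform(
    new Function<JavaRDD<String>, JavaRDD<Tuple3<Long, Double, String>>>() {
        @Override
        public JavaRDD<Tuple3<Long, Double, String>> call(JavaRDD<String> rdd) throws Exception {
            final Broadcast<HashMap<String, FieldModel>> br =
                ModelBroadcast.getInstance(new JavaSparkContext(rdd.context()));
            return rdd.map(new Function<String, Tuple3<Long, Double, String>>() {
                @Override
                public Tuple3<Long, Double, String> call(String s) throws Exception {
                    Long time = Type2ViolationChecker.getMTS(s);
                    Double score = Type2ViolationChecker.getAnomalyScore(br.value(), s);
                    return new Tuple3<Long, Double, String>(time, score, s);
                }
            });
        }
    });

The double-checked locking only ensures the model is broadcast once per driver JVM; the important part is that getInstance() is called from inside transform() (or foreachRDD), so it runs on the driver for every batch and can re-create the broadcast after recovery from the checkpoint.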