The `conf` object will be sent to other nodes via Broadcast. Here is the scaladoc of Broadcast: http://spark.apache.org/docs/latest/api/scala/index.html#org.apache.spark.broadcast.Broadcast
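To see why this matters, here is a minimal sketch in plain Scala with no Spark dependency (all names are invented for illustration). RDD definitions are lazy, so two RDDs built around one shared, mutable configuration object both observe whatever value the object holds when an action finally runs, not the values it held when each RDD was defined:

```scala
import scala.collection.mutable

// Stands in for a shared, mutable Hadoop Configuration (hypothetical names).
val conf = mutable.Map("scan" -> "rowkeys-for-aa")

// Stands in for a lazily defined RDD: nothing reads conf at definition time.
def makeRdd(): () => String = () => conf("scan")

val rddAA = makeRdd()
conf("scan") = "rowkeys-for-bb" // reconfigure for the second query
val rddBB = makeRdd()

// The "action" runs only now, and both closures see the latest value.
assert(rddAA() == "rowkeys-for-bb") // not "rowkeys-for-aa"!
assert(rddBB() == "rowkeys-for-bb")

// The fix: snapshot the configuration per RDD, the analogue of wrapping a
// shared Hadoop Configuration in `new Configuration(shared)`.
def makeRddFixed(): () => String = {
  val myConf = conf.clone() // private copy taken at definition time
  () => myConf("scan")
}
conf("scan") = "rowkeys-for-aa"
val fixedAA = makeRddFixed()
conf("scan") = "rowkeys-for-bb"
assert(fixedAA() == "rowkeys-for-aa") // unaffected by the later mutation
```

This is the failure mode diagnosed in the thread below: the second scan overwrote the shared configuration before either RDD was evaluated, so `union(...).count()` saw two RDDs built from the same (final) settings, while `collect` forced each RDD before the next one reconfigured the object.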
In addition, the object v should not be modified after it is broadcast, in order to ensure that all nodes get the same value of the broadcast variable (e.g. if the variable is shipped to a new node later).

Best Regards,
Shixiong Zhu

2014-11-12 15:20 GMT+08:00 qiaou <qiaou8...@gmail.com>:

> this works!
> but can you explain why it should be used like this?
>
> --
> qiaou
> Sent with Sparrow <http://www.sparrowmailapp.com/?sig>
>
> On Wednesday, November 12, 2014 at 3:18 PM, Shixiong Zhu wrote:
>
> You need to create a new configuration for each RDD. Therefore, "val
> hbaseConf = HBaseConfigUtil.getHBaseConfiguration" should be changed to "val
> hbaseConf = new Configuration(HBaseConfigUtil.getHBaseConfiguration)"
>
> Best Regards,
> Shixiong Zhu
>
> 2014-11-12 14:53 GMT+08:00 qiaou <qiaou8...@gmail.com>:
>
> ok here is the code
>
> def hbaseQuery: (String) => RDD[Result] = {
>   val generateRdd = (area: String) => {
>     val startRowKey = s"$area${RowKeyUtils.convertToHex(startId, 10)}AAAAAAAAAAAAAAAA"
>     val stopRowKey = s"$area${RowKeyUtils.convertToHex(endId, 10)}GGGGGGGGGGGGGGGG"
>     println(s"startRowKey:${startRowKey}")
>     println(s"stopRowKey :${stopRowKey}")
>
>     val scan = new Scan()
>     scan.setStartRow(Bytes.toBytes(startRowKey))
>     scan.setStopRow(Bytes.toBytes(stopRowKey))
>     val filterList: FilterList = new FilterList()
>     if (appKey != null && !appKey.equals("_")) {
>       val appKeyFilter: SingleColumnValueFilter =
>         new SingleColumnValueFilter(Bytes.toBytes("clientInfo"),
>           Bytes.toBytes("optKey"), CompareOp.EQUAL, Bytes.toBytes(appKey))
>       filterList.addFilter(appKeyFilter)
>     }
>     if (imei != null && !imei.equals("_")) {
>       val imeiFilter: SingleColumnValueFilter =
>         new SingleColumnValueFilter(Bytes.toBytes("clientInfo"),
>           Bytes.toBytes("optImei"), CompareOp.EQUAL, Bytes.toBytes(imei))
>       filterList.addFilter(imeiFilter)
>     }
>     if (filterList.getFilters != null && filterList.getFilters.size() > 0) {
>       scan.setFilter(filterList)
>     }
>     scan.setCaching(10000)
>
>     val hbaseConf = HBaseConfigUtil.getHBaseConfiguration
>     hbaseConf.set(TableInputFormat.INPUT_TABLE, "asrLogFeedBack")
>     hbaseConf.set(TableInputFormat.SCAN,
>       Base64.encodeBytes(ProtobufUtil.toScan(scan).toByteArray))
>
>     SparkUtil.getSingleSparkContext()
>       .newAPIHadoopRDD(hbaseConf, classOf[TableInputFormat],
>         classOf[ImmutableBytesWritable], classOf[Result]).map {
>         case (_: ImmutableBytesWritable, result: Result) => result
>       }
>   }
>   generateRdd
> }
>
> --
> qiaou
> Sent with Sparrow <http://www.sparrowmailapp.com/?sig>
>
> On Wednesday, November 12, 2014 at 2:50 PM, Shixiong Zhu wrote:
>
> Could you provide the code of hbaseQuery? Maybe it doesn't support
> executing in parallel.
>
> Best Regards,
> Shixiong Zhu
>
> 2014-11-12 14:32 GMT+08:00 qiaou <qiaou8...@gmail.com>:
>
> Hi:
> I ran into a problem with the union method of RDD.
> I have a function like
>   def hbaseQuery(area: String): RDD[Result] = ???
> When I use hbaseQuery("aa").union(hbaseQuery("bb")).count(), it
> returns 0.
> However, when I use sc.parallelize(hbaseQuery("aa").collect.toList
> ::: hbaseQuery("bb").collect.toList).count(), it returns the right value.
> Obviously I have an action after my transformations, so why does it
> not work?
> fyi
>
> --
> qiaou
> Sent with Sparrow <http://www.sparrowmailapp.com/?sig>
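Concretely, applying Shixiong's suggestion to the function above means changing only the line that obtains the configuration. This is an untested fragment, not runnable on its own: `HBaseConfigUtil` and the surrounding scan setup are the poster's own code, and Hadoop's `Configuration(Configuration other)` copy constructor does the snapshotting.

```scala
// Before: val hbaseConf = HBaseConfigUtil.getHBaseConfiguration
// After: copy-construct a private Configuration so that each RDD returned by
// generateRdd captures its own settings instead of sharing one mutable object
// that the next call to generateRdd would overwrite before any action runs.
val hbaseConf = new Configuration(HBaseConfigUtil.getHBaseConfiguration)
hbaseConf.set(TableInputFormat.INPUT_TABLE, "asrLogFeedBack")
hbaseConf.set(TableInputFormat.SCAN,
  Base64.encodeBytes(ProtobufUtil.toScan(scan).toByteArray))
```

With this change, the two `set` calls mutate a per-RDD copy, so `hbaseQuery("aa").union(hbaseQuery("bb")).count()` evaluates each RDD against the scan it was defined with.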