Hi, Ted
thanks for your help.
I check the jar, it is in classpath, and now the problem is :
1、 Follow codes runs good, and it put the result to hbse:
val res =
lines.map(pairFunction).groupByKey().flatMap(pairFlatMapFunction).aggregateByKey(new
TrainFeature())(seqOp, combOp).values.first()
val configuration = HBaseConfiguration.create();
configuration.set("hbase.zookeeper.property.clientPort", "2181");
configuration.set("hbase.zookeeper.quorum", "192.168.1.66");
configuration.set("hbase.master", "192.168.1.66:60000");
val table = new HTable(configuration, "ljh_test3");
var put = new Put(Bytes.toBytes(res.toKey()));
put.add(Bytes.toBytes("f"), Bytes.toBytes("c"),
Bytes.toBytes(res.positiveCount));
table.put(put);
table.flushCommits()
2、But if I change the first() function to foreach:
lines.map(pairFunction).groupByKey().flatMap(pairFlatMapFunction).aggregateByKey(new
TrainFeature())(seqOp, combOp).values.foreach({res=>
val configuration = HBaseConfiguration.create();
configuration.set("hbase.zookeeper.property.clientPort", "2181");
configuration.set("hbase.zookeeper.quorum", "192.168.1.66");
configuration.set("hbase.master", "192.168.1.66:60000");
val table = new HTable(configuration, "ljh_test3");
var put = new Put(Bytes.toBytes(res.toKey()));
put.add(Bytes.toBytes("f"), Bytes.toBytes("c"),
Bytes.toBytes(res.positiveCount));
table.put(put);
})
the application hung, and the last log is :
15/10/28 09:30:33 INFO DAGScheduler: Missing parents for ResultStage 2: List()
15/10/28 09:30:33 INFO DAGScheduler: Submitting ResultStage 2
(MapPartitionsRDD[6] at values at TrainModel3.scala:98), which is now runnable
15/10/28 09:30:33 INFO MemoryStore: ensureFreeSpace(7032) called with
curMem=264045, maxMem=278302556
15/10/28 09:30:33 INFO MemoryStore: Block broadcast_3 stored as values in
memory (estimated size 6.9 KB, free 265.2 MB)
15/10/28 09:30:33 INFO MemoryStore: ensureFreeSpace(3469) called with
curMem=271077, maxMem=278302556
15/10/28 09:30:33 INFO MemoryStore: Block broadcast_3_piece0 stored as bytes in
memory (estimated size 3.4 KB, free 265.1 MB)
15/10/28 09:30:33 INFO BlockManagerInfo: Added broadcast_3_piece0 in memory on
10.120.69.53:43019 (size: 3.4 KB, free: 265.4 MB)
15/10/28 09:30:33 INFO SparkContext: Created broadcast 3 from broadcast at
DAGScheduler.scala:874
15/10/28 09:30:33 INFO DAGScheduler: Submitting 1 missing tasks from
ResultStage 2 (MapPartitionsRDD[6] at values at TrainModel3.scala:98)
15/10/28 09:30:33 INFO YarnScheduler: Adding task set 2.0 with 1 tasks
15/10/28 09:30:33 INFO TaskSetManager: Starting task 0.0 in stage 2.0 (TID 2,
gdc-dn147-formal.i.nease.net, PROCESS_LOCAL, 1716 bytes)
15/10/28 09:30:34 INFO BlockManagerInfo: Added broadcast_3_piece0 in memory on
gdc-dn147-formal.i.nease.net:59814 (size: 3.4 KB, free: 1060.3 MB)
15/10/28 09:30:34 INFO MapOutputTrackerMasterEndpoint: Asked to send map output
locations for shuffle 0 to gdc-dn147-formal.i.nease.net:52904
15/10/28 09:30:34 INFO MapOutputTrackerMaster: Size of output statuses for
shuffle 0 is 154 bytes
3、besides, I take the configuration and HTable out of foreach:
val configuration = HBaseConfiguration.create();
configuration.set("hbase.zookeeper.property.clientPort", "2181");
configuration.set("hbase.zookeeper.quorum", "192.168.1.66");
configuration.set("hbase.master", "192.168.1.66:60000");
val table = new HTable(configuration, "ljh_test3");
lines.map(pairFunction).groupByKey().flatMap(pairFlatMapFunction).aggregateByKey(new
TrainFeature())(seqOp, combOp).values.foreach({ res =>
var put = new Put(Bytes.toBytes(res.toKey()));
put.add(Bytes.toBytes("f"), Bytes.toBytes("c"),
Bytes.toBytes(res.positiveCount));
table.put(put);
})
table.flushCommits()
found serializable problem:
Exception in thread "main" org.apache.spark.SparkException: Task not
serializable
at
org.apache.spark.util.ClosureCleaner$.ensureSerializable(ClosureCleaner.scala:315)
at
org.apache.spark.util.ClosureCleaner$.org$apache$spark$util$ClosureCleaner$$clean(ClosureCleaner.scala:305)
at org.apache.spark.util.ClosureCleaner$.clean(ClosureCleaner.scala:132)
at org.apache.spark.SparkContext.clean(SparkContext.scala:1891)
at org.apache.spark.rdd.RDD
$$anonfun$foreach$1.apply(RDD.scala:869)
at
org.apache.spark.rdd.RDD$$anonfun$foreach$1.apply(RDD.scala:868)
at
org.apache.spark.rdd.RDDOperationScope$.withScope(RDDOperationScope.scala:148)
at
org.apache.spark.rdd.RDDOperationScope$.withScope(RDDOperationScope.scala:109)
at org.apache.spark.rdd.RDD.withScope(RDD.scala:286)
at org.apache.spark.rdd.RDD.foreach(RDD.scala:868)
at com.chencai.spark.ml.TrainModel3$.train(TrainModel3.scala:100)
at com.chencai.spark.ml.TrainModel3$.main(TrainModel3.scala:115)
at com.chencai.spark.ml.TrainModel3.main(TrainModel3.scala)
at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
at
sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:57)
at
sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
at java.lang.reflect.Method.invoke(Method.java:606)
at
org.apache.spark.deploy.SparkSubmit$.org$apache$spark$deploy$SparkSubmit$$runMain(SparkSubmit.scala:664)
at
org.apache.spark.deploy.SparkSubmit$.doRunMain$1(SparkSubmit.scala:169)
at org.apache.spark.deploy.SparkSubmit$.submit(SparkSubmit.scala:192)
at org.apache.spark.deploy.SparkSubmit$.main(SparkSubmit.scala:111)
at org.apache.spark.deploy.SparkSubmit.main(SparkSubmit.scala)
Caused by: java.io.NotSerializableException:
org.apache.hadoop.conf.Configuration
Serialization stack:
- object not serializable (class: org.apache.hadoop.conf.Configuration,
value: Configuration: core-default.xml, core-site.xml, mapred-default.xml,
mapred-site.xml, yarn-default.xml, yarn-site.xml, hdfs-default.xml,
hdfs-site.xml, hbase-default.xml, hbase-site.xml)
- field (class: com.chencai.spark.ml.TrainModel3$$anonfun$train$5,
name: configuration$1, type: class org.apache.hadoop.conf.Configuration)
- object (class
com.chencai.spark.ml.TrainModel3$$anonfun$train$5, <function1>)
at
org.apache.spark.serializer.SerializationDebugger$.improveException(SerializationDebugger.scala:40)
at
org.apache.spark.serializer.JavaSerializationStream.writeObject(JavaSerializer.scala:47)
at
org.apache.spark.serializer.JavaSerializerInstance.serialize(JavaSerializer.scala:81)
at
org.apache.spark.util.ClosureCleaner$.ensureSerializable(ClosureCleaner.scala:312)
... 21 more
> 在 2015年10月28日,09:26,Ted Yu <[email protected]> 写道:
>
> Jinghong:
> In one of earlier threads on storing data to hbase, it was found that htrace
> jar was not on classpath, leading to write failure.
>
> Can you check whether you are facing the same problem ?
>
> Cheers
>
> On Tue, Oct 27, 2015 at 5:11 AM, Ted Yu <[email protected]
> <mailto:[email protected]>> wrote:
> Jinghong:
> Hadmin variable is not used. You can omit that line.
>
> Which hbase release are you using ?
>
> As Deng said, don't flush per row.
>
> Cheers
>
> On Oct 27, 2015, at 3:21 AM, Deng Ching-Mallete <[email protected]
> <mailto:[email protected]>> wrote:
>
>> Hi,
>>
>> It would be more efficient if you configure the table and flush the commits
>> by partition instead of per element in the RDD. The latter works fine
>> because you only have 4 elements, but it won't bid well for large data sets
>> IMO..
>>
>> Thanks,
>> Deng
>>
>> On Tue, Oct 27, 2015 at 5:22 PM, jinhong lu <[email protected]
>> <mailto:[email protected]>> wrote:
>>
>>
>> Hi,
>>
>> I write my result to hdfs, it did well:
>>
>> val model =
>> lines.map(pairFunction).groupByKey().flatMap(pairFlatMapFunction).aggregateByKey(new
>> TrainFeature())(seqOp, combOp).values
>> model.map(a => (a.toKey() + "\t" + a.totalCount + "\t" +
>> a.positiveCount)).saveAsTextFile(modelDataPath);
>>
>> But when I want to write to hbase, the applicaton hung, no log, no response,
>> just stay there, and nothing is written to hbase:
>>
>> val model =
>> lines.map(pairFunction).groupByKey().flatMap(pairFlatMapFunction).aggregateByKey(new
>> TrainFeature())(seqOp, combOp).values.foreach({ res =>
>> val configuration = HBaseConfiguration.create();
>> configuration.set("hbase.zookeeper.property.clientPort", "2181");
>> configuration.set("hbase.zookeeper.quorum", “192.168.1.66");
>> configuration.set("hbase.master", "192.168.1:60000");
>> val hadmin = new HBaseAdmin(configuration);
>> val table = new HTable(configuration, "ljh_test3");
>> var put = new Put(Bytes.toBytes(res.toKey()));
>> put.add(Bytes.toBytes("f"), Bytes.toBytes("c"),
>> Bytes.toBytes(res.totalCount + res.positiveCount));
>> table.put(put);
>> table.flushCommits()
>> })
>>
>> And then I try to write som simple data to hbase, it did well too:
>>
>> sc.parallelize(Array(1,2,3,4)).foreach({ res =>
>> val configuration = HBaseConfiguration.create();
>> configuration.set("hbase.zookeeper.property.clientPort", "2181");
>> configuration.set("hbase.zookeeper.quorum", "192.168.1.66");
>> configuration.set("hbase.master", "192.168.1:60000");
>> val hadmin = new HBaseAdmin(configuration);
>> val table = new HTable(configuration, "ljh_test3");
>> var put = new Put(Bytes.toBytes(res));
>> put.add(Bytes.toBytes("f"), Bytes.toBytes("c"), Bytes.toBytes(res));
>> table.put(put);
>> table.flushCommits()
>> })
>>
>> what is the problem with the 2rd code? thanks a lot.
>>
>>
>