1. Be careful: HDFS is designed for large files, not for lots of small files.
2. If that's really what you want, roll your own.
/** Writes one partition's `(key, line)` pairs to one file per key under the output directory.
  *
  * A writer is opened lazily the first time a key is seen and reused for every later line
  * with that key. `FileSystem.create` overwrites an existing file, so all lines for a key
  * must reach the same task (see `MyPartitioner`); otherwise tasks clobber each other's files.
  *
  * @param iterator the partition's `(key, line)` pairs; each key becomes a file name
  */
def writeLines(iterator: Iterator[(String, String)]): Unit = {
  // NOTE(review): `arg(1)` is assumed to be the job's output directory (e.g. main's `args(1)`) — confirm.
  val outputDir = new Path(arg(1))
  // Obtain the filesystem once per partition instead of once per key.
  val fs = FileSystem.get(new Configuration())
  val writers = scala.collection.mutable.HashMap.empty[String, java.io.BufferedWriter]
  try {
    iterator.foreach { case (key, line) =>
      val writer = writers.getOrElseUpdate(key, {
        // Path(parent, child) inserts the separator that string concatenation would omit.
        val out = fs.create(new Path(outputDir, key))
        // Encode explicitly as UTF-8 rather than relying on the JVM default charset.
        new java.io.BufferedWriter(
          new java.io.OutputStreamWriter(out, java.nio.charset.StandardCharsets.UTF_8))
      })
      writer.write(line)
      writer.write("\n")
    }
  } finally {
    // Closing each writer flushes its buffer and closes the underlying HDFS stream.
    writers.values.foreach(_.close())
  }
}
// Driver: tag every input line with its key, co-locate lines that share a key,
// then let each task write one file per key it holds.
// NOTE(review): `arg(0)` is assumed to be the input path (with `arg(1)`, read by
// `writeLines`, as the output directory) — confirm against the enclosing main method.
val inputData = sc.textFile(arg(0))
val keyValue = inputData.map { line =>
  // Placeholder: replace with the real per-line key extraction, e.g. the record's creation date.
  val key: String = ???
  (key, line)
}
// 10 partitions is an arbitrary example; MyPartitioner sends every line for a given key
// to the same partition.
val partitions = keyValue.partitionBy(new MyPartitioner(10))
partitions.foreachPartition(writeLines)
/** Routes each record to a partition chosen from the hash of its key's string form.
  *
  * Records whose keys have the same `toString` always land in the same partition. This lets
  * `writeLines` keep exactly one open writer per key inside a single task.
  *
  * @param partitions number of output partitions; must be positive
  */
class MyPartitioner(partitions: Int) extends Partitioner {
  require(partitions > 0, s"Number of partitions ($partitions) must be positive.")

  override def numPartitions: Int = partitions

  override def getPartition(key: Any): Int = key match {
    // A null key would otherwise throw NullPointerException from toString.
    case null => 0
    // Masking the sign bit keeps the hash non-negative, so the result is always a valid
    // index in [0, numPartitions). This makes sure lines with the same key share a partition.
    case k => (k.toString.hashCode & Integer.MAX_VALUE) % numPartitions
  }

  // Spark compares partitioners with equals to decide whether two RDDs are already
  // partitioned the same way, so partitioners with the same configuration should be equal.
  override def equals(other: Any): Boolean = other match {
    case that: MyPartitioner => that.numPartitions == numPartitions
    case _                   => false
  }

  override def hashCode: Int = numPartitions
}
2014-08-12 21:34 GMT+08:00 Fengyun RAO <[email protected]>:
> 1. be careful, HDFS are better for large files, not bunches of small files.
>
> 2. if that's really what you want, roll it your own.
>
> def writeAvro(iterator: Iterator[(String, String)]) = {
> val writers = new mutalbe.HashMap[String, BufferedWriter] // (key, writer)
> map
> try {
> while (iterator.hasNext) {
> val item = iterator.next()
> val key = item._1
> val line = item._2
> val writer = writers.get(key) match {
> case Some(writer) => writer
> case None =>
> val path = arg(1) + key
> val outputStream = FileSystem.get(new Configuration()).create(new
> Path(path))
> writer = new BufferedWriter(outputStream)
> }
> writer.writeLine(line)
> } finally {
> writers.values.foreach(._close())
> }
> }
>
> val inputData = sc.textFile()
> val keyValue = inputData.map(line => (key, line))
> val partitions = keValue.partitionBy(new MyPartition(10))
> partitions.foreachPartition(writeLines)
>
>
> class MyPartitioner(partitions: Int) extends Partitioner {
> override def numPartitions: Int = partitions
>
> override def getPartition(key: Any): Int = {
> (key.toString.hashCode & Integer.MAX_VALUE) % numPartitions // make
> sure lines with the same key in the same partition
>
> }
> }
>
>
>
> 2014-08-11 20:42 GMT+08:00 诺铁 <[email protected]>:
>
> hi,
>>
>> I have googled and found a similar question without a good answer:
>> http://stackoverflow.com/questions/24520225/writing-to-hadoop-distributed-file-system-multiple-times-with-spark
>>
>> In short, I would like to split the raw data by some key (for
>> example, creation date) and put each part in a directory named after that
>> date, so that I can easily access portions of the data later.
>>
>> For now I have to extract all the keys, then filter by each key and save
>> to a file repeatedly. Is there a good way to do this? Or maybe I shouldn't
>> do such a thing?
>>
>
>